• C++ 并发编程实战 第六章 设计基于锁的并发数据结构


    目录

    6.1 并发设计的内涵

    6.2 基于锁的并发数据结构

    6.2.1 采用锁实现线程安全的栈容器

    6.2.2 采用锁和条件变量实现线程安全的队列容器

    6.2.3 采用精细粒度的锁和条件变量实现线程安全的队列容器

    6.3 设计更复杂的基于锁的并发数据结构

    6.3.1 采用锁编写线程安全的查找表

    6.3.2 采用多种锁编写线程安全的链表、


    参考:https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019/blob/master/content/chapter5/5.0-chinese.md

    6.1 并发设计的内涵

    设计并发数据结构是为了让多线程并发访问,并且线程可对数据结构做相同或不同的操作。多线程环境下,无数据丢失和损毁,所有的数据需要维持原样,且无条件竞争的数据结构,称之为“线程安全”的数据结构。通常情况下,多个线程对数据结构进行并发操作是安全的,但不同操作需要单线程独立访问数据结构。当线程执行不同的操作时,对同一数据结构的并发操作是安全的,而多线程执行同样的操作时,可能会出现问题。

    实际的设计意义并不止上面提到的那样,而是要为线程提供并发访问数据结构的机会。本质上,在互斥量的保护下同一时间内只有一个线程可以获取锁。互斥量为了保护数据,会显式阻止线程对数据结构的并发访问。

    串行化(serialzation)则是线程轮流访问数据,对数据进行串行访问。因此,需要对数据结构仔细斟酌,确保能进行真正的并发。虽然,有些数据结构比其他结构的并发访问范围更大,但思路都是一样的:减少保护区域,减少序列化操作,提升并发访问的能力。

    6.2 基于锁的并发数据结构

    基于锁的并发数据结构需要确保访问线程持有锁的时间最短,对于只有一个互斥量的数据结构来说十分困难。需要锁之外的操作不能访问数据,保证不会产生条件竞争。使用多个互斥量保护数据结构不同的区域时,问题会更加明显。当操作需要获取多个互斥锁时,可能会产生死锁,所以使用多个互斥量时要格外小心。

    6.2.1 采用锁实现线程安全的栈容器

    代码6.1 线程安全栈的类定义

    1. #include
    2. struct empty_stack: std::exception
    3. {
    4. const char* what() const throw();
    5. };
    6. template<typename T>
    7. class threadsafe_stack
    8. {
    9. private:
    10. std::stack data;
    11. mutable std::mutex m;
    12. public:
    13. threadsafe_stack(){}
    14. threadsafe_stack(const threadsafe_stack& other)
    15. {
    16. std::lock_guard lock(other.m);
    17. data=other.data;
    18. }
    19. threadsafe_stack& operator=(const threadsafe_stack&) = delete;
    20. void push(T new_value)
    21. {
    22. std::lock_guard lock(m);
    23. data.push(std::move(new_value)); // 1
    24. }
    25. std::shared_ptr pop()
    26. {
    27. std::lock_guard lock(m);
    28. if(data.empty()) throw empty_stack(); // 2
    29. std::shared_ptr const res(
    30. std::make_shared(std::move(data.top()))); // 3
    31. data.pop(); // 4
    32. return res;
    33. }
    34. void pop(T& value)
    35. {
    36. std::lock_guard lock(m);
    37. if(data.empty()) throw empty_stack();
    38. value=std::move(data.top()); // 5
    39. data.pop(); // 6
    40. }
    41. bool empty() const
    42. {
    43. std::lock_guard lock(m);
    44. return data.empty();
    45. }
    46. };

    6.2.2 采用锁和条件变量实现线程安全的队列容器

    代码6.2 使用条件变量实现的线程安全队列

    1. template<typename T>
    2. class threadsafe_queue
    3. {
    4. private:
    5. mutable std::mutex mut;
    6. std::queue data_queue;
    7. std::condition_variable data_cond;
    8. public:
    9. threadsafe_queue()
    10. {}
    11. void push(T data)
    12. {
    13. std::lock_guard lk(mut);
    14. data_queue.push(std::move(data));
    15. data_cond.notify_one(); // 1
    16. }
    17. void wait_and_pop(T& value) // 2
    18. {
    19. std::unique_lock lk(mut);
    20. data_cond.wait(lk,[this]{return !data_queue.empty();});
    21. value=std::move(data_queue.front());
    22. data_queue.pop();
    23. }
    24. std::shared_ptr wait_and_pop() // 3
    25. {
    26. std::unique_lock lk(mut);
    27. data_cond.wait(lk,[this]{return !data_queue.empty();}); // 4
    28. std::shared_ptr res(
    29. std::make_shared(std::move(data_queue.front())));
    30. data_queue.pop();
    31. return res;
    32. }
    33. bool try_pop(T& value)
    34. {
    35. std::lock_guard lk(mut);
    36. if(data_queue.empty())
    37. return false;
    38. value=std::move(data_queue.front());
    39. data_queue.pop();
    40. return true;
    41. }
    42. std::shared_ptr try_pop()
    43. {
    44. std::lock_guard lk(mut);
    45. if(data_queue.empty())
    46. return std::shared_ptr(); // 5
    47. std::shared_ptr res(
    48. std::make_shared(std::move(data_queue.front())));
    49. data_queue.pop();
    50. return res;
    51. }
    52. bool empty() const
    53. {
    54. std::lock_guard lk(mut);
    55. return data_queue.empty();
    56. }
    57. };

    异常安全会有一些变化,不止一个线程等待对队列进行推送操作时,只会有一个线程因data_cond.notify_one()而继续工作。但是,如果工作线程在wait_and_pop()中抛出一个异常,例如:构造新的std::shared_ptr<>对象④时抛出异常,那么其他线程则会永世长眠。这种情况不可以,所以调用函数需要改成data_cond.notify_all(),这个函数将唤醒所有的工作线程,不过当大多线程发现队列依旧是空时,又会耗费资源让线程重新进入睡眠。第二种替代方案,有异常抛出时,让wait_and_pop()函数调用notify_one(),从而让个另一个线程去索引存储的值。第三种替代方案,将std::shared_ptr<>的初始化过程移到push()中,并且存储std::shared_ptr<>实例,而不是直接使用数据值,将std::shared_ptr<>拷贝到内部std::queue<>中就不会抛出异常了,这样wait_and_pop()又是安全的了。下面的代码,就是根据第三种方案修改的。

    代码6.3 持有std::shared_ptr<>实例的线程安全队列

    1. template<typename T>
    2. class threadsafe_queue
    3. {
    4. private:
    5. mutable std::mutex mut;
    6. std::queue > data_queue;
    7. std::condition_variable data_cond;
    8. public:
    9. threadsafe_queue()
    10. {}
    11. void wait_and_pop(T& value)
    12. {
    13. std::unique_lock lk(mut);
    14. data_cond.wait(lk,[this]{return !data_queue.empty();});
    15. value=std::move(*data_queue.front()); // 1
    16. data_queue.pop();
    17. }
    18. bool try_pop(T& value)
    19. {
    20. std::lock_guard lk(mut);
    21. if(data_queue.empty())
    22. return false;
    23. value=std::move(*data_queue.front()); // 2
    24. data_queue.pop();
    25. return true;
    26. }
    27. std::shared_ptr wait_and_pop()
    28. {
    29. std::unique_lock lk(mut);
    30. data_cond.wait(lk,[this]{return !data_queue.empty();});
    31. std::shared_ptr res=data_queue.front(); // 3
    32. data_queue.pop();
    33. return res;
    34. }
    35. std::shared_ptr try_pop()
    36. {
    37. std::lock_guard lk(mut);
    38. if(data_queue.empty())
    39. return std::shared_ptr();
    40. std::shared_ptr res=data_queue.front(); // 4
    41. data_queue.pop();
    42. return res;
    43. }
    44. void push(T new_value)
    45. {
    46. std::shared_ptr data(
    47. std::make_shared(std::move(new_value))); // 5
    48. std::lock_guard lk(mut);
    49. data_queue.push(data);
    50. data_cond.notify_one();
    51. }
    52. bool empty() const
    53. {
    54. std::lock_guard lk(mut);
    55. return data_queue.empty();
    56. }
    57. };

    6.2.3 采用精细粒度的锁和条件变量实现线程安全的队列容器

    代码6.6 线程安全队列——细粒度锁版

    1. template<typename T>
    2. class threadsafe_queue
    3. {
    4. private:
    5. struct node
    6. {
    7. std::shared_ptr data;
    8. std::unique_ptr next;
    9. };
    10. std::mutex head_mutex;
    11. std::unique_ptr head;
    12. std::mutex tail_mutex;
    13. node* tail;
    14. node* get_tail()
    15. {
    16. std::lock_guard tail_lock(tail_mutex);
    17. return tail;
    18. }
    19. std::unique_ptr pop_head()
    20. {
    21. std::lock_guard head_lock(head_mutex);
    22. if(head.get()==get_tail())
    23. {
    24. return nullptr;
    25. }
    26. std::unique_ptr old_head=std::move(head);
    27. head=std::move(old_head->next);
    28. return old_head;
    29. }
    30. public:
    31. threadsafe_queue():
    32. head(new node),tail(head.get())
    33. {}
    34. threadsafe_queue(const threadsafe_queue& other)=delete;
    35. threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
    36. std::shared_ptr try_pop()
    37. {
    38. std::unique_ptr old_head=pop_head();
    39. return old_head?old_head->data:std::shared_ptr();
    40. }
    41. void push(T new_value)
    42. {
    43. std::shared_ptr new_data(
    44. std::make_shared(std::move(new_value)));
    45. std::unique_ptr p(new node);
    46. node* const new_tail=p.get();
    47. std::lock_guard tail_lock(tail_mutex);
    48. tail->data=new_data;
    49. tail->next=std::move(p);
    50. tail=new_tail;
    51. }
    52. };

    6.3 设计更复杂的基于锁的并发数据结构

    6.3.1 采用锁编写线程安全的查找表

    代码6.11 线程安全的查询表

    1. template<typename Key,typename Value,typename Hash=std::hash >
    2. class threadsafe_lookup_table
    3. {
    4. private:
    5. class bucket_type
    6. {
    7. private:
    8. typedef std::pair bucket_value;
    9. typedef std::list bucket_data;
    10. typedef typename bucket_data::iterator bucket_iterator;
    11. bucket_data data;
    12. mutable std::shared_mutex mutex; // 1
    13. bucket_iterator find_entry_for(Key const& key) const // 2
    14. {
    15. return std::find_if(data.begin(),data.end(),
    16. [&](bucket_value const& item)
    17. {return item.first==key;});
    18. }
    19. public:
    20. Value value_for(Key const& key,Value const& default_value) const
    21. {
    22. std::shared_lock lock(mutex); // 3
    23. bucket_iterator const found_entry=find_entry_for(key);
    24. return (found_entry==data.end())?
    25. default_value:found_entry->second;
    26. }
    27. void add_or_update_mapping(Key const& key,Value const& value)
    28. {
    29. std::unique_lock lock(mutex); // 4
    30. bucket_iterator const found_entry=find_entry_for(key);
    31. if(found_entry==data.end())
    32. {
    33. data.push_back(bucket_value(key,value));
    34. }
    35. else
    36. {
    37. found_entry->second=value;
    38. }
    39. }
    40. void remove_mapping(Key const& key)
    41. {
    42. std::unique_lock lock(mutex); // 5
    43. bucket_iterator const found_entry=find_entry_for(key);
    44. if(found_entry!=data.end())
    45. {
    46. data.erase(found_entry);
    47. }
    48. }
    49. };
    50. std::vector > buckets; // 6
    51. Hash hasher;
    52. bucket_type& get_bucket(Key const& key) const // 7
    53. {
    54. std::size_t const bucket_index=hasher(key)%buckets.size();
    55. return *buckets[bucket_index];
    56. }
    57. public:
    58. typedef Key key_type;
    59. typedef Value mapped_type;
    60. typedef Hash hash_type;
    61. threadsafe_lookup_table(
    62. unsigned num_buckets=19,Hash const& hasher_=Hash()):
    63. buckets(num_buckets),hasher(hasher_)
    64. {
    65. for(unsigned i=0;i
    66. {
    67. buckets[i].reset(new bucket_type);
    68. }
    69. }
    70. threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;
    71. threadsafe_lookup_table& operator=(
    72. threadsafe_lookup_table const& other)=delete;
    73. Value value_for(Key const& key,
    74. Value const& default_value=Value()) const
    75. {
    76. return get_bucket(key).value_for(key,default_value); // 8
    77. }
    78. void add_or_update_mapping(Key const& key,Value const& value)
    79. {
    80. get_bucket(key).add_or_update_mapping(key,value); // 9
    81. }
    82. void remove_mapping(Key const& key)
    83. {
    84. get_bucket(key).remove_mapping(key); // 10
    85. }
    86. };

    6.3.2 采用多种锁编写线程安全的链表、

    提供了这些操作,链表才能为通用容器,这将帮助我们添加更多功能,比如:指定位置上插入元素,不过这对于查询表来说就没有必要了,所以算是给读者们留的一个作业吧。

    代码6.13 线程安全链表——支持迭代器(包含我写的指定位置插入---丐版)

    1. #ifndef THREADSAFE_LIST_H_
    2. #define THREADSAFE_LIST_H
    3. #include
    4. #include
    5. template<typename T>
    6. class threadsafe_list
    7. {
    8. struct node // 1
    9. {
    10. std::mutex m;
    11. std::shared_ptr data;
    12. std::unique_ptr next;
    13. node(): // 2
    14. next()
    15. {}
    16. node(T const& value): // 3
    17. data(std::make_shared(value))
    18. {}
    19. };
    20. node head;
    21. public:
    22. threadsafe_list()
    23. {}
    24. ~threadsafe_list()
    25. {
    26. remove_if([](node const&){return true;});
    27. }
    28. threadsafe_list(threadsafe_list const& other)=delete;
    29. threadsafe_list& operator=(threadsafe_list const& other)=delete;
    30. void push_front(T const& value)
    31. {
    32. std::unique_ptr new_node(new node(value)); // 4
    33. std::lock_guard lk(head.m);
    34. new_node->next=std::move(head.next); // 5
    35. head.next=std::move(new_node); // 6
    36. }
    37. template<typename Function>
    38. void for_each(Function f) // 7
    39. {
    40. node* current=&head;
    41. std::unique_lock lk(head.m); // 8
    42. while(node* const next=current->next.get()) // 9
    43. {
    44. std::unique_lock next_lk(next->m); // 10
    45. lk.unlock(); // 11
    46. f(*next->data); // 12
    47. current=next;
    48. lk=std::move(next_lk); // 13
    49. }
    50. }
    51. template<typename Predicate>
    52. std::shared_ptr find_first_if(Predicate p) // 14
    53. {
    54. node* current=&head;
    55. std::unique_lock lk(head.m);
    56. while(node* const next=current->next.get())
    57. {
    58. std::unique_lock next_lk(next->m);
    59. lk.unlock();
    60. if(p(*next->data)) // 15
    61. {
    62. return next->data; // 16
    63. }
    64. current=next;
    65. lk=std::move(next_lk);
    66. }
    67. return std::shared_ptr();
    68. }
    69. template<typename Predicate>
    70. void remove_if(Predicate p) // 17
    71. {
    72. node* current=&head;
    73. std::unique_lock lk(head.m);
    74. while(node* const next=current->next.get())
    75. {
    76. std::unique_lock next_lk(next->m);
    77. if(p(*next->data)) // 18
    78. {
    79. std::unique_ptr old_next=std::move(current->next);
    80. current->next=std::move(next->next);
    81. next_lk.unlock();
    82. } // 20
    83. else
    84. {
    85. lk.unlock(); // 21
    86. current=next;
    87. lk=std::move(next_lk);
    88. }
    89. }
    90. }
    91. void insert_in_position(T const& value,const int& position){
    92. std::unique_ptr new_node(new node(value));
    93. node* current=&head;
    94. node* next = current->next.get();
    95. std::unique_lock lk(head.m);
    96. for(int i = 0; i
    97. std::unique_lock next_lk(next->m);
    98. lk.unlock();
    99. current = next;
    100. lk = std::move(next_lk);
    101. next = current->next.get();
    102. }
    103. lk.unlock();
    104. std::unique_lock cur_lk(current->m);
    105. new_node->next = std::move(current->next);
    106. current->next = std::move(new_node);
    107. }
    108. };
    109. #endif

  • 相关阅读:
    PyTorch之张量的相关操作大全 ->(个人学习记录笔记)
    38.CSS文本动画效果
    微信小程序组件化
    k8s运维问题整理
    @ConditionalOnBean系列注解使用误区
    python中protobuf和json互相转换应用
    spring
    【信号与系统】信号频谱和测量之汉明窗
    预测杭州五一黄金周的旅游出行人数
    策略引擎Kyverno
  • 原文地址:https://blog.csdn.net/qq_52758467/article/details/133383048