目录
6.2.3 采用精细粒度的锁和条件变量实现线程安全的队列容器
设计并发数据结构是为了让多线程并发访问,并且线程可对数据结构做相同或不同的操作。多线程环境下,无数据丢失和损毁,所有的数据需要维持原样,且无条件竞争的数据结构,称之为“线程安全”的数据结构。通常情况下,多个线程对数据结构进行并发操作是安全的,但不同操作需要单线程独立访问数据结构。当线程执行不同的操作时,对同一数据结构的并发操作是安全的,而多线程执行同样的操作时,可能会出现问题。
实际的设计意义并不止上面提到的那样,而是要为线程提供并发访问数据结构的机会。本质上,在互斥量的保护下同一时间内只有一个线程可以获取锁。互斥量为了保护数据,会显式阻止线程对数据结构的并发访问。
串行化(serialzation)则是线程轮流访问数据,对数据进行串行访问。因此,需要对数据结构仔细斟酌,确保能进行真正的并发。虽然,有些数据结构比其他结构的并发访问范围更大,但思路都是一样的:减少保护区域,减少序列化操作,提升并发访问的能力。
基于锁的并发数据结构需要确保访问线程持有锁的时间最短,对于只有一个互斥量的数据结构来说十分困难。需要锁之外的操作不能访问数据,保证不会产生条件竞争。使用多个互斥量保护数据结构不同的区域时,问题会更加明显。当操作需要获取多个互斥锁时,可能会产生死锁,所以使用多个互斥量时要格外小心。
代码6.1 线程安全栈的类定义
- #include
-
- struct empty_stack: std::exception
- {
- const char* what() const throw();
- };
-
- template<typename T>
- class threadsafe_stack
- {
- private:
- std::stack
data; - mutable std::mutex m;
- public:
- threadsafe_stack(){}
- threadsafe_stack(const threadsafe_stack& other)
- {
- std::lock_guard
lock(other.m) ; - data=other.data;
- }
-
- threadsafe_stack& operator=(const threadsafe_stack&) = delete;
-
- void push(T new_value)
- {
- std::lock_guard
lock(m) ; - data.push(std::move(new_value)); // 1
- }
- std::shared_ptr
pop() - {
- std::lock_guard
lock(m) ; - if(data.empty()) throw empty_stack(); // 2
- std::shared_ptr
const res( - std::make_shared
(std::move(data.top()))) ; // 3 - data.pop(); // 4
- return res;
- }
- void pop(T& value)
- {
- std::lock_guard
lock(m) ; - if(data.empty()) throw empty_stack();
- value=std::move(data.top()); // 5
- data.pop(); // 6
- }
- bool empty() const
- {
- std::lock_guard
lock(m) ; - return data.empty();
- }
- };
代码6.2 使用条件变量实现的线程安全队列
- template<typename T>
- class threadsafe_queue
- {
- private:
- mutable std::mutex mut;
- std::queue
data_queue; - std::condition_variable data_cond;
-
- public:
- threadsafe_queue()
- {}
-
- void push(T data)
- {
- std::lock_guard
lk(mut) ; - data_queue.push(std::move(data));
- data_cond.notify_one(); // 1
- }
-
- void wait_and_pop(T& value) // 2
- {
- std::unique_lock
lk(mut) ; - data_cond.wait(lk,[this]{return !data_queue.empty();});
- value=std::move(data_queue.front());
- data_queue.pop();
- }
-
- std::shared_ptr
wait_and_pop() // 3 - {
- std::unique_lock
lk(mut) ; - data_cond.wait(lk,[this]{return !data_queue.empty();}); // 4
- std::shared_ptr
res( - std::make_shared
(std::move(data_queue.front()))) ; - data_queue.pop();
- return res;
- }
-
- bool try_pop(T& value)
- {
- std::lock_guard
lk(mut) ; - if(data_queue.empty())
- return false;
- value=std::move(data_queue.front());
- data_queue.pop();
- return true;
- }
-
- std::shared_ptr
try_pop() - {
- std::lock_guard
lk(mut) ; - if(data_queue.empty())
- return std::shared_ptr
(); // 5 - std::shared_ptr
res( - std::make_shared
(std::move(data_queue.front()))) ; - data_queue.pop();
- return res;
- }
-
- bool empty() const
- {
- std::lock_guard
lk(mut) ; - return data_queue.empty();
- }
- };
异常安全会有一些变化,不止一个线程等待对队列进行推送操作时,只会有一个线程因data_cond.notify_one()而继续工作。但是,如果工作线程在wait_and_pop()中抛出一个异常,例如:构造新的std::shared_ptr<>对象④时抛出异常,那么其他线程则会永世长眠。这种情况不可以,所以调用函数需要改成data_cond.notify_all(),这个函数将唤醒所有的工作线程,不过当大多线程发现队列依旧是空时,又会耗费资源让线程重新进入睡眠。第二种替代方案,有异常抛出时,让wait_and_pop()函数调用notify_one(),从而让个另一个线程去索引存储的值。第三种替代方案,将std::shared_ptr<>的初始化过程移到push()中,并且存储std::shared_ptr<>实例,而不是直接使用数据值,将std::shared_ptr<>拷贝到内部std::queue<>中就不会抛出异常了,这样wait_and_pop()又是安全的了。下面的代码,就是根据第三种方案修改的。
代码6.3 持有std::shared_ptr<>实例的线程安全队列
- template<typename T>
- class threadsafe_queue
- {
- private:
- mutable std::mutex mut;
- std::queue
> data_queue; - std::condition_variable data_cond;
- public:
- threadsafe_queue()
- {}
-
- void wait_and_pop(T& value)
- {
- std::unique_lock
lk(mut) ; - data_cond.wait(lk,[this]{return !data_queue.empty();});
- value=std::move(*data_queue.front()); // 1
- data_queue.pop();
- }
-
- bool try_pop(T& value)
- {
- std::lock_guard
lk(mut) ; - if(data_queue.empty())
- return false;
- value=std::move(*data_queue.front()); // 2
- data_queue.pop();
- return true;
- }
-
- std::shared_ptr
wait_and_pop() - {
- std::unique_lock
lk(mut) ; - data_cond.wait(lk,[this]{return !data_queue.empty();});
- std::shared_ptr
res=data_queue.front(); // 3 - data_queue.pop();
- return res;
- }
-
- std::shared_ptr
try_pop() - {
- std::lock_guard
lk(mut) ; - if(data_queue.empty())
- return std::shared_ptr
(); - std::shared_ptr
res=data_queue.front(); // 4 - data_queue.pop();
- return res;
- }
-
- void push(T new_value)
- {
- std::shared_ptr
data( - std::make_shared
(std::move(new_value))) ; // 5 - std::lock_guard
lk(mut) ; - data_queue.push(data);
- data_cond.notify_one();
- }
-
- bool empty() const
- {
- std::lock_guard
lk(mut) ; - return data_queue.empty();
- }
- };
代码6.6 线程安全队列——细粒度锁版
- template<typename T>
- class threadsafe_queue
- {
- private:
- struct node
- {
- std::shared_ptr
data; - std::unique_ptr
next; - };
- std::mutex head_mutex;
- std::unique_ptr
head; - std::mutex tail_mutex;
- node* tail;
-
- node* get_tail()
- {
- std::lock_guard
tail_lock(tail_mutex) ; - return tail;
- }
-
- std::unique_ptr
pop_head() - {
- std::lock_guard
head_lock(head_mutex) ; - if(head.get()==get_tail())
- {
- return nullptr;
- }
- std::unique_ptr
old_head=std::move(head); - head=std::move(old_head->next);
- return old_head;
- }
- public:
- threadsafe_queue():
- head(new node),tail(head.get())
- {}
- threadsafe_queue(const threadsafe_queue& other)=delete;
- threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
-
- std::shared_ptr
try_pop() - {
- std::unique_ptr
old_head=pop_head(); - return old_head?old_head->data:std::shared_ptr
(); - }
-
- void push(T new_value)
- {
- std::shared_ptr
new_data( - std::make_shared
(std::move(new_value))) ; - std::unique_ptr
p(new node) ; - node* const new_tail=p.get();
- std::lock_guard
tail_lock(tail_mutex) ; - tail->data=new_data;
- tail->next=std::move(p);
- tail=new_tail;
- }
- };
代码6.11 线程安全的查询表
- template<typename Key,typename Value,typename Hash=std::hash
> - class threadsafe_lookup_table
- {
- private:
- class bucket_type
- {
- private:
- typedef std::pair
bucket_value; - typedef std::list
bucket_data; - typedef typename bucket_data::iterator bucket_iterator;
-
- bucket_data data;
- mutable std::shared_mutex mutex; // 1
-
- bucket_iterator find_entry_for(Key const& key) const // 2
- {
- return std::find_if(data.begin(),data.end(),
- [&](bucket_value const& item)
- {return item.first==key;});
- }
- public:
- Value value_for(Key const& key,Value const& default_value) const
- {
- std::shared_lock
lock(mutex) ; // 3 - bucket_iterator const found_entry=find_entry_for(key);
- return (found_entry==data.end())?
- default_value:found_entry->second;
- }
-
- void add_or_update_mapping(Key const& key,Value const& value)
- {
- std::unique_lock
lock(mutex) ; // 4 - bucket_iterator const found_entry=find_entry_for(key);
- if(found_entry==data.end())
- {
- data.push_back(bucket_value(key,value));
- }
- else
- {
- found_entry->second=value;
- }
- }
-
- void remove_mapping(Key const& key)
- {
- std::unique_lock
lock(mutex) ; // 5 - bucket_iterator const found_entry=find_entry_for(key);
- if(found_entry!=data.end())
- {
- data.erase(found_entry);
- }
- }
- };
-
- std::vector
> buckets; // 6 - Hash hasher;
-
- bucket_type& get_bucket(Key const& key) const // 7
- {
- std::size_t const bucket_index=hasher(key)%buckets.size();
- return *buckets[bucket_index];
- }
-
- public:
- typedef Key key_type;
- typedef Value mapped_type;
-
- typedef Hash hash_type;
- threadsafe_lookup_table(
- unsigned num_buckets=19,Hash const& hasher_=Hash()):
- buckets(num_buckets),hasher(hasher_)
- {
- for(unsigned i=0;i
- {
- buckets[i].reset(new bucket_type);
- }
- }
-
- threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;
- threadsafe_lookup_table& operator=(
- threadsafe_lookup_table const& other)=delete;
-
- Value value_for(Key const& key,
- Value const& default_value=Value()) const
- {
- return get_bucket(key).value_for(key,default_value); // 8
- }
-
- void add_or_update_mapping(Key const& key,Value const& value)
- {
- get_bucket(key).add_or_update_mapping(key,value); // 9
- }
-
- void remove_mapping(Key const& key)
- {
- get_bucket(key).remove_mapping(key); // 10
- }
- };
6.3.2 采用多种锁编写线程安全的链表、
提供了这些操作,链表才能为通用容器,这将帮助我们添加更多功能,比如:指定位置上插入元素,不过这对于查询表来说就没有必要了,所以算是给读者们留的一个作业吧。
代码6.13 线程安全链表——支持迭代器(包含我写的指定位置插入---丐版)
- #ifndef THREADSAFE_LIST_H_
- #define THREADSAFE_LIST_H
- #include
- #include
- template<typename T>
- class threadsafe_list
- {
- struct node // 1
- {
- std::mutex m;
- std::shared_ptr
data; - std::unique_ptr
next; - node(): // 2
- next()
- {}
-
- node(T const& value): // 3
- data(std::make_shared
(value)) - {}
- };
-
- node head;
-
- public:
- threadsafe_list()
- {}
-
- ~threadsafe_list()
- {
- remove_if([](node const&){return true;});
- }
-
- threadsafe_list(threadsafe_list const& other)=delete;
- threadsafe_list& operator=(threadsafe_list const& other)=delete;
-
- void push_front(T const& value)
- {
- std::unique_ptr
new_node(new node(value)) ; // 4 - std::lock_guard
lk(head.m) ; - new_node->next=std::move(head.next); // 5
- head.next=std::move(new_node); // 6
- }
-
- template<typename Function>
- void for_each(Function f) // 7
- {
- node* current=&head;
- std::unique_lock
lk(head.m) ; // 8 - while(node* const next=current->next.get()) // 9
- {
- std::unique_lock
next_lk(next->m) ; // 10 - lk.unlock(); // 11
- f(*next->data); // 12
- current=next;
- lk=std::move(next_lk); // 13
- }
- }
-
- template<typename Predicate>
- std::shared_ptr
find_first_if(Predicate p) // 14 - {
- node* current=&head;
- std::unique_lock
lk(head.m) ; - while(node* const next=current->next.get())
- {
- std::unique_lock
next_lk(next->m) ; - lk.unlock();
- if(p(*next->data)) // 15
- {
- return next->data; // 16
- }
- current=next;
- lk=std::move(next_lk);
- }
- return std::shared_ptr
(); - }
-
- template<typename Predicate>
- void remove_if(Predicate p) // 17
- {
- node* current=&head;
- std::unique_lock
lk(head.m) ; - while(node* const next=current->next.get())
- {
- std::unique_lock
next_lk(next->m) ; - if(p(*next->data)) // 18
- {
- std::unique_ptr
old_next=std::move(current->next); - current->next=std::move(next->next);
- next_lk.unlock();
- } // 20
- else
- {
- lk.unlock(); // 21
- current=next;
- lk=std::move(next_lk);
- }
- }
- }
-
- void insert_in_position(T const& value,const int& position){
- std::unique_ptr
new_node(new node(value)) ; - node* current=&head;
- node* next = current->next.get();
-
- std::unique_lock
lk(head.m) ; - for(int i = 0; i
- std::unique_lock
next_lk(next->m) ; - lk.unlock();
- current = next;
- lk = std::move(next_lk);
- next = current->next.get();
- }
- lk.unlock();
-
- std::unique_lock
cur_lk(current->m) ; - new_node->next = std::move(current->next);
- current->next = std::move(new_node);
- }
- };
-
- #endif
-
相关阅读:
PyTorch之张量的相关操作大全 ->(个人学习记录笔记)
38.CSS文本动画效果
微信小程序组件化
k8s运维问题整理
@ConditionalOnBean系列注解使用误区
python中protobuf和json互相转换应用
spring
【信号与系统】信号频谱和测量之汉明窗
预测杭州五一黄金周的旅游出行人数
策略引擎Kyverno
-
原文地址:https://blog.csdn.net/qq_52758467/article/details/133383048