• [Common c/c++] 生产者消费者模型 using mutex/cv/semaphore


    前言:

    生产者消费者模型是老生常谈的话题,实现手段也是各种各样,不同的手段的 运行效率也是天壤之别。代码简洁度,数据安全性,运行稳定性,运行性能等等要素很难做到兼顾。




    基础模型 -> 大粒度锁 + 忙等(高效 , 高cpu)

    组件:

    mutex

    概述:

    优点:代码简洁易懂,方便阅读和修改,逻辑清晰。

    缺点:

    1)cpu和运行效率无法兼得,要么cpu忙(这往往是绝对无法接收的);

    2)要么运行效率无法得到保障(sleep间隔长了则效率低,短了则cpu忙);

    3)竞争数据的加锁粒度大,一次性把整个list都锁住了。不过这一点不是太大的问题,而且优化起来难度较高,一般属于无锁编程范畴。不属于严重的缺点。

    4)有多个消费者时,多个消费者之间会因为加锁的问题互相阻塞。

    代码:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. std::list<long> FIFO;
    7. std::mutex lock;
    8. long consumer_v = -1;
    9. long producer_v = 9999999;
    10. void consumer(){
    11. static long times=0;
    12. while(consumer_v!=0){
    13. std::unique_lock ul(lock);
    14. if(!FIFO.empty()){
    15. consumer_v =std::move(FIFO.front());
    16. FIFO.pop_front();
    17. times++;
    18. }else{
    19. //usleep(1); //降低轮询次数以节省cpu
    20. times++;
    21. }
    22. }
    23. printf("consumer times : %ld\n" , times);
    24. }
    25. void producer(){
    26. static long times=0;
    27. while(producer_v--!=0){
    28. std::unique_lock ul(lock);
    29. FIFO.push_back(producer_v);
    30. times++;
    31. }
    32. printf("producer times : %ld\n" , times);
    33. }
    34. int main()
    35. {
    36. std::thread cons(consumer);
    37. std::thread prod(producer);
    38. cons.join();
    39. prod.join();
    40. }

    以上代码中,cpu通常会达到200%,原因是 consumer 中需要判断FIFO 中是否有数据,如果没有数据要再次加锁和判断,因此这数据 busy check 代码结构,这个过程会非常耗费 cpu 。

    通过top命令查看:

      %CPU
     200.0

    可以通过usleep来降低轮询频率从而降低cpu ,但是弊端代码就是执行时间会变长。

    $ time ./1
    producer times : 9999999
    consumer times : 10002394

    real    0m16.661s
    user    0m19.614s
    sys     0m13.541s

    每次运行上述代码都会发现输出结果中,consumer times 的值会有很大波动,有时比 producer times 大几百,有时大几千,这些就是无用轮询的次数。




    改善CPU的基础模型-> 大粒度锁 + 多消费者-多生产者(低效率 , 低CPU)

    组件:

    semaphore

    概述:

    上一个模型除了忙等的缺点外,还有一个问题就是实际上只有一个消费者能够同时跳出阻塞状态,多个消费者会互相阻塞在 mutex 的 lock 上,如果共享资源可以通过读写锁进行访问,那么这就不是一个好的实现。

    需要注意的是,生产者消费者模型中,semaphore只是扮演通知者的角色,共享资源的保护还是要使用 mutex-like 组件进行。虽然说 mutex 是 semaphore 的二元化,但正是这二元化促成了排他性访问

    代码1:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include /* For O_* constants */
    8. #include /* For mode constants */
    9. #include
    10. const char* sempname = "test";
    11. #define FLAG O_CREAT
    12. #define DEFAULT_SEMP_CNT 0
    13. std::list<long> FIFO;
    14. std::mutex lock;
    15. sem_t* sem;
    16. long consumer_v = -1;
    17. long producer_v = 99999;
    18. void clearsemp(sem_t* sem)
    19. {
    20. int value=0;
    21. do{
    22. sem_trywait(sem);
    23. sem_getvalue(sem,&value);
    24. }while(value!=DEFAULT_SEMP_CNT);
    25. }
    26. void consumer(){
    27. static long times=0;
    28. while(consumer_v!=0){
    29. sem_wait(sem);
    30. std::unique_lock lk(lock);
    31. consumer_v = std::move(FIFO.front());
    32. FIFO.pop_front();
    33. lk.unlock();
    34. times++;
    35. printf("[%d]consumer times : %ld\n" ,gettid(), times);
    36. }
    37. }
    38. void producer(){
    39. static long times=0;
    40. while(producer_v--!=0){
    41. std::unique_lock lk(lock);
    42. FIFO.push_back(producer_v);
    43. sem_post(sem);
    44. lk.unlock();
    45. times++;
    46. printf("producer times : %ld\n" , times);
    47. }
    48. }
    49. int main()
    50. {
    51. umask(0);
    52. sem = sem_open(sempname,FLAG,0777,DEFAULT_SEMP_CNT);
    53. if(SEM_FAILED==sem){
    54. return 0;
    55. }
    56. clearsemp(sem);
    57. std::thread cons(consumer);
    58. std::thread cons1(consumer);
    59. std::thread prod(producer);
    60. cons.join();
    61. cons1.join();
    62. prod.join();
    63. clearsemp(sem);
    64. }

    以上代码的运行效率并不会比忙等要快,但是同样不会占用高CPU。信号量在某些场景下能发挥很好的效果,比如多生产者-多消费者模型。

    上面的代码并没有解决大粒度锁的问题,不论是 cons ,cons2 还是 prod 线程,他们都会互相阻塞在获取 lock 处。

    但是 cons线程在sem_wait处是有机会早点进入lock竞争的,使用这一点,可以减少同时竞争 lock 的 cons线程数量。

    优化代码1:

    注意:上述代码中,生产者在 post 信号的时候是没有束缚的,如果不控制生产量的话,会导致系统资源被耗尽。

    一种方法是判断 FIFO 的尺寸,如果 FIFO 已经满了,则停止本次生产,接着sleep一定时间等待消费者从队列中取走数据,然后判断队列是否为空或者是否降到一定阈值,如果满足则继续填充队列。这种方法有一个问题,那就是如果消费者突然间在短时间内把数据都取走了,那么生产者sleep的就是影响了效率,如果缩减sleep的周期,那么又会导致cpu升高。

    另外一种方法是使用两个信号量,此时不再使用一个信号量来管理整个队列的计数,而是预先把队列的上限确定下来,然后用两个信号量分别表示队列中空余(empty)位置的数量 和 已被使用(filled/full)位置的数量,这两个值的和是队列的上限。

    生产者的逻辑为 :

    1)wait 是否有empty位置可用(sem_wait(empty_sem)) 。

    2)一旦跳出阻塞则说明有被标记为 empty 的位置可用,即有未被填充的单元,那么 lock 队列(mutex_lock/sem_wait(二元sem),这里可以选择mutex,也可以用二元sem,mutex 也同时支持 线程和进程级别。

    3)填充队列。

    4)unlock 队列。

    5)post 增加一个 filled/full 可用位置(sem_post(filled_sem))。

    消费者逻辑为:

    1)wait 是否有 filled 位置可用(sem_wait(filled_sem))。

    2)  一旦跳出阻塞则说明有标记为的 filled 的位置可用,即有已经被填充的单元,那么 lock 队列(mutex_lock/sem_wait(二元sem),这里可以选择mutex,也可以用二元sem,mutex 也同时支持 线程和进程级别。

    3)从队列中取数据。

    4)unlock 队列

    5)post 增加一个 empty 可用位置(sem_post(empty_sem))。

    1. mutex = 1
    2. Full = 0 // Initially, all slots are empty. Thus full slots are 0
    3. Empty = n // All slots are empty initially
    4. //Solution for Producer –
    5. do{
    6. //produce an item
    7. wait(empty);
    8. wait(mutex);
    9. //place in buffer
    10. signal(mutex);
    11. signal(full);
    12. }while(true)
    13. //Solution for Consumer –
    14. do{
    15. wait(full);
    16. wait(mutex);
    17. // consume item from buffer
    18. signal(mutex);
    19. signal(empty);
    20. }while(true)




    改善CPU的基础模型 -> 大粒度锁 + 休眠唤醒 (低效率,低CPU)

    为了改善 cpu 忙等问题,可以使用休眠唤醒机制。把唤醒工作交给内核,达到在不进行 busy check 的前提下,还可以来提升等待线程的响应效率的目的。

    组件: 

    conditional variable

    代码1:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. std::list<long> FIFO;
    8. std::mutex lock;
    9. std::condition_variable cv;
    10. long consumer_v = -1;
    11. long producer_v = 99999;
    12. void consumer(){
    13. static long times=0;
    14. while(consumer_v!=0){
    15. std::unique_lock lk(lock);
    16. cv.wait(lk,[]{return !FIFO.empty();});
    17. consumer_v = std::move(FIFO.front());
    18. FIFO.pop_front();
    19. lk.unlock();
    20. times++;
    21. printf("consumer times : %ld\n" , times);
    22. }
    23. }
    24. void producer(){
    25. static long times=0;
    26. while(producer_v--!=0){
    27. std::unique_lock lk(lock);
    28. FIFO.push_back(producer_v);
    29. cv.notify_one();
    30. lk.unlock();
    31. times++;
    32. printf("producer times : %ld\n" , times);
    33. }
    34. }
    35. int main()
    36. {
    37. std::thread cons(consumer);
    38. std::thread prod(producer);
    39. cons.join();
    40. prod.join();
    41. }

    99999次的运行时间为:

    producer times : 99999
    consumer times : 99931
    ...

    consumer times : 99999

    real    0m8.045s
    user    0m0.899s
    sys     0m2.625s

    以上代码运行时,cpu的使用率很低,consumer times 的值和 producer times 的值完全一直,这说明不会出现无效的轮询。

    代码2:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. std::list<long> FIFO;
    9. std::mutex lock;
    10. std::condition_variable cv;
    11. long consumer_v = -1;
    12. long producer_v = 99999;
    13. void consumer(){
    14. static long times=0;
    15. while(consumer_v!=0){
    16. std::unique_lock lk(lock);
    17. cv.wait(lk,[]{return !FIFO.empty();});
    18. consumer_v = std::move(FIFO.front());
    19. FIFO.pop_front();
    20. lk.unlock();
    21. times++;
    22. printf("[%d]consumer times : %ld\n" ,gettid(), times);
    23. }
    24. }
    25. void producer(){
    26. static long times=0;
    27. while(producer_v--!=0){
    28. std::unique_lock lk(lock);
    29. FIFO.push_back(producer_v);
    30. cv.notify_all();
    31. lk.unlock();
    32. times++;
    33. printf("producer times : %ld\n" , times);
    34. }
    35. }
    36. int main()
    37. {
    38. std::thread cons(consumer);
    39. std::thread cons1(consumer);
    40. std::thread prod(producer);
    41. cons.join();
    42. cons1.join();
    43. prod.join();
    44. }

    上述代码启动了2个消费者线程,如果消费者运行缓慢的话,这种模式可以有效地提高效率。

    优缺点:

    优点:代码简洁移动,方便阅读和修改,逻辑清晰。通过睡眠的方式代替忙等,避免cpu占用过高。不会出现无效的消费轮询次数。

    缺点:避免cpu高负载忙等的代价是运行速度缓慢。对比忙等,可发现消耗的时间是忙等的 50倍。

    更多条件变量的介绍见:[modern c++] std::condition_variable 条件变量的使用_condition_variable里的stop_waiting是什么含义-CSDN博客文章浏览阅读357次。代码片段:std::mutex mut;std::queue data_queue;std::condition_variable data_cond;void data_preparation_thread(){while (more_data_to_prepare()){data_chunk const data = prepare_data();std::lock_guard lk(mut);_condition_variable里的stop_waiting是什么含义https://blog.csdn.net/ykun089/article/details/114735322




    其他:

    当我们锁住列表的时候,释放锁的时机要控制好,建议通过 std::move 把需要处理的数据从 FIFO 中拿出来,或者 通过拷贝的方式拷贝拿出来,然后立刻就把锁释放掉,这样不会影响其他线程加锁。不可以在锁住状态中执行耗时操作,除非你有充分的理由或者知道自己在干啥。

  • 相关阅读:
    B - Dungeon Master
    spring高级篇(三)
    react项目实现文件预览,比如PDF、txt、word、Excel、ppt等常见文件(腾讯云cos)
    单行、多行文本超出显示省略号
    C/C++语言 数据结构 创建邻接表存储的无向图及其邻接表的输出
    Nginx 服务器 SSL 证书安装部署
    Java_题目_集合的遍历方式_字符串/数字/字符
    MySQL MHA
    接口自动化测试专栏博客汇总
    Git Rebase-提交整洁之道
  • 原文地址:https://blog.csdn.net/ykun089/article/details/133786131