• 基于循环队列和信号量的生产和消费者模型


    这一节为什么要基于信号量来实现同一个模型,原因:

    1. void push(const T& in)
    2. {
    3. pthread_mutex_lock(&_lock);
    4. while(is_Full())
    5. {
    6. //这里说明阻塞队列是满的,需要让生产者等待
    7. pthread_cond_wait(&_pcond,&_lock);
    8. }
    9. //这里说明阻塞队列至少有一个空位可以插入
    10. _queue.push(in);
    11. //唤醒消费者去消费
    12. pthread_cond_signal(&_ccond);
    13. pthread_mutex_unlock(&_lock);
    14. }

    在我们访问公共资源的时候,消费者需要竞争同一把锁,然后还要继续判断是否能在临界区中生产数据。如果竞争到了锁然后判断不能生产数据,则需要继续等待。竞争锁需要消耗时间,判断等待也需要,这就导致了程序效率的低下。因此信号量的存在就解决了这一个问题:在竞争锁之前就提前预知了临界资源是否就绪。

    POSIX信号量

    信号量就相当于一个计数器,计数了临界资源中资源的数目,接下来我们学习使用它的接口:

    初始化信号量

    #include
    int sem_init(sem_t *sem, int pshared, unsigned int value);
    参数:
    pshared:0 表示线程间共享,非零表示进程间共享
    value :信号量初始值

    销毁信号量

    int sem_destroy(sem_t *sem);

    等待信号量

    功能:等待信号量,会将信号量的值减 1
    int sem_wait(sem_t *sem); //P()

    发布信号量

    功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1
    int sem_post(sem_t *sem);//V()

    基队于环形列的生产消型费模型

    引入循环队列

    循环队列是一个逻辑结构,是由数组演变而成。通过对数组的下标访问进行模运算变成一个循环访问,这就是循环队列。当消费者和生产者在同一位置时,可能循环队列为空,另一种可能就是生产者消费满了,消费者来不及消费。因此其余时间生产消费者都不在同一位置生产和消费数据。

    实现模型的三个必要条件

    1.生产者不能套消费者一圈。

    2.消费者不能超越生产者。

    3.如果生产者和消费者在同一个位置,队列为空让生产者先走,队列为满让消费者先走。

    引入信号量

    通过信号量我们可以预知知道队列中资源的数目,如果生产者套了消费者一圈,这时生产者就申请不到信号量,只有消费者能够申请,因此只有消费者可以消费,生产者必须等待。同理消费者把数据消费光和生产者在同一位置时,消费者申请不到信号量只有生产者可以,因此只有生产者可以生产而消费者必须等待,通过引入信号量满足了该模型的三个必要条件。

    代码实现

    简易版的代码实现:

    main.cc

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include "RingQueue.hpp"
    7. using namespace std;
    8. void *consumer(void *queue)
    9. {
    10. RingQueue<int> *rq = static_castint> *>(queue);
    11. while (true)
    12. {
    13. int result = 0;
    14. rq->pop(&result);
    15. cout << "消费者消费了一个数据:" << result << endl;
    16. sleep(1);
    17. }
    18. }
    19. void *product(void *queue)
    20. {
    21. RingQueue<int> *rq = static_castint> *>(queue);
    22. while (true)
    23. {
    24. //(1);
    25. int data = rand() % 10 + 1;
    26. rq->push(data);
    27. cout << "生产者生产了一个数据:" << data << endl;
    28. }
    29. }
    30. int main()
    31. {
    32. srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);
    33. RingQueue<int> *rq = new RingQueue<int>();
    34. pthread_t c, p;
    35. pthread_create(&c, nullptr, consumer, (void *)rq);
    36. pthread_create(&p, nullptr, product, (void *)rq);
    37. pthread_join(c, nullptr);
    38. pthread_join(p, nullptr);
    39. delete rq;
    40. return 0;
    41. }

    RingQueue.hpp 

    1. #include
    2. #include
    3. #include
    4. #include
    5. using namespace std;
    6. static const int gcap = 6;
    7. template <class T>
    8. class RingQueue
    9. {
    10. private:
    11. void P(sem_t *sem) // 申请信号量 --
    12. {
    13. int n = sem_wait(sem);
    14. assert(n == 0);
    15. (void)n;
    16. }
    17. void V(sem_t *sem) // 释放信号量 ++
    18. {
    19. int n = sem_post(sem);
    20. assert(n == 0);
    21. (void)n;
    22. }
    23. public:
    24. RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap)
    25. {
    26. int n = sem_init(&_spaceSem, 0, cap);
    27. assert(n == 0);
    28. int m = sem_init(&_dataSem, 0, 0);
    29. assert(m == 0);
    30. _productStep = _consumerStep = 0;
    31. }
    32. void push(const T &in)
    33. {
    34. P(&_spaceSem); // 申请空间信号量
    35. _queue[_productStep++] = in;
    36. _productStep %= _cap;
    37. V(&_dataSem); // 释放数据信号量
    38. }
    39. void pop(T *out)
    40. {
    41. P(&_dataSem);
    42. *out = _queue[_consumerStep++];
    43. _consumerStep %= _cap;
    44. V(&_spaceSem);
    45. }
    46. ~RingQueue()
    47. {
    48. sem_destroy(&_spaceSem);
    49. sem_destroy(&_dataSem);
    50. }
    51. private:
    52. std::vector _queue;
    53. int _cap;
    54. sem_t _spaceSem;
    55. sem_t _dataSem;
    56. int _productStep;
    57. int _consumerStep;
    58. };

    引入计算任务:

    Task.hpp

    1. #include
    2. #include
    3. #include
    4. class CalTask
    5. {
    6. public:
    7. using func_t = std::function<int(int, int, char)>;
    8. CalTask() {}
    9. CalTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
    10. {
    11. }
    12. std::string operator()()
    13. {
    14. int result = _callback(_x, _y, _op);
    15. char buffer[64];
    16. snprintf(buffer, sizeof buffer, "%d %c %d =%d", _x, _op, _y, result);
    17. return buffer;
    18. }
    19. std::string to_string()
    20. {
    21. char buffer[64];
    22. snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
    23. return buffer;
    24. }
    25. ~CalTask()
    26. {
    27. }
    28. private:
    29. int _x;
    30. int _y;
    31. char _op;
    32. func_t _callback;
    33. };
    34. int mymath(int x, int y, char op)
    35. {
    36. int result = 0;
    37. switch (op)
    38. {
    39. case '+':
    40. result = x + y;
    41. break;
    42. case '-':
    43. result = x - y;
    44. break;
    45. case '*':
    46. result = x * y;
    47. break;
    48. case '/':
    49. if (y == 0)
    50. {
    51. std::cerr << "div zero error" << std::endl;
    52. break;
    53. }
    54. result = x / y;
    55. break;
    56. case '%':
    57. if (y == 0)
    58. {
    59. std::cerr << "mod zero error" << std::endl;
    60. break;
    61. }
    62. result = x % y;
    63. break;
    64. }
    65. return result;
    66. }

    RingQueue.hpp

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. using namespace std;
    7. static const int gcap = 6;
    8. static const string symbol ="+-*/%";
    9. template <class T>
    10. class RingQueue
    11. {
    12. private:
    13. void P(sem_t *sem) // 申请信号量 --
    14. {
    15. int n = sem_wait(sem);
    16. assert(n == 0);
    17. (void)n;
    18. }
    19. void V(sem_t *sem) // 释放信号量 ++
    20. {
    21. int n = sem_post(sem);
    22. assert(n == 0);
    23. (void)n;
    24. }
    25. public:
    26. RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap)
    27. {
    28. int n = sem_init(&_spaceSem, 0, cap);
    29. assert(n == 0);
    30. int m = sem_init(&_dataSem, 0, 0);
    31. assert(m == 0);
    32. _productStep = _consumerStep = 0;
    33. }
    34. void push(const T &in)
    35. {
    36. P(&_spaceSem); // 申请空间信号量
    37. _queue[_productStep++] = in;
    38. _productStep %= _cap;
    39. V(&_dataSem); // 释放数据信号量
    40. }
    41. void pop(T *out)
    42. {
    43. P(&_dataSem);
    44. *out = _queue[_consumerStep++];
    45. _consumerStep %= _cap;
    46. V(&_spaceSem);
    47. }
    48. ~RingQueue()
    49. {
    50. sem_destroy(&_spaceSem);
    51. sem_destroy(&_dataSem);
    52. }
    53. private:
    54. std::vector _queue;
    55. int _cap;
    56. sem_t _spaceSem;
    57. sem_t _dataSem;
    58. int _productStep;
    59. int _consumerStep;
    60. };

    main.cc

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include "RingQueue.hpp"
    7. #include "Task.hpp"
    8. using namespace std;
    9. void *consumer(void *queue)
    10. {
    11. RingQueue *rq = static_cast *>(queue);
    12. while (true)
    13. {
    14. CalTask result ;
    15. rq->pop(&result);
    16. cout << "消费者处理了一个任务:" << result() << endl;
    17. sleep(1);
    18. }
    19. }
    20. void *product(void *queue)
    21. {
    22. RingQueue *rq = static_cast *>(queue);
    23. while (true)
    24. {
    25. int x =rand()%10;
    26. int y =rand()%10;
    27. char op =symbol[rand()%symbol.size()];
    28. CalTask t(x,y,op, mymath);
    29. rq->push(t);
    30. cout << "生产者生产了一个任务:" << t.to_string() << endl;
    31. }
    32. }
    33. int main()
    34. {
    35. srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);
    36. RingQueue *rq = new RingQueue();
    37. pthread_t c, p;
    38. pthread_create(&c, nullptr, consumer, (void *)rq);
    39. pthread_create(&p, nullptr, product, (void *)rq);
    40. pthread_join(c, nullptr);
    41. pthread_join(p, nullptr);
    42. delete rq;
    43. return 0;
    44. }

    多线程多并发执行:

    想要实现多线程并发的进行消费和生产,我们只需要保证临界资源的访问安全,因此我们可以在访问临界资源的时候加上锁:

    但是这么加锁有一个问题:当我们申请锁成功后才能申请信号量,锁只有一把需要多个线程竞争后才能交付并且申请信号量也需要花费时间。因此我们可以先申请信号量,让多个线程先预定好共享资源中的资源,然后再申请锁进行临界资源的访问,这样做提高了程序的效率。

    最终代码:

    1. //RingQueue.hpp
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. using namespace std;
    8. static const int gcap = 6;
    9. static const string symbol ="+-*/%";
    10. template <class T>
    11. class RingQueue
    12. {
    13. private:
    14. void P(sem_t *sem) // 申请信号量 --
    15. {
    16. int n = sem_wait(sem);
    17. assert(n == 0);
    18. (void)n;
    19. }
    20. void V(sem_t *sem) // 释放信号量 ++
    21. {
    22. int n = sem_post(sem);
    23. assert(n == 0);
    24. (void)n;
    25. }
    26. public:
    27. RingQueue(const int &cap = gcap) : _queue(cap), _cap(cap)
    28. {
    29. int n = sem_init(&_spaceSem, 0, cap);
    30. assert(n == 0);
    31. int m = sem_init(&_dataSem, 0, 0);
    32. assert(m == 0);
    33. _productStep = _consumerStep = 0;
    34. pthread_mutex_init(&_pmutex,nullptr);
    35. pthread_mutex_init(&_cmutex,nullptr);
    36. }
    37. void push(const T &in)
    38. {
    39. P(&_spaceSem); // 申请空间信号量
    40. pthread_mutex_lock(&_pmutex);
    41. _queue[_productStep++] = in;
    42. _productStep %= _cap;
    43. pthread_mutex_unlock(&_pmutex);
    44. V(&_dataSem); // 释放数据信号量
    45. }
    46. void pop(T *out)
    47. {
    48. P(&_dataSem);
    49. pthread_mutex_lock(&_cmutex);
    50. *out = _queue[_consumerStep++];
    51. _consumerStep %= _cap;
    52. pthread_mutex_unlock(&_cmutex);
    53. V(&_spaceSem);
    54. }
    55. ~RingQueue()
    56. {
    57. sem_destroy(&_spaceSem);
    58. sem_destroy(&_dataSem);
    59. pthread_mutex_destroy(&_pmutex);
    60. pthread_mutex_destroy(&_cmutex);
    61. }
    62. private:
    63. std::vector _queue;
    64. int _cap;
    65. sem_t _spaceSem;
    66. sem_t _dataSem;
    67. int _productStep;
    68. int _consumerStep;
    69. pthread_mutex_t _pmutex;
    70. pthread_mutex_t _cmutex;
    71. };
    72. //Task.hpp
    73. #include
    74. #include
    75. #include
    76. class CalTask
    77. {
    78. public:
    79. using func_t = std::function<int(int, int, char)>;
    80. CalTask() {}
    81. CalTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
    82. {
    83. }
    84. std::string operator()()
    85. {
    86. int result = _callback(_x, _y, _op);
    87. char buffer[64];
    88. snprintf(buffer, sizeof buffer, "%d %c %d =%d", _x, _op, _y, result);
    89. return buffer;
    90. }
    91. std::string to_string()
    92. {
    93. char buffer[64];
    94. snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
    95. return buffer;
    96. }
    97. ~CalTask()
    98. {
    99. }
    100. private:
    101. int _x;
    102. int _y;
    103. char _op;
    104. func_t _callback;
    105. };
    106. int mymath(int x, int y, char op)
    107. {
    108. int result = 0;
    109. switch (op)
    110. {
    111. case '+':
    112. result = x + y;
    113. break;
    114. case '-':
    115. result = x - y;
    116. break;
    117. case '*':
    118. result = x * y;
    119. break;
    120. case '/':
    121. if (y == 0)
    122. {
    123. std::cerr << "div zero error" << std::endl;
    124. break;
    125. }
    126. result = x / y;
    127. break;
    128. case '%':
    129. if (y == 0)
    130. {
    131. std::cerr << "mod zero error" << std::endl;
    132. break;
    133. }
    134. result = x % y;
    135. break;
    136. }
    137. return result;
    138. }
    139. //main.cc
    140. #include
    141. #include
    142. #include
    143. #include
    144. #include
    145. #include "RingQueue.hpp"
    146. #include "Task.hpp"
    147. using namespace std;
    148. string SelfName()
    149. {
    150. char name[64];
    151. snprintf(name,sizeof name,"thread[0x%x]",pthread_self());
    152. return name;
    153. }
    154. void *consumer(void *queue)
    155. {
    156. RingQueue *rq = static_cast *>(queue);
    157. while (true)
    158. {
    159. CalTask result ;
    160. rq->pop(&result);
    161. cout << SelfName()<<"消费者处理了一个任务:" << result() << endl;
    162. sleep(1);
    163. }
    164. }
    165. void *product(void *queue)
    166. {
    167. RingQueue *rq = static_cast *>(queue);
    168. while (true)
    169. {
    170. int x =rand()%10;
    171. int y =rand()%10;
    172. char op =symbol[rand()%symbol.size()];
    173. CalTask t(x,y,op, mymath);
    174. rq->push(t);
    175. cout << SelfName()<<"生产者生产了一个任务:" << t.to_string() << endl;
    176. }
    177. }
    178. int main()
    179. {
    180. srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123456);
    181. RingQueue *rq = new RingQueue();
    182. pthread_t c[10], p[6];
    183. for(int i =0;i<10;++i)pthread_create(c+i, nullptr, consumer, (void *)rq);
    184. for(int i =0;i<6;++i)pthread_create(p+i, nullptr, product, (void *)rq);
    185. for(int i =0;i<10;++i)pthread_join(c[i], nullptr);
    186. for(int i =0;i<6;++i)pthread_join(p[i], nullptr);
    187. delete rq;
    188. return 0;
    189. }

    这里所说的消费者和生产者模型的高效性和上一节基于阻塞队列的是一样的,如果不清楚的话可以点击《基于阻塞队列的生产和消费模型》进行复习!!!到这里生产者和消费者模型讲解就结束了,下一节我们学习线程池,敬请期待!!!!!记得点赞多多支持喔!!!

  • 相关阅读:
    完整目标检测项目流程——从使用LabelImg标注到使用YOLOv5训练测试
    图形学-(视图变换,投影变换)
    【云原生】一文带你吃透FlexManager数据传入华为云IOT
    大模型改变了NLP的游戏规则了吗
    Linux 文件/目录管理
    2022最新调优、微服务、框架、分布式指南,我的“大厂”不是梦
    CleanClip for Mac 剪切板 粘贴工具 历史记录 安装(保姆级教程,新手小白轻松上手)
    备份和恢复Gitlab数据
    thinkphp6(tp6)创建定时任务
    【C++】vector 的常用接口
  • 原文地址:https://blog.csdn.net/m0_69005269/article/details/132698705