目录
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点:
1.阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护来。
2.生产者线程要向阻塞队列当中Push数据,若阻塞队列已经满了,那么此时该生产者需要进行等待,直到阻塞队列中有空间时再被唤醒。消费者线程要从阻塞队列当中Pop数据,若阻塞队列为空,那么此时该消费者需要进行等待,直到阻塞队列中有新的数据时再被唤醒。
3.需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。若是队列满了,生产者要进行等待,若是队列为空,消费者要进行等待。
4.当生产者或者消费者执行后,都要去唤醒另一方。
BlockQueue.hpp代码:
- #include
- #include
- #include
- #include
- #include
-
- using namespace std;
-
- const int capacity=5;
- template<class T>
- class BlockQueue
- {
- public:
- BlockQueue(int size=capacity)//初始化
- :size_(size)
- {
- pthread_mutex_init(&mutex_,nullptr);
- pthread_cond_init(&proCond_,nullptr);
- pthread_cond_init(&conCond_,nullptr);
- }
- ~BlockQueue()//变量的销毁
- {
- pthread_mutex_destroy(&mutex_);
- pthread_cond_destroy(&proCond_);
- pthread_cond_destroy(&conCond_);
- }
-
- void _push(const T in)
- {
- lockQueue();//生产之前先加锁
- while(is_full())
- {
- pthread_cond_wait(&proCond_,&mutex_);//若队列满了,则进行等待,同时把锁释放,再次醒来时会重新获取锁
- }
- _q.push(in);
- unlockQueue();//生产完后解锁
- pthread_cond_signal(&conCond_);//消费者可能在等待,要去唤醒
- }
-
- T _pop()
- {
- lockQueue();//消费之前先加锁
- while(_q.empty())
- {
- pthread_cond_wait(&conCond_,&mutex_);//若队列为空,则进行等待,同时把锁释放,再次醒来时会重新获取锁
- }
- T out=_q.front();
- _q.pop();
- unlockQueue();//消费完后解锁
- pthread_cond_signal(&proCond_);//唤醒生产者
- return out;
- }
- private:
- void lockQueue()
- {
- pthread_mutex_lock(&mutex_);
- }
- void unlockQueue()
- {
- pthread_mutex_unlock(&mutex_);
- }
- bool is_full()
- {
- return size_==_q.size();
- }
-
- queue
_q; - pthread_mutex_t mutex_;
- pthread_cond_t proCond_;
- pthread_cond_t conCond_;
- int size_;
- };
模拟任务task.cpp代码:
- #include
- #include
-
- class Task
- {
- public:
- Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op)
- {
- }
- int operator()()
- {
- return run();
- }
- int run()
- {
- int result = 0;
- switch (operator_)
- {
- case '+':
- result = elemOne_ + elemTwo_;
- break;
- case '-':
- result = elemOne_ - elemTwo_;
- break;
- case '*':
- result = elemOne_ * elemTwo_;
- break;
- case '/':
- {
- if (elemTwo_ == 0)
- {
- std::cout << "div zero, abort" << std::endl;
- result = -1;
- }
- else
- {
- result = elemOne_ / elemTwo_;
- }
- }
-
- break;
- case '%':
- {
- if (elemTwo_ == 0)
- {
- std::cout << "mod zero, abort" << std::endl;
- result = -1;
- }
- else
- {
- result = elemOne_ % elemTwo_;
- }
- }
- break;
- default:
- std::cout << "非法操作: " << operator_ << std::endl;
- break;
- }
- return result;
- }
- int get(int *e1, int *e2, char *op)
- {
- *e1 = elemOne_;
- *e2 = elemTwo_;
- *op = operator_;
- }
- private:
- int elemOne_;
- int elemTwo_;
- char operator_;
- };
BlockQueue.cpp代码:
- #include"BlockQueue.hpp"
- #include"task.hpp"
- #include
-
- const std::string ops = "+-*/%";
- void *productor(void *args)
- {
- BlockQueue
*bqp = static_cast *>(args); - while (true)
- {
- //制作任务
- int one = rand() % 50;
- int two = rand() % 20;
- char op = ops[rand() % ops.size()];
- Task t(one, two, op);
-
- bqp->_push(t);//生产任务
- cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;
- sleep(1);
- }
- }
- void *consumer(void *args)
- {
- BlockQueue
*bqp = static_cast *>(args); - while (true)
- {
- Task t = bqp->_pop(); // 消费任
-
- int result = t(); //处理任务
- int one, two;
- char op;
- t.get(&one, &two, &op);
- cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;
- }
- }
- int main()
- {
- srand((unsigned long)time(nullptr));
- BlockQueue
bq; //阻塞队列 -
- pthread_t tid1, tid2;
- pthread_create(&tid1, nullptr, consumer, &bq);
- pthread_create(&tid2, nullptr, productor, &bq);
-
- pthread_join(tid1, nullptr);
- pthread_join(tid2, nullptr);
- return 0;
- }
结果:

当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。
概念:本质上就是一个计数器,去划分临界资源的数量。
每个执行流在进入临界区之前都要先申请信号量,申请成功就有了访问临界资源的权限,当操作完毕后再释放信号量。就是对信号量做加减操作。
信号量的PV操作
p操作:申请信号量为p操作,当申请成功,就获得了访问该临界资源的权限,同时,该临界资源的数量也减少了一份,所以计数器要减一。
v操作:释放信号量为v操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一。
补充:PV操作必须是原子性的。多个执行流访问临界资源也是竞争式的,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。当信号量值为零时被申请,那么该执行流会在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
初始化信号量函数:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数解释:
销毁信号量函数:
int sem_destroy(sem_t *sem);
申请信号量函数(等待):
int sem_wait(sem_t *sem);
申请成功后值减一
释放信号量函数(发布):
int sem_post(sem_t *sem);
释放信号量后值加一
上面这些函数调用成功返回0,失败返回-1。
当把信号量的值设置为1时,那么它说明临界资源的数量只有一分,信号量的作用基本等价于互斥锁。
- class Sem{
- public:
- Sem(int num)
- {
- sem_init(&_sem, 0, num);
- }
- ~Sem()
- {
- sem_destroy(&_sem);
- }
- void P()
- {
- sem_wait(&_sem);
- }
- void V()
- {
- sem_post(&_sem);
- }
- private:
- sem_t _sem;
- };
-
- Sem sem(1); //二元信号量
- int tickets=1000;
- void* getTickets(void* args)
- {
- string s=(char*)args;
- while(true)
- {
- sem.P();
- if(tickets>0)
- {
- usleep(10000);
- cout<
" "<<"抢到票了,"<<"票数还剩下:"<<--tickets<- sem.V();
- }
- else
- {
- cout<<"票已经完了"<<" "<
" "<<"退出了"<- sem.V();
- break;
- }
-
- }
- return nullptr;
- }
- int main()
- {
- pthread_t tid1;
- pthread_t tid2;
- pthread_t tid3;
- pthread_t tid4;
-
- pthread_create(&tid1,nullptr,getTickets,(void*)"pthread1");
- pthread_create(&tid2,nullptr,getTickets,(void*)"pthread2");
- pthread_create(&tid3,nullptr,getTickets,(void*)"pthread3");
- pthread_create(&tid4,nullptr,getTickets,(void*)"pthread4");
-
- pthread_detach(tid1);
- pthread_detach(tid2);
- pthread_detach(tid3);
- pthread_detach(tid4);
-
- while(true)
- {
- ;
- }
- return 0;
- }
-
相关阅读:
Mac上安装和配置Git
GIT | git只上传文件夹目录,不监控目录里面的文件的解决方法
VMware认证考试科目及课程内容
智能井盖传感器:城市安全卫士
华为交换机配置ACL
Grafana----Grafana快速体验
这有几个常见的电脑故障解决方法,需要的朋友快来
流媒体传输 - RTP 荷载 H265
基於RISC-V QEMU 仿真運行Linux 系統環境搭建
20220801使用安信可的ESP-01S模块实现WIFI的UART传输功能
-
原文地址:https://blog.csdn.net/m0_64397669/article/details/128060846