• 利用多线程和queue实现生产-消费者模式--消息队列


    1、生产者-消费者模式

    生产者消费者模式并不是GOF提出的众多模式之一,但它依然是开发同学编程过程中最常用的一种模式,
    在这里插入图片描述

    生产者模块儿负责产生数据,放入缓冲区,这些数据由另一个消费者模块儿来从缓冲区取出并进行消费者相应的处理。该模式的优点在于:

    • 解耦:缓冲区的存在可以让生产者和消费者降低互相之间的依赖性,一个模块儿代码变化,不会直接影响另一个模块儿
    • 并发:由于缓冲区,生产者和消费者不是直接调用,而是两个独立的并发主体,生产者产生数据之后把它放入缓冲区,就继续生产数据,不依赖消费者的处理速度

    该模式常用消息队列来实现:

    消息队列是在消息的传输过程中保存消息的容器,消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息。从而实现解耦和并发

    2、多进程模块 multiprocessing

    操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现,python提供了Queue模块来专门实现消息队列:

    • Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

    • Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。

    • Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠

    • Queue.full():判断消息是否满

    • Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

    • Queue.put_nowait(item):相当于put(item,False)

    • Queue.get(block=True,timeout=None):获取一个消息,其他等同put

    以下两个函数用来判断消息对应的任务是否完成:

    • Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成

    • Queue.join():实际上意味着等到队列为空,再执行别的操作

    from multiprocessing import Queue, Process
    q = Queue() 
    q.put(data)  #生产消息
    data = q.get() #消费消息
    
    • 1
    • 2
    • 3
    • 4

    3、使用多线程实现消息队列

    生产者函数:

    import time
    from multiprocessing import Queue, Process  #多进程实现
    from threading import Thread  # 多线程实现
    
    class Proceduer(Thread):
        '''定义一个xx队列'''
        def __init__(self,queue):
            super(Proceduer,self).__init__() # 超类 子类继承了父类的所有属性
            self.queue = queue  
    
        def run(self):
            try:
                for i in range(1,100):
                    print("put data is: {0} to queue".format(i))
                    self.queue.put(i)
                    time.sleep(1)
            except Exception as e :
                print("put data error : %s" %e)
            
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    消费者函数

    # -*- coding:utf-8 -*- 
    # author: 使用queue实现多进程生产消费者模式
    # data : 2022-11-02
    
    
    import time
    from multiprocessing import Queue, Process  #多进程实现
    from threading import Thread  # 多线程实现
    
    from log_send import Proceduer
    
    class Consumer_h3(Thread):
        def __init__(self,queue):
            super(Consumer_h3, self).__init__() 
            self.queue = queue        
    
        def run(self):
            try:
                while self.queue.empty:
                    number = self.queue.get() #取到消息值
                    if number %2 != 0:
                        print("get {0} from queue ODD".format(number))
                    else:
                        self.queue.put(number)  #将信息放回队列中
                time.sleep(1)
            except Exception as e :
                raise e
    
    class Consumer_top(Thread):
        def __init__(self,queue):
            super(Consumer_top, self).__init__()  # 超类 共享所有的属性
            self.queue = queue        
    
        def run(self):
            try:
                while self.queue.empty:
                    number = self.queue.get() #取到消息值
                    if number %2 == 0:
                        print("get {0} from queue top".format(number))
                    else:
                        self.queue.put(number)  #将信息放回队列中
                time.sleep(1)
            except Exception as e :
                raise e
    
    
    def main():
        '''实现一个生产者-消费者队列模式'''
        queues = Queue()  #实例化一个消息队列
    
        # 实例化生产者,并将消息队列作为参数来提供
        print(" Proceduer start!")
        p = Proceduer(queue = queues)  
    
        # 实例化消费者,并将消息队列作为参数来提供
        print(" Consumer_h3 start!")
        c1 = Consumer_h3(queue = queues) 
        c2 = Consumer_top(queue = queues)
    
        # 启动进程去做xx
        c1.start()
        c2.start()
        p.start()
    
        c1.join()
        c2.join()
        p.join()
        print("All threads terminate!")
    
    if __name__ == "__main__":
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    运行结果打印

    Proceduer start!
     Consumer_h3 start!
    put data is: 1 to queue
    get 1 from queue ODD
    put data is: 2 to queue
    get 2 from queue top
    put data is: 3 to queue
    get 3 from queue ODD
    put data is: 4 to queue
    get 4 from queue top
    put data is: 5 to queue
    get 5 from queue ODD
    put data is: 6 to queue
    get 6 from queue top
    put data is: 7 to queue
    get 7 from queue ODD
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    4、总结

    生产者消费者模式也是一种常见的设计模式,而消息队列是较好的实现方式之一,解耦和支持并发,还有其他不同的方式来实现消息队列,后面再继续补充。

  • 相关阅读:
    下篇 | 使用 🤗 Transformers 进行概率时间序列预测
    基于ASP.NET ZERO,开发SaaS版供应链管理系统
    任务拆解,悠然自得,自动版本的ChatGPT,AutoGPT自动人工智能AI任务实践(Python3.10)
    新建一个Python Jupyter Notebook的运行环境
    JS-(14)表单验证
    认识RocketMQ4.x架构设计
    Win10 系统下VisualStudio2019 配置Open3D-0.15.2(C++)
    超详细!魔改为中文sqlmap的使用教程
    程序员必知的8大排序 (java)
    Mysql基础篇(创建、管理、增删改表)
  • 原文地址:https://blog.csdn.net/cy15625010944/article/details/127657531