• Python之进程,线程锁,Queue相关


    进程锁(multiprocess.Lock)

            在多线程或多进程并发编程中,可能会出现资源竞争的情况,导致数据出错或程序崩溃。为了解决这个问题,可以使用锁机制来控制对共享资源的访问。Python 提供了 threading 和 multiprocessing 模块中的 Lock 对象和 RLock 对象来实现进程锁。

            Lock 对象是最基本的锁,它是一种互斥锁,同一时刻只允许一个线程或进程访问共享资源。当一个线程或进程获取了 Lock 对象后,其他线程或进程必须等待锁被释放才能继续访问共享资源。

            RLock 对象是可重入锁,它可以被一个线程或进程多次获取,而不会导致死锁。当一个线程或进程获取了 RLock 对象后,可以再次获取该锁对象,但必须释放相同次数的锁,才能让其他线程或进程访问共享资源。

    以下是一个使用 Lock 对象的例子:

    1. import time
    2. from multiprocessing import Process, Lock
    3. def task(i, lock):
    4. # 上一把锁
    5. lock.acquire()
    6. print("进程%s来了" % i)
    7. time.sleep(1)
    8. print("进程%s走了" % i)
    9. # 释放锁
    10. lock.release()
    11. """只要你上了锁,一定别忘了最后释放锁,否则的话,别的进程永远进不来"""
    12. # 加锁一定好码? 虽然保证了数据的安全,但是呢,执行的效率一定是降低了
    13. # 有些场景该加锁的时候一定要加锁,
    14. if __name__ == '__main__':
    15. lock=Lock() # 得到一把锁
    16. for i in range(3):
    17. p = Process(target=task, args=(i+1, lock))
    18. p.start()

    以上代码中,我们创建了五个进程,每个进程都会获取锁对象并输出一条 “Hello from process” 的消息,然后释放锁。

    以下是一个使用 RLock 对象的例子:

    1. from multiprocessing import RLock, Process
    2. def f(lock, i):
    3. lock.acquire()
    4. print('Hello from process', i)
    5. lock.acquire()
    6. print('Hello again from process', i)
    7. lock.release()
    8. lock.release()
    9. if __name__ == '__main__':
    10. lock = RLock()
    11. processes = []
    12. for i in range(5):
    13. p = Process(target=f, args=(lock, i))
    14. processes.append(p)
    15. p.start()
    16. for p in processes:
    17. p.join()

    以上代码中,我们同样创建了五个进程,每个进程会获取 RLock 对象并输出两条消息,然后释放锁。需要注意的是,同一线程或进程可以多次获取 RLock 对象,但必须释放相同次数的锁,否则会出现死锁。

    如何查看进程号(pid)

    1. import time
    2. import os
    3. from multiprocessing import Process, Lock
    4. """有了进程号,我们就可以通过进程号来结束进程的执行 kill 9176 kill -9 9176"""
    5. # taskkill /pid {pid}
    6. def task():
    7. print("task进程的进程号:", os.getpid()) # os.getpid() 写在哪个进程里面就会输出哪个进程的进程号
    8. print("task进程的父进程的进程号:", os.getppid()) # parent process
    9. import time
    10. time.sleep(20)
    11. if __name__ == '__main__':
    12. p=Process(target=task, )
    13. p.start()
    14. print('子进程的进程号:', p.pid)
    15. print("主进程的进程号", os.getpid())
    16. time.sleep(10)

    进程之间数据隔离问题

    在 Python 中,每个进程都有自己的内存空间和资源,因此数据也会被隔离在各自的进程中。这意味着一个进程无法直接访问另一个进程中的数据。

    不过,Python 提供了一些机制来允许进程之间传递数据,包括:

    1. 使用 IPC(进程间通信)机制,例如管道、消息队列、共享内存等。这些机制允许不同进程之间进行数据传递和同步,但需要考虑一些同步和竞争的问题。

    2. 使用网络通信,例如使用 Socket 进行进程间通信。这也需要考虑一些同步和竞争的问题,但具有更好的灵活性和扩展性。

    3. 使用外部存储,例如文件或数据库等进行数据共享。这种方法需要考虑访问冲突和同步问题,但在某些情况下可以提供更稳定和可扩展的解决方案。

    需要注意的是,进程之间的数据隔离问题是一个比较复杂的问题,需要根据具体情况选择合适的解决方案,并确保正确地处理同步和竞争问题。例如:

    1. from multiprocessing import Process
    2. def work():
    3. global n
    4. n=0
    5. print('子进程内: ',n)
    6. if __name__ == '__main__':
    7. n = 100
    8. p=Process(target=work)
    9. p.start()
    10. print('主进程内: ',n)

    队列(multiprocess.Queue)

    Python中的Queue是一个线程安全的队列类,其中可以存储任意类型的数据。Queue实现了FIFO(先进先出)的算法。在Python中,Queue类有几种不同的实现,包括:

    1. queue.Queue:这是最常用的队列实现。它是线程安全的,支持多个生产者和消费者。

    2. queue.LifoQueue:这是一个LIFO(后进先出)队列。它同样是线程安全的。

    3. queue.PriorityQueue:这是一个基于优先级的队列。队列中的元素会按照优先级顺序(从小到大)被取出。它也是线程安全的。

    Queue类的一些常用方法包括

            Queue([maxsize]):创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
    Queue的实例q具有以下方法:

            q.get( [ block [ ,timeout ] ] ):返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

            q.get_nowait() :同q.get(False)方法。

            q.put(item [, block [,timeout ] ] ) :将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

            q.qsize( ) :返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

            q.empty( ) :如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

            q.full( ) :如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。

    其他方法(了解)

            q.close( ) :关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

            q.cancel_join_thread( ) :不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

            q.join_thread( ) :连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

    1. '''
    2. multiprocessing模块支持进程间通信的两种主要形式:管道和队列
    3. 都是基于消息传递实现的,但是队列接口
    4. '''
    5. from multiprocessing import Queue
    6. q=Queue(3)
    7. #put ,get ,put_nowait,get_nowait,full,empty
    8. q.put(3)
    9. q.put(3)
    10. q.put(3)
    11. # q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
    12. # 如果队列中的数据一直不被取走,程序就会永远停在这里。
    13. try:
    14. q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
    15. except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
    16. print('队列已经满了')
    17. # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
    18. print(q.full()) #满了
    19. print(q.get())
    20. print(q.get())
    21. print(q.get())
    22. # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
    23. try:
    24. q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
    25. except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
    26. print('队列已经空了')
    27. print(q.empty()) #空了

    如何开启线程以及线程守护

    开启线程

    1. import time
    2. def task(a, b):
    3. print("from task")
    4. time.sleep(2)
    5. print("aaa")
    6. from multiprocessing import Process
    7. if __name__ == '__main__':
    8. p=Process(target=task)
    9. p.start()

    线程守护

    在Python中,通过设置线程的daemon属性为True来实现线程守护。线程守护是一种特殊的线程,它会在主线程退出时自动结束,无论其是否完成。具体步骤如下:

    1. 在创建线程对象时,设置daemon属性为True。
    2. 在运行线程之前,通过setDaemon()方法将daemon属性设置为True。
    3. 在主线程中使用join()方法等待子线程完成。

    示例代码如下:

    1. import threading
    2. import time
    3. def worker():
    4. print("Start worker thread.")
    5. time.sleep(5)
    6. print("Worker thread finished.")
    7. # 创建子线程,设置daemon属性为True
    8. t = threading.Thread(target=worker, daemon=True)
    9. # 运行子线程
    10. t.start()
    11. # 等待子线程,这里并不需要使用join()方法
    12. time.sleep(1)
    13. print("Main thread finished.")

    在上面的示例代码中,使用了daemon=True来创建子线程,并在主线程中等待了1秒。由于子线程的daemon属性被设置为True,所以它会在主线程结束时自动退出,不需要使用join()方法来等待其完成。

    生产者消费者模型

            在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式

            在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

            生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    基于队列实现生产者消费者模型

            在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题
    该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度

    在Python中实现生产者消费者模型可以使用队列(Queue)来实现。队列是线程安全的,可以在多线程环境中安全的进行读写操作。

    以下是一个简单的生产者消费者模型的Python代码示例:

    1. import queue
    2. import threading
    3. import time
    4. MAX_QUEUE_SIZE = 10
    5. def producer(q):
    6. while True:
    7. if q.qsize() < MAX_QUEUE_SIZE:
    8. item = produce_item()
    9. q.put(item)
    10. print(f"Produced item {item}")
    11. time.sleep(1)
    12. def consumer(q):
    13. while True:
    14. if not q.empty():
    15. item = q.get()
    16. consume_item(item)
    17. print(f"Consumed item {item}")
    18. time.sleep(2)
    19. def produce_item():
    20. # 产生一个随机的item
    21. return str(time.time())
    22. def consume_item(item):
    23. # 消费item
    24. pass
    25. if __name__ == '__main__':
    26. q = queue.Queue(maxsize=MAX_QUEUE_SIZE)
    27. p = threading.Thread(target=producer, args=(q,))
    28. c = threading.Thread(target=consumer, args=(q,))
    29. p.start()
    30. c.start()

    在这个例子中,我们定义了一个最大队列长度为10的队列,一个生产者线程和一个消费者线程。生产者线程不断地生成item,并将它们放入队列中,如果队列已经满了,则暂停生产。消费者线程不断地从队列中取出item,并进行消费,如果队列为空,则暂停消费。

    注意,在这个例子中我们使用了Python的多线程模块(threading),而不是多进程模块(multiprocessing)。这是因为在Python中,多进程模块比多线程模块更适合于CPU密集型任务,而多线程模块则更适合于I/O密集型任务,因为多线程模块的线程比多进程模块的进程更轻量级,更容易切换和管理。

    END

  • 相关阅读:
    从零开始的C++(十四)
    Docker部署服务(实战)
    跟我学C++中级篇——Pimpl中的unique_ptr
    JVM上篇之虚拟机与java虚拟机介绍
    【综合笔试题】30. 串联所有单词的子串
    uniapp分包
    Confluence漏洞学习——CVE-2021-26084/85,CVE-2022-26134漏洞复现
    acwing算法基础之基础算法--求逆序对的数目
    SpringMVC 环境配置
    一、深度学习介绍
  • 原文地址:https://blog.csdn.net/qq_38104453/article/details/133906526