• Python编程-----并行处理应用程序


    目录

    一.进程

    二.线程

    三.Python标准库中并行处理的相关模块

    Threading模块

    (1)使用Thread对象创建线程

    (2)自定义派生于Thread的对象

    (3)线程加入join()

     (4)用户线程和daemon线程

    (5)Timer线程

    线程同步

    Lock/RLock对象:

    Condition对象:

    基于queue模块中队列的同步:

    基于Event的同步和通信:

    基于进程的并行计算:


    一.进程

    进程是操作系统中正在执行的应用程序的一个实例,每一个进程有自己的地址空间,一般包括文本区域、数据区域和堆栈区域。

    •文本区域:处理器执行的代码

    •数据区域:变量和进程执行期间使用的动态分配的内存

    •堆栈区域:活动过程调用的指令和本地变量

    二.线程

    线程是进程中的一个实体,是CPU独立调度和分派处理器时间的基本单位。

    线程由线程ID、当前指令指针、寄存器集合和堆栈组成,线程与同属一个进程的其他线程共享进程所拥有的全部资源。

    线程使程序能够执行并发处理,例如:

    •一个线程执行后台计算任务,另一线程监视输入

    •高优先级线程管理关键任务

    •服务器创建线程池,处理并发客户端请求

    一个进程包含多个线程。多个线程相对独立,线程有自己的上下文,切换受系统控制。

    三.Python标准库中并行处理的相关模块

    - _thread和_dummy_thread模块:底层低级线程API。

    - threading模块:线程及其同步处理。

    - multiprocessing模块:多进程处理和进程池。

    - concurrent.futures模块:启动并行任务。

    - queue模块:线程安全队列,用于线程或进程间信息传递。

    - asyncio模块:异步IO、事件循环和任务处理。

    Threading模块

    这里重点看threading模块:threading模块包括创建线程,启动线程和线程同步:

    - threading.Thread对象实例化,可创建线程;

    - Thread对象的start(),可启动线程。

    - 创建Thread的派生类,重写run(),其对象实例也可创建线程。

    - 设置线程对象的daemon属性,线程可设为用户线程daemon线程

    (1)使用Thread对象创建线程

    threading模块封装了_thread模块,虽然可使用_thread模块中的start_new_thread()函数创建线程,但一般建议使用threading模块

    Thread(target=None, name=None,args=(),kwargs={})

    Thread是线程运行函数;

    name是线程的名称;

    args和kwargs是传递给target的参数元组和命名参数字典。

    1. >>>t1 = threading.Thread(target=timer, args=(5,))
    2. >>>t1.start() #启动线程
    3. >>>t1.is_alive() #判断线程是否活动
    4. >>>t1.name #属性,线程名
    5. >>>t1.id #线程标识符
    6. >>>threading.get_ident() #返回当前线程的标识符
    7. >>>threading.current_thread() #返回当前线程
    8. >>>threading.active_count() #返回活动的线程数目
    9. >>>threading.enumerate() #返回线程的列表
    1. #创建和启动新线程
    2. import threading, time, random
    3. def timer(interval):
    4. for i in range(3):
    5. time.sleep(random.choice(range(interval))) #随机睡眠interval秒
    6. thread_id = threading.get_ident() #获取当前线程标识符
    7. print('Thread:{0} Time:{1}'.format(thread_id, time.ctime()))
    8. if __name__=='__main__':
    9. t1 = threading.Thread(target=timer, args=(5,)) #创建线程
    10. t2 = threading.Thread(target=timer, args=(3,)) #创建线程
    11. t1.start(); t2.start() #启动线程
    (2)自定义派生于Thread的对象

    声明Thread的派生类,重写对象的run方法,创建对象实例,通过对象的start方法,可启动线程,并自动执行对象的run方法,例如如下操作:

    1. import threading, time, random
    2. class MyThread(threading.Thread): #继承threading.Thread
    3. def __init__(self, interval): #构造函数
    4. threading.Thread.__init__(self) #调用父类构造函数
    5. self.interval = interval #对象属性
    6. def run(self): #定义run方法
    7. for i in range(3):
    8. time.sleep(random.choice(range(self.interval))) #随机睡眠interval秒
    9. thread_id = threading.get_ident() #获取当前线程标识符
    10. print(f'Thread:{thread_id}, time:{time.ctime()}')
    11. if __name__ == '__main__':
    12. t1 = MyThread(5) #创建对象
    13. t2 = MyThread(3)
    14. t1.start() #启动线程
    15. t2.start()
    (3)线程加入join()

    所谓线程加入t.join(),即让包含t.join()的线程(tc,即当前线程)“加入”到另外一个线程(t)的尾部。在线程(t)执行完毕之前,线程(tc)不能执行

    join(timeout=None)

    timeout是超时参数,单位为秒。如果指定了超时,则线程t执行完毕或者超时都可能使当前线程继续,此时可通过t.is_alive()来判断线程是否终止。

    注意:线程不能加入自己,否则导致RuntimeError,因为这样将导致死锁。线程也不能加入未启动的线程,否则将导致RuntimeError。

    1. import threading, time, random
    2. class MyThread(threading.Thread): #继承threading.Thread
    3. def __init__(self): #构造函数
    4. threading.Thread.__init__(self) #调用父类构造函数
    5. def run(self): #定义run方法
    6. for i in range(5):
    7. time.sleep(1) #睡眠1秒
    8. t = threading.current_thread() #获取当前线程
    9. print(f'{t.name} at {time.ctime()}')#打印线程名、当前时间
    10. print('线程t1结束')
    11. def test():
    12. t1 = MyThread() #创建线程对象
    13. t1.name = 't1' #设置线程名称
    14. t1.start() #启动线程
    15. print('主线程开始等待线程(t1)');
    16. t1.join(2) #主线程阻塞,等待t1结束或超时
    17. print('主线程等待线程(t1)2s结束')
    18. print('主线程开始等待线程结束');
    19. t1.join() #主线程阻塞,等待t1结束
    20. print('主线程结束')
    21. if __name__=='__main__':
    22. test()
     (4)用户线程和daemon线程

    线程可以分为用户线程和daemon线程:

    - 用户线程(非daemon线程)是通常意义的线程,应用程序运行即为主线程,在主线程中可以创建和启动新线程,默认为用户线程。只有当所有的非daemon的用户线程(包括主线程)结束后,应用程序终止。

    - daemon线程,又称守护线程,其优先级最低,一般为其它的线程提供服务。通常,daemon线程是一个无限循环的程序。所有非daemon线程都结束了,则daemon线程自动终止。

    - t.daemon = True 

    1. import threading, time
    2. class MyThread(threading.Thread): #继承threading.Thread
    3. def __init__(self, interval): #构造函数
    4. threading.Thread.__init__(self) #调用父类构造函数
    5. self.interval = interval #对象属性
    6. def run(self): #定义run方法
    7. t = threading.current_thread() #获取当前线程
    8. print('线程' + t.name + '开始')
    9. time.sleep(self.interval) #延迟self.interval秒
    10. print('线程' + t.name + '结束')
    11. class MyThreadDaemon(threading.Thread): #继承threading.Thread
    12. def __init__(self, interval): #构造函数
    13. threading.Thread.__init__(self) #调用父类构造函数
    14. self.interval = interval #对象属性
    15. def run(self): #定义run方法
    16. t = threading.current_thread() #获取当前线程
    17. print('线程' + t.name + '开始')
    18. while True:
    19. time.sleep(self.interval) #延迟self.interval秒
    20. print('daemon线程' + t.name + '正在运行')
    21. print('线程' + t.name + '结束')
    22. def test():
    23. print('主线程开始')
    24. t1 = MyThread(5) #创建线程对象
    25. t2 = MyThreadDaemon(1) #创建线程对象
    26. t1.name = 't1'; t2.name = 't2' #设置线程名称
    27. t2.daemon = True #设置为daemon
    28. t1.start() #启动线程
    29. t2.start()
    30. print('主线程结束')
    31. if __name__=='__main__':
    32. test()
    (5)Timer线程

    在实际应用中,经常使用定时器触发一些事件,而 threading中的Timer线程(Thread的子类),可以很方便实现定时器功能。

    Timer(interval,function,args=None,kwargs=None)

    #构造函数,在指定时间interval后执行函数function

    start() #启动线程,即启动计时器

    cancel() #取消计时器

    1. import threading
    2. def f():
    3. print('Hello Timer!') #创建定时器,1秒后运行
    4. timer = threading.Timer(1, f)
    5. timer.start()
    6. timer = threading.Timer(1, f) #创建定时器,1秒后运行
    7. timer.start()
    线程同步

    当多个线程调用单个对象的属性和方法时,一个线程可能会中断另一个线程正在执行的任务,使该对象处于一种无效状态(被占用),因此必须针对这些调用进行同步处理

    多种线程同步处理解决方案:Lock/RLock对象、Condition对象、Semaphore对象、Event对象

    Lock/RLock对象:

    - threading模块的Lock对象锁可实现线程简单同步。

    - threading模块的RLock是可重入的同步锁。如果一个线程在持有锁的情况下再次调用acquire()方法,那么它将会被阻塞,因为锁已经被这个线程所持有,导致死锁的发生。可重入锁的特点是允许同一个线程多次调用acquire()来获取锁,而不会发生死锁。

    注:每次调用acquire()都必须对应一次release()来释放锁,否则其他线程将无法再次获取该锁。

    acquire() #获取锁,锁会被系统变为blocked状态

    release() 

    #释放锁,锁进入unlocked状态,blocked状态的线程会受到一个通知,并有权利获得锁。多个线程处于blocked状态,系统会选择一个线程来获得锁(具体哪个线程获得与实现有关)

    1. import threading
    2. lock = threading.Lock()
    3. lock.acquire()
    4. #do something...
    5. lock.release()
    6. Lock对象支持with语句
    7. lock = threading.Lock()
    8. with lock:
    9. #do something...

     示例:

    创建工作线程,模拟银行现金帐户取款。多个线程同时执行取款操作时,如果不使用同步处理,会造成账户余额混乱;尝试使用同步锁对象Lock,以保证多个线程同时执行取款操作时银行现金帐户取款的有效和一致

    1. import threading, time, random
    2. class Account(threading.Thread): #继承threading.Thread
    3. lock = threading.Lock() #创建锁
    4. def __init__(self, amount): #构造函数
    5. threading.Thread.__init__(self) #调用父类构造函数
    6. Account.amount = amount #账户金额,类变量
    7. def run(self): #定义run方法
    8. self.withdraw() #取款
    9. def withdraw(self):
    10. Account.lock.acquire() #获取锁。注释不使用同步处理
    11. t = threading.current_thread()
    12. a = random.choice(range(50,101))
    13. if Account.amount < a:
    14. print(f'{t.name}交易失败。取款前余额:{Account.amount},取款额:{a}')
    15. Account.lock.release() #释放锁
    16. return 0 #拒绝交易
    17. time.sleep(random.choice(range(5))) #随机睡眠[0-5)秒
    18. prev = Account.amount
    19. Account.amount -= a #取款
    20. print(f'{t.name}取款前余额:{prev}, 取款额:{a}, 取款后额:{Account.amount}')
    21. Account.lock.release() #释放锁。注释不使用同步处理
    22. def test():
    23. for i in range(5): #创建5个线程对象并启动
    24. Account(200).start()
    25. if __name__=='__main__':
    26. test()
    Condition对象:

    线程A获得同步锁lock,在执行同步代码时需要某资源,而该资源由线程B提供。线程B无法获取线程A占用的同步锁lock,故无法提供线程A资源,从而导致死锁。

    - 解决死锁问题,可使用基于条件变量(condition)的线程间通信的通信机制

    1.线程A获得同步锁lock,在执行同步代码时,需要等待线程B占用的资源,则可以调用wait()/wait(毫秒数)方法阻塞当前线程运行,并释放其占用的同步锁lock;

    2.notify()方法,通知等待同步锁lock的线程B,线程B获取同步锁lock后执行资源生产或者释放资源操作,然后释放同步锁,并调用notify()通知线程A

    3.线程A获取同步锁lock,继续执行

    cv = threading.Condition(lock=None)

    #条件变量对象关联一个锁lock。创建时可传入参数lock,没有传入则默认自动创建一个。

    支持with语句,等价于cv.acquire()和cv.release()

    with cv:

            同步操作

    wait()/wait(timeout)  #释放锁,并阻塞当前线程,直到其他线程使用notify()和notifyAll()唤醒后重新获取锁

    notify()  #唤醒一个等待线程

    notifyAll() #唤醒所有线程

    1. import threading, time, random
    2. class Container1(): #基于同步和通信
    3. def __init__(self): #构造函数
    4. self.contents = 0 #容器内容
    5. self.available = False #容器内容
    6. self.cv = threading.Condition() #条件变量
    7. def put(self, value): #生产函数
    8. with self.cv: #使用条件变量同步
    9. if self.available: #如果已经生产过,则等待
    10. self.cv.wait() #等待
    11. self.contents = value #生产,设置内容
    12. t = threading.current_thread()
    13. print(f'{t.name}生产{self.contents}')
    14. self.available = True #设置容器状态:已生产
    15. self.cv.notify() #通知等待的消费者
    16. def get(self): #消费函数
    17. with self.cv: #使用条件变量同步
    18. if not self.available: #如果未生产,则等待
    19. self.cv.wait() #等待
    20. t = threading.current_thread()
    21. print(f'{t.name}消费{self.contents}')
    22. self.available = False #设置容器状态:未生产
    23. self.cv.notify() #通知等待的生产者
    24. class Container2(): #无同步和通信
    25. def __init__(self): #构造函数
    26. self.contents = 0 #容器内容
    27. self.available = False #容器内容
    28. def put(self, value): #生产函数
    29. if self.available: #如果已经生产
    30. pass
    31. else:
    32. self.contents = value #生产,设置内容
    33. t = threading.current_thread()
    34. print(f'{t.name}生产{self.contents}')
    35. self.available = True #设置容器状态:已生产
    36. def get(self): # 消费函数
    37. if not self.available: # 如果未生产,不操作
    38. pass
    39. else:
    40. t = threading.current_thread()
    41. print(f'{t.name}消费{self.contents}')
    42. self.available = False # 设置容器状态:未生产
    43. class Producer(threading.Thread): # 生产者类
    44. def __init__(self, container): # 构造函数
    45. threading.Thread.__init__(self) # 调用父类构造函数
    46. self.container = container # 容器
    47. def run(self): # 定义run方法
    48. for i in range(1, 6):
    49. time.sleep(random.choice(range(2))) # 随机睡眠[0-5)秒
    50. self.container.put(i) # 生产
    51. class Consumer(threading.Thread): #消费者类
    52. def __init__(self, container): #构造函数
    53. threading.Thread.__init__(self) #调用父类构造函数
    54. self.container = container #容器
    55. def run(self): #定义run方法
    56. for i in range(1,6):
    57. time.sleep(random.choice(range(2)))#随机睡眠[0-5)秒
    58. self.container.get() #消费
    59. def test1():
    60. print('基本同步和通信的生产者消费者模型:')
    61. container = Container1() #创建容器
    62. Producer(container).start() #创建消费者线程并启动
    63. Consumer(container).start() #创建消费者线程并启动
    64. def test2():
    65. print('无同步和通信的生产者消费者模型:')
    66. container = Container2() #创建容器
    67. Producer (container).start() #创建消费者线程并启动
    68. Consumer(container).start() #创建消费者线程并启动
    69. if __name__=='__main__':
    70. test1()
    基于queue模块中队列的同步:

    - 模块queue提供了适用于多线程编程的先进先出的数据结构(即队列),用来在生产者和消费者线程之间的信息传递。使用queue模块中的线程安全的队列,可以快捷实现生产者和消费者模型

    - Queue模块中包含三种线程安全的队列:Queue、LifoQueue和PriorityQueue。以Queue为例,其主要方法包括:

    (1)Queue(maxsize=0):构造函数,构造指定大小的队列。默认不限定大小

    (2)put(item, block=True, timeout=None):向队列中添加一个项。默认阻塞,即队列满的时候,程序阻塞等待

    (3)get(block=True, timeout=None):从队列中拿出一个项。默认阻塞,即队列为空的时候,程序阻塞等待

    1. #基于queue.Queue的生产者和消费者案例
    2. import time
    3. import queue
    4. import threading
    5. q = queue.Queue(10) #创建一个大小为10的队列
    6. def productor(i):
    7. while True:
    8. time.sleep(1) #休眠1秒钟,即每秒钟做一个包子
    9. q.put("厨师{}做的包子!".format(i)) #如果队列满,则等待
    10. def consumer(j):
    11. while True:
    12. print("顾客{}吃了一个{}\n".format(j, q.get()),end=’’) #如果队列空,则等待
    13. time.sleep(1) #休眠1秒钟,即每秒钟吃一个包子
    14. for i in range(3): #3个厨师不停做包子,
    15. t = threading.Thread(target=productor, args=(i,))
    16. t.start()
    17. for k in range(10): #10个顾客等待吃包子
    18. v = threading.Thread(target=consumer, args=(k,))
    19. v.start()
    基于Event的同步和通信:

    - Event相当于红绿灯信号,可用于主线程控制其他线程的执行。当flag为False时,其他的线程调用e.wait()阻塞等待这个信号;当设置flag为True时,等待的线程解除阻塞继续执行。

    - Event对象主要包括下列方法:

    (1)wait([timeout]):阻塞等待,直到Event对象的flag为True或超时

    (2)set():将flag设置为True

    (3)clear():将flag设置为False

    (4)isSet():判断flag是否为True

    1. import threading
    2. import random
    3. def f(i, e):
    4. e.wait() #检测Event的标志,如果是False则阻塞
    5. print(f"线程{i}的随机结果为{random.randrange(1,100)}")
    6. if __name__ == '__main__':
    7. event = threading.Event() #创建事件对象,默认标志为False
    8. for i in range(3): #创建3个线程并运行,默认阻塞等待Event
    9. t = threading.Thread(target=f, args=(i, event))
    10. t.start()
    11. ready = input('请输入1开始继续执行阻塞的线程:')
    12. if ready == "1":
    13. event.set() #设置Event的flag为True
    基于进程的并行计算:

    - multiprocessing模块提供了与进程相关的操作:创建进程、启动进程、进程同步

    - 模块multiprocessing还提供进程池和线程池

    - multiprocessing模块的API与threading.Thread类似,大大减少其使用复杂度

    注意:windows平台中所有与进程相关的代码必须放置在

    if __name__ == ‘__main__’:之中

    创建进程有两种方法,即创建Process的实例对象,创建Process的子类

    Process(target=None, name=None,args=(),kwargs={})

    target是进程运行的函数;

    name是进程的名称;

    args和kwargs是传递给target的参数元组和命名参数字典

    t.start() #启动线程

    t.is_alive() #判断线程是否活动

    t.join() #进程加入

    terminate() #终止进程

    t.name #进程名

    t.pid #进程id

    t.daemon #设置进程为用户进程(False)或Daemon进程(True)

    cpu_count() #可用的CPU核数

    current_process() #返回当前进程

    active_children() #活动的子进程

    log_to_stderr() #设置输出日志信息到标准错误输出(控制台)

    1. #使用Process对象创建和启动新进程
    2. import time, random
    3. import multiprocessing as mp
    4. def timer(interval):
    5. for i in range(3):
    6. time.sleep(random.choice(range(interval))) #随机睡眠interval秒
    7. pid = mp.current_process().pid #获取当前进程ID
    8. print('Process:{0} Time:{1}'.format(pid, time.ctime()))
    9. if __name__=='__main__':
    10. p1 = mp.Process(target=timer, args=(5,)) #创建进程
    11. p2 = mp.Process(target=timer, args=(5,)) #创建进程
    12. p1.start() #启动线程
    13. p2.start()
    14. p1.join()
    15. p2.join()

    模块multiprocessing为进程间提供了两种通信方法:Queue和Pipe

    - 模块multiprocessing中的Queue类似于queue.Queue,为进程间通信提供了一个线程和进程安全的队列

    - 模块multiprocessing中的Pipe()返回一个管道(包括两个连接对象),两个进程可以分别连接到不同的端的连接对象,然后通过其send()方法发送数据或者通过recv()方法接收数据。

    基于Queue队列的生产者和消费者模型

    1. import time
    2. import multiprocessing as mp
    3. def productor(i, q):
    4. while True:
    5. time.sleep(1) #休眠1秒钟,即每秒钟做一个包子
    6. q.put("厨师{}做的包子!".format(i)) #如果队列满,则等待
    7. def consumer(j, q):
    8. while True:
    9. print("顾客{}吃了一个{}" .format(j, q.get())) #如果队列空,则等待
    10. time.sleep(1) #休眠1秒钟,即每秒钟吃一个包子
    11. if __name__=='__main__':
    12. q = mp.Queue(10) #创建一个大小为10的队列
    13. for i in range(3): #3个厨师不停做包子,
    14. p = mp.Process(target=productor, args=(i,q))
    15. p.start()
    16. for k in range(10): #10个顾客等待吃包子
    17. p = mp.Process(target=consumer, args=(k,q))
    18. p.start()

    基于Pipe的进程间通信

    1. import multiprocessing as mp
    2. import time, random, itertools
    3. def consumer(conn):
    4. #从管道读取数据
    5. while True:
    6. try:
    7. item = conn.recv()
    8. time.sleep(random.randrange(2)) #随机休眠,代表处理过程
    9. print("consume:{}".format(item))
    10. except EOFError:
    11. break
    12. def producer(conn):
    13. #生产项目并将其发送到连接的管道上
    14. for i in itertools.count(1): #从1开始无限循环
    15. time.sleep(random.randrange(2)) #随机休眠,代表处理过程
    16. conn.send(i)
    17. print("produce:{}".format(i))
    18. if __name__=="__main__":
    19. # 创建管道,返回两个连接对象的元组
    20. conn_out, conn_in = mp.Pipe()
    21. # 创建并启动生产者进程,传入参数管道一端的连接对象
    22. p_producer = mp.Process(target=producer, args=(conn_out,))
    23. p_producer.start()
    24. # 创建并启动消费者进程,传入参数管道另一端的连接对象
    25. p_consumer = mp.Process(target=consumer, args=(conn_in,))
    26. p_consumer.start()
    27. #加入进程,等待完成
    28. p_producer.join(); p_consumer.join()
  • 相关阅读:
    前端:css中的多列的实现与介绍
    springboot 整合 es 配合使用 IK 分词器
    域名及地址正确外,若依后台无法正常加载页面和退出报404问题
    力扣刷题记录(Java)(一)
    Flowable-6.7.2:一个简单流程分析
    深入理解javascript对象
    剪枝Prune
    react18的使用技巧
    (二)算法分析
    Centos7安装nginx及网页如何放入nginx
  • 原文地址:https://blog.csdn.net/weixin_69884785/article/details/134417643