一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。
不仅可以通过线程完成多任务,进程也是可以的

multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动
- # -*- coding: utf-8 -*-
- from multiprocessing import Process
- import time
-
-
- def run_proc():
- """子进程要执行的代码"""
- while True:
- print("----2----")
- time.sleep(1)
-
-
- if __name__ == '__main__':
- p = Process(target=run_proc)
- p.start()
- while True:
- print("----1----")
- time.sleep(1)
- # -*- coding: utf-8 -*-
- from multiprocessing import Process
- import os
-
-
- def run_proc():
- """子进程要执行的代码"""
- print('子进程运行中,pid=%d...' % os.getpid()) # os.getpid获取当前进程的进程号
- print('子进程将要结束...')
-
-
- if __name__ == '__main__':
- print('父进程pid: %d' % os.getpid()) # os.getpid获取当前进程的进程号
- p = Process(target=run_proc)
- p.start()
Process([group [, target [, name [, args [, kwargs]]]]])
Process创建的实例对象的常用方法:
Process创建的实例对象的常用属性:
- # -*- coding: utf-8 -*-
- from multiprocessing import Process
- import os
- from time import sleep
-
-
- def run_proc(name, age, **kwargs):
- for i in range(10):
- print('子进程运行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
- print(kwargs)
- sleep(0.2)
-
-
- if __name__ == '__main__':
- p = Process(target=run_proc, args=('test', 18), kwargs={"m": 20})
- p.start()
- sleep(1) # 1秒中之后,立即结束子进程
- p.terminate()
- p.join()
-
- """
- 子进程运行中,name= test,age=18 ,pid=142080...
- {'m': 20}
- 子进程运行中,name= test,age=18 ,pid=142080...
- {'m': 20}
- 子进程运行中,name= test,age=18 ,pid=142080...
- {'m': 20}
- 子进程运行中,name= test,age=18 ,pid=142080...
- {'m': 20}
- """
2.5 不同进程共享全局变量
- # -*- coding: utf-8 -*-
- from multiprocessing import Process
- import os
- import time
-
- nums = [11, 22]
-
-
- def work1():
- """子进程要执行的代码"""
- print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
- for i in range(3):
- nums.append(i)
- time.sleep(1)
- print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
-
-
- def work2():
- """子进程要执行的代码"""
- print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))
-
-
- if __name__ == '__main__':
- p1 = Process(target=work1)
- p1.start()
- p1.join()
-
- p2 = Process(target=work2)
- p2.start()
-
- """
- in process1 pid=137576 ,nums=[11, 22]
- in process1 pid=137576 ,nums=[11, 22, 0]
- in process1 pid=137576 ,nums=[11, 22, 0, 1]
- in process1 pid=137576 ,nums=[11, 22, 0, 1, 2]
- in process2 pid=140280 ,nums=[11, 22]
- """
线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。
Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。
可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序。
初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
Queue.get_nowait():相当Queue.get(False);
Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
- # -*- coding: utf-8 -*-
- from multiprocessing import Queue
-
- q = Queue(3) # 初始化一个Queue对象,最多可接收三条put消息
- q.put("消息1")
- q.put("消息2")
- print(q.full()) # False
- q.put("消息3")
- print(q.full()) # True
-
- # 因为消息列队已满下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个Try会立刻抛出异常
- try:
- q.put("消息4", True, 2)
- except:
- print("消息列队已满,现有消息数量:%s" % q.qsize())
-
- try:
- q.put_nowait("消息4")
- except:
- print("消息列队已满,现有消息数量:%s" % q.qsize())
-
- # 推荐的方式,先判断消息列队是否已满,再写入
- if not q.full():
- q.put_nowait("消息4")
-
- # 读取消息时,先判断消息列队是否为空,再读取
- if not q.empty():
- for i in range(q.qsize()):
- print(q.get_nowait())
-
- """
- False
- True
- 消息列队已满,现有消息数量:3
- 消息列队已满,现有消息数量:3
- 消息1
- 消息2
- 消息3
- """
Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据
- # -*- coding: utf-8 -*-
- from multiprocessing import Process, Queue
- import os, time, random
-
-
- # 写数据进程执行的代码:
- def write(q):
- for value in ['A', 'B', 'C']:
- print('Put %s to queue...' % value)
- q.put(value)
- time.sleep(random.random())
-
-
- # 读数据进程执行的代码:
- def read(q):
- while True:
- if not q.empty():
- value = q.get(True)
- print('Get %s from queue.' % value)
- time.sleep(random.random())
- else:
- break
-
-
- if __name__ == '__main__':
- # 父进程创建Queue,并传给各个子进程:
- q = Queue()
- pw = Process(target=write, args=(q,))
- pr = Process(target=read, args=(q,))
- # 启动子进程pw,写入:
- pw.start()
- # 等待pw结束:
- pw.join()
- # 启动子进程pr,读取:
- pr.start()
- pr.join()
- # pr进程里是死循环,无法等待其结束,只能强行终止:
- print('')
- print('所有数据都写入并且读完')
-
- """
- Put A to queue...
- Put B to queue...
- Put C to queue...
- Get A from queue.
- Get B from queue.
- Get C from queue.
- 所有数据都写入并且读完
- """
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务
- # -*- coding: utf-8 -*-
- from multiprocessing import Pool
- import os, time, random
-
-
- def worker(msg):
- t_start = time.time()
- print("%s开始执行,进程号为%d" % (msg, os.getpid()))
- # random.random()随机生成0~1之间的浮点数
- time.sleep(random.random() * 2)
- t_stop = time.time()
- print(msg, "执行完毕,耗时%0.2f" % (t_stop - t_start))
-
-
- if __name__ == '__main__':
- po = Pool(3) # 定义一个进程池,最大进程数3
- for i in range(0, 10):
- # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
- # 每次循环将会用空闲出来的子进程去调用目标
- po.apply_async(worker, (i,))
- print("----start----")
- po.close() # 关闭进程池,关闭后po不再接收新的请求
- po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
- print("-----end-----")
-
- """
- ----start----
- 0开始执行,进程号为22380
- 1开始执行,进程号为22488
- 2开始执行,进程号为21156
- 2 执行完毕,耗时0.23
- 3开始执行,进程号为21156
- 1 执行完毕,耗时0.36
- 4开始执行,进程号为22488
- 0 执行完毕,耗时0.41
- 5开始执行,进程号为22380
- 5 执行完毕,耗时0.55
- 6开始执行,进程号为22380
- 3 执行完毕,耗时1.47
- 7开始执行,进程号为21156
- 4 执行完毕,耗时1.92
- 8开始执行,进程号为22488
- 7 执行完毕,耗时0.83
- 9开始执行,进程号为21156
- 6 执行完毕,耗时1.81
- 9 执行完毕,耗时0.53
- 8 执行完毕,耗时1.63
- -----end-----
- """
使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会报如下的错误:
RuntimeError: Queue objects should only be shared between processes through inheritance.
- # -*- coding: utf-8 -*-
- from multiprocessing import Manager, Pool
- import os, time, random
-
-
- def reader(q):
- print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
- for i in range(q.qsize()):
- print("reader从Queue获取到消息:%s" % q.get(True))
-
-
- def writer(q):
- print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
- for i in "itcast":
- q.put(i)
-
-
- if __name__ == "__main__":
- print("(%s) start" % os.getpid())
- q = Manager().Queue() # 使用Manager中的Queue
- po = Pool()
- po.apply_async(writer, (q,))
-
- time.sleep(1) # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据
-
- po.apply_async(reader, (q,))
- po.close()
- po.join()
- print("(%s) End" % os.getpid())
-
- """
- (10840) start
- writer启动(19680),父进程为(10840)
- reader启动(20656),父进程为(10840)
- reader从Queue获取到消息:i
- reader从Queue获取到消息:t
- reader从Queue获取到消息:c
- reader从Queue获取到消息:a
- reader从Queue获取到消息:s
- reader从Queue获取到消息:t
- (10840) End
- """