推荐文章:搞定python多线程和多进程 - morra - 博客园 (cnblogs.com)
现在很多操作系统,例如:MAC OS X、UNIX、LINUX、Windows等,都是支持多任务的操作系统,而多任务,就和字面意思相同,执行多个任务。例如:边听歌编写代码边百度这样,同时有三个任务正在运行,并且还有很多任务在后台执行,只是在桌面没有显示而已
现在都是多核CPU,自然可以执行多任务,CPU执行代码都是顺序执行的,而以前的单核CPU也可以执行多任务,其中的原理就是:
操作系统轮流让各个任务交替执行,任务A执行0.01秒,然后切换到任务B,也执行0.01秒,然后再切换到任务C,再执行0.01秒,依次执行,表面看,每个任务都是交替执行的,但是,因为CPU的执行速度很快,感觉上来说就像是同时执行一样
虽然说因为CPU速度快,从而使得我们感觉单核CPU也能执行多任务,但是真正的并行执行多任务只能在多核CPU上实现,由于任务数量远远多于CPU的核心数量,所以操作系统也会自动的把很多任务轮流调度到每个核心上执行
对于操作系统来说,一个任务就是一个进程(Process),例如打开一个浏览器,就是启动一个浏览器进程,打开一个记事本,就是启动一个记事本进程。有些进程会同时干多件事情,例如打开一个word,可以同时进行打字、拼写检查、打印等多件事情。
想要在一个进程内部干多件事情,就需要同时运行多个子任务,这些子任务又叫做线程
由于每个进程至少要干一件事,所以一个进程至少会有一个线程,并且可以有多个线程,多个线程可以同时执行,多线程执行方式和多进程是一样的:
由操作系统在多个线程之间快速切换,让每个线程都短暂的交替运行,看起来就像同时执行一样,而
真正执行多线程同样也需要多核CPU才能实现
之前编写的Python程序都是单任务的进程,也就是只有一个线程,如果想要执行多任务怎么办,有三种解决方案:
- 多进程模式:启动多个进程,每个进程虽然只有一个线程,但是多个进程可以一起执行多个任务
- 多线程模式:启动一个进程,在单个进程内,创建多个线程,多个线程一起执行多个任务
- 多进程+多线程模式:启动多个进程,每个进程内启动多个线程,可以同时执行更多的任务,但是这种模型负载,实际很少采用
同时执行多任务时,任务之间都是有关联的,需要相互通信和协调。例如:
任务A执行过程中必须要暂停,等待任务B执行完成之后任务A才能继续执行,任务C和任务D不能同时执行等
多进程、多线程的程序复杂度要远远高于之前的单进程单线程程序。因为复杂度高、调式困难,所以通常来说如果没有需求,是不会编写多任务程序的,但是有时必须要编写多任务才能实现需求,所以我们需要知道如何编写多任务的程序
小结:
线程是最小的执行单元,而进程由至少一个线程组成,如何调用进程和线程,这完全由操作系统决定,程序并不能决定什么时候执行,执行多长时间等
多进程和多线程的程序涉及到了同步、数据共享的问题,编写起来也更加复杂
注意:进程调度是操作系统决定的,千万不要在代码里假定哪个先执行
Uinx、Linux操作系统提供了一个fork()系统调用函数,这个函数非常特殊,普通的函数调用,调用一次,返回一次,但是使用fork()函数调用,调用一次,返回两次,这是因为操作系统自动把当前进程复制了一份,也就是父进程复制了一份子进程,然后,分别在父进程和子进程内返回
子进程永远返回0,而父进程返回子进程的ID,这样做的原因是:
一个父进程可以fork出很多的子进程,所以,父进程需要记住每个子进程的ID,子进程只需要调用
getppid()函数就可以拿到父进程的ID
Python的os模块封装了常见的系统调用,其中就包括fork,下面来看示例:
首先要知道:
os.getpid():可以获取当前进程的PID
os.getppid():可以获取当前进程的主进程的PID
#下面的代码只有在unix或者linux中才能正常运行,这里使用centos,mac的内核也是unix的一种,所以也可以执行
[root@centos-1 ~]# cat test.py
#!/usr/bin/env python3
#-*- coding: utf-8 -*-
import os
print("父进程ID为:%s" % os.getpid())
pid = os.fork()
if pid == 0:
print('子进程ID为:%s,父进程ID为:%s' %(os.getpid(),os.getppid()))
else:
print('父进程ID %s 创建了子进程ID %s' %(os.getpid(),pid))
#执行:
[root@centos-1 ~]# python test.py
父进程ID为:973
父进程ID 973 创建了子进程ID 974
子进程ID为:974,父进程ID为:973
#解析:
1、要看懂输出,就需要先认真看一下上面的概述,'使用fork()函数后,调用一次,返回两次,并且分别在父进程和子进程内返回',仔细读过这句话之后,我们可以知道,在'pid = os.fork()'之后,分成了两个进程分别执行之后的代码
2、从代码输出的第二行可以看出,代码先执行的是pid不等于0的语句,同样从上面的概述中可以知道,'子进程永远返回为0,父进程返回子进程的id',只有子进程才会返回0,所以,这里很明显是先执行的父进程,那么pid的值就等于子进程的id,也就是974,而os.getpid()可以得到当前进程的id也就是973,所以最终的输出为:'父进程ID 973 创建了子进程ID 974'
3、通过上面的解析后,最后一行输出就很好理解了,父进程执行完成、返回之后,开始执行子进程,因为子进程永远返回为0,所以pid为0,然后执行if判断为True的语句,os.getpid()得到当前进程id,也就是974,os.getppid()得到当前进程的父进程id,也就得到了973,最终输出:'子进程ID为:974,父进程ID为:973'
有了fork调用,一个进程在接到新任务时就可以复制一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当收到新的http请求时,就会fork出子进程去处理新的http请求
由于Windows没有fork调用,所以上面的fork示例并不适用于windows,而如果想在windows上编写多进程的服务器程序,我们可以使用multiprocessing模块,multiprocessing模块是跨平台版本的多进程模块
multiprocessing模块提供了一个Process类来代表一个进程对象,下面来看示例:
在编写脚本之前,要知道:
- process模块使用:
#作用: process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建 #使用: Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) #参数: group:进程所属组,基本上不用,参数没有使用则值始终为None target:表示调用对象,即子进程将要执行的任务 args:表示调用对象的位置参数元组,例如 args=(1,2,'test',) kwargs:表示调用对象的字典,例如 kwargs={'name':'zhangsan','age':22} name:设置子进程的名称 #注意事项: 1、每个参数需要使用关键字方式来指定,例如 name='test'这样的,指定参数为指定的值 2、args指定的是给target参数传入的位置参数,是一个元组,要记得在末尾加上逗号 #方法: 在创建实例p后 p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p.join([timeout]):主线程等待实例p终止'(要注意主线程处于等待状态,但是p的子进程是处于运行状态的)',timeout是可选参数,指定超时时间,需要注意的是,p.join()只能终止p.start()开启的进程,而不能终止p.run开启的进程,通常用于进程之间的同步 p.terminate():用来杀死子进程的 p.is_alive():判断子进程是否存活,返回True为存活,False为已经关闭
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
#编写脚本,脚本在windows、linux、unix都能执行
#-*- coding: utf-8 -*-
from multiprocessing import Process
import os
def run_proc(name):
print("运行子进程,名称:%s,ID:%s" %(name,os.getpid()))
if __name__ == "__main__":
print("父进程ID:%s" % os.getpid())
p = Process(target=run_proc,args=('test',))
print('子进程开始执行')
p.start()
p.join()
print('子进程执行结束')
#执行输出
父进程ID:15460
子进程开始执行
运行子进程,名称:test,ID:15204
子进程执行结束
#解析
1、看过process的使用方法之后,这里就比较好理解了,p是创建的子进程实例,这个子进程会执行run_proc函数,并且传入参数'test'
2、p.start()开始执行p子进程,也就是执行run_proc函数,然后p.join()终止子进程,最后输出print
当需要启动大量的子进程时,我们可以使用进程池的方式批量创建新进程
Pool类可以提供指定数量的进程,当有新的请求提交到Pool池中时,如果池还没有满,那么就会创建一个新的进程来执行请求,如果Pool池满了,请求就会等待,直到池中有进程结束,才会创建新的进程来处理请求
Pool类的方法:
pool = multiprocessing.Pool(processes = 3) #最多允许3个进程
- 1
apply():此函数用于传递不定参数,主进程会被阻塞直到函数执行结束,不建议使用,3.x之后不再出现,函数参数:apply(func,args=(),kwds={})
- 1
apply_async():与上面的apply()方法用法一致,不同的是,此函数是非阻塞的并且支持结果返回后进行回调,函数参数:apply_async(func[,args=()[,kwds={}[,callback=None]]])
- 1
map():这个map方法和内置的map函数用法差不多,此方法会使进程阻塞直到结果返回,函数参数:map(func,iterable,chunksize=None) #虽然iterable是迭代器,但是实际使用时,必须在整个队列就绪后,程序才会执行子进程
- 1
map_async():和上面的map方法用法一致,但是此方法是非阻塞的,函数参数:map_async(func,iterable,chunksize,callback)
- 1
close():关闭进程池,不在接收新的任务terminal():结束工作进程,不再处理未处理的任务join():主进程阻塞,等待子进程的退出,join方法需要再close()或者terminate()之后使用
下面来看示例:
#-*- coding: utf-8 -*-
from multiprocessing import Pool
import os,time,random
def time_test(name):
print('运行进程名称 %s,ID %s' %(name,os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('执行进程 %s ID %s 所使用的时间是 %0.2f 秒' %(name,os.getpid(),(end - start)))
if __name__ == '__main__':
print('主进程ID %s'% os.getpid())
p = Pool(processes=4) #这里指定进程最多4个
for i in range(5):
p.apply_async(time_test,args=('test',))
print('等待所有进程执行完毕')
p.close()
p.join()
print('所有进程执行完毕')
#执行返回结果:
主进程ID 10708
等待所有进程执行完毕
运行进程名称 test,ID 12620
运行进程名称 test,ID 14176
运行进程名称 test,ID 16088
运行进程名称 test,ID 7744
执行进程 test ID 7744 所使用的时间是 0.64 秒
运行进程名称 test,ID 7744
执行进程 test ID 16088 所使用的时间是 0.91 秒
执行进程 test ID 12620 所使用的时间是 1.85 秒
执行进程 test ID 14176 所使用的时间是 2.17 秒
执行进程 test ID 7744 所使用的时间是 2.94 秒
所有进程执行完毕
#解析:
在看过process后,这里也比较好理解,但是可以看到输出信息中,在第四次运行time_test函数后,有一个进程先返回了,然后才执行第五次,这是因为当pool线程池数量满了之后,新进来的请求需要等待前面有进程处理完之后,再处理新的请求,因为设置了进程最多4个,而for循环的调用是5次,所以第5次请求就需要等待有进程执行完毕后才会执行新请求
'可以发现,执行第五次的进程id,是复用了一开始创建的进程'
注意:Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前,必须先调用close(),调用完close()之后就不能再添加新的进程了
很多时候,子进程并不是自身,而是一个外部进程,我们创建了子进程后,还需要控制子进程的输入和输出
subprocess模块可以启动一个子进程,然后控制子进程的输入和输出
下面来看关于subprocess的一些方法:
subprocess模块允许我们启动一个新进程,并连接到他们的输入、输出、错误管道,从而获取返回值
- subprocess.run()方法:Python 3.5中新增的函数。执行指定的命令,等待命令执行完成后返回一个包含执行结果的CompletedProcess类的实例,参数:
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False)
- 1
- subprocess.call()方法:执行指定的命令,返回命令执行状态,其功能类似于os.system(cmd),参数:
subprocess.call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None)
- 1
- subprocess.check_call()方法:Python 2.5中新增的函数。 执行指定的命令,如果执行成功则返回状态码,否则抛出异常。其功能与subprocess.run(…, check=True)相同,参数:
subprocess.check_call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None)
- 1
- subprocess.check_output()方法:Python 2.7中新增的的函数。执行指定的命令,如果执行状态码为0则返回命令执行结果,否则抛出异常,参数:
subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)
- 1
- subprocess.getoutput()方法:接收字符串格式的命令,执行命令并返回执行结果,其功能类似于os.popen(cmd).read()和commands.getoutput(cmd),参数:
subprocess.getstatusoutput(cmd)
- 1
- subprocess.getstatusoutput()方法:执行cmd命令,返回一个元组,元组中有命令执行状态, 命令执行结果输出,其功能类似于commands.getstatusoutput(),参数:
subprocess.getoutput(cmd)
- 1
- 参数解析:
1、'args': 要执行的shell命令,默认应该是一个字符串序列,如['df', '-Th']或('df', '-Th'),也可以是一个字符串,如'df -Th',但是想要直接使用linux命令的话,需要把shell参数的值设置为True 2、'shell': 如果shell为True,那么指定的命令将通过shell执行。如果我们需要访问某些shell的特性,如管道符、文件名通配符、环境变量扩展功能,这将是非常有用的。当然,python本身也提供了许多类似shell的特性的实现,如glob、fnmatch、os.walk()、os.path.expandvars()、os.expanduser()和shutil等 3、'check': 如果check参数的值是True,且执行命令的进程以非0状态码退出,则会抛出一个CalledProcessError的异常,且该异常对象会包含 参数、退出状态码、以及stdout和stderr(如果它们有被捕获的话) 4、'stdout, stderr:input':`该参数是传递给Popen.communicate()`,通常该参数的值必须是一个字节序列,如果universal_newlines=True,则其值应该是一个字符串 ---------------------------------------------------------------------- - run()函数默认不会捕获命令执行结果的正常输出和错误输出,如果我们向获取这些内容需要传递subprocess.PIPE,然后可以通过返回的CompletedProcess类实例的stdout和stderr属性或捕获相应的内容 - call()和check_call()函数返回的是命令执行的状态码,而不是CompletedProcess类实例,所以对于它们而言stdout和stderr不适合赋值为subprocess.PIPE - check_output()函数默认就会返回命令执行结果,所以不用设置stdout的值,如果我们希望在结果中捕获错误信息,可以执行stderr=subprocess.STDOUT - `subprocess.PIPE表示为进程创建新的管道,subprocess.DEVNULL表示使用os.devnull(代表当前系统的回收站)` ---------------------------------------------------------------------- 5、'universal_newlines': 该参数影响的是输入与输出的数据格式,比如它的值默认为False,此时stdout和stderr的输出是字节序列;当该参数的值设置为True时,stdout和stderr的输出是字符串
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
通过使用上面的被封装后的高级函数可以很方便的完成一些常见需求,但是由于subprocess模块底层的进程创建和管理都是由Popen类的处理的,所以当我们无法通过上面的高级函数实现一些不常见的功能时,就可以通过subprocess.Popen类提供的API来完成这些需求
subprocess.Popen类用于在一个新的进程中执行一个子程序,下面来看Popen类的相关信息
- subprocess.Popen的构造函数:
class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startup_info=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=())
- 1
- 2
- 3
- 参数说明:
1、'args': 要执行的shell命令,可以是字符串,也可以是命令各个参数组成的序列。当该参数的值是一个字符串时,该命令的解释过程是与平台相关的,因此通常建议将args参数作为一个序列传递 2、'bufsize': 指定缓存策略,0表示不缓冲,1表示行缓冲,其他大于1的数字表示缓冲区大小,负数 表示使用系统默认缓冲策略。 3、'stdin, stdout, stderr': 分别表示程序标准输入、输出、错误句柄 4、'preexec_fn': 用于指定一个将在子进程运行之前被调用的可执行对象,只在Unix平台下有效 5、'close_fds': 如果该参数的值为True,则除了0,1和2之外的所有文件描述符都将会在子进程执行之前被关闭 6、'shell': 该参数用于标识是否使用shell作为要执行的程序,如果shell值为True,则建议将args参数作为一个字符串传递而不要作为一个序列传递 7、'cwd': 如果该参数值不是None,则该函数将会在执行这个子进程之前改变当前工作目录 8、'env': 用于指定子进程的环境变量,如果env=None,那么子进程的环境变量将从父进程中继承。如果env!=None,它的值必须是一个映射对象 9、'universal_newlines': 如果该参数值为True,则该文件对象的stdin,stdout和stderr将会作为文本流被打开,否则他们将会被作为二进制流被打开 10、'startupinfo和creationflags': 这两个参数只在Windows下有效,它们将被传递给底层的CreateProcess()函数,用于设置子进程的一些属性,如主窗口的外观,进程优先级等
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 可调用方法:
1、'Popen.poll()':用于检查子进程(命令)是否已经执行结束,没结束返回None,结束后返回状态码 2、'Popen.wait(timeout=None)':等待子进程结束,并返回状态码;如果在timeout指定的秒数之后进程还没有结束,将会抛出一个TimeoutExpired异常 3、'Popen.communicate(input=None, timeout=None)':该方法可用来与进程进行交互,比如发送数据到stdin,从stdout和stderr读取数据,直到到达文件末尾 ------------------------------------------------------------------------------------- #Popen.communicate()方法注释: - 该方法中的可选参数 input 应该是将被发送给子进程的数据,或者如没有数据发送给子进程,该参数应该是None。input参数的数据类型必须是字节串,如果universal_newlines参数值为True,则input参数的数据类型必须是字符串 - 该方法返回一个元组(stdout_data, stderr_data),这些数据将会是字节串或字符串,如果universal_newlines的值为True - 如果在timeout指定的秒数后该进程还没有结束,将会抛出一个TimeoutExpired异常。捕获这个异常,然后重新尝试通信不会丢失任何输出的数据。但是超时之后子进程并没有被杀死,为了合理的清除相应的内容,一个好的应用应该手动杀死这个子进程来结束通信 - 需要注意的是,这里读取的数据是缓冲在内存中的,所以,如果数据大小非常大或者是无限的,就不应该使用这个方法 ----------------------------------------------------------------------------------------- 4、'Popen.send_signal(signal)':发送指定的信号给这个子进程 5、'Popen.terminate()':停止该子进程 6、'Popen.kill()':杀死该子进程
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
下面来看示例,执行nslookup www.baidu.com命令
#-*- coding: utf-8 -*-
import subprocess
print('$ nslookup www.baidu.com')
r = subprocess.call(['nslookup','www.baidu.com']) #执行指定命令,返回状态
print(r)
#输出:
$ nslookup www.baidu.com
服务器: UnKnown
Address: 10.10.11.41
非权威应答:
名称: www.a.shifen.com
Addresses: 110.242.68.3
110.242.68.4
Aliases: www.baidu.com
0
如果子进程需要输入,可以通过communicate()方法:
#-*- coding: utf-8 -*-
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'www.baidu.com\nexit') #使用\n换行符,可以执行多条命令
print(output.decode('unicode_escape')) #指定字符集,原文中是utf-8,但是windows报错了,所以改成unicode
print('Exit code:', p.returncode)
#输出:
$ nslookup
ĬÈÏ·þÎñÆ÷: UnKnown #因为是unicode所以是乱码
Address: 10.10.11.41
> ·þÎñÆ÷: UnKnown
Address: 10.10.11.41
Ãû³Æ: www.a.shifen.com
Addresses: 110.242.68.4
110.242.68.3
Aliases: www.baidu.com
> exit
Exit code: 0
#解析:
上面的代码相当于执行了'nslookup',然后手动输入'www.baidu.com'、'exit'
multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据#-*- coding: utf-8 -*-
from multiprocessing import Process,Queue
import os,time,random
def write(q):
print('写进程为:%s' % os.getpid())
for value in ['A','B','C']:
print('将 %s 放入队列' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('读进程为:%s' % os.getpid())
while True:
value = q.get(True)
print('在队列中获取 %s' % value)
if __name__ == '__main__':
q = Queue()
pw = Process(target=write,args=(q,))
pr = Process(target=read,args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate() #杀死子进程
#输出结果:
写进程为:3576
将 A 放入队列
读进程为:880
在队列中获取 A
将 B 放入队列
在队列中获取 B
将 C 放入队列
在队列中获取 C
#解析
q是Queue的实例,Queue的方法put()可以将元素放入item队列,get()可以查看队列中的元素
get(True)可以控制是否使用报错,如果使用False的话,在队列没有值的话会报错,反之则不会,默认为True
- 在Linux或Unix下,可以使用
fork()调用实现多进程multiprocessing模块可以实现跨平台的多进程- 进程间通信可以通过
multiprocessing模块的Queue、Pipes方法来实现
多线程完成,一个进程至少拥有一个线程"线程",而不是模拟出的线程_thread和threading,_thread是低级模块,threading是高级模块,threading对_thread进行了封装,大部分情况下,通常只需要使用threading这个高级模块Thread实例,然后调用start()开始执行:首先要知道,任何进程默认都会启动一个线程,我们把这个线程称之为
主线程,主线程又可以启动新的线程而Python中的
threading模块有一个current_thread()函数,此函数永远返回当前线程的实例名称主线程的实例名称叫做
MainThread,子线程的名称是在创建时指定的,一般使用LoopThread来命名子线程,名称仅仅只是打印时用来显示,除此之外没有其他意义,如果不指定名称,则Python会自动以Thread-1、Thread-2这样的顺序取名
#-*- coding: utf-8 -*-
import time, threading
# 新线程执行的代码:
def loop():
print('线程 %s 正在运行' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('线程 %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('线程 %s 结束' % threading.current_thread().name)
print('线程 %s 正在运行' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('线程 %s 已经结束' % threading.current_thread().name)
#输出:
线程 MainThread 正在运行
线程 LoopThread 正在运行
线程 LoopThread >>> 1
线程 LoopThread >>> 2
线程 LoopThread >>> 3
线程 LoopThread >>> 4
线程 LoopThread >>> 5
线程 LoopThread 结束
线程 MainThread 已经结束
#解析
第一行输出的名称'MainThread',其实就是当前进程创建的主线程名称,然后创建了线程,并且命名为'LoopThread',当线程结束后,最后主线程也结束
多线程和多进程最大的不同就是:
多进程中,同一个变量,各自有一份拷贝保存在每个进程中,修改是互不影响的,但是在多线程中,所有的变量都会由所有线程共享,从而使任何一个变量都可以被任何线程修改,因此,线程之间共享数据最大的危险在于多个线程同时修改一个变量,使变量的值出现混乱
下面来看案例:
#-*- coding: utf-8 -*-
import time, threading
# 假定这是你的银行存款:
balance = 0
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(20000000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
#执行结果:
电脑速度过快的话,这里的输出可能还是0,可以把循环增加几百倍,我这里使用虚拟机用centos执行的,结果都是大于0的
#解析:
这里定义了共享变量'balance',初始值为0,并且启动两个线程去执行,从逻辑来看,现存后取,最终结果肯定也是0,但是由于线程的调度是由操作系统决定的,所以当两个线程交替执行并且循环次数足够多使,最终的结果就不一定为0了
出现上面的这种情况,是因为修改balance变量的值需要多条语句,而执行这几条语句时,线程可能会中断,从而导致多个线程把同一个对象的值改乱了
如果我们想要最终结果为0,那么就需要确保在一个线程修改balance变量时,别的线程不能修改。为了确保balance变量值计算正确,我们可以给change_it()方法上一把锁,锁的作用:
当某个线程开始执行
change_it()方法时,如果该线程获得了锁,那么其他线程是不能同时执行change_it()方法的,只能等待锁被释放后,获得该锁后才能继续执行,但是由于锁只有一个,所以无论有多少线程,同一时刻最多只能有一个线程持有锁,这样就不会造成修改的冲突
通过threading.Lock()方法可以创建锁,下面来看案例:
看之前需要知道
threading.Lock()的方法:
acquire():查询锁状态,如果锁是locked上锁状态,则同步阻塞,如果是unlocked非上锁状态,则将其上锁
release():解锁
#!/usr/bin/env python3
#-*- coding: utf-8 -*-
import time, threading
# 假定这是你的银行存款:
balance = 0
lock = threading.Lock() #创建实例
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n): #修改run_thread方法
for i in range(2000000):
lock.acquire() #先获取锁
try:
change_it(n)
finally:
lock.release() #执行完后要释放锁,否则会造成死锁
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
#执行结果:
0
#解析:
每次在执行change_it()函数时,需要先获取锁,然后等待执行完毕之后通过finally释放锁,这样在执行函数时就会只有一个线程执行,从而避免变量被多个线程修改
在多个线程同时执行lock.acquire()时,只有一个线程可以成功获取到锁,然后继续执行代码,其他线程需要等待锁释放之后才能获取到锁
锁的优缺点:
优点:锁确保了某段关键代码只能由一个线程从头到尾完整的执行
缺点:阻止了多线程并发执行,包含锁的某段代码实际上只能单线程执行,效率会下降很多。并且由于可以存在多个锁,在不同的线程持有不同的锁、并且试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行也无法结束,只能由操作系统强制终止
注意:获得锁的线程用完后一定要释放锁,否则其他的线程就会一直进行等待,成为死锁,我们可以使用try...finally来确保锁一定会释放
多线程编程,模型复杂,容易发生冲突,必须用锁加以隔离,同时又要小心死锁的发生
Python的线程虽然是真正的线程,但是解释器在执行代码时,有一个CIL(Global Interpreter Lock)全局锁,任何Python线程执行前,必须要先获取GIL锁,然后每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行
GIL全局锁实际上把所有线程的执行代码都上了锁,所以多线程在Python中只能交替执行,即使100个线程跑在100核CPU,也只能用到1核
GIL锁是Python解释器设计的历史遗留问题,通常我们用的解释器是官方的CPython,要真正利用多核,需要重写一个不带GIL的解释器,所以在Python中,可以使用多线程,但是无法有效利用多核,如果一定要通过多线程利用多核,那么只能通过C扩展来实现,不过这样就失去了Python简单易用的特点
不过Python虽然不能利用多线程实现多核任务,但是可以通过多进程实现多核任务,多个Python进程都各自拥有独立的GIL锁,互相不影响
一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己可以看到,不会影响其他线程,而使用全局变量的话就需要加锁,不然就可能使变量的值发生混乱def process_student(name):
std = Student(name) #编写了一个调用其他两个函数的函数,并且创建std实例,然后把实例传入其他函数,这里的std就相当于局部变量
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
Student对象,然后以这个字典本身获取线程对应的Student对象,下面来看案例:global_dict = {} #全局字典
def std_thread(name):
std = Student(name)
global_dict[threading.current_thread()] = std #写入键值,键值为 '当前线程名称':'std实例'
do_task_1()
do_task_2()
def do_task_1():
std = global_dict[threading.current_thread()] #根据当前线程名称取出std实例
...
def do_task_2():
std = global_dict[threading.current_thread()] #根据当前线程名称取出std实例
...
ThreadLocal,使用ThreadLocal不用查找字典,它会帮我们自动做这件事import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name)) #std是process_thread函数传入的name变量,第二个是线程名称
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
#执行结果:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
可以理解为全局变量local_school是一个字典,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等等。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
小结
一个
ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
Master-Worker模式,其中Master负责分配任务,而Worker负责执行任务,因此多任务环境下,通常是一个Master和多个Worker多进程实现Master-Worker,主进程是Master,其他进程就是Worker
多线程实现Master-Worker,主线程是Master,其他进程就是Worker
多进程:
- 优点:多进程稳定性高,因为一个子进程崩溃了是不会影响主进程和其他子进程的。需要注意的是如果是主进程挂了,那么所有的进程就挂了,但是主进程只负责分配任务,挂掉的几率很低,Apache最早就是采用多进程模式
- 缺点:多进程创建进程的代价大,如果在Unix/Linux系统下,使用
fork()创建进程还行,但是如果是在Windows系统,那么创建进程开销会很大。并且操作系统能够同时运行的进程数量也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,那么操作系统连调度都会成为问题
多线程:
- 优点:多线程模式通常要比多进程快一点,但是也快不到哪里去。但是在Windows系统下,多线程的效率要比多进程高,微软的IIS服务器默认都是使用多线程模式
- 缺点:多线程模式的致命缺点就是
任何一个线程挂掉都可能会使整个进程崩溃,因为所有线程共享进程的内存,如果一个线程执行的代码除了问题,可能会看到提示“该程序执行了非法操作,即将关闭”,这样的提示往往是某个线程出现了问题,然后操作系统会强制结束整个进程
每次处理一个任务,然后依次做完,这种方式称之为单任务模型,或者批处理任务模型
处理一个任务一分钟然后切换到下一个任务,以此推类,依次循环往复,只要切换的速度够快,这种方式其实就和单核CPU执行多任务是一样的,这种方式就叫多任务模型。
但是切换任务也是有代价的,每次切换之前需要先保存当前执行的线程环境,例如CPU寄存器状态、内存页等,然后把新任务的执行环境准备好,即恢复上次的CPU寄存器状态、内存页等,准备好之后才能开始执行,这个过程虽然很快但是也会耗费时间,如果有几千个任务同时执行,操作系统可能会忙着切换任务,从而没有时间去执行任务,最常见的就是硬盘狂响,点击窗口桌面没有反应,系统处于假死状态
总之,多任务一旦多到一个限度,就会消耗掉系统的所有资源,使效率急剧下降,所有任务都做不好
计算密集型:
- 计算密集型任务的特点就是需要进行大量的计算,消耗CPU资源,例如计算圆周率、对视频进行高清解码等,主要依靠CPU的运算能力,这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低。
- 如果想要高效的利用CPU,那么计算密集型任务同时进行的数量应该和CPU的核数相等
- 计算密集型任务主要消耗CPU,因此,代码运行效率至关重要,Python这样的脚本语言运行效率很低,完全不适合计算密集型任务,对于计算密集型任务应该使用C语言编写
IO密集型:
- IO密集型任务涉及到网络、磁盘IO,这类任务的特点是CPU消耗少,任务的大部分时间都在等待IO操作完成,这是因为IO的速度远远低于CPU和内存的速度。
- 对于IO密集型任务,任务越多,CPU效率越高,但是也有一个限度。常见的大部分任务都是IO密集型任务,例如Web应用
- IO密集型任务在执行期间,99%的时间都花在IO上,花在CPU的时间很少,因为就算使用C语言替换Python,也不会提升运行效率,对于IO密集型任务,最合适的就算开发效率高、代码量少的语言,首选脚本语言,例如Pyhon,C语言最差
Thread线程和Process进程中,优先选择Process进程,因为进程更加稳定,并且,进程可以分布到多台机器上,而线程最多只能分布到同一台机器的多个CPU上multiprocessing模块不但支持多进程,其中的managers子模块还支持把多进程分布到多台机器上。managers模块封装很好,所以我们不必了解网络通信的细节,就可以编写分布式多进程程序,例如:当我们有一个通过
Queue通信的多进程程序在同一台机器上运行,而现在处理任务的进程任务繁重,需要把发送任务的进程和处理任务的进程分布到两台机器上可以这么做,原有的
Queue可以继续使用,我们通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问这个Queue了
因为原文的代码有点问题,这里使用别的案例,案例来源:https://blog.csdn.net/u011318077/article/details/88094583
使用模块
使用multiprocessing和queue模块
- 使用multiprocessing.managers中的BaseManager创建分布式管理器
- 使用Queue创建队列,用于多个进程之间的通信
服务进程test.py
逻辑步骤:
- 定义两个Queue队列,一个用于发送任务,一个接收结果
- 把上面创建的两个队列注册在网络上,利用register方法
- 绑定端口8001,设置验证口令,这个相当于对象的初始化
- 启动管理器,启动Queue队列,监听信息通道
- 通过管理实例的方法获访问网络中的Queue对象
- 添加任务,获取返回的结果
- 关闭服务
# -*- coding:utf-8 -*-
import random, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
#1.定义两个Queue队列,tesk_queue用于发送任务,result_queue用于接收结果
task_queue = queue.Queue()
result_queue = queue.Queue()
#创建QueueManager类,继承BaseManager,用于后面创建管理器
class QueueManager(BaseManager):
pass
#定义两个函数,返回结果是Queue队列
def return_task_queue():
global task_queue #定义全局变量
return task_queue #返回发送任务的队列
def return_result_queue():
global result_queue
return result_queue #返回接收结果的队列
#2.利用register方法把上面创建的两个队列(也就是两个函数)注册在网络上
#callable参数关联了Queue对象,将Queue对象在网络中暴露
#QueueManager.register的第一个参数是注册在网络上队列的名称
def test():
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
#3.绑定端口8001,设置验证口令,这个相当于对象的初始化
#绑定端口并填写验证口令,windows下需要填写IP地址,Linux下默认为本地,地址为空
manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'abc') #口令必须写成类似b'abc'形式,只写'abc'运行错误,其中口令为'abc'
#4.启动管理器,启动Queue队列,监听信息通道
manager.start()
#5.通过管理实例的方法获访问网络中的Queue对象,即通过网络访问获取任务队列和结果队列,创建了两个Queue实例,
task = manager.get_task_queue() #发送
result = manager.get_result_queue() #接收
#6.添加任务,获取返回的结果,将任务放到Queue队列中
for i in range(10):
n = random.randint(0, 10) #返回0到10之间的随机数
print("Put task %s ..." % n)
task.put(n) # 将n放入到任务队列中
#从结果队列中取出结果
print("Try get results...")
for i in range(11): # 注意,这里结果队列中取结果设置为11次,总共只有10个任务和10个结果,多1次主要是确认队列中是不是已经空了
#总共循环10次,上面放入了10个数字作为任务
#加载一个异常捕获
try:
r = result.get(timeout=5) # 每次等待5秒,取结果队列中的值
print("Result: %s" % r)
except queue.Empty: #如果上面代码出现异常则输出这个
print("result queue is empty.")
#7.关闭服务
#一定要关闭,否则会报管道未关闭的错误
manager.shutdown()
print("master exit.")
if __name__ == '__main__':
freeze_support() #Windows下多进程可能出现问题,添加这个可以缓解
print("Start!")
# 运行服务进程
test()
逻辑步骤
- 使用QueueManager注册用于获取Queue的方法名称
- 连接到服务器,也就是运行服务进程代码的机器
- 从网络上获取Queue对象,并进行本地化,与服务进程是同一个队列
- 从task队列获取任务,并把结果写入到resul队列
- 任务结束
# coding: utf-8
# 定义具体的任务进程,具体的工作任务是什么
import time, sys, queue
from multiprocessing.managers import BaseManager
#创建QueueManager类,继承BaseManager,用于后面创建管理器
class QueueManager(BaseManager):
pass
#1.使用QueueManager注册用于获取Queue的方法名称
#前面服务进程已经将队列名称暴露到网络中,该任务进程注册时只需要提供名称即可,与服务进程中队列名称一致
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#2.连接到服务器,也就是运行服务进程代码的机器
server_addr = '127.0.0.1'
print("Connet to server %s..." % server_addr)
#创建一个管理器实例,端口和验证口令保持与服务进程中完全一致
m = QueueManager(address=(server_addr, 8001), authkey=b'abc')
#连接到网络服务器
m.connect()
#3.从网络上获取Queue对象,并进行本地化,与服务进程是同一个队列
task = m.get_task_queue()
result = m.get_result_queue()
#4.从task队列获取任务,并把结果写入到resul队列,这里也循环10次,因为上面的服务队列也是计算10次
for i in range(10):
try:
#前面服务进程向task队列中放入了n,这里取出n,计算n和n相乘,并将相乘的算式和结果放入到result队列中去
n = task.get(timeout=1) #每次等待1秒后取出任务
print("run task %d * %d..." % (n, n))
r = '%d * %d = %d' % (n, n, n*n) #进行计算
time.sleep(1)
result.put(r) #把r的值返回到result队列中
except queue.Empty:
print("task queue is empty.")
# 任务处理结束
print("worker exit.")
