multiprocessing
threading
asyncio
IO 指输入输出,有文件 IO 和网络 IO,如文件读写、数据库读写、网络请求(爬虫)
好用的多线程目标:
| 方案 | 优点 | 缺点 | 耗时/s |
|---|---|---|---|
| 基准 | | | 33.05 |
| _thread | 1. 后台运行 2. 适合 GUI | 1. 需要程序一直运行 2. 难以获取返回值 | 142.75 |
| Thread类 | | 1. 获取返回值有点麻烦 2. 数据同步需要用到 Lock 或 Queue | 29.22 |
| multiprocessing.dummy | 1. 启动方便 2. 有返回值 3. 数据同步 | 需先收集参数,编写逻辑有点不同 | 28.81 |
| 线程池 | 1. 启动方便 2. 有返回值 3. 数据同步 | 需先收集参数,编写逻辑有点不同 | 30.09 |
以简单的文件读写为例,模拟 IO 操作
def benchmark(n, lines_per_unit=1000000):
    """Simulate an IO-bound task by writing numbered lines to ``<n>.txt``.

    The file is created (or truncated) in the current working directory and
    filled with the integers ``0 .. n * lines_per_unit - 1``, one per line.

    Args:
        n: Workload multiplier; also used as the base name of the output file.
        lines_per_unit: Number of lines written per unit of ``n``. The default
            of 1,000,000 keeps the original workload; pass a smaller value for
            quick experiments.

    Returns:
        The last integer written, or 0 when no line was written.
    """
    last = 0
    with open('{}.txt'.format(n), 'w') as out:
        # Deliberately one write() per line: the point is to generate IO calls.
        for last in range(n * lines_per_unit):
            out.write(str(last) + '\n')
    return last
if __name__ == '__main__':
    import timeit

    def f():
        """Serial baseline: run benchmark(0) .. benchmark(9) one after another."""
        # map() is lazy, so each result is printed before the next call starts.
        for outcome in map(benchmark, range(10)):
            print(outcome)

    # Time a single pass of the serial baseline and print the elapsed seconds.
    print(timeit.timeit(f, number=1))
import _thread
from tool import benchmark
def f():
    """Launch benchmark(0) .. benchmark(9) on raw ``_thread`` threads.

    Prints the value returned by each ``start_new_thread`` call; the
    benchmark results themselves are not collected.
    """
    for n in range(10):
        started = _thread.start_new_thread(benchmark, (n,))
        print(started)
if __name__ == '__main__':
    import time

    f()
    # f() keeps no handle to wait on, so the main thread simply stays alive
    # forever (stop the script manually). Sleep instead of spinning: a bare
    # `while True: pass` loop keeps the main thread busy and competes with
    # the worker threads for the GIL.
    while True:
        time.sleep(1)
缺点:需要程序一直运行,且难以获取返回值。
import threading
from tool import benchmark
class MyThread(threading.Thread):
    """Thread subclass whose ``join`` returns the target's return value."""

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``threading.Thread``."""
        super().__init__(*args, **kwargs)
        # Pre-set so join() never hits a missing attribute when there is no
        # target, the target raised, or the thread has not finished yet.
        self._return = None

    def run(self):
        """Call the target (if any) and remember what it returned."""
        try:
            if self._target is not None:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            # Drop the references once done so the thread object does not
            # keep the target and its arguments alive.
            del self._target, self._args, self._kwargs

    def join(self, timeout=None):
        """Wait for the thread and return the target's result.

        Args:
            timeout: Optional maximum wait in seconds, as for ``Thread.join``.

        Returns:
            The target's return value, or ``None`` if there was no target,
            the target raised, or the thread has not finished yet.
        """
        super().join(timeout)
        return self._return
def f():
    """Run benchmark(0) .. benchmark(9) on MyThread workers and print each result."""
    # Build every worker first, then start them all, then collect in order.
    workers = [MyThread(target=benchmark, args=(n,)) for n in range(10)]
    for worker in workers:
        worker.start()
    for worker in workers:
        outcome = worker.join()
        print(outcome)
if __name__ == '__main__':
    import timeit

    # Time one run of f() and print the elapsed seconds.
    elapsed = timeit.timeit(f, number=1)
    print(elapsed)
缺点:获取返回值有点麻烦;数据同步需要用到 Lock 或 Queue(见下面两个示例)。
import time
import threading
from threading import Thread, Lock
# Single module-level lock; draw() holds it for its whole check-and-withdraw step.
lock = Lock()
class Account:
    """Minimal bank account that only tracks a mutable balance."""

    def __init__(self, balance):
        """Create the account with ``balance`` as its starting funds."""
        self.balance = balance
def draw(account, amount):
    """Withdraw ``amount`` from ``account`` while holding the module-level lock.

    Prints the outcome prefixed with the current thread's name. The balance
    check and the deduction happen under the same lock acquisition.
    """
    who = threading.current_thread().name
    with lock:
        enough = account.balance >= amount
        if enough:
            # Pause between the check and the deduction, still inside the lock.
            time.sleep(0.1)
            print(who, '取钱成功')
            account.balance -= amount
            print(who, '余额', account.balance)
        else:
            print(who, '取钱失败,余额不足')
if __name__ == '__main__':
    account = Account(1000)
    # Two threads each try to withdraw 800 from the same 1000 balance.
    workers = [
        Thread(target=draw, args=(account, 800), name=label)
        for label in ('ta', 'tb')
    ]
    for worker in workers:
        worker.start()
import threading
from queue import Queue
from tool import benchmark
def f(queue):
    """Take one workload size from ``queue``, run the benchmark on it, print the result."""
    # get() blocks until an item is available.
    print(benchmark(queue.get()))
if __name__ == '__main__':
    queue = Queue()
    # Enqueue the ten workload sizes up front ...
    for item in range(10):
        queue.put(item)
    # ... then start one thread per item; each thread consumes exactly one.
    for _ in range(10):
        threading.Thread(target=f, args=(queue,)).start()
这种写法数据不同步
耗时:26.44
from multiprocessing.dummy import Pool
from tool import benchmark
def f():
    """Run benchmark(0) .. benchmark(9) on an 8-thread pool and print the results.

    Uses the thread-backed ``Pool`` from ``multiprocessing.dummy``; ``map``
    returns the results in input order.
    """
    n_list = list(range(10))
    pool = Pool(processes=8)
    try:
        results = pool.map(benchmark, n_list)
    finally:
        # Always shut the pool down, even if a task raised inside map().
        pool.close()
        pool.join()
    print(results)
if __name__ == '__main__':
    import timeit

    # Time one run of f() and print the elapsed seconds.
    elapsed = timeit.timeit(f, number=1)
    print(elapsed)
线程池
from concurrent.futures import ThreadPoolExecutor
from tool import benchmark
def f():
    """Run benchmark(0) .. benchmark(9) on a ThreadPoolExecutor and print the results."""
    with ThreadPoolExecutor() as pool:
        # Exhaust the map iterator inside the ``with`` so all results are collected.
        outcomes = [*pool.map(benchmark, range(10))]
    print(outcomes)
if __name__ == '__main__':
    import timeit

    # Time one run of f() and print the elapsed seconds.
    elapsed = timeit.timeit(f, number=1)
    print(elapsed)
要用多个参数时,可向 map 依次传入多个可迭代对象;只需指定其中某个参数时,可用 lambda 函数进行封装,如
import time
from concurrent.futures import ThreadPoolExecutor
def f(x=1, y=2, *, delay=1):
    """Return ``x * y`` after sleeping, to stand in for a slow task.

    Args:
        x: First factor (default 1).
        y: Second factor (default 2).
        delay: Seconds to sleep before returning. Keyword-only so extra
            iterables passed to ``executor.map`` can never land on it by
            accident. The default of 1 keeps the original behaviour.

    Returns:
        The product ``x * y``.
    """
    time.sleep(delay)
    return x * y
x_list = [1, 2, 3]
y_list = [4, 5, 6]
with ThreadPoolExecutor() as executor:
    # Several iterables: map() feeds them to f positionally, in lockstep.
    results = [*executor.map(f, x_list, y_list)]
    print(results)  # [4, 10, 18]
    # Only ``y`` supplied: the lambda binds it by keyword, so ``x`` keeps its default.
    results = [*executor.map(lambda y: f(y=y), y_list)]
    print(results)  # [4, 5, 6]
from concurrent.futures import ThreadPoolExecutor
from tool import benchmark
def f():
    """Run benchmark(0) .. benchmark(9) on a ThreadPoolExecutor and print the results."""
    with ThreadPoolExecutor() as pool:
        # Exhaust the map iterator inside the ``with`` so all results are collected.
        outcomes = [*pool.map(benchmark, range(10))]
    print(outcomes)
if __name__ == '__main__':
    import timeit

    # Time one run of f() and print the elapsed seconds.
    elapsed = timeit.timeit(f, number=1)
    print(elapsed)