
推文作者:Amiee
常规爬虫都是爬完一个网页接着爬下一个网页,不适应数据量大的网页,本文介绍了多线程处理同时爬取多个网页的内容,提升爬虫效率。
一般而言,常规爬虫都是爬完一个网页接着爬下一个网页。如果当爬取的数据量非常庞大时,爬虫程序的时间开销往往很大,这个时候可以通过多线程或者多进程处理即可完成多个网页内容同时爬取的效果,数据获取速度大大提升。
简单来说,CPU是进程的父级单位,一个CPU可以控制多个进程;进程是线程的父级单位,一个进程可以控制多个线程,那么到底什么是进程,什么是线程呢?
对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程;打开一个QQ就启动一个QQ进程;打开一个Word就启动了一个Word进程,打开两个Word就启动两个Word进程。
那什么叫作线程呢?在一个进程内部往往不止同时干一件事,比如浏览器,它可以同时浏览网页、听音乐、看视频、下载文件等。在一个进程内部这同时运行的多个“子任务”,便称之称为线程(Thread),线程是程序工作的最小单元。
此外有个注意点,对于单个CPU而言,某一个时点只能执行一个任务,那么如果这样是怎么在现实中同时执行多个任务(进程)的呢?比如一边用浏览器听歌,一边用QQ和好友聊天是如何实现的呢?
答案是操作系统会通过调度算法,轮流让各个任务(进程)交替执行。以时间片轮转算法为例:有5个正在运行的程序(即5个进程) : QQ、微信、谷歌浏览器、网易云音乐、腾讯会议,操作系统会让CPU轮流来调度运行这些进程,一个进程每次运行0.1ms,因为CPU执行的速度非常快,这样看起来就像多个进程同时在运行。同理,对于多个线程,例如通过谷歌浏览器(进程)可以同时访问网页(线程1)、听在线音乐(线程2)和下载网络文件(线程3)等操作,也是通过类似的时间片轮转算法使得各个子任务(线程)近似同时执行。
- from threading import Thread
- def func():
- for i in range(10):
- print('func', i)
- if name == '__main__':
- t = Thread(target=func) # 创建线程
- t.start() # 多线程状态,可以开始工作了,具体时间有CPU决定
- for i in range(10):
- print('main', i)
- 执行结果如下:
- func 0
- func 1
- func 2
- func 3
- func 4
- main 0
- main 1
- main 2
- main 3
- main 4
- mainfunc 5 5
- func
- main 66
- func 7
- main
- func 8
- func 9
- 7
- main 8
- main 9
大佬是这个写法
- from threading import Thread
- class MyThread(Thread):
- def run(self):
- for i in range(10):
- print('MyThread', i)
- if name == '__main__':
- t = MyThread()
- # t.run() # 调用run就是单线程
- t.start() # 开启线程
- for i in range(10):
- print('main', i)
- 执行结果:
- MyThread 0
- MyThread 1
- MyThread 2
- MyThread 3
- MyThread 4
- MyThread 5main 0
- main 1
- main 2
- main 3
- main 4
- MyThread
- main 5
- 6
- main MyThread 67
- mainMyThread 78
- mainMyThread 89
- main 9
- from threading import Thread
- def func(name):
- for i in range(10):
- print(name, i)
- if name == '__main__':
- t1 = Thread(target=func, args=('子线程1',)) # 创建线程
- t1.start() # 多线程状态,可以开始工作了,具体时间又CPU决定
- t2 = Thread(target=func, args=('子线程2',)) # 创建线程
- t2.start() # 多线程状态,可以开始工作了,具体时间又CPU决定
- for i in range(10):
- print('main', i)
一般不建议使用,因为开进程比较费资源
- from multiprocessing import Process
- def func():
- for i in range(1000000):
- print('func', i)
- if name == '__main__':
- p = Process(target=func)
- p.start() # 开启线程
- for i in range(100000):
- print('mainn process', i)
线程池:一次性开辟一些线程,我们用户直接给线程池提交任务,线程任务的调度由线程池来完成
- from concurrent.futures import ThreadPoolExecutor
- def func(name):
- for i in range(10):
- print(name, i)
- if name == '__main__':
- # 创建线程池
- with ThreadPoolExecutor(50) as t:
- for i in range(100):
- t.submit(func, name=f'Thread{i}=')
- # 等待线程池中的人物全部执行完成,才继续执行;也称守护进程
- print('执行守护线程')
进程池
- from concurrent.futures import ProcessPoolExecutor
- def func(name):
- for i in range(10):
- print(name, i)
- if name == '__main__':
- # 创建线程池
- with ProcessPoolExecutor(50) as t:
- for i in range(100):
- t.submit(func, name=f'Thread{i}=')
- # 等待线程池中的人物全部执行完成,才继续执行;也称守护进程
- print('执行守护进程')
单个线程怎么办;上线程池,多个页面同时爬取
- import requests
- from lxml import etree
- import csv
- from concurrent.futures import ThreadPoolExecutor
-
- f = open('xifadi.csv', mode='w', newline='')
- csv_writer = csv.writer(f)
- def download_one_page(url):
- resp = requests.get(url)
- resp.encoding = 'utf-8'
- html = etree.HTML(resp.text)
- table = html.xpath(r'/html/body/div[2]/div[4]/div[1]/table')[0]
- # trs = table.xpath(r'./tr')[1:] # 跳过表头
- trs = table.xpath(r'./tr[position()>1]')
- for tr in trs:
- td = tr.xpath('./td/text()')
- # 处理数据中的 \\ 或 /
- txt = (item.replace('\\','').replace('/','') for item in td)
- csv_writer.writerow(txt)
- resp.close()
- print(url, '提取完毕')
-
- if name == '__main__':
- with ThreadPoolExecutor(50) as t:
- for i in range(1, 200):
- t.submit(download_one_page,f'http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml')
- print('全部下载完毕')
程序处于阻塞状态的情形包含以下几个方面:
•input():等待用户输入
•requests.get():网络请求返回数据之前
•当程序处理IO操作时,线程都处于阻塞状态
•time.sleep():处于阻塞状态
协程的逻辑是当程序遇见IO操作时,可以选择性的切换到其他任务上;协程在微观上任务的切换,切换条件一般就是IO操作;在宏观上,我们看到的是多个任务都是一起执行的;上方的一切都是在在单线程的条件下,充分的利用单线程的资源。
要点梳理:
•函数被asyn修饰,函数被调用时,它不会被立即执行;该函数被调用后会返回一个协程对象。
•创建一个协程对象:构建一个asyn修饰的函数,然后调用该函数返回的就是一个协程对象
•任务对象是一个高级的协程对象,
- import asyncio
- import time
- async def func1():
- print('你好呀11')
- if name == '__main__':
- g1 = func1() # 此时的函数是异步协程函数,此时函数执行得到的是一个协程对象
- asyncio.run(g1) # 协程城西执行需要asyncio模块的支持
普通的time.sleep()是同步操作,会导致异步操作中断
- import asyncio
- import time
- async def func1():
- print('你好呀11')
- time.sleep(3) # 当程序中除了同步操作时,异步就中端了
- print('你好呀12')
- async def func2():
- print('你好呀21')
- time.sleep(2)
- print('你好呀22')
- async def func3():
- print('你好呀31')
- time.sleep(4)
- print('你好呀32')
- if name == '__main__':
- g1 = func1() # 此时的函数是异步协程函数,此时函数执行得到的是一个协程对象
- g2 = func2()
- g3 = func3()
- tasks = [g1, g2, g3]
- t1 = time.time()
- asyncio.run(asyncio.wait(tasks)) # 协程城西执行需要asyncio模块的支持
- t2 = time.time()
- print(t2 - t1)
-
-
-
- 你好呀21
- 你好呀22
- 你好呀11
- 你好呀12
- 你好呀31
- 你好呀32
- 9.003259658813477
使用异步睡眠函数,遇到睡眠时,挂起;
- import asyncio
- import time
- async def func1():
- print('你好呀11')
- await asyncio.sleep(3) # 异步模块的sleep
- print('你好呀12')
- async def func2():
- print('你好呀21'))
- await asyncio.sleep(4) # 异步模块的sleep
- print('你好呀22')
- async def func3():
- print('你好呀31')
- await asyncio.sleep(4) # 异步模块的sleep
- print('你好呀32')
- if name == '__main__':
- g1 = func1() # 此时的函数是异步协程函数,此时函数执行得到的是一个协程对象
- g2 = func2()
- g3 = func3()
- tasks = [g1, g2, g3]
- t1 = time.time()
- asyncio.run(asyncio.wait(tasks)) # 协程城西执行需要asyncio模块的支持
- t2 = time.time()
- print(t2 - t1)
-
-
- 你好呀21
- 你好呀11
- 你好呀31
- 你好呀12
- 你好呀22
- 你好呀32
- 4.0028839111328125
整体耗时为最长时间 + 切换时间
- import asyncio
- import time
- async def func1():
- print('你好呀11')
- # time.sleep(3) # 当程序中除了同步操作时,异步就中端了
- await asyncio.sleep(3) # 异步模块的sleep
- print('你好呀12')
- async def func2():
- print('你好呀21')
- # time.sleep(2)
- await asyncio.sleep(4) # 异步模块的sleep
- print('你好呀22')
- async def func3():
- print('你好呀31')
- # time.sleep(4)
- await asyncio.sleep(4) # 异步模块的sleep
- print('你好呀32')
- async def main():
- # 写法1:不推荐
- # f1 = func1()
- # await f1 # await挂起操作,一般放在协程对象前边
- # 写法2:推荐,但是在3.8废止,3.11会被移除
- # tasks = [func1(), func2(), func3()]
- # await asyncio.wait(tasks)
- # 写法3:python3.8以后使用
- tasks = [asyncio.create_task(func1()),
- asyncio.create_task(func2()),
- asyncio.create_task(func3())]
- await asyncio.wait(tasks))
-
- if name == '__main__':
- t1 = time.time()
- asyncio.run(main())
- t2 = time.time()
- print(t2 - t1)
-
- 你好呀21
- 你好呀31
- 你好呀11
- 你好呀12
- 你好呀32
- 你好呀22
- 4.001523017883301
安装包:
pip install aiohttp
pip install aiofiles
基本框架:
•获取所有的url
•编写每个url的爬取函数
•每个url建立一个线程任务,爬取数据
- import asyncio
- import aiohttp
- import aiofiles
- headers = {'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Mobile Safari/537.36 Edg/91.0.864.41'}
- urls = {
- r'http://kr.shanghai-jiuxin.com/file/mm/20210503/xy2edb1kuds.jpg',
- r'http://kr.shanghai-jiuxin.com/file/mm/20210503/g4ok0hh2utm.jpg',
- r'http://kr.shanghai-jiuxin.com/file/mm/20210503/sqla2defug0.jpg',
- r'http://d.zdqx.com/aaneiyi_20190927/001.jpg'
- }
- async def aio_download(url):
- async with aiohttp.ClientSession() as session:
- async with session.get(url, headers=headers) as resp:
- async with aiofiles.open('img/' + url.split('/')[-1],mode='wb') as f:
- await f.write(await resp.content.read())
- await f.close() # 异步代码需要关闭文件,否则会输出0字节的空文件
- # with open(url.split('/')[-1],mode='wb') as f: # 使用with代码不用关闭文件
- # f.write(await resp.content.read()) # 等价于resp.content, resp.json(), resp.text()
- async def main():
- tasks = []
- for url in urls:
- tasks.append(asyncio.create_task(aio_download(url)))
- await asyncio.wait(tasks)
-
- if name == '__main__':
- # asyncio.run(main()) # 可能会报错 Event loop is closed 使用下面的代码可以避免
- asyncio.get_event_loop().run_until_complete(main())
- print('over')
知识点总结:
•在python 3.8以后,建议使用asyncio.create_task()创建人物
•aiofiles写文件,需要关闭文件,否则会生成0字节空文件
•aiohttp中生成图片、视频等文件时,使用resp.content.read(),而requests库时,并不需要read()
•报错 Event loop is closed时, 将 asyncio.run(main()) 更改为 asyncio.get_event_loop().run_until_complete(main())
•使用with打开文件时,不用手动关闭
注意:以下代码可能会因为 百度阅读 页面改版而无法使用
主题思想:
分析那些请求需要异步,那些不需要异步;在这个案例中,获取目录只需要请求一次,所以不需要异步
下载每个章节的内容,则需要使用异步操作
- import requests
- import asyncio
- import aiohttp
- import json
- import aiofiles
https://dushu.baidu.com/pc/detail?gid=4306063500
http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4306063500"} 获取章节的名称,cid;只请求1次,不需要异步
http://dushu.baidu.com/api/pc/getChapterContent # 涉及多个任务分发,需要异步请求,拿到所有的文章内容
- headers = {'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Mobile Safari/537.36 Edg/91.0.864.41'}
- async def aio_download(cid, book_id,title):
- data = {
- 'book_id': book_id,
- 'cid' : f'{book_id}|{cid}',
- 'need_bookinfo': 1
- }
- data = json.dump(data)
- url = f'http://dushu.baidu.com/api/pc/getChapterContent?data={data}'
- async with aiohttp.ClientSession as session:
- async with session.get(url) as resp:
- dic = await resp.json()
- async with aiofiles.open('img/'+title+'.txt',mode='w') as f:
- await f.write(dic['data']['novel']['content'])
- await f.close()
- async def getCatalog(url):
- resp = requests.get(url, headers=headers)
- dic = resp.json()
- tasks = []
- for item in dic['data']['novel']['items']:
- title = item['title']
- cid = item['cid']
- tasks.append(asyncio.create_task(aio_download((cid, book_id,title))))
- await asyncio.wait(tasks)
-
-
- if name == '__main__':
- book_id = '4306063500'
- url = r'http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+book_id+'"}'
- asyncio.run(getCatalog(url))