在Python中,进行网络IO操作性能最好的操作就是异步IO,新的Python发布了asyncio库,可以进行异步操作,与 aiohttp类似,python中操作mysql数据库也是网络操作。mysql是c/s体系,每次客户端访问数据库都需要链接mysql服务,发送网络请求,等待mysql数据库服务器返回结果。python中执行异步数据库操作的库为aiomysql。
aiomysql 是一个基于 asyncio 的异步 MySQL 客户端库,用于在 Python 中与 MySQL 数据库进行交互。它提供了异步的数据库连接和查询操作,适用于异步编程环境
pip install aiomysql
在操作mysql之前,需要与服务器进行链接。connect方法用于与mysql服务器链接,链接成功后返回一个链接对象,用于执行后续操作。
| 参数 | 说明 |
|---|---|
| host | MySQL 服务器的主机名或 IP 地址。 |
| port | MySQL 服务器的端口号,默认为 3306。 |
| user | 连接 MySQL 服务器的用户名。 |
| password | 连接 MySQL 服务器的密码。 |
| db | 连接的数据库名称。 |
| unix_socket | UNIX 域套接字路径。如果指定了 unix_socket,则会忽略 host 和 port 参数 |
| charset | 连接的字符集,默认为 “utf8mb4”。 |
| autocommit | 自动提交模式,默认为 False。如果设置为 True,则每个 SQL 语句都会自动提交。 |
| connect_timeout | 连接超时时间(以秒为单位),默认为 10 秒。 |
| maxsize | 连接池中的最大连接数,默认为 10。 |
| minsize | 连接池中的最小连接数,默认为 1。 |
| ssl | SSL 配置参数。可以传入一个字典,包含 SSL 相关的配置选项,如: ssl = {‘cert’: ‘/path/to/cert.pem’, ‘key’: ‘/path/to/key.pem’} |
| loop | 事件循环对象。如果不指定,将使用默认的事件循环。 |
| echo | 是否打印 SQL 语句,默认为 False。如果设置为 True,则会在控制台打印执行的 SQL 语句。 |
| cursorclass | 游标类,默认为 aiomysql.cursors.DictCursor,返回的查询结果将以字典形式返回。 |
| server_public_key | 服务器公钥,用于 SSL/TLS 连接的服务器验证。 |
- import aiomysql
- import asyncio
-
-
- async def main():
- conn = await aiomysql.connect(host='localhost', port=3306, user='root', password='123456',db='job')
- cursor = await conn.cursor()
- await cursor.execute('SELECT * FROM hxcms_blog;')
- result = await cursor.fetchall()
- print(result)
- await cursor.close()
- conn.close()
-
-
- asyncio.run(main())
- import asyncio
- import aiomysql
-
- #定义操作协程
- async def main():
- # 连接Mysql数据库
- conn = await aiomysql.connect(
- host='localhost', # Mysql服务器IP地址
- port=3306, # Mysql服务器端口号
- user='root', # Mysql用户名
- password='123456', # Mysql密码
- db='mydb', # Mysql数据库名称
- autocommit=True) # 自动提交
- # 配置游标
- cur = await conn.cursor()
- # 执行SQL语句
- await cur.execute('SELECT * FROM `user`')
- # 获取查询结果
- result = await cur.fetchall()
- print(result)
- # 关闭连接
- await cur.close()
- conn.close()
- # 启动事件循环,事件循环中将执行 main 协程
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main())
创建一个连接池。该方法用于创建一个异步连接池,接受一系列的连接参数,例如数据库主机、端口、用户名、密码等。
| 参数 | 说明 |
|---|---|
| host | MySQL 服务器的主机名或 IP 地址。 |
| port | MySQL 服务器的端口号,默认为 3306。 |
| user | 连接 MySQL 服务器的用户名。 |
| password | 连接 MySQL 服务器的密码。 |
| db | 连接的数据库名称。 |
| unix_socket | UNIX 域套接字路径。如果指定了 unix_socket,则会忽略 host 和 port 参数 |
| charset | 连接的字符集,默认为 “utf8mb4”。 |
| autocommit | 自动提交模式,默认为 False。如果设置为 True,则每个 SQL 语句都会自动提交。 |
| connect_timeout | 连接超时时间(以秒为单位),默认为 10 秒。 |
| maxsize | 连接池中的最大连接数,默认为 10。 |
| minsize | 连接池中的最小连接数,默认为 1。 |
| ssl | SSL 配置参数。可以传入一个字典,包含 SSL 相关的配置选项,如 ssl = {‘cert’: ‘/path/to/cert.pem’, ‘key’: ‘/path/to/key.pem’}。 |
| loop | 事件循环对象。如果不指定,将使用默认的事件循环。 |
| pool_recycle | 连接池中连接的回收时间(以秒为单位)。当连接在连接池中的时间超过 pool_recycle 时,连接将被回收并重新创建。 |
| echo | 是否打印 SQL 语句,默认为 False。如果设置为 True,则会在控制台打印执行的 SQL 语句。 |
| cursorclass | 游标类,默认为 aiomysql.cursors.DictCursor,返回的查询结果将以字典形式返回。 |
| server_public_key | 服务器公钥,用于 SSL/TLS 连接的服务器验证。 |
- import aiomysql
- import asyncio
-
- async def main():
- pool = await aiomysql.create_pool(host='localhost', port=3306, user='root', password='123456',db='job')
- async with pool.acquire() as conn:
- async with conn.cursor() as cursor:
- await cursor.execute('SELECT * FROM hxcms_blog;')
- result = await cursor.fetchall()
- print(result)
-
- asyncio.run(main())
| 方法或属性 | 说明 |
|---|---|
| pool.acquire() | 从连接池中获取一个连接。该方法用于从连接池中获取一个异步连接对象,可以用于执行数据库操作。 conn = pool.acquire() |
| pool.release(conn) | 释放一个连接。该方法用于将一个连接对象返回到连接池中,以便其他代码可以继续使用。 |
| pool.wait_closed() | 用于等待连接池中的所有连接关闭。在调用 close() 方法后,应该调用 wait_closed() 方法来确保所有连接都已关闭,然后才能结束程序。 |
| pool.size | 表示连接池的当前大小,即池中当前可用的连接数。 |
| pool.maxsize | 表示连接池的最大连接数。当连接池中的连接数达到最大值时,新的连接请求将被阻塞,直到有连接被释放。 |
| pool.minsize | 表示连接池的最小连接数。连接池在初始化时会创建最小连接数的连接,并保持这些连接处于活动状态。 |
| pool.close() | 用于关闭连接池。调用 close() 方法后,连接池将不再接受新的连接请求,并开始关闭池中的所有连接。 |
| 属性与方法 | 说明 |
|---|---|
| conn.host | MySQL 服务器的主机名或 IP 地址。 |
| conn.port | MySQL 服务器的端口号。 |
| conn.user | 连接 MySQL 服务器的用户名。 |
| conn.db | 当前连接的数据库名称。 |
| conn.server_status | MySQL 服务器的状态信息。 |
| conn.server_capabilities | MySQL 服务器的功能列表。 |
| conn.client_flag | 客户端标志。 |
| conn.insert_id | 最近插入行的自增 ID。 |
| conn.warning_count | 最近一次执行的 SQL 语句产生的警告数量。 |
| conn.errorhandler | 错误处理器。 |
| conn.autocommit | 自动提交模式。该属性用于设置是否开启自动提交模式,默认为 False。 |
| conn.charset | 字符集。该属性用于设置数据库连接的字符集,默认为 “utf8mb4”。 |
| conn.maxsize | 最大连接数。该属性用于设置连接池中的最大连接数,默认为 10。 |
| conn.minsize | 最小连接数。该属性用于设置连接池中的最小连接数,默认为 1。 |
| conn.timeout | 连接超时时间。该属性用于设置连接的超时时间,默认为 10 秒。 |
| conn.echo | 是否打印 SQL 语句。该属性用于设置是否在控制台打印执行的 SQL 语句,默认为 False。 |
| conn.loop | 事件循环对象。该属性用于设置要使用的事件循环对象。 |
| conn.connection | 当前连接对象。该属性用于获取当前的连接对象。 |
| conn.server_version | MySQL 服务器版本。该属性用于获取 MySQL 服务器的版本信息。 |
| conn.commit() | 提交事务。该方法用于提交当前事务的操作。 |
| conn.rollback() | 回滚事务。该方法用于回滚当前事务的操作。 |
| conn.begin() | 提开始一个事务。 |
| conn.ping() | 检查连接是否存活。 |
| conn.select_db(dbName) | 切换数据库 |
| conn.escape_string(string) | 对字符串进行 MySQL 转义。 |
| conn.close() | 关闭连接。该方法用于关闭连接,释放资源。 |
| conn.cursor() | 创建一个游标对象。该方法用于创建一个异步游标对象,用于执行 SQL 查询和操作。 cur = conn.cursor() |
| 方法与属性 | 说明 |
|---|---|
| cursor.rowcount | 最近一次执行的 SQL 语句影响的行数。 |
| cursor.lastrowid | 最近插入行的自增 ID。 |
| cursor.description | 查询结果的字段描述信息。 |
| cursor.rownumber | 当前行在查询结果中的索引。 |
| cursor.arraysize | 获取或设置从数据库获取的行数。 |
| cursor.execute() | cursor.execute(sql, args=None) 执行 SQL 语句。该方法用于执行 SQL 语句,接受 SQL 语句字符串和参数,可以执行查询、插入、更新等操作。 |
| cursor.executemany() | cursor.executemany(sql, args=None) 执行多个 SQL 语句。该方法用于执行多个相同结构的 SQL 语句,接受 SQL 语句字符串和参数列表。 |
| cursor.fetchone() | 获取一条查询结果。该方法用于获取查询结果的下一行数据。 |
| cursor.fetchall() | 获取所有查询结果。该方法用于获取查询结果的所有行数据。 |
| cursor.fetchmany() | cursor.fetchmany(size=None) 获取查询结果的多行。 |
| cursor.close() | 关闭游标对象 |
该例子用于抓取某网站数据。
- '''
- 网址:https://www.che168.com/china/a0_0msdgscncgpi1ltocsp7exf4x0/?pvareaid=102179#currengpostion
- '''
-
- import asyncio
- import aiohttp
- import aiomysql
- from lxml import etree
- import random
-
-
- class Car:
- url = {
- 'car_list': 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion',
- 'car_detail': 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
- }
-
- headers = {
- 'authority': 'www.che168.com',
- 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
- 'accept-language': 'zh-CN,zh;q=0.9',
- 'cache-control': 'no-cache',
- 'pragma': 'no-cache',
- 'referer': 'https://www.che168.com/dealer/481320/48787661.html?pvareaid=100519&userpid=0&usercid=0&offertype=&offertag=0&activitycartype=0&fromsxmlist=0',
- 'sec-ch-ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
- 'sec-ch-ua-mobile': '?0',
- 'sec-ch-ua-platform': '"Windows"',
- 'sec-fetch-dest': 'document',
- 'sec-fetch-mode': 'navigate',
- 'sec-fetch-site': 'same-origin',
- 'sec-fetch-user': '?1',
- 'upgrade-insecure-requests': '1',
- 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
- }
-
- def __init__(self):
- self.pool = None
- self.request = None
-
- '''获取汽车列表'''
- async def get_car_list(self, page):
- print(self.url['car_list'].format(page))
- response = await self.request.get(self.url['car_list'].format(page))
- result = await response.text(encoding='GBK')
- return etree.HTML(result)
-
- '''处理汽车列表信息'''
- async def parse_car_list_info(self, html):
- car_list_html = html.xpath('//div[@id="goodStartSolrQuotePriceCore0"]//li[@name="lazyloadcpc"]')
- task_list = []
- for item in car_list_html:
- car_info = {
- 'specid': item.xpath('./@specid')[0],
- 'infoid': item.xpath('./@infoid')[0]
- }
- task = asyncio.create_task(self.get_car_detail(car_info))
- task_list.append(task)
- await asyncio.wait(task_list)
-
- '''获取第一页数据,并返回总页数'''
- async def get_first_page(self):
- html = await self.get_car_list(1)
- page_total = html.xpath('//div[@id="listpagination"]/a[last()-1]/text()')[0]
- await self.parse_car_list_info(html)
- return int(page_total)
-
- '''获取除第一页数据之外的其他页数据'''
- async def get_all_page(self,page):
- async with self.semaphore:
- await asyncio.sleep(random.randint(500, 800) / 1000)
- html = await self.get_car_list(page)
- await self.parse_car_list_info(html)
-
- '''获取汽车详情'''
- async def get_car_detail(self, car_info):
- response = await self.request.get(self.url['car_detail'].format(car_info['specid']))
- result = await response.json()
- detail = result['result']['paramtypeitems']
- car_detail = {
- 'specid': car_info['specid'],
- 'infoid': car_info['infoid'],
- 'name': detail[0]['paramitems'][0]['value'],
- 'price': detail[0]['paramitems'][1]['value'],
- 'manufacturer': detail[0]['paramitems'][2]['value'],
- 'level': detail[0]['paramitems'][3]['value'],
- 'length': f'{detail[1]["paramitems"][0]["value"]}mm',
- 'width': f'{detail[1]["paramitems"][1]["value"]}mm',
- 'height':f'{detail[1]["paramitems"][2]["value"]}mm',
- }
- await self.insert_table(car_detail)
-
- '''异步建立mysql表'''
- async def create_table(self):
- sql = '''
- CREATE TABLE IF NOT EXISTS qichezhijia(
- Id INT UNIQUE,
- Specid INT,
- Name VARCHAR(255),
- Price VARCHAR(10),
- Manufacturer VARCHAR(255),
- Level VARCHAR(50),
- Length VARCHAR(10),
- Width VARCHAR(10),
- Height VARCHAR(10),
- PRIMARY KEY(Id)
- )
- '''
- async with self.pool.acquire() as conn:
- async with conn.cursor() as cursor:
- try:
- await cursor.execute(sql)
- except Exception as e:
- print(f'创建表失败{e}')
-
- '''插入数据'''
- async def insert_table(self,car_detail):
- sql = '''
- INSERT INTO qichezhijia VALUES(%(infoid)s,%(specid)s,%(name)s,%(price)s,%(manufacturer)s,%(level)s,%(length)s,%(width)s,%(height)s)
- '''
- async with self.pool.acquire() as conn:
- async with conn.cursor() as cursor:
- try:
- await cursor.execute(sql, car_detail)
- await conn.commit()
- print('数据插入成功')
- except Exception as e:
- print(f'插入数据失败{e},infoid={car_detail["infoid"]}')
-
- '''程序运行主函数'''
- async def main(self):
- async with aiomysql.create_pool(host='localhost', port=3306, user='root', password='root', db='car') as pool:
- self.pool = pool
- await self.create_table()
- async with aiohttp.ClientSession(headers=self.headers) as request:
- self.request = request
- page_total = await self.get_first_page()
- self.semaphore = asyncio.Semaphore(3)
- task_list = []
- for page in range(2,page_total+1):
- task = asyncio.create_task(self.get_all_page(page))
- task_list.append(task)
- await asyncio.wait(task_list)
-
-
- if __name__ == '__main__':
- car = Car()
- asyncio.run(car.main())