• Python 之Mysql的异步操作库 aiomysql 笔记


    在Python中,进行网络IO操作性能最好的操作就是异步IO,新的Python发布了asyncio库,可以进行异步操作,与 aiohttp类似,python中操作mysql数据库也是网络操作。mysql是c/s体系,每次客户端访问数据库都需要链接mysql服务,发送网络请求,等待mysql数据库服务器返回结果。python中执行异步数据库操作的库为aiomysql。

    aiomysql 是一个基于 asyncio 的异步 MySQL 客户端库,用于在 Python 中与 MySQL 数据库进行交互。它提供了异步的数据库连接和查询操作,适用于异步编程环境

    1. 异步支持:aiomysql 是基于 asyncio 实现的,可以与 asyncio 框架无缝集成,充分利用异步编程的优势,提高应用程序的性能和并发能力。
    2. 高性能:aiomysql 使用了底层的 PyMySQL 库,通过异步操作和连接池技术,可以实现高性能的数据库访问。
    3. 支持事务:aiomysql 提供了事务管理的支持,可以执行原子性的数据库操作,保证数据的一致性。
    4. SQL 语句构建器:aiomysql 提供了一个 SQL 语句构建器,可以方便地构建和执行 SQL 查询。

    一、安装

    pip install aiomysql

    二、aiomysql.connect()

    在操作mysql之前,需要与服务器进行链接。connect方法用于与mysql服务器链接,链接成功后返回一个链接对象,用于执行后续操作。

    参数说明
    hostMySQL 服务器的主机名或 IP 地址。
    portMySQL 服务器的端口号,默认为 3306。
    user连接 MySQL 服务器的用户名。
    password连接 MySQL 服务器的密码。
    db连接的数据库名称。
    unix_socketUNIX 域套接字路径。如果指定了 unix_socket,则会忽略 host 和 port 参数
    charset连接的字符集,默认为 “utf8mb4”。
    autocommit自动提交模式,默认为 False。如果设置为 True,则每个 SQL 语句都会自动提交。
    connect_timeout连接超时时间(以秒为单位),默认为 10 秒。
    maxsize连接池中的最大连接数,默认为 10。
    minsize连接池中的最小连接数,默认为 1。
    sslSSL 配置参数。可以传入一个字典,包含 SSL 相关的配置选项,如:
    ssl = {‘cert’: ‘/path/to/cert.pem’, ‘key’: ‘/path/to/key.pem’}
    loop事件循环对象。如果不指定,将使用默认的事件循环。
    echo是否打印 SQL 语句,默认为 False。如果设置为 True,则会在控制台打印执行的 SQL 语句。
    cursorclass游标类,默认为 aiomysql.cursors.DictCursor,返回的查询结果将以字典形式返回。
    server_public_key服务器公钥,用于 SSL/TLS 连接的服务器验证。

    使用示例

    1. import aiomysql
    2. import asyncio
    3. async def main():
    4. conn = await aiomysql.connect(host='localhost', port=3306, user='root', password='123456',db='job')
    5. cursor = await conn.cursor()
    6. await cursor.execute('SELECT * FROM hxcms_blog;')
    7. result = await cursor.fetchall()
    8. print(result)
    9. await cursor.close()
    10. conn.close()
    11. asyncio.run(main())

    示例2:

     

    1. import asyncio
    2. import aiomysql
    3. #定义操作协程
    4. async def main():
    5. # 连接Mysql数据库
    6. conn = await aiomysql.connect(
    7. host='localhost', # Mysql服务器IP地址
    8. port=3306, # Mysql服务器端口号
    9. user='root', # Mysql用户名
    10. password='123456', # Mysql密码
    11. db='mydb', # Mysql数据库名称
    12. autocommit=True) # 自动提交
    13. # 配置游标
    14. cur = await conn.cursor()
    15. # 执行SQL语句
    16. await cur.execute('SELECT * FROM `user`')
    17. # 获取查询结果
    18. result = await cur.fetchall()
    19. print(result)
    20. # 关闭连接
    21. await cur.close()
    22. conn.close()
    23. # 启动事件循环,事件循环中将执行 main 协程
    24. loop = asyncio.get_event_loop()
    25. loop.run_until_complete(main())

    二、aiomysql.create_pool()

    创建一个连接池。该方法用于创建一个异步连接池,接受一系列的连接参数,例如数据库主机、端口、用户名、密码等。

    参数说明
    hostMySQL 服务器的主机名或 IP 地址。
    portMySQL 服务器的端口号,默认为 3306。
    user连接 MySQL 服务器的用户名。
    password连接 MySQL 服务器的密码。
    db连接的数据库名称。
    unix_socketUNIX 域套接字路径。如果指定了 unix_socket,则会忽略 host 和 port 参数
    charset连接的字符集,默认为 “utf8mb4”。
    autocommit自动提交模式,默认为 False。如果设置为 True,则每个 SQL 语句都会自动提交。
    connect_timeout连接超时时间(以秒为单位),默认为 10 秒。
    maxsize连接池中的最大连接数,默认为 10。
    minsize连接池中的最小连接数,默认为 1。
    sslSSL 配置参数。可以传入一个字典,包含 SSL 相关的配置选项,如
    ssl = {‘cert’: ‘/path/to/cert.pem’, ‘key’: ‘/path/to/key.pem’}。
    loop事件循环对象。如果不指定,将使用默认的事件循环。
    pool_recycle连接池中连接的回收时间(以秒为单位)。当连接在连接池中的时间超过 pool_recycle 时,连接将被回收并重新创建。
    echo是否打印 SQL 语句,默认为 False。如果设置为 True,则会在控制台打印执行的 SQL 语句。
    cursorclass游标类,默认为 aiomysql.cursors.DictCursor,返回的查询结果将以字典形式返回。
    server_public_key服务器公钥,用于 SSL/TLS 连接的服务器验证。

    使用示例 

    1. import aiomysql
    2. import asyncio
    3. async def main():
    4. pool = await aiomysql.create_pool(host='localhost', port=3306, user='root', password='123456',db='job')
    5. async with pool.acquire() as conn:
    6. async with conn.cursor() as cursor:
    7. await cursor.execute('SELECT * FROM hxcms_blog;')
    8. result = await cursor.fetchall()
    9. print(result)
    10. asyncio.run(main())

    aiomysql内建对象说明

    1、pool-链接池对象

    方法或属性说明
    pool.acquire()从连接池中获取一个连接。该方法用于从连接池中获取一个异步连接对象,可以用于执行数据库操作。
    conn = pool.acquire()
    pool.release(conn)释放一个连接。该方法用于将一个连接对象返回到连接池中,以便其他代码可以继续使用。
    pool.wait_closed()用于等待连接池中的所有连接关闭。在调用 close() 方法后,应该调用 wait_closed() 方法来确保所有连接都已关闭,然后才能结束程序。
    pool.size表示连接池的当前大小,即池中当前可用的连接数。
    pool.maxsize表示连接池的最大连接数。当连接池中的连接数达到最大值时,新的连接请求将被阻塞,直到有连接被释放。
    pool.minsize表示连接池的最小连接数。连接池在初始化时会创建最小连接数的连接,并保持这些连接处于活动状态。
    pool.close()用于关闭连接池。调用 close() 方法后,连接池将不再接受新的连接请求,并开始关闭池中的所有连接。

    2、connection 数据库连接对象

    属性与方法说明
    conn.hostMySQL 服务器的主机名或 IP 地址。
    conn.portMySQL 服务器的端口号。
    conn.user连接 MySQL 服务器的用户名。
    conn.db当前连接的数据库名称。
    conn.server_statusMySQL 服务器的状态信息。
    conn.server_capabilitiesMySQL 服务器的功能列表。
    conn.client_flag客户端标志。
    conn.insert_id最近插入行的自增 ID。
    conn.warning_count最近一次执行的 SQL 语句产生的警告数量。
    conn.errorhandler错误处理器。
    conn.autocommit自动提交模式。该属性用于设置是否开启自动提交模式,默认为 False。
    conn.charset字符集。该属性用于设置数据库连接的字符集,默认为 “utf8mb4”。
    conn.maxsize最大连接数。该属性用于设置连接池中的最大连接数,默认为 10。
    conn.minsize最小连接数。该属性用于设置连接池中的最小连接数,默认为 1。
    conn.timeout连接超时时间。该属性用于设置连接的超时时间,默认为 10 秒。
    conn.echo是否打印 SQL 语句。该属性用于设置是否在控制台打印执行的 SQL 语句,默认为 False。
    conn.loop事件循环对象。该属性用于设置要使用的事件循环对象。
    conn.connection当前连接对象。该属性用于获取当前的连接对象。
    conn.server_versionMySQL 服务器版本。该属性用于获取 MySQL 服务器的版本信息。
    conn.commit()提交事务。该方法用于提交当前事务的操作。
    conn.rollback()回滚事务。该方法用于回滚当前事务的操作。
    conn.begin()提开始一个事务。
    conn.ping()检查连接是否存活。
    conn.select_db(dbName)切换数据库
    conn.escape_string(string)对字符串进行 MySQL 转义。
    conn.close()关闭连接。该方法用于关闭连接,释放资源。
    conn.cursor()创建一个游标对象。该方法用于创建一个异步游标对象,用于执行 SQL 查询和操作。
    cur = conn.cursor()

    3、cursor-游标对象

    方法与属性说明
    cursor.rowcount最近一次执行的 SQL 语句影响的行数。
    cursor.lastrowid最近插入行的自增 ID。
    cursor.description查询结果的字段描述信息。
    cursor.rownumber当前行在查询结果中的索引。
    cursor.arraysize获取或设置从数据库获取的行数。
    cursor.execute()cursor.execute(sql, args=None)
    执行 SQL 语句。该方法用于执行 SQL 语句,接受 SQL 语句字符串和参数,可以执行查询、插入、更新等操作。
    cursor.executemany()cursor.executemany(sql, args=None)
    执行多个 SQL 语句。该方法用于执行多个相同结构的 SQL 语句,接受 SQL 语句字符串和参数列表。
    cursor.fetchone()获取一条查询结果。该方法用于获取查询结果的下一行数据。
    cursor.fetchall()获取所有查询结果。该方法用于获取查询结果的所有行数据。
    cursor.fetchmany()cursor.fetchmany(size=None)
    获取查询结果的多行。
    cursor.close()关闭游标对象

    一个综合例子

    该例子用于抓取某网站数据。

    1. '''
    2. 网址:https://www.che168.com/china/a0_0msdgscncgpi1ltocsp7exf4x0/?pvareaid=102179#currengpostion
    3. '''
    4. import asyncio
    5. import aiohttp
    6. import aiomysql
    7. from lxml import etree
    8. import random
    9. class Car:
    10. url = {
    11. 'car_list': 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion',
    12. 'car_detail': 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
    13. }
    14. headers = {
    15. 'authority': 'www.che168.com',
    16. 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    17. 'accept-language': 'zh-CN,zh;q=0.9',
    18. 'cache-control': 'no-cache',
    19. 'pragma': 'no-cache',
    20. 'referer': 'https://www.che168.com/dealer/481320/48787661.html?pvareaid=100519&userpid=0&usercid=0&offertype=&offertag=0&activitycartype=0&fromsxmlist=0',
    21. 'sec-ch-ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
    22. 'sec-ch-ua-mobile': '?0',
    23. 'sec-ch-ua-platform': '"Windows"',
    24. 'sec-fetch-dest': 'document',
    25. 'sec-fetch-mode': 'navigate',
    26. 'sec-fetch-site': 'same-origin',
    27. 'sec-fetch-user': '?1',
    28. 'upgrade-insecure-requests': '1',
    29. 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
    30. }
    31. def __init__(self):
    32. self.pool = None
    33. self.request = None
    34. '''获取汽车列表'''
    35. async def get_car_list(self, page):
    36. print(self.url['car_list'].format(page))
    37. response = await self.request.get(self.url['car_list'].format(page))
    38. result = await response.text(encoding='GBK')
    39. return etree.HTML(result)
    40. '''处理汽车列表信息'''
    41. async def parse_car_list_info(self, html):
    42. car_list_html = html.xpath('//div[@id="goodStartSolrQuotePriceCore0"]//li[@name="lazyloadcpc"]')
    43. task_list = []
    44. for item in car_list_html:
    45. car_info = {
    46. 'specid': item.xpath('./@specid')[0],
    47. 'infoid': item.xpath('./@infoid')[0]
    48. }
    49. task = asyncio.create_task(self.get_car_detail(car_info))
    50. task_list.append(task)
    51. await asyncio.wait(task_list)
    52. '''获取第一页数据,并返回总页数'''
    53. async def get_first_page(self):
    54. html = await self.get_car_list(1)
    55. page_total = html.xpath('//div[@id="listpagination"]/a[last()-1]/text()')[0]
    56. await self.parse_car_list_info(html)
    57. return int(page_total)
    58. '''获取除第一页数据之外的其他页数据'''
    59. async def get_all_page(self,page):
    60. async with self.semaphore:
    61. await asyncio.sleep(random.randint(500, 800) / 1000)
    62. html = await self.get_car_list(page)
    63. await self.parse_car_list_info(html)
    64. '''获取汽车详情'''
    65. async def get_car_detail(self, car_info):
    66. response = await self.request.get(self.url['car_detail'].format(car_info['specid']))
    67. result = await response.json()
    68. detail = result['result']['paramtypeitems']
    69. car_detail = {
    70. 'specid': car_info['specid'],
    71. 'infoid': car_info['infoid'],
    72. 'name': detail[0]['paramitems'][0]['value'],
    73. 'price': detail[0]['paramitems'][1]['value'],
    74. 'manufacturer': detail[0]['paramitems'][2]['value'],
    75. 'level': detail[0]['paramitems'][3]['value'],
    76. 'length': f'{detail[1]["paramitems"][0]["value"]}mm',
    77. 'width': f'{detail[1]["paramitems"][1]["value"]}mm',
    78. 'height':f'{detail[1]["paramitems"][2]["value"]}mm',
    79. }
    80. await self.insert_table(car_detail)
    81. '''异步建立mysql表'''
    82. async def create_table(self):
    83. sql = '''
    84. CREATE TABLE IF NOT EXISTS qichezhijia(
    85. Id INT UNIQUE,
    86. Specid INT,
    87. Name VARCHAR(255),
    88. Price VARCHAR(10),
    89. Manufacturer VARCHAR(255),
    90. Level VARCHAR(50),
    91. Length VARCHAR(10),
    92. Width VARCHAR(10),
    93. Height VARCHAR(10),
    94. PRIMARY KEY(Id)
    95. )
    96. '''
    97. async with self.pool.acquire() as conn:
    98. async with conn.cursor() as cursor:
    99. try:
    100. await cursor.execute(sql)
    101. except Exception as e:
    102. print(f'创建表失败{e}')
    103. '''插入数据'''
    104. async def insert_table(self,car_detail):
    105. sql = '''
    106. INSERT INTO qichezhijia VALUES(%(infoid)s,%(specid)s,%(name)s,%(price)s,%(manufacturer)s,%(level)s,%(length)s,%(width)s,%(height)s)
    107. '''
    108. async with self.pool.acquire() as conn:
    109. async with conn.cursor() as cursor:
    110. try:
    111. await cursor.execute(sql, car_detail)
    112. await conn.commit()
    113. print('数据插入成功')
    114. except Exception as e:
    115. print(f'插入数据失败{e},infoid={car_detail["infoid"]}')
    116. '''程序运行主函数'''
    117. async def main(self):
    118. async with aiomysql.create_pool(host='localhost', port=3306, user='root', password='root', db='car') as pool:
    119. self.pool = pool
    120. await self.create_table()
    121. async with aiohttp.ClientSession(headers=self.headers) as request:
    122. self.request = request
    123. page_total = await self.get_first_page()
    124. self.semaphore = asyncio.Semaphore(3)
    125. task_list = []
    126. for page in range(2,page_total+1):
    127. task = asyncio.create_task(self.get_all_page(page))
    128. task_list.append(task)
    129. await asyncio.wait(task_list)
    130. if __name__ == '__main__':
    131. car = Car()
    132. asyncio.run(car.main())

  • 相关阅读:
    花菁染料CY5.5标记角叉藻胶;CY5.5-Furcellaran;Furcellaran-CY5.5定制合成
    ES6异步编程解决方案——async、Generator、Promise
    java中HashMap的设计精妙在哪?
    LeetCode704.二分查找及二分法
    JavaScript变量预解析和函数预解析
    创新破万“卷”,但创新自己得“卷”
    前端基础建设与架构12 如何理解 AST 实现和编译原理?
    ES6 入门教程 3 变量的解构赋值 3.2 对象的解构赋值
    2023年亚太杯APMCM数学建模大赛数据分析题MySQL的使用
    List<T>范型的使用
  • 原文地址:https://blog.csdn.net/ababab12345/article/details/133252425