之前一直使用tornado做项目,数据库一直使用mongo与redis居多,凭借其优异的异步特性工作的也很稳定高效,最近的项目需要使用 mysql ,由于之前在使用mongo与redis时所使用的moto与aioredis来异步的执行数据库操作,所以在网上查询了异步操作mysql的库, 本文记录一下异步操作中所遇到的问题与相应的解决方案。
Aiomysql 介绍
我们在使用tornado开发网站的时候,利用python3中新加入的异步关键词 async/await , 我们使用各种异步操作为来执行各种异步的操作,如使用 aiohttp 来代替 requests 来执行异步的网络请求操作,使用 motor 来代替同步的 pymongo 库来操作mongo数据库,同样,我们在开发同步的python程序时,我们会使用PyMySQL来操作mysql数据库,同样,我们会使用aiomysql来异步操作mysql 数据库。
Aiomysql 连接
- docker run --name mysql -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 -d mysql:5.7
-
我们先准备一些测试数据,创建一个mytest的数据库,创建一个user表,里面有三个字段,id, username, age, 简单的三个字段,并且添加两条数据。

- #coding: utf-8
-
- import aiomysql
- import asyncio
-
- loop = asyncio.get_event_loop()
-
- async def test():
- conn = await aiomysql.connect(
- host='127.0.0.1',
- port=3306,
- user='root',
- password='123456',
- db='mytest',
- loop=loop
- )
-
- cur = await conn.cursor()
- await cur.execute("select * from user")
- r = await cur.fetchall()
- if r:
- for i in r:
- print(i)
- else:
- print("no data")
- await cur.close()
- conn.close()
-
- loop.run_until_complete(test())
-
我们来看下代码,来顺一下执行流程
1. 创建连接
首先我们使用 aiomysql.connect() 创建一个连接对象conn,代码里只是使用了最常用的连接选项,这个connect() 方法返回一个Connection类对象,这个对象里的参数非常多,我们在后面的代码中,如果遇到会进行相应的介绍。
2. 创建游标
之后我们使用 conn 这个对象的cursor方法获取Cursor对象cur,我们只有使用cursor对象才能对数据库进行各种操作。
3. 执行SQL语句
我们使用cur 对象的execute() 方法执行SQL语句。这里执行 select * from user ,这个方法返回影响的行数,对于查询而言,是命中查询的数据量,我们也可以根据这里的返回值,如果是0的话则说明没有符合查询条件的数据。
- cur = await conn.cursor()
- count = await cur.execute("select * from user where id = 4")
- print("count:{}".format(count))
- if count:
- r = await cur.fetchall()
- for i in r:
- print(i)
- else:
- print("no data")
- await cur.close()
- conn.close()
-
5. 关闭连接conn
注意conn对象的关闭函数不是协程,直接调用close() 即可。
- async with conn.cursor() as cur:
- count = await cur.execute("select * from user")
- if count:
- r = await cur.fetchall()
- for i in r:
- print(i)
- else:
- print("no user")
-
Aiomysql简单的CURD
上面我们简单地使用游标对象进行了查询,这节我们来看看更多CURD操作,其实这里已经和aiomysql没有太多的关系,主要是考查各位的mysql能力了,一个execute方法走天下。但是这里我们来看一个老生常谈的问题,sql注入问题。
SQL注入的问题
- username = "yyx"
-
- async with conn.cursor() as cur:
- sql = "select * from user where username = '%s'" % username
- print(sql)
- count = await cur.execute(sql)
- if count:
- r = await cur.fetchall()
- for i in r:
- print(i)
- else:
- print("no user")
-
如何避免SQL注入
- async def execute(self, query, args=None):
- """Executes the given operation
- Executes the given operation substituting any markers with
- the given parameters.
- For example, getting all rows where id is 5:
- cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
- :param query: ``str`` sql statement
- :param args: ``tuple`` or ``list`` of arguments for sql query
- :returns: ``int``, number of rows that has been produced of affected
- """
- conn = self._get_db()
-
- while (await self.nextset()):
- pass
-
- if args is not None:
- query = query % self._escape_args(args, conn)
-
- await self._query(query)
- self._executed = query
- if self._echo:
- logger.info(query)
- logger.info("%r", args)
- return self._rowcount
-
execute有二个参数,一个是query, 另外是args,我们看注释,query是 sql的语句, args是 tulpe 或者 list 类型的参数。如果args非空,脚本会通过 query = query % self._escape_args(args, conn) 重新组织query, 再来看下 _escape_args(args, conn) 的实现
- def _escape_args(self, args, conn):
- if isinstance(args, (tuple, list)):
- return tuple(conn.escape(arg) for arg in args)
- elif isinstance(args, dict):
- return dict((key, conn.escape(val)) for (key, val) in args.items())
- else:
- # If it's not a dictionary let's try escaping it anyways.
- # Worst case it will throw a Value error
- return conn.escape(args)
-
如果是list或者tuple,则返回使用 conn.escape 转换之后的tuple, 如果是dict字典类型的话,则返回一个字典,key还是原来的key, value为 conn.escape(val) , 最终都是使用 conn.escape() 函数来进行转换,再来看下这个函数的实现
- def escape(self, obj):
- """ Escape whatever value you pass to it"""
- if isinstance(obj, str):
- return "'" + self.escape_string(obj) + "'"
- return escape_item(obj, self._charset)
-
- def escape_string(self, s):
- if (self.server_status &
- SERVER_STATUS.SERVER_STATUS_NO_BACKSLASH_ESCAPES):
- return s.replace("'", "''")
- return escape_string(s)
-
函数将在传入的字符串两边加上两个单引号 ’ , 并且将 字符串中的单引号替换成两个单引号,这样就可以避免大多的sql注入问题,我们修改一下脚本
- username = 'yanyanxin'
- async with conn.cursor() as cur:
- count = await cur.execute("select * from user where username = %s", username)
- if count:
- r = await cur.fetchall()
- for i in r:
- print(i)
- else:
- print("no user")
-
此时转换后的SQL语句为 select * from user where username = ‘yyx’ or 1=1#’ 已经将单引号进行了转义,此时就不会查找到用户了。
注意为了避免SQL注入的问题,我们一定不要自己进行拼接SQL语句,一定要对用户的输入进行检查转义
多参数的查询
- select * from user WHERE age >19 and age<29
-
我们使用aiomysql的实现
- async with conn.cursor() as cur:
- count = await cur.execute("select * from user where age>%s and age<%s", (19, 29))
- if count:
- r = await cur.fetchall()
- for i in r:
- print(i)
- else:
- print("no user")
-
联合查询
我们再创建一个表,表示用户表中用户的职业, 创建三条数据, userid对应于user表中的id, 这里之所以没有用外键,之后再讨论,只是记住,这里的userid只是一个普通的列,它表示user表中的id。

- SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username='yyx'
-

- async with conn.cursor() as cur:
- sql = 'SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username=%s'
- count = await cur.execute(sql, ('yyx',))
- if count:
- r = await