• Aiomysql 与 Sqlalchemy 的使用


    之前一直使用tornado做项目,数据库一直使用mongo与redis居多,凭借其优异的异步特性工作的也很稳定高效,最近的项目需要使用 mysql ,由于之前在使用mongo与redis时所使用的moto与aioredis来异步的执行数据库操作,所以在网上查询了异步操作mysql的库, 本文记录一下异步操作中所遇到的问题与相应的解决方案。

    Aiomysql 介绍

    我们在使用tornado开发网站的时候,利用python3中新加入的异步关键词 async/await , 我们使用各种异步操作为来执行各种异步的操作,如使用 aiohttp 来代替 requests 来执行异步的网络请求操作,使用 motor 来代替同步的 pymongo 库来操作mongo数据库,同样,我们在开发同步的python程序时,我们会使用PyMySQL来操作mysql数据库,同样,我们会使用aiomysql来异步操作mysql 数据库。

    Aiomysql 连接

    1. docker run --name mysql -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 -d mysql:5.7

    我们先准备一些测试数据,创建一个mytest的数据库,创建一个user表,里面有三个字段,id, username, age, 简单的三个字段,并且添加两条数据。

    1. #coding: utf-8
    2. import aiomysql
    3. import asyncio
    4. loop = asyncio.get_event_loop()
    5. async def test():
    6. conn = await aiomysql.connect(
    7. host='127.0.0.1',
    8. port=3306,
    9. user='root',
    10. password='123456',
    11. db='mytest',
    12. loop=loop
    13. )
    14. cur = await conn.cursor()
    15. await cur.execute("select * from user")
    16. r = await cur.fetchall()
    17. if r:
    18. for i in r:
    19. print(i)
    20. else:
    21. print("no data")
    22. await cur.close()
    23. conn.close()
    24. loop.run_until_complete(test())

    我们来看下代码,来顺一下执行流程

    1. 创建连接

    首先我们使用 aiomysql.connect() 创建一个连接对象conn,代码里只是使用了最常用的连接选项,这个connect() 方法返回一个Connection类对象,这个对象里的参数非常多,我们在后面的代码中,如果遇到会进行相应的介绍。

    2. 创建游标

    之后我们使用 conn 这个对象的cursor方法获取Cursor对象cur,我们只有使用cursor对象才能对数据库进行各种操作。

    3. 执行SQL语句

    我们使用cur 对象的execute() 方法执行SQL语句。这里执行 select * from user ,这个方法返回影响的行数,对于查询而言,是命中查询的数据量,我们也可以根据这里的返回值,如果是0的话则说明没有符合查询条件的数据。

    1. cur = await conn.cursor()
    2. count = await cur.execute("select * from user where id = 4")
    3. print("count:{}".format(count))
    4. if count:
    5. r = await cur.fetchall()
    6. for i in r:
    7. print(i)
    8. else:
    9. print("no data")
    10. await cur.close()
    11. conn.close()

    5. 关闭连接conn

    注意conn对象的关闭函数不是协程,直接调用close() 即可。

    1. async with conn.cursor() as cur:
    2. count = await cur.execute("select * from user")
    3. if count:
    4. r = await cur.fetchall()
    5. for i in r:
    6. print(i)
    7. else:
    8. print("no user")

    Aiomysql简单的CURD

    上面我们简单地使用游标对象进行了查询,这节我们来看看更多CURD操作,其实这里已经和aiomysql没有太多的关系,主要是考查各位的mysql能力了,一个execute方法走天下。但是这里我们来看一个老生常谈的问题,sql注入问题。

    SQL注入的问题

    1. username = "yyx"
    2. async with conn.cursor() as cur:
    3. sql = "select * from user where username = '%s'" % username
    4. print(sql)
    5. count = await cur.execute(sql)
    6. if count:
    7. r = await cur.fetchall()
    8. for i in r:
    9. print(i)
    10. else:
    11. print("no user")

    如何避免SQL注入

    1. async def execute(self, query, args=None):
    2. """Executes the given operation
    3. Executes the given operation substituting any markers with
    4. the given parameters.
    5. For example, getting all rows where id is 5:
    6. cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
    7. :param query: ``str`` sql statement
    8. :param args: ``tuple`` or ``list`` of arguments for sql query
    9. :returns: ``int``, number of rows that has been produced of affected
    10. """
    11. conn = self._get_db()
    12. while (await self.nextset()):
    13. pass
    14. if args is not None:
    15. query = query % self._escape_args(args, conn)
    16. await self._query(query)
    17. self._executed = query
    18. if self._echo:
    19. logger.info(query)
    20. logger.info("%r", args)
    21. return self._rowcount

    execute有二个参数,一个是query, 另外是args,我们看注释,query是 sql的语句, args是 tulpe 或者 list 类型的参数。如果args非空,脚本会通过 query = query % self._escape_args(args, conn) 重新组织query, 再来看下 _escape_args(args, conn) 的实现

    1. def _escape_args(self, args, conn):
    2. if isinstance(args, (tuple, list)):
    3. return tuple(conn.escape(arg) for arg in args)
    4. elif isinstance(args, dict):
    5. return dict((key, conn.escape(val)) for (key, val) in args.items())
    6. else:
    7. # If it's not a dictionary let's try escaping it anyways.
    8. # Worst case it will throw a Value error
    9. return conn.escape(args)

    如果是list或者tuple,则返回使用 conn.escape 转换之后的tuple, 如果是dict字典类型的话,则返回一个字典,key还是原来的key, value为 conn.escape(val) , 最终都是使用 conn.escape() 函数来进行转换,再来看下这个函数的实现

    1. def escape(self, obj):
    2. """ Escape whatever value you pass to it"""
    3. if isinstance(obj, str):
    4. return "'" + self.escape_string(obj) + "'"
    5. return escape_item(obj, self._charset)
    6. def escape_string(self, s):
    7. if (self.server_status &
    8. SERVER_STATUS.SERVER_STATUS_NO_BACKSLASH_ESCAPES):
    9. return s.replace("'", "''")
    10. return escape_string(s)

    函数将在传入的字符串两边加上两个单引号 ’ , 并且将 字符串中的单引号替换成两个单引号,这样就可以避免大多的sql注入问题,我们修改一下脚本

    1. username = 'yanyanxin'
    2. async with conn.cursor() as cur:
    3. count = await cur.execute("select * from user where username = %s", username)
    4. if count:
    5. r = await cur.fetchall()
    6. for i in r:
    7. print(i)
    8. else:
    9. print("no user")

    此时转换后的SQL语句为 select * from user where username = ‘yyx’ or 1=1#’ 已经将单引号进行了转义,此时就不会查找到用户了。

    注意为了避免SQL注入的问题,我们一定不要自己进行拼接SQL语句,一定要对用户的输入进行检查转义

    多参数的查询

    1. select * from user WHERE age >19 and age<29

    我们使用aiomysql的实现

    1. async with conn.cursor() as cur:
    2. count = await cur.execute("select * from user where age>%s and age<%s", (19, 29))
    3. if count:
    4. r = await cur.fetchall()
    5. for i in r:
    6. print(i)
    7. else:
    8. print("no user")

    联合查询

    我们再创建一个表,表示用户表中用户的职业, 创建三条数据, userid对应于user表中的id, 这里之所以没有用外键,之后再讨论,只是记住,这里的userid只是一个普通的列,它表示user表中的id。

    1. SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username='yyx'

    1. async with conn.cursor() as cur:
    2. sql = 'SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username=%s'
    3. count = await cur.execute(sql, ('yyx',))
    4. if count:
    5. r = await
  • 相关阅读:
    【基于stm32f407应用】中断模式串口通讯-USART1
    JAVA计算机毕业设计学校食堂管理Mybatis+源码+数据库+lw文档+系统+调试部署
    Java应用|使用Apache Spark MLlib构建机器学习模型【下】
    易基因技术推介|m1A RNA甲基化测序(MeRIP-seq/m1A-seq)
    如何在 Rocky Linux 上安装 Apache Kafka?
    【SQL学习】常用命令
    常用的生命钩子(VUe2方法)
    跑步装备推荐:2022年跑步装备选购清单
    领导提拔项目经理,看的从来不是努力
    使用 JavaScript 检测用户是否在线
  • 原文地址:https://blog.csdn.net/ceshiren456/article/details/126744409