• Python 数据库连接池的正确用法


    1. # coding=utf-8
    2. import random
    3. import threading
    4. from dbutils.pooled_db import PooledDB
    5. from dbutils.persistent_db import PersistentDB
    6. import time
    7. import pymysql
    8. from configuration.config import system_logger, db_config
    class MysqlHelper(object):
        """MySQL query helper backed by a DBUtils ``PooledDB`` connection pool.

        Each query method borrows its own connection and cursor from the pool
        and always hands them back, even when the query fails. No connection
        or cursor is stored on the instance.
        """

        def __init__(self, db_config):
            """Build the connection pool.

            Args:
                db_config: Mapping read with ``.get`` for the keys ``user``,
                    ``password``, ``database``, ``host``, ``port`` and
                    ``charset``.
            """
            self.__pool = PooledDB(
                creator=pymysql,
                mincached=1,       # idle connections opened at start-up
                maxcached=5,       # upper bound on idle connections kept
                maxshared=5,       # upper bound on shared connections
                maxconnections=5,  # hard cap on connections handed out
                maxusage=5,        # reuses of one connection before it is reset
                blocking=True,     # wait for a free connection instead of raising
                user=db_config.get('user'),
                passwd=db_config.get('password'),
                db=db_config.get('database'),
                host=db_config.get('host'),
                port=db_config.get('port'),
                charset=db_config.get('charset'),
            )

        def getConn(self):
            """Borrow a connection from the pool and open a cursor on it.

            Returns:
                A ``(conn, cursor)`` tuple. The caller must pass both to
                ``dispose`` once it is done with them.
            """
            conn = self.__pool.connection()
            try:
                cursor = conn.cursor()
            except Exception:
                # Hand the connection back first; otherwise a failing cursor()
                # would keep one of the pool's slots occupied.
                conn.close()
                raise
            return conn, cursor

        @staticmethod
        def dispose(cursor, conn):
            """Close the cursor and give the connection back to the pool.

            Args:
                cursor: Cursor to close, or ``None`` to skip.
                conn: Pooled connection to close, or ``None`` to skip.
            """
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                # Runs even if cursor.close() raised, so the connection is
                # still returned.
                if conn is not None:
                    conn.close()

        def getOne(self, sql):
            """Execute ``sql`` and return the result of ``cursor.fetchall()``.

            Despite the name, every fetched row is returned. A trace line with
            the thread name, connection, cursor, timestamp and rows is printed.

            Args:
                sql: SQL statement to execute.

            Returns:
                The rows returned by ``cursor.fetchall()``.
            """
            conn, cursor = self.getConn()
            try:
                cursor.execute(sql)
                rows = cursor.fetchall()
                th_name = threading.current_thread().name
                print(f"{th_name} {conn} {cursor} {time.time():.4f} {rows}")
                return rows
            finally:
                # Always release the connection, including when execute() or
                # fetchall() raises.
                self.dispose(cursor, conn)

        def queryOne(self, sql):
            """Execute ``sql`` and return its single row as a dict.

            Any exception is logged and swallowed.

            Args:
                sql: SQL statement to execute.

            Returns:
                The row as a ``{column: value}`` dict when exactly one row was
                fetched; ``None`` for zero or several rows, or on any error.
            """
            system_logger.info("----------------------sql start ----------------------")
            system_logger.info(sql)
            conn = cursor = None
            try:
                conn, cursor = self.getConn()
                result = cursor.execute(sql)
                json_data = self.sql_fetch_json(cursor)
                system_logger.info(f"-----------------------queryByKey result:{result} " + str(json_data))
                if len(json_data) == 1:
                    return json_data[0]
                return None
            except Exception as e:
                tb = e.__traceback__
                system_logger.info("-----------predict exception line: " + str(tb.tb_lineno) + " of " +
                                   str(tb.tb_frame.f_globals.get("__file__", "<unknown>")))
                system_logger.info(e)
                return None
            finally:
                # Return the connection on every path; a failure while closing
                # is logged rather than raised so the "never raises" contract
                # of this method is kept.
                try:
                    self.dispose(cursor, conn)
                except Exception as e:
                    system_logger.info(e)

        @staticmethod
        def sql_fetch_json(cursor: "pymysql.cursors.Cursor"):
            """Convert the cursor's pending result set to a list of dicts.

            Args:
                cursor: Cursor on which a statement has already been executed.

            Returns:
                One ``{column_name: value}`` dict per fetched row, or an empty
                list when the statement produced no result set.
            """
            if cursor.description is None:
                # No result set at all (e.g. a statement that returns no rows).
                return []
            keys = [column[0] for column in cursor.description]
            return [dict(zip(keys, row)) for row in cursor.fetchall()]
    def test1(pool):
        """Worker loop: run one fixed lookup through ``pool`` once per second, forever.

        The phone number is picked at random once, before the loop starts, so
        each worker repeats the same statement. This function never returns.

        Args:
            pool: Object exposing ``getOne(sql)``.
        """
        phone_no = f"1390709000{random.randint(6,7)}"
        strsql = f"select * from zy_phone where policy_holder_phone_no={phone_no} order by insure_date " \
                 + "desc, kafka_etl_time asc limit 1 "
        while True:
            time.sleep(1)
            pool.getOne(strsql)
    def main(pool, thread_count=5):
        """Run ``test1`` concurrently in several threads and wait for them.

        Args:
            pool: Helper object handed to every worker as its only argument.
            thread_count: Number of worker threads to start. Defaults to 5.
        """
        ths = [threading.Thread(target=test1, args=(pool,)) for _ in range(thread_count)]
        for th in ths:
            th.start()
        # Blocks until every worker has returned.
        for th in ths:
            th.join()
    if __name__ == "__main__":
        # Build a single helper for the whole process; it is passed to main().
        mysqlhelper = MysqlHelper(db_config)
        # main() joins its worker threads before returning.
        main(mysqlhelper)
        # Reached only after main() returns: idle loop that keeps the process alive.
        time.sleep(3)
        while True:
            time.sleep(1)

    常见错误1、

     def getConn(self):
          # Anti-pattern, shown on purpose: the connection and cursor are stored
          # as attributes of the helper, so every caller of getConn() overwrites
          # and then uses the same self.conn / self.cursor.
          self.conn = self.__pool.connection()
          self.cursor = self.conn.cursor()
    此处不应该把连接和 cursor 存为实例属性供多个线程共享,否则并发查询时会报错:

    AttributeError: 'NoneType' object has no attribute 'read'

    或者:

    AttributeError: 'NoneType' object has no attribute 'settimeout'

    常见错误2、

    在获取连接以及执行查询的时候自己再加一把锁:

    # Anti-pattern, shown on purpose: an application-level lock is held while a
    # connection is borrowed and while the query runs.
    lock.acquire()
    # NOTE(review): the value returned by getConn() is discarded here — confirm
    # whether that connection is ever given back to the pool.
    pool.getConn()
    pool.getOne(strsql)
    lock.release()
    time.sleep(1)

    因为 PooledDB 在取连接时本身就会加锁(参见下面的代码),如果自己再把“从连接池获取连接”到“cursor 取回数据”的整个过程用一把锁包起来,这把锁就是多余的:所有线程的查询被串行执行,同一时刻只有一个连接在工作,连接池就退化成了单个数据库连接。

    self.__pool.connection()(即 DBUtils 中 PooledDB 的 connection 方法)逻辑如下:

    def connection(self, shareable=True):
        """Get a steady, cached DB-API 2 connection from the pool.

        If shareable is set and the underlying DB-API 2 allows it,
        then the connection may be shared with other threads.

        Every read or update of the pool's counters and caches below happens
        while ``self._lock`` is held.
        """
        if shareable and self._maxshared:
            with self._lock:
                # Wait while nothing can be shared and the connection limit
                # has been reached.
                while (not self._shared_cache and self._maxconnections
                        and self._connections >= self._maxconnections):
                    self._wait_lock()
                if len(self._shared_cache) < self._maxshared:
                    # shared cache is not full, get a dedicated connection
                    try:  # first try to get it from the idle cache
                        con = self._idle_cache.pop(0)
                    except IndexError:  # else get a fresh connection
                        con = self.steady_connection()
                    else:
                        con._ping_check()  # check this connection
                    con = SharedDBConnection(con)
                    self._connections += 1
                else:  # shared cache full or no more connections allowed
                    self._shared_cache.sort()  # least shared connection first
                    con = self._shared_cache.pop(0)  # get it
                    while con.con._transaction:
                        # do not share connections which are in a transaction
                        self._shared_cache.insert(0, con)
                        self._wait_lock()
                        self._shared_cache.sort()
                        con = self._shared_cache.pop(0)
                    con.con._ping_check()  # check the underlying connection
                    con.share()  # increase share of this connection
                # put the connection (back) into the shared cache
                self._shared_cache.append(con)
                self._lock.notify()
            con = PooledSharedDBConnection(self, con)
        else:  # try to get a dedicated connection
            with self._lock:
                # Wait while the connection limit has been reached.
                while (self._maxconnections
                        and self._connections >= self._maxconnections):
                    self._wait_lock()
                # connection limit not reached, get a dedicated connection
                try:  # first try to get it from the idle cache
                    con = self._idle_cache.pop(0)
                except IndexError:  # else get a fresh connection
                    con = self.steady_connection()
                else:
                    con._ping_check()  # check connection
                con = PooledDedicatedDBConnection(self, con)
                self._connections += 1
        return con

  • 相关阅读:
    盲盒一番赏小程序:打开未知的惊喜之旅
    MySQL报错:this is incompatible with sql_mode=only_full_group_by 解决方法
    SpringBoot+Mybaits搭建通用管理系统实例九:基础增删改查功能实现上
    js中的new方法
    Improving Few-Shot Learning with Auxiliary Self-Supervised Pretext Tasks(论文解读)
    Cesium中的DataSource和Entity关系
    【面向校招】Golang面试题合集
    计算机毕业设计django基于python的高校奖学金管理系统(源码+系统+mysql数据库+Lw文档)
    品牌方发行NFT时,应如何考量实用性?
    凉鞋的 Godot 笔记 201. 第三轮循环:引入变量
  • 原文地址:https://blog.csdn.net/mtj66/article/details/125501757