• SQLAlchemy 封装的工具类,数据库pgsql(数据库连接池)


    1.SQLAlchemy是什么?

              SQLAlchemy 是 Python 著名的 ORM 工具包。通过 ORM,开发者可以用面向对象的方式来操作数据库,不再需要编写 SQL 语句。

    SQLAlchemy 支持多种数据库,除 sqlite 外,其它数据库需要安装第三方驱动。

    1.1组成部分:

            Engine, 框架引擎

           Connect Pooling 数据库连接池

          Dialect ,选择连接数据库DB API种类

          Schema/Types , 架构和类型

          SQL Expression Language: SQL表达式

      1.2 SQLAlchemy 不能创建数据库,可以建表,创建字段

            创建engine对象:dialect+driver://username:password@host:port/database

    # 使用pymysql驱动连接到mysql

       engine =  create_engine('mysql+pymysql://user:pwd@localhost/testdb')

    # 使用pymssql驱动连接到sql server

       engine = create_engine('mssql+pymssql://user:pwd@localhost:1433/testdb')

    1.3 filter 和 filter_by 区别:

     1.filter用类名.属性名,比较用==,filter_by()直接用属性名,比较用=, filter_by() 只接受键值对参        数,所以 filter_by() 不支持><(大于和小于)和 and_、or_查询。

     2.filter不支持组合查询,只能连续调用filter来变相实现。

    3. filter传的是表达式,filter_by传的是参数

    2. 使用数据库连接池说明


        Engine 对象是使用 sqlalchemy 的起点,Engine 包括数据库连接池 (Pool) 和 方言 (Dialect,指不同数据库 sql 语句等的语法差异),两者一起把对数据库的操作,以符合 DBAPI 规范的方式与数据库交互。

    3.工具类展示

      3.1 数据库配置类:db_config.py

    1. # dev环境配置
    2. host = "dev-pg.test.xxxx.cloud"
    3. port = 1921
    4. user = "check"
    5. database = "checkn"
    6. password = "Ku2221AP123aXsNW"
    7. # 连接池大小,默认为5,设置为0时表示连接无限制
    8. pool_size = 10
    9. # 连接池中最大连接数,如果访问数据库的请求数超过了pool_size,连接池将会自动创建新的连接,
    10. # 直到创建达到max_overflow个连接为止。默认情况下,max_overflow值为10
    11. max_overflow = 20
    12. # 连接池中获取连接的等待时间,超过该等待时间后,获取连接方法将会超时,引发连接失败异常。默认情况下,timeout为30秒。
    13. pool_timeout = 60

    3.2 数据库类封装:database.py

    1. # !/usr/bin/python
    2. # -*- coding: UTF-8 -*-
    3. from sqlalchemy import *
    4. from sqlalchemy.orm import sessionmaker
    5. from sqlalchemy.orm import Session
    6. from sqlalchemy.ext.declarative import declarative_base
    7. import db_config
    8. class dbTools(object):
    9. session = None;
    10. isClosed = True;
    11. def open(self, host=db_config.host, port=db_config.port, db=db_config.database, user=db_config.user,
    12. pwd=db_config.password, pool_size=db_config.pool_size, max_overflow=db_config.max_overflow,
    13. pool_timeout=db_config.pool_timeout):
    14. url = 'postgresql://%s:%s@%s:%d/%s' % (user, pwd, host, port, db)
    15. # echo: 设置为ture时,会将orm语句转化成sql语句并打印出来,一般debug时候使用
    16. engine = create_engine(url, poolclass=QueuePool, pool_size=pool_size, max_overflow=max_overflow,
    17. pool_timeout=pool_timeout, echo=True)
    18. DbSession = sessionmaker(bind=engine)
    19. self.session = DbSession()
    20. self.isClosed = False
    21. return self.session
    22. def query(self, type):
    23. query = self.session.query(type)
    24. return query
    25. def execute(self, sql):
    26. return self.session.execute(sql)
    27. def add(self, item):
    28. self.session.add(item)
    29. def add_all(self, items):
    30. self.session.add_all(items)
    31. def delete(self, item):
    32. self.session.delete(item)
    33. def commit(self):
    34. self.session.commit()
    35. def close(self):
    36. if self.isClosed:
    37. pass
    38. self.session.close()
    39. self.isClosed = True

    3.3 模型类 modeBatchInfo.py

    1. from sqlalchemy.ext.declarative import declarative_base
    2. from sqlalchemy import String, Column, Integer, DateTime, Enum, Table, ForeignKey, Text
    3. from sqlalchemy.orm import relationship
    4. # 创建Base类
    5. Base = declarative_base()
    6. # 创建ORM模型类
    7. class icsBatchInfo(Base):
    8. __tablename__ = 'ics_batch_info'
    9. batch_id = Column(Integer, primary_key=True)
    10. process_id = Column(String)
    11. task_id = Column(String)
    12. data_path = Column(Text)
    13. project_id = Column(String)
    14. user_name = Column(String)
    15. user_type = Column(String)
    16. status = Column(String)
    17. start_time = Column(DateTime)
    18. end_time = Column(DateTime)
    19. confidence_code = Column(String)
    20. repair_code = Column(String)
    21. report_count = Column(Integer)
    22. task_scope = Column(Text)
    23. adcity_code = Column(String)
    24. progress = Column(String)
    25. task_type = Column(String)
    26. job_id = Column(String)

    3.4 开始使用工具类:main.py

    1. # coding=utf-8
    2. from database import dbTools
    3. from modelBatchInfo import icsBatchInfo
    4. from sqlalchemy import text
    5. if __name__ == "__main__":
    6. dbtools = dbTools()
    7. dbtools.open()
    8. # 打开一个文件
    9. with open('task.txt') as fr:
    10. # 读取文件所有行
    11. lines = fr.readlines()
    12. lines = [i.rstrip() for i in lines]
    13. list = []
    14. list.append("taskId,batchId\n")
    15. for taskId in lines:
    16. # 1. 使用对象查询
    17. # result = dbtools.query(icsBatchInfo).filter_by(task_id=taskId).all()
    18. # result = dbtools.query(icsBatchInfo).filter(icsBatchInfo.task_id == taskId).all()
    19. # nodes = dbtools.filter(icsBatchInfo.py.master == False).all()
    20. # 2. 使用sql查询
    21. sql = text(
    22. "select * from ics_batch_info where batch_id=(select MAX(batch_id) from ics_batch_info WHERE task_id = '{taskId}')".format(
    23. taskId=taskId))
    24. result = dbtools.execute(sql)
    25. for batchInfo in result:
    26. list.append(batchInfo.task_id + "," + str(batchInfo.batch_id) + "\n")
    27. dbtools.close()
    28. list[len(list) - 1] = list[len(list) - 1].rstrip();
    29. with open("最大批次查询结果.csv", 'w') as fw:
    30. fw.writelines(list)
    31. print("☺☺☺执行完毕☺☺☺")

    说明:

    读取本目录下task.txt 中的任务号,去查数据库记录,并将需求查出来的内容写到本地csv文件"最大批次查询结果.csv" 文件。

    上阶尽管费力,却一步比一步高。不经过琢磨,宝石也不会发光

  • 相关阅读:
    Java并发-交替打印的四种方法。
    什么是覆盖索引?
    深入理解指针(四)
    jzoj1212 重建道路
    OpenAI官方吴达恩《ChatGPT Prompt Engineering 提示词工程师》(7)聊天机器人 / ChatBot
    安全远程访问工具
    hadoop生态现状、介绍、部署
    演示在一台Windows主机上运行两个Mysql服务器(端口号3306 和 3307),安装步骤详解
    MySql事务
    感觉 C++ 很简单,但为何这么多劝退的?
  • 原文地址:https://blog.csdn.net/qiaobing1226/article/details/132629753