SQLAlchemy 是一个用 Python 实现的 ORM (Object Relational Mapping)框架,它由多个组件构成,这些组件可以单独使用,也能独立使用。它的组件层次结构如下:

SQLAlchemy 学习笔记(一):Engine 与 SQL 表达式语言-LMLPHP

其中最常用的组件,应该是 ORMSQL 表达式语言,这两者既可以独立使用,也能结合使用。

ORM 的好处在于它

  1. 自动处理了数据库和 Python 对象之间的映射关系,屏蔽了两套系统之间的差异。程序员不需要再编写复杂的 SQL 语句,直接操作 Python 对象就行。
  2. 屏蔽了各数据库之间的差异,更换底层数据库不需要修改 SQL 语句,改下配置就行。
  3. 使数据库结构文档化,models 定义很清晰地描述了数据库的结构。
  4. 避免了不规范、冗余、风格不统一的 SQL 语句,可以避免很多人为 Bug,也方便维护。

但是 ORM 需要消耗额外的性能来处理对象关系映射,此外用 ORM 做多表关联查询或复杂 SQL 查询时,效率低下。因此它适用于场景不太复杂,性能要求不太苛刻的场景。

都说 ORM 学习成本高,我自己也更倾向于直接使用 SQL 语句(毕竟更熟悉),因此这一篇笔记不涉及 ORM 部分,只记录 SQLAlchemy 的 Engine 与 SQL 表达式语言。

一、直接使用 Engine 和 Connections

第一步是创建数据库引擎实例:

from sqlalchemy import create_engine

engine = create_engine('sqlite:///:memory:',
                    echo=True,  # echo=True 表示打印出自动生成的 SQL 语句(通过 logging)
                    pool_size=5,  # 连接池容量,默认为 5,生产环境下太小,需要修改。
                    # 下面是 connection 回收的时间限制,默认 -1 不回收
                    pool_recyle=7200)   # 超过 2 小时就重新连接(MySQL 默认的连接最大闲置时间为 8 小时)

create_engine 接受的第一个参数是数据库 URI,格式为 dialect[+driver]://user:password@host/dbname[?key=value..],dialect 是具体的数据库名称,driver 是驱动名称。key-value 是可选的参数。举例:

# PostgreSQL
postgresql+psycopg2://scott:tiger@localhost/dbtest

# MySQL + PyMySQL(或者用更快的  mysqlclient)
mysql+pymysql://scott:tiger@localhost/dbtest

# sqlite 内存数据库
# 注意 sqlite 要用三个斜杠,表示不存在 hostname,sqlite://<nohostname>/<path>
sqlite:///:memory:

# sqlite 文件数据库
# 四个斜杠是因为文件的绝对路径以 / 开头:/home/ryan/Codes/Python/dbtest.db
sqlite:////home/ryan/Codes/Python/dbtest.db

# SQL Server + pyodbc
# 首选基于 dsn 的连接,dsn 的配置请搜索hhh
mssql+pyodbc://scott:tiger@some_dsn

引擎创建后,我们就可以直接获取 connection,然后执行 SQL 语句了。这种用法相当于把 SQLAlchemy 当成带 log 的数据库连接池使用:

with engine.connect() as conn:
    res = conn.execute("select username from users")  # 无参直接使用

    # 使用问号作占位符,前提是下层的 DBAPI 支持。更好的方式是使用 text(),这个后面说
    conn.execute("INSERT INTO table (id, value) VALUES (?, ?)", 1, "v1")  # 参数不需要包装成元组

    # 查询返回的是 ResultProxy 对象,有和 dbapi 相同的 fetchone()、fetchall()、first() 等方法,还有一些拓展方法
    for row in result:
        print("username:", row['username'])

但是要注意的是,connection 的 execute 是自动提交的(autocommit),这就像在 shell 里打开一个数据库客户端一样,分号结尾的 SQL 会被自动提交。
只有在 BEGIN TRANSACTION 内部,AUTOCOMMIT 会被临时设置为 FALSE,可以通过如下方法开始一个内部事务:

def transaction_a(connection):
    trans = connection.begin()  # 开启一个 transaction
    try:
        # do sthings
        trans.commit()  # 这里需要手动提交
    except:
        trans.rollback()  # 出现异常则 rollback
        raise
# do other things

with engine.connect() as conn:
    transaction_a(conn)

1. 使用 text() 构建 SQL

相比直接使用 string,text() 的优势在于它:

  1. 提供了统一的参数绑定语法,与具体的 DBAPI 无关。
# 1. 参数绑定语法
from sqlalchemy import text

result = connection.execute(
            # 使用 :key 做占位符
            text('select * from table where id < :id and typeName=:type'),
            {'id': 2,'type':'USER_TABLE'})  # 用 dict 传参数,更易读

# 2. 参数类型指定
from sqlalchemy import DateTime

date_param=datetime.today()+timedelta(days=-1*10)
sql="delete from caw_job_alarm_log  where alarm_time<:alarm_time_param"

# bindparams 是 bindparam 的列表,bindparam 则提供参数的一些额外信息(类型、值、限制等)
t=text(sql, bindparams=[bindparam('alarm_time_param', type_=DateTime, required=True)])
connection.execute(t, {"alarm_time_param": date_param})
  1. 可以很方便地转换 Result 中列的类型
stmt = text("SELECT * FROM table",
            # 使用 typemap 指定将 id 列映射为 Integer 类型,name 映射为 String 类型
            typemap={'id': Integer, 'name': String},
          )

二、SQL 表达式语言

SQLAlchemy 表达式语言是一个使用 Python 结构表示关系数据库结构和表达式的系统。

1. 定义并创建表

SQL 表达式语言使用 Table 来定义表,而表的列则用 Column 定义。Column 总是关联到一个 Table 对象上。

一组 Table 对象以及它们的子对象的集合就被称作「数据库元数据(database metadata)」。metadata 就像你的网页分类收藏夹,相关的 Table 放在一个 metadata 中。

下面是创建元数据(一组相关联的表)的例子,:

from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey

metadata = MetaData()  # 先创建元数据(收藏夹)

users = Table('user', metadata,  # 创建 user 表,并放到 metadata 中
              Column('id', Integer, primary_key=True),
              Column('name', String),
              Column('fullname', String)
             )

addresses = Table('address', metadata,
                  Column('id', Integer, primary_key=True),
                  Column('user_id', None, ForeignKey('user.id')),  # 外键,引用 user 表的 id 列
                  Column('email_address', String, nullable=False)
                 )

metadata.create_all(engine)  # 使用 engine 创建 metadata 内的所有 Tables(会检测表是否已经存在,所以可以重复调用)

2. 增删改语句

  1. :
# 方法一,使用 values 传参
ins = users.insert().values(name="Jack", fullname="Jack Jones")  # 可以通过 str(ins) 查看自动生成的 sql
connection.execute(ins)

# 方法二,参数传递给 execute()
conn.execute(users.insert(), id=2, name='wendy', fullname='Wendy Williams')

# 方法三,批量 INSERT,相当于 executemany
conn.execute(addresses.insert(), [  # 插入到 addresses 表
    {'user_id': 1, 'email_address': 'jack@yahoo.com'},  # 传入 dict 列表
    {'user_id': 1, 'email_address': 'jack@msn.com'},
    {'user_id': 2, 'email_address': 'www@www.org'},
    {'user_id': 2, 'email_address': 'wendy@aol.com'}
])

# 此外,通过使用 bindparam,INSERT 还可以执行更复杂的操作
stmt = users.insert() \
         .values(name=bindparam('_name') + " .. name")  # string 拼接
conn.execute(stmt, [
        {'id':4, '_name':'name1'},
        {'id':5, '_name':'name2'},
        {'id':6, '_name':'name3'},
     ])
_table.delete() \
        .where(_table.c.f1==value1) \
        .where(_table.c.f2==value2)  # where 指定条件
# 举例
stmt = users.update() \
             .where(users.c.name == 'jack') \
             .values(name='tom')
conn.execute(stmt)

# 批量更新
stmt = users.update().\
             where(users.c.name == bindparam('oldname')).\
             values(name=bindparam('newname'))
conn.execute(stmt, [
     {'oldname':'jack', 'newname':'ed'},
     {'oldname':'wendy', 'newname':'mary'},
     {'oldname':'jim', 'newname':'jake'},
     ])

可以看到,所有的条件都是通过 where 指定的,它和后面 ORM 的 filter 接受的参数是一样的。(详细的会在第二篇文章里讲)

查询暂时略过(因为感觉手写 sql 更方便。。)

链接

01-21 21:06