URL:
---SQLAlchemy 教程 —— 基础入门篇
目录:
- SQLAlchemy 简介
- 横向对比
- 核心概念与入门
- 模型定义
- 增
- 查
- 复杂查询
- 删
- 改
- 基础性能
- 扩展与进阶
- 事件
- 反射
- Python3.x asyncio扩展
- 分片Session
- 自定义的列类型
- 混合(hybrid)属性
- 序列化Query
- Baked Query
- 多态与关系
在新团队里做的技术分享,过了一段时间,整理下来并且有了新的想法。似乎入门级的教程在知乎更受欢迎?
SQLAlchemy 简介
SQLAlchemy 是一个功能强大的Python ORM 工具包,口碑不错,社区活跃也较为开放
提供 全功能的SQL和ORM操作 本次附赠的文件(这里放不上来,也懒得放gayhub了,总之很简单的,单元测试多一些,一下午搞定):connect.py :底层的数据库连接orm.py :模型定义的样例example_test.py :单元测试,实质上可以对应业务的具体使用python3_test.py :展示Python3 asyncio下的SQLAlchemy分别建立python2/3的虚拟环境,然后安装对应的requirements.txt即可
无论什么语言,无论什么库,做一个ORM实现,至少应当实现完全语义化的数据库操作,使得操作数据库表就像在操作对象。
完整的ORM应当可以完全避免SQL拼接为什么需要ORM
当时分享完毕之后,也确实很多同事表示还是喜欢裸SQL,我后来也又在工作中看到了很多遗留代码的问题。我也正好趁浴室迷思 想了一下,为什么我需要ORM呢?
第一条来自一个定理:
一切由人直接来保证安全性的系统,就一定会出错
拼接SQL、把SQL做成模板、开始使用ORM、封装出DAO层,几乎是每个项目的共识吧?
过往的项目中,由我第一手写的,都会第一时间加入ORM,毕竟也只是两三个小文件,一百行以内的事情(后续由于封装的增多,可能会到达数百行)这段时间在写旧系统的小规模重构(定理2:一个好程序猿应当友好地帮前人擦好屁股,而不是靠重新制造一个新屁股实现),拼接字符串并没有带来任何优点,反而引入了非常简单的注入漏洞,简单的设想这样一个列表API的场景:
- 根据请求参数控制对应的:过滤条件、排序方法、翻页
- 根据需要预取关联的表,JOIN并把对一对多的关系化为一个list
第一条,刚一上手,就发现满地的string format,翻页用了:
order_sql = "ORDER BY {} {}".format(order_by,direction)
毫无疑问的order_by=id%3Bselect+1%3B-- 就直接注入了
要解决这些在SQL拼接的问题,除了表单验证,毫无疑问需要做一个SQL字符转义,另外在能用SQL参数的地方,需要用参数(然后也得注意拼接时候参数的个数,是的,这里我们的接口有另一个BUG,参数数量没数对)
第二个功能点,想象一下在需要的地方额外加一句LEFT JOIN,然后对结果再做额外的解析
还有一些附属功能:单元测试如何建表?代码里遍地的硬编码表名如何解决?
自己不是不能实现,但自己来实现这些,就走上了发明ORM的老路,用一个成熟的、文档丰富的ORM,岂不美哉?横向对比
简单的挑了三个
(知乎的表格似乎智障,不插入表格了)
SQLAlchemy、Peewee、Django ORM
Django ORM一直就不是一个全功能的ORM,会发现你想写的SQL几乎无法通过ORM写出来,当然raw属于tan90,使用裸SQL不在我们的考虑范围。Django 1.12后提供了一些subquery等各类丰富SQL操作,但这么新,估计还极少项目在这么新的版本
Peewee如果有兴趣可以后续继续使用来感受一下,Peewee也是一个功能全面的ORM,star很多但开发没有SQLAlchemy活跃
核心概念与入门
我总是在想为什么团队里很多人会觉得SQLAlchemy入门门槛高,我曾经也被困扰过,但回头一看会发现的概念实质比较简单。官方文档的脉络不太清晰,要扫过一遍并且学以致用才能感受得到。example很友好的! 回过头来看它的从教程到API的文档,会发现它的文档非常详细,学会它,除了学会了Python操作SQL的一个库,同样也可以学到从代码组织、各类Pythonic技巧到思想的很多东西 总的感受是:上手还算容易,精通要花很多功夫,但确实还挺有趣的先放一个表,待会我们会继续讲
(再次损失一个表)
概念很少,并且很清晰,理解这些概念之后的后续使用时,基本可以感受到:你能直觉想到的操作,还确实都有(比如subquery、复杂查询的构造)
模型定义
我们来看看他如何完成模型定义:
# coding=utf-8from __future__ import unicode_literals, absolute_import from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime ModelBase = declarative_base() #<-元类 class User(ModelBase): __tablename__ = "auth_user" id = Column(Integer, primary_key=True) date_joined = Column(DateTime) username = Column(String(length=30)) password = Column(String(length=128))
从这里可以看到,模型定义甚至与数据库是无关的,所以允许不同的数据库后端,不同类型拥有不同的表现形式和建表语句
这里我们可以看到它实现了 ORM与数据库连接的解耦,一些数据库后端不支持的数据类型,例如Numeric类型,在sqlite中不支持,不过SQLAlchemy也能做一些兼容使用普通浮点
Model 等同于数据库的一张表
Column 显然就是这张表的一列PS: SQLAlchemy 1.2之后才支持comment注释,以在ddl产生建表SQL时写上comment属性,1.2还在beta版里,所以还不能用。。。我倒很好奇为毛这个feature这么不重要
增
with get_session() as session: session.add(User(username="asd", password="asd")) session.add(User(username="qwe", password="qwe")) session.commit()
session(会话)的概念,可以看成一个管理数据库持久连接的对象,在此下面是完全透明的连接池和事务等东西
get_session底下configure可以控制auto_commit参数,= False时写操作默认都不放在事务里,SQLAlchemy默认为True
session.add函数将会把Model加入当前的持久空间(可以从session.dirty看到),直到commit时更新
查
with get_session() as session: # session.query(User)
最简单的这个查询返回了一个Query对象
需要注意的是,这里只构造Query,事实上并没有发送至数据库进行查询,只会在Query.get()、Query.all()、Query.one()以及Query.__iter__等具有“执行”语义的函数,才会真的去获取Query :本质上是数据表的若干行
- 在查询情况的下,等同于SQL 中的 SELECT Syntax
- 在update函数的操作时,可以根据参数选择等同于直接UPDATE users SET xxx WHERE name=xxx或者先用SELECT 选出ID,再循环用UPDATE xxx WHERE id=xxx
- delete同上
以SQLAlchemy为代表的ORM基本都支持链式操作。
形如:with get_session() as session: # query = (session .query(User) .filter(User.username == "asd") .filter_by(username="asd") #上面两个都是添加where .join(Addreess)#使用ForeignKey .join(Addreess,Addreess.user_id==User.id)#使用显式声明 .limit(10) .offset(0) )
所有Query支持的详情见
上面也涉及到一个特别有意思的filter函数:User.username == "asd" ,实际上是SQLAlchemy重载了Column上的各种运算符 __eq__、__ge__,返回了一个BinaryExpression对象,看起来就更加符合直觉上的语义
复杂查询
基于Query的subquery
with get_session() as session: # query = (session .query(User.id) .filter(User.username == "asd") .filter_by(username="asd") .limit(10) ) subquery = query.subquery() query2 = session.query(User).filter( User.id.in_(subquery) ) print query2#<-打印展开成的SQL,此处没有SQL查询
理解了Query、Column的概念,也很容易自行构造出这样的SQL
所有在Column级别上的使用 详见
删
上面我们提到了直接对Query进行的删除:
with get_session() as session: query = (session .query(User) .filter(User.username == "asd") .filter_by(username="asd") .join(Addreess) .join(Addreess,Addreess.user_id=User.id) .limit(10) .delete()#<-这里 )
另外,因为Model也可以被放进session里,然后删除的,和插入是个反向操作:
with get_session() as session: instance = session.query(User).get(1) session.delete(instance) #下一句执行:DELETE FROM auth_user WHERE auth_user.id = ? session.commit()
改
改首先是上述Query中所说的update方法:
with get_session() as session: # get by id query = (session .query(User) .filter_by(id=1) .update({"username": User.username + "a"}, synchronize_session=False) )
然后是在Model级别的方法:
with get_session() as session: # get by id user = (session .query(User) .get(1) ) user.password = "zxcv" # UPDATE auth_user SET password=? # WHERE auth_user.id = ? session.commit()
在对Model的属性进行修改的时候,session会得到修改对应的内容,下次commit即会提交SQL
这里留个思考题:如果对1、同一对象的同一属性进行修改,2、同一对象的不同属性进行修改 ,最终会有几个SQL被发出? 如果你来实现这样的功能,你会从哪里下手?基础性能
比较了十万条记录插入的性能
另外不要觉得比sqlite 裸SQL慢三倍很慢,注意这个量级,实际项目中会发现慢查询、不规范操作(例如for循环里放查询)的危害比引入ORM的这点开销打多了
总结
到这再贴上面那个概念表,应该就能比较好的理解了
在用裸SQL可以解决的场景下,上述的SQLAlchemy入门部分就足以掌控场景,完成所有的增删查改API需求(甚至自动生成代码的需求),自动生成真是偷懒无止境。。不过发明新的DSL嘛,能不做就不做。。
扩展与进阶
从过往的经验来看,SQLAlchemy以优雅的直觉实现了诸多接口,并保留了良好的可扩展性,这里抛砖引玉一些有趣的特性
事件
应用层的触发器(trigger),支持:
- ConnectionEvents 包括Connection和Engine(连接后进行一些自检操作)
- DDLEvents 模型增删查改事件
- DialectEvents 不同种类的数据库的事件
- PoolEvents 连接池事件,连接的检出和回收等
上面的性能测试里就使用了两种事件
from sqlalchemy import eventfrom sqlalchemy.engine import Engineimport timeimport logginglogging.basicConfig()logger = logging.getLogger("myapp.sqltime")logger.setLevel(logging.DEBUG)@event.listens_for(Engine, "before_cursor_execute")def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): conn.info.setdefault('query_start_time', []).append(time.time()) logger.debug("Start Query: %s", statement)@event.listens_for(Engine, "after_cursor_execute")def after_cursor_execute(conn, cursor, statement, parameters, context, executemany): total = time.time() - conn.info['query_start_time'].pop(-1) logger.debug("Query Complete!") logger.debug("Total Time: %f", total)
反射
现有项目或者别人的代码里如果已经用其他的方式写好了表定义,不想再定义Model了,想用SQLAlchemy直接使用对应的数据库表
查文档关键字:from sqlalchemy.ext.automap import automap_basefrom sqlalchemy.orm import Sessionfrom sqlalchemy import create_engineBase = automap_base()# engine, suppose it has two tables 'user' and 'address' set upengine = create_engine("sqlite:///mydatabase.db")# reflect the tablesBase.prepare(engine, reflect=True)tables = Base.classes#<-load tablesUser = Base.classes.userAddress = Base.classes.address# rudimentary relationships are producedsession.add(Address(email_address="foo@bar.com", user=User(name="foo")))session.commit()# collection-based relationships are by default named# " _collection"print (u1.address_collection)
扩展阅读:
我之前在一些OLAP应用 用来做数据分析时用到过。。
Python3.x asyncio扩展
16年12月 Python3.6进入稳定期,同时也标志着Python3.4和3.5中的asyncio模块进入稳定期
SQLAlchemy对asyncio的支持在于,它实质上可以在engine层进行扩展,同时扩展Engine、Connection、Transaction、Context 代码量约400行
Strategies for creating new instances of Engine types. These are semi-private implementation classes which provide the underlying behavior for the "strategy" keyword argument available on :func:~sqlalchemy.engine.create_engine. Current available options are plain, threadlocal, and mock. New strategies can be added via new EngineStrategy classes. """
形如:
from sqlalchemy.engine.strategies import DefaultEngineStrategyfrom .engine import AsyncioEngineASYNCIO_STRATEGY = '_asyncio'class AsyncioEngineStrategy(DefaultEngineStrategy): name = ASYNCIO_STRATEGY engine_cls = AsyncioEngineAsyncioEngineStrategy() async def main(): engine = create_engine( # In-memory sqlite database cannot be accessed from different # threads, use file. 'sqlite:///test.db', strategy=ASYNCIO_STRATEGY ) metadata = MetaData() users = Table( 'users', metadata, Column('id', Integer, primary_key=True), Column('name', Text), ) # Create the table await engine.execute(CreateTable(users)) conn = await engine.connect()
另外提一嘴的是:asyncio不是银弹,会导致应用层压力直接传给DB,会掩盖应用的SQL写的烂的问题
分片Session
读写分离是当数据库压力到达一定阶段时,由应用层进行的拆分数据库压力的措施
实现一种主从分离的Session:- 最简单的方案是直接扩展Session类get_bind方法
get_bind(mapper=None, clause=None)
Return a “bind” to which this Session is bound. Note that the “mapper” argument is usually present when Session.get_bind() is called via an ORM operation such as a Session.query(), each individual INSERT/UPDATE/DELETE operation within a Session.flush(), call, etc.
- 也可以使用sqlalchemy.ext.horizontal_shard模块中已经实现好的ShardedSession
Parameters:
- shard_chooser – A callable which, passed a Mapper, a mapped instance, and possibly a SQL clause, returns a shard ID. This id may be based off of the attributes present within the object, or on some round-robin scheme.
- id_chooser – A callable, passed a query and a tuple of identity values, which should return a list of shard ids where the ID might reside.
- query_chooser – For a given Query, returns the list of shard_ids where the query should be issued.
- shards – A dictionary of string shard names to Engine objects.
允许根据model或者SQL条件、ID选择具体的数据库连接。一个未经验证的脑洞:因为shards是Engine的dict,那么是否允许在异构数据库之间使用Shard?这样会带来什么样的优缺点?
自定义的列类型
很久很久以前做的功能了,想象一个这样的场景:
- Postgresql支持IP/CIDR的存储,本质上是使用4*8bit=32bit的int存储
- Mysql此时并没有这样简单的IP存储 如何对其进行扩展?
自定义实现的列类型实质上需要:
- 指定在某种数据库方言下的存储类型,例如Mysql下使用int
- 实现两个方法:从数据库中取出来一个python对象和把Python对象放入数据库
- 按需需要实现:支持一些操作符(例如==,in_)
from sqlalchemy import typesclass MyIPType(types.TypeDecorator): impl = types.Integer def process_bind_param(self, value, dialect): #from python to database if dialect=="mysql": pass return #.... def process_result_value(self, value, dialect): #from database to python object return #...
我们也可以在awesome-sqlalchemy中找到一些有趣的类型扩展
混合(hybrid)属性
我们常见使用Python的property修饰器来构造一个复杂属性,SQLAlchemy中,这个混合属性的作用也类似,不仅可以用于获得对应的值,也可以用于Query时的链式操作
定义一个Model后,可以在各类增删查改中用到这个混合属性。混合属性 混合在:既是一个Python属性,也是一个可以放入数据库查询的属性
class Interval(Base): __tablename__ = 'interval' id = Column(Integer, primary_key=True) start = Column(Integer, nullable=False) end = Column(Integer, nullable=False) def __init__(self, start, end): self.start = start self.end = end @hybrid_property def length(self): return self.end - self.start #下面这个写着玩的。。 @length.setter def length(self, value): self._value = value >>> i1 = Interval(5, 10) >>> i1.length 5 >>> print Session().query(Interval).filter_by(length=5) SELECT interval.id AS interval_id, interval.start AS interval_start, interval."end" AS interval_end FROM interval WHERE interval."end" - interval.start = :param_1
上述还有一个写着玩儿的setter,hybrid_property支持:
- comparator 扩展Interval.length在各种比较符(><=)的行为
- deleter/setter 顾名思义
- expression 可以扩展最后展开的SQL表达式,例如展开成SUM(xxx):
from sqlalchemy.orm import func #下面这个写着玩的。。 @length.expression def length(self, expr): return func.sum(self.end, expr)
序列化Query
提供一个接口,以序列化和反序列化Query,用于跨系统、微服务的场景
from sqlalchemy.ext.serializer import loads, dumpsmetadata = MetaData(bind=some_engine)Session = scoped_session(sessionmaker())# ... define mappersquery = Session.query(User). filter(User.somedata=='foo').order_by(User.sortkey)# pickle the queryserialized = dumps(query)# unpickle. Pass in metadata + scoped_session # 上面提到过的 query和Session实际上是密不可分的query2 = loads(serialized, metadata, Session)print query2.all()
这个做起来其实就非常带感了,微服务之间的必要条件就是各种dump,结合一下celery,实现一个去中心的HTTP服务也是不在话下
Baked Query
缓存从Query生成的SQL,以减少生成时间,实际上是个应用层面的存储过程、View
from sqlalchemy.ext import bakedbakery = baked.bakery()#<-创建了一个LRU from sqlalchemy import bindparam def search_for_user(session, username, email=None): baked_query = bakery(lambda session: session.query(User)) baked_query += lambda q: q.filter(User.name == bindparam('username')) baked_query += lambda q: q.order_by(User.id) if email: baked_query += lambda q: q.filter(User.email == bindparam('email')) result = baked_query(session).params(username=username, email=email).all() return result
上面说到了SQLAlchemy展开成SQL的性能问题,真的特别担忧的话,再来一个缓存绑定参数如何?
多态和关系
使用多个模型,但实际上只是操作一张数据库表 此处基本略,之前写过一篇文章了:
class Employee(Base): __tablename__ = 'employee' id = Column(Integer, primary_key=True) name = Column(String(50)) type = Column(String(50)) __mapper_args__ = { 'polymorphic_identity':'employee', 'polymorphic_on':type }
这里定义了雇员Employee 模型,指定type字段为多态所在字段,并且对于这个模型,当type字段为'employee'时,即为一个雇员
一对一、一对多、多对多的关系和自动收集成collection,这里不会细说,relationship函数的各种参数留待大家游玩。
关系间的收集有多种lazy方式,可以选择在父类读取时直接JOIN或者Subquery,也可以在需要的时候使用Query.option设置。说起来的篇幅会更长,我投个懒,大家去读文档吧~hfgl