介绍

SQLAlchemy 是一个 ORM 框架,可以帮助我们使用面向对象的方式快速实现数据库操作。

组成部分:

  • Engine,框架的引擎
  • Connection Pooling ,数据库连接池
  • Dialect,选择连接数据库的DB API种类
  • Schema/Types,架构和类型
  • SQL Exprression Language,SQL表达式语言

SQLAlchemy 本身无法操作数据库,其必须依赖 pymysql 等第三方插件,Dialect(方言)用于和数据 API 进行交流,根据配置文件的不同调用不同的数据库 API ,从而实现对数据库的操作,如:

MySQLDB
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

使用

安装

pip3 install sqlalchemy

创建和删除表

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base):
__tablename__ = 'users' id = Column(Integer, primary_key=True)
age = Column(Integer, nullable=False)
name = Column(String(32), index=True, nullable=False) """
根据类创建数据库表
"""
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/1221?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) def init_db():
# 创建表
Base.metadata.create_all(engine) def drop_db():
Base.metadata.drop_all(engine) if __name__ == '__main__':
drop_db()
init_db()

单表操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base):
__tablename__ = 'users' id = Column(Integer, primary_key=True)
age = Column(Integer, nullable=False)
name = Column(String(32), index=True, nullable=False) engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/1221", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session
session = Session()
# 添加单条
obj = Users(name="zhangsan",age=20)
session.add(obj)
# 添加多条
obj1 = Users(name='lisi',age=12)
obj2 = Users(name='wangwu',age=14)
session.add_all([obj1, obj2])
# 提交事务
session.commit()

新增操作

# 查询所有
user_query = session.query(Users)
print(user_query) # SELECT users.id AS users_id, users.name AS users_name FROM users
print(user_query.all()) # [<__main__.Users object at 0x0000000003A3E2B0>, <__main__.Users object at 0x0000000003A3E320>, <__main__.Users object at 0x0000000003A3E390>] # 过滤条件
user_query = user_query.filter(Users.id > 1)
print(user_query) # SELECT users.id AS users_id, users.name AS users_name FROM users WHERE users.id > %(id_1)s
print(user_query.all()) # [<__main__.Users object at 0x000000000395E4A8>, <__main__.Users object at 0x000000000395E518>] # 取第一个
print(user_query.first()) # <__main__.Users object at 0x000000000392DC88>

基本查询

# 删除 返回删除条数
print(session.query(Users).filter(Users.id > 1).delete()) #
session.commit()

删除操作

# 修改方式一
user_obj = session.query(Users).filter(Users.id == 1).first()
user_obj.name = '张大三'
session.commit() # 修改方式二
session.query(Users).filter(Users.id == 1).update({Users.name: '张小三'})
session.commit() # 在原来值基础修改
session.query(Users).filter(Users.id == 1).update({Users.name: Users.name + ''}, synchronize_session=False)
session.query(Users).filter(Users.id == 1).update({'age': Users.age + 2}, synchronize_session='evaluate')
session.commit()

修改操作

# 通过字段过滤
session.query(Users).filter_by(name='zhangsan').all()
# filter 中过滤多条件时它们的关系是 and
session.query(Users).filter(Users.id > 1, Users.name == 'lisi').all()
# between
session.query(Users).filter(Users.id.between(1,2)).all()
# in
session.query(Users).filter(Users.id.in_([1, 3])).all()
# not in
session.query(Users).filter(~Users.id.in_([1, 3])).all()
# 子查询
session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='zhangsan'))).all()
# and 和 or 操作
from sqlalchemy import and_, or_
session.query(Users).filter(and_(Users.id > 3, Users.name == 'lisi')).all()
session.query(Users).filter(or_(Users.id < 2, Users.name == 'zhangsan')).all()
session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'zhangsan', Users.id > 3),
)).all()
# like
session.query(Users).filter(Users.name.like('z%')).all()
# not like
session.query(Users).filter(~Users.name.like('z%')).all()
# 切片 含首不含尾
session.query(Users)[0:1]
# order by
session.query(Users).order_by(Users.name).all()
# order by desc
session.query(Users).order_by(Users.name.desc(),Users.id.asc()).all()
# group by & having & max & sum & min & avg
from sqlalchemy.sql import func
session.query(func.max(Users.age), func.sum(Users.age), func.min(Users.age), func.avg(Users.age)).group_by(Users.age).having(func.min(Users.age) > 14).all() # [(26, Decimal('26'), 26, Decimal('26.0000'))]
# 查询指定列 & as
ret = session.query(Users.name,Users.age,Users.name.label('nick_name')).all()
[print(user_item[0],user_item.age,user_item.nick_name) for user_item in ret]
# 连表
# where xx.id = xx.id
session.query(Users, Favor).filter(Users.id == Favor.nid).all()
# inner join
session.query(Person).join(Favor).all()
# outer join
session.query(Person).join(Favor, isouter=True).all()
# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# 合并去重
ret = q1.union(q2).all()
# 合并不去重
ret = q1.union_all(q2).all()

其它操作

一对多操作

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Depart(Base):
__tablename__ = 'depart'
id = Column(Integer, primary_key=True)
title = Column(String(32), nullable=True) class Users(Base):
__tablename__ = 'users' id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
dept_id = Column(Integer, ForeignKey('depart.id')) """
根据类创建数据库表
"""
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/1221?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) def init_db():
# 创建表
Base.metadata.create_all(engine) def drop_db():
Base.metadata.drop_all(engine) if __name__ == '__main__':
drop_db()
init_db()

创建外键关系

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Depart(Base):
__tablename__ = 'depart'
id = Column(Integer, primary_key=True)
title = Column(String(32), nullable=True) class Users(Base):
__tablename__ = 'users' id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
dept_id = Column(Integer, ForeignKey('depart.id')) engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/1221", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个session
session = Session()
# 查询所有用户名称 & 所属部门名称
user_list = session.query(Users, Depart).join(Depart, Users.dept_id == Depart.id).all()
print(user_list)
'''
[(<__main__.Users object at 0x000000000395C8D0>, <__main__.Depart object at 0x000000000395C940>),
(<__main__.Users object at 0x000000000395C9B0>, <__main__.Depart object at 0x000000000395C940>),
(<__main__.Users object at 0x000000000395CA20>, <__main__.Depart object at 0x000000000395CA90>)]
'''
[print(user[0].name, user[1].title) for user in user_list]
'''
张三 后勤部
李四 后勤部
王五 财务部
'''
# SELECT users.name AS users_name, depart.title AS depart_title FROM users INNER JOIN depart ON users.dept_id = depart.id
user_list = session.query(Users.name, Depart.title).join(Depart, Users.dept_id == Depart.id).all()
print(user_list, type(user_list)) # [('张三', '后勤部'), ('李四', '后勤部'), ('王五', '财务部')]
[print(user.name, user.title) for user in user_list]
'''
张三 后勤部
李四 后勤部
王五 财务部
'''
# SELECT users.name AS users_name, depart.title AS depart_title FROM users LEFT OUTER JOIN depart ON users.dept_id = depart.id
user_list = session.query(Users.name, Depart.title).join(Depart, Users.dept_id == Depart.id, isouter=True)
print(user_list)

连表查询

# 正向查询 :查询所有用户关联的部门名称
user_list = session.query(Users).all()
[print(user.name, user.depart, user.depart.title) for user in user_list]
'''
张三 <__main__.Depart object at 0x00000000039697F0> 后勤部
李四 <__main__.Depart object at 0x00000000039697F0> 后勤部
王五 <__main__.Depart object at 0x0000000003969898> 财务部
'''
# 反向查询 :查询后勤部所有用户名称
depart_obj = session.query(Depart).filter_by(id=1).first()
[print(user.name, user) for user in depart_obj.users]
'''
张三 <__main__.Users object at 0x000000000395FA20>
李四 <__main__.Users object at 0x000000000395F978>
''' # 关联新增 :创建一个部门'酱油部',再在该部门中添加一个员工'李德刚'
# 方式一:
user_obj = Users(name='李德刚')
depart_obj = Depart(title='酱油部', users=[user_obj])
session.add(depart_obj)
session.commit()
# 方式二:
depart_obj = Depart(title='酱油部')
user_obj = Users(name='李德刚', depart=depart_obj)
session.add(user_obj)
session.commit()

relationship使用

多对多操作

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Permission(Base):
'''权限'''
__tablename__ = 'permission'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False) class User(Base):
'''用户'''
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False) class Permission2User(Base):
__tablename__ = 'permission2user'
id = Column(Integer, primary_key=True, autoincrement=True)
user_id = Column(Integer, ForeignKey('user.id'))
permission_id = Column(Integer, ForeignKey('permission.id')) __table_args__ = (
# 联合唯一索引
UniqueConstraint('user_id', 'permission_id', name='uc_user_permission'),
) """
根据类创建数据库表
"""
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
) def init_db():
# 创建表
Base.metadata.create_all(engine) def drop_db():
Base.metadata.drop_all(engine) if __name__ == '__main__':
drop_db()
init_db()

创建多对多关系

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship Base = declarative_base() class Permission(Base):
'''权限'''
__tablename__ = 'permission'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
# 第一个参数:指定关联的模型类,secondary :指定通过该表创建多对多关系,backref :反向查询时字段
user_list = relationship('User', secondary='permission2user', backref='permission_list') class User(Base):
'''用户'''
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False) class Permission2User(Base):
__tablename__ = 'permission2user'
id = Column(Integer, primary_key=True, autoincrement=True)
user_id = Column(Integer, ForeignKey('user.id'))
permission_id = Column(Integer, ForeignKey('permission.id'))
__table_args__ = (
# 联合唯一索引
UniqueConstraint('user_id', 'permission_id', name='uc_user_permission'),
) engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine) session = Session()
# 初始化数据
session.add_all(
[
Permission(name='新增'),
Permission(name='查询'),
User(name='张三'),
User(name='吴刚')
]
)
session.commit() session.add_all([
Permission2User(user_id=1,permission_id=1),
Permission2User(user_id=1,permission_id=2),
Permission2User(user_id=2,permission_id=2),
])
session.commit()

初始化数据

list = session.query(Permission2User.id, User.name, Permission.name) \
.join(Permission, Permission2User.permission_id == Permission.id) \
.join(User, Permission2User.user_id == User.id).all()
print(list) # [(3, '吴刚', '查询'), (1, '张三', '新增'), (2, '张三', '查询')]
# 查询拥有新增权限的用户
permission_obj = session.query(Permission).filter_by(name='新增').first()
[print(user_obj.name) for user_obj in permission_obj.user_list] # 张三
# 查询吴刚拥有的权限
user_obj = session.query(User).filter_by(name='吴刚').first()
[print(permission_obj.name) for permission_obj in user_obj.permission_list] # 查询 # 创建一个权限,再创建两个用户,让这两个用户拥有这个权限
permission_obj = Permission(name='修改')
permission_obj.user_list = [User(name='王五'), User(name='赵柳')]
session.add(permission_obj)
session.commit()

常用操作示例

两种连接方式

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship, scoped_session Base = declarative_base() class User(Base):
'''用户'''
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False) engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
SessionFactory = sessionmaker(bind=engine) def task1():
# 去连接池获取一个连接。
session = SessionFactory()
ret = session.query(User).all()
# 将连接交还给连接池。
session.close() session = scoped_session(session_factory=SessionFactory) def task2():
# 当真正使用 session 时,才从连接池中取一个连接放在 threading.Local 对象的当前线程域中,实现多线程访问时的 session 隔离。
ret = session.query(User).all()
# 将连接交还给连接池
session.remove() from threading import Thread for i in range(20):
t1 = Thread(target=task1)
t1.start()
t2 = Thread(target=task2)
t2.start()

执行原生Sql

# 添加
cursor = session.execute('insert into user(name) values(:name)', params={"name": '刘能'})
session.commit()
print(cursor.lastrowid) # # 查询
cursor = session.execute('select * from user')
result = cursor.fetchall()
print(result) # [(5, '刘能'), (2, '吴刚'), (1, '张三'), (3, '王五'), (4, '赵柳')]

最新文章

  1. Failure to find xxx in xxx was cached in the local repository, resolution will not be reattempted until the update interval of nexus has elapsed or updates are forced @ xxx
  2. 百度api短信开发
  3. 刚知道的android属性
  4. There is no ‘Animation’ attached to the “Player” game object
  5. Java锁(一)之内存模型
  6. mysqli_stmt预处理类的使用
  7. 《第一行代码--Android》阅读笔记之Activity
  8. wrong number of arguments,java方法反射时数组参数的坑
  9. requirejs-define jquery 快速初学实例(一)
  10. bug fix: openstack can not run swift for pyeclib and liberasurecode do not match
  11. iphone--有关日历中NSDateFormatter中英文
  12. 数据库导出到excel
  13. PHP操作MySQL对表增加一列(一个字段)
  14. RDO Stack:VMs cannot access external network.
  15. apache基础学习
  16. [转] Webpack-CommonsChunkPlugin
  17. 主成分分析PCA(Principal Component Analysis)在sklearn中的应用及部分源码分析
  18. jQuery的selector和context属性
  19. 201521123014《Java程序设计》第1周学习总结
  20. java关于类加载的面试题

热门文章

  1. Atitit 关于处理环保行动联盟和动物解放阵线游击队的任命书 委任状
  2. vultr搭建ss
  3. Hibernate获取数据java.lang.StackOverflowError
  4. SpringMvc的Url映射和传参案例(转)
  5. 磁盘 I/O 优化
  6. ss搭建
  7. 【Postgres】空间数据库创建
  8. Fiddler 断点功能
  9. iOS - User Agent 的应用和设置
  10. MySql.Data.dll官网下载