SQLModel
大约 6 分钟
SQLModel
- 官方文档:https://sqlmodel.tiangolo.com/
- FastAPI文档(SQLModel): https://fastapi.tiangolo.com/zh/tutorial/sql-databases/
起步
安装
pip install sqlmodel
# mysql驱动
pip install pymysql
# postgresql驱动
pip install psycopg2-binary
连接引擎
from sqlmodel import SQLModel, create_engine
# The four db_url assignments below are alternatives: each one overwrites the
# previous value, so keep only the URL for the database you actually use.
# In-memory SQLite database
db_url = "sqlite+pysqlite:///:memory:"
# SQLite database stored in a file
db_url = "sqlite+pysqlite:///db_demo.sqlite"
# MySQL (through the pymysql driver)
db_url = "mysql+pymysql://username:password@127.0.0.1:3306/db_name"
# PostgreSQL
db_url = "postgresql://username:password@127.0.0.1:5432/db_name"
# Create the engine (echo=True asks SQLAlchemy to log the SQL it emits)
engine = create_engine(db_url, echo=True)
# Create the tables registered on SQLModel.metadata
# NOTE(review): only models defined/imported before this call are registered — confirm ordering.
SQLModel.metadata.create_all(engine)
数据表
import time
from sqlmodel import SQLModel, create_engine
from sqlmodel import Field, BigInteger, Integer, String, Boolean
# Table model declared directly on SQLModel.
class User(SQLModel, table=True):
    __tablename__ = "user"

    # Primary key; defaults to None until a value is assigned.
    # NOTE(review): SQLite only auto-increments an INTEGER PRIMARY KEY — confirm
    # that a BigInteger id behaves as expected when using the sqlite URLs.
    id: int | None = Field(
        default=None,
        primary_key=True,
        sa_type=BigInteger,
    )
    # User name: String(32) column carrying a database column comment.
    name: str = Field(
        sa_type=String(32),
        description="用户名",
        sa_column_kwargs={"comment": "用户名"},
    )
# Shared base class (declared without table=True) that supplies common columns.
class BaseTable(SQLModel):
    # Primary key; defaults to None until a value is assigned.
    id: int | None = Field(
        default=None,
        primary_key=True,
        sa_type=BigInteger,
    )
    # Creation time as epoch milliseconds.
    create_timestamp: int = Field(
        sa_type=BigInteger,
        # A lambda (callable default) has to be passed through default_factory.
        default_factory=lambda: time.time_ns() // 1_000_000,
        description="创建时间戳",
        sa_column_kwargs={"comment": "创建时间戳"},
    )
# Concrete table that inherits the columns declared on BaseTable.
class UseBaseUser(BaseTable, table=True):
    __tablename__ = "use_base_user"

    # User name: String(32) column with a database column comment.
    name: str = Field(
        sa_type=String(32),
        sa_column_kwargs={"comment": "用户名"},
    )
    # Age: defaults to 0 and is constrained to the range 0..200 (ge/le).
    age: int = Field(
        default=0,
        ge=0,
        le=200,
        sa_type=Integer,
        sa_column_kwargs={"comment": "年龄"},
    )
字段 | 作用 | 示例 | 备注 |
---|---|---|---|
`__tablename__` | 自定义表名 | "user" | 可省略 |
sa_type | 数据表类型 | BigInteger | sa即SqlAlchemy |
sa_column_kwargs | 数据表参数 | {"comment": "用户名"} | sa即SqlAlchemy,可填写数据库的字段备注 文档:https://github.com/fastapi/sqlmodel/issues/492 |
default | 默认值 | None | |
default_factory | 默认值工厂 | lambda: ... | |
primary_key | 主键 | True | |
title | 标题 | "用户名" | 用于Swagger等 |
description | 描述 | "用户名" | 用于Swagger等 |
gt/ge/lt/le | 范围 | ge=0, le=200 | pydantic的校验 |
JSON数据
- sa_type为JSON,保存和获取都能使用对应的类型
from sqlmodel import SQLModel
from sqlmodel import Field, BigInteger, Integer, String, Boolean, JSON
# Table model with JSON-typed columns.
class User(SQLModel, table=True):
    __tablename__ = "user"

    id: int | None = Field(
        default=None,
        primary_key=True,
        sa_type=BigInteger,
    )
    name: str = Field(
        sa_type=String(32),
        description="用户名",
        sa_column_kwargs={"comment": "用户名"},
    )
    # JSON columns: stored as JSON in the table, while reads and writes use the
    # matching Python types (list / dict).
    addrs: list[str] = Field(
        default=[],
        sa_type=JSON,
        sa_column_kwargs={"comment": "地址列表"},
    )
    other: dict = Field(
        default={},
        sa_type=JSON,
        sa_column_kwargs={"comment": "其他信息"},
    )
from sqlmodel import Session, select

# Inserting JSON values needs no special handling.
user = User(name="李四", addrs=["福州"], other={"a": 1, "b": 2})
with Session(engine) as session:
    session.add(user)
    session.commit()
    # Reload the instance that was just inserted so its attributes are populated.
    session.refresh(user)

# Values read back are converted to the matching Python types automatically.
with Session(engine) as session:
    statement = select(User)
    result = session.exec(statement).all()
    for i in result:
        print(type(i.addrs), i.addrs)
        # <class 'list'> ['福州']
        print(type(i.other), i.other)
        # <class 'dict'> {'a': 1, 'b': 2}
枚举类
- 当前版本枚举类较为鸡肋,无法自动转换,需要转换成字符串,否则报错
- 而且不会校验传入的字符串是否是合法的枚举值
from enum import Enum
from sqlmodel import SQLModel
from sqlmodel import Field, BigInteger, String
# Enumeration of the allowed sex values; each member's value equals its name.
class SexEnum(Enum):
    MALE = "MALE"        # male
    FEMALE = "FEMALE"    # female
    UNKNOWN = "UNKNOWN"  # not specified
# Table model with an enum-annotated field stored as a string column.
class User(SQLModel, table=True):
    __tablename__ = "user"

    id: int | None = Field(
        default=None,
        primary_key=True,
        sa_type=BigInteger,
    )
    name: str = Field(default="", sa_type=String(32))
    # Enum field: the database column type is a string. Values are not converted
    # automatically, so pass the string via .value (the default does the same).
    sex: SexEnum = Field(
        default=SexEnum.UNKNOWN.value,
        sa_type=String(32),
    )
# The sex argument must be passed as its string value (.value).
# Only fields declared on this User model (name, sex) are supplied.
user = User(name="zhangsan", sex=SexEnum.FEMALE.value)
SQL语句
- User实体类
from sqlmodel import SQLModel, Field, BigInteger, String, Integer
# User entity used by the SQL and CRUD examples.
class User(SQLModel, table=True):
    # Primary key with a database column comment.
    id: int | None = Field(
        default=None,
        primary_key=True,
        sa_type=BigInteger,
        sa_column_kwargs={"comment": "自增id"},
    )
    # User name.
    name: str = Field(
        sa_type=String,
        sa_column_kwargs={"comment": "用户名"},
    )
    # Age, defaulting to 0.
    age: int = Field(
        default=0,
        sa_type=Integer,
        sa_column_kwargs={"comment": "年龄"},
    )
sql语句
- 执行语句中的 .all() / .one() / .one_or_none() / .first() 等取值方法的区别见代码后的说明
from sqlmodel import Session, text
# Run a raw SQL statement.
def sql():
    """Execute a raw SELECT against the "user" table and return all rows."""
    query = text('SELECT * FROM "user";')
    with Session(engine) as db:
        # .all() returns every row of the result.
        return db.exec(query).all()
# Raw SQL with a bound parameter.
def sql_with_params():
    """Execute a raw SELECT with a bound ``id`` parameter and return one row."""
    query = text('SELECT * FROM "user" WHERE id=:id;')
    bound = {"id": 2}
    with Session(engine) as db:
        # .one() returns exactly one row.
        return db.exec(query, params=bound).one()
# Ways to consume the result of session.exec(statement):
session.exec(statement).all() # returns all rows
session.exec(statement).one() # returns exactly one row (raises if there are zero or several)
session.exec(statement).one_or_none() # returns one row or None
session.exec(statement).first() # returns one row or None (inefficient: the statement still queries everything, then the first row is returned)
- 下面的语句实际都可以转换成sql
def to_sql():
    """Print the SQL generated for ``select(User)`` and return all users."""
    query = select(User)
    # Printing a statement shows the SQL it translates to:
    # SELECT "user".id, "user".name, "user".age FROM "user"
    print(query)
    with Session(engine) as db:
        return db.exec(query).all()
CRUD
新增
- 最佳实践
from sqlmodel import Session
def insert_entity(entity: User) -> None:
    """Insert ``entity`` and reload it so the caller sees the stored row.

    Args:
        entity: The user to persist; it is refreshed in place after the commit.
    """
    with Session(engine) as db:
        db.add(entity)
        db.commit()
        # Re-bind the entity to the stored data (this includes the id).
        db.refresh(entity)
# Build a new user and insert it with insert_entity.
user = User(name="张三", age=18)
insert_entity(user)
print(user) # the complete data is available, including the id
- 其他写法
from sqlmodel import Session, insert
def insert_entity_1() -> None:
    """Insert a row with an INSERT statement whose values are keyword arguments."""
    stmt = insert(User).values(name="李四", age=10)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
def insert_entity_2() -> None:
    """Insert a row with an INSERT statement whose values come from a dict."""
    row = {"name": "王五", "age": 20}
    stmt = insert(User).values(row)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
def insert_entity_3(entity: User) -> None:
    """Insert ``entity`` through an INSERT statement built from its dumped fields.

    Args:
        entity: The user whose fields are written.
    """
    # Convert to a dict with pydantic; None values are excluded so that a None
    # id is not sent to the database (which would raise an error).
    row = entity.model_dump(exclude_none=True)
    stmt = insert(User).values(row)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
更新
- 最佳实践
from sqlmodel import Session
def update_entity(entity: User) -> User:
    """Update the stored user matching ``entity.id`` and return the stored row.

    Args:
        entity: Carries the id of the row to update and the new field values.

    Returns:
        The database-bound user, refreshed after the commit.

    Raises:
        ValueError: If no user with ``entity.id`` exists.
    """
    with Session(engine) as session:
        db_entity = session.get(User, entity.id)
        # session.get returns None for a missing row; fail with a clear message
        # instead of an AttributeError on None.
        if db_entity is None:
            raise ValueError(f"User with id={entity.id!r} does not exist")
        db_entity.sqlmodel_update(entity.model_dump())
        session.add(db_entity)
        session.commit()
        # Re-bind the entity to the stored data.
        session.refresh(db_entity)
        return db_entity
# Update the row with id 3 using the values carried by this instance.
user = User(id=3, name="李四", age=20)
update_entity(user)
- 其他更新语句
- 无法返回更新后真正结果
from sqlmodel import Session, update
# Combined with a where clause, an UPDATE statement can be used for batch updates.
def update_with_args() -> None:
    """Update a user's name with values passed as keyword arguments."""
    stmt = update(User).where(User.id == 2).values(name="args")
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
def update_with_kwargs() -> None:
    """Update a user's name with values passed as a dict."""
    changes = {"name": "test"}
    stmt = update(User).where(User.id == 2).values(changes)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
def update_entity(user: User) -> None:
    """Write all dumped fields of ``user`` to the row with the same id.

    Args:
        user: Supplies both the target id and the new column values.
    """
    changes = user.model_dump()
    stmt = update(User).where(User.id == user.id).values(changes)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
删除
- 最佳实践
from sqlmodel import Session, delete
def delete_by_id(id: int) -> None:
    """Delete the user with the given id using a DELETE statement.

    Args:
        id: Primary key of the row to delete.
    """
    stmt = delete(User).where(User.id == id)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()
- 其他用法
from sqlmodel import Session
def delete_by_id(id: int) -> None:
    """Load the user with the given id and delete it through the session.

    Does nothing when no such row exists.

    Args:
        id: Primary key of the row to delete.
    """
    with Session(engine) as session:
        db_entity = session.get(User, id)
        # session.get returns None for a missing row, and session.delete(None)
        # would raise; skip the delete in that case.
        if db_entity is None:
            return
        session.delete(db_entity)
        session.commit()
简单查询
- 使用id查询
from sqlmodel import Session
def select_by_id(id: int) -> User | None:
    """Look a user up by primary key.

    Args:
        id: Primary key to look up.

    Returns:
        The result of ``session.get`` for that id.
    """
    with Session(engine) as db:
        return db.get(User, id)
- 使用where语句
from sqlmodel import Session, select
# Query by fixed conditions.
def select_by_where(name: str, age: int) -> list[User]:
    """Return the users whose name and age both equal the given values."""
    criteria = (User.name == name, User.age == age)
    query = select(User).where(*criteria)
    with Session(engine) as db:
        return db.exec(query).all()
# Query by optional conditions.
def select_by_condition(name: str | None = None, age: int | None = None) -> list[User]:
    """Return users filtered by whichever of ``name`` / ``age`` is not None."""
    query = select(User)
    # Apply an equality filter only for the arguments that were supplied.
    for column, wanted in ((User.name, name), (User.age, age)):
        if wanted is not None:
            query = query.where(column == wanted)
    with Session(engine) as db:
        return db.exec(query).all()
- like
from sqlmodel import Session, select
# LIKE query.
def select_by_like(name: str) -> list[User]:
    """Return the users whose name contains ``name`` (SQL LIKE '%name%')."""
    pattern = f"%{name}%"
    query = select(User).where(User.name.like(pattern))
    with Session(engine) as db:
        return db.exec(query).all()
复杂查询
- 获取数量
from sqlmodel import Session, select, func
# Row count.
def select_count():
    """Return the number of rows in the User table."""
    count_query = select(func.count()).select_from(User)
    # To count a subset, add a filter, for example:
    # count_query = count_query.where(User.age > 18)
    with Session(engine) as db:
        return db.exec(count_query).one()
- 分页
from sqlmodel import Session, select
def select_page() -> list[User]:
    """Return one page of users: offset 0, at most 3 rows."""
    page_query = select(User).offset(0).limit(3)
    with Session(engine) as db:
        return db.exec(page_query).all()
# Paged query that also returns the total count.
def select_page_with_count(
    offset: int = 0, limit: int = 10, name: str | None = None, age: int | None = None
):
    """Return one page of users together with the total number of matches.

    Args:
        offset: Number of rows to skip.
        limit: Maximum number of rows in the page.
        name: Optional substring filter on the user name (LIKE '%name%').
        age: Optional exact-match filter on the age.

    Returns:
        A dict with ``"count"`` (total matching rows) and ``"list"`` (the page).
    """
    # Imported locally because the surrounding snippet only imports Session and select.
    from sqlmodel import and_, func

    with Session(engine) as session:
        conditions = []
        if name is not None:
            conditions.append(User.name.like(f"%{name}%"))
        if age is not None:
            conditions.append(User.age == age)
        # Combine the conditions (the leading True keeps the clause valid when
        # no condition was supplied).
        whereclause = and_(True, *conditions)
        # The count and the page share the same filter.
        count_statement = select(func.count()).select_from(User).where(whereclause)
        list_statement = select(User).where(whereclause).offset(offset).limit(limit)
        return {
            "count": session.exec(count_statement).one(),
            "list": session.exec(list_statement).all(),
        }
- 排序
from sqlmodel import Session, select, asc, desc
# Ordering.
def select_by_order():
    """Return all users ordered by age, descending."""
    ordered_query = select(User).order_by(desc(User.age))
    with Session(engine) as db:
        return db.exec(ordered_query).all()
FastAPI
- response_model中如果设置了UserPageResp
- data的list[Any]会自动转成list[UserItem]
# Generic paged table response.
class TablePageResp(PageResp):
    # Total number of rows.
    total: int = Field(title="总数量", default=0)
    # Page rows, untyped here.
    data: list[Any] = Field(title="数据", default=[])
# Paged response whose rows are typed as UserItem.
class UserPageResp(PageResp):
    # Total number of rows.
    total: int = Field(title="总数量", default=0)
    # Page rows, typed as UserItem.
    data: list[UserItem] = Field(title="数据", default=[])