跳至主要內容

SQLModel

言午日尧耳总大约 6 分钟PythonSQLModel

SQLModel

起步

安装

# Install SQLModel itself
pip install sqlmodel

# MySQL driver
pip install pymysql
# PostgreSQL driver
pip install psycopg2-binary

连接引擎

from sqlmodel import SQLModel, create_engine

# The four URLs below are alternatives: each assignment overwrites the
# previous one, so as written only the last (PostgreSQL) URL is used.
# Keep just the one you need.
# In-memory SQLite
db_url = "sqlite+pysqlite:///:memory:"
# SQLite file
db_url = "sqlite+pysqlite:///db_demo.sqlite"
# MySQL
db_url = "mysql+pymysql://username:password@127.0.0.1:3306/db_name"
# PostgreSQL
db_url = "postgresql://username:password@127.0.0.1:5432/db_name"

# Create the engine
engine = create_engine(db_url, echo=True)

# Create the tables
SQLModel.metadata.create_all(engine)

数据表

import time
from sqlmodel import SQLModel, create_engine
from sqlmodel import Field, BigInteger, Integer, String, Boolean

# Table model declared directly on SQLModel
class User(SQLModel, table=True):
    __tablename__ = "user"

    # Primary key; defaults to None
    id: int | None = Field(
        default=None,
        primary_key=True,
        sa_type=BigInteger,
    )
    # User name; the same text is used as the column comment
    name: str = Field(
        description="用户名",
        sa_type=String(32),
        sa_column_kwargs={"comment": "用户名"},
    )


# Shared base class (not a table itself) providing common columns
class BaseTable(SQLModel):
    # Primary key; defaults to None
    id: int | None = Field(default=None, primary_key=True, sa_type=BigInteger)

    # Creation timestamp in milliseconds. A computed default has to go
    # through default_factory so it is evaluated for each new instance.
    create_timestamp: int = Field(
        default_factory=lambda: time.time_ns() // 1_000_000,
        sa_type=BigInteger,
        description="创建时间戳",
        sa_column_kwargs={"comment": "创建时间戳"},
    )

class UseBaseUser(BaseTable, table=True):
    __tablename__ = "use_base_user"

    # User name, stored as a 32-character string column
    name: str = Field(
        sa_type=String(32),
        sa_column_kwargs={"comment": "用户名"},
    )
    # Age, validated to the inclusive range 0..200; defaults to 0
    age: int = Field(
        default=0,
        ge=0,
        le=200,
        sa_type=Integer,
        sa_column_kwargs={"comment": "年龄"},
    )
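| 字段 | 作用 | 示例 | 备注 |
| --- | --- | --- | --- |
| `__tablename__` | 自定义表名 | `"user"` | 可省略 |
| `sa_type` | 数据表类型 | `BigInteger` | sa 即 SQLAlchemy |
| `sa_column_kwargs` | 数据表参数 | | sa 即 SQLAlchemy,可填写数据库的字段备注;文档:https://github.com/fastapi/sqlmodel/issues/492 |
| `default` | 默认值 | `None` | |
| `default_factory` | 默认值工厂 | `lambda: ...` | |
| `primary_key` | 主键 | `True` | |
| `title` | 标题 | | 用于 Swagger 等 |
| `description` | 描述 | | 用于 Swagger 等 |
| `gt` / `ge` / `lt` / `le` | 范围 | | pydantic 的校验 |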

JSON数据

  • sa_type为JSON,保存和获取都能使用对应的类型
from sqlmodel import SQLModel
from sqlmodel import Field, BigInteger, Integer, String, Boolean, JSON

# Table model with JSON columns
class User(SQLModel, table=True):
    __tablename__ = "user"

    # Primary key; defaults to None
    id: int | None = Field(default=None, primary_key=True, sa_type=BigInteger)
    # User name; the same text is used as the column comment
    name: str = Field(
        description="用户名",
        sa_type=String(32),
        sa_column_kwargs={"comment": "用户名"},
    )

    # JSON columns: stored as JSON in the table, while Python code reads and
    # writes the matching native type (list / dict).
    addrs: list[str] = Field(
        default=[],
        sa_type=JSON,
        sa_column_kwargs={"comment": "地址列表"},
    )
    other: dict = Field(
        default={},
        sa_type=JSON,
        sa_column_kwargs={"comment": "其他信息"},
    )

# Session and select are needed below but are not imported by this snippet
from sqlmodel import Session, select

# Inserting needs no special handling for the JSON columns
user = User(name="李四", addrs=["福州"], other={"a": 1, "b": 2})
with Session(engine) as session:
    session.add(user)
    session.commit()
    # Refresh the same instance that was added above
    session.refresh(user)

# Values read back are converted to the native Python types automatically
with Session(engine) as session:
    statement = select(User)
    result = session.exec(statement).all()
    for i in result:
        print(type(i.addrs), i.addrs)
        # <class 'list'> ['福州']
        print(type(i.other), i.other)
        # <class 'dict'> {'a': 1, 'b': 2}

枚举类

  • 当前版本枚举类较为鸡肋,无法自动转换,需要转换成字符串,否则报错
  • 而且不会校验字符串是否是枚举类
from enum import Enum
from sqlmodel import SQLModel
from sqlmodel import Field, BigInteger, String

# Enum of sex values; every member's value is the same string as its name
class SexEnum(Enum):
    MALE = "MALE"
    FEMALE = "FEMALE"
    UNKNOWN = "UNKNOWN"


class User(SQLModel, table=True):
    __tablename__ = "user"

    # Primary key; defaults to None
    id: int | None = Field(default=None, primary_key=True, sa_type=BigInteger)
    # User name; defaults to an empty string
    name: str = Field(default="", sa_type=String(32))

    # Enum-typed field stored in a string column. There is no automatic
    # conversion, so callers pass `.value` (a plain string) — the default
    # below does the same.
    sex: SexEnum = Field(
        default=SexEnum.UNKNOWN.value,
        sa_type=String(32),
    )

# `sex` must be passed as a string, hence `.value`.
# Only fields declared on this User model (name, sex) are passed.
user = User(name="zhangsan", sex=SexEnum.FEMALE.value)

SQL语句

  • User实体类
from sqlmodel import SQLModel, Field, BigInteger, String, Integer

class User(SQLModel, table=True):
    # Auto-increment primary key; defaults to None
    id: int | None = Field(
        default=None,
        primary_key=True,
        sa_type=BigInteger,
        sa_column_kwargs={"comment": "自增id"},
    )
    # User name
    name: str = Field(
        sa_type=String,
        sa_column_kwargs={"comment": "用户名"},
    )
    # Age; defaults to 0
    age: int = Field(
        default=0,
        sa_type=Integer,
        sa_column_kwargs={"comment": "年龄"},
    )

sql语句

  • 使用 text() 直接执行原生 SQL 语句
from sqlmodel import Session, text

# Run a raw SQL statement
def sql():
    """Run a raw ``SELECT`` over the "user" table and return every row."""
    query = text('SELECT * FROM "user";')
    with Session(engine) as db:
        # .all() collects all result rows
        return db.exec(query).all()

# Raw SQL with a bound parameter
def sql_with_params():
    """Run a raw ``SELECT`` filtered by a bound ``:id`` parameter (id=2)."""
    query = text('SELECT * FROM "user" WHERE id=:id;')
    bound = {"id": 2}
    with Session(engine) as db:
        # .one() expects exactly one result row
        return db.exec(query, params=bound).one()


session.exec(statement).all()			# returns all rows
session.exec(statement).one()			# returns exactly one row (raises if there are zero or several)
session.exec(statement).one_or_none()	# returns one row or None (raises if there are several)
session.exec(statement).first() 		# returns the first row or None (author's note: slow, the query is not limited, so all rows are selected and only the first is returned)
  • 下面的语句实际都可以转换成sql
def to_sql():
    """Print the SQL generated for ``select(User)``, then run it and return all rows."""
    query = select(User)

    # Printing a statement shows the SQL it compiles to, e.g.
    # SELECT "user".id, "user".name, "user".age FROM "user"
    print(query)

    with Session(engine) as db:
        return db.exec(query).all()

CRUD

新增

  • 最佳实践
from sqlmodel import Session

def insert_entity(entity: User) -> None:
    """Persist ``entity`` and reload it in place.

    After the call the caller's object carries the stored values, including
    the generated id.
    """
    with Session(engine) as db:
        db.add(entity)
        db.commit()
        # Re-bind the entity to the row that was just written
        db.refresh(entity)


# Example: create a user and insert it
user = User(name="张三", age=18)
insert_entity(user)
print(user) # the full record is available here, including the id
  • 其他写法
from sqlmodel import Session, insert

def insert_entity_1() -> None:
    """Insert one row, passing the column values as keyword arguments."""
    stmt = insert(User).values(name="李四", age=10)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()

def insert_entity_2() -> None:
    """Insert one row, passing the column values as a dict."""
    row = {"name": "王五", "age": 20}
    stmt = insert(User).values(row)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()

def insert_entity_3(entity: User) -> None:
    """Insert ``entity`` through an INSERT statement built from its fields."""
    # Dump the model to a dict via pydantic; None values are dropped so that
    # an unset id is not sent as an explicit None.
    row = entity.model_dump(exclude_none=True)
    stmt = insert(User).values(row)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()

更新

  • 最佳实践
from sqlmodel import Session


def update_entity(entity: User) -> User:
    """Overwrite the stored row matching ``entity.id`` with ``entity``'s values.

    Args:
        entity: Carries the primary key and the new field values.

    Returns:
        The persisted instance, refreshed from the database after the commit.

    Raises:
        ValueError: If no row with ``entity.id`` exists.
    """
    with Session(engine) as session:
        db_entity = session.get(User, entity.id)
        if db_entity is None:
            # session.get() gives None for an unknown primary key; fail with a
            # clear message instead of an AttributeError on the next call.
            raise ValueError(f"User with id={entity.id!r} does not exist")
        db_entity.sqlmodel_update(entity.model_dump())
        session.add(db_entity)
        session.commit()
        # Reload so the returned object reflects what was stored
        session.refresh(db_entity)
        return db_entity


# Example: overwrite the row with id=3 using the values given here
user = User(id=3, name="李四", age=20)
update_entity(user)
  • 其他更新语句
    • 无法返回更新后真正结果
from sqlmodel import Session, update

# Statement-based update; with a broader WHERE clause it works as a bulk update
def update_with_args() -> None:
    """Set ``name`` to "args" for id 2, passing values as keyword arguments."""
    stmt = update(User).where(User.id == 2).values(name="args")
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()

def update_with_kwargs() -> None:
    """Set ``name`` to "test" for id 2, passing values as a dict."""
    changes = {"name": "test"}
    stmt = update(User).where(User.id == 2).values(changes)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()

def update_entity(user: User) -> None:
    """Update the row whose id equals ``user.id`` with all of ``user``'s fields."""
    changes = user.model_dump()
    stmt = update(User).where(User.id == user.id).values(changes)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()

删除

  • 最佳实践
from sqlmodel import Session, delete


def delete_by_id(id: int) -> None:
    """Delete the user row with the given id using a DELETE statement."""
    stmt = delete(User).where(User.id == id)
    with Session(engine) as db:
        db.exec(stmt)
        db.commit()

  • 其他用法
from sqlmodel import Session


def delete_by_id(id: int) -> None:
    """Load the user with the given id and delete it through the session.

    Does nothing when no such row exists, which matches the statement-based
    variant (a DELETE that matches no rows).
    """
    with Session(engine) as session:
        db_entity = session.get(User, id)
        if db_entity is None:
            # Nothing to delete; passing None to session.delete() would raise
            return
        session.delete(db_entity)
        session.commit()

简单查询

  • 使用id查询
from sqlmodel import Session

def select_by_id(id: int) -> User | None:
    """Look up a user by primary key."""
    with Session(engine) as db:
        return db.get(User, id)
  • 使用where语句
from sqlmodel import Session, select

# Query with fixed conditions
def select_by_where(name: str, age: int) -> list[User]:
    """Return the users whose name AND age both equal the given values."""
    stmt = select(User).where(User.name == name, User.age == age)
    with Session(engine) as db:
        return db.exec(stmt).all()

# Query with optional conditions
def select_by_condition(name: str | None = None, age: int | None = None) -> list[User]:
    """Return users, filtered only by the arguments that are not None."""
    filters = []
    if name is not None:
        filters.append(User.name == name)
    if age is not None:
        filters.append(User.age == age)

    # Apply the filters one at a time; successive where() calls are ANDed
    stmt = select(User)
    for criterion in filters:
        stmt = stmt.where(criterion)

    with Session(engine) as db:
        return db.exec(stmt).all()

  • like
from sqlmodel import Session, select

# LIKE query
def select_by_like(name: str) -> list[User]:
    """Return the users whose name contains ``name`` (``LIKE '%name%'``)."""
    pattern = f"%{name}%"
    stmt = select(User).where(User.name.like(pattern))
    with Session(engine) as db:
        return db.exec(stmt).all()

复杂查询

  • 获取数量
from sqlmodel import Session, select, func

# Row count
def select_count():
    """Return the number of rows in the user table."""
    stmt = select(func.count()).select_from(User)
    # A filter can be added here, e.g.:
    # stmt = stmt.where(User.age > 18)
    with Session(engine) as db:
        return db.exec(stmt).one()
  • 分页
from sqlmodel import Session, select


def select_page(offset: int = 0, limit: int = 3) -> list[User]:
    """Return one page of users.

    Args:
        offset: Number of rows to skip. Defaults to 0.
        limit: Maximum number of rows to return. Defaults to 3.

    Returns:
        The users in the requested window.
    """
    with Session(engine) as session:
        statement = select(User).offset(offset).limit(limit)
        return session.exec(statement).all()


# Paged query that also reports the total number of matching rows
def select_page_with_count(
    offset: int = 0, limit: int = 10, name: str | None = None, age: int | None = None
) -> dict:
    """Return one page of users together with the total match count.

    Args:
        offset: Number of rows to skip.
        limit: Maximum number of rows in the page.
        name: If given, keep only users whose name contains this text.
        age: If given, keep only users with exactly this age.

    Returns:
        A dict with "count" (rows matching the filters, ignoring paging)
        and "list" (the users of the requested page).
    """
    # Imported locally: the snippet's import line only brings in Session and
    # select, but and_ and func are used below.
    from sqlmodel import and_, func

    conditions = []
    if name is not None:
        conditions.append(User.name.like(f"%{name}%"))
    if age is not None:
        conditions.append(User.age == age)

    # Combine the conditions; the leading True keeps the clause valid when no
    # filter was supplied.
    whereclause = and_(True, *conditions)

    count_statement = select(func.count()).select_from(User).where(whereclause)
    list_statement = select(User).where(whereclause).offset(offset).limit(limit)

    with Session(engine) as session:
        return {
            "count": session.exec(count_statement).one(),
            "list": session.exec(list_statement).all(),
        }
  • 排序
from sqlmodel import Session, select, asc, desc

# Sorting
def select_by_order():
    """Return all users ordered by age, highest first."""
    stmt = select(User).order_by(desc(User.age))
    with Session(engine) as db:
        return db.exec(stmt).all()

FastAPI

  • response_model中如果设置了UserPageResp
  • data的list[Any]会自动转成list[UserItem]
# Generic paged table response: a total plus an untyped list of rows
class TablePageResp(PageResp):
    # Total number of rows
    total: int = Field(default=0, title="总数量")
    # Rows of the page; elements are left untyped (Any)
    data: list[Any] = Field(default=[], title="数据")

# Paged response whose rows are typed as UserItem
class UserPageResp(PageResp):
    # Total number of rows
    total: int = Field(default=0, title="总数量")
    # Rows of the page, each a UserItem
    data: list[UserItem] = Field(default=[], title="数据")

上次编辑于:
贡献者: xxc