Салют Хабр! Меня зовут Влад и я хотел бы осветить довольно важную тему, которую я не видел в обучающих материалах, а именно сложные запросы через паттерн Repository.
Проблема: построение гибких SQLAlchemy запросов на основе данных, переданных из бизнес логики. Более масштабное использование паттерна Repository.
Решение(то, о чём говорится в этой статье): все запросы формируются посредством конфигов, переданных из бизнес логики, далее конфиги обрабатываются и из них составляется ORM запрос.
Схема работы реализации:
Configs, ORM Query Builder, Operator Agregate.Configs - это правила, по которым в бизнес логику должны вернуться данные.
Актуальность для читателя: я думаю, что эта статья способна показать работу с паттерном Repository под другим углом, показать как можно строить гибкие запросы без создания кучи методов в имплементациях и потом описывания этих же методов в интерфейсах.
Пожалуй, можно начинать.
Все вы видели те самые классы, которые мелькают в видео на Youtube.
from typing import Generic, TypeVar, Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
DTO = TypeVar("DTO")
class SQLAlchemyRepository(Generic[DTO]):
def __init__(self, session, model):
self._model = model
self._session = session
async def get_by_filter(self, filters: dict[str, Any]) -> DTO | None:
query = select(self._model).filter_by(**filters)
data = await self._session.execute(query)
data_scalar = data.scalar()
if data_scalar is None:
return None
return data_scalar.to_dto()
async def get_many_by_filter(self, filters: dict[str, Any]) -> list[DTO]:
query = select(self._model).filter_by(**filters)
data = await self._session.execute(query)
data_all = data.scalars().all()
if not data_all:
return []
return [sqal_model.to_dto() for sqal_model in data_all]
При росте вашего проекта вам понадобятся более кастомизированные запросы в бд и логики этих методов хватать не будет, соответственно вам придётся расширяться.
Как уже упоминалось в Решение используются конфиги, думаю, сейчас лучший момент, чтобы их показать, но для начала нужно объявить структуру проекта.
repository/
builder_configs/
__init__.py
configs.py
types.py
Рассмотрим содержимое файла configs.py
from dataclasses import dataclass, field
from typing import Any
from .types import FilterType, OrderByType, LazyLoadType, FilterLogicType, UNSET
@dataclass
class Filter:
column: str
value: Any
filter_type: FilterType = FilterType.EQ
@dataclass
class FilterConfig:
filters: list[Filter]
mode: FilterLogicType = FilterLogicType.NULL
@dataclass
class OrderByConfig:
column: str
mode: OrderByType = OrderByType.ASC
@dataclass
class ColumnConfig:
column: str
label: str | UNSET = UNSET
value: Any = UNSET
filter_type: FilterType = FilterType.EQ
value_is_column: bool = False
@dataclass
class JoinConfig:
table_name: str
columns: list[ColumnConfig] = field(default_factory=list)
filters: list[FilterConfig] = field(default_factory=list)
order_by: list[OrderByConfig] = field(default_factory=list)
@dataclass
class LazyLoadConfig:
relationship_strategy: str
load_type: LazyLoadType = LazyLoadType.JOINEDLOAD
Хочу поподробней остановиться на конфигах.
-
Filter - простой в реализации dataclass, который принимает на вход поле, значение и необходимый оператор.
-
FilterConfig - в него можно передавать несколько фильтров и применять к ним логическое условие OR, AND. В случае если условие передано не будет (NULL), то все агрегированные объекты будут передавать в where позиционно.
-
OrderByConfig - сортировка поля по заданному режиму (ASC, DESC)
-
ColumnConfig - этот dataclass позволяет включать в SQL запрос выборку отдельных колонок, задавать им label, также он позволяет делать булевые выборки, также есть поддержка добаdления в булевую выборку полей из таблиц. Для этого нужно:
value=table_name.column,value_is_column=True. -
JoinConfig - позволяет присоединять другие таблицы. Переданные атрибуты
columns,filters,order_byбудут использованы для конкретной таблицыtable_name. -
LazyLoadConfig - поддержка подгрузки relationship. Чтобы подгрузить нужно указать название relationhip атрибута. Если же вы хотите получить вложенное отношение, то нужно указывать relationship по цепочке через
.-
Пример вложенных отношений:
Допустим, есть модель User и она имеет отношение с таблицей Item через атрибут
item, также Item связана с другой таблицей Collections посредством отношенияcollectionsи вот здесь и появляются вложенные отношения. Представим, что выполняется запрос в таблицу User и понадобилось подтянуть отношение item и к нему collections. В таком случаеrelationship_strategy=item.collections. Более подробно это будет рассмотрено при разборе реализации методаloadкласса SQLAlchemyQueryBuilder.
-
Теперь можно посмотреть на содержание файла types.py
from typing import NewType
from enum import Enum
UNSET = NewType("UNSET", None)
class FilterLogicType(Enum):
AND = "and"
OR = "or"
NULL = "null"
class FilterType(Enum):
EQ = "="
GT = ">"
GE = ">="
LT = "<"
LE = "<="
IN = "in"
VECTOR_SORT = "vector_sort"
class OrderByType(Enum):
ASC = "asc"
DESC = "desc"
class LazyLoadType(Enum):
JOINEDLOAD = "joinedload"
SELECTINLOAD = "selectinload"
Всё, что находится в FilterType должен содержать Operator Agregate.
Теперь я хотел бы затронуть сам ORM Query Builder. Структура папок в свою очередь принимает такой вид:
repository/
builder_configs/
__init__.py
configs.py
types.py
query_builder/
__init__.py
interface.py
impl/
__init__.py
alchemy.py
Интерфейс для Query Builder (query_builder/interface.py)
from typing import Protocol, Any
from typing_extensions import Self
from repository.builder_configs.configs import (
FilterConfig,
ColumnConfig,
JoinConfig,
LazyLoadConfig,
OrderByConfig
)
from .agregator.interface import AgregateFilterTypeProtocol
class QueryBuilderProtocol(Protocol):
def init(
self,
filter_agregate: AgregateFilterTypeProtocol,
*args,
**kwargs
) -> None:
self._filter_agregate = filter_agregate
def count(self) -> Self:
raise NotImplementedError
def columns(self, columns: list[ColumnConfig], *args, *kwargs) -> Self:
raise NotImplementedError
def filter(self, filters: list[FilterConfig], *args, *kwargs) -> Self:
raise NotImplementedError
def join(self, joins: list[JoinConfig]) -> Self:
raise NotImplementedError
def load(self, loads: list[LazyLoadConfig], *args, *kwargs) -> Self:
raise NotImplementedError
def order_by(self, order_by: list[OrderByConfig], *args, *kwargs) -> Self:
raise NotImplementedError
def values(self, data: dict[str, Any] | list[dict[str, Any]]) -> Self:
raise NotImplementedError
def limit(self, value: int | None) -> Self:
raise NotImplementedError
def offset(self, value: int | None) -> Self:
raise NotImplementedError
def build(self) -> Any:
raise NotImplementedError
В магическом методе __init__ вы можете заметить AgregateFilterTypeProtocol данный протокол нужен для агрегации операторов.
С появлением агрегатора структура папок примет такой вид:
repository/
builder_configs/
__init__.py
configs.py
types.py
query_builder/
__init__.py
interface.py
impl/
__init__.py
alchemy.py
agregator/
__init__.py
interface.py
impl/
__init__.py
alchemy.py
Ниже вы можете увидеть интерфейс агрегатора (query_builder/agregator/interface.py)
from typing import Protocol, Any
from repository.builder_configs.types import FilterType
class AgregateFilterTypeProtocol(Protocol):
def filter_agregate(
self,
value: Any,
filter_type: FilterType,
*args,
**kwargs
) -> Any:
raise NotImplementedError
Реализация агрегатора (query_builder/agregator/impl/alchemy.py)
from typing import Any
from sqlalchemy import func
from sqlalchemy.orm.properties import MappedColumn
from repository.builder_configs.types import FilterType
class SQLAlchemyAgregateFilterType:
def filter_agregate(
self,
value: Any,
filter_type: FilterType,
mapped_column: MappedColumn[Any],
*args,
**kwargs
) -> Any:
operator_with_lambda = {
"=": lambda column, value: column == value,
">": lambda column, value: column > value,
">=": lambda column, value: column >= value,
"<": lambda column, value: column < value,
"<=": lambda column, value: column <= value,
"in": lambda column, value: column.in_(value),
"vector_sort": lambda column, value: column.op("@@")(func.websearch_to_tsquery(str(value)))
}
if filter_type.value not in operator_with_lambda:
raise ValueError(f"Not found operator {filter_type.value}")
return operator_with_lambda[filter_type.value](mapped_column, value)
В реализации вам может быть непонятен аргумент mapped_column - это экземпляр класса, полученный из атрибута ORM модели.
Ну и теперь гвоздь программы SQLAlchemyQueryBuilder. Я буду объяснять каждый метод постепенно.
from typing import Callable, Any, TypeVar
from typing_extensions import Self
from sqlalchemy.orm.properties import MappedColumn
from sqlalchemy.sql.elements import ColumnElement, Label, UnaryExpression
from sqlalchemy.orm.strategy_options import _AbstractLoad
from sqlalchemy import or_, and_, func, asc, desc
from sqlalchemy.orm import MappedColumn, joinedload, selectinload
from db.models import Base
from repository.builder_configs.configs import (
FilterConfig,
ColumnConfig,
JoinConfig,
OrderByConfig,
LazyLoadConfig
)
from repository.builder_configs.types import LazyLoadType, OrderByType, FilterLogicType, UNSET
from repository.query_builder.agregator.impl.alchemy import SQLAlchemyAgregateFilterType
ALCHEMYMODEL = TypeVar("ALCHEMYMODEL", bound=Base)
class SQLAlchemyQueryBuilder:
def __init__(
self,
query_type: Callable,
model: type[ALCHEMYMODEL],
filter_agregate: SQLAlchemyAgregateFilterType,
*args,
**kwargs
) -> None:
self._query_type: Callable = query_type
self._filter_agregate: SQLAlchemyAgregateFilterType = filter_agregate
self._model: type[ALCHEMYMODEL] = model
self._columns: list[MappedColumn[Any] | ColumnElement[bool] | Label[Any]] = []
self._limit: int | None = None
self._offset: int | None = None
self._joins: list[type[ALCHEMYMODEL]] = []
self._filter: list[ColumnElement[bool]] = []
self._order_by: list[UnaryExpression] = []
self._values: list[dict[str, Any]] = []
self._loads: list[_AbstractLoad] = []
self._count: bool = False
self._orm_models: dict[str, type["Base"]] = model.orm_models()
query_type - тип выполняемого запроса: select, update и тп.
model - orm модель
После создания экземпляра этого класса и при вызове методов данные накапливаются в атрибутах и при вызове метода build строится ORM запрос.
Здесь можно обратить внимание на атрибут _orm_models, исходя из его аннотации можно сказать, что он хранит все зарегистрированные sqlalchemy модели.
Реализация метода orm_models
class Base(DeclarativeBase):
_cached_orm_models: dict[str, type["Base"]] = {}
@classmethod
def orm_models(cls) -> dict[str, type["Base"]]:
if not cls._cached_orm_models:
models = {}
for mapper in cls.registry.mappers:
models[mapper.class_.__tablename__] = mapper.class_
cls._cached_orm_models.update(models)
return cls._cached_orm_models
Теперь можно перейти к самим методам класса SQLAlchemyQueryBuilder.
count
def count(self) -> Self:
self._count = True
return self
Позволяет получать количество записей.
columns
def columns(
self,
columns: list[ColumnConfig] = [],
model: type[ALCHEMYMODEL] | None = None,
*args,
**kwargs
) -> Self:
if columns:
if model is None:
model = self._model
for config in columns:
obj = getattr(model, config.column, None)
if (obj is None):
raise ValueError(f"model {model.__name__} has not column {config.column}")
if config.value is not UNSET:
if config.value_is_column is True:
table_name, column_name = config.value.split(".")
orm_object = self._orm_models.get(table_name)
if (hasattr(orm_object, column_name) is False):
raise ValueError(f"model {orm_object} has not column {column_name}")
config.value = getattr(orm_object, column_name)
obj = self._filter_agregate.filter_agregate(
mapped_column=obj,
value=config.value,
filter_type=config.filter_type
)
if config.label is not None:
obj = obj.label(config.label)
self._columns.append(obj)
return self
В начале функции проверяется, была ли передана модель в функцию, это нужно, чтобы переиспользовать метод для разных моделей. Дальше - итерация, в которой из объекта model достаётся атрибут config.column. После идёт проверка на переданное value и если оно является полем в другой таблице, то строка распиливается по . и из _orm_models достаётся нужная orm модель, после в config.value записывается экземпляр класса MappedColumn полученный из атрибута column_name. Дальше работает агрегатор и как вы можете заметить в config.value может лежать как произвольное значение так и MappedColumn, далее это добавление label ну и окончательное действие - запись получившегося объекта в список _columns.
filter
def filter(
self,
filters: list[FilterConfig] = [],
model: type[ALCHEMYMODEL] | None = None,
*args,
**kwargs
) -> Self:
if filters:
if model is None:
model = self._model
configs = []
for filter in filters:
mode = None
if filter.mode == FilterLogicType.OR:
mode = or_
elif filter.mode == FilterLogicType.AND:
mode = and_
elements_objects = []
for filter_ in filter.filters:
obj = getattr(model, filter_.column, None)
if (obj is None):
raise ValueError(f"model {model.__name__} has not column {filter_.column}")
column_element = self._filter_agregate.filter_agregate(
mapped_column=obj,
filter_type=filter_.filter_type,
value=filter_.value
)
elements_objects.append(column_element)
if mode is None:
configs.extend(elements_objects)
else:
configs.append(mode(*elements_objects))
if configs:
self._filter.extend(configs)
return self
Итерация по FilterConfig, определение логического условия, после чего происходит итерация по Filter, в этом блоке кода также достаётся MappedColumn и отправляется в агрегатор. В element_objects остаются все аргегированные Filter, в configs - FilterConfig. После завершения основного цикла изменения отправляются в атрибут _filters.
order_by
def order_by(
self,
order_by: list[OrderByConfig] = [],
model: type[ALCHEMYMODEL] | None = None,
*args,
**kwargs
) -> Self:
if order_by:
if model is None:
model = self._model
for oby in order_by:
oby_mode = asc
if oby.mode == OrderByType.DESC:
oby_mode = desc
obj = getattr(model, oby.column, None)
if (obj is None):
raise ValueError(f"model {model.__name__} has not column {oby.column}")
self._order_by.append(oby_mode(obj))
return self
Я думаю, что здесь объяснения излишни так как используются всё те же принципы, что и в предыдущих методах.
join
def join(self, joins: list[JoinConfig]) -> Self:
if joins:
for join in joins:
orm_object = self._orm_models.get(join.table_name)
if join.columns:
self.columns(
columns=join.columns,
model=orm_object
)
if join.filters:
self.filter(
filters=join.filters,
model=orm_object
)
if join.order_by:
self.order_by(
order_by=join.order_by,
model=orm_object
)
self._joins.append(orm_object)
return self
Здесь можно заметить, что переиспользуются методы filter, columns, order_by, только
вместо изначально переданной в init orm модели, будет использована модель, название
которой указано в конфиге, объект этой модели достаётся из _orm_models.
load
def load(
self,
loads: list[LazyLoadConfig],
model: type[Base] | None = None,
*args,
**kwargs
) -> Self:
if loads:
for load in loads:
load_mode = joinedload
if load.load_type == LazyLoadType.SELECTINLOAD:
load_mode = selectinload
current_model = model
if model is None:
current_model = self._model
current_load = None
relationship_path = load.relationship_strategy.split(".")
for part in relationship_path:
relationship_declared = getattr(current_model, part, None)
if (relationship_declared is None):
raise ValueError(f"Error in relationship path: model {current_model.__name__} has no relationship {part}")
current_model = relationship_declared.mapper.class_
if current_load is None:
current_load = load_mode(relationship_declared)
else:
current_load = getattr(current_load, load_mode.__name__)(relationship_declared)
if current_load:
self._loads.append(current_load)
return self
Здесь я бы хотел объяснить как именно обрабатывается relationship_strategy и как собираются вложенные relationship. Переменная relationship_path хранит все отношения, из которых составляется цепочка, итерируясь по ним, relationship_declared получает экземпляр класса _RelationshipDeclared и после current_model перезаписывается на модель, с которой связано отношение. После чего формируется current_load, если оно не было до этого записано, то просто передаём отношение в функцию load_mode, иначе мы вызываем функцию load_mode у current_load. Пример: joinedload(part1_relationship).joinedload(part2_relationship). Именно так и строятся вложенные relationship.
Теперь покажу несколько легковесных методов, в которых я думаю вы уже можете разобраться самостоятельно.
values, limit, offset
def values(
self,
data: dict[str, Any] | list[dict[str, Any]]
) -> Self:
if isinstance(data, dict):
self._values.append(data)
elif isinstance(data, list):
self._values.extend(data)
return self
def limit(
self,
value: int | None = None
) -> Self:
if value:
self._limit = value
return self
def offset(
self,
value: int | None = None
) -> Self:
if value:
self._offset = value
return self
build
def build(
self
) -> Any:
query = self._query_type(self._model)
if self._columns:
query = self._query_type(*self._columns).select_from(self._model)
if self._count:
count = func.count()
if self._columns:
count = func.count(*self._columns)
query = self._query_type(count).select_from(self._model)
if self._values:
query = query.values(self._values)
if self._filter:
query = query.filter(*self._filter)
if self._joins:
for join in self._joins:
query = query.join(join)
if self._order_by:
query = query.order_by(*self._order_by)
if self._loads:
query = query.options(*self._loads)
if self._limit:
query = query.limit(self._limit)
if self._offset:
query = query.offset(self._offset)
return query
Данный метод собирает ORM запрос.
Больше сказать мне нечего, всё показал, всё рассказал. Надеюсь, что эта статья даст читающим новые мысли по поводу паттерна Repository и то как с ним можно работать. Очень надеюсь на конструктивную критику и буду рад, если вы предложите свои реализации решения данной проблемы.
В этом репозитории вы можете посмотреть примеры использования.
Спасибо тебе, дорогой читатель, за интерес к моей статье!
Автор: shayzi
