Для асинхронного Python существует мало полноценных ORM, и им далеко до таких монстров-комбайнов, как DjangoOrm и SQLAlchemy.ORM. Бедность ORM-инструментария для асинхронного программирования заставила многих программистов отказаться от зачастую непонятной им работы с ORM и перейти к более прозрачному взаимодействию с БД. Решение в лоб — написание raw SQL, но в этом случае запросы не будут защищены от инъекций, а запросы, составляемые по бизнес логике с опциональными параметрами, превратятся в конкатенацию строк. Важно найти баланс между прозрачностью выполнения кода, скоростью его написания и читаемостью.
Ниже я предлагаю реализацию такого баланса c использованием SQLAlchemy Core.
Сравнение с остальными ORM

Прежде чем рассматривать работу SQLAlchemy Core, для расширения инженерного кругозора давайте познакомимся с альтернативными решениями и их недостатками.
-
Django ORM и ponyORM не подходят из-за невозможности работать в асинхронном режиме.
-
SQLAlchemy.ORM: поддержку полноценной асинхронной ORM в альфа-режиме добавили в версии 1.4, пока не подходит для production-решений.
-
Peewee: умеет работать в асинхронном режиме, но только с Core-функциями. Хорошая библиотека, но я предлагаю выбрать SQLAlchemy из-за её большей популярности и поддерживаемости. Также были жалобы на ленивую загрузку объектов в Peewee — иногда она намертво вешает цикл (возможно, эту ошибку уже поправили в новых версиях).
-
Gino: простая ORM на основе SQLAlchemy Core. Работать с этой библиотекой можно без знаний об SQLAlchemy.
-
tortoise-orm: молодой проект с похожим на Django ORM способом доступа к данным.
Работа с SQLAlchemy Core

Алхимия состоит из двух частей. Первая — это абстракция над SQL-базой данных, которая называется SQLAlchemy Сore. Вторая — это ORM, собственно mapping между реляционной БД и объектным представлением. Соответственно, SQLAlchemy Сore почти один к одному совпадает с SQL — если вы знаете последний, то проблем с Core, как правило, не возникает. Благодаря этому использование SQLAlchemy Сore имеет наименьший оверхэд при обращении к БД.
SQLAlchemy Core предоставляет всю основную функциональность для построения запросов. Но в крупном проекте при обращении к базовому API SQLAlchemy будет много дублирующихся участков, поэтому логично написать библиотеку-обёртку для реализации наиболее популярных сценариев использования.
Примеры основных запросов к Алхимии:
tbl = cls.__table__ select_sql = select([tbl]).where(tbl.c.user_type == user_type) cursor = await conn.execute(select_sql) rows = await cursor.fetchall()
tbl = cls.__table__ insert_sql = tbl.insert().values(val='abc').returning(*tbl.c) cursor = await conn.execute(insert_sql) row = await cursor.fetchone()
Как видите, участки кода с передачей параметров повторяются, а сам код содержит много параметров, связанных с внутренней реализацией SQLAlchemy Core, что ухудшает читаемость. Хотелось бы получать данные из БД с помощью таких запросов:
user = await User.select().where(User.user_type == user_type).get(conn) _ = await User.insert().values(val='abc').execute(conn)
Чтобы этого добиться, необходимо реализовать обработчики для операций insert, select, update и delete.
Пишем «обёртку»

Напишем для функции select полноценную обёртку, которая даст возможность с минимумом кода использовать все виды join, сортировки и остальные часто используемые операции:
class SelectQuery: def __init__(self, model, fields): self._model = model self._from = None _fields = [] self._relations = {field.class_ if isinstance(field, QueryableAttribute) else field for field in fields} if model in self._relations: self._relations.remove(model) for field in fields: if isinstance(field, QueryableAttribute): _fields.append(getattr(field.class_.__table__.c, field.key)) elif isinstance(field, sa.Column): _fields.append(field) elif hasattr(field, '__table__'): for f in field.__table__.c: _fields.append(f) else: _fields.append(field) self._stmt = sa.select([model, *self._relations]).with_only_columns(_fields) def join(self, right, on): if self._from is None: self._from = sa.join(self._model, right, on) else: self._from = self._from.join(right, on) return self def outerjoin(self, right, on): if self._from is None: self._from = sa.outerjoin(self._model, right, on) else: self._from = self._from.outerjoin(right, on) return self def where(self, condition): self._stmt = self._stmt.where(condition) return self def group_by(self, column): self._stmt = self._stmt.group_by(column) return self def order_by(self, *fields): self._stmt = self._stmt.order_by(*fields) return self def distinct(self, *fields): self._stmt = self._stmt.distinct() return self def limit(self, limit): self._stmt = self._stmt.limit(limit) return self def offset(self, offset): self._stmt = self._stmt.offset(offset) return self async def get(self, conn): if self._from is not None: self._stmt = self._stmt.select_from(self._from) result = await conn.execute(self._stmt) result = await result.first() return result async def all(self, conn): if self._from is not None: self._stmt = self._stmt.select_from(self._from) result = await conn.execute(self._stmt) return await result.fetchall() def get_query(self): if self._from is not None: return self._stmt.select_from(self._from) return self._stmt @property def raw_sql(self): stmt = self._stmt if self._from is not None: stmt = stmt.select_from(self._from) return stmt.compile(compile_kwargs={'literal_binds': True}) def _create_model(self, model_class, data): model_fields = { getattr(field, 'name'): data[str(field)] for field in model_class.__table__.c if str(field) in data } model = model_class(**model_fields) for relation in model_class.__mapper__.relationships: if relation.mapper.class_ in self._relations: relation_model = self._create_model(relation.mapper.class_, data) setattr(model, relation.key, relation_model) return model
Также специфическими операциями обладают запросы на удаление:
class DeleteQuery: def __init__(self, model, stmt): self._model = model self._stmt = stmt def where(self, condition): self._stmt = self._stmt.where(condition) return self def returning(self, *cols): self._stmt = self._stmt.returning(*cols) return self async def execute(self, conn): return await conn.execute(self._stmt) def get_query(self): return self._stmt
Для insert и update подойдёт одинаковый набор операций:
class StatementQuery: def __init__(self, model, stmt): self._model = model self._stmt = stmt self._values = None def values(self, **values): self._values = self._filter_values(values) return self def where(self, condition): self._stmt = self._stmt.where(condition) return self def returning(self, *cols): self._stmt = self._stmt.returning(*cols) return self async def execute(self, conn): self._values = self._filter_values(self._values) if not self._values: return self._stmt = self._stmt.values(self._values) result = await conn.execute(self._stmt) return result def get_query(self): values = self._filter_values(self._values) return self._stmt.values(values) def _filter_values(self, values): return { f.key: values[f.key] for f in self._model.__table__.c if f.key in values }
Обернём описанный выше код в класс, от которого можно унаследовать модели SQLAlchemy:
class DB: @classmethod def select(cls, fields=['*']): return SelectQuery(cls, fields) @classmethod def update(cls): return StatementQuery(cls, sa.update(cls)) @classmethod def insert(cls): return StatementQuery(cls, sa.insert(cls)) @classmethod def delete(cls): return DeleteQuery(cls, sa.delete(cls)) @staticmethod def before_save(data): pass @staticmethod def after_save(data): pass @classmethod def model_fields(cls): return list(filter(lambda x: not x.startswith('_'), dir(cls)))
Примеры использования:
from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class User(Base, DB): __tablename__ = 'user' id = Column(Integer, primary_key=True) login = Column(String(255)) active = Column(Boolean, default=True) @staticmethod async def get_by_login(login, conn): user = await User.select() \ .where(User.login == login) \ .get(conn) return user @staticmethod async def get_all_active(conn): users = await User.select() \ .where(User.active.is_(True)) \ .all(conn) return users @staticmethod async def create_user(login, conn): _ = await User.insert().values( login=login ).execute(conn) return True @staticmethod async def delete_user(login, conn): _ = await User.update().values( active=False ).execute(conn) return True
Выводы

-
Большинство существующих ORM на Python не позволяют работать с БД в асинхронном режиме.
-
При работе с SQLAlchemy Core генерируется понятный SQL с наименьшим оверхедом.
-
Для удобства работы с популярными инструментами можно написать «обёртку», которая повысит читаемость кода.
ссылка на оригинал статьи https://habr.com/ru/company/domclick/blog/557638/
Добавить комментарий