ORM, или объектно-реляционное отображение — это программная технология, которая позволяет взаимодействовать с базами данных с использованием объектно-ориентированной парадигмы. Вместо того чтобы писать SQL-запросы напрямую для работы с данными в базе данных, можно использовать ORM, чтобы взаимодействовать с данными, как если бы они были объектами в вашем коде.
ORM позволяет абстрагироваться от сырых SQL запросов путем абстракций.
В этой статье мы и рассмотрим создание своей ORM на Python с документацией и публикацией на PyPI. Данный проект очень интересен со стороны реализации: ведь требуется изучить большую часть ООП, принципов и паттернов.
Мы создадим сессии, модели баз данных, различные поля, миграции и другой вспомогательный функционал. Мы разберем изнутри, как работает такая концепция и как достигается удобство работы.
Некоторые из вас могут подумать что мы изобретаем велосипед. А я в ответ скажу — сможете ли вы прямо сейчас, без подсказок, только по памяти, нарисовать велосипед без ошибок?
Репозиторий доступен по ссылке.
Ремарка: sqlite3 выбран из-за простоты, нетрудно заменить обращения к нему на обращения к любой удобной для вас базе данных. По синтаксису я ориентировался на джанго.
Базы данных — очень популярный метод хранения и организации доступа к данным.
Базы данных имеют следующие преимущества перед обычными таблицами или файлами:
-
Базы данных позволяют обрабатывать, хранить и структурировать намного большие объёмы информации, чем таблицы.
-
Удалённый доступ и система запросов позволяет множеству людей одновременно использовать базы данных. С электронными таблицами тоже можно работать онлайн всей командой, но системы управления базами данных делают этот процесс организованнее, быстрее и безопаснее.
-
Объём информации в базах данных может быть огромным и не влиять на скорость работы. А в Google Таблицах уже после нескольких сотен строк или тысяч символов страница будет загружаться очень медленно.
В основном работают с реляционными базами данных (также называют SQL). Записи и связи между ними организованы при помощи таблиц. В таблицах есть поле для внешнего ключа со ссылками на другие таблицы. Благодаря высокой организации и гибкости структуры реляционные БД применяются для многих типов данных.
Базы данных, где информация о реальных вещах представлена в виде объектов под уникальным идентификатором, называется ООБД. К ООБД обращаются на языке объектно-ориентированного программирования (ООП). Состояние объекта описывается атрибутами, а его поведение — набором методов. Объекты с одинаковыми атрибутами и методами образуют классы.
Объект в ООП создаётся как отдельная сущность со своими свойствами и методами работы. И как только объект создан, его можно вызвать по «имени», или коду, а не разрабатывать заново. То есть то что мы и будем создавать сегодня — ORM!
Свою библиотеку-orm я назвал SQLSymphony, так что вам иногда придется сменить название, или импорты в соответствии с вашей структурой.
❯ sqlite
SQLite3 — это простая реляционная база данных, созданная и поддерживаемая всего тремя людьми. Для работы с ней существует стандартная python-библиотека.
Почему SQLite?
SQLite — это компактная и легкая встраиваемая база данных, которая позволяет хранить и управлять данными прямо внутри вашего приложения. Её простота в использовании и широкая поддержка делают её прекрасным выбором для различных проектов, включая веб-приложения, мобильные приложения и многое другое.
Больше о нем можно прочитать здесь.
❯ Инициализация
ORM будет распространяться в виде python-модуля, поэтому создадим в директории проекта файл __init__.py
:
import logging from typing import Union, List from rich.traceback import install from loguru import logger install(show_locals=True) class InterceptHandler(logging.Handler): """ This class describes an intercept handler. """ def emit(self, record) -> None: """ Get corresponding Loguru level if it exists :paramrecord: The record :typerecord: record :returns:None :rtype:None """ try: level = logger.level(record.levelname).name except ValueError: level = record.levelno frame, depth = logging.currentframe(), 2 while frame.f_code.co_filename == logging.__file__: frame = frame.f_back depth += 1 logger.opt(depth=depth, exception=record.exc_info).log( level, record.getMessage() ) def setup_logger(level: Union[str, int] = "DEBUG", ignored: List[str] = "") -> None: """ Setup logger :paramlevel: The level :typelevel: str :paramignored: The ignored :typeignored: List[str] """ logging.basicConfig( handlers=[InterceptHandler()], level=logging.getLevelName(level) ) for ignore in ignored: logger.disable(ignore) logger.add("sqlsymphony_orm.log") logger.info("Logging is successfully configured") setup_logger()
Я буду использовать модуль loguru для логгирования и rich для более красивых и информативных исключений.
Для логгирования при помощи loguru можно использовать следующую конструкцию:
from loguru import logger logger.info('info') logger.warning('warning') logger.error('error') logger.debug('debug')
Типы данных
В sqlite3 существуют следующие типы данных: INTEGER — вещественное число с указанной точностью, TEXT — текст, BLOB — двоичные данные, REAL — число с плавающей запятой(float24), NUMERIC — то же, что и INTEGER, VarChar — текст с ограниченным количеством символов, BOOLEAN — логическое значение.
У типов данных есть параметры: NOT NULL, DEFAULT, UNIQUE.
Давайте реализуем базовые типы данных, создав модуль fields.py:
from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any class BaseDataType(ABC): """ This class describes a base data type. """ def __init__( self, primary_key: bool = False, unique: bool = False, null: bool = True, default: Any = None, ): """ Constructs a new instance. :paramprimary_key: The primary key :typeprimary_key: bool :paramunique: The unique :typeunique: bool :paramnull: The null :typenull: bool :paramdefault: The default :typedefault: Any """ self.primary_key: bool = primary_key self.unique: bool = unique self.null: bool = null self.default: Any = default @abstractmethod def validate(self, value: Any) -> bool: """ Validate value for current datatype :paramvalue:The value :typevalue:Any :returns:if the value is verified then True, otherwise False :rtype:bool """ raise NotImplementedError() @abstractmethod def to_db_value(self, value: Any) -> Any: """ convert to db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:Any """ raise NotImplementedError() @abstractmethod def from_db_value(self, value: Any) -> Any: """ convert from db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:Any """ raise NotImplementedError() @abstractmethod def to_sql_type(self) -> str: """ Returns a sql type representation of the object. :returns:Sql type representation of the object. :rtype:str :raisesNotImplementedError: abstract method """ raise NotImplementedError() def __str__(self): return "<BaseDataType>"
Теперь реализуем на основе этого абстрактного класса другие типы данных:
@dataclass class IntegerField(BaseDataType): """ This class describes an integer field. """ def __init__( self, max_value: int = None, min_value: int = None, primary_key: bool = False, unique: bool = False, null: bool = True, default: int = None, ): """ Constructs a new instance. :paramprimary_key: The primary key :typeprimary_key: bool :paramunique: The unique :typeunique: bool :paramnull: The null :typenull: bool :paramdefault: The default :typedefault: int """ self.primary_key = primary_key self.unique: bool = unique self.null: bool = null self.default: int = default self.min_value = min_value self.max_value = max_value if self.primary_key: if self.default: raise ValueError('The parameter "default" is not used for PrimaryKey') self.default = 1 self.value = 1 def validate(self, value: Any) -> bool: """ Validate value :paramvalue:The value :typevalue:Any :returns:if the value is verified then True, otherwise False :rtype:bool """ if self.primary_key and value is None: return True if value is None and self.null: return True if self.min_value is not None and value < self.min_value: return False if self.max_value is not None and value > self.max_value: return False return True def to_db_value(self, value: Any) -> int: """ Convert to db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:int """ if self.primary_key and value is None: return 0 return int(value) if value is not None else self.default def from_db_value(self, value: Any) -> int: """ Convert from db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:int """ return int(value) if value is not None else None def to_sql_type(self) -> str: return "INTEGER" def __str__(self): return "<IntegerField>" @dataclass class RealField(BaseDataType): """ This class describes an real field. """ def __init__( self, min_value: float = None, max_value: float = None, unique: bool = False, null: bool = True, default: float = None, ): """ Constructs a new instance. :paramprimary_key: The primary key :typeprimary_key: bool :paramunique: The unique :typeunique: bool :paramnull: The null :typenull: bool :paramdefault: The default :typedefault: float """ self.primary_key = False self.unique: bool = unique self.null: bool = null self.default: float = default self.min_value = min_value self.max_value = max_value def validate(self, value: Any) -> bool: """ Validate value :paramvalue:The value :typevalue:Any :returns:if the value is verified then True, otherwise False :rtype:bool """ if value is None and self.null: return True if not isinstance(value, float): return False if self.min_value is not None and value < self.min_value: return False if self.max_value is not None and value > self.max_value: return False return True def to_db_value(self, value: Any) -> float: """ Convert to db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:float """ return float(value) if value is not None else self.default def from_db_value(self, value: Any) -> float: """ Convert from db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:float """ return float(value) if value is not None else None def to_sql_type(self) -> str: return "REAL" def __str__(self): return "<RealField>" class CharField(BaseDataType): """ This class describes a character field. """ def __init__( self, max_length: int = 64, unique: bool = False, null: bool = True, default: Any = None, ): """ Constructs a new instance. :paramprimary_key: The primary key :typeprimary_key: bool :paramunique: The unique :typeunique: bool :paramnull: The null :typenull: bool :paramdefault: The default :typedefault: Any """ self.primary_key: bool = False self.unique: bool = unique self.null: bool = null self.default: Any = default self.max_length = max_length def to_sql_type(self) -> str: return f"VARCHAR({self.max_length})" def validate(self, value: Any) -> bool: """ Validate value :paramvalue:The value :typevalue:Any :returns:if the value is verified then True, otherwise False :rtype:bool """ if value is None and self.null: return True if not isinstance(value, str): return False return len(value) <= self.max_length def to_db_value(self, value: Any) -> str: """ Convert value to db :paramvalue:The value :typevalue:Any :returns:db value :rtype:str """ return str(value) if value is not None else self.default def from_db_value(self, value: Any) -> str: """ Convert value from db :paramvalue:The value :typevalue:Any :returns:db value :rtype:str """ return str(value) if value is not None else self.default def __str__(self): return "<CharField>" class BooleanField(BaseDataType): """ This class describes a boolean field. """ def __init__( self, unique: bool = False, null: bool = True, default: Any = None, ): """ Constructs a new instance. :paramprimary_key: The primary key :typeprimary_key: bool :paramunique: The unique :typeunique: bool :paramnull: The null :typenull: bool :paramdefault: The default :typedefault: Any """ self.primary_key = False self.unique: bool = unique self.null: bool = null self.default: Any = default def to_sql_type(self) -> str: return "BOOLEAN" def validate(self, value: Any) -> bool: """ Validate value :paramvalue:The value :typevalue:Any :returns:if the value is verified then True, otherwise False :rtype:bool """ if isinstance(value, bool): return True else: return False def to_db_value(self, value: Any) -> str: """ Convert to db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:str """ return str(value).upper() if value is not None else self.default def from_db_value(self, value: Any) -> str: """ Convert from db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:str """ return str(value).upper() if value is not None else self.default def __str__(self): return "<BooleanField>" class TextField(BaseDataType): """ This class describes a character field. """ def __init__( self, unique: bool = False, null: bool = True, default: Any = None, ): """ Constructs a new instance. :paramprimary_key: The primary key :typeprimary_key: bool :paramunique: The unique :typeunique: bool :paramnull: The null :typenull: bool :paramdefault: The default :typedefault: Any """ self.primary_key = False self.unique: bool = unique self.null: bool = null self.default: Any = default def to_sql_type(self) -> str: return "TEXT" def validate(self, value: Any) -> bool: """ Validate value :paramvalue:The value :typevalue:Any :returns:if the value is verified then True, otherwise False :rtype:bool """ if value is None and self.null: return True return isinstance(value, str) def to_db_value(self, value: Any) -> str: """ Convert to db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:str """ return str(value) if value is not None else self.default def from_db_value(self, value: Any) -> str: """ Convert from db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:str """ return str(value) if value is not None else None def __str__(self): return "<TextField>" @dataclass class BlobField(BaseDataType): """ This class describes a blob field. """ def __init__( self, max_size_in_bytes: int = None, unique: bool = False, null: bool = True, default: Any = None, ): """ Constructs a new instance. :paramprimary_key: The primary key :typeprimary_key: bool :paramunique: The unique :typeunique: bool :paramnull: The null :typenull: bool :paramdefault: The default :typedefault: Any """ self.primary_key = False self.unique: bool = unique self.null: bool = null self.default: Any = default self.max_size_in_bytes = max_size_in_bytes def to_sql_type(self) -> str: return "BLOB" def validate(self, value: Any) -> bool: """ Validate value :paramvalue:The value :typevalue:Any :returns:if the value is verified then True, otherwise False :rtype:bool """ if value is None and self.null: return True if len(value) > self.max_size_in_bytes: return False return isinstance(value, bytes) def to_db_value(self, value: Any) -> bytes: """ Convert to db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:bytes """ return bytes(value) if value is not None else self.default def from_db_value(self, value: Any) -> bytes: """ Convert from db value :paramvalue:The value :typevalue:Any :returns:db value :rtype:bytes """ return bytes(value) if value is not None else None def __str__(self): return "<BlobField>" class FieldMeta(type): """ This class describes a field meta. """ def __new__(cls, name, bases, attrs): """ New 'magic' func :paramcls:The cls :paramname:The name :parambases:The bases :paramattrs:The attributes :returns:class """ fields = {} primary_key = None for key, value in attrs.items(): if isinstance(value, BaseDataType): fields[key] = value if value.primary_key: if primary_key: raise ValueError("Multiple primary keys are not allowed") primary_key = key if value.auto_increment: value.default = 1 attrs["_fields"] = fields attrs["_primary_key"] = primary_key return super().__new__(cls, name, bases, attrs) def __str__(self): return "<FieldMeta>"
У каждого поля есть следующие параметры: unique (должно ли поле быть уникальным), null (может ли быть NULL) и default (значение по умолчанию). Также некоторые поля имеют дополнительные параметры (например CharField, требуется задать максимальную длину в символах).
❯ Запросы (Query)
В будущем, для фильтрации и получения записей из БД, нам нужны будут запросы. Вместо сырых SQL-запросов мы будем использовать классы для выборки, фильтрации и получения записей.
Класс QueryBuilder как раз и будет отвественнен за это. Строковый вид класса будет возращать созданный SQL запрос:
from abc import ABC, abstractmethod from rich.console import Console from rich.table import Table from loguru import logger AND = "and" OR = "or" class Q: """ This class describes a Q. """ def __init__(self, exp_type: str = AND, **kwargs): """ Constructs a new instance. :paramexp_type: The exponent type :typeexp_type: str :paramkwargs: The keywords arguments :typekwargs: dictionary """ self.separator = exp_type self._params = kwargs def __str__(self) -> str: """ Returns a string representation of the object. :returns:String representation of the object. :rtype:str """ kv_pairs = [f'{k} = "{v}"' for k, v in self._params.items()] return f" {self.separator} ".join(kv_pairs) def __bool__(self) -> bool: """ Returns a boolean representation of the object :returns:Boolean representation of the object. :rtype:bool """ return bool(self._params) class BaseExp(ABC): """ This abstract class describes a base exponent. """ name = None @abstractmethod def add(self, *args, **kwargs): """ Add params :paramargs: The arguments :typeargs: list :paramkwargs: The keywords arguments :typekwargs: dictionary """ raise NotImplementedError() def definition(self) -> str: """ Get the definition of query :returns:sql query :rtype:str """ return self.name + " " + self.line() + " " @abstractmethod def line(self): """ Get line """ raise NotImplementedError() @abstractmethod def __bool__(self): """ Boolean magic function """ raise NotImplementedError() class Select(BaseExp): """ This class describes a select. """ name = "SELECT" def __init__(self): """ Constructs a new instance. """ self._params = [] def add(self, *args, **kwargs): """ Add params :paramargs: The arguments :typeargs: list :paramkwargs: The keywords arguments :typekwargs: dictionary """ self._params.extend(args) def line(self) -> str: """ Get line :returns:line :rtype:str """ separator = "," return separator.join(self._params) def __bool__(self): """ Boolean magic function :returns:if self._params defined :rtype:bool """ return bool(self._params) class From(BaseExp): """ This class describes a from. """ name = "FROM" def __init__(self): """ Constructs a new instance. """ self._params = [] def add(self, *args, **kwargs): """ Add params :paramargs: The arguments :typeargs: list :paramkwargs: The keywords arguments :typekwargs: dictionary """ self._params.extend(args) def line(self) -> str: """ Get line :returns:line :rtype:str """ separator = "," return separator.join(self._params) def __bool__(self): """ Boolean magic function :returns:if self._params defined :rtype:bool """ return bool(self._params) class Where(BaseExp): """ This class describes a SQL query `where`. """ name = "WHERE" def __init__(self, exp_type: str = AND, **kwargs): """ Constructs a new instance. :paramexp_type: The exponent type :typeexp_type: str :paramkwargs: The keywords arguments :typekwargs: dictionary """ self._q = Q(exp_type, **kwargs) def add(self, exp_type: str = AND, **kwargs): """ Add params to sql query `where` :paramexp_type: The exponent type :typeexp_type: str :paramkwargs: The keywords arguments :typekwargs: dictionary :returns:Q class instance :rtype:Q """ self._q = Q(exp_type, **kwargs) return self._q def line(self): """ Get line :returns:line :rtype:str """ return str(self._q) def __bool__(self): """ Boolean magic function :returns:if self._q defined :rtype:bool """ return bool(self._q) class QueryBuilder: """ Front-end to create query objects step by step. """ def __init__(self): """ Constructs a new instance. """ self._data = {"select": Select(), "from": From(), "where": Where()} def SELECT(self, *args) -> "QueryBuilder": """ SQL query `select` :paramargs: The arguments :typeargs: list :returns:Query Builder :rtype:self """ self._data["select"].add(*args) return self def FROM(self, *args) -> "QueryBuilder": """ SQL query `from` :paramargs: The arguments :typeargs: list :returns:Query Builder :rtype:self """ self._data["from"].add(*args) return self def WHERE(self, exp_type: str = AND, **kwargs) -> "QueryBuilder": """ SQL query `where` :paramexp_type: The exponent type :typeexp_type: str :paramkwargs: The keywords arguments :typekwargs: dictionary :returns:Query Builder :rtype:self """ self._data["where"].add(exp_type=exp_type, **kwargs) return self def _lines(self): """ Lines :returns:Value definition :rtype:yeild (str) """ for key, value in self._data.items(): if value: yield value.definition() def __str__(self) -> str: """ Returns a string representation of the object. :returns:String representation of the object. :rtype:str """ return "".join(self._lines())
❯ Кастомные исключение
Для того, чтобы разработчику было более понятней разобраться в ошибках, создадим кастомные исключения:
from loguru import logger class SQLSymphonyException(Exception): """ Exception for signaling sql symphony errors. """ def __init__(self, *args): """ Constructs a new instance. :paramargs: The arguments :typeargs: list """ if args: self.message = args[0] else: self.message = None def get_explanation(self) -> str: """ Gets the explanation. :returns:The explanation. :rtype:str """ return f"Basic SQLSymphony ORM exception. Message: {self.message if self.message else 'missing'}" def __str__(self): """ Returns a string representation of the object. :returns:String representation of the object. :rtype:str """ logger.error(f"{self.__class__.__name__}: {self.get_explanation()}") return f"SQLSymphonyException has been raised. {self.get_explanation()}" class FieldNamingError(SQLSymphonyException): """ This class describes a field naming error. """ def __init__(self, *args): """ Constructs a new instance. :paramargs: The arguments :typeargs: list """ if args: self.message = args[0] else: self.message = None def get_explanation(self) -> str: """ Gets the explanation. :returns:The explanation. :rtype:str """ return f"SQLSymphony Field Naming Error. The field name is prohibited/unavailable to avoid naming errors. Message: {self.message if self.message else 'missing'}" def __str__(self): logger.error(f"{self.__class__.__name__}: {self.get_explanation()}") return f"Field Naming Error has been raised. {self.get_explanation()}" class NullableFieldError(SQLSymphonyException): """ This class describes a nullable field error. """ def __init__(self, *args): """ Constructs a new instance. :paramargs: The arguments :typeargs: list """ if args: self.message = args[0] else: self.message = None def get_explanation(self) -> str: """ Gets the explanation. :returns:The explanation. :rtype:str """ return f"SQLSymphony Nullable Field Error. Field is set to NOT NULL, but it is empty. Message: {self.message if self.message else 'missing'}" def __str__(self): logger.error(f"{self.__class__.__name__}: {self.get_explanation()}") return f"Nullable Field Error has been raised. {self.get_explanation()}" class FieldValidationError(SQLSymphonyException): """ This class describes a field validation error. """ def __init__(self, *args): """ Constructs a new instance. :paramargs: The arguments :typeargs: list """ if args: self.message = args[0] else: self.message = None def get_explanation(self) -> str: """ Gets the explanation. :returns:The explanation. :rtype:str """ return f"SQLSymphony Field Validation Error. Message: {self.message if self.message else 'missing'}" def __str__(self): logger.error(f"{self.__class__.__name__}: {self.get_explanation()}") return f"Field Validation Error has been raised. {self.get_explanation()}" class PrimaryKeyError(SQLSymphonyException): """ This class describes a primary key error. """ def __init__(self, *args): """ Constructs a new instance. :paramargs: The arguments :typeargs: list """ if args: self.message = args[0] else: self.message = None def get_explanation(self) -> str: """ Gets the explanation. :returns:The explanation. :rtype:str """ return f"SQLSymphony Primary Key Error. According to database theory, each table should have only one PrimaryKey field, Message: {self.message if self.message else 'missing'}" def __str__(self): logger.error(f"{self.__class__.__name__}: {self.get_explanation()}") return f"Primary Key Error has been raised. {self.get_explanation()}" class UniqueConstraintError(SQLSymphonyException): """ This class describes an unique constraint error. """ def __init__(self, *args): """ Constructs a new instance. :paramargs: The arguments :typeargs: list """ if args: self.message = args[0] else: self.message = None def get_explanation(self) -> str: """ Gets the explanation. :returns:The explanation. :rtype:str """ return f"SQLSymphony Unique Constraint Error. An exception occurred when executing an SQL query due to problems with UNIQUE fields. Message: {self.message if self.message else 'missing'}" def __str__(self): logger.error(f"{self.__class__.__name__}: {self.get_explanation()}") return f"Unique Constraint Error has been raised. {self.get_explanation()}" class ModelHookError(SQLSymphonyException): """ This class describes a model hook error. """ def __init__(self, *args): """ Constructs a new instance. :paramargs: The arguments :typeargs: list """ if args: self.message = args[0] else: self.message = None def get_explanation(self) -> str: """ Gets the explanation. :returns:The explanation. :rtype:str """ return f"Model Hooks Error. An exception occurred when executing an hook due to problems with ORM. Message: {self.message if self.message else 'missing'}" def __str__(self): logger.error(f"{self.__class__.__name__}: {self.get_explanation()}") return f"Model Hook error has been raised. {self.get_explanation()}" class MigrationError(SQLSymphonyException): """ This class describes a migration error. """ def __init__(self, *args): """ Constructs a new instance. :paramargs: The arguments :typeargs: list """ if args: self.message = args[0] else: self.message = None def get_explanation(self) -> str: """ Gets the explanation. :returns:The explanation. :rtype:str """ return f"Database Migration Error. An exception occurred when executing an hook due to problems with migration. Message: {self.message if self.message else 'missing'}" def __str__(self): logger.error(f"{self.__class__.__name__}: {self.get_explanation()}") return f"Migration Error has been raised. {self.get_explanation()}"
Мы создаем базовый класс, наследуемый от Exception и метод строкового обращения. А потом уже создаем более подробные исключения на основе базового класса.
❯ Пользовательские модели
По традициям, разработчик в нашей ORM должен создавать модели примерно следующим образом:
class User(SessionModel): __tablename__ = "Users" id = IntegerField(primary_key=True) name = TextField(null=False) cash = RealField(null=False, default=0.0) def __repr__(self): return f"<User {self.pk}>" class User2(SessionModel): __tablename__ = "Users" id = IntegerField(primary_key=True) name = TextField(null=False) cash = RealField(null=False, default=0.0) password = TextField(default="password1234") def __repr__(self): return f"<User {self.pk}>" class Comment(SessionModel): id = IntegerField(primary_key=True) name = TextField(null=False) user_id = IntegerField(null=False) def __repr__(self): return f"<Comment {self.pk}>"
Для этого нам нужно будет реализовать классы моделей (мета-класс и саму модель).
from pathlib import Path from typing import List, Any, Union, Callable from uuid import uuid4 from abc import ABC, abstractmethod from collections import OrderedDict from loguru import logger from sqlsymphony_orm.datatypes.fields import BaseDataType, IntegerField # модуль типов данных from sqlsymphony_orm.exceptions import ( PrimaryKeyError, FieldValidationError, NullableFieldError, FieldNamingError, ) from sqlsymphony_orm.queries import QueryBuilder class MetaSessionModel(type): """ This class describes a meta session model. """ __tablename__ = None def __new__(cls, class_object: "SessionModel", parents: tuple, attributes: dict): """ Magic method for creating instances and classes :paramcls: The cls :typecls: cls :paramclass_object: The class object :typeclass_object: Model :paramparents: The parents :typeparents: tuple :paramattributes: The attributes :typeattributes: dict :returns:new class :rtype:model """ new_class = super(MetaSessionModel, cls).__new__( cls, class_object, parents, attributes ) fields = OrderedDict() setattr(new_class, "_model_name", attributes["__qualname__"].lower()) if new_class.__tablename__ is None: setattr(new_class, "table_name", attributes["__qualname__"].lower()) else: setattr(new_class, "table_name", new_class.__tablename__) for k, v in attributes.items(): if isinstance(v, BaseDataType): fields[k] = v attributes[k] = None if isinstance(v, IntegerField): if v.primary_key: setattr(new_class, "_pk_name", k) setattr(new_class, "_original_fields", fields) return new_class class SessionModel(metaclass=MetaSessionModel): """ This class describes a ORM model. """ __tablename__ = None _ids = 0 def __init__(self, **kwargs): """ Constructs a new instance. :paramkwargs: The keywords arguments :typekwargs: dictionary """ self.fields = {} self.hooks = {} self.unique_id = str(uuid4()) for field_name, field in self._original_fields.items(): value = kwargs.get(field_name, None) if not kwargs.get("manager", False): if not field.null and value is None and field.default is None: raise NullableFieldError( f"Field {field_name} is set to NOT NULL, but it is empty" ) if value is not None and field.validate(value): setattr(self, field_name, field.to_db_value(value)) self.fields[field_name] = getattr(self, field_name) else: if value is not None and not field.validate(value): raise FieldValidationError( f'Validate error: field {field}; field name "{field_name}"; value "{value}"' ) if isinstance(field, IntegerField): if field.primary_key: self.__class__._ids += 1 setattr( self, "_primary_key", { "field": field, "field_name": field_name, "value": self.__class__._ids, }, ) setattr(self, field_name, field.default) self.fields[field_name] = getattr(self, field_name) if not getattr(self, "_primary_key"): raise PrimaryKeyError() self._last_action = {} def add_hook(self, before_action: str, func: Callable, func_args: tuple = ()): """ Adds a hook. :parambefore_action:The before action :typebefore_action:str :paramfunc:The function :typefunc:Callable :paramfunc_args:The function arguments :typefunc_args:tuple :raisesValueError:unknown before action """ actions = ["save", "delete", "update"] if before_action.lower() not in actions: raise ValueError( f"Unknown action: {before_action}. Supported actions: {actions}" ) logger.info( f"[{self.table_name}] Add Model Hook: before {before_action} execute {func.__name__}" ) self.hooks[before_action.lower()] = {"function": func, "args": func_args} @property def pk(self) -> Any: """ Get primary key value :returns:primary key value :rtype:primary key """ return self._primary_key["value"] @classmethod def _class_get_formatted_sql_fields(cls, skip_primary_key: bool = False) -> dict: """ Gets the formatted sql fields. :returns:The formatted sql fields. :rtype:dict """ model_fields = {} for field_name, field in cls._original_fields.items(): if field.primary_key and skip_primary_key: continue model_fields[field_name] = field.to_sql_type() if field.primary_key: model_fields[field_name] = f"{field.to_sql_type()} PRIMARY KEY" else: if not field.null: try: model_fields[field_name] += " NOT NULL" except KeyError: model_fields[field_name] = f"{field.to_sql_type()} NOT NULL" if field.unique: try: model_fields[field_name] += " UNIQUE" except KeyError: model_fields[field_name] = f"{field.to_sql_type()} UNIQUE" if field.default is not None: try: model_fields[field_name] += f" DEFAULT {field.default}" except KeyError: model_fields[field_name] = ( f"{field.to_sql_type()} DEFAULT {field.default}" ) return model_fields def get_formatted_sql_fields(self, skip_primary_key: bool = False) -> dict: """ Gets the formatted sql fields. :returns:The formatted sql fields. :rtype:dict """ model_fields = {} for field_name, field in self._original_fields.items(): if field.primary_key and skip_primary_key: continue model_fields[field_name] = field.to_sql_type() if field.primary_key: model_fields[field_name] = f"{field.to_sql_type()} PRIMARY KEY" else: if not field.null: try: model_fields[field_name] += " NOT NULL" except KeyError: model_fields[field_name] = f"{field.to_sql_type()} NOT NULL" if field.unique: try: model_fields[field_name] += " UNIQUE" except KeyError: model_fields[field_name] = f"{field.to_sql_type()} UNIQUE" if field.default is not None: try: model_fields[field_name] += f" DEFAULT {field.default}" except KeyError: model_fields[field_name] = ( f"{field.to_sql_type()} DEFAULT {field.default}" ) return model_fields
В метаклассе мы читаем модель, добавляем поля, задаем базовые настройки. В классе модели мы все проверяем и задаем Primary Key.
Также есть classmethod-функция и обычная функция для получения форматированных полей для SQL.
У нас есть свойство для получения Primary Key, а также функция для добавления хуков. Хуки в контексте нашей ORM — это функции, выполняемые до определенной операции модели и базы данных.
❯ Подключение к БД
Давайте теперь возьмемся за основу работы с базой данных — класс подключения и класс менеджера.
Напишем код для подключения к бд:
import sqlite3 from abc import ABC, abstractmethod from typing import Tuple from rich import print from loguru import logger class DBConnector(ABC): """ This class describes a db connector. """ def __new__(cls, *args, **kwargs): """ New class :paramcls: The cls :typecls: list :paramargs: The arguments :typeargs: list :paramkwargs: The keywords arguments :typekwargs: dictionary :returns:cls instance :rtype:self """ if not hasattr(cls, "instance"): cls.instance = super(DBConnector, cls).__new__(cls, *args, **kwargs) return cls.instance @abstractmethod def connect(self, database_name: str): """ Connect to database :paramdatabase_name: The database name :typedatabase_name: str :raisesNotImplementedError: Abstract method """ raise NotImplementedError() @abstractmethod def commit(self): """ Commit changes to database :raisesNotImplementedError: Abstract method """ raise NotImplementedError() @abstractmethod def fetch(self, query: str): """ Fetches the given query. :paramquery: The query :typequery: str :raisesNotImplementedError: Abstract method """ raise NotImplementedError() class SQLiteDBConnector(DBConnector): """ This class describes a sqlite db connector. """ def __new__(cls, *args, **kwargs): """ New class :paramcls: The cls :typecls: list :paramargs: The arguments :typeargs: list :paramkwargs: The keywords arguments :typekwargs: dictionary :returns:cls instance :rtype:self """ if not hasattr(cls, "instance"): cls.instance = super(SQLiteDBConnector, cls).__new__(cls, *args, **kwargs) return cls.instance def close_connection(self): """ Closes a connection. """ self._connection.close() print("[bold]Connection has been closed[/bold]") logger.info("Close Database Connection") def connect(self, database_name: str = "database.db"): """ Connect to database :paramdatabase_name:The database name :typedatabase_name:str """ pragmas = ["PRAGMA foreign_keys = 1"] self._connection = sqlite3.connect(database_name) self.database_name = database_name logger.info(f"[{database_name}] Connect database...") for pragma in pragmas: self._connection.execute(pragma) logger.debug(f"Set pragma: {pragma}") def commit(self): """ Commit changes to database """ logger.info("Commit changes to database") self._connection.commit() def fetch(self, query: str, values: Tuple = (), get_cursor: bool = False) -> list: """ Fetch SQL query :paramquery: The query :typequery: str :paramvalues: The values :typevalues: Tuple :returns:list with fetched results :rtype:list """ cursor = self._connection.cursor() self.commit() logger.debug(f"Fetch query: {query} {values}") try: cursor.execute(query, values) except Exception as ex: logger.error(f"An exception occurred while executing the request: {ex}") raise ex return [cursor, cursor.fetchall()] if get_cursor else cursor.fetchall()
Все просто — создаем абстрактный класс и класс SQLiteDBConnector, наследуемый от него. Если вам потребуется добавить поддержку других СУБД, такая структура сделает это более удобней.
Мы имеем метод __new__
для создания новых инстансов, метод подключения, отключения, коммита и выполнения запроса.
Перейдем теперь к более сложному — к менеджеру:
from abc import ABC, abstractmethod from typing import Any from loguru import logger from sqlsymphony_orm.queries import QueryBuilder from sqlsymphony_orm.database.connection import DBConnector, SQLiteDBConnector class MultiManager(ABC): """ This class describes a multi manager. """ @abstractmethod def reconnect(self): """ reconnect to db :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def drop_table(self, table_name: str): """ Drop sql table :paramtable_name: The table name :typetable_name: str :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def close_connection(self): """ Closes a connection. :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def insert( self, table_name: str, formatted_fields: dict, pk: int, model_class: "Model", ignore: bool = False, ): """ insert new model to database :paramtable_name: The table name :typetable_name: str :paramformatted_fields: The formatted fields :typeformatted_fields: dict :parampk: primary key value :typepk: int :parammodel_class: The model class :typemodel_class: Model :paramignore: The ignore :typeignore: bool :raisesNotImplementedError: { exception_description } """ raise NotImplementedError() @abstractmethod def update(self, table_name: str, key: str, orig_field: str, new_value: str): """ update model :paramtable_name: The table name :typetable_name: str :paramkey: The key :typekey: str :paramorig_field: The original field :typeorig_field: str :paramnew_value: The new value :typenew_value: str :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def filter(self, query: QueryBuilder): """ filter and get model by query :paramquery: The query :typequery: QueryBuilder :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def commit(self): """ Commit changes :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def create_table(self, table_name: str, fields: dict): """ Creates a table. :paramtable_name: The table name :typetable_name: str :paramfields: The fields :typefields: dict :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def delete(self, table_name: str, field_name: str, field_value: Any): """ delete model :paramtable_name: The table name :typetable_name: str :paramfield_name: The field name :typefield_name: str :paramfield_value: The field value :typefield_value: Any :raisesNotImplementedError: abstract method """ raise NotImplementedError() class SQLiteMultiManager(MultiManager): """ This class describes a sqlite multi manager. """ def __init__(self, database_name: str): """ Constructs a new instance. :paramdatabase_name:The database name :typedatabase_name:str """ self._connector = SQLiteDBConnector() self.database_name = database_name self._connector.connect(self.database_name) def execute(self, raw_sql_query: str, values: tuple = (), get_cursor: bool = False): return self._connector.fetch(raw_sql_query, values, get_cursor) def reconnect(self, database_file: str = None): """ reconnect to database """ if database_file is not None: self.database_name = database_file self._connector.connect(self.database_name) def drop_table(self, table_name: str): """ drop table :paramtable_name: The table name :typetable_name: str """ query = f"DROP TABLE IF EXISTS {table_name}" logger.warning(f"Drop table: {table_name}") self._connector.fetch(query) self._connector.commit() def close_connection(self): """ Closes a connection. """ self._connector.close_connection() def insert( self, table_name: str, formatted_fields: dict, pk: int, model_class: "Model", ignore: bool = False, ): """ Insert a fields to database :paramtable_name: The table name :typetable_name: str :paramcolumns: The columns :typecolumns: str :paramcount: The count :typecount: str :paramvalues: The values :typevalues: tuple """ fields = [] values = [] for k, v in formatted_fields.items(): fields.append(k) if "PRIMARY KEY" in v: values.append(pk) else: values.append(getattr(model_class, k)) columns = ", ".join(fields) count = ", ".join(["?" for _ in values]) query = "INSERT " if ignore: query += "OR IGNORE " query += f"INTO {table_name} ({columns}) VALUES ({count})" logger.info( f'[{table_name}] Insert {"(or ignore)" if ignore else ""} new model into database' ) self._connector.fetch(query, values) def update(self, table_name: str, key: str, orig_field: str, new_value: str): """ Update fields in database table :paramtable_name: The table name :typetable_name: str :paramkey: The key :typekey: str :paramorig_field: The original field :typeorig_field: str :paramnew_value: The new value :typenew_value: str """ query = f"UPDATE {table_name} SET {key} = ? WHERE {key} = ?" logger.info(f"[{table_name}] Update model: {key}={new_value}") self._connector.fetch(query, (new_value, orig_field)) def filter(self, query: str) -> list: """ filter and get model by query :paramquery:The query :typequery:str :returns:models :rtype:list """ result = self._connector.fetch(query) return result def commit(self): """ Commits changes. """ self._connector.commit() def create_table(self, table_name: str, fields: dict): """ Creates a table. :paramtable_name: The table name :typetable_name: str :paramfields: The fields :typefields: dict """ columns = [f"{k} {v}" for k, v in fields.items()] query = f"CREATE TABLE IF NOT EXISTS {table_name} (" for column in columns: query += f"{column}," query = query[:-1] query += ")" logger.info(f"Create new table: {table_name}") self._connector.fetch(query) self._connector.commit() def delete(self, table_name: str, field_name: str, field_value: Any): """ Delete model from database :paramtable_name: The table name :typetable_name: str :paramfield_name: The field name :typefield_name: str :paramfield_value: The field value :typefield_value: Any """ query = f"DELETE FROM {table_name} WHERE {field_name} = ?" logger.info(f"[{table_name}] Delete model ({field_name}={field_value})") self._connector.fetch(query, (field_value,))
Это также относительно просто: при инициализации создаем инстанс класса подключения, подключаемся к БД, и создаем базовые методы: удаление, создание таблицы, коммит, фильтр, обновление, добавления, и прочие вспомогательные методы для работы с подключением и таблицами.
❯ Сессии
Но как мы будем добавлять модели в базу данных и работать с ними? Все просто — я решил что правильным способом будем создание класса сессии.
Давайте реализуем это:
from pathlib import Path from typing import List, Any, Union, Callable from uuid import uuid4 from abc import ABC, abstractmethod from collections import OrderedDict from loguru import logger from sqlsymphony_orm.database.manager import SQLiteMultiManager # менеджер from sqlsymphony_orm.datatypes.fields import BaseDataType, IntegerField # типы данныъ from sqlsymphony_orm.exceptions import ( # исключения PrimaryKeyError, FieldValidationError, NullableFieldError, FieldNamingError, )lsymphony_orm.queries import QueryBuilder # билдер запросов class Session(ABC): """ This class describes a session. """ @abstractmethod def reconnect(self): """ reconnect to database """ raise NotImplementedError @abstractmethod def get_all(self): """ Gets all models """ raise NotImplementedError @abstractmethod def get_all_by_module(self, needed_model: SessionModel): """ Gets all models by module. :paramneeded_model: The needed model :typeneeded_model: SessionModel """ raise NotImplementedError @abstractmethod def drop_table(self, table_name: str): """ drop table :paramtable_name: The table name :typetable_name: str """ raise NotImplementedError @abstractmethod def filter(self, query: "QueryBuilder", first: bool = False): """ Filter and get models by query :paramquery:The query :typequery:QueryBuilder :paramfirst:The first :typefirst:bool """ raise NotImplementedError @abstractmethod def update(self, model: SessionModel, **kwargs): """ Update model :parammodel: The model :typemodel: SessionModel :paramkwargs: The keywords arguments :typekwargs: dictionary """ raise NotImplementedError @abstractmethod def add(self, model: SessionModel, ignore: bool = False): """ Add model :parammodel: The model :typemodel: SessionModel :paramignore: The ignore :typeignore: bool """ raise NotImplementedError @abstractmethod def delete(self, model: SessionModel): """ Deletes the given model. :parammodel:The model :typemodel:SessionModel """ raise NotImplementedError @abstractmethod def commit(self): """ Commit changes """ raise NotImplementedError @abstractmethod def close(self): """ Close connection """ raise NotImplementedError class SQLiteSession(Session): """ This class describes a sqlite session. """ def __init__(self, database_file: str): """ Constructs a new instance. :paramdatabase_file:The database file :typedatabase_file:str """ self.database_file = Path(database_file) self.models = {} self.manager = SQLiteMultiManager(self.database_file) def reconnect(self, database_file: str = None): """ Reconnecto to database """ if database_file is not None: self.database_file = Path(database_file) logger.info(f"Session {self.database_file}: reconnect") self.manager.reconnect(database_file) def execute( self, raw_sql_query: str, values: tuple = (), get_cursor: bool = False ) -> list: """ Execute raw sql query :paramraw_sql_query:The raw sql query :typeraw_sql_query:str :paramvalues:The values :typevalues:tuple :paramget_cursor:The get cursor :typeget_cursor:bool :returns:list with output data :rtype:list """ return self.manager.execute(raw_sql_query, values, get_cursor) def get_all(self) -> List[SessionModel]: """ Gets all. :returns:All. :rtype:List[SessionModel] """ models_instances = [self.models[model]["model"] for model in self.models.keys()] return models_instances def get_all_by_module(self, needed_model: SessionModel) -> List[SessionModel]: """ Gets all by module. :paramneeded_model: The needed model :typeneeded_model: SessionModel :returns:All by module. :rtype:List[SessionModel] """ all_instances = [self.models[model]["model"] for model in self.models.keys()] needed_instances = [] model_name = needed_model._model_name for model in all_instances: if model._model_name == model_name: needed_instances.append(model) return needed_instances def drop_table(self, table_name: str): """ Drop table :paramtable_name: The table name :typetable_name: str """ logger.info(f"Session {self.database_file}: drop table {table_name}") self.manager.drop_table(table_name) def filter( self, query: "QueryBuilder", first: bool = False ) -> Union[List[SessionModel], SessionModel]: """ Filter and get model by query :paramquery:The query :typequery:QueryBuilder :paramfirst:The first :typefirst:bool :returns:list with SessionModel or SessionModel :rtype:Union[List[SessionModel], SessionModel] """ db_results = self.manager.filter(str(query)) results = [] fields = {} for unique_id, curr_model in self.models.items(): model = curr_model["model"] fields[unique_id] = { "keys": model._original_fields.keys(), "values": [ getattr(model, value) if model._primary_key["field_name"] != value else model.pk for value in model._original_fields.keys() ], } for result in db_results: for unique_id, data in fields.items(): if len(data["keys"]) == len(result): if tuple(data["values"]) == result: results.append(self.models[unique_id]["model"]) if results: return results[0] if first else results else: return None def update(self, model: SessionModel, **kwargs): """ Update model :parammodel: The model :typemodel: SessionModel :paramkwargs: The keywords arguments :typekwargs: dictionary """ current_model = self.models.get(model.unique_id, None) if current_model is None: self.add(model) logger.info(f"Session {self.database_file}: update model {model.unique_id}") if model.hooks: func = model.hooks["update"]["function"] logger.debug(f"Exec Model Hook[update]: {func.__name__}") func(*model.hooks["update"]["args"]) for key, value in kwargs.items(): if hasattr(model, key): if value is not None and model._original_fields[key].validate(value): orig_field = getattr(model, key) setattr(model, key, model._original_fields[key].to_db_value(value)) logger.info( f"[{model.table_name}] Update {model._model_name}#{model.pk} {key}: {orig_field} -> {value}" ) self.models[model.unique_id]["model"] = model def add(self, model: SessionModel, ignore: bool = False): """ Add new model :parammodel: The model :typemodel: SessionModel :paramignore: The ignore :typeignore: bool """ if self.models.get(model.unique_id, None) is not None: logger.warning(f"Model {model.unique_id} already added") return if model.hooks: func = model.hooks["save"]["function"] logger.debug(f"Exec Model Hook[save]: {func.__name__}") func(*model.hooks["save"]["args"]) self.models[model.unique_id] = {"model": model} formatted_fields = model.get_formatted_sql_fields(skip_primary_key=True) self.manager.create_table(model.table_name, model.get_formatted_sql_fields()) self.manager.insert(model.table_name, formatted_fields, model.pk, model, ignore) last_pk = self.execute( f'SELECT max({model._primary_key["field_name"]}) FROM {model.table_name}' ) model._primary_key["value"] = int(last_pk[0][0]) logger.info( f"Session {self.database_file}: insert new model: {model.unique_id}" ) def delete(self, model: SessionModel): """ Deletes the given model. :parammodel:The model :typemodel:SessionModel """ current_model = self.models.get(model.unique_id, None) if current_model is None: logger.error(f"Model {model.unique_id} does not exists") return if model.hooks: func = model.hooks["delete"]["function"] logger.debug(f"Exec Model Hook[delete]: {func.__name__}") func(*model.hooks["delete"]["args"]) self.manager.delete( current_model["model"].table_name, current_model["model"]._primary_key["field_name"], current_model["model"].pk, ) logger.info(f"Session {self.database_file}: delete model: {model.unique_id}") def commit(self): """ Commit changes """ self.manager.commit() def close(self): """ Close connection """ self.manager.close_connection()
Такая же структура, как и в остальном коде — создаем абстрактный класс сессии и позже создаем класс-наследник нужной СУБД — в данном случае sqlite.
Класс сессии имеет методы для добавления модели, фильтра, получения всех моделей, получения моделей одного типа, удаление моделей, обновления и другие вспомогательные методы.
Через сессию можно работать так:
from sqlsymphony_orm.datatypes.fields import IntegerField, RealField, TextField from sqlsymphony_orm.models.session_models import SessionModel from sqlsymphony_orm.models.session_models import SQLiteSession from sqlsymphony_orm.queries import QueryBuilder session = SQLiteSession("example.db") class User(SessionModel): __tablename__ = "Users" id = IntegerField(primary_key=True) name = TextField(null=False) cash = RealField(null=False, default=0.0) def __repr__(self): return f"<User {self.pk}>" class User2(SessionModel): __tablename__ = "Users" id = IntegerField(primary_key=True) name = TextField(null=False) cash = RealField(null=False, default=0.0) password = TextField(default="password1234") def __repr__(self): return f"<User {self.pk}>" class Comment(SessionModel): id = IntegerField(primary_key=True) name = TextField(null=False) user_id = IntegerField(null=False) user = User(name="John") user2 = User(name="Bob") user3 = User(name="Ellie") session.add(user) session.commit() session.add(user2) session.commit() session.add(user3) session.commit() session.delete(user3) session.commit() session.update(model=user2, name="Anna") session.commit() comment = Comment(name=user.name, user_id=user.pk) session.add(comment) session.commit() print( session.filter(QueryBuilder().SELECT("*").FROM(User.table_name).WHERE(name="Anna")) ) print(session.get_all()) print(session.get_all_by_module(User)) print(user.pk) session.close()
❯ Вспомогательные модули
Давайте реализуем небольшой модуль хеширования. Это может потребоваться, когда в БД хранятся данные, которые должны быть засекречены.
import hashlib from abc import ABC, abstractmethod from enum import Enum, auto from hmac import compare_digest from typing import Union class HashAlgorithm(Enum): """ This class describes a hash algorithms. """ SHA256 = auto() SHA512 = auto() MD5 = auto() BLAKE2B = auto() BLAKE2S = auto() class HashingBase(ABC): """ This class describes a hashing base. """ @abstractmethod def hash( self, data: Union[bytes, str], hexdigest: bool = False ) -> Union[bytes, str]: """ Hash :paramdata: The data :typedata: Union[bytes, str] :paramhexdigest: The hexdigest :typehexdigest: bool :returns:hashing :rtype:Union[bytes, str] :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def verify(self, data: Union[bytes, str], hashed_data: Union[bytes, str]) -> bool: """ Verify data and hashed data :paramdata: The data :typedata: Union[bytes, str] :paramhashed_data: The hashed data :typehashed_data: Union[bytes, str] :returns:true if data=hashed_data :rtype:bool :raisesNotImplementedError: { exception_description } """ raise NotImplementedError() class PlainHasher(HashingBase): """ This class describes a plain hasher. """ def __init__(self, algorithm: HashAlgorithm = HashAlgorithm.SHA256): """ Constructs a new instance. :paramalgorithm:The algorithm :typealgorithm:HashAlgorithm """ self.algorithm = algorithm def hash(self, data: Union[bytes, str]) -> bytes: """ Generate hash :paramdata: The data :typedata: Union[bytes, str] :returns:hash :rtype:bytes """ if isinstance(data, str): data = data.encode("utf-8") hasher = self.get_hasher() return hasher(data).digest() def verify(self, data: Union[bytes, str], hashed_data: Union[bytes, str]) -> bool: """ Verify data and hashed data :paramdata: The data :typedata: Union[bytes, str] :paramhashed_data: The hashed data :typehashed_data: Union[bytes, str] :returns:true if data==hashed_data :rtype:bool """ if isinstance(data, str): data = data.encode("utf-8") if isinstance(hashed_data, str): hashed_data = hashed_data.encode() expected_hash = self.hash(data) return compare_digest(expected_hash, hashed_data) def get_hasher(self) -> callable: """ Gets the hasher function. :returns:The hasher. :rtype:callable :raisesValueError: unknown hash function. """ hash_functions = { HashAlgorithm.SHA256: hashlib.sha256, HashAlgorithm.SHA512: hashlib.sha512, HashAlgorithm.MD5: hashlib.md5, HashAlgorithm.BLAKE2B: hashlib.blake2b, HashAlgorithm.BLAKE2S: hashlib.blake2s, } hash_function = hash_functions.get(self.algorithm, None) if hash_function is None: raise ValueError(f"Unknown hash function type: {self.algorithm}") else: return hash_function class SaltedHasher(HashingBase): """ This class describes a salted hasher. """ def __init__( self, algorithm: HashAlgorithm = HashAlgorithm.SHA256, salt: str = "SOMESALT" ): """ Constructs a new instance. :paramalgorithm:The algorithm :typealgorithm:HashAlgorithm :paramsalt:The salt :typesalt:str """ self.algorithm = algorithm self.salt = salt def hash(self, data: Union[bytes, str]) -> bytes: """ Generate hash :paramdata: The data :typedata: Union[bytes, str] :returns:hash :rtype:bytes """ salt = self.salt.encode("utf-8") if isinstance(data, str): data = data.encode("utf-8") hasher = self.get_hasher() value = f"{data}{salt}".encode("utf-8") return hasher(value).digest() def verify(self, data: str, hashed_data: Union[bytes, str]) -> bool: """ Verify data and hashed_data :paramdata: The data :typedata: str :paramhashed_data: The hashed data :typehashed_data: Union[bytes, str] :returns:true if data==hashed_data :rtype:bool """ if isinstance(hashed_data, str): print("convert") expected_hash = self.hash(data) return compare_digest(expected_hash, hashed_data) def get_hasher(self) -> callable: """ Gets the hasher function. :returns:The hasher. :rtype:callable :raisesValueError: unknown hasher function """ hash_functions = { HashAlgorithm.SHA256: hashlib.sha256, HashAlgorithm.SHA512: hashlib.sha512, HashAlgorithm.MD5: hashlib.md5, HashAlgorithm.BLAKE2B: hashlib.blake2b, HashAlgorithm.BLAKE2S: hashlib.blake2s, } hash_function = hash_functions.get(self.algorithm, None) if hash_function is None: raise ValueError(f"Unknown hash function type: {self.algorithm}") else: return hash_function
Здесь есть также абстрактный класс хешера и два его наследника — обычный (plain) хешер и засоленный (salted) hasher. Salted hasher генерирует хеш с солью, то есть к значению для хеширования мы добавляем соль. Это позволит избежать нахождения хеш-коллизий.
❯ Миграции
Миграции в контексте нашей ORM — это действия для обновления старой модели на новую с апдейтом базы данных. То есть: если старая модель имела два поля: id и name, то мы можем создать новую модель с тремя полями (id, name и age), и во время миграции в базу данных добавится новое поле, без потери старых изменений. Но есть несколько ограничений:
-
Возможны проблемы с UNIQUE-полями.
-
Если поле с параметром NOT NULL, она должна иметь параметр DEFAULT, иначе будет ошибка.
Конечно, у нас миграции простые, но вы можете сделать лучше.
Также я сделал возможность бекапить БД, которые были до миграции. Во время инициализации менеджера миграций, создается директория migrations, туда помещается бекап текущей базы данных, и после этого уже идет работа менеджера. А сами миграции для восстановления хранятся в json-файле. Благодаря этому, если миграция прошла неудачно, можно ее отменить и все вернуть на место. Главное — чтобы существовал бекап.
Напишем код:
from typing import Optional, Union from abc import ABC, abstractmethod import os import json import shutil from pathlib import Path from datetime import datetime from sqlsymphony_orm.models.session_models import SQLiteSession # сессия from sqlsymphony_orm.exceptions import MigrationError # исключение from loguru import logger class MigrationManager(ABC): """ This class describes a migration manager. """ @abstractmethod def get_current_table_columns(self, table_name: str): """ Gets the current table columns. :paramtable_name: The table name :typetable_name: str :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def get_table_columns_from_model(self, model: "Model"): """ Gets the table columns from model. :parammodel: The model :typemodel: Model :raisesNotImplementedError: abstract method """ raise NotImplementedError() @abstractmethod def revert_migration(self, index_key: int = -1): """ Revert migration :paramindex_key: The index key :typeindex_key: int :raisesNotImplementedError: abstract method """ raise NotImplementedError() class SQLiteMigrationManager(MigrationManager): """ This class describes a sqlite migration manager. """ def __init__(self, session: SQLiteSession, migrations_dir: str = "migrations"): """ Constructs a new instance. :paramsession: The session :typesession: SQLiteSession :parammigrations_dir: The migrations dir :typemigrations_dir: str """ self.session = session self.migrations_dir = migrations_dir os.makedirs(self.migrations_dir, exist_ok=True) self.migrations = {} self.migrations_file = "sqlsymphony_migrates.json" def get_current_table_columns(self, table_name: str) -> list: """ Gets the current table columns. :paramtable_name: The table name :typetable_name: str :returns:The current table columns. :rtype:list """ data = self.session.execute(f"SELECT * FROM {table_name}", get_cursor=True) cursor = data[0] fieldnames = [field[0] for field in cursor.description] return fieldnames def get_table_columns_from_model(self, model: "Model") -> list: """ Gets the table columns from model. :parammodel:The model :typemodel:Model :returns:The table columns from model. :rtype:list """ return [key for key in model._original_fields.keys()] def upload_migrations_file(self): logger.debug(f"Load JSON migrations history file: {self.migrations_file}") with open(self.migrations_file, "r") as read_file: self.migrations = json.load(read_file) def update_migrations_file(self): logger.debug(f"Update JSON migrations history file: {self.migrations_file}") with open(self.migrations_file, "w") as write_file: json.dump(self.migrations, write_file, indent=4) def migrate_from_model( self, old_model: Union["SessionModel", "Model"], new_model: Union["SessionModel", "Model"], original_table_name: str, new_table_name: Optional[str] = None, ): """ Migrate from old model to new model :paramold_model: The old model :typeold_model: Union["SessionModel", "Model"] :paramnew_model: The new model :typenew_model: Union["SessionModel", "Model"] :paramoriginal_table_name: The original table name :typeoriginal_table_name: str :paramnew_table_name: The new table name :typenew_table_name: Optional[str] :raisesMigrationError: fields error """ sql_queries = [] logger.info("Start database migrating") if new_table_name is not None: sql_queries.append( f"ALTER TABLE {original_table_name} RENAME TO {new_table_name};" ) logger.debug( f"[Migration] Change table name: {original_table_name} -> {new_table_name}" ) new_model.table_name = new_table_name original_table_name = new_table_name old_fields = set( [ f"{field_name} {field_params}" for field_name, field_params in old_model._class_get_formatted_sql_fields( skip_primary_key=False ).items() ] ) new_fields = set( [ f"{field_name} {field_params}" for field_name, field_params in new_model._class_get_formatted_sql_fields( skip_primary_key=False ).items() ] ) added = new_fields - old_fields dropped = old_fields - new_fields for field_name in dropped: logger.debug( f'[Migration] Drop column {field_name.split(" ")[0]} from table {original_table_name}' ) sql_queries.append( f"ALTER TABLE {original_table_name} DROP COLUMN {field_name.split(" ")[0]};" ) for field in added: if "NOT NULL" in field and "DEFAULT" not in field: raise MigrationError( f'Cannot script a "not null" field without default value in field "{field}"' ) logger.debug(f"[Migration] Add column {field} to {original_table_name}") sql_queries.append(f"ALTER TABLE {original_table_name} ADD COLUMN {field};") migrationfile = os.path.join( self.migrations_dir, f'{datetime.now().strftime("backup_%Y%m%d%H%M%S")}_{self.session.database_file}', ) logger.debug(f"Create migraton file: {migrationfile}") shutil.copyfile(self.session.database_file, migrationfile) if Path(self.migrations_file).exists(): self.upload_migrations_file() self.migrations[str(len(self.migrations) + 1)] = { "migrationfile": migrationfile, "tablename": original_table_name, "description": f"from {old_model._model_name} to {new_model._model_name}", "sql_queries": list(sql_queries), "fields": { "new": list(new_fields), "old": list(old_fields), "added": list(added), "dropped": list(dropped), }, } self.update_migrations_file() for sql_query in sql_queries: logger.debug(f"[Migration] Execute sql query: {sql_query}") try: self.session.execute(sql_query) except Exception as ex: raise MigrationError(str(ex)) def revert_migration(self, index_key: int = -1): """ Revert migration :paramindex_key:The index key :typeindex_key:int """ self.upload_migrations_file() if index_key == -1: index_key = [k for k in self.migrations.keys()][-1] try: migration = self.migrations[str(index_key)] except KeyError as ke: logger.error(f"Cannot get migration by index {index_key}: {ke}") logger.info("[Migration] Rollback database from new to old.") shutil.copyfile(migration["migrationfile"], self.session.database_file)
Весь основной код содержится в методе migrate_from_model, которая принимает на вход старую модель, новую модель, название таблицы и новое название таблицы (опционально). Мы получаем нужные поля, работая с ними через множества set, и создаем нужные бекапы.
Метод revert_migration же позволяет по индексу миграции вернуть старую версию БД. Индекс по умолчанию -1, то есть последний. После мы получаем нужную миграцию и замещаем новую БД старой БД.
❯ Пример работы
Вот полный код примера работы ORM:
from sqlsymphony_orm.datatypes.fields import IntegerField, RealField, TextField # поля from sqlsymphony_orm.models.session_models import SessionModel # модель from sqlsymphony_orm.models.session_models import SQLiteSession # сессия from sqlsymphony_orm.queries import QueryBuilder # билдер запросов from sqlsymphony_orm.migrations.migrations_manager import SQLiteMigrationManager # миграции start = time() session = SQLiteSession("example.db") class User(SessionModel): __tablename__ = "Users" id = IntegerField(primary_key=True) name = TextField(null=False) cash = RealField(null=False, default=0.0) def __repr__(self): return f"<User {self.pk}>" class User2(SessionModel): __tablename__ = "Users" id = IntegerField(primary_key=True) name = TextField(null=False) cash = RealField(null=False, default=0.0) password = TextField(default="password1234") def __repr__(self): return f"<User {self.pk}>" class Comment(SessionModel): id = IntegerField(primary_key=True) name = TextField(null=False) user_id = IntegerField(null=False) user = User(name="John") user2 = User(name="Bob") user3 = User(name="Ellie") session.add(user) session.commit() session.add(user2) session.commit() session.add(user3) session.commit() session.delete(user3) session.commit() session.update(model=user2, name="Anna") session.commit() comment = Comment(name=user.name, user_id=user.pk) session.add(comment) session.commit() print( session.filter(QueryBuilder().SELECT("*").FROM(User.table_name).WHERE(name="Anna")) ) print(session.get_all()) print(session.get_all_by_module(User)) print(user.pk) migrations_manager = SQLiteMigrationManager(session) migrations_manager.migrate_from_model(User, User2, "Users", "UserAnd") migrations_manager.revert_migration(-1) session.close()
У нас есть три модели — модель юзера, новая модель юзера, модель комментария. Мы всех их добавляем, обновляем если надо. Потом в коде демонстрируется фильтрация и получение моделей, а в конце мы создаем миграцию и после сразу же возвращаем старую БД, и в конце закрываем сессию.
Для сохранения изменений после операции следует вызвать session.commit().
Итак, это все!
Мы смогли изучить многие сложные конструкции ООП в python на примере создания такой базовой вещи как ORM.
Да, наша ORM не идеальна, нет ForeignKey, некоторых других вещей. Значение PrimaryKey доступно только после добавления модели. Если у вас есть замечания по поводу статьи — пишите в комментариях. Разумная критика приветствуется!
Ссылка на github-репозиторий с примерами здесь.
Документация моей ORM доступна по этой ссылке.
А PyPI проект находится по этой ссылке.
❯ Заключение
Спасибо за внимание! Это был довольно интересный опыт для меня, т.к. это мой первый большой проект на python с продвинутым ООП, где я попытался изучить более подробно язык и инструментарии.
Если у вас есть замечания по статье или по коду — пишите, наверняка есть более опытный и профессиональный программист на Python, который может помочь как и читателям статьи, так и мне.
Ссылка на мой репозиторий реализации ORM здесь.
Буду рад, если вы присоединитесь к моему небольшому телеграм-блогу. Анонсы статей, новости из мира IT и полезные материалы для изучения программирования и смежных областей. Если конечно хотите 🙂
Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud — в нашем Telegram-канале ↩
Перейти ↩
📚 Читайте также:
❯ Источники
ссылка на оригинал статьи https://habr.com/ru/articles/851706/
Добавить комментарий