Strawberry GraphQL и FastAPI. Так что получается, Pydantic всё-таки не нужен?

от автора

Сказ пойдет о том, как я протаптывал тропинки в этом неизведанном (или неосвещенном) мире GraphQL и Python.

Я начал работать с FastAPI когда он имел в десятки раз меньше звёзд чем у Django, и каково было моё удивление, что они уже почти наравне. На момент написания статьи FastAPI имеет 55K, а Django — 69K. Почему-то до сих пор у меня складывалось впечатление, что ничего так не изменилось, а случается так, потому что решений и шума вокруг Django я вижу намного больше. Да на том же рынке труда при просмотре Python-вакансий это видно невооруженным глазом. Так вот, к чему это я, при выборе библиотеки для работы с GraphQL я столкнулся с тем, что выбор-то не сладок. Выделить хотелось бы 3 библиотеки — это ariadne, strawberry и graphene. Но при детальном рассмотрении оказалось, что graphene не имеет простой возможности нормальной интеграции с FastAPI, какой-то из методов, который раньше сущевствовал был deprecated и вынесен в отдельную библиотеку, что меня насторожило. Ariadne показался более правильным выбором, но в момент выбора при установке летели ошибки из-за версии каких-то библиотек, да и коммитов на тот момент давненько не было. Поэтому остался strawberry, которая к тому же упоминается в документации FastAPI и имеет какую-то интеграцию с Pydantic. Выбор я остановил именно на ней. Несмотря на то, что эта интеграция имеет флаг Experimental.

По началу всё шло очень сложно и не понятно, получалось так, что я по сути дважды переписывал некоторые схемы, т.к. многие вложенные структуры неправильно конвертировались, да и я получал множество исключений. Я сейчас говорю именно об экспериметальной функции, из-за неё я полностью описывал и пайдантик модели, и схемы Strawberry, при этом по прежнему пользуясь experimental функционалом. И только так у меня всё более менее заводилось, но не тут то было…. Незамедлительно было принято решение писать костыли. И этими костылями в этой статье я с вами поделюсь, но не с целью показать как правильно, а скорее обсудить варианты, как сделать это более правильно. Вообще я хотел сам посидеть над тем самым «идеальным» решением, но решил понадеятся на сообщество Хабра. В последующем статья будет редактироваться, если какое-нибудь хорошее решение найдется.

В ходе долгих экспериментов, проб и исключений мне пришлось и вовсе выкинуть из этой цепочки Pydantic, проблема оставалась лишь в том, как бы модели SqlAlchemy сконвертировать. Я наткнулся на один интерсный файл на Github, который и сподвиг меня попробовать пойти дальше.

Суть этих методов в том, чтобы генерировать SQL запрос таким образом, чтобы мы из бд получали только то, что запросил клиент. Таким образом мы вроде как экономим на объеме получаемых из бд данных и вроде как получаем заветную оптимизацию в отличии от Rest API. Я дописал эти функции таким образом, чтобы мы имели возможность получать объекты бд и их отношения в бесконечную глубину благодаря рекурсии. Сейчас вы можете увидеть те самые функции, только слегка увеличенные:

def flatten(items):     if not items:         return items     if isinstance(items[0], list):         return flatten(items[0]) + flatten(items[1:])     return items[:1] + flatten(items[1:])   def get_relation_options(relation: dict, prev_sql=None):     key, val = next(iter(relation.items()))     fields = val['fields']     relations = val['relations']     if prev_sql:         sql = prev_sql.joinedload(key).load_only(*fields)     else:         sql = joinedload(key).load_only(*fields)     if len(relations) == 0:         return sql     if len(relations) == 1:         return get_relation_options(relations[0], sql)     result = []     for i in relations:         rels = get_relation_options(i, sql)         if hasattr(rels, '__iter__'):             for r in rels:                 result.append(r)         else:             result.append(rels)     return result   def get_only_selected_fields(     db_baseclass_name,  # это наша SqlAlchemy модель которая является основной, от которой будем отталкиваться.     info: Info,  ):     def process_items(items: list[SelectedField], db_baseclass): # В этой функции мы разбиваем наши fields и relations для дальнейшей обработки         fields, relations = [], []         for item in items:             if item.name == '__typename': # item.name - имя нашего field из GraphQL Query                 continue             try:                 relation_name = getattr(db_baseclass, convert_camel_case(item.name))             except AttributeError:                 continue             if not len(item.selections):                 fields.append(relation_name)                 continue             related_class = relation_name.property.mapper.class_             relations.append({relation_name: process_items(item.selections, related_class)})         return dict(fields=fields, relations=relations)      selections = info.selected_fields[0].selections     options = process_items(selections, db_baseclass_name)      fields = [load_only(*options['fields'])] if len(options['fields']) else []      query_options = [         *fields,         *flatten([get_relation_options(i) for i in options['relations']]) # Здесь мы имеем уже отсортированные отношения     ]      return select(db_baseclass_name).options(*query_options) 

Код достаточно нечитаемый, но в течении 5-ти минут в нем можно разобраться. Желательно вам самостоятельно поиграться с этим кодом и посмотреть на выходимый SQL, чтобы быстрее понять что это такое. Здесь важно уточнить, что ваши SqlAlchemy модели и все ваши relationship должны быть описаны в этих моделях. Но если вам лень всё это делать я напишу простой пример. Имеем запрос вида:

{   users: {     id     name     username     email     groups {       id       name       category {         id         name       }     }   } }

И автоматически получаем SqlAclhemy запрос вида:

select(User).options(   load_only(User.id, User.name, User.username, User.email),   joinedload(User.groups).load_only(     Group.id, Group.name   ).joinedload(Group.category).load_only(     Category.id, Category.name   ) )

Далее мы уже можем на него накрутить различные фильтрации, и всё что нам необходимо. Далее переходим к функциям, которые заставляют превращать модели SqlAlchemy в схемы Strawberry.

Первая функция, которая тоже выглядит достаточно страшно занимается тем, что превращает SqlAlchemy модели в полноценные dict объекты.

def get_dict_object(model):     if isinstance(model, list):         return [get_dict_object(i) for i in model]     if isinstance(model, dict):         for k, v in model.items():             if isinstance(v, list):                 return {                     **model,                     k: [get_dict_object(i) for i in v]                 }         return model     mapper = class_mapper(model.__class__)     out = {         col.key: getattr(model, col.key)         for col in mapper.columns         if col.key in model.__dict__     }     for name, relation in mapper.relationships.items():         if name not in model.__dict__:             continue         try:             related_obj = getattr(model, name)         except AttributeError:             continue         if related_obj is not None:             if relation.uselist:                 out[name] = [get_dict_object(child) for child in related_obj]             else:                 out[name] = get_dict_object(related_obj)         else:             out[name] = None     return out 

Дальше идёт самая страшная часть статьи, потому что этот код выглядит уже действительно страшно (да простит меня бог, что я такое публикую на хабр):

def orm_to_strawberry_step(item: dict, current_strawberry_type):     annots = current_strawberry_type.__annotations__     temp = {}     for k, v in item.items():         if k not in annots.keys():             continue         current_type = annots.get(k)         if isinstance(v, str) or isinstance(v, int) or isinstance(v, float) or isinstance(v, datetime):             temp[k] = v             continue         if isinstance(v, enum.Enum):             temp[k] = strawberry.enum(v.__class__)[v.value]             continue         if isinstance(current_type, StrawberryOptional):             current_type = current_type.of_type         if isinstance(current_type, UnionType):             current_type = current_type.__args__[0]         if isinstance(current_type, StrawberryList):             current_type = current_type.of_type         if isinstance(current_type, GenericAlias):             current_type = current_type.__args__[0]         if isinstance(v, list):             temp[k] = [orm_to_strawberry_step(i, current_type) for i in item[k]]         elif isinstance(v, dict):             temp[k] = orm_to_strawberry_step(item[k], current_type)     return current_strawberry_type(**temp)   def orm_to_strawberry(input_data, strawberry_type):     if isinstance(input_data, list):         return [orm_to_strawberry_step(get_dict_object(item), strawberry_type) for item in input_data]     return orm_to_strawberry_step(get_dict_object(input_data), strawberry_type)

Что стоит сказать об этом коде, так это то, что мы даем этой большой функции наш dict, и схему Strawberry в которую будет происходить перевоплощение. Она получает все вложенные модели (отношения) и подставляет в каждую наши дикты. Опять-таки, чтобы понять код, вам придется немножно над ним посидеть. Таким образом мы имеем глубокие схемы с подсхемами, которые полностью провалидированы Strawberry и готовы к выдаче пользователю. Напоследок оставлю простенькую функция, которая частично повторяет функционал Pydantic, когда мы пользуемся методом .dict():

def _to_dict(obj):     if isinstance(obj, list) or isinstance(obj, tuple):         return [_to_dict(i) for i in obj]     if not hasattr(obj, '__dict__'):         return obj     temp = obj.__dict__     for key, value in temp.items():         if hasattr(value, '_enum_definition') or isinstance(value, bytes):             continue         elif hasattr(value, '__dict__'):             temp[key] = _to_dict(value)         elif isinstance(value, list):             temp[key] = [_to_dict(i) for i in value]     return temp  def strawberry_to_dict(     strawberry_model,     exclude_none: bool = False,     exclude: set | None = None, ):     deep_copy = copy.deepcopy(strawberry_model)     dict_obj = _to_dict(deep_copy)     result_dict = {**dict_obj}     for k, v in dict_obj.items():         if exclude:             if k in exclude:                 result_dict.pop(k, None)         if exclude_none and v is None:             result_dict.pop(k, None)     return result_dict

Надеюсь кто-то дочитает статью до конца, потому что я старался вложить в статью максимум пользы. Кода нету на GitHub и наверное не будет, тут буквально 7 простых функций)))


ссылка на оригинал статьи https://habr.com/ru/post/723220/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *