Mela: асинхронный фреймворк на Python для сервисов, работающих с RabbitMQ

от автора

WARNING: длинная вступительная часть. Если хотите перейти сразу к делу — листайте до Getting Started.

Вступление

В 2023 году писать сервисы, взаимодействующие друг с другом через RabbitMQ, всё ещё неоправданно сложно. Ещё больше сложностей возникает с тестированием бизнес-логики в них, с согласованием контрактов между ними, с организацией монорепозиториев.

В недрах компании Alem Research, ещё года с 19-го, я начал писать легковесный фреймворк, который по задумке должен был стать чем-то вроде Flask или FastAPI для работы с RabbitMQ.

Удалось ли? Пожалуй, да. При помощи этого фреймворка наши дата-саентисты смогли самостоятельно писать и упаковывать в докер сервисы, которые лёгким движением руки и парой перебиндов встраиваются в трубу.

Впрочем, первая версия работала ещё с kombu, а позже ядро было переписано на aio-pika, и фреймворк стал ещё и асинхронным. Что даёт асинхронность? Например, можно написать фетчер HTTP-страниц, который при увеличении prefetch_count будет фетчить параллельно огромное количество страниц, и даже сможет утилизировать сеть практически полностью, чего без асинхронности было бы довольно сложно добиться.

Кроме того, асинхронность позволяет легче лёгкого реализовать паттерн RPC через RabbitMQ в рамках одного процесса, что довольно удобно для тестирования. Да и интеграция с FastAPI становится очень удобной. А скорость обработки растёт в десятки раз.

В 2020-м мы совместно с фаундером и тогдашним генеральным директором решили, что фреймворк смело можно выкладывать в open-source. И в целом, казалось бы, всё хорошо.

Однако, есть и проблемы. Главная из них — это нехватка живых пользователей и контрибьюторов. Я ушёл из Alem Research, и в компании продолжать работать над фреймворком в данный момент некому. Иногда кто-то из компании пишет мне запросы по фреймворку, но мне очень лень выкатывать обновления для одной-единственной компании: это получается своего рода работа на работодателя, который тебе больше не платит 🙂

Я давно уже собирался написать питчинг-статью про Mela, чтобы привлечь новых пользователей, а возможно даже и контрибьюторов, но не решался, потому что код был очень далёк от идеала. Да-да, меня просто жрал внутренний перфекционизм и стыд за то, чтобы выкатить что-то не очень красивое. Но сегодня я понял, что стыдиться мне особо нечего: да, архитектура ядра не идеальна, но за последние пару месяцев, пока я сидел без работы, я уже продумал её со всех сторон, и знаю, что делать дальше. В общем, красивая версия не готова, но у меня есть план.

У меня есть план

У меня есть план

И я решил, что будет неправильно (и долго) реализовывать свой план в одно лицо, лучше сразу попробовать привлечь к нему потенциальных контрибьюторов. В общем, если вас заинтересует то, что описано ниже — добро пожаловать на борт!

Извините за долгое вступление. Мы наконец-то отправляемся в путь.

Getting started

Конечно же, всё начинается с:

pip install mela=1.1.1

Версию я указал чтобы статья оставалась релевантной даже после обновлений.

Давайте напишем сервис, который будет получать сообщение из очереди, выводить его body распарсенный в json, и возвращать сообщение обратно.

# app.py  from mela import Mela  app = Mela(__name__)   @app.service("printer") def printer(body, message):     print(body)     return body   if __name__ == '__main__':     app.run() 

Как вы можете догадаться, это ещё не всё, что требуется для запуска приложения: здесь нет никакой информации об очередях и подключении. Это потому что она хранится в файле application.yml.

# application.yml connections:   default:     host: localhost     port: 5672     username: user     password: bitnami  services:   printer:     consumer:       exchange: general-sentiment-x       routing_key: general-sentiment-q       queue: general-sentiment-q     publisher:       exchange: general-sentiment-x       routing_key: general-sentiment-q

Вот и всё.

Этот пример знакомит нас сразу с тремя самыми основными высокоуровневыми концепциями Мелы: Publisher, Consumer и Service.

Если с паблишером и консьюмером всё понятно, то про Service, пожалуй, поясню, что это элемент трубы, который состоит из комбинации консьюмера и паблишера: он консьюмит сообщения из определённой очереди, обрабатывает его, и паблишит в указанный эксчейндж. По сути, сервис состоит из паблишера и консьюмера, что в том числе отражено в Yaml-файле.

Функция, которая находится под декоратором @app.service(...), как не сложно догадаться, является коллбэком консьюмера, а то, что она возвращает — будет отправлено в паблишер, привязанный к сервису.

Pydantic

Какой современный фреймворк без Pydantic, правда? И их есть у меня.

# app.py from pydantic import BaseModel from datetime import datetime  from mela import Mela   app = Mela(__name__)   class Document(BaseModel):     text: str     url: str     date: datetime   @app.service('validator') def validator(body: Document) -> Document:     if '#' in body.url:         body.url = body.url.split('#')[0]     return body   if __name__ == '__main__':     app.run()

В данном случае мы используем new-style сигнатуру обработчика, и явно указывать второй аргумент, в который в предыдущем примере прилетел бы объект Message, нам не нужно.

body сначала переводится в json, потом этот json скармливается в класс Document.

При ошибках валидации ошибки будут выведены в консоль, а сообщение вернётся обратно в очередь. Если, конечно, в консьюмереrequeue_broken_messages=True (по умолчанию так) и если у консьюмера не задан dead letter exchange. То же самое будет происходить если в коллбэке зарейзится любая другая ошибка.

Управление ack/nack

Есть несколько способов управлять ответами. Давайте рассмотрим все сразу.

from datetime import datetime from datetime import timedelta from pydantic import BaseModel  from mela import IncomingMessage from mela import Mela from mela.components.exceptions import NackMessageError  app = Mela(__name__)   class Document(BaseModel):     text: str     url: str     date: datetime   @app.service("filter") async def filter_(body: Document, message: IncomingMessage):     if body.date > datetime.utcnow():         # First way: we can raise special exception with some `requeue` value         raise NackMessageError("We are not working with time travellers", requeue=False)     elif body.date < datetime.utcnow() - timedelta(days=365):         # Second way: we can manually nack message via IncomingMessage object         # As you can see, in this case we can't write any message about requeue reason.         # But it is still useful if you need to silently send message to DLX         await message.nack(requeue=False)  # Go to archive, dude      if body.url == '':         # Third way: we can raise almost any exception. The message should be or should not          # be requeued based on `requeue_broken_messages` value         raise AssertionError("Message without url is not acceptable")      return body   if __name__ == '__main__':     app.run() 

Чистый консьюмер

Консьюмер создаётся точно так же, как сервис, но с другим декоратором.

from pydantic import BaseModel from pydantic import EmailStr from mela import Mela  app = Mela(__name__)   class EmailNotification(BaseModel):     template_name: str     vars: dict     receiver: EmailStr   @app.consumer("email-sender") def printer(body: EmailNotification):     # Some Jinja2 and SMTP integration     pass   if __name__ == '__main__':     app.run() 

Если в ходе обработки сообщения не произошло никакой ошибки, то сообщение будет акнуто автоматически. Это поведение можно изменить. Однако, чаще всего оно всех устраивает. В любом случае, можно вручную акнуть сообщение, думаю, вы уже догадались, как это сделать.

Ещё важно показать application.yml для консьюмера:

connections:   default:     host: localhost     port: 5672     username: user     password: bitnami  consumers:   email-sender:     exchange: notifications-x     exchange_type: topic     routing_key: "email.#"     queue: email-sender-q 

Собственно, да. Просто другое название блока. И невзначай показанный пример работы с эксчейнджами других типов 🙂

Чистый паблишер и интеграция с FastAPI

from datetime import datetime from uuid import uuid4  from fastapi import FastAPI from mela import Mela from mela.settings import Settings from pydantic import BaseModel  app = FastAPI()  mela_app = Mela(__name__) mela_app.settings = Settings()   class ReportRequest(BaseModel):     start_date: datetime     end_date: datetime     user_id: str     report_id: str | None = None   @app.post("/report") async def read_root(report_request: ReportRequest):     if report_request.report_id is None:         report_request.report_id = str(uuid4())     # some DB writing     publisher = await mela_app.publisher_instance('report-generator')     await publisher.publish(report_request)     return report_request 

application.yml:

connections:   default:     host: localhost     port: 5672     username: user     password: bitnami  publishers:   report-generator:     exchange: report-x     routing_key: new-report 

Тут даже не знаю что комментировать. Разве что добавлю, что в будущем хотелось бы сделать внешний декоратор, чтобы инжектить паблишеры и RPC клиенты в сторонние функции так же, как в следующем примере. Но это уже к планам на будущее. А пока перейдём к следующему примеру.

Инъекции дополнительных паблишеров

Иногда нам нужно реализовать сплиттер или просто по ходу обработки сообщения что-то куда-то запаблишить. Можно, конечно, сделать как в предыдущем разделе, но можно сделать круче:

from datetime import datetime  from pydantic import BaseModel  from mela import Mela from mela.components import Publisher  app = Mela(__name__)   class Document(BaseModel):     text: str     url: str     date: datetime     has_images: bool = False   @app.service('archiver') async def archiver(document: Document, images_downloader: Publisher = 'images-downloader') -> Document:     # archiving document          if document.has_images:         await images_downloader.publish(document)          return document   if __name__ == '__main__':     app.run() 

application.yml:

connections:   default:     host: localhost     port: 5672     username: admin     password: admin  services:   archiver:     consumer:       exchange: archiver-x       routing_key: archiver-q       queue: archiver-q     publisher:       exchange: notify-archived-x       exchange_type: topic       routing_key: document.archived  publishers:   images-downloader:     exchange: images-downloader-x     routing_key: images-downloader-q 

На данный момент инъекции реализованы неправильно с точки зрения типизации. Есть план переписать их на новый служебный тип Annotated. Но… Главное работает.

DLX

Для любого консьюмера поддерживается Dead Letter Exchange.

Сделать его очень просто:

connections:   default:     host: localhost     port: 5672     username: admin     password: admin  services:   service_with_dlx:     consumer:       exchange: dlx-test-x       routing_key: dlx-test-k       queue: dlx-test-q       dead_letter_exchange: dlx-test-dead-letter-x       dead_letter_routing_key: dlx-test-dead-letter-k     publisher:       exchange: test-x       routing_key: test_queue

Но вам придётся самостоятельно следить за тем, чтобы была очередь, которая будет собирать сломанные сообщения из вашего эксчейнджа.

RPC

# server.py import asyncio import aio_pika  from mela import Mela  app = Mela(__name__)   async def fetch(url):     # asynchronously fetching url here and return its body     await asyncio.sleep(1)     return url   @app.rpc_service("fetcher") async def fetcher(url: str):     return {"fetched": await fetch(url)}   bots = {}   def create_bot(bot_id, bot_username, bot_password):     bots[bot_id] = {'username': bot_username, 'password': bot_password}   def get_bot(bot_id):     return bots[bot_id]   @app.rpc_service("bot_manager") async def fetcher(body, message: aio_pika.Message):     if message.headers['method'] == 'create_bot':         create_bot(**body)         return {'result': None, 'status': "OK"}     elif message.headers['method'] == 'get_bot':         return {'result': get_bot(**body), 'status': "OK"}     else:         return {'result': None, 'status': "ERROR_UNKNOWN_METHOD"}  if __name__ == '__main__':     app.run() 
# client.py import asyncio  from mela import Mela  app = Mela(__name__)   async def main():     # RPC calls over RabbitMQ never were simpler!      fetcher = await app.rpc_client_instance("fetcher")      bot_manager = await app.rpc_client_instance("bot_manager")      res = await fetcher.call({'url': "test"})     print(res)      # we can even gather call results!     g = await asyncio.gather(fetcher.call({'url': url1}), fetcher.call({'url': url2}))     print(g)      create_bot_result = await bot_manager.call({         'bot_id': 1,         'bot_username': "LalkaPalka",         'bot_password': "supersecret",     },         headers={'method': 'create_bot'},     )     print(f"create_bot result {create_bot_result}")      get_bot_result = await bot_manager.call({'bot_id': 1}, headers={'method': 'get_bot'})     print(f"get_bot_result {get_bot_result}")      unknown_method_result = await bot_manager.call({'bot_id': 4}, headers={'method': 'getBot'})     print(f"unknown method result: {unknown_method_result}")   if __name__ == '__main__':     url1 = (         'https://tengrinews.kz/kazakhstan_news/vorvalis-dom-izbili-'         'almatinka-rasskazala-zaderjanii-supruga-459127/'     )     url2 = (         'https://www.inform.kz/ru/skol-ko-lichnyh-podsobnyh-'         'hozyaystv-naschityvaetsya-v-kazahstane_a3896073'     )     app.run(main())
connections:   default:     host: localhost     port: 5672     username: user     password: bitnami  rpc-services:   fetcher:     exchange: fetcher-x     routing_key: fetcher-k     queue: fetcher-q     response_exchange: fetching-result-x   bot_manager:     exchange: botmanager-x     routing_key: botmanager-k     queue: botmanager-q     response_exchange: botmanager-result-x

Вот вам пример сразу с двумя не конфликтующими друг с другом RPC-сервисами в одном процессе.

Тут тоже не вижу смысла что-то объяснять, но если будут вопросы — с радостью отвечу.

Несколько подключений и переменные окружения

Очень распространённый кейс — когда нам нужно перекачать данные из одного кластера реббита в другой. Сейчас мы не будем обсуждать правильность этой практики, а просто покажем, как это можно сделать легко и непринуждённо.

# application.yml connections:   input_connection:     host: $RABBIT_INPUT_HOST     port: ${RABBIT_INPUT_PORT|5672}     username: ${RABBIT_INPUT_USERNAME|rabbitmq-bridge}     password: ${RABBIT_INPUT_PASSWORD|rabbitmq-bridge}   output_connection:     host: $RABBIT_OUTPUT_HOST     port: ${RABBIT_OUTPUT_PORT|5672}     username: ${RABBIT_OUTPUT_USERNAME|rabbitmq-bridge}     password: ${RABBIT_OUTPUT_PASSWORD|rabbitmq-bridge}  services:   bridge:     consumer:       connection: input_connection       prefetch_count: ${RABBIT_INPUT_PREFETCH_COUNT|1}       routing_key: ${RABBIT_INPUT_ROUTING_KEY}       exchange: ${RABBIT_INPUT_EXCHANGE}       queue: ${RABBIT_INPUT_QUEUE}     publisher:       connection: output_connection       routing_key: ${RABBIT_OUTPUT_ROUTING_KEY}       exchange: ${RABBIT_OUTPUT_EXCHANGE} 
# app.py from mela import Mela  app = Mela(__name__)   @app.service("bridge") async def serve(body, message):     return body   if __name__ == '__main__':     app.run() 

Как вы можете заметить, код сервиса очень простой. А вот в конфигурационном файле есть кое-что новое. Первое, что бросается в глаза — это переменные окружения. Да, их очень просто сюда вшить. Вот, кстати, пример дотэнва:

RABBIT_INPUT_HOST=localhost RABBIT_INPUT_ROUTING_KEY=routing-key RABBIT_INPUT_EXCHANGE=exchange RABBIT_INPUT_QUEUE=queue RABBIT_OUTPUT_HOST=localhost RABBIT_OUTPUT_PORT=5673 RABBIT_OUTPUT_ROUTING_KEY=routing-key RABBIT_OUTPUT_EXCHANGE=exchange

Всё очень просто и прямолинейно, правда?

А ещё это по сути уже можно упаковывать в докер, и запускать через оркестратор.

Второе, на что нужно обратить внимание в конфиг-файле этого раздела — это то, что здесь два коннекшена, и для консьюмера и паблишера используются разные коннекшены.

На этом у меня юзкейсы закончились. Перейдём к следующему разделу.

Производительность

Цель этой статьи — не похвастаться, да и к тому же производительность фреймворка — это целиком и полностью заслуга авторов aio-pika, а не моя. Детальные бенчмарки — это будет тема для отдельной статьи, но пока просто скажу, что на моём не самом мощном ноуте простой бридж между двумя реббитами обрабатывает порядка 500 сообщений в секунду. А лучшее, чего мне удавалось добиться от обычной pika — это 80-100 сообщений в секунду. В случае с фетчингом страниц по всем понятным причинам не асинхронная pika не могла показать вообще сколько-нибудь адекватный результат.

Заключение

Как мне кажется, мне удалось составить неплохую внешнюю апишку. За исключением инъекций, но про них я писал выше. Внутри — бардак. Всё работает, конечно, но мейнтейнить и дальше развивать фреймворк будет сложно. Я уже начал переписывать ядро в чистовой вариант, но мне не хватает мотивации.

Поэтому я и написал эту статью. Если статья получит отклик, если этот фреймворк кого-то заинтересует, если найдутся люди, которые будут им пользоваться, а может даже и контрибьютить, то я обязательно продолжу работу над ним. Можете даже просто звёздочек накидать, это тоже даст мне хоть какую-то обратную связь 🙂

В конце дублирую ссылку на GitHub

Пишите вопросы в комментах. Ответами на самые важные буду дополнять статью, на остальные отвечу просто in-place.


ссылка на оригинал статьи https://habr.com/ru/articles/737134/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *