Хорошая замена Celery

от автора

В своей прошлой статье «Как подружить Celery и SQLAlchemy 2.0 с асинхронным Python» я разбирал возможность запускать асинхронные задачи «из-под Celery» и в комментариях мне сообщили о существовании ещё одной библиотеки под названием aio_pika. И признаться, о ней я раньше никогда не слышал. Оно и не удивительно, библиотека имеет всего в районе 1К звёзд на GitHub (по сравнению с 20К+ у Celery). Я рассмотрел абсолютно все популярные (500+ звёзд) решения и остановился именно на этом из-за активной (на текущий момент) разработке и относительной популярности.

Стек, который вы увидите в статье: FastAPI, RabbitMQ, aio_pika и docker. Статья будет полезна тем кто использует Celery в своих проектах, а так же тем, кто только слышал о том, что такое очереди и RabbitMQ.

Навигация:

  1. Конфигурация RabbitMQ

  2. Task router для consumer’a

  3. Написание consumer’a

  4. Интеграция в основное приложение

Предисловие

Библиотека позиционирует себя «обёрткой aiormq для asyncio для людей». Моей целью стало заменить Celery, используемый в проекте на неё. Решил я это сделать из-за того, что его интерфейс не предполагает разбиение приложения и worker’ов в отдельные сервисы, чего очень хотелось бы. Второстепенными причинами стали: отсутствие асинхронности, запах legacy (я про атрибут self, который необходимо писать первым аргументом функций) и отсутствие type-хинтов (это на последнем месте важности!). Celery в проекте использовался для IO-Bound и Delay задач, поэтому интеграция асинхронности была очень кстати.

Конфигурация RabbitMQ

Я обновил свой RabbitMQ добавив плагин «RabbitMQ Delayed Message Plugin». Он нужен был для того, чтобы делать «отложенные» задачи. Т.е. задача была в том, чтобы удалять временные файлы по истечении определённого времени. Celery с этим справлялся, т.к. у него была нативная поддержка данной фичи, но, как я понял, aio-pika такого не имеет. Этот плагин позволяет добавить этот функционал в сам RabbitMQ. Мой docker-compose конфиг стал выглядеть следующим образом:

docker-compose.yaml
 rabbit:     image: rabbitmq:3-management     hostname: rabbit     env_file:       - .env     volumes:       - ./services/rabbit/delayed_message.ez:/opt/rabbitmq/plugins/delayed_message.ez       - ./services/rabbit/enabled:/etc/rabbitmq/enabled_plugins     ports:       - "15672:15672"

Через volumes я подключил скачанный плагин, а так же добавил его в список активированных по умолчанию. Мой enabled_plugins файл выглядел следующим образом:

[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].

*Точка в конце обязательна

Task router для consumer’a

Следующим этапом я написал Router для моего worker’а, который был бы для меня удобен. На этом моменте я немного заморочился:

router.py
class Router:     _routes: dict[str, list[str]] = {}      def __init__(self):         modules = list(filter(             lambda x: x != '__init__',             map(lambda y: y.split('.')[0], os.listdir('tasks'))         ))         for module in modules:             imported = import_module(f'tasks.{module}')             if not hasattr(imported, '__all__'):                 continue             self._routes[module] = imported.__all__             del imported     def get_method(self, action: str) -> Optional[Callable]:         module = action.split(':')[0] # Название файла         method = action.split(':')[1] # Название функции         if self._exists(module, method):             return getattr(import_module(f'tasks.{module}'), method)

Переменная _router заполняется задачами, которые расположены в папке tasks, в которой лежат сами функции (задачи). Так же они указаны в переменной all для экспорта. Для наглядности задачи выглядела примерно так:

async def test(is_test: bool):     print(f'Hello world! Value is: {is_test}')  __all__ = ['test']

Следующей задачей предстояло решить проблему с тем, что эти функции имеют произвольное количество аргументов. Я написал ещё один метод для роутера, который мог бы учесть и это:

router.py
def check_args(func: Callable, data: dict) -> bool:     hints = get_type_hints(func)     for arg, arg_type in hints.items():         if arg not in data:             return False         if not isinstance(data[arg], arg_type):             return False     return True

Мы передаем в данный метод функцию, которую импортировали из файла, а так же данные, которые пытаемся ей подсунуть. Мы так же проверяем типы указанные в аргументах функции. Если всё ок — то возвращаем True

Таким образом я регулировал количество доступных задач созданием \ удалением файлов из папки tasks. Это оказалось очень удобным и гибким решением.

Написание consumer’a

consumer.py
 async def process_message(message: AbstractIncomingMessage):     async with message.process():         message = MessageSchema.parse_obj(json.loads(message.body.decode()))         method = router.get_method(message.action) # Импортируем функцию и записываем в переменную         if method:             if not router.check_args(method, message.body): # Проверяем атрибуты, которые собираемся передавать                 print('Invalid args')                 return             if inspect.iscoroutinefunction(method): # Проверяем является ли функция async или нет                 await method(**message.body)             else:                 method(**message.body)   async def main() -> None:     queue_key = rabbit_config.RABBITMQ_QUEUE      connection = await aio_pika.connect_robust(rabbit_config.url)     # Для корректной работы с RabbitMQ указываем publisher_confirms=False     channel = await connection.channel(publisher_confirms=False)     # Кол-во задач, которые consumer может выполнять в момент времени. В моём случае 100     await channel.set_qos(prefetch_count=100)     queue = await channel.declare_queue(queue_key)          exchange = await channel.declare_exchange(         # Объявляем exchange с именем main и типом, который поддерживает отложенные задачи         # Важно чтобы это имя (main) совпадало с именем на стороне publisher         'main', ExchangeType.X_DELAYED_MESSAGE,          arguments={             'x-delayed-type': 'direct'         }     )     await queue.bind(exchange, queue_key)     await queue.consume(process_message)     try:         await asyncio.Future()     finally:         await connection.close()   if __name__ == "__main__":     asyncio.run(main()) 

В целом на этом сторона consumer’a закончена и можно приступить к интеграции всего этого добра в основное приложение (publisher).

Интеграция в основное приложение

На помощь снова приходит ООП и я написал класс для работы с aio-pika, который полностью закрыл мои нужды. Его инициализация происходила в новеньком lifespan (который скоро полностью вытолкнет старые способы):

@asynccontextmanager async def lifespan(_: FastAPI):     await rabbit_connection.connect()     yield     await rabbit_connection.disconnect()  app = FastAPI(lifespan=lifespan)

Далее идет реализация этого класса:

rabbit_connection.py
class RabbitConnection:     _connection: AbstractRobustConnection | None = None     _channel: AbstractRobustChannel | None = None     _exchange: AbstractRobustExchange | None = None      async def disconnect(self) -> None:         if self._channel and not self._channel.is_closed:             await self._channel.close()         if self._connection and not self._connection.is_closed:             await self._connection.close()         self._connection = None         self._channel = None      async def connect(self) -> None:         try:             self._connection = await connect_robust(rabbit_config.url)             self._channel = await self._connection.channel(publisher_confirms=False)             self._exchange = await self._channel.declare_exchange(                 # Повторяем из consumer'a. Важно указать одинакое                 # имя exchange'ов. В моём случае `main`                 'main', ExchangeType.X_DELAYED_MESSAGE,                 arguments={                     'x-delayed-type': 'direct'                 }             )         except Exception as e:             await self.disconnect()      async def send_messages(             self,             messages: list[MessageSchema],             *,             routing_key: str = rabbit_config.RABBITMQ_QUEUE,             delay: int = None # Задержка, через которое нужно выполнить задачу (в секундах)     ) -> None:         async with self._channel.transaction():             headers = None             if delay:                 headers = {                     'x-delay': f'{delay * 1000}' # Это тоже из документации плагина для RabbitMQ                 }             for message in messages:                 message = Message(                     body=json.dumps(message.dict()).encode(),                     headers=headers                 )                 await self._exchange.publish(                     message,                     routing_key=routing_key,                     mandatory=False if delay else True # Чтобы в логах был порядок ;)                 )   rabbit_connection = RabbitConnection()

В итоге для того, чтобы отправить работки worker’у достаточно было сделать следующее:

main.py
@router.get('/test') async def test():     message = MessageSchema(         action='images:delete',         body={'path': 'assets/temp/temp.png'}     )     await rabbit_connection.send_messages(       [message for _ in range(150)],        delay=20     )     return {'status': 'published'}

Подводя итоги хочется сказать что worker теперь чувствует себя намного увереннее и может выполнять намного больше и быстрее. Надеюсь статья оказалась полезной. Всем спасибо, всем пока.


ссылка на оригинал статьи https://habr.com/ru/articles/736598/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *