Как мы переписывали логику очередей: Celery => aio-pika => FastStream

от автора

Наш путь активной работы с очередями RabbitMQ начался с классического Celery. Осознав критичность низкоуровневого контроля системы, принялись работать с aio-pika. Но и этот уровень слишком местами сложный (далее расскажу почему), и нашли отличное решение, на текущий момент, в лице FastStream. Сразу оставлю такую пометку, что каждый инструмент подходит для решения своей задачи. Мы больше хотели сделать акцент на удобство и скорость разработки относительно затрачиваемого времени на миграции решений.

N.B.: Код возможно покажется неоптимальным или старым. Это всё наш дорогой Легаси.

Постановка задачи

Наша система построена на основе микросервисов, работающих с RabbitMQ. Внутри — обычный асинхронный код для похода на внешние API и в БД.

Требования:

  • Надежный консьюминг — это для нас критично, чтобы сообщение шло по всему флоу и нигде не останавливалось без причин. Если ошибка падает, то это должно отражаться в 3 местах: БД, логи и метрики.

  • Ретраи при ошибках обработки.

  • Трейсинг — поддержка OpenTelemetry.

  • Мониторинг сервиса через healthcheck’и.

  • Prometheus метрики.


Решение №1: Celery как консьюмер

Почему Celery

Celery — классический инструмент для фоновых задач, знакомый большинству Python-разработчиков. Из коробки: декларативное описание задач, ретраи с экспоненциальной задержкой, хранение результатов, мониторинг через Flower, интеграции с фреймворками. Логика проста: пишешь @app.task, запускаешь воркер — и сообщения из очереди начинают обрабатываться.

Как мы его использовали

Мы не отправляли задачи из кода в духе my_task.delay(), а настраивали Celery на прослушивание внешней очереди, куда сообщения попадали от других систем. По сути, Celery выступал как consumer: подключался к брокеру, забирал сообщения, десериализовал и передавал в наши обработчики. Настройки вроде max_retries, default_retry_delay, countdown позволяли гибко управлять поведением при сбоях. Важно ещё подсветить, что результат всегда игнорируется с помощью параметра ignore_result=True поскольку все результаты записываются в БД.

Пример инициализации воркера:

def create_app(    name,    broker,    include,    backend=None,    task_queues=None,    liveness_probe=1,    update_period=60,    watcher_config={},):    # Создание само приложение + наложение дополнительных конфигурации    app = Celery(name, broker=broker, include=include, backend=backend)    app.conf.update(        result_expires=120,    )    add_without_heartbeat_argument = Option(        ("--without-heartbeat",),        default=True,    )    app.user_options["worker"].add(add_without_heartbeat_argument)    if task_queues is not None:        app.conf.task_queues = task_queues    if liveness_probe:        add_update_period_argument = Option(            ("--update-period",),            default=update_period,        )        HEARTBEAT_FILE.touch()        app.user_options["worker"].add(add_update_period_argument)        # Добавление кастомной livenessProbe для K8s        app.steps["worker"].add(LivenessProbe)        # инициализация трейсинга    with_tracing = watcher_config.get("with_tracing")    if with_tracing:        tracing_exporters = watcher_config.get("tracing_exporters", ())        signals.worker_process_init.connect(            init_celery_tracing(app_name=name, tracing_exporters=tracing_exporters),            weak=False,        )    return app

С чем столкнулись

Celery проектировался как система передачи сообщений между системами. Из-за чего столкнулись со следующими проблемами:

  • Потребление памяти — из-за оборачивания каждого сообщения в метаданные и хранения внутренних структур Celery расход оперативной памяти быстро рос пропорционально потоку сообщений. При высоком темпе обработки воркер начинал использовать значительно больше ОЗУ, чем требовалось самой бизнес-логике, что вынуждало выделять избыточные ресурсы.

  • Управление соединениями и heartbeat — Celery скрывает многие детали брокера, из‑за чего при сетевых сбоях восстановление происходило с задержками, а тонкая настройка consumer_timeout, broker_transport_options была сложной и плохо документированной.

  • Избыточность — Как можно заметить мы используем минимальные дополнительные конфигурации Celery.

  • Падение контейнера на битом сообщений — Будет не совсем справедливо относить это полностью к минусам самого Celery, это больше камень в наш огород. Но опыт есть опыт. Если воркер обрабатывал невалидное сообщение в плане структуры, то падал весь контейнер, при этом сообщение консьюмилось. Что приводило к непониманию “куда делось сообщение и что с ним произошло?”.

Стало ясно: Celery — отличный выбор для фоновых задач с результатом, но для непрерывного потокового консьюминга он перегружен и недостаточно прозрачен.


Решение №2: aio-pika — близко к железу

Идея

Перейти на чистый AMQP, отказавшись от посредника в виде Celery. aio-pika — асинхронная библиотека для RabbitMQ, предоставляющая прямой доступ к каналам, обменникам, очередям. Код становится минимальной прослойкой над протоколом: сами управляем подписками, подтверждениями (ack/nack), префетчем, реконнектами.

Что переписали

Написали небольшой собственный “фреймворк” консьюминга: асинхронный раннер, который при запуске создаёт соединение, открывает канал, объявляет очереди с нужными параметрами durable/exclusive, подписывается на них, а в колбэке вызывает наши обработчики.

Поверх этого появились:

  • Ручной retry — Декоратор поверх колбэка, имеющий свой счётчик кол-во повторов при возникновений ошибок.

  • LivenessProb — Кастомный на уровне ядра aio-pika представлен чуть ниже.

  • Трейсинг — вручную оборачиваем вызовы в OpenTelemetry спаны, передаём trace context в заголовках AMQP при ретраях.

Пример LivenessProb:

class CustomRobustConnection(RobustConnection):    def __init__(        self,        url: URL,        loop: asyncio.AbstractEventLoop | None = None,        **kwargs: Any,    ):        super().__init__(url=url, loop=loop, **kwargs)        # Кастомный класс пробы, схож с тем, что выше для Celery        self._liveness_probe = LivenessProbe()    async def connect(self, timeout: TimeoutType = None) -> None:        # Тут держитесь крепче. Я когда впервые это увидел, без литра кофе не смог понять как оно работает        self._RobustConnection__connect_timeout = timeout        if self.is_closed:            raise RuntimeError(f"{self!r} connection closed")        if self.reconnecting:            raise RuntimeError(                (                    "Connect method called but connection "                    f"{self!r} is reconnecting right now."                ),                self,            )        if not self._RobustConnection__reconnection_task:            self._RobustConnection__reconnection_task = self.loop.create_task(                self.__connection_factory(),            )        await self._RobustConnection__fail_fast_future        await self.connected.wait()    async def __connection_factory(self) -> None:        logger.debug("Starting connection factory for %r", self)        while not self.is_closed and not self._close_called:            logger.debug("Waiting for connection close event for %r", self)            await self._RobustConnection__connection_close_event.wait()            if self.is_closed or self._close_called:                return            try:                self.transport = None                self.connected.clear()                logger.debug("Connection attempt for %r", self)                await Connection.connect(self, self._RobustConnection__connect_timeout)                if not self._RobustConnection__fail_fast_future.done():                    self._RobustConnection__fail_fast_future.set_result(None)                logger.debug("Connection made on %r", self)                self._liveness_probe.start()            except CONNECTION_EXCEPTIONS as e:                if not self._RobustConnection__fail_fast_future.done():                    self._RobustConnection__fail_fast_future.set_exception(e)                    return                logger.warning(                    'Connection attempt to "%s" failed: %s. '                    "Reconnecting after %r seconds.",                    self,                    e,                    self.reconnect_interval,                )                self._liveness_probe.stop()            except Exception:                logger.exception(                    "Reconnect attempt failed %s. " "Retrying after %r seconds.",                    self,                    self.reconnect_interval,                )                self._liveness_probe.stop()            await asyncio.sleep(self.reconnect_interval)

Что стало лучше

Полный контроль над жизненным циклом соединения, тонкая настройка prefetch, возможность реализовать любую логику подтверждения (например, отложенный ack после завершения цепочки действий). Нет лишних метаданных в теле сообщения — брокер передаёт ровно то, что отправил продюсер. Асинхронность нативная, работает на asyncio без костылей.

Проблемы

Каждая «плюшка» делалась вручную и со временем объём инфраструктурного кода разросся. Десятки строк для декларации очередей, логирование reconnect‑цикла, согласование формата trace‑заголовков между сервисами. Healthcheck, хоть и был создан вручную, требовал аккуратности: нужно было отслеживать состояние не только TCP-соединения, но и открытого канала.


Решение №3: FastStream — золотая середина

Основная идея

FastStream — надстройка над aio-pika (а также над NATS, Kafka), которая даёт декларативный стиль описания consumer’ов, lifespan‑хуки, встроенные механизмы: healthcheck-эндпоинт, OpenTelemetry-интеграция, метрики Prometheus.

По сути, это aio-pika, обёрнутая в лучшие практики, которые мы сами реализовывали руками в предыдущем решении. Механизм retry описан как пример использования middleware.

Что пошло так

  • Healthcheck из коробки — достаточно указать FastStream объект в ASGI-приложении (через AsgiFastStream), и на /health возвращается статус брокера.

  • Lifespan — менеджер контекста управляет запуском и корректной остановкой consumer’ов, повторными соединениями. Не нужно писать свои обработчики сигналов. Это касалось не только RMQ коннектов, но и например коннектов к базам данных.

  • Мониторинг и трейсинг — подключение OpenTelemetry сводится к нескольким строчкам: спаны автоматически создаются для каждого обработанного сообщения, propagate context через заголовки.

  • Декларативные middleware — проще внедрять кросс‑касательную логику (логирование, валидацию) без захламления бизнес-кода.

  • Простота конфигурации — брокер, очереди, обменники описываются через Python-декораторы и типы, нет нужды вручную управлять каналами и ack/nack.

  • Интеграция с Pydantic — Сообщение на уровне описание консьюмеров сразу пытается отвалидироваться в Pydantic модель, если это нужно.

  • Dependency Injection — Как можно увидеть в примере ниже FastStream предлагает нам возможность подключение DI как в FastAPI.

Вот во что превратилась кодовая база одной инициализации процесса:

app = AsgiFastStream(    broker,    # health + metrics из коробки    asgi_routes=[        (            "/health",            make_system_ping_asgi(broker, timeout=5.0, include_in_schema=False),        ),        ("/metrics", make_asgi_app(registry)),    ],    # Кастомный lifespan. Обычно здесь идёт инициализации подключений к БД    lifespan=lifespan,)

Было бы ещё не очень справедливо не показать, что из себя представляет broker

broker = RabbitBroker(    settings.RMQ_URL,    # Примеры миддлвары    # RabbitPrometheusMiddleware из коробки для мониторинга сообщений    # EnrichLogMiddleware и FailCatchComplexMiddleware кастомные для отслеживания логирования и ошибок     middlewares=(        RabbitPrometheusMiddleware(registry=registry),        EnrichLogMiddleware,        FailCatchComplexMiddleware(            ignore_routing_keys=[                settings.RMQ_FAIL_TABLE_QUEUE,                settings.RMQ_DASHBOARD_SETTINGS_QUEUE,                settings.RMQ_TIMEOUT_QUEUE,            ],        ),    ),    logger=logger,    # Кастомные парсеры и декодеры для обработки входящих сообщений    parser=json_parser,    decoder=decoder,)broker.include_router(router)

и пример как инициализируется консьюмер

from faststream import Dependsfrom faststream.rabbit import (    ExchangeType,    RabbitExchange,    RabbitMessage,    RabbitQueue,    RabbitRouter,)from faststream.rabbit.annotations import RabbitBroker as ContextRabbitBroker# Вместо RabbitRouter можно использовать broker.router = RabbitRouter()@router.subscriber(    RabbitQueue(        name=...,        durable=True,        routing_key=...,    ),    RabbitExchange(        name=...,        durable=True,    ),)async def on_service_hub_message(    message: RabbitMessage,    # Имеется общий контекст всей системы    broker: ContextRabbitBroker,    # DI    async_session=Depends(get_db_session),) -> None:

Подключение консьюмеров простое, почти схожее с Celery, но чуть шире. Возможности FastStream на этом не заканчиваются. Если копать ещё глубже, то там можно найти документацию AsyncAPI и In-memory тесты.

Что пошло не так

  • Производительность — из-за дополнительных слоёв абстракции (middleware, автоматическая обвязка спанов, встроенные ретраи) пропускная способность ниже, чем у голого aio-pika. Это для нас лишь только в теории, поскольку поток не достигает больших значений 30-40 RPS. Справедливости ради, это не относится к минусам, поскольку бенчмарки я не проводил, и фраза “поверь мне брат” меня убедила.

  • Сложность отслеживания кодовой базы — Тут опять из-за дополнительных слоёв абстракции иногда заходишь внутрь посмотреть что там, и можно с лёгкостью потеряться.

Итоговое сравнение

Критерий

Celery

aio-pika (самописный)

FastStream

Подход

Динамические задачи

Низкоуровневый AMQP

Высокоуровневый consumer

Асинхронность

Ограниченная (gevent)

Полная (asyncio)

Полная (asyncio)

Healthcheck

Требует доп. решения

Ручная реализация

Из коробки

Retry

Встроен в задачу

Реализуется вручную

Реализуется вручную (есть пример)

Трейсинг (OTel)

Через сигналы Celery

Ручное встраивание

Из коробки

Контроль

Низкий

Максимальный

Средний (middleware)

Производительность

Умеренная

Высокая

Больше чем у Celery, ниже чем у aio-pika

Кривая входа

Низкая

Средняя (требует знаний AMQP)

Низкая (знакомо по FastAPI)


Заключение

Пройдя путь от монолитного Celery через хрупкий самописный код до сбалансированного FastStream, мы пришли к инструменту, который закрывает все эксплуатационные потребности: healthcheck, трейсинг, метрики — и при этом не требует поддерживать сотни строк инфраструктурного кода.

Да, возможно я выступаю в качестве адвоката этого инструмента, но что могу поделать, когда он так понравился? это вы ещё не увидели как стараюсь переписать всё на Rust.

Отойдя от жаргона и шуток повторю первую свою мысль: У каждой задачи свой инструмент. Мой — это FastStream.

ссылка на оригинал статьи https://habr.com/ru/articles/1030082/