Наш путь активной работы с очередями RabbitMQ начался с классического Celery. Осознав критичность низкоуровневого контроля системы, принялись работать с aio-pika. Но и этот уровень слишком местами сложный (далее расскажу почему), и нашли отличное решение, на текущий момент, в лице FastStream. Сразу оставлю такую пометку, что каждый инструмент подходит для решения своей задачи. Мы больше хотели сделать акцент на удобство и скорость разработки относительно затрачиваемого времени на миграции решений.
N.B.: Код возможно покажется неоптимальным или старым. Это всё наш дорогой Легаси.
Постановка задачи
Наша система построена на основе микросервисов, работающих с RabbitMQ. Внутри — обычный асинхронный код для похода на внешние API и в БД.
Требования:
-
Надежный консьюминг — это для нас критично, чтобы сообщение шло по всему флоу и нигде не останавливалось без причин. Если ошибка падает, то это должно отражаться в 3 местах: БД, логи и метрики.
-
Ретраи при ошибках обработки.
-
Трейсинг — поддержка OpenTelemetry.
-
Мониторинг сервиса через healthcheck’и.
-
Prometheus метрики.
Решение №1: Celery как консьюмер
Почему Celery
Celery — классический инструмент для фоновых задач, знакомый большинству Python-разработчиков. Из коробки: декларативное описание задач, ретраи с экспоненциальной задержкой, хранение результатов, мониторинг через Flower, интеграции с фреймворками. Логика проста: пишешь @app.task, запускаешь воркер — и сообщения из очереди начинают обрабатываться.
Как мы его использовали
Мы не отправляли задачи из кода в духе my_task.delay(), а настраивали Celery на прослушивание внешней очереди, куда сообщения попадали от других систем. По сути, Celery выступал как consumer: подключался к брокеру, забирал сообщения, десериализовал и передавал в наши обработчики. Настройки вроде max_retries, default_retry_delay, countdown позволяли гибко управлять поведением при сбоях. Важно ещё подсветить, что результат всегда игнорируется с помощью параметра ignore_result=True поскольку все результаты записываются в БД.
Пример инициализации воркера:
def create_app( name, broker, include, backend=None, task_queues=None, liveness_probe=1, update_period=60, watcher_config={},): # Создание само приложение + наложение дополнительных конфигурации app = Celery(name, broker=broker, include=include, backend=backend) app.conf.update( result_expires=120, ) add_without_heartbeat_argument = Option( ("--without-heartbeat",), default=True, ) app.user_options["worker"].add(add_without_heartbeat_argument) if task_queues is not None: app.conf.task_queues = task_queues if liveness_probe: add_update_period_argument = Option( ("--update-period",), default=update_period, ) HEARTBEAT_FILE.touch() app.user_options["worker"].add(add_update_period_argument) # Добавление кастомной livenessProbe для K8s app.steps["worker"].add(LivenessProbe) # инициализация трейсинга with_tracing = watcher_config.get("with_tracing") if with_tracing: tracing_exporters = watcher_config.get("tracing_exporters", ()) signals.worker_process_init.connect( init_celery_tracing(app_name=name, tracing_exporters=tracing_exporters), weak=False, ) return app
С чем столкнулись
Celery проектировался как система передачи сообщений между системами. Из-за чего столкнулись со следующими проблемами:
-
Потребление памяти — из-за оборачивания каждого сообщения в метаданные и хранения внутренних структур Celery расход оперативной памяти быстро рос пропорционально потоку сообщений. При высоком темпе обработки воркер начинал использовать значительно больше ОЗУ, чем требовалось самой бизнес-логике, что вынуждало выделять избыточные ресурсы.
-
Управление соединениями и heartbeat — Celery скрывает многие детали брокера, из‑за чего при сетевых сбоях восстановление происходило с задержками, а тонкая настройка
consumer_timeout,broker_transport_optionsбыла сложной и плохо документированной. -
Избыточность — Как можно заметить мы используем минимальные дополнительные конфигурации Celery.
-
Падение контейнера на битом сообщений — Будет не совсем справедливо относить это полностью к минусам самого Celery, это больше камень в наш огород. Но опыт есть опыт. Если воркер обрабатывал невалидное сообщение в плане структуры, то падал весь контейнер, при этом сообщение консьюмилось. Что приводило к непониманию “куда делось сообщение и что с ним произошло?”.
Стало ясно: Celery — отличный выбор для фоновых задач с результатом, но для непрерывного потокового консьюминга он перегружен и недостаточно прозрачен.
Решение №2: aio-pika — близко к железу
Идея
Перейти на чистый AMQP, отказавшись от посредника в виде Celery. aio-pika — асинхронная библиотека для RabbitMQ, предоставляющая прямой доступ к каналам, обменникам, очередям. Код становится минимальной прослойкой над протоколом: сами управляем подписками, подтверждениями (ack/nack), префетчем, реконнектами.
Что переписали
Написали небольшой собственный “фреймворк” консьюминга: асинхронный раннер, который при запуске создаёт соединение, открывает канал, объявляет очереди с нужными параметрами durable/exclusive, подписывается на них, а в колбэке вызывает наши обработчики.
Поверх этого появились:
-
Ручной retry — Декоратор поверх колбэка, имеющий свой счётчик кол-во повторов при возникновений ошибок.
-
LivenessProb — Кастомный на уровне ядра aio-pika представлен чуть ниже.
-
Трейсинг — вручную оборачиваем вызовы в OpenTelemetry спаны, передаём trace context в заголовках AMQP при ретраях.
Пример LivenessProb:
class CustomRobustConnection(RobustConnection): def __init__( self, url: URL, loop: asyncio.AbstractEventLoop | None = None, **kwargs: Any, ): super().__init__(url=url, loop=loop, **kwargs) # Кастомный класс пробы, схож с тем, что выше для Celery self._liveness_probe = LivenessProbe() async def connect(self, timeout: TimeoutType = None) -> None: # Тут держитесь крепче. Я когда впервые это увидел, без литра кофе не смог понять как оно работает self._RobustConnection__connect_timeout = timeout if self.is_closed: raise RuntimeError(f"{self!r} connection closed") if self.reconnecting: raise RuntimeError( ( "Connect method called but connection " f"{self!r} is reconnecting right now." ), self, ) if not self._RobustConnection__reconnection_task: self._RobustConnection__reconnection_task = self.loop.create_task( self.__connection_factory(), ) await self._RobustConnection__fail_fast_future await self.connected.wait() async def __connection_factory(self) -> None: logger.debug("Starting connection factory for %r", self) while not self.is_closed and not self._close_called: logger.debug("Waiting for connection close event for %r", self) await self._RobustConnection__connection_close_event.wait() if self.is_closed or self._close_called: return try: self.transport = None self.connected.clear() logger.debug("Connection attempt for %r", self) await Connection.connect(self, self._RobustConnection__connect_timeout) if not self._RobustConnection__fail_fast_future.done(): self._RobustConnection__fail_fast_future.set_result(None) logger.debug("Connection made on %r", self) self._liveness_probe.start() except CONNECTION_EXCEPTIONS as e: if not self._RobustConnection__fail_fast_future.done(): self._RobustConnection__fail_fast_future.set_exception(e) return logger.warning( 'Connection attempt to "%s" failed: %s. ' "Reconnecting after %r seconds.", self, e, self.reconnect_interval, ) self._liveness_probe.stop() except Exception: logger.exception( "Reconnect attempt failed %s. " "Retrying after %r seconds.", self, self.reconnect_interval, ) self._liveness_probe.stop() await asyncio.sleep(self.reconnect_interval)
Что стало лучше
Полный контроль над жизненным циклом соединения, тонкая настройка prefetch, возможность реализовать любую логику подтверждения (например, отложенный ack после завершения цепочки действий). Нет лишних метаданных в теле сообщения — брокер передаёт ровно то, что отправил продюсер. Асинхронность нативная, работает на asyncio без костылей.
Проблемы
Каждая «плюшка» делалась вручную и со временем объём инфраструктурного кода разросся. Десятки строк для декларации очередей, логирование reconnect‑цикла, согласование формата trace‑заголовков между сервисами. Healthcheck, хоть и был создан вручную, требовал аккуратности: нужно было отслеживать состояние не только TCP-соединения, но и открытого канала.
Решение №3: FastStream — золотая середина
Основная идея
FastStream — надстройка над aio-pika (а также над NATS, Kafka), которая даёт декларативный стиль описания consumer’ов, lifespan‑хуки, встроенные механизмы: healthcheck-эндпоинт, OpenTelemetry-интеграция, метрики Prometheus.
По сути, это aio-pika, обёрнутая в лучшие практики, которые мы сами реализовывали руками в предыдущем решении. Механизм retry описан как пример использования middleware.
Что пошло так
-
Healthcheck из коробки — достаточно указать
FastStreamобъект в ASGI-приложении (черезAsgiFastStream), и на/healthвозвращается статус брокера. -
Lifespan — менеджер контекста управляет запуском и корректной остановкой consumer’ов, повторными соединениями. Не нужно писать свои обработчики сигналов. Это касалось не только RMQ коннектов, но и например коннектов к базам данных.
-
Мониторинг и трейсинг — подключение OpenTelemetry сводится к нескольким строчкам: спаны автоматически создаются для каждого обработанного сообщения, propagate context через заголовки.
-
Декларативные middleware — проще внедрять кросс‑касательную логику (логирование, валидацию) без захламления бизнес-кода.
-
Простота конфигурации — брокер, очереди, обменники описываются через Python-декораторы и типы, нет нужды вручную управлять каналами и ack/nack.
-
Интеграция с Pydantic — Сообщение на уровне описание консьюмеров сразу пытается отвалидироваться в Pydantic модель, если это нужно.
-
Dependency Injection — Как можно увидеть в примере ниже
FastStreamпредлагает нам возможность подключение DI как вFastAPI.
Вот во что превратилась кодовая база одной инициализации процесса:
app = AsgiFastStream( broker, # health + metrics из коробки asgi_routes=[ ( "/health", make_system_ping_asgi(broker, timeout=5.0, include_in_schema=False), ), ("/metrics", make_asgi_app(registry)), ], # Кастомный lifespan. Обычно здесь идёт инициализации подключений к БД lifespan=lifespan,)
Было бы ещё не очень справедливо не показать, что из себя представляет broker
broker = RabbitBroker( settings.RMQ_URL, # Примеры миддлвары # RabbitPrometheusMiddleware из коробки для мониторинга сообщений # EnrichLogMiddleware и FailCatchComplexMiddleware кастомные для отслеживания логирования и ошибок middlewares=( RabbitPrometheusMiddleware(registry=registry), EnrichLogMiddleware, FailCatchComplexMiddleware( ignore_routing_keys=[ settings.RMQ_FAIL_TABLE_QUEUE, settings.RMQ_DASHBOARD_SETTINGS_QUEUE, settings.RMQ_TIMEOUT_QUEUE, ], ), ), logger=logger, # Кастомные парсеры и декодеры для обработки входящих сообщений parser=json_parser, decoder=decoder,)broker.include_router(router)
и пример как инициализируется консьюмер
from faststream import Dependsfrom faststream.rabbit import ( ExchangeType, RabbitExchange, RabbitMessage, RabbitQueue, RabbitRouter,)from faststream.rabbit.annotations import RabbitBroker as ContextRabbitBroker# Вместо RabbitRouter можно использовать broker.router = RabbitRouter()@router.subscriber( RabbitQueue( name=..., durable=True, routing_key=..., ), RabbitExchange( name=..., durable=True, ),)async def on_service_hub_message( message: RabbitMessage, # Имеется общий контекст всей системы broker: ContextRabbitBroker, # DI async_session=Depends(get_db_session),) -> None:
Подключение консьюмеров простое, почти схожее с Celery, но чуть шире. Возможности FastStream на этом не заканчиваются. Если копать ещё глубже, то там можно найти документацию AsyncAPI и In-memory тесты.
Что пошло не так
-
Производительность — из-за дополнительных слоёв абстракции (middleware, автоматическая обвязка спанов, встроенные ретраи) пропускная способность ниже, чем у голого aio-pika. Это для нас лишь только в теории, поскольку поток не достигает больших значений 30-40 RPS. Справедливости ради, это не относится к минусам, поскольку бенчмарки я не проводил, и фраза “поверь мне брат” меня убедила.
-
Сложность отслеживания кодовой базы — Тут опять из-за дополнительных слоёв абстракции иногда заходишь внутрь посмотреть что там, и можно с лёгкостью потеряться.
Итоговое сравнение
|
Критерий |
Celery |
aio-pika (самописный) |
FastStream |
|---|---|---|---|
|
Подход |
Динамические задачи |
Низкоуровневый AMQP |
Высокоуровневый consumer |
|
Асинхронность |
Ограниченная (gevent) |
Полная (asyncio) |
Полная (asyncio) |
|
Healthcheck |
Требует доп. решения |
Ручная реализация |
Из коробки |
|
Retry |
Встроен в задачу |
Реализуется вручную |
Реализуется вручную (есть пример) |
|
Трейсинг (OTel) |
Через сигналы Celery |
Ручное встраивание |
Из коробки |
|
Контроль |
Низкий |
Максимальный |
Средний (middleware) |
|
Производительность |
Умеренная |
Высокая |
Больше чем у Celery, ниже чем у aio-pika |
|
Кривая входа |
Низкая |
Средняя (требует знаний AMQP) |
Низкая (знакомо по FastAPI) |
Заключение
Пройдя путь от монолитного Celery через хрупкий самописный код до сбалансированного FastStream, мы пришли к инструменту, который закрывает все эксплуатационные потребности: healthcheck, трейсинг, метрики — и при этом не требует поддерживать сотни строк инфраструктурного кода.
Да, возможно я выступаю в качестве адвоката этого инструмента, но что могу поделать, когда он так понравился? это вы ещё не увидели как стараюсь переписать всё на Rust.
Отойдя от жаргона и шуток повторю первую свою мысль: У каждой задачи свой инструмент. Мой — это FastStream.
ссылка на оригинал статьи https://habr.com/ru/articles/1030082/