RabbitMQ как инструмент «деградации с честью»

от автора

Практика outbox, гарантии доставки, DLX и схемы, которые реально спасают прод

Ситуация. REST-вызовы между сервисами начали валить друг друга: цепочки запросов, таймауты, каскадные фейлы. Мы перевели коммуникацию на RabbitMQ – и добились не «безошибочности», а предсказуемой деградации: данные не теряются, критичное – едет первым, яд быстрее восстанавливается

Ниже – только прикладное: проверенные паттерны, готовые фрагменты кода (Python / aio-pika), и схемы, которые можно сразу встраивать

База: «деградировать с честью» – это как?

Принцип простой: если брокер, потребители или сеть «сыпятся», система не ломает бизнес-операции и не теряет событий, а:

  • Временно буферизует

  • Расставляет приоритеты

  • Отбивает яд от «ядовитых» сообщений

  • Восстанавливается до согласованного состояния

В RabbitMQ это собирается из четырёх кирпичей:

  1. Transactional Outbox (события в своей БД до брокера)

  2. Безопасная публикация (publisher confirms + mandatory / AE)

  3. Безопасное потребление (ручные ack / nack, DLX, идемпотентность)

  4. Контроль нагрузки (prefetch, TTL, max-length, приоритеты / разделение очередей)

Шаг 1. Transactional Outbox – никаких потерь при падении брокера

Суть: вместе с записью бизнес-события пишем строку в outbox в той же транзакции. Фоновый воркер публикует в RabbitMQ с подтверждениями; при сбоях – ретраит. Событие не «исчезнет» между БД и брокером

Архитектура с Outbox, AE и DLX

Архитектура с Outbox, AE и DLX

DDL (PostgreSQL):

create table outbox (   id            uuid primary key,   aggregate     text not null,   event_type    text not null,   payload       jsonb not null,   created_at    timestamptz not null default now(),   status        text not null default 'NEW',      -- NEW|SENT|FAILED   attempts      int  not null default 0,   next_retry_at timestamptz );  create index on outbox (status, next_retry_at);

Воркер публикации (Python, aio-pika, подтверждения + mandatory):

import asyncio, json, uuid, asyncpg from aio_pika import connect_robust, Message, DeliveryMode, RobustConnection, RobustChannel, ExchangeType from datetime import datetime, timedelta  AMQP_URL = "amqp://user:pass@rabbit:5672/%2F" EXCHANGE = "events.topic"  async def publish_outbox_batch(pool):     conn: RobustConnection = await connect_robust(AMQP_URL)     ch: RobustChannel = await conn.channel(publisher_confirms=True)     exchange = await ch.declare_exchange(EXCHANGE, ExchangeType.TOPIC, durable=True,                                          arguments={"alternate-exchange": "events.ae"})  # AE на случай unroutable     while True:         async with pool.acquire() as db:             rows = await db.fetch("""               update outbox                  set status='IN_PROGRESS'                where id in (                  select id from outbox                   where status='NEW'                      or (status='FAILED' and (next_retry_at is null or next_retry_at <= now()))                   order by created_at                   limit 200                )              returning id, event_type, payload             """)         if not rows:             await asyncio.sleep(0.3)             continue          to_ack, to_fail = [], []         for r in rows:             body = json.dumps(r["payload"]).encode()             msg = Message(                 body,                 delivery_mode=DeliveryMode.PERSISTENT,                 message_id=str(uuid.uuid4()),                 content_type="application/json",                 headers={"event_type": r["event_type"]},             )             routing_key = f"{r['event_type']}"             try:                 # mandatory=True - UnroutableError, если некуда доставить                 await exchange.publish(msg, routing_key=routing_key, mandatory=True)                 to_ack.append(r["id"])             except Exception:                 to_fail.append(r["id"])          async with pool.acquire() as db:             if to_ack:                 await db.execute("update outbox set status='SENT' where id = any($1)", to_ack)             if to_fail:                 await db.execute("""                   update outbox                      set status='FAILED',                          attempts = attempts + 1,                          next_retry_at = now() + ((least(attempts, 10)) * interval '10 seconds')                    where id = any($1)                 """, to_fail)  async def main():     pool = await asyncpg.create_pool(dsn="postgres://user:pass@db/app")     await publish_outbox_batch(pool)  asyncio.run(main())

Что важно:

  • publisher_confirms=True – ждём подтверждение брокера

  • mandatory=True + alternate-exchange (AE) – гарантируем, что ни одно сообщение не уйдёт в «вакуум»

  • Экспоненциальный backoff по attemptsне душим брокер при восстановлении

Шаг 2. Публикация с гарантиями: confirms, mandatory, AE

  • Publisher Confirms: брокер подтверждает приём (пер-сообщение или батчем)

  • mandatory: если ни одна привязка не подходит, брокер вернёт сообщение (или отправит в AE)

  • Alternate Exchange (AE): запасной обменник для unroutable – складываем туда и алертим

Последовательность публикации

Последовательность публикации

Топология (объявление обменников и AE):

# основной обменник событий await ch.declare_exchange("events.topic", ExchangeType.TOPIC, durable=True,                           arguments={"alternate-exchange": "events.ae"}) # AE для не маршрутизованных await ch.declare_exchange("events.ae", ExchangeType.FANOUT, durable=True) # очередь-парковка для unroutable q_ae = await ch.declare_queue("events.unroutable", durable=True) await q_ae.bind("events.ae", routing_key="")

Шаг 3. Потребление без сюрпризов: ack / nack, DLX, идемпотентность

Цели:

  • Не теряем сообщения

  • Не зацикливаем «ядовитые»

  • Обрабатываем идемпотентно (дубликаты возможны)

Очередь с DLX / TTL / ограничением:

# Dead-letter exchange await ch.declare_exchange("events.dlx", ExchangeType.TOPIC, durable=True)  args = {     "x-dead-letter-exchange": "events.dlx",     "x-dead-letter-routing-key": "dead.order",   # или по шаблону     "x-message-ttl": 10_000,                     # TTL сообщений в очереди (пример)     "x-max-length": 50_000,                      # защитимся от бесконечного роста     "x-queue-mode": "lazy",                      # крупные очереди - на диск     "x-max-priority": 10,                        # если используете приоритеты }  q = await ch.declare_queue("order.events", durable=True, arguments=args) await q.bind("events.topic", routing_key="order.*")

Идемпотентный consumer (с учётом дубликатов и «ядовитых»): как не потерять сообщение при сбое

import hashlib, aioredis  redis = await aioredis.from_url("redis://redis:6379/0")  def seen_key(message_id: str) -> str:     return f"seen:{message_id}"  async def handle(message):     # бизнес-логика     ...  async def consume(message):     async with message.process(ignore_processed=True):  # manual ack / nack внутри контекста         mid = message.message_id or hashlib.sha1(message.body).hexdigest()          # идемпотентность: уже обрабатывали?         if await redis.setnx(seen_key(mid), "1"):             await redis.expire(seen_key(mid), 24*3600)  # хранить сутки / неделю - по объёму             try:                 await handle(message)                 # ack произойдёт при выходе из контекста             except TransientError:                 # временная ошибка - вернём в очередь (с учётом TTL / DLX / повторных попыток)                 await message.nack(requeue=True)             except Exception:                 # яд - в DLX, чтобы не крутить бесконечно                 await message.reject(requeue=False)         else:             # дубликат - подтверждаем тихо             return  await q.consume(consume, no_ack=False) await ch.set_qos(prefetch_count=64)  # контроль нагрузки

Зачем DLX: «ядовитые» сообщения автоматически попадают в «кладбище» (parking lot). Там их можно руками разбирать или переигрывать через отдельный «ре-процессор»

Потребление с идемпотентностью и DLX

Потребление с идемпотентностью и DLX

Шаг 4. Приоритеты и разделение путей: критичное едет первым

Частая ошибка – «одна очередь на всё». В результате уведомления забивают полосу, а деньги / заказы ждут

Правильно:

  • Разделяем критичное и фоновое по разным очередям / ключам

  • Для фонового – TTL + max-length (готовы терять «хвост» при аварии)

  • Для критичного – отдельные воркеры и меньший prefetch (быстрее реакция)

Пример:

# критичная очередь (оплата) await ch.declare_queue("billing.high", durable=True,                        arguments={"x-dead-letter-exchange": "events.dlx"}) await ch.queue_bind("billing.high", "events.topic", routing_key="billing.*")  # фоновая (email): TTL + max-length, пусть деградирует первой await ch.declare_queue("notify.low", durable=True,   arguments={     "x-dead-letter-exchange": "events.dlx",     "x-message-ttl": 60_000,     "x-max-length": 20_000 }) await ch.queue_bind("notify.low", "events.topic", routing_key="notify.*")

Набор «граблей» и как мы их обходили

  • Большие очереди = антипаттерн. Делайте короткими: x-message-ttl, x-max-length, «ленивые» (x-queue-mode=lazy) только там, где осознанно готовы к диску

  • Дубликаты неизбежны. Проектируйте идемпотентно (натуральные операции / UPSERT, ключи, registry processed ids)

  • Один пользователь RabbitMQ для всех сервисов – плохо: лишает видимости и контроля. Делите доступы

  • guest / guest в проде – никогда

  • prefetch по умолчанию душит латентность: подберите set_qos экспериментально (обычно 16–256)

  • Нет AE / mandatory – «чёрная дыра»

  • Нет DLX – «ядовитые» крутятся бесконечно

Чек-лист внедрения

  1. Включить Transactional Outbox в сервисах-продюсерах

  2. Публиковать с publisher confirms + mandatory=true, настроить AE

  3. Для каждой критичной линии – своя очередь, для фоновой – TTL / ограничения

  4. Подписчики: ручные ack / nack, DLX, идемпотентность

  5. Настроить prefetch, health-пробы, авто-reconnect

  6. Метрики: ready / unacked, publish / ack rate, глубина DLQ, x-death

  7. Runbook: как чистить DLQ, как «ре-проигрывать» из парковки

Итог

RabbitMQ сам по себе не «лечит» систему. Но с правильной топологией (Outbox – Confirms + AE – Idempotent Consumers + DLX – Контроль нагрузки) вы получаете контролируемую деградацию: критичные события приходят, фоновые отваливаются первыми, «ядовитые» безопасно паркуются, а данные – не теряются. Это и есть «честная» отказоустойчивость в реальной жизни


ссылка на оригинал статьи https://habr.com/ru/articles/943022/