Практика outbox, гарантии доставки, DLX и схемы, которые реально спасают прод
Ситуация. REST-вызовы между сервисами начали валить друг друга: цепочки запросов, таймауты, каскадные фейлы. Мы перевели коммуникацию на RabbitMQ – и добились не «безошибочности», а предсказуемой деградации: данные не теряются, критичное – едет первым, яд быстрее восстанавливается
Ниже – только прикладное: проверенные паттерны, готовые фрагменты кода (Python / aio-pika), и схемы, которые можно сразу встраивать
База: «деградировать с честью» – это как?
Принцип простой: если брокер, потребители или сеть «сыпятся», система не ломает бизнес-операции и не теряет событий, а:
-
Временно буферизует
-
Расставляет приоритеты
-
Отбивает яд от «ядовитых» сообщений
-
Восстанавливается до согласованного состояния
В RabbitMQ это собирается из четырёх кирпичей:
-
Transactional Outbox (события в своей БД до брокера)
-
Безопасная публикация (publisher confirms + mandatory / AE)
-
Безопасное потребление (ручные ack / nack, DLX, идемпотентность)
-
Контроль нагрузки (prefetch, TTL, max-length, приоритеты / разделение очередей)
Шаг 1. Transactional Outbox – никаких потерь при падении брокера
Суть: вместе с записью бизнес-события пишем строку в outbox в той же транзакции. Фоновый воркер публикует в RabbitMQ с подтверждениями; при сбоях – ретраит. Событие не «исчезнет» между БД и брокером
DDL (PostgreSQL):
create table outbox ( id uuid primary key, aggregate text not null, event_type text not null, payload jsonb not null, created_at timestamptz not null default now(), status text not null default 'NEW', -- NEW|SENT|FAILED attempts int not null default 0, next_retry_at timestamptz ); create index on outbox (status, next_retry_at);
Воркер публикации (Python, aio-pika, подтверждения + mandatory):
import asyncio, json, uuid, asyncpg from aio_pika import connect_robust, Message, DeliveryMode, RobustConnection, RobustChannel, ExchangeType from datetime import datetime, timedelta AMQP_URL = "amqp://user:pass@rabbit:5672/%2F" EXCHANGE = "events.topic" async def publish_outbox_batch(pool): conn: RobustConnection = await connect_robust(AMQP_URL) ch: RobustChannel = await conn.channel(publisher_confirms=True) exchange = await ch.declare_exchange(EXCHANGE, ExchangeType.TOPIC, durable=True, arguments={"alternate-exchange": "events.ae"}) # AE на случай unroutable while True: async with pool.acquire() as db: rows = await db.fetch(""" update outbox set status='IN_PROGRESS' where id in ( select id from outbox where status='NEW' or (status='FAILED' and (next_retry_at is null or next_retry_at <= now())) order by created_at limit 200 ) returning id, event_type, payload """) if not rows: await asyncio.sleep(0.3) continue to_ack, to_fail = [], [] for r in rows: body = json.dumps(r["payload"]).encode() msg = Message( body, delivery_mode=DeliveryMode.PERSISTENT, message_id=str(uuid.uuid4()), content_type="application/json", headers={"event_type": r["event_type"]}, ) routing_key = f"{r['event_type']}" try: # mandatory=True - UnroutableError, если некуда доставить await exchange.publish(msg, routing_key=routing_key, mandatory=True) to_ack.append(r["id"]) except Exception: to_fail.append(r["id"]) async with pool.acquire() as db: if to_ack: await db.execute("update outbox set status='SENT' where id = any($1)", to_ack) if to_fail: await db.execute(""" update outbox set status='FAILED', attempts = attempts + 1, next_retry_at = now() + ((least(attempts, 10)) * interval '10 seconds') where id = any($1) """, to_fail) async def main(): pool = await asyncpg.create_pool(dsn="postgres://user:pass@db/app") await publish_outbox_batch(pool) asyncio.run(main())
Что важно:
-
publisher_confirms=True– ждём подтверждение брокера -
mandatory=True+ alternate-exchange (AE) – гарантируем, что ни одно сообщение не уйдёт в «вакуум» -
Экспоненциальный backoff по
attempts– не душим брокер при восстановлении
Шаг 2. Публикация с гарантиями: confirms, mandatory, AE
-
Publisher Confirms: брокер подтверждает приём (пер-сообщение или батчем)
-
mandatory: если ни одна привязка не подходит, брокер вернёт сообщение (или отправит в AE)
-
Alternate Exchange (AE): запасной обменник для unroutable – складываем туда и алертим
Топология (объявление обменников и AE):
# основной обменник событий await ch.declare_exchange("events.topic", ExchangeType.TOPIC, durable=True, arguments={"alternate-exchange": "events.ae"}) # AE для не маршрутизованных await ch.declare_exchange("events.ae", ExchangeType.FANOUT, durable=True) # очередь-парковка для unroutable q_ae = await ch.declare_queue("events.unroutable", durable=True) await q_ae.bind("events.ae", routing_key="")
Шаг 3. Потребление без сюрпризов: ack / nack, DLX, идемпотентность
Цели:
-
Не теряем сообщения
-
Не зацикливаем «ядовитые»
-
Обрабатываем идемпотентно (дубликаты возможны)
Очередь с DLX / TTL / ограничением:
# Dead-letter exchange await ch.declare_exchange("events.dlx", ExchangeType.TOPIC, durable=True) args = { "x-dead-letter-exchange": "events.dlx", "x-dead-letter-routing-key": "dead.order", # или по шаблону "x-message-ttl": 10_000, # TTL сообщений в очереди (пример) "x-max-length": 50_000, # защитимся от бесконечного роста "x-queue-mode": "lazy", # крупные очереди - на диск "x-max-priority": 10, # если используете приоритеты } q = await ch.declare_queue("order.events", durable=True, arguments=args) await q.bind("events.topic", routing_key="order.*")
Идемпотентный consumer (с учётом дубликатов и «ядовитых»): как не потерять сообщение при сбое
import hashlib, aioredis redis = await aioredis.from_url("redis://redis:6379/0") def seen_key(message_id: str) -> str: return f"seen:{message_id}" async def handle(message): # бизнес-логика ... async def consume(message): async with message.process(ignore_processed=True): # manual ack / nack внутри контекста mid = message.message_id or hashlib.sha1(message.body).hexdigest() # идемпотентность: уже обрабатывали? if await redis.setnx(seen_key(mid), "1"): await redis.expire(seen_key(mid), 24*3600) # хранить сутки / неделю - по объёму try: await handle(message) # ack произойдёт при выходе из контекста except TransientError: # временная ошибка - вернём в очередь (с учётом TTL / DLX / повторных попыток) await message.nack(requeue=True) except Exception: # яд - в DLX, чтобы не крутить бесконечно await message.reject(requeue=False) else: # дубликат - подтверждаем тихо return await q.consume(consume, no_ack=False) await ch.set_qos(prefetch_count=64) # контроль нагрузки
Зачем DLX: «ядовитые» сообщения автоматически попадают в «кладбище» (parking lot). Там их можно руками разбирать или переигрывать через отдельный «ре-процессор»
Шаг 4. Приоритеты и разделение путей: критичное едет первым
Частая ошибка – «одна очередь на всё». В результате уведомления забивают полосу, а деньги / заказы ждут
Правильно:
-
Разделяем критичное и фоновое по разным очередям / ключам
-
Для фонового – TTL + max-length (готовы терять «хвост» при аварии)
-
Для критичного – отдельные воркеры и меньший prefetch (быстрее реакция)
Пример:
# критичная очередь (оплата) await ch.declare_queue("billing.high", durable=True, arguments={"x-dead-letter-exchange": "events.dlx"}) await ch.queue_bind("billing.high", "events.topic", routing_key="billing.*") # фоновая (email): TTL + max-length, пусть деградирует первой await ch.declare_queue("notify.low", durable=True, arguments={ "x-dead-letter-exchange": "events.dlx", "x-message-ttl": 60_000, "x-max-length": 20_000 }) await ch.queue_bind("notify.low", "events.topic", routing_key="notify.*")
Набор «граблей» и как мы их обходили
-
Большие очереди = антипаттерн. Делайте короткими:
x-message-ttl,x-max-length, «ленивые» (x-queue-mode=lazy) только там, где осознанно готовы к диску -
Дубликаты неизбежны. Проектируйте идемпотентно (натуральные операции / UPSERT, ключи, registry processed ids)
-
Один пользователь RabbitMQ для всех сервисов – плохо: лишает видимости и контроля. Делите доступы
-
guest / guest в проде – никогда
-
prefetch по умолчанию душит латентность: подберите
set_qosэкспериментально (обычно 16–256) -
Нет AE / mandatory – «чёрная дыра»
-
Нет DLX – «ядовитые» крутятся бесконечно
Чек-лист внедрения
-
Включить Transactional Outbox в сервисах-продюсерах
-
Публиковать с publisher confirms +
mandatory=true, настроить AE -
Для каждой критичной линии – своя очередь, для фоновой – TTL / ограничения
-
Подписчики: ручные ack / nack, DLX, идемпотентность
-
Настроить
prefetch, health-пробы, авто-reconnect -
Метрики: ready / unacked, publish / ack rate, глубина DLQ, x-death
-
Runbook: как чистить DLQ, как «ре-проигрывать» из парковки
Итог
RabbitMQ сам по себе не «лечит» систему. Но с правильной топологией (Outbox – Confirms + AE – Idempotent Consumers + DLX – Контроль нагрузки) вы получаете контролируемую деградацию: критичные события приходят, фоновые отваливаются первыми, «ядовитые» безопасно паркуются, а данные – не теряются. Это и есть «честная» отказоустойчивость в реальной жизни
ссылка на оригинал статьи https://habr.com/ru/articles/943022/
Добавить комментарий