Dead Letter Queue в Kafka на практике

от автора

DLQ — это просто топик. Сложное — всё, что вокруг него.

Эта статья — про практическую архитектуру обработки событий из Kafka с отправкой данных во внешний REST API.

Главная проблема такого сценария — нестабильность внешнего API. Он периодически деградирует по latency или начинает отвечать с ошибками, и это напрямую влияет на пропускную способность всего консьюмера.

В этом материале разобрано:

  • почему подход с ручным commit’ом быстро упирается в RTT (Round Time Trip) внешнего API;

  • как перейти к асинхронной модели обработки на asyncio и увеличить throughput одного consumer’а;

  • какую роль в этой схеме играет DLQ-топик для обеспечения надежности доставки;

  • и как FSM (Finite State Machine) помогает управлять состояниями сообщений в процессе обработки.

Предполагается, что читатель знаком с базовыми концепциями Kafka (consumer groups, partitions, offsets) и имеет общее представление об asyncio в Python 3.12+. Остальное разберем по ходу.

Исходные данные

Рассмотрим реальный кейс. Сервис обрабатывает события профилей пользователей: регистрация, обновление, удаление, смена статуса, геолокации и т.д. Все события поступают в Kafka-топик, разбитый на N партиций. В системе — более 15 миллионов профилей.

Процесс обработки выглядит так:

  • Консьюмер читает событие из Kafka

  • Обогащает его данными из базы

  • Отправляет результат во внешний REST API

Ограничения системы:

  • нагрузка: 300–400 RPS

  • внешний API не поддерживает батчи

  • требуется обогащение данных из БД на лету

Способ 1: ручной commit

Первый очевидный вариант — подтверждать обработку сообщения только после успешного ответа внешнего API (200 OK). Алгоритм выглядит просто:

  1. Получаем сообщение из Kafka

  2. Обрабатываем и отправляем в внешний API

  3. Только после успешного ответа делаем commit offset’а

На уровне гарантии доставки это выглядит надежно, но у подхода есть критическое ограничение — он полностью синхронный. При RTT даже в ~100 мс один поток физически не может обрабатывать больше ~10 сообщений в секунду. И это в идеальном мире без retry, сетевых задержек и деградаций API.

Итого получаем: проходная способность жестко ограничена RTT внешнего сервиса, масштабирование через consumer group приводит к росту стоимости, архитектура становится плохо управляемой при росте нагрузки

Чтобы уйти от блокирующей модели, commit отделяется от обработки сообщений. Kafka consumer переключается на auto commit, а обработка выносится в асинхронные задачи asyncio, ограниченные семафором.

Способ 2: auto commit + asyncio

Теперь pipeline выглядит иначе:

  1. Консьюмер быстро читает сообщения

  2. Кладет обработку в очередь asyncio

  3. Ограничивает параллелизм через Semaphore

  4. Commit происходит автоматически через poll()

Теперь обработка стала конкурентной, consumer перестал ждать ответа от внешнего API и пропускная способность больше не зависит напрямую от RTT. Но появляется новая проблема: контроль жизненного цикла задач и надежность доставки теперь полностью смещаются на уровень приложения.

Реализация в коде

Ниже — упрощенный вариант реализации через aiokafka с ключевыми идеями:

  • семафор ограничивает количество одновременно выполняемых задач

  • каждая задача трекается явно

  • ошибки логируются сразу, не блокируя поток обработки

from aiokafka.structs import ConsumerRecord# Код из этого модуля будет представлен чуть нижеfrom src.utils import asyncio_tasks async def consume_message(message: ConsumerRecord) -> None:# Абстрактный процессоц для обработки сообщений из кафки     processor = ProcessorFactory.get_processor(topic=message.topic)    if not processor:        return# Здесть после декодирования можно добавить и валидацию через pydantic или msgspec     try:        event = processor.decode(message.value)    except DecodeError:        logger.error("decode failed", topic=message.topic, offset=message.offset)        metrics.increment("decode_failed", topic=message.topic)        return    await asyncio_tasks.workers_semaphore.acquire()    try:        asyncio_tasks.create_tracked_task(            coroutine=asyncio_tasks.process_and_release_semaphore(                coro=processor.enrich_and_send(event),                semaphore=asyncio_tasks.workers_semaphore,            ),            active_tasks=asyncio_tasks.workers_active_tasks,            task_name=f"kafka-{message.topic}-{message.partition}-{message.offset}",        )    except Exception:        asyncio_tasks.workers_semaphore.release()        raise

Важно не только создать задачу, но и не потерять её в рантайме. По умолчанию asyncio хранит на задачи слабые ссылки. Это означает, что без явного хранения ссылки задача может быть собрана GC (Garbage Collector) во время выполнения HTTP-запроса или I/O операции.

Поэтому добавляется явный registry активных задач.

import asynciofrom functools import partialworkers_active_tasks: set[asyncio.Task] = set()workers_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)async def process_and_release_semaphore(    coro: Coroutine[Any, Any, Any],    semaphore: asyncio.Semaphore,) -> None:    try:        await coro    finally:        semaphore.release()def create_tracked_task(    coroutine: Coroutine[Any, Any, Any],    active_tasks: set[asyncio.Task[Any]],    task_name: str | None = None,) -> asyncio.Task[Any]:    task = asyncio.create_task(coroutine, name=task_name)    active_tasks.add(task)    task.add_done_callback(partial(_cleanup_task, active_tasks=active_tasks))    return taskdef _cleanup_task(finished_task: asyncio.Task, active_tasks: set) -> None:    active_tasks.discard(finished_task)        if finished_task.cancelled():        logger.warning("Tracked task cancelled", task=finished_task.get_name())        return            exception = finished_task.exception()    if exception is not None:        logger.error(            "Tracked task failed",            task=finished_task.get_name(),            error=str(exception),            error_type=type(exception).__name__,        )

На этом этапе система уже умеет:

  • обрабатывать сообщения конкурентно

  • ограничивать нагрузку через семафор

  • не блокировать consumer на уровне RTT внешнего API

Однако нестабильным местом в цепочке обработки является вызов processor.enrich_and_send(event) — отправка события во внешний API. На этот участок наш сервис повлиять не может: HTTP-запрос может завершиться ошибкой 5xx, сетевым таймаутом, разрывом соединения и другими сбоями. При рабочей нагрузке каждый под может одновременно держать в памяти сотни asyncio-тасок, часть из которых неизбежно столкнётся с такими ошибками. Конечно, базовые механизмы повторной отправки, например tenacity, никто не отменял.

Проблема в том, что даже при высокой конкурентности и ограничении параллелизма система остаётся “слепой” к состоянию внешнего API и качеству отдельных сообщений. Даже после всех ретраев остаётся вероятность, что событие так и не будет доставлено во внешний API, тогда как оффсеты Kafka уже закомичены. Если при этом сервис развёрнут в нескольких репликах в рамках одной consumer group, все поды продолжат безуспешно молотить в недоступный API, расходуя CPU на бесполезные ретраи. 

Отсюда возникают новые вопросы:

  • что делать с “плохими” сообщениями, которые стабильно падают при обработке?

  • как обрабатывать постоянные ошибки внешнего API, когда ретраи не помогают?

  • где проходит граница между повторными попытками и фактически “потерянным” событием?

  • как не допустить ситуации, когда один проблемный event начинает отравлять весь поток обработки?

Именно здесь появляется DLQ и конечные состояния обработки, о которых дальше и пойдет речь.

Применение DLQ с Kafka

Для решения этой задачи можно использовать DLQ (Dead Letter Queue), но не как «кладбище ошибочных сообщений», а как временное хранилище событий, которые не удалось отправить из-за недоступности внешнего API.
Одного DLQ-топика недостаточно — необходим механизм, который синхронизирует поведение всех подов и определяет, когда нужно прекратить чтение основного топика и переключиться на повторную обработку накопленных событий.

В качестве такого механизма используется ключ с TTL в Redis. Этот ключ выступает единым источником правды для всех экземпляров сервиса и сигнализирует, что внешний API сейчас недоступен.

Если после всех ретраев отправить событие не удалось, сервис устанавливает этот ключ и помещает событие в DLQ-топик. Пока ключ существует, чтение новых сообщений из основного Kafka-топика приостанавливается. Это позволяет не создавать дополнительную нагрузку на недоступный API и не тратить ресурсы на заведомо бесполезные попытки отправки.

После истечения TTL сервисы сначала переключаются на обработку DLQ-топика. Основной топик в этот момент остаётся на паузе. Асинхронные HTTP-вызовы дополнительно ограничиваются семафором, чтобы восстановившийся API не получил весь накопленный бэклог одним залпом. Если во время обработки DLQ-топика внешний API снова становится недоступным, ключ в Redis создаётся повторно, сообщения продолжают складываться в DLQ-топик, и цикл повторяется.

При этом сообщения с ошибками декодирования или валидации отправлять в DLQ не имеет смысла. Такие ошибки обычно говорят о нарушении контракта между продюсером и консьюмером Kafka и требуют отдельного процесса обработки.

В итоге поведение каждого консьюмера можно представить в виде конечного автомата Finite State Machine (FSM) с тремя состояниями:

  1. Чтение основного топика (штатный режим).

  2. Ожидание истечения TTL-ключа (пауза обработки).

  3. Повторная отправка событий из DLQ-топика (основной топик на паузе).

FSM для Kafka-консьюмера

FSM для Kafka-консьюмера

Важно отметить, что всеми состояниями управляет один и тот же AIOKafkaConsumer, работающий в рамках одной consumer group. Отдельного воркера для обработки DLQ нет — сервис просто переключает режим работы, ставя на паузу или возобновляя чтение нужных партиций.

Это позволяет избежать лишней инфраструктуры и сохранить единый механизм коммита оффсетов. При этом Kafka приостанавливает обработку именно на уровне партиций (pause/resume), а не целых топиков, что хорошо ложится на нашу FSM.

В упрощённом виде логика переключения выглядит так:

from aiokafka.consumer import AIOKafkaConsumerasync def manage_topics(consumer: AIOKafkaConsumer) -> None:    while True:        api_unavailable, dlq_has_unread = await asyncio.gather(            is_loymax_api_unavailable(),            has_unprocessed_messages(consumer, dlq_topic),        )        match (api_unavailable, dlq_has_unread):            case (True, _):                # PAUSED: приостанавливаем обработку всех сообщений                consumer.pause(*all_partitions(consumer))            case (False, True):                # DRAIN DLQ: читаем DLQ, основной топик стоит на паузе                consumer.pause(*main_partitions(consumer))                consumer.resume(*dlq_partitions(consumer))            case (False, False):                # NORMAL: работаем с основным топиком                consumer.resume(*main_partitions(consumer))                consumer.pause(*dlq_partitions(consumer))        await asyncio.sleep(POLLING_INTERVAL)

Однако у такого подхода есть один очевидный недостаток. Если внешний API будет оставаться недоступным длительное время, сообщения начнут бесконечно циркулировать по одному и тому же маршруту:

основной топик → DLQ → попытка отправки → DLQ → ...

Другими словами, получится замкнутая петля, в которой сервис будет расходовать ресурсы, но не приближаться к успешной обработке событий. Поэтому следующим шагом потребовался механизм, который позволяет разорвать этот цикл и ограничить количество повторных попыток доставки.

Перед тем как обсуждать механизмы ограничения ретраев, стоит ввести ещё одну сущность — конверт сообщения (envelope). Само событие из основного Kafka-топика стараемся не изменять. Вместо этого при отправке в DLQ оно оборачивается в дополнительную структуру, которая содержит служебные метаданные, необходимые исключительно для повторной обработки.

В зависимости от сценария в конверте могут храниться:

  • dlq_retries — количество попаданий сообщения в DLQ;

  • first_dlq_timestamp — время первого помещения в DLQ;

  • reason — причина последнего сбоя;

  • дополнительные технические поля для трассировки и диагностики.

Такой подход позволяет отделить бизнес-данные от инфраструктурной логики. Исходное сообщение остаётся неизменным, а вся информация, необходимая для управления жизненным циклом в DLQ, хранится рядом с ним. В дальнейшем именно envelope, а не исходное событие, будет перемещаться между сервисом и DLQ-топиком.

Счётчик попыток

Это самый простой и универсальный механизм. Он подходит для событий, которые не теряют актуальности со временем: рано или поздно мы либо доставим их, либо признаем недоставляемыми.

Для этого исходное сообщение оборачивается в envelope, содержащий поле dlq_retries. При каждом повторном помещении в DLQ значение увеличивается. Если число попыток превышает заданный лимит, сообщение считается необрабатываемым: фиксируется в логах и метриках, а затем либо удаляется, либо сохраняется в долгосрочном хранилище для последующего анализа.

async def produce_to_dlq(envelope: DeadLetterEnvelope, ...) -> None:    if envelope.dlq_retries > MAX_RETRIES:        logger.warning("dropping after max retries", reason=envelope.reason)        metrics.increment("dlq_dropped", reason=envelope.reason)                # Один из возможных вариантов        await add_to_database(envelope)        return    await producer.send_and_wait(dlq_topic, envelope)

Важно, что счётчик увеличивается на стороне продюсера при записи сообщения в DLQ, а не на стороне консьюмера при его чтении. Благодаря этому значение монотонно растёт независимо от ребалансировки consumer group или перезапуска подов.

Отдельного внимания заслуживают детерминированные ошибки. Если сообщение имеет некорректный формат или не проходит валидацию схемы, повторные попытки ничего не изменят — оно будет снова и снова попадать в DLQ, пока не исчерпает лимит. Поэтому декодирование и базовую валидацию лучше выполнять ещё в poll-цикле, до создания асинхронной задачи. Если сообщение заведомо не может быть обработано, не стоит расходовать на него бюджет ретраев.

Значение MAX_RETRIES обычно подбирается эмпирически. Слишком маленький лимит увеличивает вероятность потери доставляемых сообщений во время кратковременных сбоев, слишком большой — создаёт избыточную нагрузку на внешний API после его восстановления.

Дедлайн с проверкой актуальности

Для некоторых событий одного счётчика недостаточно. Сообщение может устареть, пока находится в DLQ, и его доставка после восстановления системы приведёт к нарушению бизнес-логики. Например, если пользователю уже доставили заказ, а затем приходит уведомление «курьер найден» или «заказ собирается», такая коммуникация будет выглядеть ошибочной.

В этом случае в конверт добавляется поле first_dlq_timestamp — время первого попадания сообщения в DLQ. При последующих повторах это значение не изменяется. Дедлайн вычисляется следующим образом:
deadline = first_dlq_timestamp + DEADLINE_INTERVAL

Перед каждой повторной отправкой необходимо проверить, остаётся ли событие актуальным, обратившись к источнику истины (например, БД заказов или отдельному сервису состояний). Возможны четыре результата проверки:

  • MATCH — событие всё ещё актуально, выполняем повторную отправку.

  • MISMATCH — состояние объекта изменилось, сообщение устарело и должно быть отброшено.

  • EXPIRED — дедлайн истёк, сообщение удаляется независимо от текущего состояния.

  • QUERY_FAILED — сервис проверки недоступен, сообщение возвращается в DLQ.

async def handle_dlq_envelope(envelope: OrderDLQEnvelope) -> None:    if expired(envelope):        drop(envelope, reason="deadline_exceeded")        return    result = await validate_against_source_of_truth(envelope)        match result:        case ValidationResult.MATCH:            await retry_send(envelope)                    case ValidationResult.MISMATCH:            drop(envelope, reason="event_obsolete")                    case ValidationResult.QUERY_FAILED:            await reproduce_to_dlq(envelope)

Интересный случай возникает, если валидатор стабильно отвечает QUERY_FAILED. Тогда сообщение снова и снова возвращается в DLQ. Именно здесь дедлайн играет вторую важную роль: после истечения DEADLINE_INTERVALсообщение будет отброшено независимо от состояния валидатора. 

Таким образом, дедлайн защищает систему не только от длительной недоступности целевого API, но и от сбоев самого механизма проверки актуальности.

Как выбрать подход

Выбор зависит от природы события.

Если событие не теряет актуальности со временем — например, webhook или аналитический лог, — достаточно счётчика попыток: либо доставим сообщение, либо откажемся от дальнейших ретраев.

Если же событие имеет ограниченное временное окно и его поздняя доставка может нарушить бизнес-логику, необходимы дедлайн и дополнительная проверка актуальности перед каждой повторной отправкой.

Итоги

Высокая пропускная способность в этой архитектуре практически неизбежно требует auto-commit и асинхронных тасок внутри консьюмера — иначе RTT внешнего API становится жёстким потолком для одного пода.

Но auto-commit без дополнительного контроля создаёт критическую проблему: offset’ы начинают двигаться вперёд быстрее, чем реально завершается доставка события. В случае деградации внешнего API это приводит к потере сообщений — Kafka считает их обработанными, хотя фактическая отправка не произошла.

Чтобы закрыть этот разрыв между “прочитано” и “доставлено”, система собирается из трёх основных механизмов, каждый из которых решает свою часть задачи:

  • DLQ-топик принимает события, которые не удалось доставить после всех попыток обработки;

  • маркер в Redis синхронизирует поведение всех подов в моменты деградации внешнего API;

  • FSM на уровне партиций определяет, в каком состоянии находится обработка и что именно можно читать/обрабатывать в текущий момент.

Дополняют эту схему счётчик попыток и дедлайн обработки — они ограничивают бесконечные ретраи и задают границы “живучести” события, каждая метрика под свою природу нагрузки.

Сам по себе DLQ — это всего лишь топик. Всё сложное находится вокруг него: маркер состояния, конечный автомат обработки, а также вспомогательные ограничения вроде счётчика ретраев и дедлайна.

Именно из этих элементов и складывается рабочая схема:

  • высокая пропускная способность сохраняется за счёт asyncio и auto-commit,

  • потери ограничиваются через контролируемые переходы в DLQ,

  • система не “разъезжается” при деградации внешнего API,

  • а вся обработка остаётся управляемой через единое состояние.

ссылка на оригинал статьи https://habr.com/ru/articles/1045324/