DLQ-first: учим Kafka-консьюмера падать красиво и поднимать поток за секунды

от автора

Привет, Хабр!

Сегодня рассмотрим, как построить Kafka‑консьюмер, который не падёт при первой же проблеме, а аккуратно сложит битые события в Dead Letter Queue (DLQ).


Когда и зачем нужен DLQ

В Kafka жизненно важно различать две плоскости:

Плоскость

Что происходит без DLQ

Что хотим видеть

Обработка

Консьюмер читает сообщение, попытка deserialise → enrich → persist; при эксепшене offset не коммитится, поток стопорится.

Консьюмер коммитит оффсет, а проблемное событие перекидывает в DLQ‑топик.

Хранение

Сообщение остаётся в основном топике и будет «досаждать» ретраями до конца ретеншена.

Чётко знаем: «в DLQ лежат только фэйлы, прод стрим идёт дальше».

Типовые сценарии

Ситуация

Повторяемая?

Действие

Не‑валидный JSON / Avro‑схема

Fatal

Сразу в DLQ, смысла ретраить нет.

Временная недоступность БД

Retryable

DLQ с отложенным re‑consume после таймаута.

Новая версия схемы (schema registry lag)

Retryable

Либо автоматический retry, либо DLQ → пересчитай позже.

Confluent и ряд энтузиастов приводят такую практику: разделяйте ошибки на fatal и retryable, метите их в header (error.class, error.stacktrace, retryable=true/false).

Где теряются сообщения, если DLQ нет?

  1. Консьюмер зависает на ядовитом сообщении, оффсеты не коммитятся, всё после него не читается.

  2. DevOps убивает pod — Kubernetes рестартует, — ситуация повторяется.

  3. Через N часов приходит ваш SRE и задаёт вопрос: «Почему в топике lag 50 млн, а бизнес‑процесс мёртв?»

Реализация DLQ на примере Kafka Consumer

Python (confluent-kafka)

from confluent_kafka import Consumer, Producer, KafkaException import json, logging, time  # Базовый конфиг common = {     "bootstrap.servers": "kafka-prod:9092",     "group.id":          "enricher-v1",     "auto.offset.reset": "earliest",     "enable.auto.commit": False, }  consumer = Consumer(common | {"key.deserializer": str}) producer = Producer({"bootstrap.servers": common["bootstrap.servers"]})  SOURCE_TOPIC = "clicks.raw" DLQ_TOPIC    = f"{SOURCE_TOPIC}.dlq"  def push_dlq(msg, exc, retryable: bool):     """Проксируем оригинальное сообщение в DLQ c расширенными headers."""     headers = msg.headers() or []     headers.extend([         ("error.class", str(type(exc)).encode()),         ("error.message", str(exc).encode()),         ("retryable", b"1" if retryable else b"0"),         ("ts.failed", str(int(time.time() * 1000)).encode()),     ])     producer.produce(         topic=DLQ_TOPIC,         key=msg.key(),         value=msg.value(),         headers=headers,     )     producer.flush(1_000)  def handle(msg):     payload = json.loads(msg.value())           # может кинуть JSONDecodeError     enriched = call_some_db(payload["user_id"]) # может кинуть DBError     publish_downstream(enriched)  while True:     batch = consumer.consume(num_messages=500, timeout=1.0)     for m in batch:         try:             handle(m)             consumer.commit(m)  # ручной коммит ⇒ at-least-once         except json.JSONDecodeError as je:             logging.exception("Bad JSON")             push_dlq(m, je, retryable=False)             consumer.commit(m)         except TransientDBError as te:             logging.warning("DB temp issue → DLQ for later retry")             push_dlq(m, te, retryable=True)             consumer.commit(m)         except Exception as e:             # last-line defense             push_dlq(m, e, retryable=False)             consumer.commit(m)

Коммит оффсета после пуша в DLQ — иначе мы бы «застряли» на плохом событии. retryable header — пригодится автоматическму ретраю. producer.flush() — в проде заменяем на асинхронный delivery callback + back‑pressure.

Java (Spring Kafka ≥ 2.8)

У Spring всё из коробки благодаря DefaultErrorHandler и DeadLetterPublishingRecoverer:

@Bean public ConcurrentKafkaListenerContainerFactory<String, OrderEvt> kafkaFactory(         ConsumerFactory<String, OrderEvt> cf,         KafkaOperations<String, OrderEvt> dlqTemplate) {      var recoverer = new DeadLetterPublishingRecoverer(         dlqTemplate,         (rec, ex) -> new TopicPartition(rec.topic() + ".dlq", rec.partition())     );      var errorHandler = new DefaultErrorHandler(         recoverer,         new FixedBackOff(0L, 0) // сразу в DLQ, без ретраев     );      errorHandler.addNotRetryableExceptions(JsonParseException.class);     errorHandler.addRetryableExceptions(SQLException.class);      var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvt>();     factory.setConsumerFactory(cf);     factory.setCommonErrorHandler(errorHandler);     return factory; }  @KafkaListener(topics = "orders.raw", groupId = "enricher-v1") public void onEvent(OrderEvt evt) {     // enrich & persist }

DeadLetterPublishingRecoverer автоматически копирует key/value и доклеивает метадату (kafka_dlt-original-topic, kafka_dlt-exception-fqcn, & др.). Для временных сбоев можно поставить FixedBackOff(1000L, 3) — три локальных ретрая до DLQ.

Spring‑team подтянула эти фичи с Apache Kafka 2.8; до этого приходилось писать кастомные SeekToCurrentErrorHandler.

Как мониторить и обрабатывать DLQ

Метрики и алёрты

Что смотрим

Где берём

Порог

records.in для *.dlq

kafka.server.BrokerTopicMetrics.MessagesInPerSec

>10% от основного топика (5-мин. окно)

Lag основного consumer‑group

kafka.consumer:type=consumer-fetch-manager-metricsrecords-lag-max

рост экспоненциальный

Кол‑во retryable=true в DLQ

ksqlDB / Kafka Streams window count

>1000 за 15 м

Пример алерт‑правила в Prometheus:

- alert: HighDLQInRate   expr: rate(kafka_server_brokertopicmetrics_messages_in_total{topic=~".*\\.dlq"}[5m]) > 100   for: 2m   labels:     severity: critical   annotations:     summary: "DLQ inflow is {{ $value }} msg/s"     description: "Too many errors hitting DLQ topics"

UI для ручного ревью

kcat (kafkacat) — быстрый просмотр одного сообщения:

kcat -C -t clicks.raw.dlq -o -5 -q

AKHQ / Redpanda Console / Confluent UI — визуальный поиск по key/headers. Когда сообщений десятки тысяч, делают «DLQ‑WorkBench»: удобное SPA, где можно фильтровать по error.class, делать bulk‑reprocess.

Автоматический retry-pipeline

Архитектурный паттерн таков:

clicks.raw.dlq (-- only retryable -->) clicks.retry          |                                   |          | DLQ-retry-consumer                | primary-consumer          +---------->  clicks.raw -----------+

Retry‑consumer читает только retryable=true. Ставит задержку (например, sleep(300_000) или Scheduled Executor). Пушит обратно в исходный топик с новым header x-retry-count. При превышении MAX_RETRY отправляет в clicks.raw.dlq.permanent — это уже зона ручного разбора.

В 2024-м Confluent добавила готовый компонент — Parallel Consumer с built‑in retry — но он пока в tech‑preview.

Итог

DLQ — это не просто «корзинка для битых сообщений», а фундамент отказоустойчивой стриминговой архитектуры:

  • Коммит оффсета → живой консьюмер.

  • Разметка ошибок → осмысленные ретраи.

  • Метрики → SRE спит спокойно.

Собрали? Тестируем — швыряем кривой JSON и дропаем коннект к БД. Консьюмер улыбается, а в DLQ аккуратно появляется две записи: одна retryable=false, вторая retryable=true. Красота.


В заключение приглашаем на открытый урок 22 мая «Оптимизация Nginx и Angie под высокие нагрузки». Узнайте, как настроить ключевые параметры для стабильной работы серверов при большом трафике, оптимизировать TLS, кэширование и анализировать производительность. Меньше узких мест — больше скорости. Записывайтесь по ссылке.

Любое развитие начинается с честной оценки. Пройдите тест на знание инфраструктуры высоконагруженных систем — он подскажет, куда расти дальше.


ссылка на оригинал статьи https://habr.com/ru/articles/905810/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *