Fast Lane / Slow Lane: разделение трафика через две очереди Kafka

от автора

Привет, Хабр!

Сегодня мы рассмотрим Fast Lane / Slow Lane для Kafka: как одним росчерком кода защитить SLA‑критичный поток от толстых сообщений, не перекраивая пол‑стека и не устраивая зоопарк из очередей.

Kafka читает батчами и строго по порядку. Если впереди в логах стоит гигантский JSON, consumer обязан проглотить его прежде, чем добраться до маленького heartbeat. Лёгкие события застревают, медианное время обработки идёт в космос, SLA горит синим пламенем. Разнести трафик на fast lane и slow lane — самый прямой способ убрать взаимное влияние. Лёгкие события летят в приоритетный топик, тяжёлые отправляются в отдельный, медленный. Теоретически можно пытаться делать приоритизацию внутри одной очереди, но тогда упираемся в порядковую семантику Kafka и получаем latency‑капкан.

Топология на уровне брокера

Создаём два топика с разными настройками. Fast Lane держим с большим количеством партиций и жёстким лимитом на размер сообщения, Slow Lane — с меньшим количеством партиций, но с повышенным message.max.bytes. Пример Terraform‑модуля:

resource "kafka_topic" "events_fast" {   name               = "events.input.fast"   replication_factor = 3   partitions         = 12    config = {     "max.message.bytes" = "1048576"   # 1 MB     "retention.ms"      = "604800000" # 7 дней   } }  resource "kafka_topic" "events_slow" {   name               = "events.input.slow"   replication_factor = 3   partitions         = 6    config = {     "max.message.bytes" = "8388608"   # 8 MB     "retention.ms"      = "259200000" # 3 дня   } }

Продюсер

Первый вариант это решать по размеру сообщение/события:

@Service @RequiredArgsConstructor public class EventRouter {      private final KafkaTemplate<String, byte[]> kafka;      public void send(byte[] payload) {         if (payload.length > 900_000) {          // > ~900 КБ             kafka.send("events.input.slow", payload);         } else {             kafka.send("events.input.fast", payload);         }     } }

Логика размазана по сервисам; если понадобится сложная классификация — придётся менять все продюсеры.

RecordInterceptor: централизуем роутинг

@Configuration public class KafkaProducerConfig {      @Bean     public ProducerFactory<String, byte[]> producerFactory() {         Map<String, Object> props = Map.of(             ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap,             ProducerConfig.ACKS_CONFIG, "all",             ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4", // дешёвое сжатие             ProducerConfig.LINGER_MS_CONFIG, 5,             ProducerConfig.BATCH_SIZE_CONFIG, 32_768         );         DefaultKafkaProducerFactory<String, byte[]> factory =             new DefaultKafkaProducerFactory<>(props);          factory.addPostProcessor((producer, tx) ->    // подключаем интерцептор             producer.setInterceptor(new RoutingInterceptor()));         return factory;     } }  public class RoutingInterceptor implements ProducerInterceptor<String, byte[]> {      @Override     public ProducerRecord<String, byte[]> onSend(ProducerRecord<String, byte[]> record) {         byte[] payload = record.value();         if (payload != null && payload.length > 900_000) {             return new ProducerRecord<>("events.input.slow", record.key(), payload);         }         return new ProducerRecord<>("events.input.fast", record.key(), payload);     } }

Интерцептор прозрачен для бизнес‑кода: сервисы зовут kafkaTemplate.send("events.input", …) и ничего не знают про дорожки.

Kafka Streams для динамического бранчинга

Когда нужны более хитрые правила, например обогащение события метаданными или ML‑моделью, удобнее взять Kafka Streams:

@Bean public Topology topology() {     StreamsBuilder builder = new StreamsBuilder();      KStream<String, Event> source = builder.stream("events.input");      Predicate<String, Event> isSmall =         (k, v) -> v.size() <= 900_000;     Predicate<String, Event> isLarge =         (k, v) -> v.size() > 900_000;      KStream<String, Event>[] branches = source.branch(isSmall, isLarge); // fast / slow      branches[0].to("events.input.fast");     branches[1].to("events.input.slow");      return builder.build(); }

Streams‑процесс на отдельном сервисе — и у продюсеров чистая боль, а правила меняются в одном месте.

Конфигурация consumer’ов

Быстрый контейнер

@Bean public ConcurrentKafkaListenerContainerFactory<String, byte[]> fastFactory(         ConsumerFactory<String, byte[]> base) {      var f = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();     f.setConsumerFactory(base);     f.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);     f.setBatchListener(true);     f.setConcurrency(6);      // poll меньше 100 мс — реагируем быстро     f.getContainerProperties().setIdleBetweenPolls(50);      // после 1000 рекордов делаем commit     f.setAckOnError(false);     return f; }

Медленный контейнер

@Bean public ConcurrentKafkaListenerContainerFactory<String, byte[]> slowFactory(         ConsumerFactory<String, byte[]> base) {      var f = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();     f.setConsumerFactory(base);     f.setBatchListener(true);     f.setConcurrency(2);      // читаем больше данных за один fetch     Map<String, Object> props = f.getConsumerFactory().getConfigurationProperties();     props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2_097_152); // 2 MB     props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 8_388_608); // 8 MB     return f; }

Параметры fetch.max.bytes и max.partition.fetch.bytes: для slow lane разрешаем больше, иначе толстый пакет не пролезет и consumer поймает RecordTooLargeException.

Слушатели с ручным ack

@Slf4j @Service public class FastListener {      @KafkaListener(         id = "fast-listener",         topics = "events.input.fast",         containerFactory = "fastFactory"     )     public void onFast(List<byte[]> messages, Acknowledgment ack) {         messages.forEach(this::processFast);         ack.acknowledge(); // commit offset сразу     } }  @Slf4j @Service public class SlowListener {      @KafkaListener(         id = "slow-listener",         topics = "events.input.slow",         containerFactory = "slowFactory"     )     public void onSlow(List<byte[]> messages, Acknowledgment ack) {         for (byte[] m : messages) {             try {                 processSlow(m);             } catch (Exception ex) {                 sendToDlq(m, ex);             }         }         ack.acknowledge();     } }

В slow lane часто нужна Dead Letter Queue. Сразу выделяем events.input.slow.dlq и не ломаем голову.

Динамический паузинг slow‑консьюмера

Когда fast lane начинает отставать, можно временно остановить slow listener, сохранив heartbeat, чтобы не получить ребаланс. Spring‑Kafka умеет это из коробки.

@Component @RequiredArgsConstructor public class SlowLaneThrottler {      private final KafkaListenerEndpointRegistry registry;      // запускаем из Spring‑Scheduler     @Scheduled(fixedDelay = 30_000)     public void controlSlowLane() {         MessageListenerContainer fast = registry.getListenerContainer("fast-listener");         MessageListenerContainer slow = registry.getListenerContainer("slow-listener");          long lagFast = lag("events.input.fast");         boolean overloaded = lagFast > 10_000;          if (overloaded && !slow.isPauseRequested()) {             slow.pause();             log.warn("Slow lane paused, fast lag={}", lagFast);         } else if (!overloaded && slow.isPauseRequested()) {             slow.resume();             log.info("Slow lane resumed");         }     } }

Метод lag берёт данные из JMX/Prometheus; реализацию опустим ради краткости.

Dynamic Throttle API

Начиная с Kafka 3.3 можно отдавать broker‑side throttle через Admin API: ограничиваем скорость отдачи для конкретных consumer‑групп. В Spring‑Kafka это делается так:

@Bean public KafkaAdmin.NewPartitions throttleGroup() {     return (admin) -> admin.alterConsumerGroupOffsets(             "slow-consumer-group",             Map.of(new TopicPartition("events.input.slow", 0),                    new OffsetAndMetadata(0L)),             new AlterConsumerGroupOffsetsOptions().timeoutMs(5_000)                     .throttle(512 * 1024)); // 512 KiB/s }

Такая мера включается по алерту и почти не требует остановки приложения.

Метрики и алерты

Подключаем Micrometer: management.metrics.enable.kafka: true — и получаем пачку готовых метрик. Главное: лейблы client.id, topic. Вывешиваем в Grafana два графика:

  • kafka_consumer_records_lag_max{topic="events.input.fast"}

  • kafka_consumer_records_lag_max{topic="events.input.slow"}

Держим fast‑lag < 1000, slow‑lag < 100 000. Алерт: если fast‑lag > 10 000 за 5 минут — пауза slow‑консьюмера и Slack‑уведомление.

Некоторые ошибки

Часто промахиваются на этапе тюнинга fast‑lane: оставляют одинаковое значение max.poll.records для обоих слушателей, и быстрый consumer захлёбывается, потому что на него ложится объём батча, рассчитанный на slow‑lane; отсюда лавинообразный рост лагов. Вторая типовая оплошность это отправлять события без сжатия: когда payload приближается к порогу 1 MB, брокер отклоняет запись, а продюсер отвечает ошибкой RecordTooLarge, хотя проблему решали бы две строчки compression.type=lz4.

Другая пара ошибок связана с ресурсными лимитами. Если забыть про quota, «медленный» consumer при высоком fetch.max.bytes идёт быстрее «быстрого» и съедает пропускную способность брокера, сводя на нет всю идею приоритизации. И, наконец, retention для slow‑topic: день хранения выглядит разумно, пока ночью не прилетит пик крупных событий; если retention меньше фактической волны, самые важные сообщения исчезнут до обработки, и восстановить их будет некуда.


Итог

Если вы уже внедряли подобную схему разделения трафика или пошли другим путём — делитесь опытом в комментариях. Интересно посмотреть и обсудить, как вы решали проблему приоритизации событий в Kafka.

Если вы работаете с высоконагруженными системами и интересуетесь архитектурными подходами, приглашаем вас на два открытых урока курса Highload Architect:

  • 12 августа в 20:00 — «Мониторинг в высоконагруженных проектах»
    Разберём, как выстроить наблюдаемость системы под постоянной нагрузкой: какие метрики важны, как быстро выявлять узкие места и реагировать на инциденты до того, как они станут проблемой.

  • 20 августа в 20:00 — «Wasm на сервере в высоконагруженных системах»
    Поговорим о применении WebAssembly на серверной стороне. Вы узнаете, зачем его интегрируют в высоконагруженные сервисы, какие преимущества даёт песочница и как подход влияет на производительность.

Кроме того, вы можете пройти тестирование, чтобы проверить свои знания и навыки в области высоконагруженных систем.


ссылка на оригинал статьи https://habr.com/ru/articles/932134/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *