Нагрузочный тест. 10 000 событий в секунду, три инстанса сервиса, Spring WebFlux, Project Reactor — всё как по учебнику.
Смотрю на метрики. Event loop завис на 800 миллисекунд. В хранилище — данные за более позднее время перезаписаны более ранними. WebSocket-сессия потеряла сообщения под нагрузкой. Два из трёх инстансов не получают события.
Код написан на WebFlux. Но реактивности в нём не было.
Я строил сервис потоковой доставки данных в реальном времени: тысячи источников → обработка → тысячи WebSocket-подписчиков. Каждая из пяти проблем ниже была невидима на демо. Каждая проявилась под нагрузкой. И каждая из них — не баг фреймворка. Это паттерны которые нужно знать заранее.
Оглавление
-
Ты блокируешь event loop — и Reactor молчит
-
Данные приходят не в том порядке — и это не баг Kafka
-
switchOnFirst -
Kafka Consumer Group которая ломает broadcast
-
N+1 в реактивном коде и
Mono.cache()
1. Ты блокируешь event loop — и Reactor молчит
Это самая распространённая ошибка. Код выглядит реактивным. Компилируется. Тесты зелёные. Но одна строка убивает всю пропускную способность.
Ситуация типичная: нужно сохранить событие в Redis. Используешь Lettuce — официально рекомендованный клиент для Spring. У Lettuce есть синхронный API (sync()) и реактивный (reactive()). Синхронный удобнее, привычнее, документации больше. Используешь его.
// Выглядит как нормальный реактивный кодpublic Mono<Boolean> save(Event event) { return Mono.fromCallable(() -> { // Lettuce sync API — блокирующий вызов redisSync.hset(buildKey(event), toFields(event)); return true; }); // subscribeOn нет → выполняется на потоке который вызвал subscribe() // Если это Netty event loop — он заблокирован до завершения Redis-запроса}
Mono.fromCallable() оборачивает синхронный код в реактивную обёртку. Но не переносит его на другой поток. По умолчанию — выполняется на том потоке который вызвал subscribe(). В WebFlux это Netty event loop. Пока он ждёт Redis — он не обрабатывает входящие запросы.
На одном запросе задержка в 1–2ms незаметна. На 1000 параллельных — event loop заблокирован, очередь растёт, latency улетает.
Reactor при этом никак не предупреждает. Код работает. Просто медленно.
Исправление — одна строка:
public Mono<Boolean> save(Event event) { return Mono.fromCallable(() -> { redisSync.hset(buildKey(event), toFields(event)); return true; }).subscribeOn(Schedulers.boundedElastic()); // ← вот она}
Schedulers.boundedElastic() — пул потоков специально для блокирующего I/O. Bounded — ограничен чтобы не создать тысячи потоков и не словить OOM. Elastic — масштабируется под нагрузку. Не трогает event loop Netty.
subscribeOn влияет на момент подписки: указывает на каком пуле запустить код. Работает по всей цепочке до ближайшего publishOn. Объявил один раз — дальше Reactor управляет сам.
Правило которое я теперь применяю автоматически: любой блокирующий I/O внутри Mono.fromCallable() — добавляй subscribeOn(Schedulers.boundedElastic()). Всегда. Даже если кажется что там нет блокировки — перепроверь.
Есть инструмент для поимки таких мест в тестах: BlockHound. Интегрируется в тесты и бросает исключение при любом блокирующем вызове на реактивном потоке. Полезно при первом знакомстве с реактивным стеком.
2. Данные приходят не в том порядке — и это не баг Kafka
Задача: система получает события об одних и тех же объектах из разных источников. Нужно хранить актуальное состояние — последнее значение на каждый момент времени.
Простая реализация: получил событие → записал в Redis.
Я предусмотрел эту проблему заранее — когда начал думать о параллельных потоках и нарисовал сценарий на бумаге. Потом проверил нагрузочным тестом. Опасения подтвердились.
Race condition при записи
Два события обрабатываются параллельно. Событие с timestamp 12:00:03 стартует первым — но Redis-запрос занимает чуть дольше. Событие с timestamp 12:00:01 приходит вторым — и перезаписывает более новое значение:
Поток A: Event(ts=12:00:03) → [Redis запрос начат] ─────────────────→ [записано: value=42]Поток B: Event(ts=12:00:01) → [Redis запрос начат] ──────→ [записано: value=37] ← перезаписалИтог в Redis: value=37 (данные 12:00:01), хотя должно быть value=42 (данные 12:00:03)
Атомарной операции “записать только если входящее время новее сохранённого” в Redis нет. Но есть Lua-скрипты. Redis выполняет Lua атомарно — никакой другой команды между началом и концом скрипта:
local stored_ts = redis.call('HGET', KEYS[1], 'timestamp')if stored_ts == false or tonumber(ARGV[1]) > tonumber(stored_ts) then redis.call('HSET', KEYS[1], 'value', ARGV[2], 'timestamp', ARGV[1]) return 1endreturn 0
Java-вызов:
private static final String SAVE_IF_NEWER_SCRIPT = """ local stored_ts = redis.call('HGET', KEYS[1], 'timestamp') if stored_ts == false or tonumber(ARGV[1]) > tonumber(stored_ts) then redis.call('HSET', KEYS[1], 'value', ARGV[2], 'timestamp', ARGV[1]) return 1 end return 0 """;public Mono<Boolean> saveIfNewer(String key, String value, long timestamp) { return Mono.fromCallable(() -> (Long) redisSync.eval( SAVE_IF_NEWER_SCRIPT, ScriptOutputType.INTEGER, new String[]{key}, String.valueOf(timestamp), value ) ) .subscribeOn(Schedulers.boundedElastic()) .map(result -> result == 1L);}
Stale write невозможен: Redis проверяет timestamp и записывает только если входящее значение новее. Атомарно. Без блокировок на стороне Java.
Порядок в Kafka
Kafka гарантирует порядок только внутри одной партиции. Если события одного объекта попадают в разные партиции — консьюмеры обрабатывают их параллельно, порядок не гарантирован.
Решение: использовать идентификатор объекта как ключ Kafka-сообщения. Все сообщения с одинаковым ключом всегда попадают в одну партицию.
// Ключ = идентификатор объекта → одна партиция → гарантированный порядокString partitionKey = event.getDeviceId() + ":" + event.getParameterId();SenderRecord<String, String, String> record = SenderRecord.create( new ProducerRecord<>(TOPIC, partitionKey, toJson(event)), event.getId());kafkaSender.send(Mono.just(record)) .flatMap(result -> { if (result.exception() != null) { return Mono.error(result.exception()); } return Mono.just(result); }) .subscribe();
Важная деталь: используй flatMap для отправки в Kafka, не doOnSuccess. doOnSuccess — fire-and-forget, ошибки отправки теряются. flatMap встраивает результат отправки в реактивную цепочку — ошибки поднимаются и обрабатываются.
3. switchOnFirst
WebSocket-хендлер. Клиент подключается и сразу присылает первое сообщение — запрос подписки: что именно он хочет получать. Все последующие сообщения — управляющие команды.
Очевидный подход:
public Mono<Void> handle(WebSocketSession session) { return session.receive() .next() // берём первое сообщение .flatMap(firstMsg -> { SubscriptionRequest req = parse(firstMsg.getPayloadAsText()); return buildDataStream(session, req); });}
Выглядит логично. На интеграционных тестах с одним клиентом — работает идеально. Под нагрузкой — теряет сообщения.
Проблема: .next() подписывается на session.receive(), читает один элемент, затем отменяет подписку. flatMap создаёт новую цепочку — но session.receive() это hot stream, прошлые элементы он не переигрывает. Между моментом отмены первой подписки и созданием второй — зазор в несколько миллисекунд. Под нагрузкой клиент может прислать второе сообщение именно в этот зазор.
session.receive(): [msg1]──[msg2]──[msg3]──... ↑ ↑ .next() читает, │ отменяет подп. Приходит в gap — теряется │ новая подписка начинается здесь
switchOnFirst решает это без разрыва:
public Mono<Void> handle(WebSocketSession session) { return session.receive() .switchOnFirst((firstSignal, inbound) -> { if (!firstSignal.hasValue()) { return inbound.then(); } WebSocketMessage firstMsg = firstSignal.get(); SubscriptionRequest req = parse(firstMsg.getPayloadAsText()); // inbound — тот же поток session.receive(), без новой подписки // skip(1) — пропускаем первое сообщение, оно уже обработано return buildDataStream(session, inbound.skip(1), req); });}
switchOnFirst даёт два аргумента:
-
firstSignal— первый элемент потока (или сигнал ошибки/завершения) -
inbound— весь исходный поток целиком, включая первый элемент
Ключевое: inbound — это не новая подписка. Это тот же поток session.receive(). Никакой отмены, никакого зазора. skip(1) убирает первое сообщение из inbound чтобы не обработать его дважды — и дальше поток идёт непрерывно.
Нашёл этот оператор в документации Project Reactor — в разделе про продвинутые операторы. В большинстве туториалов и примеров WebSocket-хендлеров его нет. Именно поэтому эту ошибку повторяют снова и снова.
Почему .next().flatMap() не всегда ломается — и как это усыпляет бдительность
На маленькой нагрузке и локальных тестах .next().flatMap() работает стабильно. Gap между двумя подписками — микросекунды, и клиент обычно не успевает прислать второе сообщение за это время. При росте нагрузки на сервер event loop занят другими задачами, gap увеличивается — и потери начинают проявляться нерегулярно. Нерегулярные потери особенно опасны: они сложно воспроизводятся, могут выглядеть как сетевые проблемы и долго не попадают в радар.
4. Kafka Consumer Group которая ломает broadcast
Сервис масштабируется горизонтально — несколько инстансов. Каждый держит свои WebSocket-соединения. Когда приходит событие, его должны получить подписчики на всех инстансах.
Стандартная Kafka Consumer Group:
Event → [Topic] ↓ Consumer Group "my-service" ┌──────────────┬──────────────┐ │ Instance 1 │ Instance 2 │ │ (читает msg) │ │ ← Instance 2 это сообщение не увидит └──────────────┴──────────────┘
Kafka распределяет сообщения между консьюмерами одной группы. Каждое сообщение получает ровно один инстанс. Подписчики на других инстансах остаются без данных.
Решение: каждый инстанс создаёт уникальную consumer group при старте.
@Beanpublic ReactiveKafkaConsumerTemplate<String, String> kafkaConsumer( KafkaProperties kafkaProperties) { Map<String, Object> props = kafkaProperties.buildConsumerProperties(null); // Уникальный group-id = отдельная consumer group = получает все сообщения props.put(ConsumerConfig.GROUP_ID_CONFIG, "realtime-service-" + UUID.randomUUID()); // latest — нас интересует только реальное время, не история props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props) .subscription(Collections.singleton(EVENTS_TOPIC)); return new ReactiveKafkaConsumerTemplate<>(options);}
Теперь каждый инстанс — отдельная consumer group. Kafka рассылает каждое сообщение во все группы:
Event → [Topic] ├──→ Group "service-{uuid-1}" → Instance 1 (читает всё) └──→ Group "service-{uuid-2}" → Instance 2 (читает всё)
Это до сих пор ощущается как костыль. Буду честным: при каждом перезапуске инстанса появляется новая consumer group, Kafka накапливает метаданные. Технически — это нарушение идиоматичного использования consumer groups.
Но это правильный паттерн для broadcast-семантики поверх Kafka. Альтернативы: Redis Pub/Sub для fan-out (проще, но нет персистентности), отдельный топик на инстанс (overhead на управление топиками), WebSocket через брокер типа RabbitMQ с exchange-маршрутизацией. Если вам нужен именно broadcast поверх Kafka — UUID group-id это самое простое решение с минимальным количеством движущихся частей.
Подключаем поток событий:
@Beanpublic Flux<Event> eventBroadcast( ReactiveKafkaConsumerTemplate<String, String> consumer) { return consumer.receiveAutoAck() .map(record -> parseEvent(record.value())) .onErrorContinue((err, val) -> log.error("Parse error: {}", val, err)) .publish() .autoConnect(0); // hot stream: стартует при поднятии бина, // новые подписчики получают события с момента подписки}
publish().autoConnect(0) превращает Flux в hot stream: поток стартует немедленно при поднятии бина, не дожидаясь первого подписчика. Новые WebSocket-сессии подписываются — и получают события реального времени.
5. N+1 в реактивном коде и Mono.cache()
Тысяча подписчиков подключаются за короткий промежуток времени. Каждая подписка требует выполнить запрос к Redis — например, загрузить конфигурацию подписки или список объектов за которыми нужно следить. Запрос не тяжёлый — 2–3ms. Но если тысяча подписчиков делают одинаковые запросы одновременно — Redis получает тысячу запросов за которые ему всё равно нужно отвечать.
Синхронный аналог этой проблемы — классический N+1 в JPA. Здесь та же природа, другой контекст.
Стандартное решение из синхронного мира: ConcurrentHashMap как кэш.
В реактивном коде это не работает напрямую. Значения в нашей Map — Mono<T>, а не T. Если положить в Map просто Mono без кэширования — каждый подписчик всё равно будет его выполнять заново.
Mono.cache() делает именно то что нужно: кэширует результат выполнения Mono и раздаёт его всем последующим подписчикам.
private final ConcurrentHashMap<String, Mono<List<String>>> cache = new ConcurrentHashMap<>();public Mono<List<String>> getSubscriptionConfig(String configKey) { return cache.computeIfAbsent(configKey, key -> loadFromRedis(key) .cache(Duration.ofSeconds(60)) // кэш на 60 секунд );}private Mono<List<String>> loadFromRedis(String key) { return Mono.fromCallable(() -> redisSync.lrange(key, 0, -1) // загрузить список из Redis ).subscribeOn(Schedulers.boundedElastic());}
Как это работает при тысяче одновременных подписчиков с одним configKey:
Подписчик #1 → computeIfAbsent создаёт Mono → loadFromRedis запускается (1 запрос Redis)Подписчик #2 → computeIfAbsent возвращает тот же Mono → ждёт результата #1Подписчик #3 → то же самое...Подписчик #1000 → то же самое→ Итого: 1 запрос к Redis вместо 1000. Все подписчики получили результат.
Разница с Mono.just():
// Mono.just() — ты сам вычислил значение и обернул его// Каждая подписка получает это значение, но вычислить его нужно было ДОMono<String> cached = Mono.just(expensiveComputation()); // вычисляется сразу// Mono.cache() — вычисление происходит при первой подписке// Все остальные подписки получают закэшированный результатMono<String> cached = expensiveComputationAsMono().cache(Duration.ofMinutes(1));// вычисляется лениво, при первом subscribe(), результат живёт 1 минуту
cache() без аргумента кэширует навсегда — ошибки тоже кэшируются, что может привести к тому что Mono вечно возвращает одну и ту же ошибку. cache(Duration) инвалидирует через TTL. Для данных реального времени — всегда с TTL.
Один нюанс: Mono.cache() кэширует в том числе ошибки. Если первый вызов loadFromRedis вернул ошибку — все последующие подписчики получат ту же ошибку до истечения TTL. Для продакшн-кода стоит добавить onErrorResume перед cache() — или инвалидировать ключ в Map при ошибке.
Что это значит на практике
Возвращаясь к нагрузочному тесту из начала.
После всех пяти исправлений:
-
Event loop свободен — блокирующие операции уехали на
boundedElastic -
Данные всегда актуальны — Lua-скрипт отбрасывает устаревшие записи атомарно
-
WebSocket-сессии не теряют сообщения —
switchOnFirstдержит поток непрерывным -
Все инстансы получают все события — UUID group-id даёт broadcast-семантику
-
Redis не захлёбывается от одновременных подписчиков —
Mono.cache()делает одно вычисление вместо тысячи
WebFlux не делает код реактивным автоматически. Mono и Flux — это инструменты. Правильно использованные — убирают блокировки, держат потоки свободными, масштабируют без роста числа потоков. Неправильно — просто добавляют обёртку поверх тех же проблем.
Меня в этом стеке больше всего удивляет следующее: каждая из этих проблем невидима в простом сценарии. BlockHound не включён по умолчанию. Kafka не скажет что твой consumer group настроен неправильно для твоего use case — он просто будет работать “немного неправильно”. switchOnFirst нет в Hello World примерах. Reactor не предупреждает о блокировках на event loop в runtime.
Реактивный стек требует понимать не только что делают операторы, но и почему именно этот оператор — а не очевидная альтернатива.
Эти пять проблем я увидел только под нагрузкой. Если вы только начинаете с WebFlux — прогоните нагрузочные тесты до того как идти в production. Лучше найти здесь.
Если у вас есть опыт с реактивным программированием и есть детали о которых я не говорил, на что стоит обратить внимание, отпишите в комментах, буду благодарен
Это мой канал, всех жду — @java_quant | @KarimAbushaev
ссылка на оригинал статьи https://habr.com/ru/articles/1039618/