Reactive Programming не спасёт вас. Если вы не решили эти 5 проблем — у вас просто медленный монолит с Flux

от автора

Нагрузочный тест. 10 000 событий в секунду, три инстанса сервиса, Spring WebFlux, Project Reactor — всё как по учебнику.

Смотрю на метрики. Event loop завис на 800 миллисекунд. В хранилище — данные за более позднее время перезаписаны более ранними. WebSocket-сессия потеряла сообщения под нагрузкой. Два из трёх инстансов не получают события.

Код написан на WebFlux. Но реактивности в нём не было.

Я строил сервис потоковой доставки данных в реальном времени: тысячи источников → обработка → тысячи WebSocket-подписчиков. Каждая из пяти проблем ниже была невидима на демо. Каждая проявилась под нагрузкой. И каждая из них — не баг фреймворка. Это паттерны которые нужно знать заранее.

Оглавление

  1. Ты блокируешь event loop — и Reactor молчит

  2. Данные приходят не в том порядке — и это не баг Kafka

  3. switchOnFirst

  4. Kafka Consumer Group которая ломает broadcast

  5. N+1 в реактивном коде и Mono.cache()

1. Ты блокируешь event loop — и Reactor молчит

Это самая распространённая ошибка. Код выглядит реактивным. Компилируется. Тесты зелёные. Но одна строка убивает всю пропускную способность.

Ситуация типичная: нужно сохранить событие в Redis. Используешь Lettuce — официально рекомендованный клиент для Spring. У Lettuce есть синхронный API (sync()) и реактивный (reactive()). Синхронный удобнее, привычнее, документации больше. Используешь его.

// Выглядит как нормальный реактивный кодpublic Mono<Boolean> save(Event event) {    return Mono.fromCallable(() -> {        // Lettuce sync API — блокирующий вызов        redisSync.hset(buildKey(event), toFields(event));        return true;    });    // subscribeOn нет → выполняется на потоке который вызвал subscribe()    // Если это Netty event loop — он заблокирован до завершения Redis-запроса}

Mono.fromCallable() оборачивает синхронный код в реактивную обёртку. Но не переносит его на другой поток. По умолчанию — выполняется на том потоке который вызвал subscribe(). В WebFlux это Netty event loop. Пока он ждёт Redis — он не обрабатывает входящие запросы.

На одном запросе задержка в 1–2ms незаметна. На 1000 параллельных — event loop заблокирован, очередь растёт, latency улетает.

Reactor при этом никак не предупреждает. Код работает. Просто медленно.

Исправление — одна строка:

public Mono<Boolean> save(Event event) {    return Mono.fromCallable(() -> {        redisSync.hset(buildKey(event), toFields(event));        return true;    }).subscribeOn(Schedulers.boundedElastic()); // ← вот она}

Schedulers.boundedElastic() — пул потоков специально для блокирующего I/O. Bounded — ограничен чтобы не создать тысячи потоков и не словить OOM. Elastic — масштабируется под нагрузку. Не трогает event loop Netty.

subscribeOn влияет на момент подписки: указывает на каком пуле запустить код. Работает по всей цепочке до ближайшего publishOn. Объявил один раз — дальше Reactor управляет сам.

Правило которое я теперь применяю автоматически: любой блокирующий I/O внутри Mono.fromCallable() — добавляй subscribeOn(Schedulers.boundedElastic()). Всегда. Даже если кажется что там нет блокировки — перепроверь.

Есть инструмент для поимки таких мест в тестах: BlockHound. Интегрируется в тесты и бросает исключение при любом блокирующем вызове на реактивном потоке. Полезно при первом знакомстве с реактивным стеком.

2. Данные приходят не в том порядке — и это не баг Kafka

Задача: система получает события об одних и тех же объектах из разных источников. Нужно хранить актуальное состояние — последнее значение на каждый момент времени.

Простая реализация: получил событие → записал в Redis.

Я предусмотрел эту проблему заранее — когда начал думать о параллельных потоках и нарисовал сценарий на бумаге. Потом проверил нагрузочным тестом. Опасения подтвердились.

Race condition при записи

Два события обрабатываются параллельно. Событие с timestamp 12:00:03 стартует первым — но Redis-запрос занимает чуть дольше. Событие с timestamp 12:00:01 приходит вторым — и перезаписывает более новое значение:

Поток A: Event(ts=12:00:03) → [Redis запрос начат] ─────────────────→ [записано: value=42]Поток B: Event(ts=12:00:01) → [Redis запрос начат] ──────→ [записано: value=37] ← перезаписалИтог в Redis: value=37 (данные 12:00:01), хотя должно быть value=42 (данные 12:00:03)

Атомарной операции “записать только если входящее время новее сохранённого” в Redis нет. Но есть Lua-скрипты. Redis выполняет Lua атомарно — никакой другой команды между началом и концом скрипта:

local stored_ts = redis.call('HGET', KEYS[1], 'timestamp')if stored_ts == false or tonumber(ARGV[1]) > tonumber(stored_ts) then    redis.call('HSET', KEYS[1], 'value', ARGV[2], 'timestamp', ARGV[1])    return 1endreturn 0

Java-вызов:

private static final String SAVE_IF_NEWER_SCRIPT = """    local stored_ts = redis.call('HGET', KEYS[1], 'timestamp')    if stored_ts == false or tonumber(ARGV[1]) > tonumber(stored_ts) then        redis.call('HSET', KEYS[1], 'value', ARGV[2], 'timestamp', ARGV[1])        return 1    end    return 0    """;public Mono<Boolean> saveIfNewer(String key, String value, long timestamp) {    return Mono.fromCallable(() ->        (Long) redisSync.eval(            SAVE_IF_NEWER_SCRIPT,            ScriptOutputType.INTEGER,            new String[]{key},            String.valueOf(timestamp), value        )    )    .subscribeOn(Schedulers.boundedElastic())    .map(result -> result == 1L);}

Stale write невозможен: Redis проверяет timestamp и записывает только если входящее значение новее. Атомарно. Без блокировок на стороне Java.

Порядок в Kafka

Kafka гарантирует порядок только внутри одной партиции. Если события одного объекта попадают в разные партиции — консьюмеры обрабатывают их параллельно, порядок не гарантирован.

Решение: использовать идентификатор объекта как ключ Kafka-сообщения. Все сообщения с одинаковым ключом всегда попадают в одну партицию.

// Ключ = идентификатор объекта → одна партиция → гарантированный порядокString partitionKey = event.getDeviceId() + ":" + event.getParameterId();SenderRecord<String, String, String> record = SenderRecord.create(    new ProducerRecord<>(TOPIC, partitionKey, toJson(event)),    event.getId());kafkaSender.send(Mono.just(record))    .flatMap(result -> {        if (result.exception() != null) {            return Mono.error(result.exception());        }        return Mono.just(result);    })    .subscribe();

Важная деталь: используй flatMap для отправки в Kafka, не doOnSuccess. doOnSuccess — fire-and-forget, ошибки отправки теряются. flatMap встраивает результат отправки в реактивную цепочку — ошибки поднимаются и обрабатываются.

3. switchOnFirst

WebSocket-хендлер. Клиент подключается и сразу присылает первое сообщение — запрос подписки: что именно он хочет получать. Все последующие сообщения — управляющие команды.

Очевидный подход:

public Mono<Void> handle(WebSocketSession session) {    return session.receive()        .next()                              // берём первое сообщение        .flatMap(firstMsg -> {            SubscriptionRequest req = parse(firstMsg.getPayloadAsText());            return buildDataStream(session, req);        });}

Выглядит логично. На интеграционных тестах с одним клиентом — работает идеально. Под нагрузкой — теряет сообщения.

Проблема: .next() подписывается на session.receive(), читает один элемент, затем отменяет подписку. flatMap создаёт новую цепочку — но session.receive() это hot stream, прошлые элементы он не переигрывает. Между моментом отмены первой подписки и созданием второй — зазор в несколько миллисекунд. Под нагрузкой клиент может прислать второе сообщение именно в этот зазор.

session.receive(): [msg1]──[msg2]──[msg3]──...                     ↑       ↑              .next() читает, │              отменяет подп.  Приходит в gap — теряется                              │                     новая подписка начинается здесь

switchOnFirst решает это без разрыва:

public Mono<Void> handle(WebSocketSession session) {    return session.receive()        .switchOnFirst((firstSignal, inbound) -> {            if (!firstSignal.hasValue()) {                return inbound.then();            }            WebSocketMessage firstMsg = firstSignal.get();            SubscriptionRequest req = parse(firstMsg.getPayloadAsText());            // inbound — тот же поток session.receive(), без новой подписки            // skip(1) — пропускаем первое сообщение, оно уже обработано            return buildDataStream(session, inbound.skip(1), req);        });}

switchOnFirst даёт два аргумента:

  • firstSignal — первый элемент потока (или сигнал ошибки/завершения)

  • inbound — весь исходный поток целиком, включая первый элемент

Ключевое: inbound — это не новая подписка. Это тот же поток session.receive(). Никакой отмены, никакого зазора. skip(1) убирает первое сообщение из inbound чтобы не обработать его дважды — и дальше поток идёт непрерывно.

Нашёл этот оператор в документации Project Reactor — в разделе про продвинутые операторы. В большинстве туториалов и примеров WebSocket-хендлеров его нет. Именно поэтому эту ошибку повторяют снова и снова.

Почему .next().flatMap() не всегда ломается — и как это усыпляет бдительность

На маленькой нагрузке и локальных тестах .next().flatMap() работает стабильно. Gap между двумя подписками — микросекунды, и клиент обычно не успевает прислать второе сообщение за это время. При росте нагрузки на сервер event loop занят другими задачами, gap увеличивается — и потери начинают проявляться нерегулярно. Нерегулярные потери особенно опасны: они сложно воспроизводятся, могут выглядеть как сетевые проблемы и долго не попадают в радар.

4. Kafka Consumer Group которая ломает broadcast

Сервис масштабируется горизонтально — несколько инстансов. Каждый держит свои WebSocket-соединения. Когда приходит событие, его должны получить подписчики на всех инстансах.

Стандартная Kafka Consumer Group:

Event → [Topic]           ↓    Consumer Group "my-service"    ┌──────────────┬──────────────┐    │  Instance 1  │  Instance 2  │    │ (читает msg) │              │  ← Instance 2 это сообщение не увидит    └──────────────┴──────────────┘

Kafka распределяет сообщения между консьюмерами одной группы. Каждое сообщение получает ровно один инстанс. Подписчики на других инстансах остаются без данных.

Решение: каждый инстанс создаёт уникальную consumer group при старте.

@Beanpublic ReactiveKafkaConsumerTemplate<String, String> kafkaConsumer(        KafkaProperties kafkaProperties) {    Map<String, Object> props = kafkaProperties.buildConsumerProperties(null);    // Уникальный group-id = отдельная consumer group = получает все сообщения    props.put(ConsumerConfig.GROUP_ID_CONFIG,              "realtime-service-" + UUID.randomUUID());    // latest — нас интересует только реальное время, не история    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");    ReceiverOptions<String, String> options =        ReceiverOptions.<String, String>create(props)            .subscription(Collections.singleton(EVENTS_TOPIC));    return new ReactiveKafkaConsumerTemplate<>(options);}

Теперь каждый инстанс — отдельная consumer group. Kafka рассылает каждое сообщение во все группы:

Event → [Topic]           ├──→ Group "service-{uuid-1}" → Instance 1 (читает всё)           └──→ Group "service-{uuid-2}" → Instance 2 (читает всё)

Это до сих пор ощущается как костыль. Буду честным: при каждом перезапуске инстанса появляется новая consumer group, Kafka накапливает метаданные. Технически — это нарушение идиоматичного использования consumer groups.

Но это правильный паттерн для broadcast-семантики поверх Kafka. Альтернативы: Redis Pub/Sub для fan-out (проще, но нет персистентности), отдельный топик на инстанс (overhead на управление топиками), WebSocket через брокер типа RabbitMQ с exchange-маршрутизацией. Если вам нужен именно broadcast поверх Kafka — UUID group-id это самое простое решение с минимальным количеством движущихся частей.

Подключаем поток событий:

@Beanpublic Flux<Event> eventBroadcast(        ReactiveKafkaConsumerTemplate<String, String> consumer) {        return consumer.receiveAutoAck()        .map(record -> parseEvent(record.value()))        .onErrorContinue((err, val) -> log.error("Parse error: {}", val, err))        .publish()        .autoConnect(0); // hot stream: стартует при поднятии бина,                         // новые подписчики получают события с момента подписки}

publish().autoConnect(0) превращает Flux в hot stream: поток стартует немедленно при поднятии бина, не дожидаясь первого подписчика. Новые WebSocket-сессии подписываются — и получают события реального времени.

5. N+1 в реактивном коде и Mono.cache()

Тысяча подписчиков подключаются за короткий промежуток времени. Каждая подписка требует выполнить запрос к Redis — например, загрузить конфигурацию подписки или список объектов за которыми нужно следить. Запрос не тяжёлый — 2–3ms. Но если тысяча подписчиков делают одинаковые запросы одновременно — Redis получает тысячу запросов за которые ему всё равно нужно отвечать.

Синхронный аналог этой проблемы — классический N+1 в JPA. Здесь та же природа, другой контекст.

Стандартное решение из синхронного мира: ConcurrentHashMap как кэш.

В реактивном коде это не работает напрямую. Значения в нашей Map — Mono<T>, а не T. Если положить в Map просто Mono без кэширования — каждый подписчик всё равно будет его выполнять заново.

Mono.cache() делает именно то что нужно: кэширует результат выполнения Mono и раздаёт его всем последующим подписчикам.

private final ConcurrentHashMap<String, Mono<List<String>>> cache =    new ConcurrentHashMap<>();public Mono<List<String>> getSubscriptionConfig(String configKey) {    return cache.computeIfAbsent(configKey, key ->        loadFromRedis(key)            .cache(Duration.ofSeconds(60)) // кэш на 60 секунд    );}private Mono<List<String>> loadFromRedis(String key) {    return Mono.fromCallable(() ->        redisSync.lrange(key, 0, -1) // загрузить список из Redis    ).subscribeOn(Schedulers.boundedElastic());}

Как это работает при тысяче одновременных подписчиков с одним configKey:

Подписчик #1 → computeIfAbsent создаёт Mono → loadFromRedis запускается (1 запрос Redis)Подписчик #2 → computeIfAbsent возвращает тот же Mono → ждёт результата #1Подписчик #3 → то же самое...Подписчик #1000 → то же самое→ Итого: 1 запрос к Redis вместо 1000. Все подписчики получили результат.

Разница с Mono.just():

// Mono.just() — ты сам вычислил значение и обернул его// Каждая подписка получает это значение, но вычислить его нужно было ДОMono<String> cached = Mono.just(expensiveComputation()); // вычисляется сразу// Mono.cache() — вычисление происходит при первой подписке// Все остальные подписки получают закэшированный результатMono<String> cached = expensiveComputationAsMono().cache(Duration.ofMinutes(1));// вычисляется лениво, при первом subscribe(), результат живёт 1 минуту

cache() без аргумента кэширует навсегда — ошибки тоже кэшируются, что может привести к тому что Mono вечно возвращает одну и ту же ошибку. cache(Duration) инвалидирует через TTL. Для данных реального времени — всегда с TTL.

Один нюанс: Mono.cache() кэширует в том числе ошибки. Если первый вызов loadFromRedis вернул ошибку — все последующие подписчики получат ту же ошибку до истечения TTL. Для продакшн-кода стоит добавить onErrorResume перед cache() — или инвалидировать ключ в Map при ошибке.

Что это значит на практике

Возвращаясь к нагрузочному тесту из начала.

После всех пяти исправлений:

  • Event loop свободен — блокирующие операции уехали на boundedElastic

  • Данные всегда актуальны — Lua-скрипт отбрасывает устаревшие записи атомарно

  • WebSocket-сессии не теряют сообщения — switchOnFirst держит поток непрерывным

  • Все инстансы получают все события — UUID group-id даёт broadcast-семантику

  • Redis не захлёбывается от одновременных подписчиков — Mono.cache() делает одно вычисление вместо тысячи

WebFlux не делает код реактивным автоматически. Mono и Flux — это инструменты. Правильно использованные — убирают блокировки, держат потоки свободными, масштабируют без роста числа потоков. Неправильно — просто добавляют обёртку поверх тех же проблем.

Меня в этом стеке больше всего удивляет следующее: каждая из этих проблем невидима в простом сценарии. BlockHound не включён по умолчанию. Kafka не скажет что твой consumer group настроен неправильно для твоего use case — он просто будет работать “немного неправильно”. switchOnFirst нет в Hello World примерах. Reactor не предупреждает о блокировках на event loop в runtime.

Реактивный стек требует понимать не только что делают операторы, но и почему именно этот оператор — а не очевидная альтернатива.

Эти пять проблем я увидел только под нагрузкой. Если вы только начинаете с WebFlux — прогоните нагрузочные тесты до того как идти в production. Лучше найти здесь.

Если у вас есть опыт с реактивным программированием и есть детали о которых я не говорил, на что стоит обратить внимание, отпишите в комментах, буду благодарен

Это мой канал, всех жду — @java_quant | @KarimAbushaev

ссылка на оригинал статьи https://habr.com/ru/articles/1039618/