Кнопка «F5» устала: real-time уведомления в микросервисной архитектуре

от автора

Представьте себе: у вас железнодорожная станция, сотни вагонов, десятки пользователей в системе, каждый раз кто-то нажимает кнопку «Обновить», чтобы узнать — разгрузили ли нужный вагон.

Вся логика обновления построена на «manual refresh». Да-да, пользователь сам жмёт кнопку, чтобы получить свежие данные. Система автоматической разгрузки или другой человек разгрузил что-то на другом конце станции, но вы об этом не узнаете, пока не перезагрузите страницу.

А ещё — избыток HTTP-запросов, polling, перегруженные серверы и полное отсутствие real-time взаимодействия.

Есть вариант! Масштабируемая и отказоустойчивая архитектура с использованием Redis Sentinel + Pub/Sub + WebSocket/SSE.

В статье расскажем, какие проблемы возникают с real-time в Kubernetes, почему стандартные WebSocket-подходы не работают при нескольких подах, как построить отказоустойчивую систему с Redis Sentinel, как сделать real-time UI, сохранив отказоустойчивость и масштабируемость, и как всё это запустить локально для отладки.

Проблема: Медленный и статичный интерфейс

  • Пользователи не видят изменения в реальном времени

  • Если кто-то разгрузил вагоны на другом компьютере, на UI это не отобразится без перезагрузки страницы

  • Частые HTTP-запросы (polling) перегружают сервер

Задача: Реализовать real-time отображение

✔ Сразу показывать обновление количества разгруженных вагонов всем пользователям
✔ Минимизировать нагрузку на сервер (избавиться от постоянного polling’а)
✔ Обеспечить масштабируемость и отказоустойчивость, даже если развернуто несколько подов в Kubernetes

Когда система состоит из нескольких микросервисов, важно организовать масштабируемую и отказоустойчивую доставку уведомлений.
Стандартный WebSocket или SSE (Server-Sent Events) в монолитах работает просто:
Клиенты подписываются → Сервер отправляет уведомления напрямую.

Но если сервис развернут в Kubernetes (K8s) с несколькими подами, возникают сложности:

  • Клиент подключается к одному поду и может не получить уведомление, отправленное другим подом

  • Балансировщик распределяет подключения случайно

  • Если под рестартуется – все подключения теряются

Как мы попробуем решить эту задачу?

Мы разделим архитектуру уведомлений на три ключевых компонента:

1) Бизнес-микросервисы (генераторы событий) – публикуют события в Redis Pub/Sub.
2) Redis Sentinel + Redis Pub/Sub (брокер сообщений) – обеспечивает маршрутизацию и отказоустойчивость.
3) Сервис уведомлений (Notifier Service) с WebSocket/SSE – подписывается на Redis и доставляет уведомления клиентам в реальном времени.

!Клиенту неважно, какой под обрабатывает его WebSocket/SSE – все они получают одно и то же уведомление.

Проблема: Недостатки обычного Redis в Kubernetes

Обычно используют Redis Pub/Sub, но у него есть минусы:
1) Один Redis без Sentinel — один instance (точка отказа, нет High Availability (Высокой Доступности))
2) Если Redis упадёт, то все поды потеряют соединение с уведомлениями

   Какое решение нам нужно?

✔ Redis Sentinel автоматически переключает мастера при сбоях, а реплики обеспечивают высокую доступность и отказоустойчивость

✔ Сервис уведомлений подключается к Redis через Sentinel, поэтому при сбоях система автоматически переподключается к новому мастеру без простоев 

✔ Сервис уведомлений в нескольких подах, который слушает Redis через Sentinel, получает уведомления через Pub/Sub и рассылает их пользователям через WebSocket и SSE 

 Что это нам даёт?

  • Если один Redis-узел падает, Sentinel автоматически назначает новый мастер, и сервис продолжает получать данные.

  • Если под сервиса уведомлений перезапускается, соединение SSE/WebSocket разрывается. Клиентский код обнаружит разрыв (onerror/onclose) и попробует автоматически переподключиться. Балансировщик Kubernetes направит новый запрос клиента на доступный под.

  • Нагрузка на сервер минимизирована — нет постоянных HTTP-запросов (polling).

Как это работает в коде?

dependencies
       <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-data-redis-reactive</artifactId>             <version>3.4.2</version>         </dependency>         <dependency>             <groupId>io.lettuce</groupId>             <artifactId>lettuce-core</artifactId>             <version>6.6.0.BETA2</version>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-webflux</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-websocket</artifactId>        </dependency>

 1. Публикация уведомлений в Redis Sentinel

@Service @RequiredArgsConstructor public class MessageNotificationService {      private final ReactiveStringRedisTemplate redisTemplate;     private final MessageNotificationConverter messageNotificationConverter;     /**      * Публикует уведомление в канал Redis Pub/Sub.      * <p>      * Уведомление отправляется во все подписанные сервисы через механизм        * Pub/Sub, работающий в Redis Sentinel.      * </p>      *      * @param notificationMessage Сообщение {@link NotificationMessageReq},        *                            которое нужно отправить подписчикам. (Ваша ДТО,      которое должно быть приведено к единому формату с ответным DTO для удобства сериализации в JSON.)      */     public void publish(NotificationMessageReq notificationMessage) {         redisTemplate.convertAndSend(                 "notifications_channel",  //  Pub/Sub канал в Redis, в который публикуем сообщение                 messageNotificationConverter.convert(notificationMessage) //  Преобразуем в JSON         ).subscribe();     } } 

Теперь уведомление реплицируется в кластер Redis Sentinel и доставляется на все поды в Kubernetes.

2. Чтение Redis Pub/Sub и пересылка через SSE/WebSockets

Конфигурация реактивных компонентов для работы с Redis и потоками уведомлений
import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer; import reactor.core.publisher.Sinks;   /**  * Конфигурация реактивных компонентов для работы с Redis и потоками уведомлений.  */ @Configuration @RequiredArgsConstructor public class ReactiveListenerConfig {      private final ReactiveStringRedisTemplate redisTemplate;      /**      * Создаёт и настраивает {@link ReactiveRedisMessageListenerContainer}, который слушает каналы Redis.      * <p>      * Используется для подписки на сообщения в Redis с возможностью реактивной обработки.      * </p>      *      * @return Экземпляр {@link ReactiveRedisMessageListenerContainer} с подключением к Redis.      */     @Bean     public ReactiveRedisMessageListenerContainer listenerContainer() {         return new ReactiveRedisMessageListenerContainer(redisTemplate.getConnectionFactory());     }      /**      * Создаёт многопоточный (`multicast`) Sink для отправки уведомлений в реактивном стиле.      * <p>      * Позволяет подписчикам получать уведомления в реальном времени.      * </p>      * <p>      * Используется для потоковой отправки объектов {@link NotificationMessageResp } - ваша придуманная ДТО.      * </p>      *      * @return Экземпляр {@link Sinks.Many} для обработки уведомлений.      */     @Bean     public Sinks.Many<NotificationMessageResp> notificationSink() {         return Sinks.many().multicast().directAllOrNothing();     } }

Конфигурация WebSocket
import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;  /**  * Конфигурация WebSocket для отправки уведомлений.  */ @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {      /**      * Настраивает WebSocket STOMP Endpoints.      * Клиенты подключаются через `/ws`.      */     @Override     public void registerStompEndpoints(StompEndpointRegistry registry) {         registry.addEndpoint("/ws").setAllowedOrigins("*"); // Подключение на `/ws`     }      /**      * Настраивает брокер сообщений WebSocket.      * Клиенты могут подписываться на `/topic/notifications`.      */     @Override     public void configureMessageBroker(MessageBrokerRegistry registry) {         registry.enableSimpleBroker("/topic"); // Канал для уведомлений         registry.setApplicationDestinationPrefixes("/app");     } }

NotifierService подписывается на Redis Pub/Sub и отправляет уведомления WebSocket/SSE клиентам

@Service @RequiredArgsConstructor public class NotifierService {         private final ReactiveRedisMessageListenerContainer listenerContainer;          private final Sinks.Many<NotificationMessageResp> notificationSink;        @PostConstruct     public void subscribeToRedisNotifications() {         listenerContainer.receive(new ChannelTopic("notifications_channel")) //слушаем тот же канал, в который слали сообщение                 .map(message -> messageNotificationConverter.convert(message.getMessage()))                 .doOnNext(notification -> {                     log.info(" Получено уведомление из Redis: {}", notification);                        // Отправка в SSE                     notificationSink.tryEmitNext(notification);                       // Отправка в WebSocket                     messagingTemplate.convertAndSend("/topic/notifications", notification);                 })                 .subscribe();     }   }

3. SSE + Heartbeat, чтобы соединение не разрывалось

Браузеры и балансировщики часто рвут SSE по «таймауту» – исправляем это «пингами»

 @Operation(summary = "Получить поток уведомлений",                description = "Позволяет подписаться на SSE-уведомления из Redis-канала `notifications_channel`.",                responses = @ApiResponse(                        content = @Content(array = @ArraySchema(schema =                        @Schema(implementation = NotificationMessageResp.class)),                                           mediaType = MediaType.TEXT_EVENT_STREAM_VALUE))) @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<NotificationMessageResp> getSseNotifications() {     return Flux.merge(         notificationSink.asFlux(),        // Heartbeat ping         Flux.interval(Duration.ofSeconds(2))             .map(e -> NotificationMessageResp.builder()                                              .event(EventType.PING.name())                                              .data(RecordData.builder().timestamp(Instant.now()).build())                                              .build()))       .share(); //Все подписанные клиенты получат сообщения одновременно! }
Инструкция: Как задать таймаут в HAProxy Ingress через Kubernetes Dashboard

Можно избавиться от этих строк:

Flux.interval(Duration.ofSeconds(2))             .map(e -> NotificationMessageResp.builder()                                              .event(EventType.PING.name())                                              .data(RecordData.builder().timestamp(Instant.now()).build())                                              .build())

Если в вашем кластере Kubernetes используется HAProxy Ingress Controller

Шаги: Настройка таймаута в HAProxy Ingress (через UI)

1) Открываем Kubernetes Dashboard и переходим в раздел Networking → Routes.
2) Находим нужный сервис (notifications-service, если используем SSE/WebSocket).
3) Переходим во вкладку YAML (Редактирование манифеста).
4) В metadata.annotations добавляем следующую строку:

  haproxy.router.openshift.io/timeout: "10m"
apiVersion: route.openshift.io/v1 kind: Route metadata:   name: notifications-service   annotations:     haproxy.router.openshift.io/timeout: "10m"  #  Таймаут соединения (10 минут)

5) Сохраняем изменения и применяем конфигурацию.

Теперь HAProxy не будет разрывать соединения WebSocket/SSE при простое до 10 минут, а клиентский код автоматически переподключится в случае разрыва.

✔ Теперь SSE-соединение не разорвётся из-за тайм-аута!

Скриншёт

Производительность Redis Pub/Sub

Согласно документации Redis, система с 3 подами в Kubernetes может обрабатывать до 500 000 сообщений в секунду через Redis Pub/Sub, в зависимости от конфигурации серверов, сетевых задержек и размера сообщений.

Для нашей задачи такое решение более чем подходит.

Развертывание Redis Sentinel локально для экспериментов через Docker

 1) Создаём файлы конфигурации

 Создаём следующие файлы в рабочей директории:

docker-compose.yml
version: '3.8'  services:      redis-master:         image: redis:7.2         container_name: redis-master         restart: always         ports:             - "6379:6379"  #  Доступен на localhost:6379         networks:             - redis_network         command: [ "redis-server", "/etc/redis/redis.conf" ]         volumes:             - ./redis-master.conf:/etc/redis/redis.conf      redis-slave:         image: redis:7.2         container_name: redis-slave         restart: always         ports:             - "6380:6379"  #  Доступен на localhost:6380         networks:             - redis_network         command: [ "redis-server", "/etc/redis/redis.conf", "--replicaof", "redis-master", "6379" ]         volumes:             - ./redis-slave.conf:/etc/redis/redis.conf      redis-sentinel:         image: redis:7.2         container_name: redis-sentinel         restart: always         ports:             - "26379:26379"  #  Доступен на localhost:26379         networks:             - redis_network         command: ["redis-server", "/etc/redis/sentinel.conf", "--sentinel"]         volumes:             - ./sentinel.conf:/etc/redis/sentinel.conf  networks:     redis_network:         driver: bridge 

redis-master.conf
port 6379 bind 0.0.0.0 appendonly yes protected-mode no

redis-slave.conf
port 6379 bind 0.0.0.0 appendonly yes replicaof redis-master 6379 protected-mode no

sentinel.conf
port 26379 bind 0.0.0.0 sentinel monitor mymaster 127.0.0.1 6379 2 sentinel down-after-milliseconds mymaster 5000 sentinel failover-timeout mymaster 10000 sentinel parallel-syncs mymaster 1

Выполняем команду:

docker compose up -d
Как проверить работоспособность Redis Sentinel после развертывания?

После запуска Docker Compose с Redis Sentinel, можно выполнить следующие команды:

1) Проверка статуса Redis Sentinel

Убедимся, что Sentinel видит мастер-узел и следит за репликами:

docker exec -it redis-sentinel redis-cli -p 26379 info Sentinel

2) Проверка ролей мастер/реплика

 Проверим, кто сейчас мастер и сколько реплик подключено:

docker exec -it redis-master redis-cli info replication

Если всё работает, увидим строку:

role:master connected_slaves:1 

Аналогично можно проверить реплику:

docker exec -it redis-slave redis-cli info replication

Ответ должен содержать role:slave.

3) Тест Redis Pub/Sub (отправка уведомления)

 Подписка на канал notifications_channel:
Открываем первый терминал и запускаем слушатель Redis:

docker exec -it redis-master redis-cli SUBSCRIBE notifications_channel

Публикация сообщения в канал:
Во втором терминале отправляем тестовое событие:

docker exec -it redis-master redis-cli PUBLISH notifications_channel "Test message" 

В первом терминале должно появиться:

1) "message" 2) "notifications_channel" 3) "Test message" 

Это значит, что Pub/Sub работает корректно! 

Конфигурация spring-boot приложения для подключения к Redis Sentinel локально

application.yml
spring:    data:       redis:          sentinel:              master: ${SPRING_DATA_REDIS_SENTINEL_MASTER:mymaster}  # Имя master узла, указанное в sentinel.conf              nodes: ${SPRING_DATA_REDIS_SENTINEL_NODES:localhost:26379} # Список Sentinel узлов              password: ${SPRING_DATA_REDIS_SENTINEL_PASSWORD:mystrongpassword}  # Пароль для Sentinel (если он установлен)          password: ${SPRING_DATA_REDIS_PASSWORD:mystrongpassword}  # Пароль для Redis (отдельно от Sentinel). Для локальных эксперементов не пригодится.          timeout: ${SPRING_DATA_REDIS_TIMEOUT:60000}  # Таймаут в миллисекундах

Вывод: Почему Redis Sentinel + SSE/WebSocket — мощное решение?

✔ Уведомления отказоустойчивы благодаря Redis Sentinel — даже если одна нода Redis выходит из строя, система продолжает работать без перебоев.
✔ Поддержка SSE & WebSocket делает UI максимально реактивным — данные обновляются в реальном времени без необходимости ручного обновления страницы.
✔ Горизонтальное масштабирование в Kubernetes — сервис уведомлений легко масштабируется без дублирования событий.
✔ Redis Pub/Sub гарантирует, что все подписанные поды получают уведомления — нет проблем с балансировкой нагрузки.
✔ Стабильность соединений — обновление других микросервисов не влияет на сервис уведомлений, соединения SSE и WebSocket не разрываются при деплоях.
✔ Если Redis или сервис уведомлений вдруг перестанет работать, это затронет только механизм real-time обновления UI. Однако все остальные процессы, включая операции разгрузки вагонов, продолжат работать: пользователи смогут обновлять данные вручную.

Теперь система уведомлений в вашей архитектуре полностью отказоустойчива, масштабируема и готова к высоким нагрузкам! 

Дисклеймер: Все DTO и URL-адреса в данной статье являются выдуманными и приведены исключительно в демонстрационных целях. Все конфигурационные файлы основаны на стандартных настройках. Любые совпадения с реальными сервисами, системами или инфраструктурой случайны и не являются преднамеренными.


ссылка на оригинал статьи https://habr.com/ru/articles/882004/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *