Kafka умеет синхронно. В Spring Boot

от автора

Apache Kafka — король асинхронного взаимодействия в микросервисных архитектурах. Но что если нужно получить ответ сразу? Перевод от команды Spring АйО шаг за шагом покажет, как превратить Kafka в инструмент синхронной коммуникации — с настройкой ReplyingKafkaTemplate, топиками для ответа и тайм-аутами.


1. Обзор

Apache Kafka зарекомендовал себя как одна из самых популярных и широко используемых систем обмена сообщениями для event-driven архитектур. В таких архитектурах один микросервис публикует сообщение в топик, а другой микросервис асинхронно потребляет и обрабатывает его.

Тем не менее, в некоторых сценариях требуется немедленный ответ от микросервиса-publisher`а для продолжения дальнейшей обработки. Хотя Kafka изначально предназначена для асинхронного взаимодействия, её можно настроить для поддержки синхронной коммуникации по принципу запрос-ответ с использованием отдельных топиков.

В этой статье мы рассмотрим, как реализовать синхронную коммуникацию типа запрос-ответ в приложении Spring Boot с использованием Apache Kafka.

2. Настройка проекта

Для демонстрации мы смоделируем систему отправки уведомлений. Мы создадим одно приложение на Spring Boot, которое будет одновременно выступать в роли продюсера и консюмера.

2.1. Зависимости

Начнём с добавления зависимости Spring Kafka в файл pom.xml нашего проекта:

<dependency>     <groupId>org.springframework.kafka</groupId>     <artifactId>spring-kafka</artifactId>     <version>3.3.4</version> </dependency>

Эта зависимость предоставляет необходимые классы для установления соединения и взаимодействия с настроенным экземпляром Kafka.

2.2. Определение сообщений запроса и ответа

Далее определим два record-класса, которые будут представлять сообщения запроса и ответа:

record NotificationDispatchRequest(String emailId, String content) { }  public record NotificationDispatchResponse(UUID notificationId) { }

Здесь record NotificationDispatchRequest содержит emailId и содержимое уведомления, а record NotificationDispatchResponse включает уникальный notificationId, который генерируется после обработки запроса.

2.3. Определение топиков Kafka и конфигурационных свойств

Теперь определим топики Kafka для запроса и ответа. Кроме того, настроим длительность тайм-аута для получения ответа от компонента-консюмера.

Эти свойства мы сохраним в файле application.yaml нашего проекта и воспользуемся аннотацией @ConfigurationProperties для сопоставления значений с Java record`ом, к которому смогут обращаться конфигурационный и сервисный уровни приложения:

@Validated @ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous") record SynchronousKafkaProperties(     @NotBlank     String requestTopic,      @NotBlank     String replyTopic,      @NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)     Duration replyTimeout ) { }

Мы также добавили аннотации валидации, чтобы гарантировать корректную настройку всех необходимых свойств. Если какая-либо из проверок не будет пройдена, контекст приложения Spring ApplicationContext не сможет подняться. Это позволяет нам придерживаться принципа fail-fast.

Ниже приведён фрагмент файла application.yaml, в котором определены необходимые свойства. Эти свойства будут автоматически сопоставлены с record`ом SynchronousKafkaProperties:

com:   baeldung:     kafka:       synchronous:         request-topic: notification-dispatch-request         reply-topic: notification-dispatch-response         reply-timeout: 30s

Здесь мы настраиваем имена топиков Kafka для запроса и ответа, а также тайм-аут ожидания ответа, равный тридцати секундам.

Помимо наших пользовательских свойств, добавим также несколько основных конфигурационных параметров Kafka в файл application.yaml:

spring:   kafka:     bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}     producer:       key-serializer: org.apache.kafka.common.serialization.StringSerializer       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer     consumer:       group-id: synchronous-kafka-group       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer       properties:         spring:           json:             trusted:               packages: com.baeldung.kafka.synchronous     properties:       allow:         auto:           create:             topics: true

Прежде всего, чтобы наше приложение могло подключиться к настроенному экземпляру Kafka, мы указываем URL bootstrap-сервера, используя переменную окружения.

Далее мы настраиваем свойства сериализации и десериализации ключей и значений как для продюсера, так и для consumer`a. Кроме того, для consumer`a мы указываем group-id и задаём доверенный пакет, содержащий наши record`ы запроса и ответа, для корректной JSON-десериализации.

После настройки вышеуказанных свойств Spring Kafka автоматически создаёт для нас бины типов ConsumerFactory и ProducerFactory. Мы будем использовать их для определения дополнительных конфигурационных бинов Kafka в следующем разделе.

Наконец, мы включаем автоматическое создание топиков, чтобы Kafka могла создавать их при необходимости. Важно отметить, что данное свойство включено исключительно для целей демонстрации — в production приложениях так делать не следует.

2.4. Определение конфигурационных бинов Kafka

После настройки конфигурационных свойств давайте определим необходимые конфигурационные бины Kafka:

@Bean KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(     ConsumerFactory<String, NotificationDispatchResponse> consumerFactory ) {     String replyTopic = synchronousKafkaProperties.replyTopic();     ContainerProperties containerProperties = new ContainerProperties(replyTopic);     return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); }

Сначала мы внедряем экземпляр ConsumerFactory и используем его вместе с настроенным replyTopic для создания бина KafkaMessageListenerContainer. Этот бин отвечает за создание контейнера, который слушает сообщения из нашего replyTopic-а, то есть топика ответов.

Далее мы определим основной бин, который будет использоваться на уровне сервиса для реализации синхронного взаимодействия:

@Bean ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(     ProducerFactory<String, NotificationDispatchRequest> producerFactory,     KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer ) {     Duration replyTimeout = synchronousKafkaProperties.replyTimeout();     var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);     replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);     return replyingKafkaTemplate; }

С использованием ProducerFactory и ранее определённого бина KafkaMessageListenerContainer мы создаём бин ReplyingKafkaTemplate. Кроме того, с помощью внедрённого synchronousKafkaProperties настраиваем тайм-аут ожидания ответа, заданный в файле application.yaml. Этот параметр определяет, как долго наш сервис будет ждать ответное сообщение в replyTopic перед срабатыванием тайм-аута.

Бин ReplyingKafkaTemplate управляет взаимодействием между топиками запроса и ответа, обеспечивая возможность синхронной коммуникации через Kafka.

Наконец, определим бины, которые позволят нашему компоненту-consumer`у отправлять ответы обратно в топик ответов:

@Bean KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {     return new KafkaTemplate<>(producerFactory); }  @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(     ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,     KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate ) {     var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();     factory.setConsumerFactory(consumerFactory);     factory.setReplyTemplate(kafkaTemplate);     return factory; }
Комментарий от команды Spring АйО

Напомним, что в данном простом случае, одно и то же приложение как отправляет запрос в Kafka, так и посылает ответное сообщение на этот запрос в replyTopic.

Сначала мы создаём стандартный бин KafkaTemplate, используя бин ProducerFactory.

Затем, совместно с ConsumerFactory, мы используем его для определения бина KafkaListenerContainerFactory. Этот бин позволяет нашим компонентам-consumer`ам, которые потребляют сообщения из топика запросов, отправлять ответное сообщение в топик ответов после завершения необходимой обработки.

3. Реализация синхронной коммуникации с использованием Kafka

После завершения настройки конфигурации давайте реализуем синхронную коммуникацию по принципу запрос-ответ между двумя настроенными топиками Kafka.

3.1. Отправка и получение сообщений с использованием ReplyingKafkaTemplate

Сначала создадим класс NotificationDispatchService, который будет отправлять сообщения в настроенный топик запросов с помощью ранее определённого бина ReplyingKafkaTemplate:

@Service @EnableConfigurationProperties(SynchronousKafkaProperties.class) class NotificationDispatchService {      private final SynchronousKafkaProperties synchronousKafkaProperties;     private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;      // standard constructor      NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {         String requestTopic = synchronousKafkaProperties.requestTopic();         ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);          var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);         return requestReplyFuture.get().value();     } }

В методе dispatch() мы используем autowired экземпляр synchronousKafkaProperties, чтобы получить значение requestTopic, настроенное в файле application.yaml. Затем, совместно с переданным в аргументе метода объектом notificationDispatchRequest, мы создаём экземпляр ProducerRecord.

Далее мы передаём созданный ProducerRecord в метод sendAndReceive(), чтобы опубликовать сообщение в топик запросов. Этот метод возвращает объект RequestReplyFuture, который мы используем для ожидания ответа и последующего получения его значения.

Внутри, при вызове метода sendAndReceive(), класс ReplyingKafkaTemplate генерирует уникальный ID корреляции — случайный UUID — и добавляет его в заголовок исходящего Kafka сообщения. Также в заголовок добавляется имя топика для ответа, в который ожидается поступление ответного сообщения. Напомним, что мы уже настроили этот топик в бине KafkaMessageListenerContainer.

Бин ReplyingKafkaTemplate использует сгенерированный ID корреляции в качестве ключа для сохранения объекта RequestReplyFuture в потокобезопасной структуре ConcurrentHashMap. Это обеспечивает корректную работу в многопоточной среде и поддержку параллельных запросов.

3.2. Определение consumer`а сообщений Kafka

Теперь, чтобы завершить реализацию, создадим компонент-consumer, который будет обрабатывать сообщения из настроенного топика запросов и отправлять ответ в топик ответов:

@Component class NotificationDispatchListener {      @SendTo     @KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")     NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {         // ... processing logic         UUID notificationId = UUID.randomUUID();         return new NotificationDispatchResponse(notificationId);     } }

Мы используем аннотацию @KafkaListener для прослушивания топика запросов, указанного в файле application.yaml.

Внутри метода listen() мы просто возвращаем record NotificationDispatchResponse, содержащий уникальный notificationId.

Ключевой момент — аннотация @SendTo, которой мы помечаем метод. Она сообщает Spring Kafka, что из заголовков входящего сообщения необходимо извлечь ID корреляции и имя топика для ответа. Эти данные используются для автоматической отправки возвращаемого значения метода в соответствующий топик ответов с тем же ID корреляции в заголовке сообщения.

Комментарий от команды Spring АйО

На самом деле, аннотация @SendTo общая, и не имеет чёткой привязки к Spring Kafka проекту. Она также используется в проектах Spring Integration и в Spring Jms, например. То, как Spring обрабатывает @SendTo , очень сильно зависит от технологии (Spring Kafka, Spring Jms и т.д.), от настроек среды окружения и т.д. Поэтому будьте с ней аккуратны!

Благодаря этому бин ReplyingKafkaTemplate в классе NotificationDispatchService может по ID корреляции найти соответствующий объект RequestReplyFuture и получить ожидаемый ответ.

4. Заключение

В этой статье мы рассмотрели использование Apache Kafka для реализации синхронной коммуникации между двумя компонентами в приложении Spring Boot.

Мы пошагово прошли через необходимые этапы конфигурации и смоделировали систему отправки уведомлений.

С помощью ReplyingKafkaTemplate можно преобразовать асинхронную природу Apache Kafka в паттерн синхронного взаимодействия по принципу запрос-ответ. Этот подход является нетипичным, поэтому перед его применением в production среде важно тщательно оценить его соответствие архитектуре проекта.

Как всегда, все примеры кода, использованные в статье, доступны на GitHub.


Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано


ссылка на оригинал статьи https://habr.com/ru/articles/934936/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *