Apache Kafka — король асинхронного взаимодействия в микросервисных архитектурах. Но что если нужно получить ответ сразу? Перевод от команды Spring АйО шаг за шагом покажет, как превратить Kafka в инструмент синхронной коммуникации — с настройкой ReplyingKafkaTemplate, топиками для ответа и тайм-аутами.
1. Обзор
Apache Kafka зарекомендовал себя как одна из самых популярных и широко используемых систем обмена сообщениями для event-driven архитектур. В таких архитектурах один микросервис публикует сообщение в топик, а другой микросервис асинхронно потребляет и обрабатывает его.
Тем не менее, в некоторых сценариях требуется немедленный ответ от микросервиса-publisher`а для продолжения дальнейшей обработки. Хотя Kafka изначально предназначена для асинхронного взаимодействия, её можно настроить для поддержки синхронной коммуникации по принципу запрос-ответ с использованием отдельных топиков.
В этой статье мы рассмотрим, как реализовать синхронную коммуникацию типа запрос-ответ в приложении Spring Boot с использованием Apache Kafka.
2. Настройка проекта
Для демонстрации мы смоделируем систему отправки уведомлений. Мы создадим одно приложение на Spring Boot, которое будет одновременно выступать в роли продюсера и консюмера.
2.1. Зависимости
Начнём с добавления зависимости Spring Kafka в файл pom.xml нашего проекта:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.3.4</version> </dependency>
Эта зависимость предоставляет необходимые классы для установления соединения и взаимодействия с настроенным экземпляром Kafka.
2.2. Определение сообщений запроса и ответа
Далее определим два record-класса, которые будут представлять сообщения запроса и ответа:
record NotificationDispatchRequest(String emailId, String content) { } public record NotificationDispatchResponse(UUID notificationId) { }
Здесь record NotificationDispatchRequest содержит emailId и содержимое уведомления, а record NotificationDispatchResponse включает уникальный notificationId, который генерируется после обработки запроса.
2.3. Определение топиков Kafka и конфигурационных свойств
Теперь определим топики Kafka для запроса и ответа. Кроме того, настроим длительность тайм-аута для получения ответа от компонента-консюмера.
Эти свойства мы сохраним в файле application.yaml нашего проекта и воспользуемся аннотацией @ConfigurationProperties для сопоставления значений с Java record`ом, к которому смогут обращаться конфигурационный и сервисный уровни приложения:
@Validated @ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous") record SynchronousKafkaProperties( @NotBlank String requestTopic, @NotBlank String replyTopic, @NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2) Duration replyTimeout ) { }
Мы также добавили аннотации валидации, чтобы гарантировать корректную настройку всех необходимых свойств. Если какая-либо из проверок не будет пройдена, контекст приложения Spring ApplicationContext не сможет подняться. Это позволяет нам придерживаться принципа fail-fast.
Ниже приведён фрагмент файла application.yaml, в котором определены необходимые свойства. Эти свойства будут автоматически сопоставлены с record`ом SynchronousKafkaProperties:
com: baeldung: kafka: synchronous: request-topic: notification-dispatch-request reply-topic: notification-dispatch-response reply-timeout: 30s
Здесь мы настраиваем имена топиков Kafka для запроса и ответа, а также тайм-аут ожидания ответа, равный тридцати секундам.
Помимо наших пользовательских свойств, добавим также несколько основных конфигурационных параметров Kafka в файл application.yaml:
spring: kafka: bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS} producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: group-id: synchronous-kafka-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring: json: trusted: packages: com.baeldung.kafka.synchronous properties: allow: auto: create: topics: true
Прежде всего, чтобы наше приложение могло подключиться к настроенному экземпляру Kafka, мы указываем URL bootstrap-сервера, используя переменную окружения.
Далее мы настраиваем свойства сериализации и десериализации ключей и значений как для продюсера, так и для consumer`a. Кроме того, для consumer`a мы указываем group-id и задаём доверенный пакет, содержащий наши record`ы запроса и ответа, для корректной JSON-десериализации.
После настройки вышеуказанных свойств Spring Kafka автоматически создаёт для нас бины типов ConsumerFactory и ProducerFactory. Мы будем использовать их для определения дополнительных конфигурационных бинов Kafka в следующем разделе.
Наконец, мы включаем автоматическое создание топиков, чтобы Kafka могла создавать их при необходимости. Важно отметить, что данное свойство включено исключительно для целей демонстрации — в production приложениях так делать не следует.
2.4. Определение конфигурационных бинов Kafka
После настройки конфигурационных свойств давайте определим необходимые конфигурационные бины Kafka:
@Bean KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer( ConsumerFactory<String, NotificationDispatchResponse> consumerFactory ) { String replyTopic = synchronousKafkaProperties.replyTopic(); ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); }
Сначала мы внедряем экземпляр ConsumerFactory и используем его вместе с настроенным replyTopic для создания бина KafkaMessageListenerContainer. Этот бин отвечает за создание контейнера, который слушает сообщения из нашего replyTopic-а, то есть топика ответов.
Далее мы определим основной бин, который будет использоваться на уровне сервиса для реализации синхронного взаимодействия:
@Bean ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate( ProducerFactory<String, NotificationDispatchRequest> producerFactory, KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer ) { Duration replyTimeout = synchronousKafkaProperties.replyTimeout(); var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer); replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout); return replyingKafkaTemplate; }
С использованием ProducerFactory и ранее определённого бина KafkaMessageListenerContainer мы создаём бин ReplyingKafkaTemplate. Кроме того, с помощью внедрённого synchronousKafkaProperties настраиваем тайм-аут ожидания ответа, заданный в файле application.yaml. Этот параметр определяет, как долго наш сервис будет ждать ответное сообщение в replyTopic перед срабатыванием тайм-аута.
Бин ReplyingKafkaTemplate управляет взаимодействием между топиками запроса и ответа, обеспечивая возможность синхронной коммуникации через Kafka.
Наконец, определим бины, которые позволят нашему компоненту-consumer`у отправлять ответы обратно в топик ответов:
@Bean KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) { return new KafkaTemplate<>(producerFactory); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory( ConsumerFactory<String, NotificationDispatchRequest> consumerFactory, KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate ) { var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>(); factory.setConsumerFactory(consumerFactory); factory.setReplyTemplate(kafkaTemplate); return factory; }
Комментарий от команды Spring АйО
Напомним, что в данном простом случае, одно и то же приложение как отправляет запрос в Kafka, так и посылает ответное сообщение на этот запрос в replyTopic.
Сначала мы создаём стандартный бин KafkaTemplate, используя бин ProducerFactory.
Затем, совместно с ConsumerFactory, мы используем его для определения бина KafkaListenerContainerFactory. Этот бин позволяет нашим компонентам-consumer`ам, которые потребляют сообщения из топика запросов, отправлять ответное сообщение в топик ответов после завершения необходимой обработки.
3. Реализация синхронной коммуникации с использованием Kafka
После завершения настройки конфигурации давайте реализуем синхронную коммуникацию по принципу запрос-ответ между двумя настроенными топиками Kafka.
3.1. Отправка и получение сообщений с использованием ReplyingKafkaTemplate
Сначала создадим класс NotificationDispatchService, который будет отправлять сообщения в настроенный топик запросов с помощью ранее определённого бина ReplyingKafkaTemplate:
@Service @EnableConfigurationProperties(SynchronousKafkaProperties.class) class NotificationDispatchService { private final SynchronousKafkaProperties synchronousKafkaProperties; private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate; // standard constructor NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) { String requestTopic = synchronousKafkaProperties.requestTopic(); ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest); var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord); return requestReplyFuture.get().value(); } }
В методе dispatch() мы используем autowired экземпляр synchronousKafkaProperties, чтобы получить значение requestTopic, настроенное в файле application.yaml. Затем, совместно с переданным в аргументе метода объектом notificationDispatchRequest, мы создаём экземпляр ProducerRecord.
Далее мы передаём созданный ProducerRecord в метод sendAndReceive(), чтобы опубликовать сообщение в топик запросов. Этот метод возвращает объект RequestReplyFuture, который мы используем для ожидания ответа и последующего получения его значения.
Внутри, при вызове метода sendAndReceive(), класс ReplyingKafkaTemplate генерирует уникальный ID корреляции — случайный UUID — и добавляет его в заголовок исходящего Kafka сообщения. Также в заголовок добавляется имя топика для ответа, в который ожидается поступление ответного сообщения. Напомним, что мы уже настроили этот топик в бине KafkaMessageListenerContainer.
Бин ReplyingKafkaTemplate использует сгенерированный ID корреляции в качестве ключа для сохранения объекта RequestReplyFuture в потокобезопасной структуре ConcurrentHashMap. Это обеспечивает корректную работу в многопоточной среде и поддержку параллельных запросов.
3.2. Определение consumer`а сообщений Kafka
Теперь, чтобы завершить реализацию, создадим компонент-consumer, который будет обрабатывать сообщения из настроенного топика запросов и отправлять ответ в топик ответов:
@Component class NotificationDispatchListener { @SendTo @KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}") NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) { // ... processing logic UUID notificationId = UUID.randomUUID(); return new NotificationDispatchResponse(notificationId); } }
Мы используем аннотацию @KafkaListener для прослушивания топика запросов, указанного в файле application.yaml.
Внутри метода listen() мы просто возвращаем record NotificationDispatchResponse, содержащий уникальный notificationId.
Ключевой момент — аннотация @SendTo, которой мы помечаем метод. Она сообщает Spring Kafka, что из заголовков входящего сообщения необходимо извлечь ID корреляции и имя топика для ответа. Эти данные используются для автоматической отправки возвращаемого значения метода в соответствующий топик ответов с тем же ID корреляции в заголовке сообщения.
Комментарий от команды Spring АйО
На самом деле, аннотация @SendTo общая, и не имеет чёткой привязки к Spring Kafka проекту. Она также используется в проектах Spring Integration и в Spring Jms, например. То, как Spring обрабатывает @SendTo , очень сильно зависит от технологии (Spring Kafka, Spring Jms и т.д.), от настроек среды окружения и т.д. Поэтому будьте с ней аккуратны!
Благодаря этому бин ReplyingKafkaTemplate в классе NotificationDispatchService может по ID корреляции найти соответствующий объект RequestReplyFuture и получить ожидаемый ответ.
4. Заключение
В этой статье мы рассмотрели использование Apache Kafka для реализации синхронной коммуникации между двумя компонентами в приложении Spring Boot.
Мы пошагово прошли через необходимые этапы конфигурации и смоделировали систему отправки уведомлений.
С помощью ReplyingKafkaTemplate можно преобразовать асинхронную природу Apache Kafka в паттерн синхронного взаимодействия по принципу запрос-ответ. Этот подход является нетипичным, поэтому перед его применением в production среде важно тщательно оценить его соответствие архитектуре проекта.
Как всегда, все примеры кода, использованные в статье, доступны на GitHub.

Присоединяйтесь к русскоязычному сообществу разработчиков на Spring Boot в телеграм — Spring АйО, чтобы быть в курсе последних новостей из мира разработки на Spring Boot и всего, что с ним связано
ссылка на оригинал статьи https://habr.com/ru/articles/934936/
Добавить комментарий