Перевел @middle_java
Дата оригинальной статьи: 26 October 2018
Apache Kafka по своей природе асинхронна. Следовательно, семантика «Запрос-Ответ» для Apache Kafka не является естественной. Тем не менее эта задача не нова. Паттерн Интеграции Корпоративных приложений (Enterprise Integration Pattern) Request-Reply обеспечивает проверенный механизм синхронного обмена сообщениями по асинхронным каналам:

Паттерн Return Address (Адрес Возврата) дополняет паттерн Request-Reply механизмом указания запрашивающей стороной адреса, на который должен быть отправлен ответ:

Недавно в Spring Kafka 2.1.3 была добавлена поддержка из коробки паттерна «Request Reply», а в версии 2.2 были отполированы некоторые из его шероховатостей. Рассмотрим, как работает эта поддержка:
На стороне клиента: ReplyingKafkaTemplate
Хорошо известная абстракция Template (Шаблон) формирует в Spring основу для клиентской части механизма Request-Reply.
@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); }
Здесь все довольно прямолинейно: мы настраиваем ReplyingKafkaTemplate, который отправляет сообщения-запросы со String ключами и получает сообщения-ответы со String ключами. Вместе с тем ReplyingKafkaTemplate должен быть основан на ProducerFactory Запроса, ConsumerFactory Ответа и MessageListenerContainer с соответствующими конфигурациями консюмера и продюсера. Следовательно, необходимая конфигурация довольно развесиста:
@Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
В этом случае использование replyKafkaTemplate для отправки синхронного запроса и получения ответа выглядит следующим образом:
@Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...); //создаем producer record ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request); // устанавливаем топик для ответа в заголовке record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); // отправляем запрос в топик Kafka и асинхронно получаем ответ в указанный топик для ответа RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () { @Override public void onSuccess(ConsumerRecord < String, Reply > result) { // получаем значение consumer record Reply reply = result.value(); System.out.println("Reply: " + reply.toString()); } });
Здесь также много бойлерплейта и низкоуровневого API, да еще этот устаревший API ListenableFuture вместо современной CompletableFuture.
requestReplyKafkaTemplate заботится о генерации и установке заголовка KafkaHeaders.CORRELATION_ID, но мы должны явно задать заголовок KafkaHeaders.REPLY_TOPIC для запроса. Обратите также внимание, что этот же топик для ответа был излишне заинжектен выше в replyListenerContainer. Гадость какая-то. Не совсем то, чего я ожидал от абстракции Spring.
Серверная сторона: @SendTo
На стороне сервера обычный KafkaListener, прослушивающий топик для запроса, дополнительно декорирован аннотацией @SendTo, чтобы предоставить сообщение-ответ. Объект, возвращаемый методом слушателя, автоматически оборачивается (wrapped) в ответное сообщение, добавляется CORRELATION_ID и ответ публикуется в топике, указанном в заголовке REPLY_TOPIC.
@Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); }
Здесь также требуется некоторая конфигурация, но конфигурация слушателя проще:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; }
Но как насчет нескольких экземпляров консюмера?
Все вроде работает, пока мы не используем несколько экземпляров консюмера. Если у нас есть несколько экземпляров клиента, мы должны убедиться, что ответ отправляется в корректный экземпляр клиента. Документация Spring Kafka предполагает, что каждый консюмер может использовать уникальный топик или, что с запросом отправляется дополнительное значение заголовка KafkaHeaders.RESPONSE_PARTITION — четырехбайтовое поле, содержащее BIG-ENDIAN-представление целочисленного номера раздела. Использование раздельных топиков для разных клиентов явно не очень гибко, поэтому мы выбираем явную настройку REPLY_PARTITION. Тогда клиент должен знать, на какую партицию он назначен. В документации предлагается использовать явную конфигурацию для выбора конкретной партиции. Давайте добавим ее к нашему примеру:
@Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
Не очень красиво, но это работает. Требуемая конфигурация обширна и API выглядит низкоуровнево. Необходимость явной настройки партиций усложняет процесс динамического масштабирования количества клиентов. Очевидно, можно сделать лучше.
Инкапсулирование обработки топика для ответа и партиции
Давайте начнем с инкапсуляции паттерна Return Address, передавая вместе топик для ответа и партицию. Топик для ответа должен быть заинжектен в RequestReplyTemplate и, следовательно, вообще не должен присутствовать в API. Когда речь идет о партициях для ответа, сделаем наоборот: извлечем партицию (партиции), назначенную слушателю топика для ответа, и передадим эту партицию автоматически. Это избавляет клиента от необходимости заботиться об этих заголовках.
При этом, давайте также сделаем API таким, чтобы он напоминал стандартный KafkaTemplate (перегрузим метод sendAndReceive() упрощенными параметрами и добавим соответствующие перегруженные методы, использующие настроенный по умолчанию топик):
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } }
Следующий шаг: Адаптируем ListenableFuture к более современной CompletableFuture.
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } }; // добавим коллбек к результату отправки запроса requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () { @Override public void onSuccess(SendResult < K, V > sendResult) { // NOOP } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); // добавим коллбек к ответу requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () { @Override public void onSuccess(ConsumerRecord < K, R > result) { completableResult.complete(result.value()); } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); return completableResult; } }
Упакуем это в утилитную библиотеку и теперь у нас есть API, который намного больше соответствует основной философии проектирования Spring «Соглашение над Конфигурацией» («Convention over Configuration»). Вот итоговый код клиента:
@Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); );
Подводим итоги
Подводя итог, Spring для Kafka 2.2 обеспечивает полностью функциональную реализацию паттерна Request-Reply в Apache Kafka, но API по-прежнему имеет некоторые шероховатости. В этом выпуске мы увидели, что некоторые дополнительные абстракции и адаптации API могут дать более логичный высокоуровневый API.
Предупреждение 1:
Одним из главных преимуществ архитектуры, управляемой событиями, является разделение (decoupling) продюсеров и консюмеров событий, что позволяет создавать гораздо более гибкие и эволюционирующие системы. Использование синхронной семантики «Запрос-Ответ» является полной противоположностью, когда запрашивающая и отвечающая стороны сильно связаны между собой. Следовательно, ее следует использовать только в случае необходимости.
Предупреждение 2:
Если требуется синхронный Запрос-Ответ, то протокол на основе HTTP намного проще и эффективнее, чем использование асинхронного канала типа Apache Kafka.
Тем не менее, могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл. Разумно выбирайте лучший инструмент для работы.
Полностью рабочий пример можно найти на сайте github.com/callistaenterprise/blog-synchronous-kafka.
Комментарии
Federico • 7 месяцев назад
А когда у нас есть гибридные потребности, допустим, в 50% кейсов нам нужен Запрос-Ответ и в 50% нам нужно событийное управление? Как нам это сделать? Конфигурация, необходимая Spring Kafka, выглядит довольно ужасно.
Jehanzeb Qayyum • 6 месяцев назад
Теперь Spring имеет дефолтную поддержку с использованием партиций в одном общем топике для ответа.
Начиная с версии 2.2, шаблон пытается определить топик для ответа или партицию из сконфигурированного контейнера ответа (reply container).
https://docs.spring.io/spring-kafka/reference/html/#replying-template
nir rozenberg • 8 месяцев назад
Привет,
В последнем абзаце вы написали, что могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл по сравнению с HTTP.
Можно привести примеры таких сценариев?
Спасибо,
Nir
Janne Keskitalo nir rozenberg • 8 месяцев назад
Мы собираемся разделить систему обработки транзакций большого объема на несколько микросервисов и у меня есть идея использовать обмен сообщениями Kafka «Запрос-Ответ» для достижения похожих способов обработки (processing affinity). В основном Kafka используется для маршрутизации всех вызовов одного клиента в один и тот же процесс обработчика транзакций, который затем последовательно их выполняет по одному. Такой вид обработки гарантирует линеаризируемость (https://stackoverflow.com/a/19515375/7430325), причинно-следственную связность, а также позволяет эффективное кэширование. По сути, усилия по координации были бы перенесены из базы данных в Kafka и мы могли бы запустить базу данных в строгом режиме изоляции Serializable.
Мне еще предстоит углубиться в детали нашей семантики транзакций, чтобы увидеть, где здесь не дотягивает, так что это пока просто идея.
Перевел @middle_java
ссылка на оригинал статьи https://habr.com/ru/post/476156/
Добавить комментарий