Синхронный Запрос-Ответ с использованием Apache Kafka

от автора

Архитектуры, управляемые событиями (Event Driven Architecture), в целом, и Apache Kafka, в частности, привлекли в последнее время большое внимание. Для реализации всех преимуществ архитектуры, управляемой событиями, механизм делегирования событий должен быть по своей сути асинхронным. Тем не менее, могут существовать некоторые особые сценарии/потоки использования, в которых требуется семантика Синхронного Запроса-Ответа. В этом выпуске показано, как реализовать «Запрос-Ответ« с помощью Apache Kafka.

Перевел @middle_java

Дата оригинальной статьи: 26 October 2018

Apache Kafka по своей природе асинхронна. Следовательно, семантика «Запрос-Ответ» для Apache Kafka не является естественной. Тем не менее эта задача не нова. Паттерн Интеграции Корпоративных приложений (Enterprise Integration Pattern) Request-Reply обеспечивает проверенный механизм синхронного обмена сообщениями по асинхронным каналам:

Паттерн Return Address (Адрес Возврата) дополняет паттерн Request-Reply механизмом указания запрашивающей стороной адреса, на который должен быть отправлен ответ:

Недавно в Spring Kafka 2.1.3 была добавлена поддержка из коробки паттерна «Request Reply», а в версии 2.2 были отполированы некоторые из его шероховатостей. Рассмотрим, как работает эта поддержка:

На стороне клиента: ReplyingKafkaTemplate

Хорошо известная абстракция Template (Шаблон) формирует в Spring основу для клиентской части механизма Request-Reply.

 @Bean  public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate(      ProducerFactory < String, Request > pf,      KafkaMessageListenerContainer < String, Reply > lc) {      return new ReplyingKafkaTemplate < > (pf, lc);  }

Здесь все довольно прямолинейно: мы настраиваем ReplyingKafkaTemplate, который отправляет сообщения-запросы со String ключами и получает сообщения-ответы со String ключами. Вместе с тем ReplyingKafkaTemplate должен быть основан на ProducerFactory Запроса, ConsumerFactory Ответа и MessageListenerContainer с соответствующими конфигурациями консюмера и продюсера. Следовательно, необходимая конфигурация довольно развесиста:

 @Value("${kafka.topic.car.reply}")  private String replyTopic;   @Bean  public Map < String, Object > consumerConfigs() {      Map < String, Object > props = new HashMap < > ();      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);       return props;  }   @Bean  public Map < String, Object > producerConfigs() {      Map < String, Object > props = new HashMap < > ();      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);      return props;  }   @Bean  public ProducerFactory < String, Request > requestProducerFactory() {      return new DefaultKafkaProducerFactory < > (producerConfigs());  }   @Bean  public ConsumerFactory < String, Reply > replyConsumerFactory() {      return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),          new JsonSerializer < Reply > ());  }   @Bean  public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() {      ContainerProperties containerProperties = new ContainerProperties(replyTopic);      return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties);  }

В этом случае использование replyKafkaTemplate для отправки синхронного запроса и получения ответа выглядит следующим образом:

@Value("${kafka.topic.car.request}") private String requestTopic;  @Value("${kafka.topic.car.reply}") private String replyTopic;  @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate;  ... RequestReply request = RequestReply.request(...); //создаем producer record ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request); // устанавливаем топик для ответа в заголовке record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); // отправляем запрос в топик Kafka и асинхронно получаем ответ в указанный топик для ответа RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () {     @Override     public void onSuccess(ConsumerRecord < String, Reply > result) {         // получаем значение consumer record         Reply reply = result.value();         System.out.println("Reply: " + reply.toString());     } });

Здесь также много бойлерплейта и низкоуровневого API, да еще этот устаревший API ListenableFuture вместо современной CompletableFuture.

requestReplyKafkaTemplate заботится о генерации и установке заголовка KafkaHeaders.CORRELATION_ID, но мы должны явно задать заголовок KafkaHeaders.REPLY_TOPIC для запроса. Обратите также внимание, что этот же топик для ответа был излишне заинжектен выше в replyListenerContainer. Гадость какая-то. Не совсем то, чего я ожидал от абстракции Spring.

Серверная сторона: @SendTo

На стороне сервера обычный KafkaListener, прослушивающий топик для запроса, дополнительно декорирован аннотацией @SendTo, чтобы предоставить сообщение-ответ. Объект, возвращаемый методом слушателя, автоматически оборачивается (wrapped) в ответное сообщение, добавляется CORRELATION_ID и ответ публикуется в топике, указанном в заголовке REPLY_TOPIC.

 @Bean  public Map < String, Object > consumerConfigs() {      Map < String, Object > props = new HashMap < > ();      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);       return props;  }   @Bean  public Map < String, Object > producerConfigs() {      Map < String, Object > props = new HashMap < > ();      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);       return props;  }   @Bean  public ConsumerFactory < String, Request > requestConsumerFactory() {      return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),          new JsonSerializer < Request > ());  }   @Bean  public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() {      ConcurrentKafkaListenerContainerFactory < String, Request > factory =          new ConcurrentKafkaListenerContainerFactory < > ();      factory.setConsumerFactory(requestConsumerFactory());      factory.setReplyTemplate(replyTemplate());      return factory;  }   @Bean  public ProducerFactory < String, Reply > replyProducerFactory() {      return new DefaultKafkaProducerFactory < > (producerConfigs());  }   @Bean  public KafkaTemplate < String, Reply > replyTemplate() {      return new KafkaTemplate < > (replyProducerFactory());  }

Здесь также требуется некоторая конфигурация, но конфигурация слушателя проще:

 @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory")  @SendTo()  public Reply receive(Request request) {      Reply reply = ...;      return reply;  }

Но как насчет нескольких экземпляров консюмера?

Все вроде работает, пока мы не используем несколько экземпляров консюмера. Если у нас есть несколько экземпляров клиента, мы должны убедиться, что ответ отправляется в корректный экземпляр клиента. Документация Spring Kafka предполагает, что каждый консюмер может использовать уникальный топик или, что с запросом отправляется дополнительное значение заголовка KafkaHeaders.RESPONSE_PARTITION — четырехбайтовое поле, содержащее BIG-ENDIAN-представление целочисленного номера раздела. Использование раздельных топиков для разных клиентов явно не очень гибко, поэтому мы выбираем явную настройку REPLY_PARTITION. Тогда клиент должен знать, на какую партицию он назначен. В документации предлагается использовать явную конфигурацию для выбора конкретной партиции. Давайте добавим ее к нашему примеру:

 @Value("${kafka.topic.car.reply.partition}")  private int replyPartition;   ...   @Bean  public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() {      ContainerProperties containerProperties = new ContainerProperties(replyTopic);      TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition);      return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset);  }  private static byte[] intToBytesBigEndian(final int data) {          return new byte[] {              (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff),              (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff),          };      }       ...      record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));  record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition)));  RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);  ...

Не очень красиво, но это работает. Требуемая конфигурация обширна и API выглядит низкоуровнево. Необходимость явной настройки партиций усложняет процесс динамического масштабирования количества клиентов. Очевидно, можно сделать лучше.

Инкапсулирование обработки топика для ответа и партиции

Давайте начнем с инкапсуляции паттерна Return Address, передавая вместе топик для ответа и партицию. Топик для ответа должен быть заинжектен в RequestReplyTemplate и, следовательно, вообще не должен присутствовать в API. Когда речь идет о партициях для ответа, сделаем наоборот: извлечем партицию (партиции), назначенную слушателю топика для ответа, и передадим эту партицию автоматически. Это избавляет клиента от необходимости заботиться об этих заголовках.
При этом, давайте также сделаем API таким, чтобы он напоминал стандартный KafkaTemplate (перегрузим метод sendAndReceive() упрощенными параметрами и добавим соответствующие перегруженные методы, использующие настроенный по умолчанию топик):

public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > {      public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory,         GenericMessageListenerContainer < K, R > replyContainer) {         super(producerFactory, replyContainer);     }      private TopicPartition getFirstAssignedReplyTopicPartition() {         if (getAssignedReplyTopicPartitions() != null &&             getAssignedReplyTopicPartitions().iterator().hasNext()) {             TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next();             if (this.logger.isDebugEnabled()) {                 this.logger.debug("Using partition " + replyPartition.partition());             }             return replyPartition;         } else {             throw new KafkaException("Illegal state: No reply partition is assigned to this instance");         }     }      private static byte[] intToBytesBigEndian(final int data) {         return new byte[] {             (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff),             (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff),         };     }      public RequestReplyFuture < K,     V,     R > sendAndReceiveDefault(@Nullable V data) {         return sendAndReceive(getDefaultTopic(), data);     }      public RequestReplyFuture < K,     V,     R > sendAndReceiveDefault(K key, @Nullable V data) {         return sendAndReceive(getDefaultTopic(), key, data);     }      ...      public RequestReplyFuture < K,     V,     R > sendAndReceive(String topic, @Nullable V data) {         ProducerRecord < K, V > record = new ProducerRecord < > (topic, data);         return doSendAndReceive(record);     }      public RequestReplyFuture < K,     V,     R > sendAndReceive(String topic, K key, @Nullable V data) {         ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data);         return doSendAndReceive(record);     }      ...      @Override     public RequestReplyFuture < K,     V,     R > sendAndReceive(ProducerRecord < K, V > record) {         return doSendAndReceive(record);     }      protected RequestReplyFuture < K,     V,     R > doSendAndReceive(ProducerRecord < K, V > record) {         TopicPartition replyPartition = getFirstAssignedReplyTopicPartition();         record.headers()             .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes()))             .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,                 intToBytesBigEndian(replyPartition.partition())));         return super.sendAndReceive(record);     } }

Следующий шаг: Адаптируем ListenableFuture к более современной CompletableFuture.

public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > {      public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory,         GenericMessageListenerContainer < K, R > replyContainer) {         super(producerFactory, replyContainer);     }      public CompletableFuture < R > requestReplyDefault(V value) {         return adapt(sendAndReceiveDefault(value));     }      public CompletableFuture < R > requestReplyDefault(K key, V value) {         return adapt(sendAndReceiveDefault(key, value));     }      ...      public CompletableFuture < R > requestReply(String topic, V value) {         return adapt(sendAndReceive(topic, value));     }      public CompletableFuture < R > requestReply(String topic, K key, V value) {         return adapt(sendAndReceive(topic, key, value));     }      ...      private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) {         CompletableFuture < R > completableResult = new CompletableFuture < R > () {             @Override             public boolean cancel(boolean mayInterruptIfRunning) {                 boolean result = requestReplyFuture.cancel(mayInterruptIfRunning);                 super.cancel(mayInterruptIfRunning);                 return result;             }         };         // добавим коллбек к результату отправки запроса         requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () {             @Override             public void onSuccess(SendResult < K, V > sendResult) {                 // NOOP             }             @Override             public void onFailure(Throwable t) {                 completableResult.completeExceptionally(t);             }         });         // добавим коллбек к ответу         requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () {             @Override             public void onSuccess(ConsumerRecord < K, R > result) {                 completableResult.complete(result.value());             }             @Override             public void onFailure(Throwable t) {                 completableResult.completeExceptionally(t);             }         });         return completableResult;     } }

Упакуем это в утилитную библиотеку и теперь у нас есть API, который намного больше соответствует основной философии проектирования Spring «Соглашение над Конфигурацией» («Convention over Configuration»). Вот итоговый код клиента:

 @Autowired  private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate;   ...   requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - >      System.out.println("Reply: " + reply.toString());  );

Подводим итоги

Подводя итог, Spring для Kafka 2.2 обеспечивает полностью функциональную реализацию паттерна Request-Reply в Apache Kafka, но API по-прежнему имеет некоторые шероховатости. В этом выпуске мы увидели, что некоторые дополнительные абстракции и адаптации API могут дать более логичный высокоуровневый API.

Предупреждение 1:
Одним из главных преимуществ архитектуры, управляемой событиями, является разделение (decoupling) продюсеров и консюмеров событий, что позволяет создавать гораздо более гибкие и эволюционирующие системы. Использование синхронной семантики «Запрос-Ответ» является полной противоположностью, когда запрашивающая и отвечающая стороны сильно связаны между собой. Следовательно, ее следует использовать только в случае необходимости.

Предупреждение 2:
Если требуется синхронный Запрос-Ответ, то протокол на основе HTTP намного проще и эффективнее, чем использование асинхронного канала типа Apache Kafka.
Тем не менее, могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл. Разумно выбирайте лучший инструмент для работы.

Полностью рабочий пример можно найти на сайте github.com/callistaenterprise/blog-synchronous-kafka.

Комментарии

Federico • 7 месяцев назад
А когда у нас есть гибридные потребности, допустим, в 50% кейсов нам нужен Запрос-Ответ и в 50% нам нужно событийное управление? Как нам это сделать? Конфигурация, необходимая Spring Kafka, выглядит довольно ужасно.

Jehanzeb Qayyum • 6 месяцев назад
Теперь Spring имеет дефолтную поддержку с использованием партиций в одном общем топике для ответа.

Начиная с версии 2.2, шаблон пытается определить топик для ответа или партицию из сконфигурированного контейнера ответа (reply container).

https://docs.spring.io/spring-kafka/reference/html/#replying-template

nir rozenberg • 8 месяцев назад
Привет,
В последнем абзаце вы написали, что могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл по сравнению с HTTP.
Можно привести примеры таких сценариев?
Спасибо,
Nir

Janne Keskitalo nir rozenberg • 8 месяцев назад
Мы собираемся разделить систему обработки транзакций большого объема на несколько микросервисов и у меня есть идея использовать обмен сообщениями Kafka «Запрос-Ответ» для достижения похожих способов обработки (processing affinity). В основном Kafka используется для маршрутизации всех вызовов одного клиента в один и тот же процесс обработчика транзакций, который затем последовательно их выполняет по одному. Такой вид обработки гарантирует линеаризируемость (https://stackoverflow.com/a/19515375/7430325), причинно-следственную связность, а также позволяет эффективное кэширование. По сути, усилия по координации были бы перенесены из базы данных в Kafka и мы могли бы запустить базу данных в строгом режиме изоляции Serializable.
Мне еще предстоит углубиться в детали нашей семантики транзакций, чтобы увидеть, где здесь не дотягивает, так что это пока просто идея.

Перевел @middle_java


ссылка на оригинал статьи https://habr.com/ru/post/476156/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *