Асинхронное взаимодействие Spring-микросервисов с помощью Kafka

от автора

В этой статье разберемся, как реализовать обмен сообщениями между Java-микросервисами на Spring с помощью Kafka.

1. Архитектура

У нас будет Producer-микросервис («писатель»), который получает заказы на еду (Food Order) и передает их через Kafka в Consumer-микросервис («читатель») для сохранения в базу данных.

2. Пара слов о Kafka

Кластер Kafka обладает высокой масштабируемостью и отказоустойчивостью: при поломке одного из узлов, другие узлы берут на себя его работу, обеспечивая непрерывность работы без потери данных.

Чтение и запись данных в Kafka выполняется в виде событий, содержащих информацию в различном формате, например, в виде строки, массива или JSON-объекта. 

Producer (производитель, издатель) публикует (записывает) события в Kafka, а Consumer (потребитель, подписчик) подписывается на эти события и обрабатывает их.

3. Топики

События группируются в топики (topic). Топик похож на папку, а события — на файлы в этой папке. У топика может быть ноль, один или много издателей и подписчиков.

События можно прочитать столько раз, сколько необходимо. В этом отличие Kafka от традиционных систем обмена сообщениями: после чтения события не удаляются. Можно настроить, как долго Kafka хранит события.

4. Разделы

Топики поделены на разделы (partition). Публикация события в топике фактически означает добавление его к одному из разделов. События с одинаковыми ключами записываются в один раздел. В рамках раздела Kafka гарантирует порядок событий.

Для отказоустойчивости и высокой доступности топик может быть реплицирован, в том числе между различными, географически удаленными, датацентрами. То есть всегда будет несколько брокеров с копиями данных на случай, если что-то пойдет не так.

5. Создание проектов

Перейдите на start.spring.io и создайте проекты с зависимостями, показанными на рисунках ниже.

Producer-микросервис:

Consumer-микросервис:

6. Запуск Kafka в докере

В корне одного из проектов, неважно каком, создайте файл docker-compose.yml, содержащий параметры запуска Kafka, Kafdrop и Zookeeper в докере.

version: "3.7"  networks:   kafka-net:     name: kafka-net     driver: bridge  services:   zookeeper:     image: zookeeper:3.7.0     container_name: zookeeper     restart: "no"     networks:       - kafka-net     ports:       - "2181:2181"    kafka:     image: obsidiandynamics/kafka     container_name: kafka     restart: "no"     networks:       - kafka-net     ports:       - "9092:9092"     environment:       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT       KAFKA_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092       KAFKA_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL       KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"       KAFKA_BROKER_ID: 1       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1     depends_on:       - zookeeper    kafdrop:     image: obsidiandynamics/kafdrop     container_name: kafdrop     restart: "no"     networks:       - kafka-net     ports:       - "9000:9000"     environment:       KAFKA_BROKERCONNECT: "kafka:29092"     depends_on:       - "kafka"

Далее, находясь в папке с docker-compose.yml выполните docker-compose up. После запуска контейнеров откройте Kafdrop (веб-интерфейс для управления Kafka) по адресу http://localhost:9000.

В Kafdrop можно смотреть топики, создавать их, удалять и делать многое другое.

7. Producer-микросервис

Архитектура:

Этапы создания Producer-микросервиса:

  • создаем конфигурационные бины;

  • создаем топик для заказов;

  • создаем контроллер FoodOrderController, сервис FoodOrderService и Producer;

  • преобразуем заказы FoodOrder в текстовый вид для отправки брокеру.

Переменные окружения и порт для нашего API (application.yml):

server:   port: 8080  topic:   name: t.food.order

Config отвечает за создание топика и бина KafkaTemplate, используемого для отправки сообщения.

@Configuration public class Config {      private final KafkaProperties kafkaProperties;      @Autowired     public Config(KafkaProperties kafkaProperties) {         this.kafkaProperties = kafkaProperties;     }      @Bean     public ProducerFactory<String, String> producerFactory() {         // get configs on application.properties/yml         Map<String, Object> properties = kafkaProperties.buildProducerProperties();         return new DefaultKafkaProducerFactory<>(properties);     }      @Bean     public KafkaTemplate<String, String> kafkaTemplate() {         return new KafkaTemplate<>(producerFactory());     }      @Bean     public NewTopic topic() {         return TopicBuilder                 .name("t.food.order")                 .partitions(1)                 .replicas(1)                 .build();     }  }

Класс модели FoodOrder:

@Data @Value public class FoodOrder {     String item;     Double amount; }

FoodOrderController отвечает за получение заказа FoodOrder и передачу его на уровень сервиса.

@Slf4j @RestController @RequestMapping("/order") public class FoodOrderController {      private final FoodOrderService foodOrderService;      @Autowired     public FoodOrderController(FoodOrderService foodOrderService) {         this.foodOrderService = foodOrderService;     }      @PostMapping     public String createFoodOrder(@RequestBody FoodOrder foodOrder) throws JsonProcessingException {         log.info("create food order request received");         return foodOrderService.createFoodOrder(foodOrder);     } }

FoodOrderService — получение заказа FoodOrder и передачу его Producer.

@Slf4j @Service public class FoodOrderService {      private final Producer producer;      @Autowired     public FoodOrderService(Producer producer) {         this.producer = producer;     }      public String createFoodOrder(FoodOrder foodOrder) throws JsonProcessingException {         return producer.sendMessage(foodOrder);     } }

Producer получает заказ FoodOrder и публикует его в Kafka в виде сообщения.

В строке 18 мы конвертируем объект FoodOrder в JSON-строку для его передачи в виде строки в Consumer-микросервис.

В строке 19 фактически отправляем сообщение, передавая топик для публикации (переменная окружения в строке 6) и заказ в виде сообщения.

@Slf4j @Component public class Producer {      @Value("${topic.name}")     private String orderTopic;      private final ObjectMapper objectMapper;     private final KafkaTemplate<String, String> kafkaTemplate;      @Autowired     public Producer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {         this.kafkaTemplate = kafkaTemplate;         this.objectMapper = objectMapper;     }      public String sendMessage(FoodOrder foodOrder) throws JsonProcessingException {         String orderAsMessage = objectMapper.writeValueAsString(foodOrder);         kafkaTemplate.send(orderTopic, orderAsMessage);          log.info("food order produced {}", orderAsMessage);          return "message sent";     } }

При запуске приложения мы должны увидеть топик, созданный в Kafdrop. А при отправке заказа FoodOrder — информацию в логе, что сообщение отправлено.

Теперь в Kafdrop в разделе Topics можем посмотреть созданный топик t.food.order и увидеть наше сообщение.

8. Consumer-микросервис

Архитектура:

Этапы создания Consumer-микросервиса:

  • конфигурируем group-id и бины;

  • настраиваем доступ к базе данных;

  • создаем Consumer и FoodOrderService;

  • создаем репозиторий FoodOrderRepository.

Начнем с настройки порта для запуска нашего API, топика, который будем слушать, group-id для Consumer-микросервиса и конфигурации базы данных.

server:   port: 8081  topic:   name: t.food.order  spring:   kafka:     consumer:       group-id: "default"    h2:     console:       enabled: true       path: /h2-console   datasource:     url: jdbc:h2:mem:testdb     username: sa     password: password

Config отвечает за настройку бина ModelMapper — библиотеки для маппинга одних объектов на другие. Например, для DTO, используемого далее.

@Configuration public class Config {      @Bean     public ModelMapper modelMapper() {         return new ModelMapper();     }  }

Классы модели:

@Data @Value public class FoodOrderDto {     String item;     Double amount; }
@Data @Entity @NoArgsConstructor @AllArgsConstructor public class FoodOrder {      @Id @GeneratedValue(strategy = GenerationType.IDENTITY)     private Long id;     private String item;     private Double amount; }

Consumer отвечает за прослушивание топика с заказами и получение сообщений. Полученные сообщения мы преобразуем в FoodOrderDto, не содержащего ничего, связанного с персистентностью, например, ID.

@Slf4j @Component public class Consumer {      private static final String orderTopic = "${topic.name}";      private final ObjectMapper objectMapper;     private final FoodOrderService foodOrderService;      @Autowired     public Consumer(ObjectMapper objectMapper, FoodOrderService foodOrderService) {         this.objectMapper = objectMapper;         this.foodOrderService = foodOrderService;     }      @KafkaListener(topics = orderTopic)     public void consumeMessage(String message) throws JsonProcessingException {         log.info("message consumed {}", message);          FoodOrderDto foodOrderDto = objectMapper.readValue(message, FoodOrderDto.class);         foodOrderService.persistFoodOrder(foodOrderDto);     }  }

FoodOrderService — преобразование полученного DTO в объект FoodOrder и сохранение его в БД.

@Slf4j @Service public class FoodOrderService {      private final FoodOrderRepository foodOrderRepository;     private final ModelMapper modelMapper;      @Autowired     public FoodOrderService(FoodOrderRepository foodOrderRepository, ModelMapper modelMapper) {         this.foodOrderRepository = foodOrderRepository;         this.modelMapper = modelMapper;     }      public void persistFoodOrder(FoodOrderDto foodOrderDto) {         FoodOrder foodOrder = modelMapper.map(foodOrderDto, FoodOrder.class);         FoodOrder persistedFoodOrder = foodOrderRepository.save(foodOrder);          log.info("food order persisted {}", persistedFoodOrder);     }  }

Код FoodOrderRepository:

@Repository public interface FoodOrderRepository extends JpaRepository<FoodOrder, Long> { }

Теперь при запуске Consumer-микросервиса отправленные ранее сообщения будут прочитаны из соответствующего топика.

Здесь отмечу одну важную деталь: если мы перейдем в Kafdrop и проверим сообщение, которое только что получили, оно будет доступно. Но, например, в RabbitMQ мы бы его не увидели.

9. Дополнительный функционал

Мы можем отправлять периодические сообщения, включив функционал запуска задач по расписанию.

Для этого добавляем аннотацию @EnableScheduling к классу конфигурации Producer-микросервиса.

@EnableScheduling @Configuration public class Config {          ...      }

Будем отправлять сообщения с фиксированным интервалом в 1000 миллисекунд.

@Slf4j @Component public class Scheduler {      @Autowired     private KafkaTemplate<String, String> kafkaTemplate;     private Integer count = 0;      @Scheduled(fixedRate = 1000)     public void sendMessage() {         count++;         kafkaTemplate.send("t.scheduled", "message " + count);         log.info("sent message count {}", count);     }  }

Топик будет создан автоматически, но можно определить бин также, как делали раньше.

Получим следующий результат:

10. Заключение

Основная идея статьи была познакомить вас с использованием Kafka совместно с Java и Spring для реализации на ее основе более сложных решений.

Исходный код из статьи доступен на GitHub здесь.

Ссылки

  1. Документация Apache Kafka

  2. Kafka The Definitive Guide, O’Reilly

  3. Apache Kafka, Matthias J. Sax


Приглашаем всех желающих на открытое занятие «Разработка консольных приложений на Spring и Picocli». На данном занятии мы покажем, как строить Command Line Interface и утилиты командной строки на Picocli, как альтернативу Spring Shell. Также будут рассмотрены некоторые возможности Java для создания таких консольных утилит. Регистрация — по ссылке.


ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/663264/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *