
В этой статье разберемся, как реализовать обмен сообщениями между Java-микросервисами на Spring с помощью Kafka.
1. Архитектура

У нас будет Producer-микросервис («писатель»), который получает заказы на еду (Food Order) и передает их через Kafka в Consumer-микросервис («читатель») для сохранения в базу данных.
2. Пара слов о Kafka
Кластер Kafka обладает высокой масштабируемостью и отказоустойчивостью: при поломке одного из узлов, другие узлы берут на себя его работу, обеспечивая непрерывность работы без потери данных.
Чтение и запись данных в Kafka выполняется в виде событий, содержащих информацию в различном формате, например, в виде строки, массива или JSON-объекта.
Producer (производитель, издатель) публикует (записывает) события в Kafka, а Consumer (потребитель, подписчик) подписывается на эти события и обрабатывает их.
3. Топики
События группируются в топики (topic). Топик похож на папку, а события — на файлы в этой папке. У топика может быть ноль, один или много издателей и подписчиков.
События можно прочитать столько раз, сколько необходимо. В этом отличие Kafka от традиционных систем обмена сообщениями: после чтения события не удаляются. Можно настроить, как долго Kafka хранит события.
4. Разделы
Топики поделены на разделы (partition). Публикация события в топике фактически означает добавление его к одному из разделов. События с одинаковыми ключами записываются в один раздел. В рамках раздела Kafka гарантирует порядок событий.

Для отказоустойчивости и высокой доступности топик может быть реплицирован, в том числе между различными, географически удаленными, датацентрами. То есть всегда будет несколько брокеров с копиями данных на случай, если что-то пойдет не так.
5. Создание проектов
Перейдите на start.spring.io и создайте проекты с зависимостями, показанными на рисунках ниже.
Producer-микросервис:

Consumer-микросервис:

6. Запуск Kafka в докере
В корне одного из проектов, неважно каком, создайте файл docker-compose.yml, содержащий параметры запуска Kafka, Kafdrop и Zookeeper в докере.
version: "3.7" networks: kafka-net: name: kafka-net driver: bridge services: zookeeper: image: zookeeper:3.7.0 container_name: zookeeper restart: "no" networks: - kafka-net ports: - "2181:2181" kafka: image: obsidiandynamics/kafka container_name: kafka restart: "no" networks: - kafka-net ports: - "9092:9092" environment: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT KAFKA_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092 KAFKA_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 depends_on: - zookeeper kafdrop: image: obsidiandynamics/kafdrop container_name: kafdrop restart: "no" networks: - kafka-net ports: - "9000:9000" environment: KAFKA_BROKERCONNECT: "kafka:29092" depends_on: - "kafka"
Далее, находясь в папке с docker-compose.yml выполните docker-compose up. После запуска контейнеров откройте Kafdrop (веб-интерфейс для управления Kafka) по адресу http://localhost:9000.
В Kafdrop можно смотреть топики, создавать их, удалять и делать многое другое.

7. Producer-микросервис
Архитектура:

Этапы создания Producer-микросервиса:
-
создаем конфигурационные бины;
-
создаем топик для заказов;
-
создаем контроллер FoodOrderController, сервис FoodOrderService и Producer;
-
преобразуем заказы FoodOrder в текстовый вид для отправки брокеру.
Переменные окружения и порт для нашего API (application.yml):
server: port: 8080 topic: name: t.food.order
Config отвечает за создание топика и бина KafkaTemplate, используемого для отправки сообщения.
@Configuration public class Config { private final KafkaProperties kafkaProperties; @Autowired public Config(KafkaProperties kafkaProperties) { this.kafkaProperties = kafkaProperties; } @Bean public ProducerFactory<String, String> producerFactory() { // get configs on application.properties/yml Map<String, Object> properties = kafkaProperties.buildProducerProperties(); return new DefaultKafkaProducerFactory<>(properties); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public NewTopic topic() { return TopicBuilder .name("t.food.order") .partitions(1) .replicas(1) .build(); } }
Класс модели FoodOrder:
@Data @Value public class FoodOrder { String item; Double amount; }
FoodOrderController отвечает за получение заказа FoodOrder и передачу его на уровень сервиса.
@Slf4j @RestController @RequestMapping("/order") public class FoodOrderController { private final FoodOrderService foodOrderService; @Autowired public FoodOrderController(FoodOrderService foodOrderService) { this.foodOrderService = foodOrderService; } @PostMapping public String createFoodOrder(@RequestBody FoodOrder foodOrder) throws JsonProcessingException { log.info("create food order request received"); return foodOrderService.createFoodOrder(foodOrder); } }
FoodOrderService — получение заказа FoodOrder и передачу его Producer.
@Slf4j @Service public class FoodOrderService { private final Producer producer; @Autowired public FoodOrderService(Producer producer) { this.producer = producer; } public String createFoodOrder(FoodOrder foodOrder) throws JsonProcessingException { return producer.sendMessage(foodOrder); } }
Producer получает заказ FoodOrder и публикует его в Kafka в виде сообщения.
В строке 18 мы конвертируем объект FoodOrder в JSON-строку для его передачи в виде строки в Consumer-микросервис.
В строке 19 фактически отправляем сообщение, передавая топик для публикации (переменная окружения в строке 6) и заказ в виде сообщения.
@Slf4j @Component public class Producer { @Value("${topic.name}") private String orderTopic; private final ObjectMapper objectMapper; private final KafkaTemplate<String, String> kafkaTemplate; @Autowired public Producer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) { this.kafkaTemplate = kafkaTemplate; this.objectMapper = objectMapper; } public String sendMessage(FoodOrder foodOrder) throws JsonProcessingException { String orderAsMessage = objectMapper.writeValueAsString(foodOrder); kafkaTemplate.send(orderTopic, orderAsMessage); log.info("food order produced {}", orderAsMessage); return "message sent"; } }
При запуске приложения мы должны увидеть топик, созданный в Kafdrop. А при отправке заказа FoodOrder — информацию в логе, что сообщение отправлено.


Теперь в Kafdrop в разделе Topics можем посмотреть созданный топик t.food.order и увидеть наше сообщение.

8. Consumer-микросервис
Архитектура:

Этапы создания Consumer-микросервиса:
-
конфигурируем group-id и бины;
-
настраиваем доступ к базе данных;
-
создаем Consumer и FoodOrderService;
-
создаем репозиторий FoodOrderRepository.
Начнем с настройки порта для запуска нашего API, топика, который будем слушать, group-id для Consumer-микросервиса и конфигурации базы данных.
server: port: 8081 topic: name: t.food.order spring: kafka: consumer: group-id: "default" h2: console: enabled: true path: /h2-console datasource: url: jdbc:h2:mem:testdb username: sa password: password
Config отвечает за настройку бина ModelMapper — библиотеки для маппинга одних объектов на другие. Например, для DTO, используемого далее.
@Configuration public class Config { @Bean public ModelMapper modelMapper() { return new ModelMapper(); } }
Классы модели:
@Data @Value public class FoodOrderDto { String item; Double amount; }
@Data @Entity @NoArgsConstructor @AllArgsConstructor public class FoodOrder { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String item; private Double amount; }
Consumer отвечает за прослушивание топика с заказами и получение сообщений. Полученные сообщения мы преобразуем в FoodOrderDto, не содержащего ничего, связанного с персистентностью, например, ID.
@Slf4j @Component public class Consumer { private static final String orderTopic = "${topic.name}"; private final ObjectMapper objectMapper; private final FoodOrderService foodOrderService; @Autowired public Consumer(ObjectMapper objectMapper, FoodOrderService foodOrderService) { this.objectMapper = objectMapper; this.foodOrderService = foodOrderService; } @KafkaListener(topics = orderTopic) public void consumeMessage(String message) throws JsonProcessingException { log.info("message consumed {}", message); FoodOrderDto foodOrderDto = objectMapper.readValue(message, FoodOrderDto.class); foodOrderService.persistFoodOrder(foodOrderDto); } }
FoodOrderService — преобразование полученного DTO в объект FoodOrder и сохранение его в БД.
@Slf4j @Service public class FoodOrderService { private final FoodOrderRepository foodOrderRepository; private final ModelMapper modelMapper; @Autowired public FoodOrderService(FoodOrderRepository foodOrderRepository, ModelMapper modelMapper) { this.foodOrderRepository = foodOrderRepository; this.modelMapper = modelMapper; } public void persistFoodOrder(FoodOrderDto foodOrderDto) { FoodOrder foodOrder = modelMapper.map(foodOrderDto, FoodOrder.class); FoodOrder persistedFoodOrder = foodOrderRepository.save(foodOrder); log.info("food order persisted {}", persistedFoodOrder); } }
Код FoodOrderRepository:
@Repository public interface FoodOrderRepository extends JpaRepository<FoodOrder, Long> { }
Теперь при запуске Consumer-микросервиса отправленные ранее сообщения будут прочитаны из соответствующего топика.

Здесь отмечу одну важную деталь: если мы перейдем в Kafdrop и проверим сообщение, которое только что получили, оно будет доступно. Но, например, в RabbitMQ мы бы его не увидели.

9. Дополнительный функционал
Мы можем отправлять периодические сообщения, включив функционал запуска задач по расписанию.
Для этого добавляем аннотацию @EnableScheduling к классу конфигурации Producer-микросервиса.
@EnableScheduling @Configuration public class Config { ... }
Будем отправлять сообщения с фиксированным интервалом в 1000 миллисекунд.
@Slf4j @Component public class Scheduler { @Autowired private KafkaTemplate<String, String> kafkaTemplate; private Integer count = 0; @Scheduled(fixedRate = 1000) public void sendMessage() { count++; kafkaTemplate.send("t.scheduled", "message " + count); log.info("sent message count {}", count); } }
Топик будет создан автоматически, но можно определить бин также, как делали раньше.
Получим следующий результат:

10. Заключение
Основная идея статьи была познакомить вас с использованием Kafka совместно с Java и Spring для реализации на ее основе более сложных решений.
Исходный код из статьи доступен на GitHub здесь.
Ссылки
-
Kafka The Definitive Guide, O’Reilly
-
Apache Kafka, Matthias J. Sax
Приглашаем всех желающих на открытое занятие «Разработка консольных приложений на Spring и Picocli». На данном занятии мы покажем, как строить Command Line Interface и утилиты командной строки на Picocli, как альтернативу Spring Shell. Также будут рассмотрены некоторые возможности Java для создания таких консольных утилит. Регистрация — по ссылке.
ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/663264/
Добавить комментарий