Spring Boot приложение с использованием Kafka Streams

от автора

Привет, Хабр! В этой статье мы рассмотрим, как в МегаФоне производят потоковую обработку данных, и разработаем простое Spring Boot приложение с использованием Kafka Streams.


У нас в компании есть домен «Фабрика микросервисов» (MFactory) — подразделение, которое занимается исследованием и разработкой приложений на базе микросервисной архитектуры. Для того, чтобы микросервисы могли «общаться» между собой, мы используем брокеры сообщений RabbitMQ и Kafka, но сегодня мы поговорим подробнее о Kafka, так как именно этот брокер реализует очень мощный инструмент для потоковой обработки данных — Kafka Streams.

Рассмотрим стандартный подход к обработке данных: получаем поток данных, помещаем их в базу данных, после чего подключаемся к базе, выгружаем данные и обрабатываем. Но существует ряд задач, в которых мы можем увеличить производительность, обрабатывая поток данных в режиме реального времени, минуя базу данных. Для их решения используем интерфейс Kafka Streams. Но для начала, вспомним, что такое Kafka.

Kafka — это распределённый программный брокер сообщений. «Распределённый» значит, что брокер может работать синхронно на нескольких серверах, образуя кластер. Kafka состоит из генератора сообщений, потребителя сообщений и топика. Топик — это файл, в котором входящие записи добавляются в конец файла. Другими словами, топик — это упорядоченный по времени журнал сообщений.

Рассмотрим следующую задачу. Мы получаем поток данных с информацией об изменении баланса абонента в формате JSON — {«phone_number»:number,«balance»:balance} и записываем его в топик “src”. И нам нужно в реальном времени отфильтровать сообщения, в которых баланс меньше либо равен нулю.

Теперь рассмотрим, что же такое Kafka Streams. Apache Kafka Streams API — это клиентская библиотека для создания приложений и микросервисов, которые позволяют в режиме реального времени обрабатывать данные, хранящиеся в кластерах Kafka. Kafka Streams сочетает в себе простоту написания и развертывания стандартных приложений Java и Scala на стороне клиента с преимуществами серверной кластерной технологии Kafka.

Устройство Kafka Streams можно представить в виде графа (топологии обработки):

1. узел-обработчик, который подписывается на топик и вычитывает входящие сообщения;
2. узел-обработчик, который реализует бизнес-логику;
3. узел-обработчик, который отправляет результат обработки в топик.

Представим следующую задачу. Мы получаем поток информации об изменении баланса абонентов в формате JSON вида {«phone_number»:number,«balance»:balance} и записываем его в топик “src”. Нам нужно в реальном времени отфильтровать сообщения, в которых баланс меньше и равен нулю, и записать результат в топик “out”, на который подписан сервис для обработки данных сообщений. Сымитируем подобный поток, используя консольный генератор Kafka:

PS C:\Users\User> cd C:\kafka_2.12-2.5.0\ PS C:\kafka> bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic src Created topic src. PS C:\kafka> bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic out Created topic out. PS C:\kafka> bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092 out src PS C:\kafka> bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic src >{"phone_number":79301,"balance":100} >{"phone_number":79302,"balance":0} >{"phone_number":79303,"balance":-50} >

Теперь разработаем простое Spring Boot приложение и посмотрим на Kafka Streams в действии. Для начала определим следующие зависимости:

<dependency>     <groupId>org.apache.kafka</groupId>     <artifactId>kafka-streams</artifactId> </dependency> <dependency>     <groupId>org.springframework.kafka</groupId>     <artifactId>spring-kafka</artifactId> </dependency>

Создадим объект User:

package ru.megafon.kafkastreamsdemo.model;  import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter;  @Getter public class User {     @JsonProperty("phone_number")     private Long phoneNumber;     @JsonProperty("balance")     private Double balance; }

Напишем конфигурацию нашего приложения:

package ru.megafon.kafkastreamsdemo.config;  import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import ru.megafon.kafkastreamsdemo.model.User;  import java.util.HashMap; import java.util.Map;  @Slf4j @Configuration @EnableKafka @EnableKafkaStreams public class KafkaStreamsConfig {     @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)     public KafkaStreamsConfiguration kStreamsConfigs() {         Map<String, Object> props = new HashMap<>();         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "id");         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());         return new KafkaStreamsConfiguration(props);     }      @Bean     public Serde<User> userSerde() {         return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(User.class));     }      @Bean     public KStream<String, User> kStream(StreamsBuilder kStreamBuilder) {         KStream<String, String> stream = kStreamBuilder                 .stream("src", Consumed.with(Serdes.String(), Serdes.String()));         KStream<String, User> userStream = stream                 .mapValues(this::getUserFromString)                 .filter((key, value) -> value.getBalance() <= 0);         userStream.to("out", Produced.with(Serdes.String(), userSerde()));         return userStream;     }      @Bean     public ObjectMapper objectMapper() {         return new ObjectMapper();     }      User getUserFromString(String userString) {         User user = null;         try {             user = objectMapper().readValue(userString, User.class);         } catch (JsonProcessingException e) {             log.error(e.getMessage(), e);         }         return user;     } }

Рассмотрим подробнее каждый метод:

В методе kStreamsConfigs мы создаём минимальную конфигурацию для подключения нашего приложения к серверу Kafka: устанавливаем идентификатор нашего приложения и адрес сервера брокера, дефолтный сериализатор/десериализатор.
В методе userSerde мы определили собственный сериализатор/десериализатор для модели User.
В методе kStream мы создаём топологию нашего Kafka Streams приложения и возвращаем объект KStream<String, User>. Основным объектом для создания топологии является объект StreamsBuilder:

1. в методе stream мы подписываемся на топик “src” и вычитываем входящие сообщения;
2. в методе filter мы реализовали нашу бизнес-логику — фильтрацию;
3. в методе to мы отправляем результат обработки в результирующий топик “out”.

Запускаем приложение и подписываемся на топик “out”:

PS C:\Users\User> cd C:\kafka_2.12-2.5.0\ PS C:\kafka> bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic out --from-beginning {"phone_number":79302,"balance":0.0} {"phone_number":79303,"balance":-50.0} 

Мы видим, что данные успешно обработаны.

Преимущества Kafka Streams:

• простая и легковесная библиотека, которая может быть легко встроена в любое Java-приложение и интегрирована с инструментами упаковки, развертывания и эксплуатации;
• не имеет внешних зависимостей от систем кроме самого Apache Kafka;
• поддерживает отказоустойчивое локальное состояние, что обеспечивает очень быстрые и эффективные операции с сохранением состояния;
• поддерживает правило “обработка ровно один раз”, чтобы гарантировать, что каждая запись будет обработана один и только один раз, даже если в процессе обработки происходит сбой на клиентах Kafka Streams или брокерах Kafka;
• использует обработку “по одной записи за раз” для достижения миллисекундной задержки.

ссылка на оригинал статьи https://habr.com/ru/company/megafon/blog/504422/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *