Kafka Streams ч4: Stateful processing

от автора

В предыдущих статьях

мы познакомились с основами Kafka Streams и рассмотрели stateless операции. В этой статье мы погрузимся в stateful processing и создадим приложение для управления запасами в реальном времени. Шаг за шагом мы реализуем функциональность, которая позволит отслеживать состояние запасов товаров, обрабатывать поступления и продажи, а также предоставлять доступ к текущему состоянию через REST API.

Описание задачи

Мы разработаем приложение, которое:

  • Обрабатывает потоки данных о поступлениях товаров на склад и о продажах товаров.

  • Поддерживает актуальное состояние запасов для каждого товара.

  • Обновляет состояние запасов при поступлении новых товаров и при продажах.

  • Использует stateful операции для хранения состояния запасов.

  • Предоставляет доступ к состоянию через REST API с использованием Javalin.

  • Обогащает данные о транзакциях информацией о товаре из справочника.

  • Выводит информацию об остатках товаров в выходной топик Kafka.

Архитектура приложения

Бизнес-сущности

У нас есть три основных сущности:

  1. Поступления товаров (RestockEvent)

  2. Продажи товаров (SaleEvent)

  3. Информация о товарах (ProductInfo)

Топики Kafka

Мы будем работать с четырьмя топиками:

  1. restock-events-topic (KStream)

  2. sale-events-topic (KStream)

  3. product-info-topic (GlobalKTable)

  4. inventory-output-topic (выходной топик)

Бизнес-схема

flowchart LR     RestockEvents[Поступления товаров] -->|KStream| InventoryProcessor[Процессор запасов]     SaleEvents[Продажи товаров] -->|KStream| InventoryProcessor     ProductInfo[Информация о товарах] -->|GlobalKTable| InventoryProcessor     InventoryProcessor -->|State Store| InventoryState[Состояние запасов]     InventoryProcessor -->|Выходной топик| InventoryOutput[Остатки товаров]     InventoryState -->|REST API| JavalinServer[Сервер Javalin]

Шаг 1: Настройка проекта

Структура проекта

Создадим проект со следующей структурой:

css Copy code kafka-streams-stateful/ ├── build.gradle.kts ├── src/ │   ├── main/ │   │   ├── java/ │   │   │   └── com/ │   │   │       └── example/ │   │   │           └── inventory/ │   │   │               ├── InventoryApp.java │   │   │               ├── InventoryChange.java │   │   │               ├── InventoryProcessor.java │   │   │               ├── InventoryRestService.java │   │   │               ├── GsonSerializer.java │   │   │               ├── GsonDeserializer.java │   │   │               ├── ProductInfo.java │   │   │               ├── RestockEvent.java │   │   │               └── SaleEvent.java │   │   └── resources/ │   └── test/ └── settings.gradle.kts  
  • build.gradle.kts: Файл сборки проекта с необходимыми зависимостями.

  • src/main/java/com/example/inventory/: Пакет с основными классами приложения.

    • InventoryApp.java: Главный класс приложения, запускающий Kafka Streams и REST API.

    • InventoryProcessor.java: Класс, содержащий логику обработки потоков данных и агрегации состояния запасов.

    • InventoryRestService.java: Класс, предоставляющий REST API для доступа к состоянию запасов.

    • InventoryChange.java: Модель данных для изменений запасов.

    • RestockEvent.java: Модель данных поступления товаров.

    • SaleEvent.java: Модель данных продаж товаров.

    • ProductInfo.java: Модель данных информации о товарах.

    • GsonSerializer.java: Класс для сериализации объектов в JSON.

    • GsonDeserializer.java: Класс для десериализации JSON в объекты.

  • src/main/resources/: Ресурсы приложения (если необходимо).

  • src/test/: Тесты для приложения (опционально).

  • settings.gradle.kts: Настройки Gradle для проекта.

Файл сборки build.gradle.kts

plugins {     id("java")     id("application") }  group = "com.example.inventory" version = "1.0"  repositories {     mavenCentral() }  dependencies {     implementation("org.apache.kafka:kafka-streams:3.8.0")     implementation("io.javalin:javalin:6.1.6")     implementation("com.squareup.okhttp3:okhttp:4.12.0")     implementation("com.google.code.gson:gson:2.9.1")     implementation("org.slf4j:slf4j-simple:2.0.12") }  application {     mainClass.set("com.example.inventory.InventoryApp") }

Шаг 2: Создание моделей данных

Модель RestockEvent

public class RestockEvent {     private String eventId;     private String productId;     private int quantity;     private String supplier;     private long timestamp; }

Модель SaleEvent

public class SaleEvent {     private String eventId;     private String productId;     private int quantity;     private String storeId;     private String saleDate; // Формат YYYY-MM-DD     private long timestamp; }

Модель ProductInfo

public class ProductInfo {     private String productId;     private String productName;     private String category;     private double price;     private String manufacturer;     private int reorderLevel; }

Модель InventoryChange

public class InventoryChange {     private String productId;     private int quantityChange; }

Шаг 3: Создание класса обработки InventoryProcessor

В этом шаге мы создадим класс InventoryProcessor, который будет содержать логику обработки потоков данных и агрегации состояния запасов.

Код класса InventoryProcessor

import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore;  public class InventoryProcessor {      private final StreamsBuilder builder;     private static final String RESTOCK_TOPIC = "restock-events-topic";     private static final String SALE_TOPIC = "sale-events-topic";     private static final String PRODUCT_INFO_TOPIC = "product-info-topic";     private static final String INVENTORY_OUTPUT_TOPIC = "inventory-output-topic";      public InventoryProcessor() {         this.builder = new StreamsBuilder();     }      public Topology buildTopology() {         // Сериализаторы и десериализаторы         Serde<String> stringSerde = Serdes.String();         Serde<RestockEvent> restockEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(RestockEvent.class));         Serde<SaleEvent> saleEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(SaleEvent.class));         Serde<ProductInfo> productInfoSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(ProductInfo.class));         Serde<InventoryChange> inventoryChangeSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(InventoryChange.class));         Serde<Integer> integerSerde = Serdes.Integer();          // Чтение потоков         KStream<String, RestockEvent> restockStream = builder.stream(RESTOCK_TOPIC, Consumed.with(stringSerde, restockEventSerde));         KStream<String, SaleEvent> saleStream = builder.stream(SALE_TOPIC, Consumed.with(stringSerde, saleEventSerde));         GlobalKTable<String, ProductInfo> productInfoTable = builder.globalTable(PRODUCT_INFO_TOPIC, Consumed.with(stringSerde, productInfoSerde));          // Обогащение данных о поступлениях информацией о товаре         KStream<String, RestockEvent> enrichedRestockStream = restockStream.join(                 productInfoTable,                 (key, value) -> value.getProductId(),                 (restockEvent, productInfo) -> {                     // Обогащаем restockEvent при необходимости                     return restockEvent;                 }         );          // Обогащение данных о продажах информацией о товаре         KStream<String, SaleEvent> enrichedSaleStream = saleStream.join(                 productInfoTable,                 (key, value) -> value.getProductId(),                 (saleEvent, productInfo) -> {                     // Обогащаем saleEvent при необходимости                     return saleEvent;                 }         );          // Создаем класс InventoryChange         KStream<String, InventoryChange> restockChanges = enrichedRestockStream                 .mapValues(restockEvent ->                         new InventoryChange(restockEvent.getProductId(), restockEvent.getQuantity())                 );          KStream<String, InventoryChange> saleChanges = enrichedSaleStream                 .mapValues(saleEvent ->                         new InventoryChange(saleEvent.getProductId(), -saleEvent.getQuantity())                 );          // Объединяем поступления и продажи         KStream<String, InventoryChange> inventoryChanges = restockChanges.merge(saleChanges);          // Группируем по productId         KGroupedStream<String, InventoryChange> groupedInventory = inventoryChanges.groupByKey(Grouped.with(stringSerde, inventoryChangeSerde));          // Агрегируем изменения         KTable<String, Integer> inventoryState = groupedInventory.aggregate(                 () -> 0,                 (key, value, aggregate) -> aggregate + value.getQuantityChange(),                 Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("inventory-store")                         .withKeySerde(stringSerde)                         .withValueSerde(integerSerde)         );          // Выводим состояние в выходной топик         inventoryState.toStream().to(INVENTORY_OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));          return builder.build();     } }

Комментарии к коду

  • Сериализация и десериализация: Используем собственные классы GsonSerializer и GsonDeserializer для преобразования наших объектов в JSON и обратно.

  • Обогащение данных: Используем join для обогащения потоков данных информацией о товарах из productInfoTable.

  • Создание изменений запасов: Создаем экземпляры InventoryChange, представляющие изменение запасов (положительное для поступлений, отрицательное для продаж).

  • Группировка и агрегирование: Группируем изменения запасов по productId и агрегируем их с помощью aggregate, сохраняя текущее состояние запасов в inventory-store.

  • Выходной поток: Отправляем агрегированные данные в выходной топик inventory-output-topic.

Шаг 4: Реализация REST API с использованием Javalin

Создадим класс InventoryRestService, который будет предоставлять доступ к состоянию запасов через REST API.

Код класса InventoryRestService

import io.javalin.Javalin; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;  public class InventoryRestService {      private final KafkaStreams streams;      public InventoryRestService(KafkaStreams streams) {         this.streams = streams;     }      public void start() {         var app = Javalin.create().start(7777);          app.get("/inventory/{productId}", ctx -> {             String productId = ctx.pathParam("productId");              ReadOnlyKeyValueStore<String, Integer> keyValueStore = streams.store(                     StoreQueryParameters.fromNameAndType("inventory-store", QueryableStoreTypes.keyValueStore())             );              var quantity = keyValueStore.get(productId);             if (quantity != null) {                 ctx.json(new InventoryResponse(productId, quantity));             } else {                 ctx.status(404).result("Product not found");             }         });     }      public record InventoryResponse(String productId, int quantity) {     } }

Комментарии к коду

  • Маршрут /inventory/{productId}: Позволяет получить текущий запас товара по его productId.

  • Доступ к state store: Используем streams.store() с StoreQueryParameters, чтобы получить доступ к состоянию inventory-store.

  • Ответ: Возвращаем JSON с productId и текущим quantity в виде Java Record InventoryResponse.

Шаг 5: Объединение всего в InventoryApp

Создадим главный класс приложения InventoryApp, который будет запускать обработчик потоков и REST API.

Код класса InventoryApp

import java.util.Properties; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology;  public class InventoryApp {      public static void main(String[] args) {         var processor = new InventoryProcessor();         var topology = processor.buildTopology();          var streams = getKafkaStreams(topology);         streams.start();          var restService = new InventoryRestService(streams);         restService.start();          Runtime.getRuntime().addShutdownHook(new Thread(streams::close));     }      private static KafkaStreams getKafkaStreams(Topology topology) {         var props = new Properties();          props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-app");         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");          return new KafkaStreams(topology, props);     } }

Шаг 6: Создание классов сериализации и десериализации

Класс GsonSerializer

import com.google.gson.Gson; import java.nio.charset.StandardCharsets; import org.apache.kafka.common.serialization.Serializer;  public class GsonSerializer<T> implements Serializer<T> {     private final Gson gson = new Gson();      @Override     public byte[] serialize(String topic, T data) {         if (data == null) {             return null;         }         return gson.toJson(data).getBytes(StandardCharsets.UTF_8);     } }

Класс GsonDeserializer

import com.google.gson.Gson; import java.nio.charset.StandardCharsets; import org.apache.kafka.common.serialization.Deserializer;  public class GsonDeserializer<T> implements Deserializer<T> {     private final Gson gson = new Gson();     private final Class<T> deserializedClass;      public GsonDeserializer(Class<T> deserializedClass) {         this.deserializedClass = deserializedClass;     }      @Override     public T deserialize(String topic, byte[] data) {         if (data == null) {             return null;         }         return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserializedClass);     } }

Шаг 7: Инструкции по запуску и тестированию приложения

Предварительные шаги

  1. Убедитесь, что Kafka запущена на localhost:9092 или доступна через Docker.

  2. Создайте необходимые топики:

kafka-topics.sh --create --topic restock-events-topic --bootstrap-server localhost:9092 kafka-topics.sh --create --topic sale-events-topic --bootstrap-server localhost:9092 kafka-topics.sh --create --topic product-info-topic --bootstrap-server localhost:9092 kafka-topics.sh --create --topic inventory-output-topic --bootstrap-server localhost:9092

Сборка и запуск приложения

./gradlew build ./gradlew run

Отправка тестовых сообщений через Kafka UI или консоль

Использование Kafka UI

  1. Подключитесь к Kafka UI и убедитесь, что он настроен для подключения к вашему Kafka-брокеру.

  2. Отправьте сообщения с ключами в соответствующие топики:

  • Топик product-info-topic

    • Key: product123

    • Value:

      {   "productId": "product123",   "productName": "Товар 123",   "category": "Категория A",   "price": 100.0,   "manufacturer": "Производитель A",   "reorderLevel": 50 }
  • Топик restock-events-topic

    • Key: product123

    • Value:

      {   "eventId": "restock1",   "productId": "product123",   "quantity": 100,   "supplier": "Поставщик A",   "timestamp": 1690000000000 }
  • Топик sale-events-topic

    • Key: product123

    • Value:

      {   "eventId": "sale1",   "productId": "product123",   "quantity": 20,   "storeId": "storeA",   "saleDate": "2023-07-22",   "timestamp": 1690000100000 }

Проверка состояния через REST API

Отправьте GET-запрос:

curl <http://localhost:7777/inventory/product123>

Ожидаемый ответ:

{   "productId": "product123",   "quantity": 80 }

Проверка выходного топика inventory-output-topic

Вы можете просмотреть сообщения в этом топике через Kafka UI или консольного потребителя:

kafka-console-consumer.sh --topic inventory-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=": "

Ожидаемый вывод: product123: 80

Заключение

Мы успешно реализовали приложение для управления запасами в реальном времени с использованием Kafka Streams и stateful processing. Приложение обрабатывает поступления и продажи товаров, поддерживает актуальное состояние запасов и предоставляет доступ к этому состоянию через REST API.

Полный исходный код нашего приложения


ссылка на оригинал статьи https://habr.com/ru/articles/862976/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *