В предыдущих статьях
мы познакомились с основами Kafka Streams и рассмотрели stateless операции. В этой статье мы погрузимся в stateful processing и создадим приложение для управления запасами в реальном времени. Шаг за шагом мы реализуем функциональность, которая позволит отслеживать состояние запасов товаров, обрабатывать поступления и продажи, а также предоставлять доступ к текущему состоянию через REST API.
Описание задачи
Мы разработаем приложение, которое:
-
Обрабатывает потоки данных о поступлениях товаров на склад и о продажах товаров.
-
Поддерживает актуальное состояние запасов для каждого товара.
-
Обновляет состояние запасов при поступлении новых товаров и при продажах.
-
Использует stateful операции для хранения состояния запасов.
-
Предоставляет доступ к состоянию через REST API с использованием Javalin.
-
Обогащает данные о транзакциях информацией о товаре из справочника.
-
Выводит информацию об остатках товаров в выходной топик Kafka.
Архитектура приложения
Бизнес-сущности
У нас есть три основных сущности:
-
Поступления товаров (
RestockEvent) -
Продажи товаров (
SaleEvent) -
Информация о товарах (
ProductInfo)
Топики Kafka
Мы будем работать с четырьмя топиками:
-
restock-events-topic(KStream) -
sale-events-topic(KStream) -
product-info-topic(GlobalKTable) -
inventory-output-topic(выходной топик)
Бизнес-схема
flowchart LR RestockEvents[Поступления товаров] -->|KStream| InventoryProcessor[Процессор запасов] SaleEvents[Продажи товаров] -->|KStream| InventoryProcessor ProductInfo[Информация о товарах] -->|GlobalKTable| InventoryProcessor InventoryProcessor -->|State Store| InventoryState[Состояние запасов] InventoryProcessor -->|Выходной топик| InventoryOutput[Остатки товаров] InventoryState -->|REST API| JavalinServer[Сервер Javalin]
Шаг 1: Настройка проекта
Структура проекта
Создадим проект со следующей структурой:
css Copy code kafka-streams-stateful/ ├── build.gradle.kts ├── src/ │ ├── main/ │ │ ├── java/ │ │ │ └── com/ │ │ │ └── example/ │ │ │ └── inventory/ │ │ │ ├── InventoryApp.java │ │ │ ├── InventoryChange.java │ │ │ ├── InventoryProcessor.java │ │ │ ├── InventoryRestService.java │ │ │ ├── GsonSerializer.java │ │ │ ├── GsonDeserializer.java │ │ │ ├── ProductInfo.java │ │ │ ├── RestockEvent.java │ │ │ └── SaleEvent.java │ │ └── resources/ │ └── test/ └── settings.gradle.kts
-
build.gradle.kts: Файл сборки проекта с необходимыми зависимостями. -
src/main/java/com/example/inventory/: Пакет с основными классами приложения.-
InventoryApp.java: Главный класс приложения, запускающий Kafka Streams и REST API. -
InventoryProcessor.java: Класс, содержащий логику обработки потоков данных и агрегации состояния запасов. -
InventoryRestService.java: Класс, предоставляющий REST API для доступа к состоянию запасов. -
InventoryChange.java: Модель данных для изменений запасов. -
RestockEvent.java: Модель данных поступления товаров. -
SaleEvent.java: Модель данных продаж товаров. -
ProductInfo.java: Модель данных информации о товарах. -
GsonSerializer.java: Класс для сериализации объектов в JSON. -
GsonDeserializer.java: Класс для десериализации JSON в объекты.
-
-
src/main/resources/: Ресурсы приложения (если необходимо). -
src/test/: Тесты для приложения (опционально). -
settings.gradle.kts: Настройки Gradle для проекта.
Файл сборки build.gradle.kts
plugins { id("java") id("application") } group = "com.example.inventory" version = "1.0" repositories { mavenCentral() } dependencies { implementation("org.apache.kafka:kafka-streams:3.8.0") implementation("io.javalin:javalin:6.1.6") implementation("com.squareup.okhttp3:okhttp:4.12.0") implementation("com.google.code.gson:gson:2.9.1") implementation("org.slf4j:slf4j-simple:2.0.12") } application { mainClass.set("com.example.inventory.InventoryApp") }
Шаг 2: Создание моделей данных
Модель RestockEvent
public class RestockEvent { private String eventId; private String productId; private int quantity; private String supplier; private long timestamp; }
Модель SaleEvent
public class SaleEvent { private String eventId; private String productId; private int quantity; private String storeId; private String saleDate; // Формат YYYY-MM-DD private long timestamp; }
Модель ProductInfo
public class ProductInfo { private String productId; private String productName; private String category; private double price; private String manufacturer; private int reorderLevel; }
Модель InventoryChange
public class InventoryChange { private String productId; private int quantityChange; }
Шаг 3: Создание класса обработки InventoryProcessor
В этом шаге мы создадим класс InventoryProcessor, который будет содержать логику обработки потоков данных и агрегации состояния запасов.
Код класса InventoryProcessor
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; public class InventoryProcessor { private final StreamsBuilder builder; private static final String RESTOCK_TOPIC = "restock-events-topic"; private static final String SALE_TOPIC = "sale-events-topic"; private static final String PRODUCT_INFO_TOPIC = "product-info-topic"; private static final String INVENTORY_OUTPUT_TOPIC = "inventory-output-topic"; public InventoryProcessor() { this.builder = new StreamsBuilder(); } public Topology buildTopology() { // Сериализаторы и десериализаторы Serde<String> stringSerde = Serdes.String(); Serde<RestockEvent> restockEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(RestockEvent.class)); Serde<SaleEvent> saleEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(SaleEvent.class)); Serde<ProductInfo> productInfoSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(ProductInfo.class)); Serde<InventoryChange> inventoryChangeSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(InventoryChange.class)); Serde<Integer> integerSerde = Serdes.Integer(); // Чтение потоков KStream<String, RestockEvent> restockStream = builder.stream(RESTOCK_TOPIC, Consumed.with(stringSerde, restockEventSerde)); KStream<String, SaleEvent> saleStream = builder.stream(SALE_TOPIC, Consumed.with(stringSerde, saleEventSerde)); GlobalKTable<String, ProductInfo> productInfoTable = builder.globalTable(PRODUCT_INFO_TOPIC, Consumed.with(stringSerde, productInfoSerde)); // Обогащение данных о поступлениях информацией о товаре KStream<String, RestockEvent> enrichedRestockStream = restockStream.join( productInfoTable, (key, value) -> value.getProductId(), (restockEvent, productInfo) -> { // Обогащаем restockEvent при необходимости return restockEvent; } ); // Обогащение данных о продажах информацией о товаре KStream<String, SaleEvent> enrichedSaleStream = saleStream.join( productInfoTable, (key, value) -> value.getProductId(), (saleEvent, productInfo) -> { // Обогащаем saleEvent при необходимости return saleEvent; } ); // Создаем класс InventoryChange KStream<String, InventoryChange> restockChanges = enrichedRestockStream .mapValues(restockEvent -> new InventoryChange(restockEvent.getProductId(), restockEvent.getQuantity()) ); KStream<String, InventoryChange> saleChanges = enrichedSaleStream .mapValues(saleEvent -> new InventoryChange(saleEvent.getProductId(), -saleEvent.getQuantity()) ); // Объединяем поступления и продажи KStream<String, InventoryChange> inventoryChanges = restockChanges.merge(saleChanges); // Группируем по productId KGroupedStream<String, InventoryChange> groupedInventory = inventoryChanges.groupByKey(Grouped.with(stringSerde, inventoryChangeSerde)); // Агрегируем изменения KTable<String, Integer> inventoryState = groupedInventory.aggregate( () -> 0, (key, value, aggregate) -> aggregate + value.getQuantityChange(), Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("inventory-store") .withKeySerde(stringSerde) .withValueSerde(integerSerde) ); // Выводим состояние в выходной топик inventoryState.toStream().to(INVENTORY_OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde)); return builder.build(); } }
Комментарии к коду
-
Сериализация и десериализация: Используем собственные классы
GsonSerializerиGsonDeserializerдля преобразования наших объектов в JSON и обратно. -
Обогащение данных: Используем
joinдля обогащения потоков данных информацией о товарах изproductInfoTable. -
Создание изменений запасов: Создаем экземпляры
InventoryChange, представляющие изменение запасов (положительное для поступлений, отрицательное для продаж). -
Группировка и агрегирование: Группируем изменения запасов по
productIdи агрегируем их с помощьюaggregate, сохраняя текущее состояние запасов вinventory-store. -
Выходной поток: Отправляем агрегированные данные в выходной топик
inventory-output-topic.
Шаг 4: Реализация REST API с использованием Javalin
Создадим класс InventoryRestService, который будет предоставлять доступ к состоянию запасов через REST API.
Код класса InventoryRestService
import io.javalin.Javalin; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; public class InventoryRestService { private final KafkaStreams streams; public InventoryRestService(KafkaStreams streams) { this.streams = streams; } public void start() { var app = Javalin.create().start(7777); app.get("/inventory/{productId}", ctx -> { String productId = ctx.pathParam("productId"); ReadOnlyKeyValueStore<String, Integer> keyValueStore = streams.store( StoreQueryParameters.fromNameAndType("inventory-store", QueryableStoreTypes.keyValueStore()) ); var quantity = keyValueStore.get(productId); if (quantity != null) { ctx.json(new InventoryResponse(productId, quantity)); } else { ctx.status(404).result("Product not found"); } }); } public record InventoryResponse(String productId, int quantity) { } }
Комментарии к коду
-
Маршрут
/inventory/{productId}: Позволяет получить текущий запас товара по егоproductId. -
Доступ к state store: Используем
streams.store()сStoreQueryParameters, чтобы получить доступ к состояниюinventory-store. -
Ответ: Возвращаем JSON с
productIdи текущимquantityв виде Java RecordInventoryResponse.
Шаг 5: Объединение всего в InventoryApp
Создадим главный класс приложения InventoryApp, который будет запускать обработчик потоков и REST API.
Код класса InventoryApp
import java.util.Properties; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; public class InventoryApp { public static void main(String[] args) { var processor = new InventoryProcessor(); var topology = processor.buildTopology(); var streams = getKafkaStreams(topology); streams.start(); var restService = new InventoryRestService(streams); restService.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } private static KafkaStreams getKafkaStreams(Topology topology) { var props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde"); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde"); return new KafkaStreams(topology, props); } }
Шаг 6: Создание классов сериализации и десериализации
Класс GsonSerializer
import com.google.gson.Gson; import java.nio.charset.StandardCharsets; import org.apache.kafka.common.serialization.Serializer; public class GsonSerializer<T> implements Serializer<T> { private final Gson gson = new Gson(); @Override public byte[] serialize(String topic, T data) { if (data == null) { return null; } return gson.toJson(data).getBytes(StandardCharsets.UTF_8); } }
Класс GsonDeserializer
import com.google.gson.Gson; import java.nio.charset.StandardCharsets; import org.apache.kafka.common.serialization.Deserializer; public class GsonDeserializer<T> implements Deserializer<T> { private final Gson gson = new Gson(); private final Class<T> deserializedClass; public GsonDeserializer(Class<T> deserializedClass) { this.deserializedClass = deserializedClass; } @Override public T deserialize(String topic, byte[] data) { if (data == null) { return null; } return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserializedClass); } }
Шаг 7: Инструкции по запуску и тестированию приложения
Предварительные шаги
-
Убедитесь, что Kafka запущена на
localhost:9092или доступна через Docker. -
Создайте необходимые топики:
kafka-topics.sh --create --topic restock-events-topic --bootstrap-server localhost:9092 kafka-topics.sh --create --topic sale-events-topic --bootstrap-server localhost:9092 kafka-topics.sh --create --topic product-info-topic --bootstrap-server localhost:9092 kafka-topics.sh --create --topic inventory-output-topic --bootstrap-server localhost:9092
Сборка и запуск приложения
./gradlew build ./gradlew run
Отправка тестовых сообщений через Kafka UI или консоль
Использование Kafka UI
-
Подключитесь к Kafka UI и убедитесь, что он настроен для подключения к вашему Kafka-брокеру.
-
Отправьте сообщения с ключами в соответствующие топики:
-
Топик
product-info-topic-
Key:
product123 -
Value:
{ "productId": "product123", "productName": "Товар 123", "category": "Категория A", "price": 100.0, "manufacturer": "Производитель A", "reorderLevel": 50 }
-
-
Топик
restock-events-topic-
Key:
product123 -
Value:
{ "eventId": "restock1", "productId": "product123", "quantity": 100, "supplier": "Поставщик A", "timestamp": 1690000000000 }
-
-
Топик
sale-events-topic-
Key:
product123 -
Value:
{ "eventId": "sale1", "productId": "product123", "quantity": 20, "storeId": "storeA", "saleDate": "2023-07-22", "timestamp": 1690000100000 }
-
Проверка состояния через REST API
Отправьте GET-запрос:
curl <http://localhost:7777/inventory/product123>
Ожидаемый ответ:
{ "productId": "product123", "quantity": 80 }
Проверка выходного топика inventory-output-topic
Вы можете просмотреть сообщения в этом топике через Kafka UI или консольного потребителя:
kafka-console-consumer.sh --topic inventory-output-topic --bootstrap-server localhost:9092 --from-beginning --property print.key=true --property key.separator=": "
Ожидаемый вывод: product123: 80
Заключение
Мы успешно реализовали приложение для управления запасами в реальном времени с использованием Kafka Streams и stateful processing. Приложение обрабатывает поступления и продажи товаров, поддерживает актуальное состояние запасов и предоставляет доступ к этому состоянию через REST API.
Полный исходный код нашего приложения
ссылка на оригинал статьи https://habr.com/ru/articles/862976/
Добавить комментарий