Привет сообщество Хабр =)
Начав изучать Kafka Streams, я заметил, что для решения различных задач приходится искать информацию по разным источникам, поэтому со временем накопилось много собственных конспектов. Хочу поделиться ими в виде серии туториалов на Хабре.
Несмотря на обилие ресурсов по Kafka Streams и отличные статьи на Хабре [ноль, один, два], мне не хватало пошаговых руководств, которые детально раскрывают изъяны и преимущества этой технологии. Поэтому решил создать такой материал, чтобы помочь другим разобраться структурно и последовательно.
? Не буду рассматривать, что такое Apache Kafka. Предполагается, что вы уже знакомы с данным брокером.
Я не сразу понял ценность данной библиотеки (Kafka Streams это часть Apache Kafka) как бы приложение все равно читает сообщение применяет набор действий к сообщению и записывает снова в тот или другой топик, или в совсем другой источник данных, в общем на первый взгляд будто бы никакой разницы, но с погружением в детали начинаешь понимать почему сделали этот инструмент и почему он удобен и востребован.
Без Kafka Streams
Установим нужный пакет
implementation("org.apache.kafka:kafka-clients:3.8.0")
При прямом использовании Apache Kafka нужно использовать KafkaConsumer
для чтения сообщений из топика. Вот простой пример:
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.Collections; public class SimpleKafkaConsumerProducer { public static void main(String[] args) { // Настройки для consumer Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:9092"); consumerProps.put("group.id", "my-group"); consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // Настройки для producer Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:9092"); producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); var consumer = new KafkaConsumer<String, String>(consumerProps); var producer = new KafkaProducer<String, String>(producerProps); consumer.subscribe(Collections.singletonList("input-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { // Обработка сообщения String originalValue = record.value(); String processedValue = originalValue.toUpperCase(); // Отправка обработанного сообщения в другой топик producer.send(new ProducerRecord<>("output-topic", record.key(), processedValue)); System.out.println("Обработанное сообщение: " + processedValue); } } } finally { consumer.close(); producer.close(); } } }
-
Чтение сообщения: Используем
KafkaConsumer
для чтения сообщений изinput-topic
. -
Обработка сообщения: В данном случае просто преобразуем текст сообщения в верхний регистр.
-
Запись сообщения: Используем
KafkaProducer
для отправки обработанного сообщения вoutput-topic
.
С Kafka Streams
Установим нужный пакет
implementation("org.apache.kafka:kafka-streams:3.8.0")
Теперь тот же функционал с использованием Kafka Streams:
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.StreamsConfig; import java.util.Properties; public class SimpleKafkaStreamsApp { public static void main(String[] args) { // Настройки для Kafka Streams Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // чтобы читать данные с начала топика в явном виде указываем начальное смещение props.put("consumer.auto.offset.reset", "earliest"); // Строим топологию потоков StreamsBuilder builder = new StreamsBuilder(); // Читаем данные из входного топика KStream<String, String> source = builder.stream("input-topic"); // Обрабатываем данные (преобразуем в верхний регистр) KStream<String, String> processed = source.mapValues(value -> value.toUpperCase()); // Отправляем обработанные данные в выходной топик processed.to("output-topic"); // Создаем и запускаем Kafka Streams приложение KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); // Добавляем shutdown hook для корректного завершения работы Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
-
Чтение и обработка данных: Используем
StreamsBuilder
для построения топологии потоков. МетодmapValues
применяется для преобразования значения сообщения. -
Запись данных: Обработанные данные автоматически отправляются в
output-topic
с помощью методаto
.
Без Kafka Streams:
-
Нужно самостоятельно управлять потребителем и производителем.
-
Больше кода для настройки и управления.
-
Менее декларативный подход.
С Kafka Streams:
-
Управление потоками данных абстрагировано библиотекой.
-
Код более декларативен и фокусируется на логике обработки.
-
Легче масштабировать и добавлять сложные преобразования.
Визуальная схема, того почему обработка данных с помощью Kafka Streams проще и эффективнее.
На схеме видно, что при прямом использовании Kafka нам приходится самостоятельно управлять потребителями и производителями, что усложняет код и архитектуру приложения. С Kafka Streams же мы работаем с более высоким уровнем абстракции, где многие детали управления потоками данных уже реализованы за нас. Это позволяет сосредоточиться на логике обработки данных, а не на инфраструктурных деталях.
Пока рано делать окончательные выводы — это лишь верхушка айсберга. Чем глубже мы погружаемся в Kafka Streams, тем более очевидной становится её ценность. Продолжение следует в следующей статье.
ссылка на оригинал статьи https://habr.com/ru/articles/850832/
Добавить комментарий