Kafka Streams ч1: Привет мир

от автора

Привет сообщество Хабр =)

Начав изучать Kafka Streams, я заметил, что для решения различных задач приходится искать информацию по разным источникам, поэтому со временем накопилось много собственных конспектов. Хочу поделиться ими в виде серии туториалов на Хабре.

Несмотря на обилие ресурсов по Kafka Streams и отличные статьи на Хабре [ноль, один, два], мне не хватало пошаговых руководств, которые детально раскрывают изъяны и преимущества этой технологии. Поэтому решил создать такой материал, чтобы помочь другим разобраться структурно и последовательно.

? Не буду рассматривать, что такое Apache Kafka. Предполагается, что вы уже знакомы с данным брокером.

Я не сразу понял ценность данной библиотеки (Kafka Streams это часть Apache Kafka) как бы приложение все равно читает сообщение применяет набор действий к сообщению и записывает снова в тот или другой топик, или в совсем другой источник данных, в общем на первый взгляд будто бы никакой разницы, но с погружением в детали начинаешь понимать почему сделали этот инструмент и почему он удобен и востребован.

Без Kafka Streams

Установим нужный пакет

implementation("org.apache.kafka:kafka-clients:3.8.0")

При прямом использовании Apache Kafka нужно использовать KafkaConsumer для чтения сообщений из топика. Вот простой пример:

import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.Collections;  public class SimpleKafkaConsumerProducer {     public static void main(String[] args) {         // Настройки для consumer         Properties consumerProps = new Properties();         consumerProps.put("bootstrap.servers", "localhost:9092");         consumerProps.put("group.id", "my-group");         consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");          // Настройки для producer         Properties producerProps = new Properties();         producerProps.put("bootstrap.servers", "localhost:9092");         producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");         producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");          var consumer = new KafkaConsumer<String, String>(consumerProps);         var producer = new KafkaProducer<String, String>(producerProps);          consumer.subscribe(Collections.singletonList("input-topic"));          try {             while (true) {                 ConsumerRecords<String, String> records = consumer.poll(100);                  for (ConsumerRecord<String, String> record : records) {                     // Обработка сообщения                     String originalValue = record.value();                     String processedValue = originalValue.toUpperCase();                      // Отправка обработанного сообщения в другой топик                     producer.send(new ProducerRecord<>("output-topic", record.key(), processedValue));                      System.out.println("Обработанное сообщение: " + processedValue);                 }             }         } finally {             consumer.close();             producer.close();         }     } } 
  • Чтение сообщения: Используем KafkaConsumer для чтения сообщений из input-topic.

  • Обработка сообщения: В данном случае просто преобразуем текст сообщения в верхний регистр.

  • Запись сообщения: Используем KafkaProducer для отправки обработанного сообщения в output-topic.

С Kafka Streams

Установим нужный пакет

implementation("org.apache.kafka:kafka-streams:3.8.0")

Теперь тот же функционал с использованием Kafka Streams:

import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.StreamsConfig; import java.util.Properties;  public class SimpleKafkaStreamsApp {     public static void main(String[] args) {         // Настройки для Kafka Streams         Properties props = new Properties();         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());         // чтобы читать данные с начала топика в явном виде указываем начальное смещение         props.put("consumer.auto.offset.reset", "earliest");          // Строим топологию потоков         StreamsBuilder builder = new StreamsBuilder();          // Читаем данные из входного топика         KStream<String, String> source = builder.stream("input-topic");          // Обрабатываем данные (преобразуем в верхний регистр)         KStream<String, String> processed = source.mapValues(value -> value.toUpperCase());          // Отправляем обработанные данные в выходной топик         processed.to("output-topic");          // Создаем и запускаем Kafka Streams приложение         KafkaStreams streams = new KafkaStreams(builder.build(), props);         streams.start();          // Добавляем shutdown hook для корректного завершения работы         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));     } } 
  • Чтение и обработка данных: Используем StreamsBuilder для построения топологии потоков. Метод mapValues применяется для преобразования значения сообщения.

  • Запись данных: Обработанные данные автоматически отправляются в output-topic с помощью метода to.

Без Kafka Streams:

  • Нужно самостоятельно управлять потребителем и производителем.

  • Больше кода для настройки и управления.

  • Менее декларативный подход.

С Kafka Streams:

  • Управление потоками данных абстрагировано библиотекой.

  • Код более декларативен и фокусируется на логике обработки.

  • Легче масштабировать и добавлять сложные преобразования.

Визуальная схема, того почему обработка данных с помощью Kafka Streams проще и эффективнее.

Kafka Streams

Kafka Streams

На схеме видно, что при прямом использовании Kafka нам приходится самостоятельно управлять потребителями и производителями, что усложняет код и архитектуру приложения. С Kafka Streams же мы работаем с более высоким уровнем абстракции, где многие детали управления потоками данных уже реализованы за нас. Это позволяет сосредоточиться на логике обработки данных, а не на инфраструктурных деталях.

Пока рано делать окончательные выводы — это лишь верхушка айсберга. Чем глубже мы погружаемся в Kafka Streams, тем более очевидной становится её ценность. Продолжение следует в следующей статье.


ссылка на оригинал статьи https://habr.com/ru/articles/850832/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *