Туториал kafka + springboot + docker

от автора

Привет, Хабр! Это моя первая статья + я являюсь джуном, так что очень жду вашей критики (пожалуйста адекватной)

Немного о мотивации

Я ежедневно работаю с кафкой, но вот поймал себя на мысли, что не понимаю как она устроена(прям вот совсем плохо)! В моей голове живет мысль — «хочешь понять технологию — напиши ее», но изобретать такой велосипед как кафка — сил мне не хватит (надеюсь пока что). Так что мной было принято решение написать какое‑то маленькое демо‑приложение с поднятием кафки с нуля.

Apache Kafka

Есть множество статей и видео с детальным описанием устройства кафки, но пока сам не потрогаешь — все эти топики, партиции, офсеты будут непонятны(конечно говорю о своем джунском уровне).

Если нужно что‑то подробнее

С моего джунского трона — кафка это меганадежная очередь сообщений, то есть если все правильно настроено, то продюсер точно отправит сообщение и оно с невероятно большим шансом дойдет до консьюмера. Кафка подразумевает в себе огромное количество перестраховок на случай падения той или иной части сервиса, репликаций и тп. Можно сказать, что в ней нет функции удаления сообщений(они будут хранится там пока не пройдет их TTL)

Кластер — множество брокеров.

Продюсеры — клиентский отправитель сообщения в топик.

Топик — логический канал, куда публикуются сообщения(по сути своей очередь, тк сообщения добавляются в конец), по сути своей множество партиций.

Брокеры — серверы, образующие кластер кафки, в них как раз лежат топики.(Клиенты (производители и потребители) подключаются к любому брокеру, используя его как bootstrap‑сервер. После подключения брокер предоставляет данные обо всем кластере, включая информацию о других брокерах, топиках и пкартициях)

Партиции — каждый топик делится на партиции, и партиции разбросаны по разным брокерам (напомню, топик это логический канал).

Консьюмер— клиент, который подписывается на топики и читает сообщения. Собственно это и отличает кафку от обычной очереди — тк разные консьюмеры могут вычитывать из топика.

Оффсет — уникальный номер (по сути от 0 до последнего сообщения) сообщения в партиции.

Резюмируем(упрощенно, пока без реплик) — есть кластер, это множество брокеров, в брокерах лежат логические каналы — топики( при том, что один топик может быть разделен на несколько партиций, которые лежат в разных брокерах — горизонтальное масштабирование и параллелизм:) ).

Один топик в разных брокерах(картинка из статьи)

Один топик в разных брокерах(картинка из статьи)
P.S.

После прочтения данного материала не считайте, что вы разобрались в кафке, у нее еще есть множество свойств — сколько не читал все равно ощущение что лягушка на дне колодца(реплики партиций, контроллер‑брокер, Zookeeper, правила распределения партиций по брокерам и тд)

Docker + Maven

Для начала создадим проект в spring-initializr, добавим кафку и докер зависимости, тк поднимать кафку мы будем в докере.

Получим такой pom с нужными зависимостями.

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.5.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>kafka.demo.docker</groupId> <artifactId>kafkaDemoDocker</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafkaDemoDocker</name> <description>Demo project for Spring Boot</description> <url/> <licenses> <license/> </licenses> <developers> <developer/> </developers> <scm> <connection/> <developerConnection/> <tag/> <url/> </scm> <properties> <java.version>21</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>  <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-docker-compose</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies>  <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>  </project>

Далее поднимем кафку в докере:

services:   kafka:     image: apache/kafka:3.7.0     container_name: kafka-broker     ports:       - "9092:9092"     environment:       KAFKA_NODE_ID: 1       KAFKA_PROCESS_ROLES: broker,controller       KAFKA_LISTENERS: CONTROLLER://0.0.0.0:29093,BROKER://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092       KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:29092,PLAINTEXT_HOST://localhost:9092       KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER       KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT       KAFKA_INTER_BROKER_LISTENER_NAME: BROKER       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1       KAFKA_LOG_DIRS: /tmp/kraft-combined-logs       CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

Разберем файл компоуза:

с 1 по 6 строчку мы определяем сервис и контейнер(ы) — его название (kafka), версию кафки(3.7.0), имя контейнера — kafka‑broker, настраиваем пробрасывание порта с хоста(в моем случае ноутбука) в контейнер чтобы обращаться из спрингбут приложения в кафку.

Далее будет немного сложнее — в 7–9 строчках мы настраиваем номер ноды в кафке(чтобы на хорошем уровне понимать что это значит стоит прочитать это). Если кратко — раньше был Zookeeper, в каком‑то смысле менеджер кафки и в то же время хранилище метаданных/настроек оной (с помощью Zookeeper, например, выбирался контроллер кластера). Теперь вместо Zookeeper кафка сама хранит метаданные и управляет собой на основе технологии рафт‑ на одном сервере. (до версии 3.3.0 сервер был либо контроллером, либо брокером, но теперь он может работать в комбинированном режиме, что и указано в 9 строчке)

В 10 строчке мы указываем какие порты кафка будет слушать внутри контейнера, один для связи контроллеров, другой для связи между брокерами.

В 11 строчке мы указываем два адреса. Первый — адрес для докер сети, по которому к нашему брокеру будут обращаться, а второй адрес — подключения на хост‑машине(в моем случае ноутбук).

В 12 строчке устанавливаем, что контроллером будет листинер, который называется контроллер и соответственно слушает 29 093 порт

В 13 строчке определяем кворум контроллер для кластера. В Формате <node_id>@<hostname>:<port>

Кворум это

Кворум — нечетное количество серверов с ролью контроллер(или комбинированной), работающих по протоколу Raft — гарантирующему, что все ноды кворума имеют одинаковые метаданные.

Используется для перераспределения партиций в случае падения брокера или выбора нового главного контроллера в случае его падения.

В 14 строчке настраиваем протокол обменивания информации, в нашем случае достаточно передавать голый текст, без использования SSL??

В 15 строчке выбираем брокера(аналогично контроллеру).

В 16 указываем количество репликаций внутреннего топика(тк по умолчанию их 3, а у нас всего один брокер, а исходя из правила кафки все ломается. Попробуйте разобраться подробнее и напишите в комментарии!).

Правило Kafka

Фактор репликации (количество копий топика) не может быть больше, чем число живых брокеров в кластере.

УРА! Вроде бы разобрались

Далее мы поднимаем кафку:

docker compose up -d

Затем заходим в контейнер и создаем топик прямо из него

docker exec -it kafka-broker /bin/sh

Если вам как и мне лень добавлять что-то в PATH, а чтобы создать топик кафки — нам нужен файл kafka-topics.sh, как именно это работает я разбирать не буду… В общем нам нужно перейти в директорию с этим файлом.

cd /opt/kafka/bin/

Возможно у вас он лежит в другом месте, тогда команда find в помощь.

После этого выполняем команду

bash kafka-topics.sh --create \   --topic demo-topic \   --bootstrap-server kafka:29092 \   --partitions 1 \   --replication-factor 1

Создаем топик с названием demo‑topic, далее указываем сервер из yaml файла, тк в докере мы общаемся по именам, с 1 партицией и одной ее копией.

Эта команда под капотом запускает нужный джава класс(тк кафка написана на джаве) с данными аргументами.

Вызванный нами скрипт, если убрать комментарии выглядит так:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

Springboot

Наконец‑то переходим к java коду:

Начнем с продюсера.

Для начала укажем конфиги — адрес сервера, тип сериализации для ключа и значения нашего сообщения из строк в байты.

Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

Далее создадим экземпляр класса KafkaProducer

Producer<String, String> producer = new KafkaProducer<>(props);

Далее определим наше сообщение — топик, куда оно отправится, ключ и его значение(key — необязательный параметр)

Немного про ключ

Сообщения с одинаковым ключом всегда попадают в одну и ту же партицию и обрабатываются в порядке отправления + ими удобно разделять по принадлежности к какому-то атрибуту

String topic = "demo-topic"; String key = "test-key"; String value = "Hello, world!";

И конечно сама логика отправки сообщения обернутая в try-catch конечно же

try {     RecordMetadata metadata = producer.send(record).get();     System.out.printf("Сообщение отправлено успешно! topic = %s, partition = %d, offset = %d%n",             metadata.topic(), metadata.partition(), metadata.offset()); } catch (InterruptedException | ExecutionException e) {     System.err.println("Ошибка при отправке сообщения: " + e.getMessage()); } finally {     producer.close(); }
Что происходит под капотом producer.send(record)

1) Сериализация ключа и сообщения

2) Определяется хеш ключа(если указан), и определяется раздел топика

3) Отправка в брокер через батч

4) Далее ожидание ответа

  • .get() блокирует текущий поток до получения ответа от брокера

5) После успешной записи возврщается объект классa RecordMetadata с информацией

  • Топик, в который записано сообщение

  • Раздел (partition)

  • Offset (позиция в разделе)

  • Временная метка

Хочется сказать, что для продакшена не рекомендуется использовать.get, то есть не блокировать поток, а делать так

producer.send(record, (metadata, exception) -> {     if (exception == null) {         System.out.printf("Отправлено успешно: partition=%d, offset=%d%n",              metadata.partition(), metadata.offset());     } else {         System.err.println("Ошибка отправки: " + exception.getMessage());     } });

Итоговый класс выглядит так:

public class SimpleProducer {     public static void main(String[] args) {         Properties props = new Properties();         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");           Producer<String, String> producer = new KafkaProducer<>(props);           String topic = "demo-topic";         String key = "test-key";         String value = "Hello, world!";         ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);          try {             RecordMetadata metadata = producer.send(record).get();             System.out.printf("Сообщение отправлено успешно! topic = %s, partition = %d, offset = %d%n",                     metadata.topic(), metadata.partition(), metadata.offset());         } catch (InterruptedException | ExecutionException e) {             System.err.println("Ошибка при отправке сообщения: " + e.getMessage());         } finally {             producer.close();         }     } }

Теперь напишем Консюмер

Здесь в конфигах укажем то же самое + название группы потребителей + поведение при отсутствии сохраненного офсета для вычитки.

Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Далее создадим экземпляр класса потребителя

Consumer<String, String> consumer = new KafkaConsumer<>(props);

Подписываемся на нужный топик:

String topic = "demo-topic"; consumer.subscribe(Collections.singletonList(topic));

Добавим небольшое логирование и сама логика отправки:

System.out.println("Ожидание сообщений из топика: " + topic + ". Для выхода нажмите Ctrl+C");  try {     while (true) {         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));          for (ConsumerRecord<String, String> record : records) {             System.out.printf("Получено сообщение: key = %s, value = %s, partition = %d, offset = %d%n",                     record.key(), record.value(), record.partition(), record.offset());         }     } } finally {     consumer.close(); }
Что происходит под капотом consumer.poll(Duration.ofMillis(100));

0) Наши приложение тригерит кафка-консюмер, а дальше действует он

1) Спрашиваем брокер о метаданных

  • Какие разделы (partitions) ему назначены после балансировки

  • Текущую позицию (offset) для каждого раздела

  • Какой брокер является лидером для каждого раздела

2) Консюмер отправляет fetch-запросы к брокерам лидерам партиций

3) В течение 100мс ожидает ответ

4) Орабатывает ответ

  • Десериализуются (преобразуются из байтов в строки)

  • Группируются в объект ConsumerRecords

  • Offsets автоматически обновляются для последующего коммита

Итоговый класс выглядит так:

public class SimpleConsumer {     public static void main(String[] args) {         Properties props = new Properties();         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");         props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");          Consumer<String, String> consumer = new KafkaConsumer<>(props);          String topic = "demo-topic";         consumer.subscribe(Collections.singletonList(topic));          System.out.println("Ожидание сообщений из топика: " + topic + ". Для выхода нажмите Ctrl+C");          try {             while (true) {                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));                  for (ConsumerRecord<String, String> record : records) {                     System.out.printf("Получено сообщение: key = %s, value = %s, partition = %d, offset = %d%n",                             record.key(), record.value(), record.partition(), record.offset());                 }             }         } finally {             consumer.close();         }     } } 

Чтобы проверить работу приложения, запустите класс консюмера, а потом продюсера — готово!


ссылка на оригинал статьи https://habr.com/ru/articles/944672/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *