Привет, Хабр! Это моя первая статья + я являюсь джуном, так что очень жду вашей критики (пожалуйста адекватной)
Немного о мотивации
Я ежедневно работаю с кафкой, но вот поймал себя на мысли, что не понимаю как она устроена(прям вот совсем плохо)! В моей голове живет мысль — «хочешь понять технологию — напиши ее», но изобретать такой велосипед как кафка — сил мне не хватит (надеюсь пока что). Так что мной было принято решение написать какое‑то маленькое демо‑приложение с поднятием кафки с нуля.
Apache Kafka
Есть множество статей и видео с детальным описанием устройства кафки, но пока сам не потрогаешь — все эти топики, партиции, офсеты будут непонятны(конечно говорю о своем джунском уровне).
С моего джунского трона — кафка это меганадежная очередь сообщений, то есть если все правильно настроено, то продюсер точно отправит сообщение и оно с невероятно большим шансом дойдет до консьюмера. Кафка подразумевает в себе огромное количество перестраховок на случай падения той или иной части сервиса, репликаций и тп. Можно сказать, что в ней нет функции удаления сообщений(они будут хранится там пока не пройдет их TTL)
Кластер — множество брокеров.
Продюсеры — клиентский отправитель сообщения в топик.
Топик — логический канал, куда публикуются сообщения(по сути своей очередь, тк сообщения добавляются в конец), по сути своей множество партиций.
Брокеры — серверы, образующие кластер кафки, в них как раз лежат топики.(Клиенты (производители и потребители) подключаются к любому брокеру, используя его как bootstrap‑сервер. После подключения брокер предоставляет данные обо всем кластере, включая информацию о других брокерах, топиках и пкартициях)
Партиции — каждый топик делится на партиции, и партиции разбросаны по разным брокерам (напомню, топик это логический канал).
Консьюмер— клиент, который подписывается на топики и читает сообщения. Собственно это и отличает кафку от обычной очереди — тк разные консьюмеры могут вычитывать из топика.
Оффсет — уникальный номер (по сути от 0 до последнего сообщения) сообщения в партиции.
Резюмируем(упрощенно, пока без реплик) — есть кластер, это множество брокеров, в брокерах лежат логические каналы — топики( при том, что один топик может быть разделен на несколько партиций, которые лежат в разных брокерах — горизонтальное масштабирование и параллелизм:) ).
P.S.
После прочтения данного материала не считайте, что вы разобрались в кафке, у нее еще есть множество свойств — сколько не читал все равно ощущение что лягушка на дне колодца(реплики партиций, контроллер‑брокер, Zookeeper, правила распределения партиций по брокерам и тд)
Docker + Maven
Для начала создадим проект в spring-initializr, добавим кафку и докер зависимости, тк поднимать кафку мы будем в докере.
Получим такой pom с нужными зависимостями.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.5.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>kafka.demo.docker</groupId> <artifactId>kafkaDemoDocker</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafkaDemoDocker</name> <description>Demo project for Spring Boot</description> <url/> <licenses> <license/> </licenses> <developers> <developer/> </developers> <scm> <connection/> <developerConnection/> <tag/> <url/> </scm> <properties> <java.version>21</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-docker-compose</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Далее поднимем кафку в докере:
services: kafka: image: apache/kafka:3.7.0 container_name: kafka-broker ports: - "9092:9092" environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller KAFKA_LISTENERS: CONTROLLER://0.0.0.0:29093,BROKER://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: BROKER KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_LOG_DIRS: /tmp/kraft-combined-logs CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
Разберем файл компоуза:
с 1 по 6 строчку мы определяем сервис и контейнер(ы) — его название (kafka), версию кафки(3.7.0), имя контейнера — kafka‑broker, настраиваем пробрасывание порта с хоста(в моем случае ноутбука) в контейнер чтобы обращаться из спрингбут приложения в кафку.
Далее будет немного сложнее — в 7–9 строчках мы настраиваем номер ноды в кафке(чтобы на хорошем уровне понимать что это значит стоит прочитать это). Если кратко — раньше был Zookeeper, в каком‑то смысле менеджер кафки и в то же время хранилище метаданных/настроек оной (с помощью Zookeeper, например, выбирался контроллер кластера). Теперь вместо Zookeeper кафка сама хранит метаданные и управляет собой на основе технологии рафт‑ на одном сервере. (до версии 3.3.0 сервер был либо контроллером, либо брокером, но теперь он может работать в комбинированном режиме, что и указано в 9 строчке)
В 10 строчке мы указываем какие порты кафка будет слушать внутри контейнера, один для связи контроллеров, другой для связи между брокерами.
В 11 строчке мы указываем два адреса. Первый — адрес для докер сети, по которому к нашему брокеру будут обращаться, а второй адрес — подключения на хост‑машине(в моем случае ноутбук).
В 12 строчке устанавливаем, что контроллером будет листинер, который называется контроллер и соответственно слушает 29 093 порт
В 13 строчке определяем кворум контроллер для кластера. В Формате <node_id>@<hostname>:<port>
Кворум это
Кворум — нечетное количество серверов с ролью контроллер(или комбинированной), работающих по протоколу Raft — гарантирующему, что все ноды кворума имеют одинаковые метаданные.
Используется для перераспределения партиций в случае падения брокера или выбора нового главного контроллера в случае его падения.
В 14 строчке настраиваем протокол обменивания информации, в нашем случае достаточно передавать голый текст, без использования SSL??
В 15 строчке выбираем брокера(аналогично контроллеру).
В 16 указываем количество репликаций внутреннего топика(тк по умолчанию их 3, а у нас всего один брокер, а исходя из правила кафки все ломается. Попробуйте разобраться подробнее и напишите в комментарии!).
Правило Kafka
Фактор репликации (количество копий топика) не может быть больше, чем число живых брокеров в кластере.
УРА! Вроде бы разобрались
Далее мы поднимаем кафку:
docker compose up -d
Затем заходим в контейнер и создаем топик прямо из него
docker exec -it kafka-broker /bin/sh
Если вам как и мне лень добавлять что-то в PATH, а чтобы создать топик кафки — нам нужен файл kafka-topics.sh, как именно это работает я разбирать не буду… В общем нам нужно перейти в директорию с этим файлом.
cd /opt/kafka/bin/
Возможно у вас он лежит в другом месте, тогда команда find в помощь.
После этого выполняем команду
bash kafka-topics.sh --create \ --topic demo-topic \ --bootstrap-server kafka:29092 \ --partitions 1 \ --replication-factor 1
Создаем топик с названием demo‑topic, далее указываем сервер из yaml файла, тк в докере мы общаемся по именам, с 1 партицией и одной ее копией.
Эта команда под капотом запускает нужный джава класс(тк кафка написана на джаве) с данными аргументами.
Вызванный нами скрипт, если убрать комментарии выглядит так:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
Springboot
Наконец‑то переходим к java коду:
Начнем с продюсера.
Для начала укажем конфиги — адрес сервера, тип сериализации для ключа и значения нашего сообщения из строк в байты.
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Далее создадим экземпляр класса KafkaProducer
Producer<String, String> producer = new KafkaProducer<>(props);
Далее определим наше сообщение — топик, куда оно отправится, ключ и его значение(key — необязательный параметр)
Немного про ключ
Сообщения с одинаковым ключом всегда попадают в одну и ту же партицию и обрабатываются в порядке отправления + ими удобно разделять по принадлежности к какому-то атрибуту
String topic = "demo-topic"; String key = "test-key"; String value = "Hello, world!";
И конечно сама логика отправки сообщения обернутая в try-catch конечно же
try { RecordMetadata metadata = producer.send(record).get(); System.out.printf("Сообщение отправлено успешно! topic = %s, partition = %d, offset = %d%n", metadata.topic(), metadata.partition(), metadata.offset()); } catch (InterruptedException | ExecutionException e) { System.err.println("Ошибка при отправке сообщения: " + e.getMessage()); } finally { producer.close(); }
Что происходит под капотом producer.send(record)
1) Сериализация ключа и сообщения
2) Определяется хеш ключа(если указан), и определяется раздел топика
3) Отправка в брокер через батч
4) Далее ожидание ответа
-
.get()блокирует текущий поток до получения ответа от брокера
5) После успешной записи возврщается объект классa RecordMetadata с информацией
-
Топик, в который записано сообщение
-
Раздел (partition)
-
Offset (позиция в разделе)
-
Временная метка
Хочется сказать, что для продакшена не рекомендуется использовать.get, то есть не блокировать поток, а делать так
producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.printf("Отправлено успешно: partition=%d, offset=%d%n", metadata.partition(), metadata.offset()); } else { System.err.println("Ошибка отправки: " + exception.getMessage()); } });
Итоговый класс выглядит так:
public class SimpleProducer { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); String topic = "demo-topic"; String key = "test-key"; String value = "Hello, world!"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); try { RecordMetadata metadata = producer.send(record).get(); System.out.printf("Сообщение отправлено успешно! topic = %s, partition = %d, offset = %d%n", metadata.topic(), metadata.partition(), metadata.offset()); } catch (InterruptedException | ExecutionException e) { System.err.println("Ошибка при отправке сообщения: " + e.getMessage()); } finally { producer.close(); } } }
Теперь напишем Консюмер
Здесь в конфигах укажем то же самое + название группы потребителей + поведение при отсутствии сохраненного офсета для вычитки.
Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Далее создадим экземпляр класса потребителя
Consumer<String, String> consumer = new KafkaConsumer<>(props);
Подписываемся на нужный топик:
String topic = "demo-topic"; consumer.subscribe(Collections.singletonList(topic));
Добавим небольшое логирование и сама логика отправки:
System.out.println("Ожидание сообщений из топика: " + topic + ". Для выхода нажмите Ctrl+C"); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Получено сообщение: key = %s, value = %s, partition = %d, offset = %d%n", record.key(), record.value(), record.partition(), record.offset()); } } } finally { consumer.close(); }
Что происходит под капотом consumer.poll(Duration.ofMillis(100));
0) Наши приложение тригерит кафка-консюмер, а дальше действует он
1) Спрашиваем брокер о метаданных
-
Какие разделы (partitions) ему назначены после балансировки
-
Текущую позицию (offset) для каждого раздела
-
Какой брокер является лидером для каждого раздела
2) Консюмер отправляет fetch-запросы к брокерам лидерам партиций
3) В течение 100мс ожидает ответ
4) Орабатывает ответ
-
Десериализуются (преобразуются из байтов в строки)
-
Группируются в объект
ConsumerRecords -
Offsets автоматически обновляются для последующего коммита
Итоговый класс выглядит так:
public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); Consumer<String, String> consumer = new KafkaConsumer<>(props); String topic = "demo-topic"; consumer.subscribe(Collections.singletonList(topic)); System.out.println("Ожидание сообщений из топика: " + topic + ". Для выхода нажмите Ctrl+C"); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Получено сообщение: key = %s, value = %s, partition = %d, offset = %d%n", record.key(), record.value(), record.partition(), record.offset()); } } } finally { consumer.close(); } } }
Чтобы проверить работу приложения, запустите класс консюмера, а потом продюсера — готово!
ссылка на оригинал статьи https://habr.com/ru/articles/944672/
Добавить комментарий