В этой статье я хочу показать как можно использовать Kafka в дата-инженерии и как её «пощупать«.
Я не хотел бы повторяться по важным моментам, которые касаются архитектуры Kafka, поэтому рекомендую ознакомиться с данным видео.
В нём хорошо рассказано про основные концепции, которые будут дальше использоваться в статье, такие как:
-
Что такое
producer. -
Что такое
consumer. -
Что такое
topic. -
Что такое
offset. -
Что такое
commit. -
Что такое
partition. -
Что такое
replication.
Весь код, который будет использоваться в статье будет доступен в моём репозитории.
Разворачивание сервиса
Начнём с того, что развернем Kafka локально в Docker. Для этого создадим docker-compose.yaml со следующим кодом:
version: '3.8' services: zookeeper: image: 'confluentinc/cp-zookeeper:7.7.0' hostname: zookeeper container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - '2181:2181' kafka: image: 'confluentinc/cp-kafka:7.7.0' hostname: kafka container_name: kafka depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:19092 KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ports: - '9092:9092' - '19092:19092' kafka-ui: image: 'provectuslabs/kafka-ui:v0.7.2' container_name: kafka-ui ports: - '8080:8080' environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 depends_on: - kafka networks: default: name: kafka-network
Чтобы запустить все сервисы выполним команду:
docker-compose up -d
После этого у нас запустится Kafka, ZooKeeper и UI for Apache Kafka.
UI for Apache Kafka будет доступен по адресу http://localhost:8080/ через него можно будет: создавать topic, удалять topic, смотреть сообщения в topic и прочее. Очень удобный инструмент для работы с Kafka.
Создание и удаление topic
В данном разделе мы с вами попробуем создавать и удалять topic.
Создание и удаление topic через CLI
Чтобы создать topic нужно выполнить команды ниже.
Зайти в контейнер с Kafka:
docker exec -it kafka /bin/bash
Создание topic test в Kafka:
kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Просмотр всех доступных topic в Kafka:
kafka-topics --list --bootstrap-server kafka:9092
Удаление topic test в Kafka:
kafka-topics --delete --topic test --bootstrap-server kafka:9092
Создание и удаление topic через Python
Если вам удобнее взаимодействовать с Kafka через Python, то это не проблема.
Для работы с Kafka нам понадобится библиотека confluent-kafka. В примерах ниже я использую версию 2.5.0. Весь код и список всех зависимостей находится в моём репозитории.
Точно также эти операции можно произвести без подключения к контейнеру c Kafka, а через Python.
Чтобы создать topic через Kafka:
from confluent_kafka.admin import AdminClient, NewTopic admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'}) def example_create_topics(a: AdminClient = None, topics: list[str] = None) -> None: """ Функция для создания `topic` в Kafka :param a: AdminClient с параметрами инициализации. Default `None`. :param topics: Список `topic` для создания. Default `None`. :return: Ничего не возвращает """ new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics] try: f.result() # The result itself is None print("Topic {} created".format(topic)) except Exception as e: print("Failed to create topic {}: {}".format(topic, e)) example_create_topics( a=admin_client, topics=['test'], )
Важно: IDE может ругаться, что модуля NewTopic не существует, но он есть. Это официальный пакет. Это касается версии 2.5.0.
Чтобы удалить topic:
from confluent_kafka.admin import AdminClient admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'}) def example_delete_topics(a: AdminClient = None, topics: list[str] = None) -> None: """ Функция для удаления `topic` в Kafka. :param a: AdminClient с параметрами инициализации. Default `None`. :param topics: Список `topic` для удаления. Default `None`. :return: Ничего не возвращает. """ fs = a.delete_topics(topics, operation_timeout=30) # Wait for operation to finish. for topic, f in fs.items(): try: f.result() # The result itself is None print("Topic {} deleted".format(topic)) except Exception as e: print("Failed to delete topic {}: {}".format(topic, e)) example_delete_topics( a=admin_client, topics=['test'], )
Больше примеров использования библиотеки confluent_kafka в официальном GitHub проекта.
Kafka CLI
CLI является популярным вариантов для взаимодействия с Kafka. Изначально его нет на вашем устройстве, поэтому необходимо его скачать следующей командой:
wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz
Затем распаковать:
tar -xzf kafka_2.13-3.8.0.tgz
После выполнения данных команд мы можем использовать CLI для взаимодействия с Kafka.
Важно: Все исполняемые файлы находятся в папке bin. Поэтому стоит обратить внимание, что все скрипты будут выполнять из неё.
Чтобы перейти в папку bin нужно выполнить команду:
cd kafka_2.13-3.8.0/bin/
Запись в Kafka через CLI
Чтобы произвести запись в Kafka выполним команду:
echo 'Hello, Kafka!' | sh kafka-console-producer.sh --broker-list localhost:19092 --topic test
Или так:
echo 'Hello, Kafka!' | ./kafka-console-producer.sh --broker-list localhost:19092 --topic test
Важно: Мне привычнее вызывать скрипт командой sh, но можно и через ./.
Ещё можно создать producer в интерактивном режиме командой:
sh kafka-console-producer.sh --broker-list localhost:19092 --topic test
После создания такого producer у нас появляется возможность писать все сообщения, которые хотим.
После выполнения команды у нас появится [> и после чего мы сможем вводить сообщения для Kafka.
Для выхода из интерактивного режима несколько раз нажмите CTRL + C.
Чтение из Kafka через CLI
Важно: topic в Kafka можно читать «с конца» и «с начала«.
Чтобы начать читать с самого начала:
sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test --from-beginning
Чтобы начать читать с конца и получать только новые сообщения:
sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test
Kafka Python
Как было описано выше мы можем взаимодействовать с Kafka через Python. Поэтому сейчас рассмотрим также операции записи и чтения с использованием Python.
Запись в Kafka через Python
Я приведу пример той записи, которая может появиться в вашей Kafka – это информация о пользователе.
Запись будет содержать: uuid, first_name, last_name, middle_name.
Вы можете запустить код ниже и в topic my_topic начнут записываться значения.
import json import time from confluent_kafka import Producer from faker import Faker import uuid_utils as uuid def generate_list_of_dict() -> dict[str, str]: fake = Faker(locale='ru_RU') return { 'uuid': str(uuid.uuid7()), 'first_name': fake.first_name(), 'last_name': fake.last_name(), 'middle_name': fake.middle_name(), } # Define the Kafka configuration conf = {'bootstrap.servers': "localhost:19092"} # Create a Producer instance with the above configuration producer = Producer(conf) while True: # Define some data to send to Kafka data = generate_list_of_dict() # Convert the data to a JSON string data_str = json.dumps(data) # Produce a message to the "my_topic" topic producer.produce(topic="my_topic", value=data_str) # Flush the producer to ensure all messages are sent producer.flush() # Sleep for a second before producing the next set of messages time.sleep(3)
Важно: Если topic ранее не был создан, то он создастся при первой записи.
Чтение из Kafka через Python
Для того чтобы прочитать значения из Kafka нам необходимо создать consumer. Функция ниже имеет возможность прочитать topic с самого начала и с определённого offset.
from confluent_kafka import Consumer, KafkaError, TopicPartition def consume_messages(topic: str = None, offset: int = None) -> None: conf = { 'bootstrap.servers': 'localhost:19092', 'group.id': 'mygroup', 'auto.offset.reset': 'earliest' } consumer = Consumer(conf) if offset is not None: partitions = consumer.list_topics(topic).topics[topic].partitions for partition in partitions: consumer.assign([TopicPartition(topic, partition, offset)]) else: consumer.subscribe([topic]) try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError: print('Reached end of partition') else: print(f'Error: {msg.error()}') else: print(f'Received message: {msg.value().decode("utf-8")}') except KeyboardInterrupt: pass finally: consumer.close() # Читать с начала consume_messages('test') # Читать с определенного offset # consume_messages('test', offset=5)
Ранее мы читали topic в Kafka без использования групп и поэтому атрибут --from-beginning срабатывал каждый раз при вызове (каждый раз создавалась новая группа).
Но при создании consumer через Python указание group.id является обязательным и поэтому мы можем столкнуться со следующей проблемой: если мы один раз прочитали topic, то при перезапуске кода мы начнем читать только новые сообщения и даже атрибут auto.offset.reset не поможет.
А всё это происходит, потому что мы произвели commit (фиксацию) offset для группы.
Чтобы проверить на каком сейчас offset находится группа необходимо выполнить команду в Kafka:
sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --describe
И мы увидим, что мы прочитали все сообщения. Поэтому offset стоит на последнем сообщении в topic.
-
CURRENT-OFFSETговорит о том на какомoffsetнаходится группа. -
LOG-END-OFFSETтекущий последний доступныйoffsetдляtopic

Вообще, это не проблема, потому что данный offset можно «сбросить«, для этого необходимо выполнить команду:
sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --to-earliest --reset-offsets --execute --topic test
Также можно прочитать topic заново изменив group.id. Но это делать не рекомендуется.
Использование Kafka в дата-инженерии
В дата-инженерии Kafka частый гость, потому что Kafka позволяет быстро и за дёшево покрыть множество бизнес-задач, таких как:
CDC
При реализации CDC вы можете встретиться с Kafka, потому что она является «стандартом» при работе с такого вида событиями.
Если вы хотите понять что такое CDC и какую роль там занимает Kafka вы можете изучить мою статью: CDC на примитивах.
Event-driven
Так как Kafka позволяет нам получать изменения «моментально«. В этом определении есть определённые нюансы, но это тема для другого разговора.
Если вернуться к мысли выше, то получая все события «моментально» мы можем на них реагировать.
Для примера: покупатель заходит на сайт нашего интернет-магазина и при заходе в какую-то категорию или раздел мы можем сделать ему какое-то предложение или перестроить для него страницу, в зависимости от его предпочтений или условий заложенных ранее.
Real-time Analytics
Также довольно часто Kafka используется для аналитики в реальном времени. Если к нам сообщения о событиях приходят постоянно и «моментально«, то мы можем реагировать на них и следить за своими метриками.
Для примера: маркетинговые акции. Мы запускаем какую-то акцию и сразу смотрим на важные для нас показатели. В зависимости от получаемых значений мы можем изменять условия акции, условия размещения и прочее.
Резюме
Kafka популярный инструмент, поэтому найти литературу, видео и примеры использования – не проблема. В данной статье я показал только верхушку айсберга, который можно изучать и изучать.
Если говорить про взаимодействие c Kafka, то CLI и Python – это не единственные инструменты, к ним можно добавить: PySpark, ClickHouse, Java и прочее.
Кстати, про то как читать из Kafka при помощи ClickHouse было описано в моей статье: CDC на примитивах.
Для более глубокого изучения инструмента рекомендую ознакомиться с книгой: Apache Kafka. Потоковая обработка и анализ данных» (авторы — Нархид Н., Шапира Г., Палино Т., год издания — 2019). В ней описывается много тонкостей и подводных камней при работе с Kafka. Уже вышло второе издание, я его не читал, но судя по наполнению; учтены новые моменты, поэтому порекомендовал бы изучать второе издание.
Ну и самое главное – Теория без практики мертва, практика без теории слепа. Поэтому попробуйте Kafka, даже на pet-проектах или в рамках данной статьи.
Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.
ссылка на оригинал статьи https://habr.com/ru/articles/836302/
Добавить комментарий