Инфраструктура для data engineer Kafka

от автора

В этой статье я хочу показать как можно использовать Kafka в дата-инженерии и как её «пощупать«.

Я не хотел бы повторяться по важным моментам, которые касаются архитектуры Kafka, поэтому рекомендую ознакомиться с данным видео.

В нём хорошо рассказано про основные концепции, которые будут дальше использоваться в статье, такие как:

  • Что такое producer.

  • Что такое consumer.

  • Что такое topic.

  • Что такое offset.

  • Что такое commit.

  • Что такое partition .

  • Что такое replication .

Весь код, который будет использоваться в статье будет доступен в моём репозитории.

Разворачивание сервиса

Начнём с того, что развернем Kafka локально в Docker. Для этого создадим docker-compose.yaml со следующим кодом:

version: '3.8'      services:     zookeeper:       image: 'confluentinc/cp-zookeeper:7.7.0'       hostname: zookeeper       container_name: zookeeper       environment:         ZOOKEEPER_CLIENT_PORT: 2181         ZOOKEEPER_TICK_TIME: 2000       ports:         - '2181:2181'        kafka:       image: 'confluentinc/cp-kafka:7.7.0'       hostname: kafka       container_name: kafka       depends_on:         - zookeeper       environment:         KAFKA_BROKER_ID: 1         KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181         KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092         KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:19092         KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT         KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT         KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1       ports:         - '9092:9092'         - '19092:19092'        kafka-ui:       image: 'provectuslabs/kafka-ui:v0.7.2'       container_name: kafka-ui       ports:         - '8080:8080'       environment:         KAFKA_CLUSTERS_0_NAME: local         KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092         KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181       depends_on:         - kafka      networks:     default:       name: kafka-network

Чтобы запустить все сервисы выполним команду:

docker-compose up -d

После этого у нас запустится Kafka, ZooKeeper и UI for Apache Kafka.

UI for Apache Kafka будет доступен по адресу http://localhost:8080/ через него можно будет: создавать topic, удалять topic, смотреть сообщения в topic и прочее. Очень удобный инструмент для работы с Kafka.

Создание и удаление topic

В данном разделе мы с вами попробуем создавать и удалять topic.

Создание и удаление topic через CLI

Чтобы создать topic нужно выполнить команды ниже.

Зайти в контейнер с Kafka:

docker exec -it kafka /bin/bash

Создание topic test в Kafka:

kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Просмотр всех доступных topic в Kafka:

kafka-topics --list --bootstrap-server kafka:9092

Удаление topic test в Kafka:

kafka-topics --delete --topic test --bootstrap-server kafka:9092

Создание и удаление topic через Python

Если вам удобнее взаимодействовать с Kafka через Python, то это не проблема.

Для работы с Kafka нам понадобится библиотека confluent-kafka. В примерах ниже я использую версию 2.5.0. Весь код и список всех зависимостей находится в моём репозитории.

Точно также эти операции можно произвести без подключения к контейнеру c Kafka, а через Python.

Чтобы создать topic через Kafka:

from confluent_kafka.admin import AdminClient, NewTopic      admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})         def example_create_topics(a: AdminClient = None, topics: list[str] = None) -> None:       """       Функция для создания `topic` в Kafka       :param a: AdminClient с параметрами инициализации. Default `None`.    :param topics: Список `topic` для создания. Default `None`.    :return: Ничего не возвращает       """       new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]           try:               f.result()  # The result itself is None               print("Topic {} created".format(topic))           except Exception as e:               print("Failed to create topic {}: {}".format(topic, e))         example_create_topics(       a=admin_client,       topics=['test'],   )

Важно: IDE может ругаться, что модуля NewTopic не существует, но он есть. Это официальный пакет. Это касается версии 2.5.0.

Чтобы удалить topic:

from confluent_kafka.admin import AdminClient      admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})         def example_delete_topics(a: AdminClient = None, topics: list[str] = None) -> None:       """       Функция для удаления `topic` в Kafka.       :param a: AdminClient с параметрами инициализации. Default `None`.    :param topics: Список `topic` для удаления. Default `None`.    :return: Ничего не возвращает.       """       fs = a.delete_topics(topics, operation_timeout=30)          # Wait for operation to finish.       for topic, f in fs.items():           try:               f.result()  # The result itself is None               print("Topic {} deleted".format(topic))           except Exception as e:               print("Failed to delete topic {}: {}".format(topic, e))         example_delete_topics(       a=admin_client,       topics=['test'],   )

Больше примеров использования библиотеки confluent_kafka в официальном GitHub проекта.

Kafka CLI

CLI является популярным вариантов для взаимодействия с Kafka. Изначально его нет на вашем устройстве, поэтому необходимо его скачать следующей командой:

wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz

Затем распаковать:

tar -xzf kafka_2.13-3.8.0.tgz

После выполнения данных команд мы можем использовать CLI для взаимодействия с Kafka.

Важно: Все исполняемые файлы находятся в папке bin. Поэтому стоит обратить внимание, что все скрипты будут выполнять из неё.

Чтобы перейти в папку bin нужно выполнить команду:

cd kafka_2.13-3.8.0/bin/

Запись в Kafka через CLI

Чтобы произвести запись в Kafka выполним команду:

echo 'Hello, Kafka!' | sh kafka-console-producer.sh --broker-list localhost:19092 --topic test

Или так:

echo 'Hello, Kafka!' | ./kafka-console-producer.sh --broker-list localhost:19092 --topic test

Важно: Мне привычнее вызывать скрипт командой sh, но можно и через ./.

Ещё можно создать producer в интерактивном режиме командой:

sh kafka-console-producer.sh --broker-list localhost:19092 --topic test

После создания такого producer у нас появляется возможность писать все сообщения, которые хотим.

После выполнения команды у нас появится [> и после чего мы сможем вводить сообщения для Kafka.

Для выхода из интерактивного режима несколько раз нажмите CTRL + C.

Чтение из Kafka через CLI

Важно: topic в Kafka можно читать «с конца» и «с начала«.

Чтобы начать читать с самого начала:

sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test --from-beginning

Чтобы начать читать с конца и получать только новые сообщения:

sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test

Kafka Python

Как было описано выше мы можем взаимодействовать с Kafka через Python. Поэтому сейчас рассмотрим также операции записи и чтения с использованием Python.

Запись в Kafka через Python

Я приведу пример той записи, которая может появиться в вашей Kafka – это информация о пользователе.
Запись будет содержать: uuid, first_name, last_name, middle_name.

Вы можете запустить код ниже и в topic my_topic начнут записываться значения.

import json   import time   from confluent_kafka import Producer   from faker import Faker   import uuid_utils as uuid         def generate_list_of_dict() -> dict[str, str]:          fake = Faker(locale='ru_RU')          return {           'uuid': str(uuid.uuid7()),           'first_name': fake.first_name(),           'last_name': fake.last_name(),           'middle_name': fake.middle_name(),       }         # Define the Kafka configuration   conf = {'bootstrap.servers': "localhost:19092"}      # Create a Producer instance with the above configuration   producer = Producer(conf)         while True:       # Define some data to send to Kafka       data = generate_list_of_dict()          # Convert the data to a JSON string       data_str = json.dumps(data)          # Produce a message to the "my_topic" topic       producer.produce(topic="my_topic", value=data_str)          # Flush the producer to ensure all messages are sent       producer.flush()          # Sleep for a second before producing the next set of messages       time.sleep(3)

Важно: Если topic ранее не был создан, то он создастся при первой записи.

Чтение из Kafka через Python

Для того чтобы прочитать значения из Kafka нам необходимо создать consumer. Функция ниже имеет возможность прочитать topic с самого начала и с определённого offset.

from confluent_kafka import Consumer, KafkaError, TopicPartition         def consume_messages(topic: str = None, offset: int = None) -> None:       conf = {           'bootstrap.servers': 'localhost:19092',           'group.id': 'mygroup',           'auto.offset.reset': 'earliest'       }          consumer = Consumer(conf)          if offset is not None:           partitions = consumer.list_topics(topic).topics[topic].partitions           for partition in partitions:               consumer.assign([TopicPartition(topic, partition, offset)])       else:           consumer.subscribe([topic])          try:           while True:               msg = consumer.poll(1.0)               if msg is None:                   continue               if msg.error():                   if msg.error().code() == KafkaError:                       print('Reached end of partition')                   else:                       print(f'Error: {msg.error()}')               else:                   print(f'Received message: {msg.value().decode("utf-8")}')       except KeyboardInterrupt:           pass       finally:           consumer.close()         # Читать с начала   consume_messages('test')      # Читать с определенного offset   # consume_messages('test', offset=5) 

Ранее мы читали topic в Kafka без использования групп и поэтому атрибут --from-beginning срабатывал каждый раз при вызове (каждый раз создавалась новая группа).

Но при создании consumer через Python указание group.id является обязательным и поэтому мы можем столкнуться со следующей проблемой: если мы один раз прочитали topic, то при перезапуске кода мы начнем читать только новые сообщения и даже атрибут auto.offset.reset не поможет.

А всё это происходит, потому что мы произвели commit (фиксацию) offset для группы.

Чтобы проверить на каком сейчас offset находится группа необходимо выполнить команду в Kafka:

sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --describe

И мы увидим, что мы прочитали все сообщения. Поэтому offset стоит на последнем сообщении в topic.

  • CURRENT-OFFSET говорит о том на каком offset находится группа.

  • LOG-END-OFFSET текущий последний доступный offset для topic

Вообще, это не проблема, потому что данный offset можно «сбросить«, для этого необходимо выполнить команду:

sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --to-earliest --reset-offsets --execute --topic test

Также можно прочитать topic заново изменив group.id. Но это делать не рекомендуется.

Использование Kafka в дата-инженерии

В дата-инженерии Kafka частый гость, потому что Kafka позволяет быстро и за дёшево покрыть множество бизнес-задач, таких как:

CDC

При реализации CDC вы можете встретиться с Kafka, потому что она является «стандартом» при работе с такого вида событиями.

Если вы хотите понять что такое CDC и какую роль там занимает Kafka вы можете изучить мою статью: CDC на примитивах.

Event-driven

Так как Kafka позволяет нам получать изменения «моментально«. В этом определении есть определённые нюансы, но это тема для другого разговора.

Если вернуться к мысли выше, то получая все события «моментально» мы можем на них реагировать.

Для примера: покупатель заходит на сайт нашего интернет-магазина и при заходе в какую-то категорию или раздел мы можем сделать ему какое-то предложение или перестроить для него страницу, в зависимости от его предпочтений или условий заложенных ранее.

Real-time Analytics

Также довольно часто Kafka используется для аналитики в реальном времени. Если к нам сообщения о событиях приходят постоянно и «моментально«, то мы можем реагировать на них и следить за своими метриками.

Для примера: маркетинговые акции. Мы запускаем какую-то акцию и сразу смотрим на важные для нас показатели. В зависимости от получаемых значений мы можем изменять условия акции, условия размещения и прочее.

Резюме

Kafka популярный инструмент, поэтому найти литературу, видео и примеры использования – не проблема. В данной статье я показал только верхушку айсберга, который можно изучать и изучать.

Если говорить про взаимодействие c Kafka, то CLI и Python – это не единственные инструменты, к ним можно добавить: PySpark, ClickHouse, Java и прочее.

Кстати, про то как читать из Kafka при помощи ClickHouse было описано в моей статье: CDC на примитивах.

Для более глубокого изучения инструмента рекомендую ознакомиться с книгой: Apache Kafka. Потоковая обработка и анализ данных» (авторы — Нархид Н., Шапира Г., Палино Т., год издания — 2019). В ней описывается много тонкостей и подводных камней при работе с Kafka. Уже вышло второе издание, я его не читал, но судя по наполнению; учтены новые моменты, поэтому порекомендовал бы изучать второе издание.

Ну и самое главное – Теория без практики мертва, практика без теории слепа. Поэтому попробуйте Kafka, даже на pet-проектах или в рамках данной статьи.


Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.


ссылка на оригинал статьи https://habr.com/ru/articles/836302/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *