Продюсеры и консьюмеры с Apache Kafka в Python

от автора

Салют, Хабр!

Apache Kafka — это распределенная платформа потоковой обработки, предназначенная для построения систем обработки данных. Kafka позволяет публиковать, подписываться, хранить и обрабатывать потоки данных в реальном времени. Все это дает нам очень высокую пропускную способность и масштабируемость.

Основные фигуры в кафке это продюсеры и консюмеры. Продюсеры — это компоненты, которые производят и отправляют данные в Kafka. Они могут быть чем угодно: от простых скриптов до сложных систем. Консюмеры — это те, кто подписывается на данные и обрабатывает их. Они могут быть реализованы в различных формах, например, для анализа данных или мониторинга.

В статье мы и поговорим именно про продюсерах и консюмерах в экосистеме Kafka в коннекте с Python.

Запустим кафку на локалке

Скачиваем Java, так как Kafka написана на этом языке.

Переходим на официальный сайт Apache Kafka и скачивем последнюю версию Kafka. Выбираем бинарный файл, соответствующий вашей операционной системе.

Kafka использует Zookeeper для координации, поэтому сначала нужно запустить Zookeeper. Распакаваем скачанный файл и запускаем Zookeeper, используя скрипт zookeeper-server-start.sh (или .bat для Windows) из папки Kafka.

Далее запускаем Kafka Server используя kafka-server-start.sh (или .bat).

Библиотеки для работы с кафкой в питоне

1. confluent-kafka-pythonразработана Confluent, компанией, стоящей за коммерческим развитием Kafka. Библиотека обеспечивает высокую производительность и поддерживает последние функции Kafka, включая управление потреблением сообщений. Она написана на C и Python, что звучит очень круто и хорошо показывает себя на практике

2. kafka-python полностью написана на питоне, что делает ее легко устанавливаемой и используемой. Идеально подходит для разработчиков, которые предпочитают работать исключительно в питон-экосистеме, не опираясь на внешние зависимости C-библиотек. Следует учесть, что она может быть менее производительной по сравнению с confluent-kafka-python.

В статье будем использовать confluent-kafka-python. Потому что она предлагает лучшее сочетание производительности и поддержки новых возможностей Kafka. К тому же, благодаря своей близости к исходному проекту Kafka, confluent-kafka-python часто обновляется и включает последние улучшения и исправления.

Так что:

pip install confluent-kafka

Producer

Создаем скрипт и начинаем с импорта нужной библиотеки:

from confluent_kafka import Producer

Определяем параметры конфигурации для нашего producer’а. Эти настройки включают в себя адреса серверов Kafka (в нашем случае, это локальный сервер), а также другие опции:

config = {     'bootstrap.servers': 'localhost:9092',     'group.id': 'mygroup',     'auto.offset.reset': 'earliest' }

Теперь инициализируем объект Producer и пишем функцию для отправки сообщений:

producer = Producer(config)  def send_message(topic, message):     producer.produce(topic, value=message)     producer.flush()

Вызываем функцию send_message, указывая тему и сообщение:

send_message('test_topic', 'Hello Kafka World!')

'test_topic' – это название темы в Kafka, куда мы отправляем сообщение 'Hello Kafka World!'.

Можно определить функцию обратного вызова для отслеживания успешной доставки сообщений или ошибок:

def delivery_report(err, msg):     if err is not None:         print(f'Message delivery failed: {err}')     else:         print(f'Message delivered to {msg.topic()} [{msg.partition()}]')  # используем эту функцию при отправке producer.produce('test_topic', 'Hello again!', callback=delivery_report) producer.flush()

Можно автоматически подтверждать смещения:

config['enable.auto.commit'] = True config['auto.commit.interval.ms'] = 1000

Для тонкой обработки, можно управлять смещениями вручную:

config['enable.auto.commit'] = False  for msg in consumer:     process_message(msg)     consumer.commit()

Kafka позволяет прямо работать с партициями в теме:

from confluent_kafka import TopicPartition  # подписка на конкретную партицию topic_partition = TopicPartition('test_topic', 0) consumer.assign([topic_partition])  # перемещение к определенному смещению в партиции consumer.seek(TopicPartition('test_topic', 0, 10))

К примеру у нас есть приложение, которое собирает данные с различных источников, сериализует их в JSON и отправляет в Kafka. Создадим функцию для генерации этих данных, функцию для их сериализации и функцию для отправки в Kafka:

import json import random import time from confluent_kafka import Producer  # конфигурация Producer'а config = {     'bootstrap.servers': 'localhost:9092',     'client.id': 'python-producer' } producer = Producer(config)  # функция для генерации случайных данных def generate_data():     return {         'sensor_id': random.randint(1, 100),         'temperature': random.uniform(20.0, 30.0),         'humidity': random.uniform(30.0, 50.0),         'timestamp': int(time.time())     }  # функция для сериализации данных в JSON def serialize_data(data):     return json.dumps(data)  # функция для отправки сообщения def send_message(topic, data):     producer.produce(topic, value=data)     producer.flush()  # основной цикл отправки сообщений try:     while True:         # генерируем случайные данные         data = generate_data()                  # сериализуем данные         serialized_data = serialize_data(data)          # отправляем данные в Kafka         send_message('sensor_data', serialized_data)          # логирование отправленного сообщения         print(f'Sent data: {serialized_data}')          # пауза между отправками         time.sleep(1) except KeyboardInterrupt:     print('Stopped.')  producer.close()

Симулируем сбор данных от сенсоров. Каждый сенсор имеет уникальный ID, а также измеряет температуру и влажность. Данные генерируются случайным образом, сериализуются в JSON и отправляются в Kafka topic 'sensor_data'. Используем цикл для непрерывной отправки данных, имитируя реальный поток данных от сенсоров.

Consumer

Перед тем, как начать, нужно настроить параметры нашего consumer’а.

from confluent_kafka import Consumer, KafkaException  config = {     'bootstrap.servers': 'localhost:9092',  # Список серверов Kafka     'group.id': 'mygroup',                  # Идентификатор группы потребителей     'auto.offset.reset': 'earliest'         # Начальная точка чтения ('earliest' или 'latest') }

Создадим объект Consumer и настраиваем его с использованием наших параметров:

consumer = Consumer(config)

Consumer должен подписаться на одну или несколько тем. Предположим, что мы хотим подписаться на тему с именем ‘test_topic’.

consumer.subscribe(['test_topic'])

Создадим бесконечный цикл, который будет получать сообщения из Kafka:

try:     while True:         msg = consumer.poll(timeout=1.0)  # ожидание сообщения         if msg is None:                   # если сообщений нет             continue         if msg.error():                   # обработка ошибок             raise KafkaException(msg.error())         else:             # действия с полученным сообщением             print(f"Received message: {msg.value().decode('utf-8')}") except KeyboardInterrupt:     pass finally:     consumer.close()  # не забываем закрыть соединение

Внутри цикла чтения сообщений можно реализовать любую логику обработки этих сообщений. Это может быть запись в базу данных, обработка данных для аналитики, отправка уведомлений и многое другое.

Иногда нужно обрабатывать только определенные сообщения, например, фильтруя их по ключу или содержимому:

for msg in consumer:     if msg.key() == b'specific_key':         # обработка сообщений с определенным ключом         pass     elif b'important' in msg.value():         # обработка сообщений, содержащих 'important'         pass

Напишем пример consumer’а, который подключается к Kafka cluster, подписывается на определённый topic и обрабатывает входящие сообщения:

from confluent_kafka import Consumer, KafkaException, KafkaError import sys import logging  def create_consumer(config):     consumer = Consumer(config)      def basic_consume_loop(consumer, topics):         try:             # подписываемся на топик             consumer.subscribe(topics)              while True:                 msg = consumer.poll(timeout=1.0)                 if msg is None: continue                 if msg.error():                     if msg.error().code() == KafkaError._PARTITION_EOF:                         sys.stderr.write('%% %s [%d] reached end at offset %d\n' %                                         (msg.topic(), msg.partition(), msg.offset()))                     elif msg.error():                         raise KafkaException(msg.error())                 else:                     print(f"Received message: {msg.value().decode('utf-8')}")         except KeyboardInterrupt:             pass         finally:             consumer.close()      return basic_consume_loop  def main():     kafka_config = {         'bootstrap.servers': 'localhost:9092',          'group.id': 'mygroup',         'auto.offset.reset': 'earliest'     }      consumer_loop = create_consumer(kafka_config)     consumer_loop(['test_topoc'])  if __name__ == '__main__':     logging.basicConfig(level=logging.INFO)     main()

Сериализация данных на примере consumer

JSON, будучи текстовым форматом, легко читаем и универсален, но может быть не таким эффективным по сравнению с бинарными форматами, особенно при больших объемах данных:

import json from confluent_kafka import Consumer  # настройка consumer'а # ...  consumer.subscribe(['json_topic'])  for msg in consumer:     if msg.value():         data = json.loads(msg.value().decode('utf-8'))

Avro в свою очередь предлагает хорошую производительность благодаря бинарной сериализации и сжатию. Он идеален для больших данных. Для работы с Avro в Kafka, можно использовать confluent-kafka-python совместно с confluent-kafka-python[avro]:

from confluent_kafka import DeserializingConsumer from confluent_kafka.avro import AvroDeserializer from confluent_kafka.avro.serializer import SerializerError from confluent_kafka.schema_registry import SchemaRegistryClient  schema_registry_conf = {'url': 'http://myschemaregistry.com'} schema_registry_client = SchemaRegistryClient(schema_registry_conf)  avro_deserializer = AvroDeserializer(schema_registry_client)  consumer_conf = {     'bootstrap.servers': 'localhost:9092',     'group.id': 'avro_group',     'auto.offset.reset': 'earliest',     'key.deserializer': avro_deserializer,     'value.deserializer': avro_deserializer }  consumer = DeserializingConsumer(consumer_conf) consumer.subscribe(['avro_topic'])  try:     while True:         try:             msg = consumer.poll(1.0)             if msg is None:                 continue             data = msg.value()             if data is not None:         except SerializerError as e:             print(f"Error deserializing Avro message: {e}")             continue finally:     consumer.close()

Однако, чтобы максимально эффективно использовать Kafka, необходимо хорошо понимать его более глубокие концепции и принципы работы, поэтому предлагаю вам обратить внимание на онлайн-курс Apache Kafka от моих друзей из OTUS. А познакомиться с форматом обучения вы сможете на бесплатном уроке. Регистрируйтесь, будет интересно!


ссылка на оригинал статьи https://habr.com/ru/articles/789896/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *