Салют, Хабр!
Apache Kafka — это распределенная платформа потоковой обработки, предназначенная для построения систем обработки данных. Kafka позволяет публиковать, подписываться, хранить и обрабатывать потоки данных в реальном времени. Все это дает нам очень высокую пропускную способность и масштабируемость.
Основные фигуры в кафке это продюсеры и консюмеры. Продюсеры — это компоненты, которые производят и отправляют данные в Kafka. Они могут быть чем угодно: от простых скриптов до сложных систем. Консюмеры — это те, кто подписывается на данные и обрабатывает их. Они могут быть реализованы в различных формах, например, для анализа данных или мониторинга.
В статье мы и поговорим именно про продюсерах и консюмерах в экосистеме Kafka в коннекте с Python.
Запустим кафку на локалке
Скачиваем Java, так как Kafka написана на этом языке.
Переходим на официальный сайт Apache Kafka и скачивем последнюю версию Kafka. Выбираем бинарный файл, соответствующий вашей операционной системе.
Kafka использует Zookeeper для координации, поэтому сначала нужно запустить Zookeeper. Распакаваем скачанный файл и запускаем Zookeeper, используя скрипт zookeeper-server-start.sh
(или .bat
для Windows) из папки Kafka.
Далее запускаем Kafka Server используя kafka-server-start.sh
(или .bat
).
Библиотеки для работы с кафкой в питоне
1. confluent-kafka-python
разработана Confluent, компанией, стоящей за коммерческим развитием Kafka. Библиотека обеспечивает высокую производительность и поддерживает последние функции Kafka, включая управление потреблением сообщений. Она написана на C и Python, что звучит очень круто и хорошо показывает себя на практике
2. kafka-python
полностью написана на питоне, что делает ее легко устанавливаемой и используемой. Идеально подходит для разработчиков, которые предпочитают работать исключительно в питон-экосистеме, не опираясь на внешние зависимости C-библиотек. Следует учесть, что она может быть менее производительной по сравнению с confluent-kafka-python
.
В статье будем использовать confluent-kafka-python
. Потому что она предлагает лучшее сочетание производительности и поддержки новых возможностей Kafka. К тому же, благодаря своей близости к исходному проекту Kafka, confluent-kafka-python
часто обновляется и включает последние улучшения и исправления.
Так что:
pip install confluent-kafka
Producer
Создаем скрипт и начинаем с импорта нужной библиотеки:
from confluent_kafka import Producer
Определяем параметры конфигурации для нашего producer’а. Эти настройки включают в себя адреса серверов Kafka (в нашем случае, это локальный сервер), а также другие опции:
config = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup', 'auto.offset.reset': 'earliest' }
Теперь инициализируем объект Producer и пишем функцию для отправки сообщений:
producer = Producer(config) def send_message(topic, message): producer.produce(topic, value=message) producer.flush()
Вызываем функцию send_message
, указывая тему и сообщение:
send_message('test_topic', 'Hello Kafka World!')
'test_topic'
– это название темы в Kafka, куда мы отправляем сообщение 'Hello Kafka World!'
.
Можно определить функцию обратного вызова для отслеживания успешной доставки сообщений или ошибок:
def delivery_report(err, msg): if err is not None: print(f'Message delivery failed: {err}') else: print(f'Message delivered to {msg.topic()} [{msg.partition()}]') # используем эту функцию при отправке producer.produce('test_topic', 'Hello again!', callback=delivery_report) producer.flush()
Можно автоматически подтверждать смещения:
config['enable.auto.commit'] = True config['auto.commit.interval.ms'] = 1000
Для тонкой обработки, можно управлять смещениями вручную:
config['enable.auto.commit'] = False for msg in consumer: process_message(msg) consumer.commit()
Kafka позволяет прямо работать с партициями в теме:
from confluent_kafka import TopicPartition # подписка на конкретную партицию topic_partition = TopicPartition('test_topic', 0) consumer.assign([topic_partition]) # перемещение к определенному смещению в партиции consumer.seek(TopicPartition('test_topic', 0, 10))
К примеру у нас есть приложение, которое собирает данные с различных источников, сериализует их в JSON и отправляет в Kafka. Создадим функцию для генерации этих данных, функцию для их сериализации и функцию для отправки в Kafka:
import json import random import time from confluent_kafka import Producer # конфигурация Producer'а config = { 'bootstrap.servers': 'localhost:9092', 'client.id': 'python-producer' } producer = Producer(config) # функция для генерации случайных данных def generate_data(): return { 'sensor_id': random.randint(1, 100), 'temperature': random.uniform(20.0, 30.0), 'humidity': random.uniform(30.0, 50.0), 'timestamp': int(time.time()) } # функция для сериализации данных в JSON def serialize_data(data): return json.dumps(data) # функция для отправки сообщения def send_message(topic, data): producer.produce(topic, value=data) producer.flush() # основной цикл отправки сообщений try: while True: # генерируем случайные данные data = generate_data() # сериализуем данные serialized_data = serialize_data(data) # отправляем данные в Kafka send_message('sensor_data', serialized_data) # логирование отправленного сообщения print(f'Sent data: {serialized_data}') # пауза между отправками time.sleep(1) except KeyboardInterrupt: print('Stopped.') producer.close()
Симулируем сбор данных от сенсоров. Каждый сенсор имеет уникальный ID, а также измеряет температуру и влажность. Данные генерируются случайным образом, сериализуются в JSON и отправляются в Kafka topic 'sensor_data'
. Используем цикл для непрерывной отправки данных, имитируя реальный поток данных от сенсоров.
Consumer
Перед тем, как начать, нужно настроить параметры нашего consumer’а.
from confluent_kafka import Consumer, KafkaException config = { 'bootstrap.servers': 'localhost:9092', # Список серверов Kafka 'group.id': 'mygroup', # Идентификатор группы потребителей 'auto.offset.reset': 'earliest' # Начальная точка чтения ('earliest' или 'latest') }
Создадим объект Consumer и настраиваем его с использованием наших параметров:
consumer = Consumer(config)
Consumer должен подписаться на одну или несколько тем. Предположим, что мы хотим подписаться на тему с именем ‘test_topic’.
consumer.subscribe(['test_topic'])
Создадим бесконечный цикл, который будет получать сообщения из Kafka:
try: while True: msg = consumer.poll(timeout=1.0) # ожидание сообщения if msg is None: # если сообщений нет continue if msg.error(): # обработка ошибок raise KafkaException(msg.error()) else: # действия с полученным сообщением print(f"Received message: {msg.value().decode('utf-8')}") except KeyboardInterrupt: pass finally: consumer.close() # не забываем закрыть соединение
Внутри цикла чтения сообщений можно реализовать любую логику обработки этих сообщений. Это может быть запись в базу данных, обработка данных для аналитики, отправка уведомлений и многое другое.
Иногда нужно обрабатывать только определенные сообщения, например, фильтруя их по ключу или содержимому:
for msg in consumer: if msg.key() == b'specific_key': # обработка сообщений с определенным ключом pass elif b'important' in msg.value(): # обработка сообщений, содержащих 'important' pass
Напишем пример consumer’а, который подключается к Kafka cluster, подписывается на определённый topic и обрабатывает входящие сообщения:
from confluent_kafka import Consumer, KafkaException, KafkaError import sys import logging def create_consumer(config): consumer = Consumer(config) def basic_consume_loop(consumer, topics): try: # подписываемся на топик consumer.subscribe(topics) while True: msg = consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset())) elif msg.error(): raise KafkaException(msg.error()) else: print(f"Received message: {msg.value().decode('utf-8')}") except KeyboardInterrupt: pass finally: consumer.close() return basic_consume_loop def main(): kafka_config = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup', 'auto.offset.reset': 'earliest' } consumer_loop = create_consumer(kafka_config) consumer_loop(['test_topoc']) if __name__ == '__main__': logging.basicConfig(level=logging.INFO) main()
Сериализация данных на примере consumer
JSON, будучи текстовым форматом, легко читаем и универсален, но может быть не таким эффективным по сравнению с бинарными форматами, особенно при больших объемах данных:
import json from confluent_kafka import Consumer # настройка consumer'а # ... consumer.subscribe(['json_topic']) for msg in consumer: if msg.value(): data = json.loads(msg.value().decode('utf-8'))
Avro в свою очередь предлагает хорошую производительность благодаря бинарной сериализации и сжатию. Он идеален для больших данных. Для работы с Avro в Kafka, можно использовать confluent-kafka-python
совместно с confluent-kafka-python[avro]
:
from confluent_kafka import DeserializingConsumer from confluent_kafka.avro import AvroDeserializer from confluent_kafka.avro.serializer import SerializerError from confluent_kafka.schema_registry import SchemaRegistryClient schema_registry_conf = {'url': 'http://myschemaregistry.com'} schema_registry_client = SchemaRegistryClient(schema_registry_conf) avro_deserializer = AvroDeserializer(schema_registry_client) consumer_conf = { 'bootstrap.servers': 'localhost:9092', 'group.id': 'avro_group', 'auto.offset.reset': 'earliest', 'key.deserializer': avro_deserializer, 'value.deserializer': avro_deserializer } consumer = DeserializingConsumer(consumer_conf) consumer.subscribe(['avro_topic']) try: while True: try: msg = consumer.poll(1.0) if msg is None: continue data = msg.value() if data is not None: except SerializerError as e: print(f"Error deserializing Avro message: {e}") continue finally: consumer.close()
Однако, чтобы максимально эффективно использовать Kafka, необходимо хорошо понимать его более глубокие концепции и принципы работы, поэтому предлагаю вам обратить внимание на онлайн-курс Apache Kafka от моих друзей из OTUS. А познакомиться с форматом обучения вы сможете на бесплатном уроке. Регистрируйтесь, будет интересно!
ссылка на оригинал статьи https://habr.com/ru/articles/789896/
Добавить комментарий