В предыдущей статье мы с вами разобрали, как развернуть кластер Kafka из трех брокеров в режиме KRaft на Windows в WSL, и передали простое сообщение от консольного продюсера к консольному консьюмеру.
Продолжая следовать правилу «практика — лучший способ глубокого понимания теории», перейдем к следующей части.
В целом о работе Kafka написано достаточно много. Поэтому предлагаю остановиться на одном из ключевых аспектов с точки зрения интегрируемых систем — структуре и обработке сообщений Kafka:
-
более подробно разберем структуру сообщения в Kafka с точки зрения продюсера;
-
сформулируем ключевые требования к проектированию шины данных на основе Apache Kafka;
-
разработаем собственные продюсер и консьюмер на Python;
-
на практике увидим, что такое JSON‑сериализация и десериализация;
Формирование и отправка сообщений в Apache Kafka — это многоэтапный процесс, в котором участвуют продюсер (producer), брокеры Kafka и внутренние механизмы сериализации, буферизации и сетевой передачи.
Сообщение, формируемое продюсером, состоит из полезной нагрузки (payload) и сопутствующих метаданных, необходимых для его обработки и маршрутизации. По умолчанию максимальный размер одного сообщения в Kafka ограничен 1 МБ, однако этот лимит можно настроить. Кроме того, продюсер может отправлять сообщения по одному или группировать их в пакеты (batch) — это зависит от конфигурации производительности и задержек, выбранной в настройках продюсера.
Последовательность формирования запроса продюсером в адрес брокера выглядит следующим образом:
-
Создание сообщения (Record)
-
Сериализация ключа и значения (преобразование в массив байтов)
-
Определение партиции, в которую будет помещено сообщение.
-
Формирование Record Batch (группировка сообщений по партициям)
-
Формирование запроса ProduceRequest и отправка брокеру
Понимание этапов формирования запроса поможет не только разобраться в его внутренней структуре, но и глубже осмыслить назначение ключевых параметров продюсера. На первый взгляд, данные от одного датчика — это всего лишь несколько байт. Однако в распределённой системе вроде Kafka, где миллионы таких сообщений проходят через продюсеры, наполняются метаданными, накапливаются в буферах, группируются в батчи и сжимаются, — из этих «крошек» формируется значительный объём сетевого трафика.
Итак, давайте рассмотрим каждый этап по порядку.
Создание сообщения (Record)
На первом этапе, перед сериализацией, приложение создаёт Record — логическое сообщение, которое включает в себя следующие компоненты:
Key (ключ)
-
Kafka использует ключ для определения партиции, в которую будет отправлено сообщение. Это произвольный объект, который будет сериализован в байты. Например: String, Integer, Dictionary, None
-
Гарантируется, что все сообщения с одинаковым ключом попадут в одну и ту же партицию топика. Это обеспечивает порядок сообщений для данного ключа.
-
Если ключ не задан (null), сообщения распределяются по партициям равномерно — обычно с помощью циклического выбора (round‑robin) или случайного распределения.
-
Ключ удобно использовать для семантической группировки сообщений, например, по идентификатору пользователя, заказа или агрегата в CQRS/Event Sourcing.
Value (значение)
-
Основная полезная нагрузка (payload) сообщения — именно те данные, которые вы хотите передать.
-
Сериализация прикладных данных в байты выполняется продюсером, десериализация — консьюмером. Формат (JSON, Avro, Protobuf и др.) определяется на стороне продюсера.
-
Для проверки целостности и валидации брокер распаковывает сжатые батчи (например, LZ4, ZSTD), но при этом не интерпретирует содержимое Value — оно остаётся «чёрным ящиком».
Headers (заголовки)
-
Опциональные метаданные сообщения, добавленные в Kafka начиная с версии 0.11.
-
Представляют собой список пар «ключ‑значение», где значение может быть строкой, числом или null.
-
Не влияют на маршрутизацию или партиционирование.
-
Удобны для передачи служебной информации:
-
идентификаторы трассировки (например, для distributed tracing),
-
токены аутентификации,
-
версии схемы,
-
флаги обработки.
-
Topic (топик)
-
Не входит в состав сериализованного сообщения — не передаётся в RecordBatch.
-
Используется только на стороне продюсера для определения партиции и формирования ProduceRequest, где топик указывается на уровне запроса.
Сериализация сообщения
Поскольку Kafka передаёт данные по сети в бинарном виде, объекты ключа (key) и значения (value) должны быть преобразованы в массивы байтов (byte[]). Эту задачу выполняют сериализаторы, такие как StringSerializer, ByteArraySerializer, а также форматно‑ориентированные — Avro, Protobuf, JSON схемы и другие.
Timestamp и Headers — также сериализуются внутренним форматом Kafka и передаются, но не так, как key/value. Они — часть формата сообщения Kafka, а не полезной нагрузки.
Топик — единственное из перечисленных полей, которое не попадает в RecordBatch и не сериализуется в сообщение вообще.
Определение партиции.
На этом этапе продюсер выполняет следующие шаги.
-
Решает в какую партицию топика отправить сообщение:
-
Если ключ (key) указан, используется хеш от ключа для распределения сообщения в партицию, где расположены сообщения с таким же ключом.
-
Если ключ (key) = null, используется стратегия равномерного распределения (round‑robin) или кастомная.
-
-
Проверяет актуальность текущих метаданных— информация о топиках, партициях, брокерах, лидерах и репликах. Проверка актуальности происходит перед отправкой каждого RecordBatch.
Формирование RecordBatch
Упрощенная структура RecordBatch, формируемая продюсером, будет примерно следующей:
RecordBatch
Сообщения обычно не отправляются по одному — они буферизуются в памяти продюсера и группируются в Record Batches (пакеты записей). Это повышает производительность и снижает накладные расходы.
-
Каждый Record Batch относится к одному топику и одной партиции.
-
Батчи формируются в Record Accumulator — внутреннем буфере продюсера.
-
Размер батча ограничен параметром
batch.size(по умолчанию 16 КБ). -
Если батч не заполнен до конца, но прошло время linger.ms (по умолчанию 0 мс), он всё равно отправляется.
Только на этом этапе применяется сжатие — весь батч сжимается целиком, что эффективнее, чем сжимать каждое сообщение отдельно.
Шаги формирования RecordBatch.
-
Продюсер получает новое сообщение.
-
Определяет партицию (по ключу или round‑robin).
-
Смотрит, есть ли открытый батч для этой партиции:
-
Если есть и он ещё не полный (<
batch.size), то добавляет сообщение. -
Если батч полный или закрыт, то создаёт новый.
-
-
После добавления первого сообщения запускается таймер
linger.ms(по умолчанию — 0 мс):-
Если за это время пришли новые записи, они добавятся.
-
Если не пришли, батч отправляется «как есть».
-
-
Применяет выбранный алгоритм сжатия: producer, gzip, snappy, lz4, zstd или none.
-
Указывает тип сжатия в поле
RecordBatch.Attributes.
Параметры продюсера, отвечающие за формирование RecordBatch.
|
Параметр |
Назначение |
Пример значений |
|
|
Размер батча в байтах, при котором он отправляется |
16384 (16 КБ) или больше — чем больше батч, тем эффективнее сжатие |
|
|
Время ожидания перед отправкой батча (чтобы набрать больше сообщений) |
5–20 мс — помогает увеличить размер батча, по умолчанию 0 мс. |
|
|
Тип сжатия данных RecordBatch |
none | gzip | snappy | lz4 | zstd | producer |
Описание значений compression.type:
-
none — сжатие отключено (по умолчанию).
-
gzip — высокая степень сжатия, но высокая нагрузка на CPU.
-
snappy — умеренное сжатие, низкое потребление CPU (часто используется).
-
lz4 — быстрее snappy, хорошее сжатие (рекомендуется по умолчанию).
-
zstd — отличное сжатие, поддерживается с Kafka 2.1+, баланс между эффективностью и скоростью.
-
producer — продюсер выбирает тип, брокер сохраняет как есть (рекомендуется с Kafka 2.4+).
Формирование ProduceRequest и отправка брокеру
Когда батч считается готовым (заполнен batch.size или истек linger.ms), продюсер формирует ProduceRequest — сетевой запрос к брокеру Kafka.
-
ProduceRequest содержит один или несколько Record Batch, направленных разным партициям (но обычно группируются по брокерам).
-
Запрос отправляется через сеть с использованием протокола Kafka (на базе TCP).
-
Продюсер ожидает ответа от брокера в зависимости от настройки acks:
-
acks=0 — не ждать подтверждения;
-
acks=1 — подтверждение от лидера партициона;
-
acks=all — подтверждение от всех реплик (максимальная надёжность).
-
Логическая структура запроса ProduceRequest, который продюсер отправляет брокеру Kafka для записи данных:
Взаимодействие продюсеров и консьюмеров с брокером Kafka
Процесс обмена данными в Kafka строится на основе строго определенных запросов. После формирования ProduceRequest продюсер отправляет его брокеру. Брокер, в свою очередь, записывает содержащийся в запросе RecordBatch в лог‑файл соответствующего топика. Данные хранятся в партиции до истечения срока хранения, заданного политикой retention (по времени или по достижению максимального размера), и всё это время остаются доступными для чтения.
Для получения данных консьюмеры отправляют брокеру FetchRequest. В ответ брокер возвращает FetchResponse, который содержит один или несколько сжатых RecordBatch. Важно отметить, что интенсивность этих запросов и объем запрашиваемых данных определяются настройками консьюмера (такими как fetch.min.bytes и fetch.max.wait.ms) и его внутренней логикой работы.
Ключевой механизм оптимизации:
Фундаментальным механизмом, лежащим в основе эффективности Apache Kafka, является пакетная обработка (batching) и сжатие (compression) сообщений. Вместо обработки каждого сообщения (Record) по отдельности Kafka объединяет их в группы — RecordBatch. Такой подход обеспечивает ряд критически важных преимуществ:
-
Значительное повышение пропускной способности (throughput) системы.
-
Снижение нагрузки на сеть за счет передачи larger, сжатых блоков данных вместо множества мелких сообщений.
-
Эффективное использование дискового пространства и снижение нагрузки на I/O, так как на диск записываются уже сжатые батчи.
Разберем процесс по шагам:
-
Формирование батча на продюсере (Producer):
-
Продюсер не отправляет каждое сообщение мгновенно. Он накапливает их в памяти, формируя RecordBatch.
-
Перед отправкой весь батч целиком сжимается с использованием выбранного кодека (например, gzip, snappy, lz4 или zstd).
-
-
Хранение в сжатом виде на брокере (Broker):
-
Брокер получает уже сжатый RecordBatch и записывает его на диск в том же виде, без распаковки. Это экономит не только пропускную способность сети, но и дисковое пространство (I/O).
-
Данные хранятся в логах (log segments) именно как сжатые батчи. Метка compressionType в заголовке батча сообщает, какой алгоритм был использован.
-
-
Передача консюмеру (Consumer) и распаковка:
-
Когда консюмер отправляет запрос (FetchRequest) на чтение данных, брокер возвращает ему те же самые сжатые RecordBatch.
-
Распаковка происходит на стороне консюмера его клиентской библиотекой. Это распределяет нагрузку по распаковке между всеми консюмерами в системе, а не нагружает брокеры.
-
-
Обработка отдельных сообщений:
-
Только после распаковки батча консюмер получает доступ к индивидуальным сообщениям (Records) и их полям: ключу (Key), значению (Value), заголовкам (Headers) и временной метке (Timestamp).
-
PlantUML
@startuml skinparam sequenceMessageAlign center skinparam sequenceArrowThickness 2 skinparam sequenceLifeLineBorderColor #888 skinparam sequenceLifeLineBackgroundColor #EEE actor Producer participant "Broker\n(Partition Log)" as Broker actor Consumer Producer -> Broker : Отправка сообщений\n(RecordBatch c compressionType) note right of Producer Producer собирает несколько Record в RecordBatch и применяет сжатие (gzip/snappy/lz4/zstd) end note Broker -> Broker : Хранение RecordBatch\n(сжатый вид в log segment) note right of Broker В лог записан целый батч с указанием compressionType end note Consumer -> Broker : FetchRequest (запросить партицию) Broker --> Consumer : RecordBatch (сжатый) note right of Consumer Клиентская библиотека консьюмера распаковывает батч по compressionType end note Consumer -> Consumer : Обработка Records\n(Key, Value, Headers, Timestamp) @enduml
Проектирование топиков и потоков данных
Перед тем как приступить к созданию топиков, продюсеров и консьюмеров, предлагаю подумать над нашими к ним требованиями. Для этого ответим на ключевые вопросы, которые определят их конфигурацию и поведение. Это позволит избежать ошибок на архитектурном уровне и обеспечить отказоустойчивость, производительность и согласованность данных.
Для этого определите:
-
Набор топиков.
-
принцип единственной ответственности — каждый топик должен быть посвящен одной конкретной бизнес‑сущности или событию.
-
избегайте сквозных топиков — не создавайте один топик all‑events для всех типов событий, кроме как для тестирования.
-
семантика именования — имя топика должно быть понятно не только вам, но и другим разработчикам, аналитикам и архитекторам. Используйте соглашение, например,
<домен>.<сущность>.<версия>или<команда>.<событие>.
-
-
Продюсеры и консюмеры для каждого топика.
-
какой сервис или приложение создает эти события/данные?
-
какие сервисы подписываются на эти данные и что они с ними делают?
-
-
Необходимость использования ключей (key) для обеспечения порядковой гарантии.
-
помните: Kafka использует ключ для определения партиции. Сообщения с одинаковым ключом всегда попадают в одну партицию.
-
если порядок сообщений в рамках некоторой сущности (например, пользователя или заказа) критичен — используйте её ID в качестве ключа.
-
если порядок не важен — можно оставить ключ пустым (null) для равномерного распределения нагрузки.
-
-
Требования к задержке (latency) отправки сообщений
-
linger.ms— сколько миллисекунд продюсер будет ждать, чтобы собрать больше сообщений в батч. -
batch.size— максимальный размер батча в байтах. При достижении этого размера батч отправится немедленно.
-
-
Уровень надёжности передачи данных:
-
Для критически важных данных: Используйте replication.factor=3 и acks=all. Это гарантирует, что сообщение будет сохранено на нескольких брокерах прежде, чем продюсер получит подтверждение.
-
Для данных средней важности: Используйте replication.factor=3 и acks=1. Это защитит от сбоя брокера, но не гарантирует, что все реплики получили данные к моменту подтверждения.
-
Для неважных данных (логи): Можно использовать replication.factor=2 и acks=1 или даже acks=0 для максимальной скорости.
-
-
Формат (схему) данных для каждого топика (поле value).
-
Формат — будем ли мы использовать JSON, Avro, Protobuf или простой текст?
-
Тип данных — будут это сырые события, снимки состояния сущности или команды?
-
Структура — используйте единый шаблон для всех сообщений в топике.
-
-
Политику хранения данных для каждого топика.
Данные в Kafka не хранятся вечно. Вы должны явно задать правила их удаления, которые зависят от назначения данных:-
retention.ms — сколько времени сообщения должны храниться (например, 7 дней, 90 дней).
-
retention.bytes — максимальный объем данных может находиться в топике (например, 1 ТБ).
-
Подробней о топиках
После того как мы изучили структуру сообщений, запроектировали параметры нашей системы, перейдём к созданию топиков.
Ранее мы создали базовый топик с обязательными параметрами. Все остальные параметры имеют значения по умолчанию, которые можно настроить двумя способами:
-
Через динамические свойства топика (с помощью команды kafka‑configs)
-
Через конфигурационные файлы брокера (
server.properties), но только для глобальных настроек
Важно понимать:
-
Динамические настройки топика сохраняются между перезапусками
-
Глобальные настройки в
server.propertiesвлияют на все топики -
Некоторые параметры можно задать только при создании топика
Далее рассмотрим основные параметры топиков:
|
Параметр |
Назначение |
Пример значений |
|
|
Адрес брокера Kafka для подключения |
kafka1:9092,kafka2:9092 (для кластера) |
|
(Название топика) |
Уникальное имя топика для идентификации в кластере. Желательно указать назначение в комментариях. |
— |
|
(Количество партиций) |
Количество разделов для параллельной обработки. |
Зависит от нагрузки, соответствует расчетному количеству потребителей. По умолчанию 1. |
|
(Фактор репликации) |
Количество копий (реплик) каждой партиции для отказоустойчивости. |
По умолчанию = 1. Обычно 2 или 3. |
|
(Минимум in-sync реплик) |
Минимальное количество реплик, которые должны быть синхронизированы для успешной записи данныхдля acks=all. |
2 (при replication.factor=3). По умолчанию 1. |
|
(Политика очистки) |
Управление хранением данных: delete (удаление) или compact (компактизация). |
delete (логи), compact (ключевые данные). По умолчанию delete. |
|
(Тип сжатия) |
Алгоритм сжатия сообщений: none, gzip, snappy, lz4, zstd. Влияет на производительность. |
producer (использует настройки продюсера), zstd (лучшее сжатие). |
Полный список параметров:
Bash
/opt/kafka/bin/kafka-topics.sh --help
Далее нам необходимо проверить наличие топика с названием test-topic.
Bash
-
list — команда для вывода списка всех существующих топиков
-
bootstrap-server — адрес сервера Kafka для подключения
Если топика нет, создаем
Bash
/opt/kafka/bin/kafka-topics.sh \ --create \ --bootstrap-server localhost:9092 \ --topic test-topic \ --partitions 1 \ --replication-factor 3 \
-
create — указывает на операцию создания нового топика
-
bootstrap‑server — адрес и порт сервера Kafka для подключения (например, localhost:9092)
-
topic — имя создаваемого топика (в примере: test‑topic)
-
partitions — количество партиций в топике (1 в данном случае)
-
replication‑factor — фактор репликации (количество копий данных, 3 в примере)
Допустим мы решили, что все сообщения, которые поступили брокеру, должны обязательно соответствовать алгоритму сжатия продюсера. И решили назначить этот через динамические свойства.
Bash
/opt/kafka/bin/kafka-configs.sh \ --bootstrap-server localhost:9092 \ --alter \ --entity-type topics \ --entity-name test-topic \ --add-config compression.type=producer
-
bootstrap‑server — адрес и порт сервера Kafka для подключения (например, localhost:9092)
-
alter — флаг, указывающий, что вы хотите изменить конфигурацию сущности
-
entity‑type — определяет тип сущности, конфигурацию которой вы изменяете, в нашем случае topics
-
entity-name — указывает имя сущности, конфигурацию которой вы изменяете. В нашем случае test-topics.
-
add‑config compression.type=producer — добавляет или изменяет параметр конфигурации топика
Результат выполнения команды:
Completed updating config for topic test-topic.
Чтобы убедиться, что параметр установлен корректно, можно выполнить:
Bash
/opt/kafka/bin/kafka-configs.sh \ --bootstrap-server localhost:9092 \ --describe \ --entity-type topics \ --entity-name test-topic
-
bootstrap‑server — адрес и порт сервера Kafka для подключения (например, localhost:9092)
-
describe — выводит текущую конфигурацию указанной сущности (в данном случае — топика).
-
entity‑type — определяет тип сущности, конфигурацию которой вы изменяете. В нашем случае topics.
-
entity-name — указывает имя сущности, конфигурацию которой вы изменяете. В нашем случае test-topics.
Результат проверки параметра:
Dynamic configs for topic test-topic are: compression.type=producer sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:compression.type=producer, DEFAULT_CONFIG:compression.type=producer}
Для удаления записи конфигурации выполним команду:
Bash
/opt/kafka/bin/kafka-configs.sh \ --bootstrap-server localhost:9092 \ --alter \ --entity-type topics \ --entity-name test-topic \ --delete-config compression.type
-
bootstrap‑server — адрес и порт сервера Kafka для подключения (например, localhost:9092)
-
alter — флаг, указывающий, что вы хотите изменить конфигурацию сущности
-
entity‑type — определяет тип сущности, конфигурацию которой вы изменяете, в нашем случае topics
-
entity-name — указывает имя сущности, конфигурацию которой вы изменяете. В нашем случае test-topics.
-
delete-config — удаляет указанный динамический параметр.
Результат выполнения команды:
Completed updating config for topic test-topic.
Производитель (Producer)
Далее перейдём к созданию продюсера.
Хотя у Kafka есть встроенные инструменты, их функциональность довольно ограничена. В такой ситуации лучше всего использовать Java или Python. Мы выберем Python — он проще в освоении и быстрее в разработке. Сначала определимся с библиотекой.
Выбор библиотеки
Среди наиболее популярных решений для работы с Kafka в Python — confluent_kafka и kafka‑python. Мы остановимся на confluent_kafka, и вот почему:
Эта библиотека построена на основе librdkafka — высокопроизводительной реализации клиента Kafka, написанной на C++. Благодаря этому она обеспечивает высокую скорость, надёжность и масштабируемость. Кроме того, confluent_kafka поддерживает все современные функции Kafka, включая Exactly‑Once Semantics, расширенную маршрутизацию сообщений и интеграцию с Confluent Cloud.
Для сериализации данных в формат JSON мы будем использовать стандартный модуль Python — json.
from confluent_kafka import Producer import json import socket
Конфигурация продюсера
Следующий шаг — конфигурация продюсера. Конфигурация определяет поведение продюсера в Kafka: от способа отправки сообщений до обработки ошибок и взаимодействия с кластером. Правильно подобранные параметры обеспечивают надёжность, производительность и соответствие архитектурным требованиям вашей системы.
conf = { 'bootstrap.servers': '<ваш_IP>:9092', 'client.id': socket.gethostname(), 'acks': 'all', 'compression.type': 'none', 'retries': 5 } producer = Producer(conf)
<ваш_IP> — заменяем на IP адрес, указанный в advertised.listeners, конфигурации кластера server.properties.
Параметр 'client.id': socket.gethostname() в конфигурации продюсера Kafka выполняет важную роль идентификации и мониторинга:
-
client.id— это уникальный идентификатор, который продюсер (или консьюмер) передаёт брокеру Kafka при подключении. -
socket.gethostname()— это стандартная Python‑функция, которая возвращает имя хоста (компьютера), на котором запущен ваш скрипт.
Далее, producer = Producer(conf) — создаем экземпляр продюсера, передаем ему конфигурацию
Остальные параметры — это параметры продюсера Kafka. Надеюсь, они вам уже знакомы.
Основные параметры продюсера.
|
Параметр |
Назначение |
Пример значений |
|
|
Адреса брокеров Kafka для подключения |
broker1:9092, broker2:9094 |
|
(Сериализатор ключа) |
Сериализатор для ключа сообщения |
StringSerializer — преобразует строки (String) в байты (byte[]) |
|
(Сериализатор значения) |
Сериализатор для значения сообщения |
StringSerializer — преобразует строки (String) в байты (byte[]) |
|
(Подтверждение записи) |
Уровень подтверждения записи (0, 1, all). Определяет надежность доставки. |
all (для гарантированной доставки) |
|
(Протокол безопасности) |
Протокол безопасности (PLAINTEXT, SSL, SASL_SSL). |
SASL_SSL — для продакшена PLAINTEXT — без шифрования и аутентификации |
|
(Повторные попытки) |
Количество попыток повтора при ошибках. |
5-10 (избегайте бесконечных повторов) |
|
(Таймаут запроса) |
Таймаут ожидания ответа от брокера (мс). |
30000 (30 сек) |
|
(Тип сжатия) |
Алгоритм сжатия (none, gzip, snappy, lz4, zstd). |
snappy или lz4 (баланс скорости/сжатия) |
|
(Макс. размер запроса) |
Максимальный размер одного запроса (байты). |
1 МБ (1048576 байт) по умолчанию |
|
(Буфер памяти) |
Объем памяти для буферизации неотправленных сообщений (байты). |
32 МБ (33,554,432 байта) по умолчанию |
Сериализация данных
В качестве метода сериализации value выберем JSON.
Сначала преобразуем словарь в JSON-строку с помощью json.dumps(), затем кодируем строку в байты методом encode(‘utf-8’).
Кодировку указываем ‘utf-8’, т.к. она будет понятна среде Linux.
message_value = json.dumps(value).encode('utf-8')
Добавим параметры заголовка
Создадим массив из пар (ключ-значение):
headers = [ ("source", "python-producer"), ("version", "1.0"), ("content-type", "text/plain") ]
Обработка статуса доставки
Функция delivery_report принимает два параметра:
-
err — объект ошибки, который будет None, если сообщение успешно доставлено, или содержит информацию об ошибке, если доставка не удалась.
-
msg — объект сообщения, которое было отправлено. Если доставка прошла успешно, этот объект содержит метаданные о сообщении: тему, партицию и offset.
def delivery_report(err, msg): if err is not None: print(f'Ошибка доставки: {err}') else: print(f'Доставлено в {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')
Отправка сообщения
Несмотря на то, что формирование ProduceRequest — достаточно сложный процесс, отправка сообщения с помощью продюсера на Python ничуть не сложнее, чем через kafka-console-producer.sh.
producer.produce( topic=topic, # Укажем топик key=key, # Укажем ключ value=serialized_value, # Добавляем явно преобразованное значение headers=headers, # Добавляем заголовки callback=delivery_report )
Завершение работы
producer.flush()
Что делает flush()
-
Ждёт, пока все сообщения из внутреннего буфера продюсера будут обработаны.
-
Вызывает колбэки delivery_report для каждого сообщения.
-
Возвращает число неудалённых сообщений.
Synchronous and Asynchronous Sending
Далее мы создадим два варианта продюсера: один — для асинхронной отправки сообщений, другой — для синхронной. Выбор зависит от задач, которые вы решаете.
Какой выбрать, зависит от решаемой вами задачи.
Asynchronous:
Быстрая отправка, подходит в большинстве случаев. Например, для логгирования, метрик, high‑throughput систем.
Пример продюсера, работающего в асинхронном режиме:
Asynchronous
from confluent_kafka import Producer import json import socket # Конфигурация продюсера conf = { 'bootstrap.servers': '<ваш_IP>:9092', # Используем advertised.listeners из конфига 'client.id': socket.gethostname(), # Присваиваем продюсеру имя вашего хоста 'acks': 'all', # Ждем подтверждения от всех реплик 'compression.type': 'none', # Можно изменить на 'gzip', 'snappy' и т.д. 'retries': 5 # Число попыток при ошибка } # Создаем экземпляр продюсера, передаем ему конфигурацию producer = Producer(conf) # Callback-функция для обработки статуса доставки сообщений def delivery_report(err, msg): if err is not None: # Ошибка доставки (таймаут, недоступность брокера, etc.) print(f'Ошибка доставки: {err}') else: # Сообщение успешно доставлено и подтверждено брокером print(f'Доставлено в {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}') # Функция асинхронной отправки сообщения def produce_async(topic, headers, key, value): # JSON сериализация данных value serialized_value = json.dumps(value).encode('utf-8') # Отправка сообщения producer.produce( topic=topic, # Укажем топик key=key, # Укажем ключ value=serialized_value, # Добавляем явно преобразованное значение headers=headers, # Добавляем заголовки callback=delivery_report ) # Подготавливаем сообщение message_topic = 'test-topic' message_headers = [ ("source", "python-producer"), ("version", "1.0"), ("content-type", "text/plain") ] message_key = 'synch_123' message_value = {'data_1': 123, 'data_2': 'ОК'} # Отправляем сообщения # Сообщение 1 produce_async(message_topic, message_headers, message_key, message_value) # Сообщение 2 produce_async(message_topic, message_headers, message_key, message_value) # Завершаем работу: ждём, пока сообщения из буфера будут отправлены и обработаны колбэками. # Затем закрываем соединения и освобождаем ресурсы producer.flush()
Synchronous:
Подходит, например, для финансовых транзакций, где необходима строгая очередность операций.
Чтобы обеспечить синхронность доставки, добавим в функцию отправки сообщений вызов метода flush() с таймаутом 5 секунд:
def produce_async(topic, headers, key, value): … producer.flush(timeout=5.0)
Этот метод блокирует выполнение до тех пор, пока все сообщения из очереди не будут отправлены или пока не истечёт указанное время ожидания. Если по истечении 5 секунд очередь не успела полностью очиститься, метод вернёт ненулевое значение — количество неотправленных сообщений.
Такой подход позволяет контролировать состояние доставки: при обнаружении «застрявших» сообщений можно, например, инициировать повторную отправку, записать ошибку в лог или прервать операцию, чтобы избежать нарушения целостности данных.
Пример продюсера, работающего в синхронном режиме:
Synchronous
from confluent_kafka import Producer import json import socket # Конфигурация продюсера conf = { 'bootstrap.servers': '<ваш_IP>:9092', # Используем advertised.listeners из конфига 'client.id': socket.gethostname(), # Присваиваем продюсеру имя вашего хоста 'acks': 'all', # Ждем подтверждения от всех реплик 'compression.type': 'none', # Можно изменить на 'gzip', 'snappy' и т.д. 'retries': 5 # Число попыток при ошибка } # Создаем экземпляр продюсера, передаем ему конфигурацию producer = Producer(conf) # Callback-функция для обработки статуса доставки сообщений def delivery_report(err, msg): if err is not None: # Ошибка доставки (таймаут, недоступность брокера, etc.) print(f'Ошибка доставки: {err}') else: # Сообщение успешно доставлено и подтверждено брокером print(f'Доставлено в {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}') # Функция синхронной отправки сообщения def produce_sync(topic, headers, key, value): # Сериализация данных serialized_value = json.dumps(value).encode('utf-8') # Отправка сообщения producer.produce( topic=topic, # Укажем топик key=key, # Укажем ключ value=serialized_value, # Добавляем явно преобразованное значение headers=headers, # Добавляем заголовки callback=delivery_report ) # Блокируем очередь. Ожидание подтверждения доставки 5 секунд producer.flush(timeout=5.0) # Подготавливаем сообщение message_topic = 'test-topic' message_headers = [ ("source", "python-producer"), ("version", "1.0"), ("content-type", "text/plain") ] message_key = 'synch_123' message_value = {'data_1': 123, 'data_2': 'ОК'} # Отправляем сообщения # Сообщение 1 produce_sync(message_topic, message_headers, message_key, message_value) # Сообщение 2 produce_sync(message_topic, message_headers, message_key, message_value) # Завершаем работу: ждём, пока сообщения из буфера будут отправлены и обработаны колбэками. # Затем закрываем соединения и освобождаем ресурсы producer.flush()
Потребитель (Consumer)
Подобным образом создадим консьюмер. Как и в случае с продюсером, мы можем реализовать разные подходы, но обо всем по порядку.
Конфигурация консьюмера
conf = { 'bootstrap.servers': '<ваш_IP>:9092', # Адрес брокера 'group.id': 'test-group', # Идентификатор группы 'auto.offset.reset': 'earliest', # Чтение с начала топика 'enable.auto.commit': False # Ручное подтверждение сообщений } # Создание консьюмера consumer = Consumer(conf)
<ваш_IP> — также заменяем на IP адрес, указанный в advertised.listeners, конфигурации кластера server.properties.group.id определяет группу потребителей, к которой принадлежит данный консьюмер. Это основа для:
-
Распределения нагрузки между консьюмерами.
-
Kafka автоматически распределяет партиции топика между консьюмерами одной группы
-
Каждое сообщение обрабатывается только одним консьюмером в группе
-
-
Отслеживания позиции чтения (офсетов).
-
Kafka хранит офсеты для каждой группы отдельно
-
При перезапуске консьюмер продолжает с того места, где остановилась группа
-
-
Гарантии доставки «каждый сообщение один раз»
-
В рамках одной группы сообщение не повторяется
-
auto.offset.reset (Сброс смещения) — Действие при отсутствии сохраненного офсета:
-
earliest— чтение с самого первого доступного сообщения (аналог--from-beginningв CLI) -
latest— чтение только новых сообщений (после подключения) -
none— выброс исключения, если офсет не найден
enable.auto.commit — автоматическое подтверждение получения сообщения. В нашем случае отключено. Это особенно полезно, когда нужно:
-
Проверить качество и валидность данных
-
Обработать сообщение (например, сохранить в БД)
-
И только затем подтвердить успешную обработку
Такой подход гарантирует, что сообщение не будет потеряно при ошибках обработки.
consumer = Consumer(conf) — создадим экземпляр консьюмера и передадим ему конфигурацию
Далее можете ознакомится с основными параметрами консьюмера.
|
Параметр |
Назначение |
Пример значений |
|
(Адреса брокеров) |
Список брокеров для подключения (формат host:port). |
kafka1:9092, kafka2:9092 |
|
(ID группы) |
Идентификатор группы потребителей (для координации работы группы). |
«order-processors» |
|
(Сброс смещения) |
Действие при отсутствии оффсета: earliest (с начала), latest (только новые), none (ошибка). |
earliest (для восстановления данных) |
|
(Автоподтверждение) |
Автоматическое подтверждение оффсетов (true/false). Лучше отключать для точного контроля. |
false (ручное управление) |
|
(Макс. записей за опрос) |
Максимальное количество сообщений, возвращаемых за один вызов poll(). |
500 (баланс между скоростью и нагрузкой) |
|
(Таймаут сессии) |
Время, после которого потребитель считается «мертвым» и исключается из группы (мс). |
10000 (10 сек) |
|
(Интервал heartbeat) |
Частота отправки heartbeat-сигналов брокеру (мс). |
3000 (3 сек) |
|
(Макс. интервал опроса) |
Максимальное время между вызовами poll() до исключения из группы (мс). |
300000 (5 мин) |
Подписываемся на топик
Следующий шаг, подписываемся на топик. При использовании метода subscribe(), консьюмер настроится на работу в автоматическом режиме, в соответствии со своей конфигурацией. Также Kafka автоматически распределит партиции между потребителями одной группы.
consumer.subscribe(['test-topic']) # Подписка на топик
Если необходимо управлять подпиской вручную, выбираем метод assign(). Топик, партицию и начальный offset передаем методу напрямую. При этом настройки автоматической подписки и распределения партиций в конфигурации игнорируются. Для корректной работы с assign() также важно установить enable.auto.commit = False
# Указываем топик, партицию и начальный offset вручную topic = 'test-topic' partition = 0 # Номер партиции (можно получить через consumer.assignment()) start_offset = 42 # Чтение начиная с offset=42 # Создаем объект TopicPartition и назначаем offset tp = TopicPartition(topic, partition, start_offset) consumer.assign([tp])
Читаем сообщения.
После подписки на топик мы можем получать сообщения из партиций. Это можно делать как по одному сообщению с помощью метода poll(), так и пакетами — с помощью метода consume().
Poll( ) подход
msg = consumer.poll(timeout)
-
consumer.poll ()возвращает сообщение -
timeout— сколько времени ждать прихода сообщений, секунд. Если за указанное время сообщения не поступили, метод возвращает пустой список
Получение сообщений и обработка ошибок для метода Poll( ) будут выглядеть следующим образом:
try: while True: msg = consumer.poll(timeout=1.0) if msg is None: # Нет сообщений в течение таймаута — продолжаем опрос continue # Обработка ошибок Kafka if msg.error(): if msg.error().code() == KafkaException._PARTITION_EOF: # Достигнут конец партиции — продолжаем слушать continue else: # Получена критическая ошибка – прерываем цикл print(f"Ошибка: {msg.error()}") break
Сonsume( ) подход
messages = consumer.consume(num_messages, timeout
-
consumer.consume ()возвращает список из сообщений -
timeout— сколько времени ждать прихода сообщений, секунд. -
num_messages— максимальное количество сообщений, которое вернется за раз. Может вернуться меньше (если в буфере меньше сообщений или сработал таймаут).
Получение сообщений и обработка ошибок для метода Consume( ) будут выглядеть следующим образом:
try: while True: messages = consumer.consume(num_messages=10, timeout=1.0) if not messages: # Нет сообщений в течение таймаута — продолжаем опрос continue for msg in messages: # Обработка ошибок Kafka if msg.error(): if msg.error().code() == KafkaException._PARTITION_EOF: # Достигнут конец партиции — продолжаем слушать continue else: # Получена критическая ошибка – прерываем цикл print(f"Ошибка: {msg.error()}") break
Обработка и десериализация
Несмотря на то, что в Java‑API Kafka (org.apache.kafka.clients.consumer.KafkaConsumer) действительно есть конфиги key.deserializer и value.deserializer, которые автоматически превращают байты в объект, в Python‑обёртке confluent_kafka этого механизма нет
В этой части работа библиотеки Python c C‑библиотекой librdkafka ограничена
Поэтому в Python вы всегда получаете msg.value() как байты, и десериализацию нужно делать вручную (.decode() + json.loads() или другая логика).
# Получаем значение сообщения key = msg.key().decode('utf-8') if msg.key() else None raw_value = msg.value().decode('utf-8') if msg.value() else None # Десериализуем JSON, если значение есть value = json.loads(raw_value) if raw_value else None
# Обработка заголовков headers = {} if msg.headers(): for header in msg.headers(): headers[header[0]] = header[1].decode('utf-8')
# Вывод информации print("\n--- Получено сообщение ---") print(f"Топик: {msg.topic()}") print(f"Партиция: {msg.partition()}") print(f"Смещение: {msg.offset()}") print(f"Ключ: {key}") print(f"Значение: {value}") print(f"Заголовки: {json.dumps(headers, indent=2)}")
Фиксация смещений (Committing Offsets)
После того как мы обработали наши сообщения – можем подтвердить получение сообщения брокеру.
consumer.commit(asynchronous=False)
Метод commit () принимает экземпляр объекта msg и свойствоasynchronous.
-
msg не обязательный параметр. Используйте его, только если вам нужно точно указать, какое сообщение коммитить. Без указания коммитятся все обработанные смещения (offsets) для всех партиций, которые в данный момент назначены потребителю
-
asynchronous = false — коммит происходит синхронно, потребитель ждёт подтверждения от брокера Kafka, что offset успешно сохранён. Замедляется обработка
-
asynchronous = true — коммит идёт в фоне. Потребитель не ждёт ответа — быстрее, но есть риск, что коммит не успеет выполниться до сбоя.
Завершаем работу
try: # Работа с потребителем consumer.subscribe(['topic']) # ... обработка сообщений ... finally: consumer.close()
Метод consumer.close() — это обязательный финальный шаг работы с Kafka‑потребителем, который правильно завершает все процессы и освобождает ресурсы.
Что происходит при вызове:
-
Завершение сессии — потребитель сообщает группе о выходе
-
Сохранение позиций — фиксируются текущие смещения чтения
-
Закрытие соединений — разрываются все сетевые подключения к Kafka
-
Остановка процессов — завершаются фоновые потоки
-
Очистка памяти — освобождаются буферы и ресурсы
Важно не забывать вызывать consumer.close(), иначе:
-
Группа потребителей будет ждать возвращения «пропавшего» участника
-
Другие консьюмеры не смогут быстро взять его партиции
-
Могут возникнуть утечки памяти
-
Позиции чтения (offsets) могут не сохраниться
Перебалансировка потребителей
Последний шаг — повышаем надёжность нашего потребителя.
Добавим функции‑обработчики перебалансировки on_assign и on_revoke, которые позволяют реализовать пользовательскую логику при перераспределении партиций в группе потребителей (rebalance). Они вызываются автоматически, когда Kafka переназначает партиции между экземплярами потребителей.
Конфигурация
# Конфигурация консьюмера conf = { … # Стратегия распределения партиций 'partition.assignment.strategy': 'roundrobin' }
partition.assignment.strategy:‘roundrobin’ указывает, что партиции между потребителями будут распределяться по алгоритму «round‑robin» — равномерно, по кругу. Альтернатива — range, но roundrobin даёт более сбалансированную нагрузку.
Далее описываем логику функций on_assign и on_revoke.
Пока в них заложена базовая логика — логирование событий, чтобы было видно, какие партиции назначаются или отзываются.
В дальнейшем вы можете адаптировать их под свои задачи — например, управлять offset-ами, сохранять состояние или интегрироваться с внешними системами.
def on_assign(partitions): """Callback при назначении новых партиций потребителю.""" print("\n--- Назначены новые партиции ---") for p in partitions: print(f"Топик: {p.topic}, Партиция: {p.partition}") # При необходимости можно вручную установить offset # Например, чтобы продолжить с последнего закоммиченного значения: # consumer.assign(partitions) def on_revoke(consumer, partitions): """Callback при отзыве партиций у потребителя.""" print("\n--- Отозваны партиции ---") for p in partitions: print(f"Топик: {p.topic}, Партиция: {p.partition}") # Критически важно зафиксировать offset'ы перед потерей контроля над партициями consumer.commit(offsets=partitions) print("Offsets зафиксированы.")
on_assign ( ) — функция вызывается, когда потребителю назначаются новые партиции, например:
-
при первом запуске,
-
после добавления нового потребителя в группу,
-
при восстановлении после сбоя.
on_revoke ( ) — функция вызывается, когда у потребителя отбираются партиции, например:
-
перед остановкой,
-
при масштабировании группы (запуске новых экземпляров),
-
при сбое соединения.
Консьюмер в действии
Подводя итог, тестируем разработанный нами консьюмер.
Помним, <ваш_IP> — заменяем на IP адрес, указанный в advertised.listeners, конфигурации кластера server.properties.
Пример консьюмера
from confluent_kafka import Consumer, KafkaException import json import sys import time # Получаем идентификатор консьюмера из аргументов consumer_id = sys.argv[1] if len(sys.argv) > 1 else "default_consumer" # Конфигурация консьюмера conf = { 'bootstrap.servers': '172.26.139.176:9092', 'group.id': 'test-group', # Одинаковая группа для всех! 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'partition.assignment.strategy': 'roundrobin' } def on_assign(consumer, partitions): """Callback при назначении новых партиций потребителю.""" print(f"\n[{consumer_id}] --- Назначены новые партиции ---") for p in partitions: print(f"[{consumer_id}] Топик: {p.topic}, Партиция: {p.partition}") # Можно вручную установить offset # consumer.seek(partitions[0]) def on_revoke(consumer, partitions): """Callback при отзыве партиций у потребителя.""" print(f"\n[{consumer_id}] --- Отозваны партиции ---") for p in partitions: print(f"[{consumer_id}] Топик: {p.topic}, Партиция: {p.partition}") # Фиксация offset перед потерей партиций if partitions: consumer.commit(offsets=partitions) print(f"[{consumer_id}] Offsets зафиксированы.") # Создание консьюмера consumer = Consumer(conf) # Подписка с callback-функциями consumer.subscribe(['test-topic'], on_assign=on_assign, on_revoke=on_revoke) print(f"[{consumer_id}] Консьюмер запущен и ожидает сообщения...") try: while True: msg = consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaException._PARTITION_EOF: continue else: print(f"[{consumer_id}] Ошибка: {msg.error()}") break # Обработка сообщения key = msg.key().decode('utf-8') if msg.key() else None raw_value = msg.value().decode('utf-8') if msg.value() else None value = json.loads(raw_value) if raw_value else None headers = {} if msg.headers(): for header in msg.headers(): headers[header[0]] = header[1].decode('utf-8') # Вывод информации с идентификатором консьюмера print(f"\n[{consumer_id}] --- Получено сообщение ---") print(f"[{consumer_id}] Топик: {msg.topic()}") print(f"[{consumer_id}] Партиция: {msg.partition()}") print(f"[{consumer_id}] Смещение: {msg.offset()}") print(f"[{consumer_id}] Ключ: {key}") print(f"[{consumer_id}] Значение: {value}") # Подтверждение обработки consumer.commit(asynchronous=False) time.sleep(0.5) # Задержка для наглядности except KeyboardInterrupt: print(f"[{consumer_id}] Прерывание пользователем") finally: consumer.close() print(f"[{consumer_id}] Консьюмер остановлен")
Проверяем работу брокера. Отправляем из продюсера тестовое сообщение “Hello World”— главное заклинание разработчиков.
Проверяем работу ребалансировщика.
-
Запускаем консьюмер №1
-
Запускаем консьюмер №2. Произошла ребаллансировка:
Заключение
В Kafka каждую тему можно раскрывать, пожалуй, до бесконечности. Мы же с вами разобрали наиболее важные моменты, чтобы начать пользоваться этой системой.
Главное, чему мы научились:
-
Разобрались с основами — от проектирования топиков до настройки продюсеров и консьюмеров
-
Познакомились с тонкостями работы с сообщениями: сериализация, партиционирование, батчинг и сжатие
-
Научились настраивать надёжные и производительные системы обмена данными
-
Освоили библиотеку confluent_kafka для работы с Python
Самое главное, что вы теперь умеете:
-
Проектировать топки с учётом бизнес‑требований
-
Оптимизировать производительность через батчинг и сжатие
-
Управлять группами потребителей и их перебалансировкой
-
Настраивать надёжную доставку сообщений
Помните, что Kafka — это мощный инструмент, который открывает перед вами огромные возможности. Продолжайте практиковаться, и вы сможете создавать по‑настоящему крутые решения!
ссылка на оригинал статьи https://habr.com/ru/articles/944432/
Добавить комментарий