
При работе с Apache Kafka рано или поздно возникает необходимость быстро проверить топик, прочитать сообщения или посмотреть состояние группы потребителей (consumer group). Можно применять стандартные инструменты Kafka. Однако на практике они часто оказываются не самыми удобными для повседневной работы. Многие команды получаются длинными, каждый раз требуют передачи параметров подключения и нуждаются в различных CLI-утилитах.
Есть и более легкие решения — например, kcat, который хорошо знаком многим инженерам и часто используется для диагностики Kafka. Но существует и вариант поудобнее — kafkactl.
Привет, Хабр! Меня зовут Сергей Кардапольцев, я технический писатель в Selectel. В этой статье мы познакомимся с kafkactl — CLI-инструментом для работы с Kafka. Посмотрим, чем он отличается от стандартных Kafka-утилит и kcat, а также разберем базовые сценарии работы на примере кластера.
Содержание
→ Чем kafkactl отличается от других Kafka CLI
→ Подготовка кластера Kafka
→ Устанавливаем и настраиваем kafkactl
→ Настраиваем конфигурацию
→ Проверяем подключение
→ Отправляем сообщения
→ Читаем сообщения
→ Работаем с consumer groups
→ kafkactl в shell-сценариях
→ Какой инструмент выбрать: kafkactl или kcat
→ Заключение
Чем kafkactl отличается от других Kafka CLI
Когда речь заходит о взаимодействии с Kafka из командной строки, на ум приходит несколько вариантов. Самый очевидный — стандартные инструменты, которые идут в комплекте. Кроме них, существуют и сторонние CLI-утилиты — например, kcat и kafkactl.
Все они во многом пересекаются по возможностям. Разница — главным образом в подходе к организации команд, управлении конфигурацией и удобстве выполнения отдельных сценариев.
Прежде чем переходить к kafkactl, полезно разобраться, какие задачи решают уже существующие инструменты и в каких сценариях они предпочтительнее.
Кстати, если вам больше близки по духу UI-инструменты, а не CLI, прочитайте нашу статью о том, как подключить Kafka UI к кластеру Kafka в DBaaS.
Стандартные Kafka-утилиты
Вместе с Apache Kafka поставляется набор CLI-инструментов, каждый из которых отвечает за свой набор задач:
-
kafka-topics.sh— для управления топиками: создания, удаления и просмотра; -
kafka-console-consumer.sh— для чтения сообщений; -
kafka-console-producer.sh— для отправки сообщений; -
kafka-consumer-groups.sh— для работы с группами потребителей (consumer groups); -
kafka-configs.sh— для просмотра и изменения конфигурации различных сущностей Kafka.
Например, чтобы получить список топиков:
kafka-topics.sh \ --bootstrap-server broker-1:9092 \ --list
А чтобы прочитать сообщения:
kafka-console-consumer.sh \ --bootstrap-server broker-1:9092 \ --topic orders \ --from-beginning
Стандартные утилиты позволяют выполнять все основные операции с Kafka. Однако повседневная работа с ними может показаться не столь удобной.
Во-первых, постоянно требуются разные утилиты, для каждой из них нужно держать в голове особенности составления команд.
Во-вторых, параметры подключения часто приходится передавать повторно. При работе сразу с несколькими кластерами, количество одинаковых аргументов быстро растет.
По этим причинам команды порой становятся громоздкими и отнимают существенно больше времени.
Кроме того, стандартные инструменты Kafka CLI требуют установленной JVM (Java Virtual Machine). Хотя они и выглядят как shell-скрипты, на самом деле — это лишь обертки для Java-приложений, поэтому для их работы необходима тяжеловесная среда выполнения. На серверах с развернутой Kafka это обычно не вызывает проблем, однако наличие виртуальной машины Java может стать дополнительным требованием — например, в Docker-контейнерах или CI/CD-окружениях.
kcat
Другой популярный инструмент — kcat (ранее kafkacat). Однако его популярность связана не столько с уникальными возможностями, сколько с удобством использования по сравнению со стандартным набором Kafka CLI.
kcat предлагает более компактный интерфейс для многих операций и богатые средства форматирования вывода. Он хорошо интегрируется с привычными инструментами командной строки и позволяет быстро извлекать и формировать данные для дальнейшей обработки. Благодаря легкости интеграции его часто используют в shell-сценариях, а также для быстрой проверки сообщений и диагностики проблем.
Пример команды для чтения сообщений из топика orders:
kcat -b broker-1:9092 -t orders -C
Как видно, эта команда заметно лаконичнее по сравнению с приведенным ранее вызовом kafka-console-consumer.sh.
Именно по причинам компактности своих команд kcat часто можно встретить в примерах, документации и различных диагностических сценариях при работе с Kafka.
kafkactl
kafkactl не предлагает принципиально новых возможностей по сравнению со стандартными Kafka CLI или kcat. Его основная идея заключается в другом — предоставить единообразный интерфейс для типовых операций и упростить управление подключениями к кластерам Kafka.
Для начала рассмотрим пару примеров.
Получить список топиков:
kafkactl get topics
Получить детальную информацию о топике orders:
kafkactl describe topic orders
В обоих случаях видим одинаковый принцип построения команды: сначала действие (get, describe), затем объект, над которым это действие выполняется.
Такой подход напоминает kubectl — консольный инструмент Kubernetes, где большинство операций также строится вокруг команд вроде get, describe, delete и т. д.
Благодаря такой структуре после знакомства с несколькими командами становится проще понимать и запоминать остальные. Полный список команд kafkactl можно посмотреть в kafkactl command documentation.
Еще одна особенность kafkactl — работа с конфигурацией подключений к кластеру.
В стандартных Kafka CLI и kcat параметры подключения чаще всего указываются непосредственно в командной строке, что для разовых операций вполне удобно. Однако если нужно работать с несколькими кластерами, параметры подключения придется повторять или хранить в собственных скриптах и переменных окружения.
В kafkactl параметры подключения можно сохранить в конфигурационном файле и затем использовать повторно. Команды становятся короче, а переключение между кластерами — удобнее.
Далее рассмотрим работу с kafkactl подробнее на примере кластера.

Хотите выиграть призы и бонусы на аренду серверов?
Приглашаем решить ИТ-кроссворд! Более 100 вопросов на разные темы из мира ИИ и машинного обучения — ежедневно с 6 по 9 июля.
Подготовка кластера Kafka
Создаем кластер Kafka
Создадим кластер Kafka.
О том, как реализована платформа Kafka в DBaaS от Selectel, можно детально ознакомиться в отдельной статье или подглядеть в нашей документации.
1. Заходим в панель управления → раздел Продукты → Облачные базы данных.
2. Нажимаем Создать кластер.
3. Выбираем СУБД Kafka и подходящие локацию и конфигурацию.
4. Подключаться к кластеру будем с локального компьютера, поэтому в блоке Сеть выбираем публичную подсеть для прямого доступа из интернета.
5. После этого нажимаем Создать кластер и ждем, когда он перейдет в статус ACTIVE.
Подробнее о процессе создания кластера — в нашей инструкции.
Создаем топик
Для организации данных Kafka использует модель топиков и разделов (партиций).
Топик — это принцип группировки потока сообщений по категориям. Каждый топик может иметь несколько разделов — физических единиц хранения данных. Разделы служат для параллельной обработки данных и масштабирования системы. Каждое сообщение записывается в определенный раздел топика. Все сообщения внутри одного раздела сохраняют свой порядок.
Для примера работы с kafkactl создадим топик orders. Топики можно создавать только через панель управления или API.
1. Откроем страницу кластера → вкладка Топики → нажимаем Создать топик.
2. Вводим имя топика (orders) и указываем количество разделов (максимум — 4 000). Выбор количества разделов зависит от потребностей. После создания топика количество разделов можно только увеличивать. Однако при необходимости всегда можно легко удалить топик и создать новый с меньшим количеством разделов. Сделать это можно также в панели управления.
3. Нажимаем Сохранить и ждем, когда топик перейдет в статус ACTIVE.
Создаем пользователя
Чтобы работать с топиками, нужно сначала создать пользователя, назначить ему роли и выдать доступы. Сделать это можно в панели управления или с помощью API.
1. На странице кластера переходим на вкладку Пользователи и нажимаем Создать пользователя.
2. Вводим имя и пароль — они понадобятся нам в дальнейшем для настройки конфигурации подключения к кластеру. Важно: в панели управления пароль не хранится, его можно задать только заново; подробности — в документации.
3. Нажимаем Создать.
4. Чтобы назначить роли пользователю и настроить доступ к топику, в меню созданного пользователя выбираем Настроить доступы.
5. Задаем тип доступа ко всем топикам (можно оставить значения по умолчанию).
6. Выбираем роль пользователя:
-
продюсер (producer) — для отправки сообщений в топик;
-
консьюмер (consumer) — только для чтения сообщений из топика.
Так как нам нужно и читать сообщения из топика, а также писать в него, выбираем обе роли.
Устанавливаем и настраиваем kafkactl
Устанавливается kafkactl по инструкции из официальной документации. Мы посмотрим на работу с kafkactl на примере Ubuntu.
В этом месте предлагаю немного прерваться на теорию, а затем снова вернемся к практике.
Конфигурационный файл config.yml
Конфигурация подключения к кластеру настраивается в файле config.yml. Главное преимущество такого подхода — действие выполняется один раз, после чего команды можно выполнять без постоянной передачи параметров подключения.
Файл создается автоматически в процессе установки kafkactl и по умолчанию находится в одном из следующих мест:
-
$HOME/.config/kafkactl/config.yml; -
$HOME/.kafkactl/config.yml; -
$APPDATA/kafkactl/config.yml; -
/etc/kafkactl/config.yml.
В нашем случае, на Ubuntu, файл расположен по пути:
~/.config/kafkactl/config.yml
С помощью команды kafkactl config view можно посмотреть содержимое файла, например:
contexts: default: brokers: - localhost:9092
Еще одна из особенностей kafkactl в том, что в одном файле можно указать параметры подключения сразу к нескольким кластерам. Задаются они в конфигурационном файле в виде так называемых контекстов (contexts), что позволяет проще переключаться между кластерами.
Взгляните на пример конфигурационного файла с параметрами подключений для двух кластеров — cluster-dev и cluster-prod:
contexts: # Параметры подключения к первому кластеру cluster-dev: brokers: - <host>:<port> # SASL/SCRAM аутентификация sasl: enabled: true mechanism: scram-sha512 username: <user> password: <password> # TLS(SSL)-шифрование (если нужно) tls: enabled: true ca: <path-to-certificate> # Параметры подключения ко второму кластеру cluster-prod: brokers: - <host>:<port> sasl: enabled: true mechanism: scram-sha512 username: <user> password: <password> tls: enabled: true ca: <path-to-certificate>
Здесь:
-
<host>— DNS-адрес ноды Kafka; -
<port>— порт для подключения:9092— без SSL-сертификата,9093— с SSL-сертификатом; -
<user>— имя пользователя Kafka; -
<password>— пароль пользователя Kafka; -
<path-to-certificate>— путь к SSL-сертификату.
Текущий контекст можно задать с помощью аргумента командной строки --context, переменной окружения CURRENT_CONTEXT или определить в файле.
Текущий контекст задается в специальном файле current-context.yml, который автоматически создается в каталоге с конфигурационным файлом config.yml. При первом вызове контекст может быть не определен — тогда берется первый указанный в файле конфигурации.
Вот как будет выглядеть команда для получения информации о топиках в первом кластере (контекст cluster-dev):
kafkactl get topics --context cluster-dev
Параметры подключения для этого кластера автоматически подтянутся из конфигурационного файла. При этом содержимое файла current-context.yml сохранит указанный контекст:
current-context: cluster-dev
Если работа ведется с тем контекстом, то аргумент --context в последующих командах можно опускать — информация о текущем контексте возьмется из файла. В следующем примере отобразится информация о топиках в cluster-dev:
kafkactl get topics
Полный пример конфигурационного файла config.yml можно посмотреть в документации kafkactl.
Конфигурационные файлы для Git-репозиториев
Представьте работу над несколькими проектами. Если используется глобальный конфиг (config.yml), каждый раз придется:
-
вручную менять конфиг или переключать контекст для разных проектов;
-
помнить, какой кластер к какому проекту относится;
-
рисковать ошибкой — например, можно случайно отправить данные в продакшн вместо стейджинга.
Для таких случаев kafkactl позволяет для каких‑то проектов создавать собственные конфигурационные файлы. Размещаются они в произвольных каталогах — например, связанных с работой над проектами. Глобальный файл конфигурации, используемый по умолчанию, находится в корневом каталоге — рядом с репозиторием .git.
При запуске kafkactl начинается прямой поиск конфигурации — начиная от текущей директории и вверх до границы репозитория, то есть пока не найдется .git. Таким образом, можно запускать kafkactl находясь в любой подпапке проекта и не беспокоиться о существовании отдельной конфигурации — все равно найдется хотя бы одна в корне.
Удобно в CI/CD и различных автоматизациях: файл конфигурации хранится в репозитории — он доступен и работает сразу, без ручной настройки переменных окружения или копирования файлов.
Для идентификации файла конфигурации, как принадлежащего kafkactl, можно использовать имена kafkactl.yml или .kafkactl.yml.
Такой подход работает и в монорепозиториях: каждый подпроект может иметь свой kafkactl.yml, например:
my-monorepo/├── .git/├── kafkactl.yml ← общий конфиг для всего монорепозитория├── service-a/│ ├── kafkactl.yml ← переопределяет конфиг для service│ └── src/│ └── main.py ← если kafkactl запустится из этого скрипта,│ она найдет конфиг service-a/kafkactl.yml├── service-b/│ └── src/│ └── main.py ← если kafkactl запустится из этого скрипта,│ она найдет общий конфиг из корня
Переменные окружения
Также в конфигурационном файле можно переопределить любой ключ с помощью переменных окружения. Для этого используется синтаксис ${VAR_NAME}, где VAR_NAME — имя переменной.
Хорошо и в целях безопасности: не держать пароли для подключения в открытом виде, особенно если конфигурация хранится в репозитории.
Вместо пароля в файле будет видна только переменная:
sasl: enabled: true mechanism: scram-sha512 username: user password: ${KAFKA_PASSWORD}
Пароль же будет автоматически подтягиваться из надежного источника, выбранного для его хранения.
Настраиваем конфигурацию
Итак, приступим к настройке конфигурационного файла config.yml.
1. Подключаться к кластеру будем с использованием TLS(SSL)-шифрования. Этот вариант обеспечивает защищенную передачу данных и проверку подлинности сервера. Скачаем CA-сертификат и поместим его в папку ~/.kafka/:
mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt
2. Теперь укажем в конфиге следующее содержимое:
contexts: selectel-kafka: brokers: - <host>:9093 sasl: enabled: true mechanism: scram-sha512 username: <user> password: <password> tls: enabled: true ca: <path-to-certificate>
Здесь:
-
<host>— DNS-адрес ноды Kafka; -
<user>— имя пользователя Kafka; -
<password>— пароль пользователя Kafka; -
<path-to-certificate>— путь к SSL-сертификату.
Не забывайте: при подключении с SSL-сертификатом используется порт 9093.
3. Чтобы не указывать каждый раз контекст, зададим его в файле current-context.yml:
current-context: selectel-kafka
Проверяем подключение
Попробуем получить список топиков. Так как мы задали параметры подключения и текущий контекст в файле, то указывать их в команде не надо:
kafkactl get topics
Если подключение настроено корректно, увидим созданный ранее топик:
TOPIC PARTITIONS REPLICATION FACTORorders 1 0
Топик создавался с одним разделом, поэтому все верно.
Отправляем сообщения
Теперь отправим несколько тестовых сообщений в топик orders. Например:
echo '{"order_id":1001,"status":"created"}' \ | kafkactl produce orders
Здесь:
-
echoвыводит JSON-сообщение; -
пайп (
|) передает его вkafkactl; -
produceотправляет сообщение в Kafka-топик.
В выводе должны получить сообщение об успешной отправке:
1 messages produced
Отправим еще одно сообщение:
echo '{"order_id":1002,"status":"paid"}' \ | kafkactl produce orders
Такой способ позволяет проверить работу топика или сервиса без написания отдельного приложения-продюсера, поэтому его можно использовать для быстрых тестов и диагностики, как в нашем случае.
Читаем сообщения
Теперь прочитаем сообщения из топика — например, начиная с самых ранних (с помощью флага --from-beginning):
kafkactl consume orders --from-beginning
Пример вывода:
{"order_id":1001,"status":"created"}{"order_id":1002,"status":"paid"}
kafkactl будет читать и новые сообщения до тех пор, пока не прервется выполнение команды. Проверим, отправив сообщение через другой терминал, например:
echo '{"order_id":1003,"status":"new"}' \ | kafkactl produce orders
Тогда вывод первого терминала обновится:
{"order_id":1001,"status":"created"}{"order_id":1002,"status":"paid"}{"order_id":1003,"status":"new"}
Можно читать только последние сообщения с помощью флага --tail:
kafkactl consume orders --tail 1
Это удобно, когда нужно быстро проверить, доходят ли новые события до топика и как на них реагирует консьюмер.
На самом деле флагов и возможностей у этой и других команд намного больше. Их можно посмотреть в документации kafkactl.
Работаем с consumer groups
При чтении сообщений Kafka может отслеживать, какие сообщения уже были обработаны приложением. Для этого используются группы потребителей (consumer groups), которые позволяют:
-
нескольким экземплярам сервиса совместно читать сообщения;
-
хранить информацию о прочитанных сообщениях;
-
отслеживать прогресс обработки сообщений.
Создаем consumer groups
Создадим группу потребителей с именем orders-service. Для этого достаточно начать читать сообщения в топике с указанием группы:
kafkactl consume orders --group orders-service
После запуска команды новая группа появится в Kafka.
Проверяем consumer groups
Получим список всех групп:
kafkactl get consumer-groups
Пример вывода:
CONSUMER_GROUP TOPICSorders-service
Мы убедились, что группа потребителей orders-service появилась в кластере.
Можно также получить и подробную информацию:
kafkactl describe consumer-group orders-service
Такие команды особенно полезны при диагностике проблем с обработкой сообщений. Например, можно проверить:
-
читает ли сервис сообщения;
-
есть ли непрочитанные сообщения — то есть насколько конкретная группа потребителей отстает от новых сообщений;
-
подключен ли консьюмер к Kafka.
Проведем небольшой интерактивный эксперимент и проверим отслеживание сообщений в реальном времени. Для удобства и наглядности понадобятся три терминала:
-
в первом терминале будем имитировать работу консьюмера — читать сообщения;
-
во втором терминале будем имитировать работу продюсера — писать сообщения;
-
а в третьем проверим работу
consumer group orders-service.
1. В первом терминале начнем читать сообщения с указанием группы:
kafkactl consume orders --group orders-service
Сначала вывод будет пустой.
2. Через второй терминал отправим сообщение:
echo '{"order_id":1004,"status":"delivered"}' \ | kafkactl produce orders
В первом терминале появится отправленное сообщение:
{"order_id":1004,"status":"delivered"}
3. Получим подробную информацию о группе:
kafkactl describe consumer-group orders-service
Пример вывода:
TOPIC PARTITION NEWEST_OFFSET OLDEST_OFFSET CONSUMER_OFFSET LEAD LAGorders 0 4 0 4 4 0CLIENT_HOST CLIENT_ID GROUP_INSTANCE_ID TOPIC ASSIGNED_PARTITIONS31.184.216.112 kafkactl-name orders 0
Значение в поле LAG показывает, на сколько сообщений консьюмер отстает от конца топика. Оно определяется разницей между NEWEST_OFFSET и CONSUMER_OFFSET. В нашем примере случае LAG равен нулю, значит отставания нет.
kafkactl в shell-сценариях
CLI-инструменты ценятся не только за интерактивную работу, но и за возможность автоматизации. Благодаря тому, что kafkactl читает данные из стандартного ввода и хорошо работает в пайплайнах командной строки, его удобно использовать в автоматизированных сценариях, например:
-
для генерации тестовых данных;
-
локальной разработки;
-
CI/CD-пайплайнов;
-
диагностики сервисов.
Взглянем лишь на пару простых возможностей.
Например, можно отправить содержимое JSON-файла в Kafka:
cat orders.json | kafkactl produce orders
Или быстро сгенерировать несколько тестовых сообщений:
seq 1 5 | while read n; do echo "{\"order_id\":$n}" | kafkactl produce ordersdone
С помощью этой команды в топик orders запишутся пять новых сообщений сообщений:
{"order_id":1}{"order_id":2}{"order_id":3}{"order_id":4}{"order_id":5}
Какой инструмент выбрать: kafkactl или kcat
На практике kafkactl и kcat скорее не конкурируют, а дополняют друг друга. Оба инструмента позволяют решать большинство повседневных задач при работе с Kafka, однако делают это по-разному.
В таблице ниже приведены основные отличия между ними.
|
Критерий |
kafkactl |
kcat |
|
Синтаксис команд |
Название действия и объекта видно непосредственно в команде ( |
Команды формируются комбинацией флагов ( |
|
Настройка подключений |
Конфигурация хранится в YAML-файле |
Параметры подключения обычно указываются в командной строке |
|
Работа с несколькими кластерами |
Поддерживаются контексты и быстрое переключение между ними |
Параметры каждого подключения указываются отдельно |
|
Работа в скриптах и CI/CD |
Поддерживаются YAML-конфигурация, переменные окружения и контексты |
Хорошо подходит для небольших shell-сценариев и пайплайнов |
|
Consumer groups |
Можно получать список групп, подробную информацию о них и отслеживать lag |
Возможности работы с consumer groups ограничены |
|
Работа с метаданными сообщений |
Ориентирован на типовые операции с Kafka |
Предоставляет гибкие возможности для вывода офсетов, разделов, заголовков и других метаданных |
|
Shell-пайплайны |
Поддерживаются, но не являются основной особенностью инструмента |
Часто используется в пайплайнах командной строки совместно с |
|
Порог входа |
Команды построены по единому шаблону и обычно проще запоминаются |
Требуется знание набора флагов и параметров |
На основе этого сравнения можно сделать следующий вывод:
-
kafkactlхорошо подходит для регулярной работы с Kafka, особенно если приходится переключаться между несколькими кластерами или использовать инструмент в автоматизированных сценариях; -
kcatудобно использовать для диагностики, анализа сообщений и ситуаций, когда требуется гибкое форматирование вывода или интеграция с shell-пайпами; -
можно использовать оба инструмента одновременно, выбирая наиболее подходящий для конкретной задачи.
Заключение
kafkactl предлагает иной подход к работе с Kafka из командной строки. Вместо набора разрозненных утилит и множества параметров подключения он использует единый интерфейс и централизованное хранение конфигурации.
Наиболее заметны преимущества такого решения при регулярной работе: просмотре топиков, чтении и отправке сообщений, анализе consumer groups и переключении между несколькими кластерами. Команды kafkactl легко читать и понимать без постоянного обращения к документации, поскольку в них явно обозначены выполняемое действие и целевой объект.
При этом выбор инструмента зависит прежде всего от конкретного сценария. Стандартные Kafka CLI, kcat и kafkactl решают схожие задачи, но отличаются организацией работы. Если требуется единый интерфейс команд и удобное управление подключениями к нескольким кластерам, kafkactl может стать отличным дополнением к привычному набору инструментов для работы с Kafka.
ссылка на оригинал статьи https://habr.com/ru/articles/1053616/