Привет, Хабр!
Сегодня мы рассмотрим, как протестировать Kafka с помощью Testcontainers.
Testcontainers — это библиотека, которая из JUnit‑теста запускает Docker‑контейнеры как обычные Java‑объекты. Вы пишете пару строк — а на фоне поднимается полноценная инфраструктура: база, брокер, Redis, что угодно. После теста контейнер гарантированно останавливается, поэтому окружение всегда чистое, а CI не засоряется процессами.
Kafka в Testcontainers запускается теми же двумя строками. Получаем реальный брокер, который ничем не отличается от продакшен‑копии, но живёт ровно столько, сколько идёт тест.
Подключаем зависимости: Gradle и Maven
Если вы на Gradle:
dependencies { testImplementation platform("org.testcontainers:testcontainers-bom:1.21.3") testImplementation("org.testcontainers:kafka") }
А если Maven:
<dependencyManagement> <dependencies> <dependency> <groupId>org.testcontainers</groupId> <artifactId>testcontainers-bom</artifactId> <version>1.21.3</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <scope>test</scope> </dependency> </dependencies>
Версия 1.21.3 — на момент написания последняя стабильная.
Первый интеграционный тест с KafkaContainer
Начнём с базового кейса: хочется протестировать, что Kafka работает, продюсер может записать сообщение, а консьюмер прочитать его обратно.
@Testcontainers public class KafkaSmokeTest { // Объявляем KafkaContainer как @Container — Testcontainers сам поднимет и убьёт его @Container static final KafkaContainer kafka = new KafkaContainer( DockerImageName.parse("apache/kafka-native:3.8.0") ); @Test void produceConsume() throws Exception { // Получаем bootstrap-адрес запущенного брокера String bootstrap = kafka.getBootstrapServers(); // Конфигурируем Kafka-продюсер Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Создаём продюсер и отправляем одно сообщение в demo-topic KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); producer.send(new ProducerRecord<>("demo-topic", "my-key", "Hello Kafka!")).get(); producer.close(); // Конфигурируем Kafka-консьюмер Properties consumerProps = new Properties(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Создаём консьюмера и подписываемся на тот же топик KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(List.of("demo-topic")); // Ждём сообщения ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); // Проверяем, что пришло именно то сообщение assertEquals(1, records.count()); assertEquals("Hello Kafka!", records.iterator().next().value()); } }
Контейнер Kafka поднимается автоматически перед тестом, и не нужно ничего запускать вручную. apache/kafka-native:3.8.0 это официальный Kafka‑образ, работающий в KRaft‑режиме (без ZooKeeper).
Вручную создаём продюсера и консьюмера, чтобы максимально контролировать процесс. Проверка assertEquals подтверждает, что сообщение не просто «куда‑то ушло», а дошло до консьюмера.
Что с ZooKeeper и режимами Kafka?
Kafka долгое время зависела от ZooKeeper — для хранения метаданных, регистрации брокеров, и всей магии с кворумами. Но с версии 3.3 Kafka официально переведена в KRaft‑режим (Kafka Raft), где всё хранится внутри самого брокера.
В нашем примере используется образ:
DockerImageName.parse("apache/kafka-native:3.8.0")
Он работает в режиме KRaft по умолчанию. Это значит:
-
ZooKeeper не нужен;
-
всё конфигурируется проще;
-
быстрее стартует;
-
меньше зависимостей.
Если вам по какой‑то причине всё‑таки нужен ZooKeeper — например, для старых клиентов или особой топологии, то есть ConfluentKafkaContainer с образом confluentinc/cp-kafka:<до-7.4.0>.
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:6.2.1");
Но если нет строгих требований используемapache/kafka-native и живем спокойно.
Создание топиков вручную через AdminClient
По дефолту Kafka может создавать топики при первом продюсе. Но это плохая практика, продакшене чаще всего auto.create.topics.enable=false. Поэтому явно создаём нужные топики через AdminClient.
Вот так:
try (AdminClient admin = AdminClient.create(Map.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers() ))) { List<NewTopic> topics = List.of( new NewTopic("input", 1, (short)1), new NewTopic("primes", 1, (short)1), new NewTopic("composites", 1, (short)1), new NewTopic("dlq", 1, (short)1) ); admin.createTopics(topics).all().get(); }
new NewTopic("имя", партиции, фактор репликации) — в тестах у нас один брокер, так что replicationFactor должен быть строго 1. createTopics(...).all().get() блокирует выполнение до полной регистрации тем на брокере. Без этого возможны гонки.
Если не задать replicationFactor=1, Kafka может начать пытаться найти других брокеров (которых нет) и выбросит TimeoutException.
Пример поинтереснее: маршрутизация по топикам
Читаем из топика числа в виде строк, парсим их, проверяем, простое ли число, и отправляем либо в primes, либо в composites. Если парс не удался — в dlq.
public void routeMessages(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) { consumer.subscribe(List.of("input")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { try { int value = Integer.parseInt(record.value()); String topic = isPrime(value) ? "primes" : "composites"; producer.send(new ProducerRecord<>(topic, record.key(), record.value())); } catch (NumberFormatException ex) { producer.send(new ProducerRecord<>("dlq", record.key(), record.value())); } } } } private boolean isPrime(int n) { if (n < 2) return false; for (int i = 2; i <= Math.sqrt(n); i++) { if (n % i == 0) return false; } return true; }
Простой алгоритм, но идеальный для теста. Мы можем:
-
отправить
7, получитьprimes; -
отправить
8, получитьcomposites; -
отправить
"abc", получитьdlq.
Сетевые listener и взаимодействие между контейнерами
Если есть ещё один контейнер (например, с kcat, или с сервисом) и хочется, чтобы он подключался к Kafka в той же сети:
Network net = Network.newNetwork(); KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0") .withNetwork(net) .withListener("broker:19092"); // будет виден как "broker:19092" GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.9.0") .withNetwork(net) .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("sh")) .withCopyToContainer(Transferable.of("7\n8\nabc\n"), "/msgs.txt") .withCommand("-c", "tail -f /dev/null"); kcat.start(); kafka.start();
Теперь можно из kcat в тесте:
kcat.execInContainer("kcat", "-b", "broker:19092", "-t", "input", "-P", "-l", "/msgs.txt");
Или зачекать результат:
String output = kcat.execInContainer("kcat", "-b", "broker:19092", "-t", "primes", "-C", "-e", "-c", "1").getStdout();
Делитесь своим опытом тестирования Kafka в комментариях, задавайте вопросы, спорьте с решениями — чем больше практических кейсов мы соберём под этой статьёй, тем сильнее станет сообщество. Спасибо, что дочитали!
Приглашаем вас принять участие в серии открытых уроков по Apache Kafka, которые помогут глубже понять ключевые аспекты работы с этой технологией.
30 июля в 19:00 пройдет занятие «Apache Kafka в микросервисной архитектуре — лучшие практики асинхронного обмена». На нем рассмотрим подходы к организации обмена сообщениями в распределенных системах.
13 августа в 18:00 вы сможете ознакомиться с архитектурными паттернами работы с Kafka — на уроке обсудим методы построения устойчивых и масштабируемых решений.
20 августа в 20:00 пройдет урок «Kafka и Clickhouse — как организовать взаимодействие», посвященный интеграции Kafka с аналитической платформой Clickhouse.
А если вы настроены на серьезное обучение, рекомендуем ознакомиться с программой курса по Apache Kafka — на нём максимум практики по работе с Kafka для инженеров данных и разработчиков.
ссылка на оригинал статьи https://habr.com/ru/articles/931264/
Добавить комментарий