Тестируем Kafka с Testcontainers

от автора

Привет, Хабр!

Сегодня мы рассмотрим, как протестировать Kafka с помощью Testcontainers.

Testcontainers — это библиотека, которая из JUnit‑теста запускает Docker‑контейнеры как обычные Java‑объекты. Вы пишете пару строк — а на фоне поднимается полноценная инфраструктура: база, брокер, Redis, что угодно. После теста контейнер гарантированно останавливается, поэтому окружение всегда чистое, а CI не засоряется процессами.

Kafka в Testcontainers запускается теми же двумя строками. Получаем реальный брокер, который ничем не отличается от продакшен‑копии, но живёт ровно столько, сколько идёт тест.

Подключаем зависимости: Gradle и Maven

Если вы на Gradle:

dependencies {     testImplementation platform("org.testcontainers:testcontainers-bom:1.21.3")     testImplementation("org.testcontainers:kafka") }

А если Maven:

<dependencyManagement>   <dependencies>     <dependency>       <groupId>org.testcontainers</groupId>       <artifactId>testcontainers-bom</artifactId>       <version>1.21.3</version>       <type>pom</type>       <scope>import</scope>     </dependency>   </dependencies> </dependencyManagement>  <dependencies>   <dependency>     <groupId>org.testcontainers</groupId>     <artifactId>kafka</artifactId>     <scope>test</scope>   </dependency> </dependencies>

Версия 1.21.3 — на момент написания последняя стабильная.

Первый интеграционный тест с KafkaContainer

Начнём с базового кейса: хочется протестировать, что Kafka работает, продюсер может записать сообщение, а консьюмер прочитать его обратно.

@Testcontainers public class KafkaSmokeTest {      // Объявляем KafkaContainer как @Container — Testcontainers сам поднимет и убьёт его     @Container     static final KafkaContainer kafka = new KafkaContainer(         DockerImageName.parse("apache/kafka-native:3.8.0")     );      @Test     void produceConsume() throws Exception {         // Получаем bootstrap-адрес запущенного брокера         String bootstrap = kafka.getBootstrapServers();          // Конфигурируем Kafka-продюсер         Properties producerProps = new Properties();         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());          // Создаём продюсер и отправляем одно сообщение в demo-topic         KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);         producer.send(new ProducerRecord<>("demo-topic", "my-key", "Hello Kafka!")).get();         producer.close();          // Конфигурируем Kafka-консьюмер         Properties consumerProps = new Properties();         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());          // Создаём консьюмера и подписываемся на тот же топик         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);         consumer.subscribe(List.of("demo-topic"));          // Ждём сообщения         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));          // Проверяем, что пришло именно то сообщение         assertEquals(1, records.count());         assertEquals("Hello Kafka!", records.iterator().next().value());     } }

Контейнер Kafka поднимается автоматически перед тестом, и не нужно ничего запускать вручную. apache/kafka-native:3.8.0 это официальный Kafka‑образ, работающий в KRaft‑режиме (без ZooKeeper).

Вручную создаём продюсера и консьюмера, чтобы максимально контролировать процесс. Проверка assertEquals подтверждает, что сообщение не просто «куда‑то ушло», а дошло до консьюмера.

Что с ZooKeeper и режимами Kafka?

Kafka долгое время зависела от ZooKeeper — для хранения метаданных, регистрации брокеров, и всей магии с кворумами. Но с версии 3.3 Kafka официально переведена в KRaft‑режим (Kafka Raft), где всё хранится внутри самого брокера.

В нашем примере используется образ:

DockerImageName.parse("apache/kafka-native:3.8.0")

Он работает в режиме KRaft по умолчанию. Это значит:

  • ZooKeeper не нужен;

  • всё конфигурируется проще;

  • быстрее стартует;

  • меньше зависимостей.

Если вам по какой‑то причине всё‑таки нужен ZooKeeper — например, для старых клиентов или особой топологии, то есть ConfluentKafkaContainer с образом confluentinc/cp-kafka:<до-7.4.0>.

ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:6.2.1");

Но если нет строгих требований используемapache/kafka-native и живем спокойно.

Создание топиков вручную через AdminClient

По дефолту Kafka может создавать топики при первом продюсе. Но это плохая практика, продакшене чаще всего auto.create.topics.enable=false. Поэтому явно создаём нужные топики через AdminClient.

Вот так:

try (AdminClient admin = AdminClient.create(Map.of(         AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers() ))) {     List<NewTopic> topics = List.of(         new NewTopic("input", 1, (short)1),         new NewTopic("primes", 1, (short)1),         new NewTopic("composites", 1, (short)1),         new NewTopic("dlq", 1, (short)1)     );      admin.createTopics(topics).all().get(); }

new NewTopic("имя", партиции, фактор репликации) — в тестах у нас один брокер, так что replicationFactor должен быть строго 1. createTopics(...).all().get() блокирует выполнение до полной регистрации тем на брокере. Без этого возможны гонки.

Если не задать replicationFactor=1, Kafka может начать пытаться найти других брокеров (которых нет) и выбросит TimeoutException.

Пример поинтереснее: маршрутизация по топикам

Читаем из топика числа в виде строк, парсим их, проверяем, простое ли число, и отправляем либо в primes, либо в composites. Если парс не удался — в dlq.

public void routeMessages(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {     consumer.subscribe(List.of("input"));      while (true) {         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));         for (ConsumerRecord<String, String> record : records) {             try {                 int value = Integer.parseInt(record.value());                 String topic = isPrime(value) ? "primes" : "composites";                 producer.send(new ProducerRecord<>(topic, record.key(), record.value()));             } catch (NumberFormatException ex) {                 producer.send(new ProducerRecord<>("dlq", record.key(), record.value()));             }         }     } }  private boolean isPrime(int n) {     if (n < 2) return false;     for (int i = 2; i <= Math.sqrt(n); i++) {         if (n % i == 0) return false;     }     return true; }

Простой алгоритм, но идеальный для теста. Мы можем:

  • отправить 7, получить primes;

  • отправить 8, получить composites;

  • отправить "abc", получить dlq.

Сетевые listener и взаимодействие между контейнерами

Если есть ещё один контейнер (например, с kcat, или с сервисом) и хочется, чтобы он подключался к Kafka в той же сети:

Network net = Network.newNetwork();  KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")     .withNetwork(net)     .withListener("broker:19092"); // будет виден как "broker:19092"  GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.9.0")     .withNetwork(net)     .withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint("sh"))     .withCopyToContainer(Transferable.of("7\n8\nabc\n"), "/msgs.txt")     .withCommand("-c", "tail -f /dev/null");  kcat.start(); kafka.start();

Теперь можно из kcat в тесте:

kcat.execInContainer("kcat", "-b", "broker:19092", "-t", "input", "-P", "-l", "/msgs.txt");

Или зачекать результат:

String output = kcat.execInContainer("kcat", "-b", "broker:19092", "-t", "primes", "-C", "-e", "-c", "1").getStdout();

Делитесь своим опытом тестирования Kafka в комментариях, задавайте вопросы, спорьте с решениями — чем больше практических кейсов мы соберём под этой статьёй, тем сильнее станет сообщество. Спасибо, что дочитали!

Приглашаем вас принять участие в серии открытых уроков по Apache Kafka, которые помогут глубже понять ключевые аспекты работы с этой технологией.

30 июля в 19:00 пройдет занятие «Apache Kafka в микросервисной архитектуре — лучшие практики асинхронного обмена». На нем рассмотрим подходы к организации обмена сообщениями в распределенных системах.

13 августа в 18:00 вы сможете ознакомиться с архитектурными паттернами работы с Kafka — на уроке обсудим методы построения устойчивых и масштабируемых решений.

20 августа в 20:00 пройдет урок «Kafka и Clickhouse — как организовать взаимодействие», посвященный интеграции Kafka с аналитической платформой Clickhouse.

А если вы настроены на серьезное обучение, рекомендуем ознакомиться с программой курса по Apache Kafka — на нём максимум практики по работе с Kafka для инженеров данных и разработчиков.


ссылка на оригинал статьи https://habr.com/ru/articles/931264/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *