Создаем Spring Boot Starter для Kafka с Avro: пошаговое руководство

от автора

Kafka Spring Boot Starter Step By Step

Kafka Spring Boot Starter Step By Step

Apache Kafka — мощный инструмент для обработки потоков данных в реальном времени, но его интеграция в проекты на Spring Boot может быть непростой задачей

В этой статье вы узнаете о лучших практиках разработки стартеров Spring Boot Starter для Kafka с поддержкой Avro, а также получите примеры использования с различными настройками. Эта статья будет полезна разработчикам, желающим упростить работу с Kafka, и менеджерам проектов, ищущим способы оптимизации процессов. Дочитавшим статью до конца будет приятный бонус.

Я хотел поделиться опытом создания библиотеки, которая упрощает интеграцию Kafka в Spring Boot, предоставляя гибкую конфигурацию и поддержку сериализации Avro.

  • Уникальность. Многие стартеры для Kafka существуют, но мой фокусируется на enterprise‑функциях (идемпотентность, ретраи) и передаче схемы Avro через параметры.

  • Почему мне верят. Я подробно описываю процесс, включая ошибки и их решения, а также публикую рабочий код.

  • Проблема. Настройка Kafka в проектах часто требует много boilerplate-кода.

  • Цель. Дать читателю готовое решение и вдохновить на создание собственных стартеров.

Что такое Spring Boot Starter?

Spring Boot Starter — это модуль, который предоставляет готовую конфигурацию для определенной технологии. Например, spring-boot-starter-web настраивает веб-сервер, а spring-boot-starter-data-jpa — доступ к базе данных. Наш стартер будет:

  • автоматически создавать KafkaProducer и KafkaConsumer;

  • использовать Avro для сериализации и десериализации сообщений;

  • поддерживать настройку через application.yml.

Этапы разработки

Процесс разработки включал создание автоконфигурации для продюсера и консюмера Kafka. Вот ключевые шаги:

  1. Определение зависимостей:

    • Spring Boot для автоконфигурации;

    • Spring Kafka для работы с Kafka;

    • Apache Avro для сериализации (опционально).

  2. Создание KafkaProperties:

    • класс для чтения конфигурации из application.yml с префиксом apppetr.kafka.

  3. Автоконфигурация:

    • настройка KafkaTemplate для продюсера и ConcurrentKafkaListenerContainerFactory для консюмера.

  4. Поддержка Avro:

    • мы будем использовать Avro для строгой типизации сообщений. Определим схему при помощи настройки в конфигурационном файле стартера.

Создание класса свойств

Для настройки стартера через application.yml создадим класс KafkaProperties в kafka-starter/src/main/java/com/app/petr/KafkaProperties.java:

Код
package com.app.petr;  import org.springframework.boot.context.properties.ConfigurationProperties;  import java.util.HashMap; import java.util.Map;  @ConfigurationProperties(prefix = "apppetr.kafka") public class KafkaProperties {     private String bootstrapServers = "localhost:9092";     private Producer producer = new Producer();     private Consumer consumer = new Consumer();     private String avroSchemaPath;       public static class Producer {         private String topic = "default-topic";         private boolean idempotenceEnabled = true; // Идемпотентность для надежности         private int acks = 1; // Подтверждения: 0, 1, all         private int retries = 3; // Количество попыток при сбоях         private int retryBackoffMs = 1000; // Задержка между ретраями         private int deliveryTimeoutMs = 120000; // Таймаут доставки         private int requestTimeoutMs = 30000; // Таймаут запроса         private int maxInFlightRequests = 5; // Макс. количество неподтвержденных запросов         private Map<String, Object> config = new HashMap<>(); // Дополнительные настройки          public String getTopic() { return topic; }         public void setTopic(String topic) { this.topic = topic; }         public boolean isIdempotenceEnabled() { return idempotenceEnabled; }         public void setIdempotenceEnabled(boolean idempotenceEnabled) { this.idempotenceEnabled = idempotenceEnabled; }         public int getAcks() { return acks; }         public void setAcks(int acks) { this.acks = acks; }         public int getRetries() { return retries; }         public void setRetries(int retries) { this.retries = retries; }         public int getRetryBackoffMs() { return retryBackoffMs; }         public void setRetryBackoffMs(int retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; }         public int getDeliveryTimeoutMs() { return deliveryTimeoutMs; }         public void setDeliveryTimeoutMs(int deliveryTimeoutMs) { this.deliveryTimeoutMs = deliveryTimeoutMs; }         public int getRequestTimeoutMs() { return requestTimeoutMs; }         public void setRequestTimeoutMs(int requestTimeoutMs) { this.requestTimeoutMs = requestTimeoutMs; }         public int getMaxInFlightRequests() { return maxInFlightRequests; }         public void setMaxInFlightRequests(int maxInFlightRequests) { this.maxInFlightRequests = maxInFlightRequests; }         public Map<String, Object> getConfig() { return config; }         public void setConfig(Map<String, Object> config) { this.config = config; }     }      public static class Consumer {         private String topic = "default-topic";         private String groupId = "default-group";         private int maxPollRecords = 500; // Макс. записей за один poll         private int maxPollIntervalMs = 300000; // Макс. интервал между poll         private int sessionTimeoutMs = 10000; // Таймаут сессии         private int heartbeatIntervalMs = 3000; // Интервал heartbeat         private int fetchMaxBytes = 52428800; // Макс. размер выборки (50MB)         private boolean autoCommitEnabled = true; // Автокоммит оффсетов         private int autoCommitIntervalMs = 5000; // Интервал автокоммита         private String autoOffsetReset = "earliest"; // Сброс оффсета         private int retryBackoffMs = 1000; // Задержка между ретраями         private int maxRetries = 3; // Количество ретраев         private Map<String, Object> config = new HashMap<>(); // Дополнительные настройки          public String getTopic() { return topic; }         public void setTopic(String topic) { this.topic = topic; }         public String getGroupId() { return groupId; }         public void setGroupId(String groupId) { this.groupId = groupId; }         public int getMaxPollRecords() { return maxPollRecords; }         public void setMaxPollRecords(int maxPollRecords) { this.maxPollRecords = maxPollRecords; }         public int getMaxPollIntervalMs() { return maxPollIntervalMs; }         public void setMaxPollIntervalMs(int maxPollIntervalMs) { this.maxPollIntervalMs = maxPollIntervalMs; }         public int getSessionTimeoutMs() { return sessionTimeoutMs; }         public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; }         public int getHeartbeatIntervalMs() { return heartbeatIntervalMs; }         public void setHeartbeatIntervalMs(int heartbeatIntervalMs) { this.heartbeatIntervalMs = heartbeatIntervalMs; }         public int getFetchMaxBytes() { return fetchMaxBytes; }         public void setFetchMaxBytes(int fetchMaxBytes) { this.fetchMaxBytes = fetchMaxBytes; }         public boolean isAutoCommitEnabled() { return autoCommitEnabled; }         public void setAutoCommitEnabled(boolean autoCommitEnabled) { this.autoCommitEnabled = autoCommitEnabled; }         public int getAutoCommitIntervalMs() { return autoCommitIntervalMs; }         public void setAutoCommitIntervalMs(int autoCommitIntervalMs) { this.autoCommitIntervalMs = autoCommitIntervalMs; }         public String getAutoOffsetReset() { return autoOffsetReset; }         public void setAutoOffsetReset(String autoOffsetReset) { this.autoOffsetReset = autoOffsetReset; }         public int getRetryBackoffMs() { return retryBackoffMs; }         public void setRetryBackoffMs(int retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; }         public int getMaxRetries() { return maxRetries; }         public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }         public Map<String, Object> getConfig() { return config; }         public void setConfig(Map<String, Object> config) { this.config = config; }     }      public String getBootstrapServers() { return bootstrapServers; }     public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; }     public Producer getProducer() { return producer; }     public void setProducer(Producer producer) { this.producer = producer; }     public Consumer getConsumer() { return consumer; }     public void setConsumer(Consumer consumer) { this.consumer = consumer; }      public String getAvroSchemaPath() {         return avroSchemaPath;     }      public void setAvroSchemaPath(String avroSchemaPath) {         this.avroSchemaPath = avroSchemaPath;     }  }

Этот класс позволяет задавать параметры Kafka через конфигурацию, например:

apppetr:   kafka:     bootstrap-servers: localhost:9092     producer:       topic: test-topic       idempotence-enabled: true       acks: all       retries: 5       retry-backoff-ms: 2000       delivery-timeout-ms: 120000       request-timeout-ms: 30000       max-in-flight-requests: 5     consumer:       topic: test-topic       group-id: test-group       max-poll-records: 1000       max-poll-interval-ms: 600000       session-timeout-ms: 15000       heartbeat-interval-ms: 5000       fetch-max-bytes: 52428800       auto-commit-enabled: true       auto-commit-interval-ms: 5000       auto-offset-reset: earliest       retry-backoff-ms: 2000       max-retries: 5     avro-schema-path: ${project.basedir}/src/main/resources/avro/message.avdl

Реализация автоконфигурации

Ключевой элемент стартера — класс автоконфигурации KafkaAutoConfiguration в kafka-starter/src/main/java/com/app/petr/KafkaAutoConfiguration.java:

Код
package com.app.petr;  import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff;  import java.util.HashMap; import java.util.Map;  @Configuration @ConditionalOnClass({ KafkaTemplate.class, ConcurrentKafkaListenerContainerFactory.class }) @EnableKafka @EnableConfigurationProperties(KafkaProperties.class) @ConditionalOnProperty(prefix = "apppetr.kafka", name = "bootstrap-servers") public class KafkaAutoConfiguration {      private final KafkaProperties properties;      public KafkaAutoConfiguration(KafkaProperties properties) {         this.properties = properties;     }      // Producer Configuration     @Bean     @ConditionalOnMissingBean     public ProducerFactory<String, byte[]> producerFactory() {         Map<String, Object> config = new HashMap<>();         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());         config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");         config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");         config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, properties.getProducer().isIdempotenceEnabled());         config.put(ProducerConfig.ACKS_CONFIG, String.valueOf(properties.getProducer().getAcks()));         config.put(ProducerConfig.RETRIES_CONFIG, properties.getProducer().getRetries());         config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, properties.getProducer().getRetryBackoffMs());         config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, properties.getProducer().getDeliveryTimeoutMs());         config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getProducer().getRequestTimeoutMs());         config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, properties.getProducer().getMaxInFlightRequests());         config.putAll(properties.getProducer().getConfig());         return new DefaultKafkaProducerFactory<>(config);     }      @Bean     @ConditionalOnMissingBean     public KafkaTemplate<String, byte[]> kafkaTemplate(ProducerFactory<String, byte[]> producerFactory) {         KafkaTemplate<String, byte[]> template = new KafkaTemplate<>(producerFactory);         template.setDefaultTopic(properties.getProducer().getTopic());         return template;     }      // Consumer Configuration     @Bean     @ConditionalOnMissingBean     public ConsumerFactory<String, byte[]> consumerFactory() {         Map<String, Object> config = new HashMap<>();         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());         config.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getConsumer().getGroupId());         config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");         config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");         config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getConsumer().getMaxPollRecords());         config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, properties.getConsumer().getMaxPollIntervalMs());         config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getConsumer().getSessionTimeoutMs());         config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, properties.getConsumer().getHeartbeatIntervalMs());         config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, properties.getConsumer().getFetchMaxBytes());         config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.getConsumer().isAutoCommitEnabled());         config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getConsumer().getAutoCommitIntervalMs());         config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getConsumer().getAutoOffsetReset());         config.putAll(properties.getConsumer().getConfig());         return new DefaultKafkaConsumerFactory<>(config);     }      @Bean     @ConditionalOnMissingBean     public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(             ConsumerFactory<String, byte[]> consumerFactory) {         ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =                 new ConcurrentKafkaListenerContainerFactory<>();         factory.setConsumerFactory(consumerFactory);         factory.setConcurrency(1); // Параллелизм         factory.getContainerProperties().setPollTimeout(3000); // Таймаут опроса         factory.setCommonErrorHandler(errorHandler()); // Настраиваем обработку ошибок с ретраями         return factory;     }      @Bean     public DefaultErrorHandler errorHandler() {         BackOff backOff = new FixedBackOff(                 properties.getConsumer().getRetryBackoffMs(),                 properties.getConsumer().getMaxRetries()         );         DefaultErrorHandler errorHandler = new DefaultErrorHandler(                 (record, exception) -> {                     System.err.println("Failed to process record: " + record + ", exception: " + exception.getMessage());                 }, // Логирование ошибок                 backOff         );         return errorHandler;     } }

Что делает этот код:

  • создаёт KafkaProducer и KafkaConsumer с настройками из KafkaProperties;

  • использует @ConditionalOnMissingBean, чтобы пользователь мог переопределить бины;

  • добавляет DisposableBean для корректного закрытия ресурсов.

Добавление файла импортов автоконфигурации

В процессе разработки стартера я добавил файл src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports, который заменил устаревший подход с spring.factories для указания автоконфигурационных классов в Spring Boot 3.x. Этот файл содержит список классов автоконфигурации, которые Spring Boot должен загрузить.

Пример содержимого:

io.github.bigbox89.KafkaAutoConfiguration
  • Назначение. Сообщает Spring Boot, что KafkaAutoConfiguration является точкой входа для автоконфигурации стартера. Это упрощает обнаружение и регистрацию конфигурации без необходимости полного сканирования classpath.

Что будет, если его не добавить

Если файл org.springframework.boot.autoconfigure.AutoConfiguration.imports отсутствует:

  • Автоконфигурация не загрузится. Spring Boot не найдёт класс KafkaAutoConfiguration, и стартер не будет применён, даже если все зависимости и свойства указаны корректно.

  • Тихий сбой. Приложение запустится, но продюсер и консюмер Kafka не будут настроены, что может привести к неочевидным ошибкам (например, @KafkaListener не сработает).

  • Совместимость. В версиях Spring Boot до 3.0 можно было использовать spring.factories, но в 3.x без AutoConfiguration.imports стартер становится неработоспособным.

Примеры использования

Продюсер

Пример ProducerApplication в producer-example/src/main/java/com/app/petr/ProducerApplication.java
package com.app.petr;  import app.petr.Message; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate;  import java.nio.ByteBuffer;  @SpringBootApplication public class ProducerApplication {      public static void main(String[] args) {         SpringApplication.run(ProducerApplication.class, args);     }      @Bean     public ApplicationRunner runner(KafkaTemplate<String, byte[]> kafkaTemplate, KafkaProperties properties) {         return args -> {             String topic = properties.getProducer().getTopic();             if (topic == null) {                 throw new IllegalStateException("Producer topic is not configured");             }              for (int i = 0; i < 100; i++) {                 Message message = Message.newBuilder()                         .setId("id-" + i)                         .setContent("Message " + i)                         .setTimestamp(System.currentTimeMillis())                         .build();                  ByteBuffer buffer = message.toByteBuffer();                 byte[] messageBytes = new byte[buffer.remaining()];                 buffer.get(messageBytes);                  kafkaTemplate.send(topic, message.getId().toString(), messageBytes);                 System.out.println("Sent message: " + message.getId());                 Thread.sleep(1000);             }         };     } }

Консюмер

Пример ConsumerApplication в consumer-example/src/main/java/com/app/petr/ConsumerApplication.java
package com.app.petr;   import app.petr.Message; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener;  import java.nio.ByteBuffer;  @SpringBootApplication public class ConsumerApplication {      public static void main(String[] args) {         SpringApplication.run(ConsumerApplication.class, args);     }      @KafkaListener(topics = "#{kafkaProperties.consumer.topic}", groupId = "#{kafkaProperties.consumer.groupId}")     public void listen(byte[] message) throws Exception {         Message avroMessage = Message.fromByteBuffer(ByteBuffer.wrap(message));         System.out.printf("Received message: id=%s, content=%s, timestamp=%d%n",                 avroMessage.getId(), avroMessage.getContent(), avroMessage.getTimestamp());     } }

Тестирование с Testcontainers

Чтобы убедиться, что стартер работает, добавим интеграционные тесты:

Код
package com.app.petr;  import app.petr.Message; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName;  import java.io.ByteArrayOutputStream; import java.time.Duration; import java.util.Collections; import java.util.Properties;  import static org.assertj.core.api.Assertions.assertThat;  @Testcontainers public class IntegrationProducerAndConsumerTest {      private static final String TOPIC = "rest_data";      @Container     private static final KafkaContainer kafkaContainer = new KafkaContainer(             DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));      private KafkaProducer<String, byte[]> createProducer() {         Properties props = new Properties();         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");         return new KafkaProducer<>(props);     }      private KafkaConsumer<String, byte[]> createConsumer() {         Properties props = new Properties();         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());         props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         return new KafkaConsumer<>(props);     }      @Test     public void testProducerAndConsumer() throws Exception {          KafkaProducer<String, byte[]> producer = createProducer();         KafkaConsumer<String, byte[]> consumer = createConsumer();          consumer.subscribe(Collections.singleton(TOPIC));          Message message = Message.newBuilder()                 .setId("test-id")                 .setContent("Test content")                 .setTimestamp(System.currentTimeMillis())                 .build();          ByteArrayOutputStream out = new ByteArrayOutputStream();         Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);         SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class);         writer.write(message, encoder);         encoder.flush();         byte[] serializedMessage = out.toByteArray();          ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);         producer.send(record).get();         producer.flush();          SpecificDatumReader<Message> reader = new SpecificDatumReader<>(Message.class);         ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));         assertThat(records.isEmpty()).isFalse();          records.forEach(consumerRecord -> {             try {                 Decoder decoder = DecoderFactory.get().binaryDecoder(consumerRecord.value(), null);                 Message receivedMessage = reader.read(null, decoder);                 assertThat(receivedMessage.getId().toString()).isEqualTo(message.getId());                 assertThat(receivedMessage.getContent().toString()).isEqualTo(message.getContent());                 assertThat(receivedMessage.getTimestamp()).isEqualTo(message.getTimestamp());             } catch (Exception e) {                 throw new RuntimeException("Failed to deserialize message", e);             }         });          producer.close();         consumer.close();     } }

Также добавим отдельно тесты для продюсера:

Код
package com.app.petr;  import app.petr.Message; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.TimeoutException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName;  import java.io.ByteArrayOutputStream; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;  import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows;  @Testcontainers public class ProducerApplicationTest {      private static final String TOPIC = "test_data";      @Container     private static final KafkaContainer kafkaContainer = new KafkaContainer(             DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));      @BeforeAll     static void setup() {         kafkaContainer.start();     }      private KafkaProducer<String, byte[]> createProducer(Properties props) {         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");         return new KafkaProducer<>(props);     }      private byte[] serializeMessage(Message message) throws Exception {         ByteArrayOutputStream out = new ByteArrayOutputStream();         Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);         SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class);         writer.write(message, encoder);         encoder.flush();         return out.toByteArray();     }      @Test     public void testProducerWithDefaultConfig() throws Exception {         Properties props = new Properties();         KafkaProducer<String, byte[]> producer = createProducer(props);          Message message = Message.newBuilder()                 .setId("test-id-1")                 .setContent("Default config test")                 .setTimestamp(System.currentTimeMillis())                 .build();          byte[] serializedMessage = serializeMessage(message);         ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);          Future<?> future = producer.send(record);         producer.flush();          assertThat(future.get()).isNotNull(); // Проверяем успешную отправку         producer.close();     }      @Test     public void testProducerWithIdempotenceEnabled() throws Exception {         Properties props = new Properties();         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // Включаем идемпотентность         props.put(ProducerConfig.ACKS_CONFIG, "all"); // Требуется для идемпотентности         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // Ограничение для идемпотентности         KafkaProducer<String, byte[]> producer = createProducer(props);          Message message = Message.newBuilder()                 .setId("test-id-2")                 .setContent("Idempotent test")                 .setTimestamp(System.currentTimeMillis())                 .build();          byte[] serializedMessage = serializeMessage(message);         ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);          Future<?> future = producer.send(record);         producer.send(record); // Отправляем тот же ключ ещё раз         producer.flush();          assertThat(future.get()).isNotNull(); // Успешная отправка с идемпотентностью         producer.close();     }      @Test     public void testProducerWithAcksZero() throws Exception {         Properties props = new Properties();         props.put(ProducerConfig.ACKS_CONFIG, "0"); // Без подтверждений         KafkaProducer<String, byte[]> producer = createProducer(props);          Message message = Message.newBuilder()                 .setId("test-id-3")                 .setContent("Acks=0 test")                 .setTimestamp(System.currentTimeMillis())                 .build();          byte[] serializedMessage = serializeMessage(message);         ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);          Future<?> future = producer.send(record);         producer.flush();          assertThat(future.get()).isNotNull(); // Отправка без ожидания подтверждения         producer.close();     }      @Test     public void testProducerWithRetries() throws Exception {         Properties props = new Properties();         props.put(ProducerConfig.RETRIES_CONFIG, "3"); // 3 попытки         props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100"); // Задержка между попытками 100 мс         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500"); // Уменьшаем таймаут запроса до 500 мс         props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1000"); // Таймаут доставки 1000 мс (должен быть >= linger.ms + request.timeout.ms)         props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // Явно задаём linger.ms для ясности         KafkaProducer<String, byte[]> producer = createProducer(props);          Message message = Message.newBuilder()                 .setId("test-id-4")                 .setContent("Retries test")                 .setTimestamp(System.currentTimeMillis())                 .build();          byte[] serializedMessage = serializeMessage(message);         ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);          Future<?> future = producer.send(record);         producer.flush();          assertThat(future.get()).isNotNull(); // Успешная отправка с ретраями         producer.close();     }      @Test     public void testProducerWithShortTimeout() {         Properties props = new Properties();         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1"); // Очень короткий таймаут         props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1"); // Очень короткий таймаут доставки         KafkaProducer<String, byte[]> producer = createProducer(props);          Message message = Message.newBuilder()                 .setId("test-id-5")                 .setContent("Short timeout test")                 .setTimestamp(System.currentTimeMillis())                 .build();          byte[] serializedMessage;         try {             serializedMessage = serializeMessage(message);         } catch (Exception e) {             throw new RuntimeException("Serialization failed", e);         }         ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage);          Future<?> future = producer.send(record);          // Ожидаем исключение из-за короткого таймаута         assertThrows(ExecutionException.class, future::get, "Expected timeout exception due to short timeout");         assertThat(future.isDone()).isTrue();          producer.close();     } }

И конcюмера:

Код
package com.app.petr;  import app.petr.Message; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName;  import java.io.ByteArrayOutputStream; import java.time.Duration; import java.util.Collections; import java.util.Properties;  import static org.assertj.core.api.Assertions.assertThat;  @Testcontainers public class ConsumerApplicationTest {      private static final String TOPIC = "consumer_test_data";      @Container     private static final KafkaContainer kafkaContainer = new KafkaContainer(             DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));      @BeforeAll     static void setup() {         kafkaContainer.start();     }      @BeforeEach     void clearTopic() {         Properties adminProps = new Properties();         adminProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());         try (AdminClient adminClient = AdminClient.create(adminProps)) {             adminClient.deleteTopics(Collections.singleton(TOPIC)).all().get();             adminClient.createTopics(Collections.singleton(new NewTopic(TOPIC, 1, (short) 1))).all().get();         } catch (Exception e) {             // Игнорируем ошибки, если топик не существовал         }     }      private KafkaProducer<String, byte[]> createProducer() {         Properties props = new Properties();         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");         return new KafkaProducer<>(props);     }      private KafkaConsumer<String, byte[]> createConsumer(Properties props) {         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");         return new KafkaConsumer<>(props);     }      private byte[] serializeMessage(Message message) throws Exception {         ByteArrayOutputStream out = new ByteArrayOutputStream();         Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);         SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class);         writer.write(message, encoder);         encoder.flush();         return out.toByteArray();     }      private Message deserializeMessage(byte[] data) throws Exception {         Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);         SpecificDatumReader<Message> reader = new SpecificDatumReader<>(Message.class);         return reader.read(null, decoder);     }      private void sendMessages(int count) throws Exception {         KafkaProducer<String, byte[]> producer = createProducer();         for (int i = 0; i < count; i++) {             Message message = Message.newBuilder()                     .setId("id-" + i)                     .setContent("Message " + i)                     .setTimestamp(System.currentTimeMillis())                     .build();             byte[] serializedMessage = serializeMessage(message);             producer.send(new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage)).get();         }         producer.flush();         producer.close();     }      @Test     public void testConsumerWithDefaultConfig() throws Exception {         Properties props = new Properties();         props.put(ConsumerConfig.GROUP_ID_CONFIG, "default-group-" + System.nanoTime());         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         KafkaConsumer<String, byte[]> consumer = createConsumer(props);          consumer.subscribe(Collections.singleton(TOPIC));         sendMessages(5);          ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));         assertThat(records.count()).isEqualTo(5);          consumer.close();     }      @Test     public void testConsumerWithMaxPollRecords() throws Exception {         Properties props = new Properties();         props.put(ConsumerConfig.GROUP_ID_CONFIG, "max-poll-group-" + System.nanoTime());         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");         KafkaConsumer<String, byte[]> consumer = createConsumer(props);          consumer.subscribe(Collections.singleton(TOPIC));         sendMessages(5);          ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));         assertThat(records.count()).isLessThanOrEqualTo(2);          consumer.close();     }      @Test     public void testConsumerWithAutoOffsetResetLatest() throws Exception {         Properties props = new Properties();         props.put(ConsumerConfig.GROUP_ID_CONFIG, "latest-group-" + System.nanoTime());         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");         KafkaConsumer<String, byte[]> consumer = createConsumer(props);          // Подписываемся и вызываем poll для завершения регистрации         consumer.subscribe(Collections.singleton(TOPIC));         consumer.poll(Duration.ofSeconds(20)); // Даём Kafka время зарегистрировать консюмера          // Убеждаемся, что до отправки сообщений ничего не читается         ConsumerRecords<String, byte[]> recordsBefore = consumer.poll(Duration.ofSeconds(1));         assertThat(recordsBefore.isEmpty()).isTrue();          // Отправляем 3 сообщения после полной регистрации         sendMessages(3);          // Читаем новые сообщения         ConsumerRecords<String, byte[]> recordsAfter = consumer.poll(Duration.ofSeconds(5));         assertThat(recordsAfter.count()).isEqualTo(3);          consumer.close();     }      @Test     public void testConsumerWithDisableAutoCommit() throws Exception {         Properties props = new Properties();         props.put(ConsumerConfig.GROUP_ID_CONFIG, "no-auto-commit-group-" + System.nanoTime());         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         KafkaConsumer<String, byte[]> consumer = createConsumer(props);          consumer.subscribe(Collections.singleton(TOPIC));         sendMessages(5);          ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));         assertThat(records.count()).isEqualTo(5);          consumer.close();         consumer = createConsumer(props);         consumer.subscribe(Collections.singleton(TOPIC));         ConsumerRecords<String, byte[]> recordsAgain = consumer.poll(Duration.ofSeconds(5));         assertThat(recordsAgain.count()).isEqualTo(5);          consumer.close();     }      @Test     public void testConsumerWithShortSessionTimeout() throws Exception {         Properties props = new Properties();         props.put(ConsumerConfig.GROUP_ID_CONFIG, "short-session-group-" + System.nanoTime());         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");         props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000");         KafkaConsumer<String, byte[]> consumer = createConsumer(props);          consumer.subscribe(Collections.singleton(TOPIC));         sendMessages(3);          ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));         assertThat(records.count()).isEqualTo(3);          consumer.close();     } }

Как работает тест:

  • Testcontainers запускает Kafka-контейнер;

  • сообщение отправляется через продюсер и читается консюмером;

  • проверяется соответствие отправленного и полученного сообщения.

Преимущества стартера

  • Простота подключения. Достаточно добавить зависимость и настроить application.yml.

  • Гибкость. Возможность переопределить бины или добавить кастомные настройки.

  • Типобезопасность. Avro обеспечивает строгую структуру данных.

Возможные ошибки и их решение

Проблема 1: генерация Avro-классов внутри стартера

Ситуация: изначально я использовал avro-maven-plugin для генерации классов из message.avdl в стартере. Это ограничивало гибкость, так как схема была фиксированной.

Решение:

  • убрал генерацию из стартера;

  • добавил параметр avro-schema-path в KafkaProperties, чтобы пользователь указывал путь к схеме;

  • переложил ответственность за генерацию на проект пользователя.

Пример конфигурации:

apppetr:   kafka:     avro-schema-path: ${project.basedir}/src/main/resources/avro/message.avdl

Проблема 2: ошибки в тестах с auto.offset.reset=latest

Ситуация: тест testConsumerWithAutoOffsetResetLatest ожидал 3 сообщения, но получал 1 из-за асинхронности подписки.

Решение:

  • добавил consumer.poll(Duration.ofSeconds(2)) после subscribe, чтобы дождаться регистрации консюмера;

  • убрал Thread.sleep, сделав тест более надёжным.

consumer.subscribe(Collections.singleton(TOPIC)); consumer.poll(Duration.ofSeconds(20)); sendMessages(3); assertThat(consumer.poll(Duration.ofSeconds(5)).count()).isEqualTo(3);

Примеры применения стартера

1. Надёжная доставка транзакций

Задача: гарантировать доставку без дубликатов.

Конфигурация:

apppetr:   kafka:     bootstrap-servers: kafka1:9092,kafka2:9092     producer:       topic: transactions       idempotence-enabled: true       acks: all       retries: 5       retry-backoff-ms: 2000     avro-schema-path: src/main/resources/avro/transaction.avdl

Код:

@Autowired private KafkaTemplate<String, byte[]> kafkaTemplate;  public void sendTransaction(Transaction tx) throws Exception {     ByteArrayOutputStream out = new ByteArrayOutputStream();     Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);     SpecificDatumWriter<Transaction> writer = new SpecificDatumWriter<>(Transaction.class);     writer.write(tx, encoder);     encoder.flush();     kafkaTemplate.send("transactions", tx.getId().toString(), out.toByteArray()); }

2. Высокоскоростной сбор логов

Задача: быстрая отправка логов с допустимой потерей.

Конфигурация:

apppetr:   kafka:     bootstrap-servers: localhost:9092     producer:       topic: logs       acks: 0       max-in-flight-requests: 10     consumer:       topic: logs       group-id: log-collector       max-poll-records: 1000       auto-offset-reset: latest     avro-schema-path: src/main/resources/avro/log.avdl

Код:

@KafkaListener(topics = "logs", groupId = "log-collector") public void processLog(byte[] data) throws Exception {     Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);     SpecificDatumReader<Log> reader = new SpecificDatumReader<>(Log.class);     Log log = reader.read(null, decoder);     System.out.println("Log: " + log.getMessage()); }

3. Обработка событий с ретраями

Задача: повторная обработка при сбоях.

Конфигурация:

apppetr:   kafka:     bootstrap-servers: kafka:9092     consumer:       topic: events       group-id: event-processor       retry-backoff-ms: 2000       max-retries: 5     avro-schema-path: src/main/resources/avro/event.avdl

Код:

@KafkaListener(topics = "events", groupId = "event-processor") public void handleEvent(byte[] data) throws Exception {     Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);     SpecificDatumReader<Event> reader = new SpecificDatumReader<>(Event.class);     Event event = reader.read(null, decoder);     processEvent(event); // Логика с возможными исключениями }

Бонус дочитавшим

Памятка-шпаргалка по основным параметрам конфигурации Kafka.

Общие параметры

Параметр

Описание

Тип

По умолчанию

bootstrap.servers

Список адресов брокеров Kafka

String

localhost:9092

Параметры продюсера

Параметр

Описание

Тип

По умолчанию

key.serializer

Сериализатор ключа

Class

value.serializer

Сериализатор значения

Class

acks

Уровень подтверждений (0, 1, all)

String

1

retries

Количество ретраев при сбоях

int

0

retry.backoff.ms

Задержка между ретраями (мс)

int

100

enable.idempotence

Включение идемпотентности

boolean

false

max.in.flight.requests.per.connection

Макс. неподтверждённых запросов

int

5

delivery.timeout.ms

Общий таймаут доставки (мс)

int

120000 (2 мин)

request.timeout.ms

Таймаут запроса к брокеру (мс)

int

30000 (30 сек)

buffer.memory

Размер буфера для отправки (байт)

long

33554432 (32MB)

batch.size

Размер батча для отправки (байт)

int

16384 (16KB)

linger.ms

Задержка перед отправкой батча (мс)

int

0

compression.type

Тип сжатия (none, gzip, snappy, lz4, zstd)

String

none

Параметры консюмера

Параметр

Описание

Тип

По умолчанию

key.deserializer

Десериализатор ключа

Class

value.deserializer

Десериализатор значения

Class

group.id

Идентификатор группы потребителей

String

null

auto.offset.reset

Политика сброса оффсета (earliest, latest, none)

String

latest

enable.auto.commit

Включение автокоммита оффсетов

boolean

true

auto.commit.interval.ms

Интервал автокоммита (мс)

int

5000 (5 сек)

max.poll.records

Макс. записей за один poll

int

500

max.poll.interval.ms

Макс. интервал между poll (мс)

int

300000 (5 мин)

session.timeout.ms

Таймаут сессии группы (мс)

int

10000 (10 сек)

heartbeat.interval.ms

Интервал heartbeat (мс)

int

3000 (3 сек)

fetch.max.bytes

Макс. размер выборки (байт)

int

52428800 (50MB)

fetch.min.bytes

Мин. размер выборки (байт)

int

1

fetch.max.wait.ms

Макс. ожидание данных (мс)

int

500

Полезные заметки

  • Идемпотентность. Для enable.idempotence=true требуется acks=all и retries > 0.

  • Производительность. Увеличьте batch.size и linger.ms для продюсера или max.poll.records для консюмера, чтобы повысить пропускную способность.

  • Надёжность. Используйте acks=all и высокий retries для продюсера, отключите enable.auto.commit для точного контроля оффсетов в консюмере.

  • Отладка. Логируйте group.id и проверяйте auto.offset.reset, если данные не читаются.

Заключение

Создание kafka-spring-boot-starter позволило мне упростить интеграцию Kafka в проекты на Spring Boot. Проблемы с Avro и тестами научили меня гибкости и важности синхронизации в асинхронных системах. Надеюсь, этот опыт вдохновит вас на создание собственных библиотек!

Вопрос к читателям: какую функциональность вы бы добавили в такой стартер? Делитесь идеями в комментариях!

Попробуйте внедрить стартер в свой проект! Какие задачи вы решаете с Kafka? Делитесь опытом в комментариях — обсудим, как улучшить этот подход!

Исходный код доступен на GitHub https://github.com/bigbox89/kafka-spring-boot-starter . Если у вас возникнут вопросы по настройке или тестированию, пишите — разберёмся вместе!

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

Предпочитаете писать стартеры сами или пользуетесь готовыми?

100% да2
0% нет0

Проголосовали 2 пользователя. Воздержались 2 пользователя.

ссылка на оригинал статьи https://habr.com/ru/articles/894402/