
Apache Kafka — мощный инструмент для обработки потоков данных в реальном времени, но его интеграция в проекты на Spring Boot может быть непростой задачей
В этой статье вы узнаете о лучших практиках разработки стартеров Spring Boot Starter для Kafka с поддержкой Avro, а также получите примеры использования с различными настройками. Эта статья будет полезна разработчикам, желающим упростить работу с Kafka, и менеджерам проектов, ищущим способы оптимизации процессов. Дочитавшим статью до конца будет приятный бонус.
Я хотел поделиться опытом создания библиотеки, которая упрощает интеграцию Kafka в Spring Boot, предоставляя гибкую конфигурацию и поддержку сериализации Avro.
-
Уникальность. Многие стартеры для Kafka существуют, но мой фокусируется на enterprise‑функциях (идемпотентность, ретраи) и передаче схемы Avro через параметры.
-
Почему мне верят. Я подробно описываю процесс, включая ошибки и их решения, а также публикую рабочий код.
-
Проблема. Настройка Kafka в проектах часто требует много boilerplate-кода.
-
Цель. Дать читателю готовое решение и вдохновить на создание собственных стартеров.
Что такое Spring Boot Starter?
Spring Boot Starter — это модуль, который предоставляет готовую конфигурацию для определенной технологии. Например, spring-boot-starter-web настраивает веб-сервер, а spring-boot-starter-data-jpa — доступ к базе данных. Наш стартер будет:
-
автоматически создавать
KafkaProducer
иKafkaConsumer
; -
использовать Avro для сериализации и десериализации сообщений;
-
поддерживать настройку через application.yml.
Этапы разработки
Процесс разработки включал создание автоконфигурации для продюсера и консюмера Kafka. Вот ключевые шаги:
-
Определение зависимостей:
-
Spring Boot для автоконфигурации;
-
Spring Kafka для работы с Kafka;
-
Apache Avro для сериализации (опционально).
-
-
Создание KafkaProperties:
-
класс для чтения конфигурации из application.yml с префиксом
apppetr.kafka
.
-
-
Автоконфигурация:
-
настройка
KafkaTemplate
для продюсера иConcurrentKafkaListenerContainerFactory
для консюмера.
-
-
Поддержка Avro:
-
мы будем использовать Avro для строгой типизации сообщений. Определим схему при помощи настройки в конфигурационном файле стартера.
-
Создание класса свойств
Для настройки стартера через application.yml создадим класс KafkaProperties
в kafka-starter/src/main/java/com/app/petr/KafkaProperties.java:
Код
package com.app.petr; import org.springframework.boot.context.properties.ConfigurationProperties; import java.util.HashMap; import java.util.Map; @ConfigurationProperties(prefix = "apppetr.kafka") public class KafkaProperties { private String bootstrapServers = "localhost:9092"; private Producer producer = new Producer(); private Consumer consumer = new Consumer(); private String avroSchemaPath; public static class Producer { private String topic = "default-topic"; private boolean idempotenceEnabled = true; // Идемпотентность для надежности private int acks = 1; // Подтверждения: 0, 1, all private int retries = 3; // Количество попыток при сбоях private int retryBackoffMs = 1000; // Задержка между ретраями private int deliveryTimeoutMs = 120000; // Таймаут доставки private int requestTimeoutMs = 30000; // Таймаут запроса private int maxInFlightRequests = 5; // Макс. количество неподтвержденных запросов private Map<String, Object> config = new HashMap<>(); // Дополнительные настройки public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public boolean isIdempotenceEnabled() { return idempotenceEnabled; } public void setIdempotenceEnabled(boolean idempotenceEnabled) { this.idempotenceEnabled = idempotenceEnabled; } public int getAcks() { return acks; } public void setAcks(int acks) { this.acks = acks; } public int getRetries() { return retries; } public void setRetries(int retries) { this.retries = retries; } public int getRetryBackoffMs() { return retryBackoffMs; } public void setRetryBackoffMs(int retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; } public int getDeliveryTimeoutMs() { return deliveryTimeoutMs; } public void setDeliveryTimeoutMs(int deliveryTimeoutMs) { this.deliveryTimeoutMs = deliveryTimeoutMs; } public int getRequestTimeoutMs() { return requestTimeoutMs; } public void setRequestTimeoutMs(int requestTimeoutMs) { this.requestTimeoutMs = requestTimeoutMs; } public int getMaxInFlightRequests() { return maxInFlightRequests; } public void setMaxInFlightRequests(int maxInFlightRequests) { this.maxInFlightRequests = maxInFlightRequests; } public Map<String, Object> getConfig() { return config; } public void setConfig(Map<String, Object> config) { this.config = config; } } public static class Consumer { private String topic = "default-topic"; private String groupId = "default-group"; private int maxPollRecords = 500; // Макс. записей за один poll private int maxPollIntervalMs = 300000; // Макс. интервал между poll private int sessionTimeoutMs = 10000; // Таймаут сессии private int heartbeatIntervalMs = 3000; // Интервал heartbeat private int fetchMaxBytes = 52428800; // Макс. размер выборки (50MB) private boolean autoCommitEnabled = true; // Автокоммит оффсетов private int autoCommitIntervalMs = 5000; // Интервал автокоммита private String autoOffsetReset = "earliest"; // Сброс оффсета private int retryBackoffMs = 1000; // Задержка между ретраями private int maxRetries = 3; // Количество ретраев private Map<String, Object> config = new HashMap<>(); // Дополнительные настройки public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public int getMaxPollRecords() { return maxPollRecords; } public void setMaxPollRecords(int maxPollRecords) { this.maxPollRecords = maxPollRecords; } public int getMaxPollIntervalMs() { return maxPollIntervalMs; } public void setMaxPollIntervalMs(int maxPollIntervalMs) { this.maxPollIntervalMs = maxPollIntervalMs; } public int getSessionTimeoutMs() { return sessionTimeoutMs; } public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public int getHeartbeatIntervalMs() { return heartbeatIntervalMs; } public void setHeartbeatIntervalMs(int heartbeatIntervalMs) { this.heartbeatIntervalMs = heartbeatIntervalMs; } public int getFetchMaxBytes() { return fetchMaxBytes; } public void setFetchMaxBytes(int fetchMaxBytes) { this.fetchMaxBytes = fetchMaxBytes; } public boolean isAutoCommitEnabled() { return autoCommitEnabled; } public void setAutoCommitEnabled(boolean autoCommitEnabled) { this.autoCommitEnabled = autoCommitEnabled; } public int getAutoCommitIntervalMs() { return autoCommitIntervalMs; } public void setAutoCommitIntervalMs(int autoCommitIntervalMs) { this.autoCommitIntervalMs = autoCommitIntervalMs; } public String getAutoOffsetReset() { return autoOffsetReset; } public void setAutoOffsetReset(String autoOffsetReset) { this.autoOffsetReset = autoOffsetReset; } public int getRetryBackoffMs() { return retryBackoffMs; } public void setRetryBackoffMs(int retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; } public int getMaxRetries() { return maxRetries; } public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } public Map<String, Object> getConfig() { return config; } public void setConfig(Map<String, Object> config) { this.config = config; } } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public Producer getProducer() { return producer; } public void setProducer(Producer producer) { this.producer = producer; } public Consumer getConsumer() { return consumer; } public void setConsumer(Consumer consumer) { this.consumer = consumer; } public String getAvroSchemaPath() { return avroSchemaPath; } public void setAvroSchemaPath(String avroSchemaPath) { this.avroSchemaPath = avroSchemaPath; } }
Этот класс позволяет задавать параметры Kafka через конфигурацию, например:
apppetr: kafka: bootstrap-servers: localhost:9092 producer: topic: test-topic idempotence-enabled: true acks: all retries: 5 retry-backoff-ms: 2000 delivery-timeout-ms: 120000 request-timeout-ms: 30000 max-in-flight-requests: 5 consumer: topic: test-topic group-id: test-group max-poll-records: 1000 max-poll-interval-ms: 600000 session-timeout-ms: 15000 heartbeat-interval-ms: 5000 fetch-max-bytes: 52428800 auto-commit-enabled: true auto-commit-interval-ms: 5000 auto-offset-reset: earliest retry-backoff-ms: 2000 max-retries: 5 avro-schema-path: ${project.basedir}/src/main/resources/avro/message.avdl
Реализация автоконфигурации
Ключевой элемент стартера — класс автоконфигурации KafkaAutoConfiguration
в kafka-starter/src/main/java/com/app/petr/KafkaAutoConfiguration.java:
Код
package com.app.petr; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; import java.util.HashMap; import java.util.Map; @Configuration @ConditionalOnClass({ KafkaTemplate.class, ConcurrentKafkaListenerContainerFactory.class }) @EnableKafka @EnableConfigurationProperties(KafkaProperties.class) @ConditionalOnProperty(prefix = "apppetr.kafka", name = "bootstrap-servers") public class KafkaAutoConfiguration { private final KafkaProperties properties; public KafkaAutoConfiguration(KafkaProperties properties) { this.properties = properties; } // Producer Configuration @Bean @ConditionalOnMissingBean public ProducerFactory<String, byte[]> producerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, properties.getProducer().isIdempotenceEnabled()); config.put(ProducerConfig.ACKS_CONFIG, String.valueOf(properties.getProducer().getAcks())); config.put(ProducerConfig.RETRIES_CONFIG, properties.getProducer().getRetries()); config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, properties.getProducer().getRetryBackoffMs()); config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, properties.getProducer().getDeliveryTimeoutMs()); config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getProducer().getRequestTimeoutMs()); config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, properties.getProducer().getMaxInFlightRequests()); config.putAll(properties.getProducer().getConfig()); return new DefaultKafkaProducerFactory<>(config); } @Bean @ConditionalOnMissingBean public KafkaTemplate<String, byte[]> kafkaTemplate(ProducerFactory<String, byte[]> producerFactory) { KafkaTemplate<String, byte[]> template = new KafkaTemplate<>(producerFactory); template.setDefaultTopic(properties.getProducer().getTopic()); return template; } // Consumer Configuration @Bean @ConditionalOnMissingBean public ConsumerFactory<String, byte[]> consumerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); config.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getConsumer().getGroupId()); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getConsumer().getMaxPollRecords()); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, properties.getConsumer().getMaxPollIntervalMs()); config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getConsumer().getSessionTimeoutMs()); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, properties.getConsumer().getHeartbeatIntervalMs()); config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, properties.getConsumer().getFetchMaxBytes()); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.getConsumer().isAutoCommitEnabled()); config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getConsumer().getAutoCommitIntervalMs()); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getConsumer().getAutoOffsetReset()); config.putAll(properties.getConsumer().getConfig()); return new DefaultKafkaConsumerFactory<>(config); } @Bean @ConditionalOnMissingBean public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory( ConsumerFactory<String, byte[]> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(1); // Параллелизм factory.getContainerProperties().setPollTimeout(3000); // Таймаут опроса factory.setCommonErrorHandler(errorHandler()); // Настраиваем обработку ошибок с ретраями return factory; } @Bean public DefaultErrorHandler errorHandler() { BackOff backOff = new FixedBackOff( properties.getConsumer().getRetryBackoffMs(), properties.getConsumer().getMaxRetries() ); DefaultErrorHandler errorHandler = new DefaultErrorHandler( (record, exception) -> { System.err.println("Failed to process record: " + record + ", exception: " + exception.getMessage()); }, // Логирование ошибок backOff ); return errorHandler; } }
Что делает этот код:
-
создаёт
KafkaProducer
иKafkaConsumer
с настройками изKafkaProperties
; -
использует
@ConditionalOnMissingBean
, чтобы пользователь мог переопределить бины; -
добавляет
DisposableBean
для корректного закрытия ресурсов.
Добавление файла импортов автоконфигурации
В процессе разработки стартера я добавил файл src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports, который заменил устаревший подход с spring.factories для указания автоконфигурационных классов в Spring Boot 3.x. Этот файл содержит список классов автоконфигурации, которые Spring Boot должен загрузить.
Пример содержимого:
io.github.bigbox89.KafkaAutoConfiguration
-
Назначение. Сообщает Spring Boot, что
KafkaAutoConfiguration
является точкой входа для автоконфигурации стартера. Это упрощает обнаружение и регистрацию конфигурации без необходимости полного сканирования classpath.
Что будет, если его не добавить
Если файл org.springframework.boot.autoconfigure.AutoConfiguration.imports отсутствует:
-
Автоконфигурация не загрузится. Spring Boot не найдёт класс KafkaAutoConfiguration, и стартер не будет применён, даже если все зависимости и свойства указаны корректно.
-
Тихий сбой. Приложение запустится, но продюсер и консюмер Kafka не будут настроены, что может привести к неочевидным ошибкам (например,
@KafkaListener
не сработает). -
Совместимость. В версиях Spring Boot до 3.0 можно было использовать spring.factories, но в 3.x без AutoConfiguration.imports стартер становится неработоспособным.
Примеры использования
Продюсер
Пример ProducerApplication в producer-example/src/main/java/com/app/petr/ProducerApplication.java
package com.app.petr; import app.petr.Message; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import java.nio.ByteBuffer; @SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } @Bean public ApplicationRunner runner(KafkaTemplate<String, byte[]> kafkaTemplate, KafkaProperties properties) { return args -> { String topic = properties.getProducer().getTopic(); if (topic == null) { throw new IllegalStateException("Producer topic is not configured"); } for (int i = 0; i < 100; i++) { Message message = Message.newBuilder() .setId("id-" + i) .setContent("Message " + i) .setTimestamp(System.currentTimeMillis()) .build(); ByteBuffer buffer = message.toByteBuffer(); byte[] messageBytes = new byte[buffer.remaining()]; buffer.get(messageBytes); kafkaTemplate.send(topic, message.getId().toString(), messageBytes); System.out.println("Sent message: " + message.getId()); Thread.sleep(1000); } }; } }
Консюмер
Пример ConsumerApplication в consumer-example/src/main/java/com/app/petr/ConsumerApplication.java
package com.app.petr; import app.petr.Message; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener; import java.nio.ByteBuffer; @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @KafkaListener(topics = "#{kafkaProperties.consumer.topic}", groupId = "#{kafkaProperties.consumer.groupId}") public void listen(byte[] message) throws Exception { Message avroMessage = Message.fromByteBuffer(ByteBuffer.wrap(message)); System.out.printf("Received message: id=%s, content=%s, timestamp=%d%n", avroMessage.getId(), avroMessage.getContent(), avroMessage.getTimestamp()); } }
Тестирование с Testcontainers
Чтобы убедиться, что стартер работает, добавим интеграционные тесты:
Код
package com.app.petr; import app.petr.Message; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.io.ByteArrayOutputStream; import java.time.Duration; import java.util.Collections; import java.util.Properties; import static org.assertj.core.api.Assertions.assertThat; @Testcontainers public class IntegrationProducerAndConsumerTest { private static final String TOPIC = "rest_data"; @Container private static final KafkaContainer kafkaContainer = new KafkaContainer( DockerImageName.parse("confluentinc/cp-kafka:7.3.0")); private KafkaProducer<String, byte[]> createProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); return new KafkaProducer<>(props); } private KafkaConsumer<String, byte[]> createConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new KafkaConsumer<>(props); } @Test public void testProducerAndConsumer() throws Exception { KafkaProducer<String, byte[]> producer = createProducer(); KafkaConsumer<String, byte[]> consumer = createConsumer(); consumer.subscribe(Collections.singleton(TOPIC)); Message message = Message.newBuilder() .setId("test-id") .setContent("Test content") .setTimestamp(System.currentTimeMillis()) .build(); ByteArrayOutputStream out = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class); writer.write(message, encoder); encoder.flush(); byte[] serializedMessage = out.toByteArray(); ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage); producer.send(record).get(); producer.flush(); SpecificDatumReader<Message> reader = new SpecificDatumReader<>(Message.class); ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5)); assertThat(records.isEmpty()).isFalse(); records.forEach(consumerRecord -> { try { Decoder decoder = DecoderFactory.get().binaryDecoder(consumerRecord.value(), null); Message receivedMessage = reader.read(null, decoder); assertThat(receivedMessage.getId().toString()).isEqualTo(message.getId()); assertThat(receivedMessage.getContent().toString()).isEqualTo(message.getContent()); assertThat(receivedMessage.getTimestamp()).isEqualTo(message.getTimestamp()); } catch (Exception e) { throw new RuntimeException("Failed to deserialize message", e); } }); producer.close(); consumer.close(); } }
Также добавим отдельно тесты для продюсера:
Код
package com.app.petr; import app.petr.Message; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.TimeoutException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.io.ByteArrayOutputStream; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @Testcontainers public class ProducerApplicationTest { private static final String TOPIC = "test_data"; @Container private static final KafkaContainer kafkaContainer = new KafkaContainer( DockerImageName.parse("confluentinc/cp-kafka:7.3.0")); @BeforeAll static void setup() { kafkaContainer.start(); } private KafkaProducer<String, byte[]> createProducer(Properties props) { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); return new KafkaProducer<>(props); } private byte[] serializeMessage(Message message) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class); writer.write(message, encoder); encoder.flush(); return out.toByteArray(); } @Test public void testProducerWithDefaultConfig() throws Exception { Properties props = new Properties(); KafkaProducer<String, byte[]> producer = createProducer(props); Message message = Message.newBuilder() .setId("test-id-1") .setContent("Default config test") .setTimestamp(System.currentTimeMillis()) .build(); byte[] serializedMessage = serializeMessage(message); ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage); Future<?> future = producer.send(record); producer.flush(); assertThat(future.get()).isNotNull(); // Проверяем успешную отправку producer.close(); } @Test public void testProducerWithIdempotenceEnabled() throws Exception { Properties props = new Properties(); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // Включаем идемпотентность props.put(ProducerConfig.ACKS_CONFIG, "all"); // Требуется для идемпотентности props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // Ограничение для идемпотентности KafkaProducer<String, byte[]> producer = createProducer(props); Message message = Message.newBuilder() .setId("test-id-2") .setContent("Idempotent test") .setTimestamp(System.currentTimeMillis()) .build(); byte[] serializedMessage = serializeMessage(message); ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage); Future<?> future = producer.send(record); producer.send(record); // Отправляем тот же ключ ещё раз producer.flush(); assertThat(future.get()).isNotNull(); // Успешная отправка с идемпотентностью producer.close(); } @Test public void testProducerWithAcksZero() throws Exception { Properties props = new Properties(); props.put(ProducerConfig.ACKS_CONFIG, "0"); // Без подтверждений KafkaProducer<String, byte[]> producer = createProducer(props); Message message = Message.newBuilder() .setId("test-id-3") .setContent("Acks=0 test") .setTimestamp(System.currentTimeMillis()) .build(); byte[] serializedMessage = serializeMessage(message); ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage); Future<?> future = producer.send(record); producer.flush(); assertThat(future.get()).isNotNull(); // Отправка без ожидания подтверждения producer.close(); } @Test public void testProducerWithRetries() throws Exception { Properties props = new Properties(); props.put(ProducerConfig.RETRIES_CONFIG, "3"); // 3 попытки props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100"); // Задержка между попытками 100 мс props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500"); // Уменьшаем таймаут запроса до 500 мс props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1000"); // Таймаут доставки 1000 мс (должен быть >= linger.ms + request.timeout.ms) props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // Явно задаём linger.ms для ясности KafkaProducer<String, byte[]> producer = createProducer(props); Message message = Message.newBuilder() .setId("test-id-4") .setContent("Retries test") .setTimestamp(System.currentTimeMillis()) .build(); byte[] serializedMessage = serializeMessage(message); ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage); Future<?> future = producer.send(record); producer.flush(); assertThat(future.get()).isNotNull(); // Успешная отправка с ретраями producer.close(); } @Test public void testProducerWithShortTimeout() { Properties props = new Properties(); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1"); // Очень короткий таймаут props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1"); // Очень короткий таймаут доставки KafkaProducer<String, byte[]> producer = createProducer(props); Message message = Message.newBuilder() .setId("test-id-5") .setContent("Short timeout test") .setTimestamp(System.currentTimeMillis()) .build(); byte[] serializedMessage; try { serializedMessage = serializeMessage(message); } catch (Exception e) { throw new RuntimeException("Serialization failed", e); } ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage); Future<?> future = producer.send(record); // Ожидаем исключение из-за короткого таймаута assertThrows(ExecutionException.class, future::get, "Expected timeout exception due to short timeout"); assertThat(future.isDone()).isTrue(); producer.close(); } }
И конcюмера:
Код
package com.app.petr; import app.petr.Message; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.io.ByteArrayOutputStream; import java.time.Duration; import java.util.Collections; import java.util.Properties; import static org.assertj.core.api.Assertions.assertThat; @Testcontainers public class ConsumerApplicationTest { private static final String TOPIC = "consumer_test_data"; @Container private static final KafkaContainer kafkaContainer = new KafkaContainer( DockerImageName.parse("confluentinc/cp-kafka:7.3.0")); @BeforeAll static void setup() { kafkaContainer.start(); } @BeforeEach void clearTopic() { Properties adminProps = new Properties(); adminProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers()); try (AdminClient adminClient = AdminClient.create(adminProps)) { adminClient.deleteTopics(Collections.singleton(TOPIC)).all().get(); adminClient.createTopics(Collections.singleton(new NewTopic(TOPIC, 1, (short) 1))).all().get(); } catch (Exception e) { // Игнорируем ошибки, если топик не существовал } } private KafkaProducer<String, byte[]> createProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); return new KafkaProducer<>(props); } private KafkaConsumer<String, byte[]> createConsumer(Properties props) { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); return new KafkaConsumer<>(props); } private byte[] serializeMessage(Message message) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); SpecificDatumWriter<Message> writer = new SpecificDatumWriter<>(Message.class); writer.write(message, encoder); encoder.flush(); return out.toByteArray(); } private Message deserializeMessage(byte[] data) throws Exception { Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); SpecificDatumReader<Message> reader = new SpecificDatumReader<>(Message.class); return reader.read(null, decoder); } private void sendMessages(int count) throws Exception { KafkaProducer<String, byte[]> producer = createProducer(); for (int i = 0; i < count; i++) { Message message = Message.newBuilder() .setId("id-" + i) .setContent("Message " + i) .setTimestamp(System.currentTimeMillis()) .build(); byte[] serializedMessage = serializeMessage(message); producer.send(new ProducerRecord<>(TOPIC, message.getId().toString(), serializedMessage)).get(); } producer.flush(); producer.close(); } @Test public void testConsumerWithDefaultConfig() throws Exception { Properties props = new Properties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "default-group-" + System.nanoTime()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer<String, byte[]> consumer = createConsumer(props); consumer.subscribe(Collections.singleton(TOPIC)); sendMessages(5); ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5)); assertThat(records.count()).isEqualTo(5); consumer.close(); } @Test public void testConsumerWithMaxPollRecords() throws Exception { Properties props = new Properties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "max-poll-group-" + System.nanoTime()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2"); KafkaConsumer<String, byte[]> consumer = createConsumer(props); consumer.subscribe(Collections.singleton(TOPIC)); sendMessages(5); ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5)); assertThat(records.count()).isLessThanOrEqualTo(2); consumer.close(); } @Test public void testConsumerWithAutoOffsetResetLatest() throws Exception { Properties props = new Properties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "latest-group-" + System.nanoTime()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); KafkaConsumer<String, byte[]> consumer = createConsumer(props); // Подписываемся и вызываем poll для завершения регистрации consumer.subscribe(Collections.singleton(TOPIC)); consumer.poll(Duration.ofSeconds(20)); // Даём Kafka время зарегистрировать консюмера // Убеждаемся, что до отправки сообщений ничего не читается ConsumerRecords<String, byte[]> recordsBefore = consumer.poll(Duration.ofSeconds(1)); assertThat(recordsBefore.isEmpty()).isTrue(); // Отправляем 3 сообщения после полной регистрации sendMessages(3); // Читаем новые сообщения ConsumerRecords<String, byte[]> recordsAfter = consumer.poll(Duration.ofSeconds(5)); assertThat(recordsAfter.count()).isEqualTo(3); consumer.close(); } @Test public void testConsumerWithDisableAutoCommit() throws Exception { Properties props = new Properties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "no-auto-commit-group-" + System.nanoTime()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer<String, byte[]> consumer = createConsumer(props); consumer.subscribe(Collections.singleton(TOPIC)); sendMessages(5); ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5)); assertThat(records.count()).isEqualTo(5); consumer.close(); consumer = createConsumer(props); consumer.subscribe(Collections.singleton(TOPIC)); ConsumerRecords<String, byte[]> recordsAgain = consumer.poll(Duration.ofSeconds(5)); assertThat(recordsAgain.count()).isEqualTo(5); consumer.close(); } @Test public void testConsumerWithShortSessionTimeout() throws Exception { Properties props = new Properties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "short-session-group-" + System.nanoTime()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000"); props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000"); KafkaConsumer<String, byte[]> consumer = createConsumer(props); consumer.subscribe(Collections.singleton(TOPIC)); sendMessages(3); ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5)); assertThat(records.count()).isEqualTo(3); consumer.close(); } }
Как работает тест:
-
Testcontainers запускает Kafka-контейнер;
-
сообщение отправляется через продюсер и читается консюмером;
-
проверяется соответствие отправленного и полученного сообщения.
Преимущества стартера
-
Простота подключения. Достаточно добавить зависимость и настроить application.yml.
-
Гибкость. Возможность переопределить бины или добавить кастомные настройки.
-
Типобезопасность. Avro обеспечивает строгую структуру данных.
Возможные ошибки и их решение
Проблема 1: генерация Avro-классов внутри стартера
Ситуация: изначально я использовал avro-maven-plugin для генерации классов из message.avdl в стартере. Это ограничивало гибкость, так как схема была фиксированной.
Решение:
-
убрал генерацию из стартера;
-
добавил параметр avro-schema-path в KafkaProperties, чтобы пользователь указывал путь к схеме;
-
переложил ответственность за генерацию на проект пользователя.
Пример конфигурации:
apppetr: kafka: avro-schema-path: ${project.basedir}/src/main/resources/avro/message.avdl
Проблема 2: ошибки в тестах с auto.offset.reset=latest
Ситуация: тест testConsumerWithAutoOffsetResetLatest
ожидал 3 сообщения, но получал 1 из-за асинхронности подписки.
Решение:
-
добавил
consumer.poll(Duration.ofSeconds(2))
послеsubscribe
, чтобы дождаться регистрации консюмера; -
убрал
Thread.sleep
, сделав тест более надёжным.
consumer.subscribe(Collections.singleton(TOPIC)); consumer.poll(Duration.ofSeconds(20)); sendMessages(3); assertThat(consumer.poll(Duration.ofSeconds(5)).count()).isEqualTo(3);
Примеры применения стартера
1. Надёжная доставка транзакций
Задача: гарантировать доставку без дубликатов.
Конфигурация:
apppetr: kafka: bootstrap-servers: kafka1:9092,kafka2:9092 producer: topic: transactions idempotence-enabled: true acks: all retries: 5 retry-backoff-ms: 2000 avro-schema-path: src/main/resources/avro/transaction.avdl
Код:
@Autowired private KafkaTemplate<String, byte[]> kafkaTemplate; public void sendTransaction(Transaction tx) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); SpecificDatumWriter<Transaction> writer = new SpecificDatumWriter<>(Transaction.class); writer.write(tx, encoder); encoder.flush(); kafkaTemplate.send("transactions", tx.getId().toString(), out.toByteArray()); }
2. Высокоскоростной сбор логов
Задача: быстрая отправка логов с допустимой потерей.
Конфигурация:
apppetr: kafka: bootstrap-servers: localhost:9092 producer: topic: logs acks: 0 max-in-flight-requests: 10 consumer: topic: logs group-id: log-collector max-poll-records: 1000 auto-offset-reset: latest avro-schema-path: src/main/resources/avro/log.avdl
Код:
@KafkaListener(topics = "logs", groupId = "log-collector") public void processLog(byte[] data) throws Exception { Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); SpecificDatumReader<Log> reader = new SpecificDatumReader<>(Log.class); Log log = reader.read(null, decoder); System.out.println("Log: " + log.getMessage()); }
3. Обработка событий с ретраями
Задача: повторная обработка при сбоях.
Конфигурация:
apppetr: kafka: bootstrap-servers: kafka:9092 consumer: topic: events group-id: event-processor retry-backoff-ms: 2000 max-retries: 5 avro-schema-path: src/main/resources/avro/event.avdl
Код:
@KafkaListener(topics = "events", groupId = "event-processor") public void handleEvent(byte[] data) throws Exception { Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); SpecificDatumReader<Event> reader = new SpecificDatumReader<>(Event.class); Event event = reader.read(null, decoder); processEvent(event); // Логика с возможными исключениями }
Бонус дочитавшим
Памятка-шпаргалка по основным параметрам конфигурации Kafka.
Общие параметры
Параметр |
Описание |
Тип |
По умолчанию |
---|---|---|---|
bootstrap.servers |
Список адресов брокеров Kafka |
String |
localhost:9092 |
Параметры продюсера
Параметр |
Описание |
Тип |
По умолчанию |
---|---|---|---|
key.serializer |
Сериализатор ключа |
Class |
— |
value.serializer |
Сериализатор значения |
Class |
— |
acks |
Уровень подтверждений (0, 1, all) |
String |
1 |
retries |
Количество ретраев при сбоях |
int |
0 |
Задержка между ретраями (мс) |
int |
100 |
|
enable.idempotence |
Включение идемпотентности |
boolean |
false |
max.in.flight.requests.per.connection |
Макс. неподтверждённых запросов |
int |
5 |
Общий таймаут доставки (мс) |
int |
120000 (2 мин) |
|
Таймаут запроса к брокеру (мс) |
int |
30000 (30 сек) |
|
buffer.memory |
Размер буфера для отправки (байт) |
long |
33554432 (32MB) |
batch.size |
Размер батча для отправки (байт) |
int |
16384 (16KB) |
Задержка перед отправкой батча (мс) |
int |
0 |
|
compression.type |
Тип сжатия (none, gzip, snappy, lz4, zstd) |
String |
none |
Параметры консюмера
Параметр |
Описание |
Тип |
По умолчанию |
---|---|---|---|
key.deserializer |
Десериализатор ключа |
Class |
— |
value.deserializer |
Десериализатор значения |
Class |
— |
Идентификатор группы потребителей |
String |
null |
|
auto.offset.reset |
Политика сброса оффсета (earliest, latest, none) |
String |
latest |
enable.auto.commit |
Включение автокоммита оффсетов |
boolean |
true |
Интервал автокоммита (мс) |
int |
5000 (5 сек) |
|
max.poll.records |
Макс. записей за один poll |
int |
500 |
Макс. интервал между poll (мс) |
int |
300000 (5 мин) |
|
Таймаут сессии группы (мс) |
int |
10000 (10 сек) |
|
Интервал heartbeat (мс) |
int |
3000 (3 сек) |
|
fetch.max.bytes |
Макс. размер выборки (байт) |
int |
52428800 (50MB) |
fetch.min.bytes |
Мин. размер выборки (байт) |
int |
1 |
Макс. ожидание данных (мс) |
int |
500 |
Полезные заметки
-
Идемпотентность. Для
enable.idempotence=true
требуетсяacks=all
иretries > 0
. -
Производительность. Увеличьте
batch.size
иlinger.ms
для продюсера илиmax.poll.records
для консюмера, чтобы повысить пропускную способность. -
Надёжность. Используйте
acks=all
и высокийretries
для продюсера, отключитеenable.auto
.commit
для точного контроля оффсетов в консюмере. -
Отладка. Логируйте
group.id
и проверяйтеauto.offset.reset
, если данные не читаются.
Заключение
Создание kafka-spring-boot-starter позволило мне упростить интеграцию Kafka в проекты на Spring Boot. Проблемы с Avro и тестами научили меня гибкости и важности синхронизации в асинхронных системах. Надеюсь, этот опыт вдохновит вас на создание собственных библиотек!
Вопрос к читателям: какую функциональность вы бы добавили в такой стартер? Делитесь идеями в комментариях!
Попробуйте внедрить стартер в свой проект! Какие задачи вы решаете с Kafka? Делитесь опытом в комментариях — обсудим, как улучшить этот подход!
Исходный код доступен на GitHub https://github.com/bigbox89/kafka-spring-boot-starter . Если у вас возникнут вопросы по настройке или тестированию, пишите — разберёмся вместе!
ссылка на оригинал статьи https://habr.com/ru/articles/894402/
Добавить комментарий