Введение
В рамках задачи возникло требование: если таксономия была удалена до обработки событий расчёта кэша, события, относящиеся к этой таксономии, не должны приводить к созданию кэша.
На первый взгляд решение выглядит очевидным: найти и удалить соответствующие сообщения из Kafka topic. Однако при анализе выяснилось, что точечное физическое удаление сообщений из Kafka не подходит для такого сценария. Поэтому решение было построено не на удалении записей из topic, а на логическом исключении неактуальных событий из обработки.
Исходная схема
В системе есть два сервиса:
-
xbrl-taxonomy— регистрирует и удаляет XBRL-таксономии; -
xbrl-taxonomy-cache— обрабатывает события регистрации таксономий и формирует связанные данные кэша.
При регистрации таксономии сервис xbrl-taxonomy отправляет в Kafka события TaxonomyRegistrationEvent. Для каждой точки входа entryPoint создаётся отдельное сообщение.
Упрощённо событие выглядит так:
public record TaxonomyRegistrationEvent( String version, String comments, LocalDate dateBegin, LocalDate dateEnd, String entryPoint, Instant registrationTimestamp) {}
Producer формирует событие и отправляет его в Kafka:
for (String entryPoint : entryPoints) { TaxonomyRegistrationEvent event = new TaxonomyRegistrationEvent( dto.getVersion(), dto.getComments(), dto.getDateBegin(), dto.getDateEnd(), entryPoint, Instant.now()); String messageKey = dto.getVersion() + "_" + entryPoint + "_" + dto.getDateBegin(); kafkaTemplate.send(topicName, messageKey, event);}
Consumer в сервисе xbrl-taxonomy-cache получает событие и запускает обработку:
@KafkaListener( topics = "${kafka.topic.taxonomy-registration}", containerFactory = "kafkaListenerContainerFactory")public void listen(ConsumerRecord<String, TaxonomyRegistrationEvent> consumerRecord, Acknowledgment acknowledgment) { TaxonomyRegistrationEvent event = consumerRecord.value(); try { processTaxonomyRegistration(event, consumerRecord.key()); } finally { acknowledgment.acknowledge(); }}
Такая схема работает, пока таксономия остаётся актуальной к моменту обработки события. Проблема возникает при удалении таксономии до того, как consumer успел обработать уже опубликованные сообщения.
Проблемный сценарий
Рассмотрим последовательность событий:
-
Пользователь регистрирует таксономию
1.0.0. -
Сервис
xbrl-taxonomyпубликует Kafka-события по точкам входа. -
Сервис
xbrl-taxonomy-cacheещё не успевает обработать эти сообщения. -
Пользователь удаляет таксономию
1.0.0. -
Consumer позже получает старое событие регистрации из Kafka.
С технической точки зрения сообщение корректное: оно было опубликовано в topic, имеет key, value и offset.
С точки зрения бизнес-логики это событие уже неактуально: таксономия удалена, значит создавать по ней кэш нельзя.
Следовательно, задача заключается не только в работе с Kafka, но и в проверке актуальности бизнес-сущности на момент обработки события.
Почему не подходит физическое удаление сообщений из Kafka
Первым вариантом было рассмотреть физическое удаление сообщений из topic по версии таксономии или по ключу сообщения.
Однако Kafka хранит данные как append-only log. Сообщения внутри partition упорядочены по offset. Механизм AdminClient.deleteRecords() позволяет удалить записи только до определённого offset в конкретной partition, но не удалить отдельное сообщение по условию.
Это создаёт риск потери данных.
Например, в одной partition могут находиться события разных таксономий:
partition-0Offset 10: taxonomy=1.0.0Offset 11: taxonomy=2.0.0Offset 12: taxonomy=1.0.0Offset 13: taxonomy=3.0.0
Если требуется удалить событие на offset 12, удаление до нужного offset затронет также другие сообщения, включая событие таксономии 2.0.0. Это недопустимо, потому что оно может относиться к активной таксономии и ещё не быть обработанным.
Поэтому прямое удаление сообщений из середины topic не было выбрано как безопасное решение.
Рассмотренные варианты
Удаление через AdminClient.deleteRecords()
Вариант был отклонён, потому что удаление выполняется по offset и затрагивает все записи до указанной позиции в partition. Это может привести к удалению событий других таксономий.
Очистка всего topic
Очистка topic также не подходит. В topic могут находиться события по другим активным таксономиям, поэтому полная очистка приведёт к потере корректных сообщений.
Отдельный topic на каждую таксономию
Теоретически такой подход позволил бы удалять topic целиком при удалении таксономии. Но на практике он усложняет эксплуатацию:
-
увеличивается количество topic;
-
требуется динамическое создание и удаление topic;
-
усложняется конфигурация producer и consumer;
-
появляются дополнительные вопросы мониторинга и сопровождения.
Для рассматриваемого сценария этот вариант оказался избыточным.
Compacted topic и tombstone-сообщения
Compacted topic позволяет хранить последнее значение по ключу. Если отправить сообщение с тем же key и null payload, такая запись будет tombstone-сообщением.
Пример отправки tombstone:
kafkaTemplate.send(topicName, messageKey, null);
Этот механизм можно использовать для последующей очистки устаревших записей. Однако compaction не выполняется мгновенно. До её выполнения consumer всё ещё может прочитать старое событие регистрации.
Поэтому tombstone был выбран как часть решения, но не как единственный механизм защиты.
Проверка актуальности таксономии на стороне consumer
Основным механизмом стала проверка состояния таксономии перед обработкой события.
Если таксономия удалена или находится в неактивном статусе, consumer подтверждает offset и пропускает сообщение без бизнес-обработки.
Такой подход работает даже в том случае, если старое событие физически ещё находится в Kafka topic.
Выбранное решение
В итоге был применён комбинированный подход:
-
Использовать стабильный Kafka key для событий таксономии.
-
При удалении таксономии отправлять tombstone-сообщения по всем связанным ключам.
-
Настроить topic с политикой
compact,delete, если это допускается инфраструктурой. -
Перед обработкой события проверять актуальное состояние таксономии.
-
Если таксономия неактивна, подтверждать offset и пропускать событие.
-
Если проверить состояние не удалось, не подтверждать offset и позволить механизму retry обработать сообщение позже.
Ключевая идея решения: не пытаться гарантировать мгновенное физическое удаление старой записи из Kafka, а гарантировать, что consumer не выполнит обработку неактуального события.
Стабильный Kafka key
Для tombstone-сообщений важно, чтобы событие регистрации и событие удаления использовали один и тот же Kafka key.
Если ключ формируется в разных местах вручную, легко получить расхождение. Поэтому формирование key было вынесено в отдельный компонент:
@Componentpublic class TaxonomyKafkaKeyFactory { public String cacheEventKey(String version, String entryPoint) { return version + "|" + entryPoint; }}
Producer регистрации использует этот компонент:
String messageKey = keyFactory.cacheEventKey(dto.getVersion(), entryPoint);kafkaTemplate.send(topicName, messageKey, event);
При удалении таксономии используется тот же key:
String messageKey = keyFactory.cacheEventKey(version, entryPoint);kafkaTemplate.send(topicName, messageKey, null);
Так обеспечивается соответствие между исходным событием и tombstone-записью.
Отправка tombstone при удалении таксономии
При удалении таксономии необходимо заранее получить список всех entryPoint, относящихся к удаляемой версии. Если сначала удалить данные, может быть потеряна информация о том, по каким ключам нужно отправить tombstone.
Упрощённая логика удаления:
@DeleteMapping("/{version}")public ResponseEntity<String> delete(@PathVariable String version) { Taxonomy taxonomy = taxonomyRepository.findByVersion(version); if (taxonomy == null) { throw TaxonomyException.notFound(); } List<String> entryPointNames = taxonomyService.getEntryPointNames(taxonomy); taxonomyService.delete(taxonomy); taxonomyKafkaProducer.publishTaxonomyDeletionTombstones(version, entryPointNames); return ResponseEntity.ok(MessageFormat.format("Таксономия {0} успешно удалена", version) );}
Метод отправки tombstone:
@Async@Overridepublic void publishTaxonomyDeletionTombstones(String version, Collection<String> entryPoints) { if (entryPoints == null || entryPoints.isEmpty()) { log.warn("Нет entryPoint для отправки tombstone по таксономии {}", version); return; } for (String entryPoint : entryPoints) { String messageKey = keyFactory.cacheEventKey(version, entryPoint); kafkaTemplate.send(topicName, messageKey, null) .whenComplete((result, ex) -> { if (ex != null) { log.error("[{}] Ошибка отправки tombstone", messageKey, ex); } else { log.info("[{}] Tombstone отправлен. partition={}, offset={}", messageKey, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } }); }}
Если удаление таксономии выполняется в транзакции, отправку tombstone лучше выполнять после успешного commit. Это позволяет избежать ситуации, когда tombstone уже отправлен в Kafka, но транзакция удаления в БД была отменена.
Для этого можно использовать @TransactionalEventListener:
@Component@RequiredArgsConstructorpublic class TaxonomyDeletedEventListener { private final TaxonomyKafkaProducer taxonomyKafkaProducer; @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onTaxonomyDeleted(TaxonomyDeletedEvent event) { taxonomyKafkaProducer.publishTaxonomyDeletionTombstones( event.version(), event.entryPointNames() ); }}
Настройка topic
Topic можно настроить с политикой compact,delete:
@Beanpublic NewTopic taxonomyRegistrationTopic( @Value("${kafka.topic.taxonomy-registration.name}") String topicName, @Value("${kafka.topic.taxonomy-registration.partitions}") int partitions, @Value("${kafka.topic.taxonomy-registration.replicas}") short replicas) { return TopicBuilder.name(topicName) .partitions(partitions) .replicas(replicas) .config(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") .config(TopicConfig.RETENTION_MS_CONFIG, "604800000") .config(TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000") .build();}
Такая настройка помогает Kafka со временем очищать устаревшие записи. При этом compaction не заменяет проверку бизнес-состояния на стороне consumer.
Endpoint состояния таксономии
Для проверки актуальности таксономии был добавлен endpoint в сервис xbrl-taxonomy.
Изначально можно было бы возвращать простой boolean, например true, если таксономия активна, и false в остальных случаях. Но для диагностики и логирования полезно знать причину, по которой таксономия считается неактивной.
Поэтому был использован DTO:
public record TaxonomyStateResponse( String version, boolean active, String status) {}
Controller:
@GetMapping("/{version}/state")public ResponseEntity<TaxonomyStateResponse> getState(@PathVariable String version) { return ResponseEntity.ok(taxonomyStateService.getState(version));}
Service:
@Transactional(readOnly = true)public TaxonomyStateResponse getState(String version) { Taxonomy taxonomy = taxonomyRepository.findByVersion(version); if (taxonomy == null) { return new TaxonomyStateResponse(version, false, "NOT_FOUND"); } boolean active = taxonomy.getStatus() == TaxonomyStatus.ACTIVE; return new TaxonomyStateResponse( version, active, taxonomy.getStatus().name() );}
Consumer использует поле active для принятия решения, а поле status остаётся в логах:
version=1.0.0, active=false, status=DELETING
Это делает поведение системы более прозрачным при разборе инцидентов и тестировании сценариев удаления.
Клиент состояния в сервисе cache
В сервисе xbrl-taxonomy-cache был добавлен клиент для проверки состояния таксономии:
public interface TaxonomyStateClient { TaxonomyStateResponse getState(String version);}
Пример реализации через REST:
@Component@RequiredArgsConstructorpublic class TaxonomyStateRestClient implements TaxonomyStateClient { private final RestClient taxonomyRestClient; @Override public TaxonomyStateResponse getState(String version) { return taxonomyRestClient.get() .uri("/api/taxonomy/{version}/state", version) .retrieve() .body(TaxonomyStateResponse.class); }}
Если сервис состояния временно недоступен, consumer не должен подтверждать offset. В таком случае лучше пробросить исключение, чтобы сработал настроенный механизм повторной обработки или отправки в DLT.
Обработка tombstone в consumer
После добавления tombstone-сообщений consumer должен корректно обрабатывать null payload.
if (event == null) { log.info("Получен tombstone. key={}, partition={}, offset={}", consumerRecord.key(), consumerRecord.partition(), consumerRecord.offset()); acknowledgment.acknowledge(); return;}
Tombstone не запускает бизнес-обработку. Consumer только подтверждает получение сообщения и завершает обработку.
Проверка активности перед обработкой события
Основная логика listener была изменена так, чтобы перед обработкой проверять состояние таксономии:
@KafkaListener( topics = "${kafka.topic.taxonomy-registration}", containerFactory = "kafkaListenerContainerFactory")public void listen(ConsumerRecord<String, TaxonomyRegistrationEvent> consumerRecord, Acknowledgment acknowledgment) { TaxonomyRegistrationEvent event = consumerRecord.value(); if (event == null) { log.info("Получен tombstone. key={}", consumerRecord.key()); acknowledgment.acknowledge(); return; } String eventKey = resolveEventKey(consumerRecord.key(), event); try { TaxonomyStateResponse state = taxonomyStateClient.getState(event.version()); if (!state.active()) { log.info("Таксономия неактивна. version={}, status={}, eventKey={}. Событие пропущено.", state.version(), state.status(), eventKey); acknowledgment.acknowledge(); return; } processTaxonomyRegistration(event, eventKey); acknowledgment.acknowledge(); } catch (Exception e) { log.error("Ошибка обработки события {}", eventKey, e); throw e; }}
Теперь событие обрабатывается только в том случае, если таксономия находится в активном состоянии. Если таксономия удалена, удаляется, не найдена или находится в другом неактивном статусе, сообщение подтверждается и пропускается.
Исправление acknowledge-логики
В исходной реализации offset подтверждался в блоке finally:
try { processTaxonomyRegistration(event, eventKey);} finally { acknowledgment.acknowledge();}
Такой подход небезопасен. Если обработка завершается ошибкой, offset всё равно подтверждается, и сообщение не будет обработано повторно.
После изменения логика стала более явной:
-
событие успешно обработано, выполняется
ack; -
событие осознанно пропущено из-за неактивной таксономии, выполняется
ack; -
получен tombstone, выполняется
ack; -
произошла ошибка обработки или проверки состояния, исключение пробрасывается дальше,
ackне выполняется.
Такой подход лучше согласуется с ручным подтверждением offset и механизмом retry/DLT.
Идемпотентность обработки
Даже после добавления проверки состояния consumer должен оставаться идемпотентным.
Kafka может доставить сообщение повторно. Например, приложение может выполнить бизнес-операцию, но завершиться до подтверждения offset. После рестарта consumer получит то же сообщение ещё раз.
Поэтому проверка уже созданных данных остаётся полезной:
if (hasRoles(event.entryPoint())) { log.info("Для точки входа {} уже вычислен кэш. Событие пропущено.", event.entryPoint()); return;}
При этом in-memory набор обработанных событий подходит только как дополнительная оптимизация в рамках жизни одного процесса. Для устойчивой идемпотентности лучше использовать состояние в БД, уникальные ограничения, таблицу обработанных событий или статусы обработки.
Что в итоге применили
В результате был применён следующий механизм:
-
события регистрации отправляются со стабильным key
version|entryPoint; -
при удалении таксономии заранее собирается список связанных
entryPoint; -
после успешного удаления публикуются tombstone-сообщения с теми же ключами;
-
topic может быть настроен с политикой
compact,delete; -
в
xbrl-taxonomyдобавлен endpoint состояния таксономии; -
состояние возвращается через DTO
TaxonomyStateResponse, содержащийversion,activeиstatus; -
consumer перед обработкой события проверяет
active; -
если
active=false, событие подтверждается и пропускается; -
если проверить состояние не удалось, offset не подтверждается;
-
acknowledge()вызывается только после успешной обработки или осознанного пропуска.
Такой механизм позволяет выполнить бизнес-требование: события удалённой таксономии не будут обработаны сервисом xbrl-taxonomy-cache, даже если физически они ещё находятся в Kafka topic.
Итоговая схема
Сценарий регистрации:
xbrl-taxonomy -> TaxonomyRegistrationEvent(version, entryPoint) -> Kafka topic -> xbrl-taxonomy-cache -> проверка состояния таксономии -> обработка, если active=true
Сценарий удаления до обработки события:
xbrl-taxonomy -> удаление таксономии -> TaxonomyStateResponse.active=false -> tombstone в Kafka по ключам version|entryPointxbrl-taxonomy-cache -> получает старое registration-событие -> запрашивает состояние таксономии -> видит active=false -> подтверждает offset -> не выполняет бизнес-обработку
В этой схеме tombstone отвечает за последующую очистку Kafka topic, а проверка состояния таксономии защищает бизнес-логику в момент обработки сообщения.
Вывод
Физическое удаление отдельных необработанных сообщений из Kafka topic не подходит для сценария удаления таксономии, потому что Kafka не предоставляет безопасного точечного удаления записей из середины partition по бизнес-признаку.
Вместо этого был использован комбинированный подход: стабильный Kafka key, tombstone-сообщения для последующей очистки topic, политика compact,delete и обязательная проверка состояния таксономии на стороне consumer.
В результате события удалённой таксономии не обрабатываются сервисом xbrl-taxonomy-cache, даже если физически они ещё находятся в Kafka topic. Это позволяет сохранить корректность бизнес-логики без риска удалить сообщения, относящиеся к другим активным таксономиям.
ссылка на оригинал статьи https://habr.com/ru/articles/1035526/