До свидания, Kafka, или graceful shutdown на Spring Boot для Kafka

от автора

Привет! В этой статье я немного объясню важность graceful shutdown и расскажу как сделать плавное завершение работы твоего Spring Boot приложения, которое взаимодействует с Kafka.

Представь, что ты сидишь в кафе, пьешь свой раф на кокосовом, ожидаешь свой ролл и круассан, и решаешь срочно уйти, потому что позвонили с работы. Но тут появляется официант и приносит заказанный тобой круассан. Ты же не встанешь и не уйдешь сразу? Конечно нет — сначала попросишь отменить ролл, съешь круассан, затем допьешь раф и только потом уйдешь. Примерно такая же история должна происходить с твоим приложением: оно должно перестать принимать новые дела, закончить старые, а только потом уже спокойно выключаться.

Правильная настройка graceful shutdown важна не только для того, чтобы ты всегда мог допить кофе, но и повышает целостность данных и стабильность всей системы. Перейдем к делу!

Перестаем принимать запросы

Для этого нам будет необходимо пробежаться по всем запущенным в контейнерах листенерам кафки и завершить их работу. Изменить количество листенеров нам поможет класс для динамического взаимодействия с ними — KafkaListenerEndpointRegistry. (Подробнее можно почитать здесь)

log.info("Starting shutdown kafka components!");  kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(c -> {   c.stop();   log.info("Container {} has stopped", c.getListenerId()); }); log.info("Finish stopping kafka listeners");

Дожидаемся конца обработки вычитанных запросов

Чтобы не завершить работу приложения раньше, чем оно закончит обрабатывать уже считанные из кафки запросы, необходимо вести учет количества запросов в обработке. Это можно сделать, например, через потокобезопасные коллекции, но у меня многопоточная обработка вычитанных сообщений реализована с помощью ThreadPoolTaskExecutor, который предоставляет методы учета недообработанных запросов.

Вычитанные запросы могут обрабатываться бесконечно долго, поэтому давайте определим две переменные, которые будут отвечать за наше терпение:

  • tpCheckMaxTimes — максимальное количество проверок на наличие недообработанных запросов, которые мы можем сделать, прежде, чем принудительно завершим программу;

  • tpCheckDelayMs- задержка между проверками в миллисекундах.

Инкапсулируем эти переменные и вынесем их в стендозависимые проперти:

public class ShutDownProperties {     private final long tpCheckDelayMs; // Задержка между проверок тасок у ThreadPool     private final long tpCheckMaxTimes; // Максимальное количество проверок тасок }

Код проверки моего ThreadPoolTaskExecutor с учетом логики принудительной остановки будет выглядеть так:

var counter = shutDownProperties.getTpCheckMaxTimes(); while (flowExecutor.isRunning() && flowExecutor.getActiveCount() > 0 && counter-- > 0) {   log.info("Waiting for end of processing consumed messages for {} ms. Active Executors count: {}",     shutDownProperties.getTpCheckDelayMs(), flowExecutor.getActiveCount());   Thread.sleep(shutDownProperties.getTpCheckDelayMs());   } if (counter == -1) {   throw new RuntimeException("Can't wait for end of processing consumed messages!"); }

Таким образом, сервис, который отвечает за функционал плавной остановки приложения будет выглядеть так:

@Slf4j @Service @AllArgsConstructor public class ShutdownService {      private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;     private final ThreadPoolTaskExecutor flowPprbExecutor;     private final ShutDownProperties shutDownProperties;      public void turnOffConsumers() throws InterruptedException {         log.info("Starting shutdown kafka components!");          kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(c -> {             c.stop();             log.info("Container {} has stopped", c.getListenerId());         });         log.info("Finish stopping kafka listeners");          var counter = shutDownProperties.getTpCheckMaxTimes();         while (flowPprbExecutor.isRunning() && flowPprbExecutor.getActiveCount() > 0 && counter-- > 0) {             log.info("Waiting for end of processing consumed messages for {} ms. Active Executors count: {}",                     shutDownProperties.getTpCheckDelayMs(), flowPprbExecutor.getActiveCount());             Thread.sleep(shutDownProperties.getTpCheckDelayMs());         }         if (counter == -1) {             flowPprbExecutor.shutdown();             throw new RuntimeException("Can't wait for end of processing consumed messages!");         }          log.info("No active executors!");     }

Мы реализовали сервис, чтобы наше приложение на Spring Boot могло плавно завершить свою работу, не оставляя за собой бардак в виде потерянных сообщений в Kafka. Осталось только настроить свою облачную среду на взаимодействие с разработанным сервисом и все, ты можешь спать спокойно, зная, что твои сервисы умеют грамотно завершать свои дела, даже если что-то пошло не так 🙂


ссылка на оригинал статьи https://habr.com/ru/articles/908022/