Как обновить необновляемое: кастомная замена для @RabbitListener

от автора

У вас есть приложение Spring Boot, которое динамически подгружает собственную конфигурацию из Spring Cloud Config Server, то есть является его клиентом. Разумеется, вы хотите, чтобы при изменениях хранимой на сервере конфигурации клиент мог применить новые настройки без перезапуска. Эта тема описана во многих источниках в Интернет, но есть один тонкий момент, который встречается не очень часто, но тем не менее, некоторые заказчики выдвигают одно специфическое требование, которого нельзя достигнуть ни одним стандартным описанным способом, во всяком случае — ничего подобного в явном виде я в сети не нашел. Эта ситуация связана с использованием в клиенте очередей RabbitMQ.

Итак, один заказчик потребовал при разработке приложения, чтобы клиент мог динамически обновить не только те параметры приложения, которые чаще всего меняют, но и такую специфическую настройку, как настройки используемых очередей RabbitMQ. Оказалось, что это не так уж и просто, стандартные способы для этого не подходят. Давайте рассмотрим эту ситуацию и решим ее.

Мы не будем углубляться в то, как настроен сам Spring Cloud Config Server — об этом написано много, есть варианты разных способов предоставления настроек на сервере и на клиенте, чтобы забирать и применять конфигурацию. Остановимся на настройках самого клиента на одном конкретном примере. Пример такой:

На клиенте мы имеем стандартный файл bootstrap.yml с примерно такими настройками:

spring:     application:         name: pyramid     config:         import: optional:configserver:http://localhost:8800     profiles:         active: local     cloud:         config:             fail-fast: true             enabled: true             username: configserver             password: configserver  

Таким образом, у нас реализована подгрузка конфигурации из сервера конфигураций. Далее, чтобы параметры из подгруженной конфигурации динамически менялись в приложении без перезапуска, нам необходимо поставить аннотацию @RefreshScope на классы, которые используют динамические параметры, например, вот так:

@RefreshScope @Service public class Service {     @Value("${rabbitmq.exchange}")     private String defaultExchange;     ... 

Но это не сработает для конфигурационных классов с аннотацией @Configuration, в таком случае мы должны поставить аннотацию @RefreshScope не только на сам класс, но и на каждый бин, который использует динамические параметры, например, вот так:

@Configuration @RefreshScope public class RabbitConfig {     @Value("${rabbitmq.exchange}")     private String defaultExchange;     ...............................     @Bean(SUCCESS_COMMAND_RABBIT_TEMPLATE)     @RefreshScope     public RabbitTemplate rabbitTemplateSuccessCommand() {         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());         rabbitTemplate.setDefaultReceiveQueue(successCommandQueue);         rabbitTemplate.setExchange(defaultExchange);         rabbitTemplate.setRoutingKey(successCommandRoutingKey);         return rabbitTemplate;     }     ......... }      

Однако, что будет, если у нас есть специальный класс, который содержит listener из пакета spring-boot-starter-amqp, и заказчик желает, чтобы этот слушатель мог динамически обновлять свои настройки? Например, вот такой:

@Configuration @RefreshScope public class RabbitMeterListener { ......     @RabbitListener(id = "pyramidCommandListener", bindings = @QueueBinding(             value = @Queue(value = "${rabbitmq.commands.queue}"),             exchange = @Exchange(value = "${rabbitmq.MetersUuids.exchange}"),             key = "${rabbitmq.MetersUuids.routingKey}"     ))     public void listenPyramidCommands(String in) throws JsonProcessingException {     .....     какой-то функционал в ответ на событие listener-a     .....     } } 

Мы обнаружим следующий неприятный момент — listener не может обновить собственные параметры динамически, даже если в аннотацию подставлены ссылки на динамические параметры и сам класс имеет аннотацию @RefreshScope — потому что метод с аннотацией @RabbitLister не является бином в обычном понимании этого термина. То есть, если почитать литературу и покопаться в недрах Spring, то мы отыщем, что где-то глубоко внутри все-таки используются именно бины, но сам метод с такой аннотацией не поддерживает ожидаемого функционала, если и на него тоже попытаться подвесить @RefreshScope — это не сработает!

И как же нам быть, если заказчик требует, что у нас был именно слушатель очереди, и чтобы его параметры обновлялись динамически? Решение проблемы существует, и оно заключается в отказе от использования стандартной аннотации @RabbitListener и в создании собственного кастомного слушателя, который мог бы удовлетворить нашим требованиям. Как это сделать?

Прежде всего, мы непосредственно реализуем примерно вот такой метод контейнера слушателей в том же классе RabbitMeterListener вместо аннотированного @RabbitListener метода:

    @Value("${rabbitmq.commands.queue}")     private String metersUuidsQueue;     ......     @Bean     @RefreshScope     public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {         SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);         simpleMessageListenerContainer.addQueues(new Queue(metersUuidsQueue));         simpleMessageListenerContainer.setMessageListener(this::pyramidCommandListener);         simpleMessageListenerContainer.start();         return simpleMessageListenerContainer;     } 

Этот метод уже является обычным бином, поэтому использованные внутри него динамические параметры уже будут подменяться при обновлении их на сервере, @RefreshScope сделает это для нас.

Далее мне понадобился промежуточный метод-адаптер, потому что мой функционал изначально был помещен внутри метода с сигнатурой public void listenPyramidCommands(String in), между тем, как в контейнере слушателей требуется, чтобы параметр this::pyramidCommandListener имел другую сигнатуру, и их нужно было совместить. Я сделал это вот так:

    public void pyramidCommandListener(Message message) {         String in = new String(message.getBody());         try {             listenPyramidCommands(in);         } catch (JsonProcessingException e) {             log.error("Ошибка при конвертации в строку входящей команды из очереди {}", metersUuidsQueue);         }     } 

То есть требуется, чтобы в this::pyramidCommandListener параметр был с типом Message, соответственно, из него нужно извлечь строку с телом сообщения, получаемого, когда listener срабатывает. У вас может быть совсем другая логика обработки принятого сообщения, поэтому детали могут отличаться, их реализация — на ваше усмотрение.

Кроме того, контейнер слушателей требует прямого указания connectionFactory для RabbitMQ, поэтому фабрику тоже придется реализовать в отдельном бине, например, вот так:

    @Bean     @RefreshScope     public ConnectionFactory connectionFactory() {         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, Integer.parseInt(port));         connectionFactory.setUsername(username);         connectionFactory.setPassword(password);         connectionFactory.setVirtualHost(virtualHost);         return connectionFactory;     } 

Ну вот, казалось бы, у нас есть все бины, размеченные @RefreshScope, которые будут обновляться при подгрузке новой конфигурации. Однако, тут нас ждет еще один «подводный камень». А именно — контейнер слушателей после получения новых динамических параметров конфигурации должен быть перезагружен, потому что сами слушатели, хранящиеся в нем, не воспримут новые параметры! Почему так сделано, зачем, я не стал углубляться в эту проблему, а просто сделал еще один класс, который решает эту задачу:

@Component public class RefreshListener {      private final SimpleMessageListenerContainer simpleMessageListenerContainer;      @Autowired     public RefreshListener(SimpleMessageListenerContainer simpleMessageListenerContainer) {         this.simpleMessageListenerContainer = simpleMessageListenerContainer;     }      // необходимо перезапустить контейнер слушателей после обновления конфигурации     @EventListener     public void onEnvironmentChangeEvent(RefreshScopeRefreshedEvent event) {         simpleMessageListenerContainer.stop();         simpleMessageListenerContainer.start();     } } 

Вот теперь действительно все получилось — динамические параметры из конфигурации после ее обновления также подхватываются, и у меня появилась возможность динамически менять настройки слушателя RabbitMQ на клиенте из централизованного сервера конфигураций, как и требовал заказчик, отлажено и проверено, работает на продуктиве. Кейс нечастый, но встречается иногда и вполне может быть кому-то полезен.

На этом все. Хочу порекомендовать вам бесплатный урок по теме: «Хаки для командной работы в git», который уже 9 июня пройдет на платформе OTUS.


ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/670058/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *