Пример использования RabbitMQ Delayed Message Exchange в Java Spring Framework

от автора

image В этом посте я хочу показать как использовать отложенные сообщения в RabbitMQ. В качестве примера задачи, где удобно использовать отложенную очередь, возьму механизм постбеков (s2s ping back, s2s pixel).

В двух словах о механизме постбеков:

1. Происходит некое событие
2. Ваше приложение должно уведомить об этом событии сторонний сервис
3. Если сторонний сервис оказался недоступен, то необходимо повторить уведомление еще раз через несколько минут

Для повторного уведомления я буду использовать отложенную очередь.

RabbitMQ по умолчанию не умеет задерживать сообщения, они доставляются сразу после публикации. Функционал отложенной доставки доступен в виде плагина rabbitmq-delayed-message-exchange.

Сразу хочу отметить, что плагин экспериментальный. Не смотря на то, что в целом он достаточно стабилен, использовать в продакшене его нужно на свой страх и риск.

Собираем Docker контейнер с RMQ и плагином

За основу я возьму официальный образ с management plugin, пригодится для тестирования.

Dockerfile:

FROM rabbitmq:3.6-management RUN apt-get update && apt-get install -y curl RUN curl http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange 

Сборка

docker build --tag=x25/rmq-delayed-message-exchange .

Запуск

docker run -d --name rmq -p 5672:5672 -p 15672:15672 x25/rmq-delayed-message-exchange

Spring AMQP

Spring Framework полностью поддерживает плагин в проекте spring-rabbit. Начиная с версии 1.6.4 можно пользоваться как xml/bean конфигурациями так и аннотациями.

Я буду использовать Spring Boot Amqp Starter.

Зависимость для Maven

<dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

Зависимость для Gradle

compile "org.springframework.boot:spring-boot-starter-amqp"

Конфигурация через аннотации

При использовании бутстраппера и аннотаций Spring берет большую часть работы на себя. Достаточно лишь написать:

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME), exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME)) public void onMessage(Message<?> message) {     //... }

И при запуске приложения RabbitAdmin автоматически объявит delayed exchange, queue, создаст обработчики событий и привяжет их к аннотированному методу.

Нужно больше потоков для обработки сообщений? Это настраивается через файл внешней конфигурации (свойство spring.rabbitmq.listener.concurrency) или через параметр containerFactory у аннотации:

Пример

//Создаем конфигурацию: @Configuration public class RabbitConfiguration {     @Bean(name = "containerFactory")     @Autowired     public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();         factory.setConnectionFactory(connectionFactory);         factory.setConcurrentConsumers(10);         factory.setPrefetchCount(30);         return factory;     } }  //Добавляем параметр: @RabbitListener(containerFactory = "containerFactory", ...)

Для отправки отложенного сообщения удобно использовать RabbitTemplate:

rabbitTemplate.send(         DELAY_EXCHANGE_NAME,         DELAY_QUEUE_NAME,         MessageBuilder                 .withBody(data)                 .setHeader("x-delay", MESSAGE_DELAY_MS).build() );

Отправлено оно будет моментально, но доставлено будет с задержкой, указанной в заголовке x-delay. Максимально допустимое время задержки (2^32-1) мс.

Готовый пример приложения:

@SpringBootApplication public class Application {      private final Logger log = LoggerFactory.getLogger(Application.class);      private final static String DELAY_QUEUE_NAME = "delayed.queue";     private final static String DELAY_EXCHANGE_NAME = "delayed.exchange";      private final static String DELAY_HEADER = "x-delay";     private final static String NUM_ATTEMPT_HEADER = "x-num-attempt";     private final static long   RETRY_BACKOFF = 5000;      @Autowired     private RabbitTemplate rabbitTemplate;      @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),     exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))     public void onMessage(Message<byte[]> message) {          String endpointUrl = new String(message.getPayload());         Long numAttempt = (Long) message.getHeaders().getOrDefault(NUM_ATTEMPT_HEADER, 1L);          log.info("Message received, url={}, attempt={}", endpointUrl, numAttempt);          if (!doNotifyEndpoint(endpointUrl) && numAttempt < 3) {             rabbitTemplate.send(                     DELAY_EXCHANGE_NAME,                     DELAY_QUEUE_NAME,                     MessageBuilder                             .withBody(message.getPayload())                             .setHeader(DELAY_HEADER, numAttempt * RETRY_BACKOFF)                             .setHeader(NUM_ATTEMPT_HEADER, numAttempt + 1)                             .build()             );         }     }      private boolean doNotifyEndpoint(String url) {         try {             HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();             /* @todo: set up connection timeouts */             return (connection.getResponseCode() == 200);         } catch (Exception e) {             log.error(e.getMessage());             return false;         }     }      public static void main(String[] args) {         SpringApplication.run(Application.class, args);     } }

application.yml

spring:   rabbitmq:     host: 127.0.0.1     port: 5672     username: guest     password: guest     virtual-host: /     listener:       prefetch: 10       concurrency: 50

build.gradle

buildscript {     repositories {         mavenCentral()     }     dependencies {         classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE")     } }  apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot'  jar {     baseName = 'rmq-delayed-demo'     version =  '0.1.0' }  repositories {     mavenCentral() }  sourceCompatibility = 1.8 targetCompatibility = 1.8  dependencies {     compile("org.springframework.boot:spring-boot-starter-amqp")     testCompile("org.springframework.boot:spring-boot-starter-test") }

Проверяем отложенную доставку через панель управления (rmq-management), она доступна на порту 15672:

image

Логи:

2016-12-21 14:27:25.927: Message received, url=http://localhost:1234, attempt=1 2016-12-21 14:27:25.941: Connection refused (Connection refused) 2016-12-21 14:27:30.946: Message received, url=http://localhost:1234, attempt=2 2016-12-21 14:27:30.951: Connection refused (Connection refused) 2016-12-21 14:27:40.954: Message received, url=http://localhost:1234, attempt=3

Конфигурация через XML

При использовании XML конфигураций нужно просто установить у exchange-бина свойство delayed в true и RabbitAdmin сделает все остальное за вас.

Пример xml-конфигурации в связке с Integration Framework

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"        xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"        xmlns:rabbit="http://www.springframework.org/schema/rabbit"        xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd   http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd   http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">      <int:channel id="to-delayed-rmq" />      <int-amqp:outbound-channel-adapter channel="to-delayed-rmq"                                        amqp-template="rabbitTemplate"                                        exchange-name="delayed.exchange"                                        routing-key="delayed.binding"                                        mapped-request-headers="x-delay" />      <int-amqp:inbound-channel-adapter channel="from-delayed-rmq-queue"                                       queue-names="delayed.queue"                                       message-converter="amqpMessageConverter"                                       connection-factory="rabbitConnectionFactory"                                       concurrent-consumers="10"                                       prefetch-count="50" />      <int:service-activator input-channel="from-delayed-rmq-queue" method="onMessage">         <bean id="postbackServiceActivator" class="PostbackServiceActivator" />     </int:service-activator>      <rabbit:queue name="delayed.queue" />      <rabbit:direct-exchange name="delayed.exchange" delayed="true">         <rabbit:bindings>             <rabbit:binding queue="delayed.queue" key="delayed.binding" />         </rabbit:bindings>     </rabbit:direct-exchange>  </beans>

Полезные ссылки

ссылка на оригинал статьи https://habrahabr.ru/post/318118/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *