RabbitMQ Spring tutorial

от автора

На сайте rabbitmq.com уже есть подробные примеры и клиент для java. Однако если в проекте уже используется спринг, то намного удобнее использовать библиотеку Spring AMQP. Эта статья содержит реализацию всех шести официальных примеров работы с RabbitMQ.

Сразу ссылка на проекты на GitHub.

Для примеров я буду использовать простейшее приложение на спринге. После того, как пользователь перейдёт по опредленной ссылке, в RabbitMQ будет посылаться сообщение которое будет отправляться в один из листенеров. Листенер в свою очередь будет просто выводить сообщение в лог. На хабре уже были переводы официальных туториалов на php и python, и я думаю многие уже знакомы с принципами работы rabbitmq, поэтому я сконцентрируюсь на работе именно с Spring AMQP.

Подготовка

Установка RabbitMQ

Установка RabbitMQ детально описана на официальном сайте. Тут проблем возникнуть не должно.

Настройка Spring

Для простоты я использовал Spring Boot. Он отлично подходит, чтобы быстро развернуть приложения на спринге и не заниматься его долгим кофигурированием. При этом сам Spring AMQP я буду конфигурировать «классическим способом» — т.е. так, как я конфигурировал в реальном проекте без Spring Boot (разве что в ConnectionFactory не описаны некоторые специфичные для heroku вещи).

Cодержимое минимального pom.xml необходимого нам для запуска. Здесь уже есть Spring boot и Spring AMQP.

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>      <groupId>rabbitmq</groupId>     <artifactId>example-1</artifactId>     <version>1.0-SNAPSHOT</version>     <parent>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-parent</artifactId>         <version>1.2.4.RELEASE</version>     </parent>     <dependencies>         <dependency>             <groupId>org.springframework.amqp</groupId>             <artifactId>spring-rabbit</artifactId>             <version>1.4.5.RELEASE</version>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-web</artifactId>         </dependency>     </dependencies> </project> 

Основной файл конфигурации. Кроме имени класса, его содержимое будет одинаковым для всех наших примеров.

package com.rabbitmq.example1;  import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Import;  @EnableAutoConfiguration @ComponentScan @Import(RabbitConfiguration.class) public class Example1Configuration {     public static void main(String[] args) throws Exception {         SpringApplication.run(Example1Configuration.class, args);     } } 

Пример 1. «Hello World!»

Для работы с RabbitMQ нам потребуются следующие бины:
— сonnectionFactory — для соединения с RabbitMQ;
— rabbitAdmin — для регистрации/отмены регистрации очередей и т.п.;
— rabbitTemplate — для отправки сообщений (producer);
— myQueue1 — собственно очередь куда посылаем сообщения;
— messageListenerContainer — принимает сообщения (consumer).

Код конфигурации для этих бинов

package com.rabbitmq.example1;  import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean;  public class RabbitConfiguration {     Logger logger = Logger.getLogger(RabbitConfiguration.class);      //настраиваем соединение с RabbitMQ     @Bean     public ConnectionFactory connectionFactory() {         CachingConnectionFactory connectionFactory =                 new CachingConnectionFactory("localhost");         return connectionFactory;     }      @Bean     public AmqpAdmin amqpAdmin() {         return new RabbitAdmin(connectionFactory());     }      @Bean     public RabbitTemplate rabbitTemplate() {         return new RabbitTemplate(connectionFactory());     }      //объявляем очередь с именем queue1     @Bean     public Queue myQueue1() {         return new Queue("queue1");     }      //объявляем контейнер, который будет содержать листенер для сообщений     @Bean     public SimpleMessageListenerContainer messageListenerContainer1() {         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();         container.setConnectionFactory(connectionFactory());         container.setQueueNames("queue1");         container.setMessageListener(new MessageListener() {         	//тут ловим сообщения из queue1             public void onMessage(Message message) {                 logger.info("received from queue1 : " + new String(message.getBody()));             }         });         return container;     } } 

В этом и следующих примерах в качестве продюссера будет контроллер, который будет посылать сообщения в rabbitmq.

package com.rabbitmq.example1;  import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody;  @Controller public class SampleController {     Logger logger = Logger.getLogger(SampleController.class);      @Autowired     AmqpTemplate template;      @RequestMapping("/emit")     @ResponseBody     String queue1() {         logger.info("Emit to queue1");         template.convertAndSend("queue1","Message to queue");         return "Emit to queue";     } } 

Теперь, если запустить Example1Configuration и перейти в браузере по адресу http://localhost:8080/emit, то в консоли мы увидим что-то типа:

 2015-06-23 21:16:26.250  INFO 6460 --- [nio-8080-exec-2] com.rabbitmq.example1.SampleController   : Emit to queue1 2015-06-23 21:16:26.252  INFO 6460 --- [cTaskExecutor-1] c.rabbitmq.example1.RabbitConfiguration  : received from queue 1: Message to queue 

Рассмотрим подробнее получившийся результат. Тут мы в SampleController.java отправляем сообщение:

template.convertAndSend("queue1","Message to queue"); 

А здесь мы его получаем:

public void onMessage(Message message) {     logger.info("received from queue 1: " + new String(message.getBody())); } 

Ничего сложного, но описывать все листенеры в конфигурации не совсем удобно, особенно когда их становится много. Гораздо удобнее конфигуририровать листенеры аннотациями.

Пример 1.1. «Hello World!» на аннотациях

Вместо листенера в конфигурации добавим в проект класс RabbitMqListener, в который будут приходить сообщения. Соответственно messageListenerContainer1 уже не нужен.

RabbitMqListener — это обыкновенный компонент(@Component) спринга с методом, помеченным анотацией @RabbitListener. В этом метод будут приходить сообщения. При этом мы можем получать как полное сообщение Message заголовками и телом как массив байт, так и просто сконвертированное тело в том виде, в каком мы его отправляли.

    @RabbitListener(queues = "queue1")     public void processQueue1(String message) {         logger.info("Received from queue 1: " + message);     } 

Исходный код RabbitMqListener.java и обновленного RabbitConfiguration.java

package com.rabbitmq.example1annotated;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  @EnableRabbit //нужно для активации обработки аннотаций @RabbitListener @Component public class RabbitMqListener {     Logger logger = Logger.getLogger(RabbitMqListener.class);      @RabbitListener(queues = "queue1")     public void processQueue1(String message) {         logger.info("Received from queue 1: " + message);     } }  

package com.rabbitmq.example1annotated;  import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  @Configuration public class RabbitConfiguration {     Logger logger = Logger.getLogger(RabbitConfiguration.class);     @Bean     public ConnectionFactory connectionFactory() {         CachingConnectionFactory connectionFactory =                 new CachingConnectionFactory("localhost");         return connectionFactory;     }      @Bean     public AmqpAdmin amqpAdmin() {         return new RabbitAdmin(connectionFactory());     }      @Bean     public RabbitTemplate rabbitTemplate() {         return new RabbitTemplate(connectionFactory());     }      @Bean     public Queue myQueue1() {         return new Queue("queue1");     }  } 

Пример 2. Work Queues

В данном примере одну очередь слушают уже два листенера. Для эмуляции полезной работы используем Thread.sleep. Важно, что листенеры одной очереди могут быть и на разных инстансах программы. Так можно распараллелить очередь на несколько компьютеров или нод в облаке.

    @RabbitListener(queues = "query-example-2")     public void worker1(String message) throws InterruptedException {         logger.info("worker 1 : " + message);         Thread.sleep(100 * random.nextInt(20));     }      @RabbitListener(queues = "query-example-2")     public void worker2(String message) throws InterruptedException {         logger.info("worker 2 : " + message);         Thread.sleep(100 * random.nextInt(20));     } 

Результат:

 2015-06-23 22:03:48.018  INFO 6784 --- [nio-8080-exec-1] com.rabbitmq.example2.SampleController   : Emit to queue 2015-06-23 22:03:48.029  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 1 2015-06-23 22:03:48.029  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 0 2015-06-23 22:03:48.830  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 2 2015-06-23 22:03:49.331  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 3 2015-06-23 22:03:49.432  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 4 2015-06-23 22:03:49.634  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 5 2015-06-23 22:03:49.733  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 6 2015-06-23 22:03:49.735  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 7 2015-06-23 22:03:50.236  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 8 2015-06-23 22:03:50.537  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 9 
Исходный код RabbitMqListener.java и обновленного SampleController.java

package com.rabbitmq.example2;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.util.Random;  @Component public class RabbitMqListener {     Logger logger = Logger.getLogger(RabbitMqListener.class);     Random random = new Random();      @RabbitListener(queues = "query-example-2")     public void worker1(String message) throws InterruptedException {         logger.info("worker 1 : " + message);         Thread.sleep(100 * random.nextInt(20));     }      @RabbitListener(queues = "query-example-2")     public void worker2(String message) throws InterruptedException {         logger.info("worker 2 : " + message);         Thread.sleep(100 * random.nextInt(20));     }  }	 

package com.rabbitmq.example2;  import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody;   @Controller public class SampleController {     Logger logger = Logger.getLogger(SampleController.class);      @Autowired     AmqpTemplate template;      @RequestMapping("/queue")     @ResponseBody     String queue1() {         logger.info("Emit to queue");         for(int i = 0;i<10;i++)             template.convertAndSend("query-example-2","Message " + i);         return "Emit to queue";     } } 

Пример 3. Publish/Subscribe

Тут одно и то же сообщение приходит сразу двум консьюмерам.

 2015-06-23 22:12:24.669  INFO 1664 --- [nio-8080-exec-1] com.rabbitmq.example3.SampleController   : Emit to exchange-example-3 2015-06-23 22:12:24.684  INFO 1664 --- [cTaskExecutor-1] com.rabbitmq.example3.RabbitMqListener   : accepted on worker 1 : Fanout message 2015-06-23 22:12:24.684  INFO 1664 --- [cTaskExecutor-1] com.rabbitmq.example3.RabbitMqListener   : accepted on worker 2 : Fanout message 

Для этого, подключим обе очереди к FanoutExchange:

    @Bean     public FanoutExchange fanoutExchangeA(){         return new FanoutExchange("exchange-example-3");     }      @Bean     public Binding binding1(){         return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());     }      @Bean     public Binding binding2(){         return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());     } 

И будем отправлять не в очередь, а в exchange exchange-example-3:

    template.setExchange("exchange-example-3");     template.convertAndSend("Fanout message"); 

Каждый раз указывать exchange необязательно. Его можно указать и один раз при создании RabbitTemplate.

Полные исходные коды

package com.rabbitmq.example3;  import org.apache.log4j.Logger; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  @EnableRabbit @Configuration public class RabbitConfiguration {     Logger logger = Logger.getLogger(RabbitConfiguration.class);      @Bean     public ConnectionFactory connectionFactory() {         CachingConnectionFactory connectionFactory =                 new CachingConnectionFactory("localhost");         return connectionFactory;     }      @Bean     public AmqpAdmin amqpAdmin() {         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());         return rabbitAdmin;     }      @Bean     public RabbitTemplate rabbitTemplate() {         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());         return rabbitTemplate;     }       @Bean     public Queue myQueue1() {         return new Queue("query-example-3-1");     }      @Bean     public Queue myQueue2() {         return new Queue("query-example-3-2");     }      @Bean     public FanoutExchange fanoutExchangeA(){         return new FanoutExchange("exchange-example-3");     }      @Bean     public Binding binding1(){         return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());     }      @Bean     public Binding binding2(){         return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());     }  } 

package com.rabbitmq.example3;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.util.Random;  @Component public class RabbitMqListener {     Logger logger = Logger.getLogger(RabbitMqListener.class);     Random random = new Random();      @RabbitListener(queues = "query-example-3-1")     public void worker1(String message) {         logger.info("accepted on worker 1 : " + message);     }      @RabbitListener(queues = "query-example-3-2")     public void worker2(String message) {         logger.info("accepted on worker 2 : " + message);     }  } 

package com.rabbitmq.example3;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody;   @Controller public class SampleController {     Logger logger = Logger.getLogger(SampleController.class);      @Autowired     RabbitTemplate template;      @RequestMapping("/")     @ResponseBody     String home() {         return "Empty mapping";     }      @RequestMapping("/emit")     @ResponseBody     String emit() {         logger.info("Emit to exchange-example-3");         template.setExchange("exchange-example-3");         template.convertAndSend("Fanout message");         return "Emit to exchange-example-3";     } } 

Пример 4. Routing

Здесь используется routing key, в зависимости от которого сообщение может попасть в одну из очередей или сразу в обе. Для этого вместо FanoutExchange используем DirectExchange:

    @Bean     public DirectExchange directExchange(){         return new DirectExchange("exchange-example-4");     }      @Bean     public Binding errorBinding1(){         return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");     }      @Bean     public Binding errorBinding2(){         return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");     }      @Bean     public Binding infoBinding(){         return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");     }      @Bean     public Binding warningBinding(){         return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");     } 

И при отправке используем указываем Routing key, например, так:

    template.convertAndSend("info", "Info"); 

В результате получаем:

 2015-06-23 22:29:24.480  INFO 5820 --- [nio-8080-exec-2] com.rabbitmq.example4.SampleController   : Emit as info 2015-06-23 22:29:24.483  INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener   : accepted on worker 2 : Info 2015-06-23 22:29:29.721  INFO 5820 --- [nio-8080-exec-4] com.rabbitmq.example4.SampleController   : Emit as error 2015-06-23 22:29:29.727  INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener   : accepted on worker 2 : Error 2015-06-23 22:29:29.731  INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener   : accepted on worker 1 : Error 2015-06-23 22:29:36.779  INFO 5820 --- [nio-8080-exec-5] com.rabbitmq.example4.SampleController   : Emit as warning 2015-06-23 22:29:36.781  INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener   : accepted on worker 2 : Warning 
Полные исходные коды

package com.rabbitmq.example4;  import org.apache.log4j.Logger; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  @EnableRabbit @Configuration public class RabbitConfiguration {     Logger logger = Logger.getLogger(RabbitConfiguration.class);      @Bean     public ConnectionFactory connectionFactory() {         CachingConnectionFactory connectionFactory =                 new CachingConnectionFactory("localhost");         return connectionFactory;     }      @Bean     public AmqpAdmin amqpAdmin() {         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());         return rabbitAdmin;     }      @Bean     public RabbitTemplate rabbitTemplate() {         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());         rabbitTemplate.setExchange("exchange-example-4");         return rabbitTemplate;     }      @Bean     public Queue myQueue1() {         return new Queue("query-example-4-1");     }      @Bean     public Queue myQueue2() {         return new Queue("query-example-4-2");     }      @Bean     public DirectExchange directExchange(){         return new DirectExchange("exchange-example-4");     }      @Bean     public Binding errorBinding1(){         return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");     }      @Bean     public Binding errorBinding2(){         return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");     }      @Bean     public Binding infoBinding(){         return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");     }      @Bean     public Binding warningBinding(){         return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");     }  } 

package com.rabbitmq.example4;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.util.Random;  @Component public class RabbitMqListener {     Logger logger = Logger.getLogger(RabbitMqListener.class);     Random random = new Random();      @RabbitListener(queues = "query-example-4-1")     public void worker1(String message) {         logger.info("accepted on worker 1 : " + message);     }      @RabbitListener(queues = "query-example-4-2")     public void worker2(String message) {         logger.info("accepted on worker 2 : " + message);     }  } 

package com.rabbitmq.example4;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody;  @Controller public class SampleController {     Logger logger = Logger.getLogger(SampleController.class);      @Autowired     RabbitTemplate template;      @RequestMapping("/")     @ResponseBody     String home() {         return "Empty mapping";     }      @RequestMapping("/emit/error")     @ResponseBody     String error() {         logger.info("Emit as error");         template.convertAndSend("error", "Error");         return "Emit as error";     }      @RequestMapping("/emit/info")     @ResponseBody     String info() {         logger.info("Emit as info");         template.convertAndSend("info", "Info");         return "Emit as info";     }      @RequestMapping("/emit/warning")     @ResponseBody     String warning() {         logger.info("Emit as warning");         template.convertAndSend("warning", "Warning");         return "Emit as warning";     } } 

Пример 5. Topics

Здесь вместо DirectExchange используем TopicExchange

    @Bean     public TopicExchange topicExchange(){         return new TopicExchange("exchange-example-5");     }      @Bean     public Binding binding1(){         return BindingBuilder.bind(myQueue1()).to(topicExchange()).with("*.orange.*");     }      @Bean     public Binding binding2(){         return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("*.*.rabbit");     }      @Bean     public Binding binding3(){         return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("lazy.#");     } 

В результате получаем:

 2015-06-23 22:42:28.414  INFO 6560 --- [nio-8080-exec-1] com.rabbitmq.example5.SampleController   : Emit 'to 1 and 2' to 'quick.orange.rabbit' 2015-06-23 22:42:28.428  INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener   : accepted on worker 2 : to 1 and 2 2015-06-23 22:42:28.428  INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener   : accepted on worker 1 : to 1 and 2 2015-06-23 22:42:55.802  INFO 6560 --- [nio-8080-exec-2] com.rabbitmq.example5.SampleController   : Emit 'to 2' to 'lazy.black.cat' 2015-06-23 22:42:55.805  INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener   : accepted on worker 2 : to 2 
Полные исходные коды

package com.rabbitmq.example5;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody;   @Controller public class SampleController {     Logger logger = Logger.getLogger(SampleController.class);      @Autowired     RabbitTemplate template;      @RequestMapping("/")     @ResponseBody     String home() {         return "Empty mapping";     }      @RequestMapping("/emit/{key}/{message}")     @ResponseBody     String error(@PathVariable("key") String key, @PathVariable("message") String message) {         logger.info(String.format("Emit '%s' to '%s'",message,key));         template.convertAndSend(key, message);         return String.format("Emit '%s' to '%s'",message,key);     } } 

package com.rabbitmq.example5;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.util.Random;  @Component public class RabbitMqListener {     Logger logger = Logger.getLogger(RabbitMqListener.class);     Random random = new Random();      @RabbitListener(queues = "query-example-5-1")     public void worker1(String message) {         logger.info("accepted on worker 1 : " + message);     }      @RabbitListener(queues = "query-example-5-2")     public void worker2(String message) {         logger.info("accepted on worker 2 : " + message);     }  } 

package com.rabbitmq.example5;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody;   @Controller public class SampleController {     Logger logger = Logger.getLogger(SampleController.class);      @Autowired     RabbitTemplate template;      @RequestMapping("/")     @ResponseBody     String home() {         return "Empty mapping";     }      @RequestMapping("/emit/{key}/{message}")     @ResponseBody     String error(@PathVariable("key") String key, @PathVariable("message") String message) {         logger.info(String.format("Emit '%s' to '%s'",message,key));         template.convertAndSend(key, message);         return String.format("Emit '%s' to '%s'",message,key);     } } 

Пример 6. Remote procedure call (RPC)

Spring AMQP позволяет использовать convertSendAndReceive, чтобы получить ответ на отправленное сообщение. При этом, при дефолтной настройке, в случае если у нас RabbitMQ версии до 3.4.0, то для ответного сообщения будет создана временная очередь. Этот способ не очень производительный, поэтому его использовать не рукомендуется и следует создать самому также и очередь для обратных сообщений и указать её как ReplyQueue у RabbitTemplate. Если же у нас RabbitMQ версии 3.4.0 и выше, то будет использован механизм Direct reply-to, который намного быстрее. Подробнее в документации по Spring AMQP.

Таким образом осуществить удаленный вызов процедуры можно всего одной строкой:

    String response = (String) template.convertSendAndReceive("query-example-6",message); 

А так процедура обрабатывается на консьюмере:

    @RabbitListener(queues = "query-example-6")     public String worker1(String message) throws InterruptedException {         logger.info("received on worker : " + message);         Thread.sleep(3000); //эмулируем полезную работу         return "received on worker : " + message;     } 

В результате получаем:

 2015-06-23 23:12:36.677  INFO 6536 --- [nio-8080-exec-5] com.rabbitmq.example6.SampleController   : Emit 'Hello world' 2015-06-23 23:12:36.679  INFO 6536 --- [cTaskExecutor-1] com.rabbitmq.example6.RabbitMqListener   : Received on worker : Hello world 2015-06-23 23:12:39.681  INFO 6536 --- [nio-8080-exec-5] com.rabbitmq.example6.SampleController   : Received on producer 'Received on worker : Hello world' 
Полные исходные коды

package com.rabbitmq.example6;  import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  @EnableRabbit @Configuration public class RabbitConfiguration {     Logger logger = Logger.getLogger(RabbitConfiguration.class);      @Bean     public ConnectionFactory connectionFactory() {         CachingConnectionFactory connectionFactory =                 new CachingConnectionFactory("localhost");         return connectionFactory;     }      @Bean     public AmqpAdmin amqpAdmin() {         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());         return rabbitAdmin;     }      @Bean     public RabbitTemplate rabbitTemplate() {         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());         rabbitTemplate.setQueue("query-example-6");         rabbitTemplate.setReplyTimeout(60 * 1000);         //no reply to - we use direct-reply-to         return rabbitTemplate;     }      @Bean     public Queue myQueue() {         return new Queue("query-example-6");     } } 

package com.rabbitmq.example6;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.util.Random;  @Component public class RabbitMqListener {     Logger logger = Logger.getLogger(RabbitMqListener.class);      @RabbitListener(queues = "query-example-6")     public String worker1(String message) throws InterruptedException {         logger.info("Received on worker : " + message);         Thread.sleep(3000);         return "Received on worker : " + message;     } } 

package com.rabbitmq.example6;  import org.apache.log4j.Logger; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody;   @Controller public class SampleController {     Logger logger = Logger.getLogger(SampleController.class);      @Autowired     RabbitTemplate template;      @RequestMapping("/")     @ResponseBody     String home() {         return "Empty mapping";     }      @RequestMapping("/process/{message}")     @ResponseBody     String error(@PathVariable("message") String message) {         logger.info(String.format("Emit '%s'",message));         String response = (String) template.convertSendAndReceive("query-example-6",message);         logger.info(String.format("Received on producer '%s'",response));         return String.valueOf("returned from worker : " + response);     } } 

Заключение

У себя я использовал RabbitMQ в проекте в облачном хостинге heroku. Использовать RabbitMQ в heroku довольно просто — достаточно подключить одного из провайдеров RabbitMQ в консоли администрирования и тогда в переменных окружения появится адрес для доступа к кролику. Этот адрес нужно использовать при создании connectionFactory.

	@Bean 	public ConnectionFactory connectionFactory() 	{ 		//получаем адрес AMQP у провайдера 		String uri = System.getenv("CLOUDAMQP_URL"); 		if (uri == null) //значит мы запущены локально и нужно подключаться к локальному rabbitmq 			uri = "amqp://guest:guest@localhost"; 		URI url = null; 		try 		{ 			url = new URI(uri); 		} catch (URISyntaxException e) 		{ 			e.printStackTrace(); //тут ошибка крайне маловероятна 		}  		CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 		connectionFactory.setHost(url.getHost()); 		connectionFactory.setUsername(url.getUserInfo().split(":")[0]); 		connectionFactory.setPassword(url.getUserInfo().split(":")[1]); 		if (StringUtils.isNotBlank(url.getPath())) 			connectionFactory.setVirtualHost(url.getPath().replace("/", "")); 		connectionFactory.setConnectionTimeout(3000); 		connectionFactory.setRequestedHeartBeat(30); 		return connectionFactory; 	} 

В остальном код мало отличается от приведенного в примере 4(Routing).

Использованные источники

Страница проекта Spring AMQP
Страница проекта Spring Boot
Страница с примерами RabbitMQ

ссылка на оригинал статьи http://habrahabr.ru/post/262069/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *