RabbitMQ + Java Spring

от автора

Понять статью не составит труда тем, кто знаком с Spring и Spring Web и хотя бы раз создавал простое приложение с контроллерами, сервисами и моделями (проще говоря — реализовывал паттерн Model View Controller).

С чего всё начиналось

По работе ко мне пришли с предложением начать разработку небольшого проекта с использованием RabbitMQ в связке с Spring Framework. До того момента я только лишь читал о RabbitMQ и с очередями сообщений особо не работал, так что часть своих выходных решил потратить на изучение данной технологии и её применениях.

О RabbitMQ вкратце

RabbitMQ — брокер сообщений. Данная технология позволяет вести асинхронную обработку данных, а также делать микросервисы слабосвязанными, что может облегчить разработку. RabbitMQ можно сравнить с почтой, куда приходят письма. Если письмо пришло на почту, то его можно прочитать в любое удобное время. Также и в RabbitMQ: пришедшее письмо микросервис сможет забрать в любой момент. Тем самым можно ослабить нагрузку на микросервис, ведь «письма» он сможет обрабатывать в своём темпе.

Сообщения отправляются в очереди, где они хранятся до момента прочтения. После прочтения сообщение удаляется. Дальнейшая его судьба определяется сервисом, прочитавшим сообщение. Также сообщение обладает некоторыми атрибутами, в зависимости от которых определяется, в какую очередь оно попадёт. Данные атрибуты задаются разработчиком. Для примера атрибутом может служить название очереди или topic. Подробнее о атрибутах и обмене сообщениями по ним стоит прочитать на официальном сайте: https://www.rabbitmq.com/tutorials/amqp-concepts.

Для более полного понимания приведу для примера следующую схему:

Схема отправления сообщений

Схема отправления сообщений

На схеме P — producer, сервис отправляющий сообщения. X — маршрутизатор сообщений (он определяет в какую очередь попадёт сообщение), Q1, Q2, Q3 — очереди, C1, C2, C3 — потребители. Схема взаимосвязи микросервисов не обязательно такая, как указана на изображении. У одной очереди вполне может быть несколько продюсеров и несколько потребителей, всё зависит от самой архитектуры микросервисов.

Для получения полной информации о RabbitMQ стоит посетить официальный сайт: https://www.rabbitmq.com/. В данной же статье будет описание простого проекта на Java с двумя микросервисами, взаимодействующими посредством RabbitMQ.

Развёртывание RabbitMQ

Первым делом я взялся за развёртывание RabbitMQ. Благо для этого на Windows нужен лишь предустановленный Docker и интернет. Чтобы поднять контейнер с RabbitMQ на своём компьютере необходимо выполнить следующую команду:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

После успешного выполнения этой команды перейдя по ссылке http://localhost:15672 можно увидеть следующую картину:

страница авторизации в RabbitMQ

страница авторизации в RabbitMQ

По дефолту логин и пароль: guest

После авторизации можно увидеть следующую чудесную картину:

UI RabbitMQ

UI RabbitMQ

При должном знании английского разобраться в UI не составит труда.

UI предоставляет всю информацию о состоянии RabbitMQ, о её очередях, пользователях, использовании, готовых к обработке сообщений и так далее…

Дальше перейдём к написанию проекта

RabbitMQProducer

Данный сервис отправляет сообщения в одну из двух очередей RabbitMQ.

Для начала подключим необходимые библиотеки:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.4.1</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>RabbitMQProducer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQProducer</name> <description>RabbitMQProducer</description> <url/> <licenses> <license/> </licenses> <developers> <developer/> </developers> <scm> <connection/> <developerConnection/> <tag/> <url/> </scm> <properties> <java.version>21</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>  <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springdoc</groupId> <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId> <version>2.0.2</version> </dependency> </dependencies>  <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>  </project> 

Среди них Spring Web для написания REST, Spring RabbitMQ для работы с RabbitMQ и Srping Openapi для подключения Swagger.

Сперва сконфигурируем наше приложение с помощью файла application.properties:

spring.application.name=RabbitMQProducer  spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.host=localhost spring.rabbitmq.port=5672  server.port=8081  rabbitmq.queue.name=message_queue rabbitmq.queue_with_delay.name=delay_queue

Под spring.rabbitmq указываем подключения к rabbitmq, server.port для того, чтобы открыть веб приложение на определённом порту (нужно учитывать, что он может быть попросту занят другим приложением). Две последних строки указывают названия очередей (де-факто их можно указать и напрямую в Java Code, но такой подход не совсем правильный, как мне кажется, да и менять названия очередей затем проще из application.properties, чем потом копаться в коде и менять там).

Затем укажем конфигурации для очередей:

package org.example.rabbitmqproducer.config;  import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;   @Configuration public class RabbitMQConfig {      @Value("${rabbitmq.queue.name}")     private String queueName;      @Value("${rabbitmq.queue_with_delay.name}")     private String queueWithDelayName;      /**      * Конфигурируем очередь      * @return Очередь RabbitMQ      */     @Bean     public Queue queue() {         return new Queue(queueName, false);     }      @Bean     public Queue queue2() {         return new Queue(queueWithDelayName, false);     }  } 

Здесь с помощью аннотации @Value забираем значения из application.properties.

Далее напишем небольшой сервис, который будет просто посылать сообщения в брокер:

package org.example.rabbitmqproducer.service;  import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service;  @Service public class MessageService {      private final RabbitTemplate template;      @Value("${rabbitmq.queue.name}")     private String messageQueue;      @Value("${rabbitmq.queue_with_delay.name}")     private String messageWithDelayQueue;      @Autowired     public MessageService(RabbitTemplate template) {         this.template = template;     }      /**      * Отправляет сообщение в очередь      * @param message Сообщение      */     public void sendMessage(String message) {         template.convertAndSend(messageQueue, message);     }      /**      * Посылает сообщение в очередь, которая обрабатывается слушателем с некоторой задержкой      * @param message Сообщение      */     public void sendMessageToQueueWithDelay(String message) {         template.convertAndSend(messageWithDelayQueue, message);     }  } 

И теперь напишем контроллер, где метод POST по пути /message отправит сообщение в message_queue очередь, а метод POST по пути /message/with_delay отправит сообщение в очередь delay_queue:

package org.example.rabbitmqproducer.controller;  import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import org.example.rabbitmqproducer.service.MessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping;  @Controller @Tag(name = "Message Controller", description = "Controller to send message to RabbitMQ") @RequestMapping("/message") public class MessageController {      private final MessageService service;      @Autowired     public MessageController(MessageService service) {         this.service = service;     }      @Operation(description = "Send message to RabbitMQ")     @ApiResponse(responseCode = "204")     @PostMapping     public ResponseEntity<Void> sendMessage(String message) {         service.sendMessage(message);         return ResponseEntity.noContent().build();     }      @Operation(description = "Send message to RabbitMQ in queue with delay")     @ApiResponse(responseCode = "204")     @PostMapping("/with_delay")     public ResponseEntity<Void> sendMessageWithDelay(String message) {         service.sendMessageToQueueWithDelay(message);         return ResponseEntity.noContent().build();     } } 

Запустим приложение.

Если всё правильно написано и сконфигурировано, то по ссылке http://localhost:8081/swagger-ui/index.html#/ можно увидеть поднятый Swagger:

Воспользуемся эндпоинтом /message два раза и эндпоинтом /message/with_delay один раз:

результат отправления сообщений

результат отправления сообщений

Здесь мы можем увидеть, что в очереди пришли сообщения. Теперь обработаем их.

RabbitMQConsumer

Данный сервис отвечает за обработку сообщений из очереди.

Подключим нужные библиотеки:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.4.1</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>RabbitMQConsumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQProducer</name> <description>RabbitMQConsumer</description> <url/> <licenses> <license/> </licenses> <developers> <developer/> </developers> <scm> <connection/> <developerConnection/> <tag/> <url/> </scm> <properties> <java.version>21</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>  <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>  </project> 

Сконфигурируем в application.properties:

spring.application.name=RabbitMQConsumer  spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.host=localhost spring.rabbitmq.port=5672  server.port=8082  rabbitmq.queue.name=message_queue rabbitmq.queue_with_delay.name=delay_queue

Здесь из Spring библиотек нужна только Spring RabbitMQ

Напишем слушатель сообщений:

package org.example.rabbitmqconsumer.listener;  import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.util.Random;  @Component public class RabbitMQListener {      private final static Random RANDOM = new Random();      @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue.name')}")     public void receiveMessage(String message) {         System.out.println("Received message: " + message);     }      @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue_with_delay.name')}")     public void receiveMessageWithDelay(String message) throws InterruptedException {         var delay = RANDOM.nextInt(10000);         Thread.sleep(delay);         System.out.println("Received message: " + message + " with delay: " + delay);     }  } 

Особым образом здесь подтягиваются названия очередей из application.properties.

В методе receiveMessage сообщение сразу печатается в консоле. В методе receiveMessageWithDelay симулируется задержка до 10 секунд. Можно увидеть, что сообщения постепенно вытаскиваются из очереди delay_queue.

Запустим приложение и увидим вывод в консоле:

вывод в консоль

вывод в консоль

Приложение запустилось и само забрало из очереди сообщение. Можно ещё «побаловаться» и поотправлять сообщений. Посмотрим, что в мониторинге очереди delay_queue RabbitMQ:

мониторинг delay_queue

мониторинг delay_queue

Также на message_queue:

мониторинг message_queue

мониторинг message_queue

Видим разницу. В message_queue сообщения не задерживаются. А вот в delay_queue задерживаются из-за случайной задержки.

GitHub проекта: https://github.com/3abubenni/rabbitmq


ссылка на оригинал статьи https://habr.com/ru/articles/873450/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *