Понять статью не составит труда тем, кто знаком с Spring и Spring Web и хотя бы раз создавал простое приложение с контроллерами, сервисами и моделями (проще говоря — реализовывал паттерн Model View Controller).
С чего всё начиналось
По работе ко мне пришли с предложением начать разработку небольшого проекта с использованием RabbitMQ в связке с Spring Framework. До того момента я только лишь читал о RabbitMQ и с очередями сообщений особо не работал, так что часть своих выходных решил потратить на изучение данной технологии и её применениях.
О RabbitMQ вкратце
RabbitMQ — брокер сообщений. Данная технология позволяет вести асинхронную обработку данных, а также делать микросервисы слабосвязанными, что может облегчить разработку. RabbitMQ можно сравнить с почтой, куда приходят письма. Если письмо пришло на почту, то его можно прочитать в любое удобное время. Также и в RabbitMQ: пришедшее письмо микросервис сможет забрать в любой момент. Тем самым можно ослабить нагрузку на микросервис, ведь «письма» он сможет обрабатывать в своём темпе.
Сообщения отправляются в очереди, где они хранятся до момента прочтения. После прочтения сообщение удаляется. Дальнейшая его судьба определяется сервисом, прочитавшим сообщение. Также сообщение обладает некоторыми атрибутами, в зависимости от которых определяется, в какую очередь оно попадёт. Данные атрибуты задаются разработчиком. Для примера атрибутом может служить название очереди или topic. Подробнее о атрибутах и обмене сообщениями по ним стоит прочитать на официальном сайте: https://www.rabbitmq.com/tutorials/amqp-concepts.
Для более полного понимания приведу для примера следующую схему:
На схеме P — producer, сервис отправляющий сообщения. X — маршрутизатор сообщений (он определяет в какую очередь попадёт сообщение), Q1, Q2, Q3 — очереди, C1, C2, C3 — потребители. Схема взаимосвязи микросервисов не обязательно такая, как указана на изображении. У одной очереди вполне может быть несколько продюсеров и несколько потребителей, всё зависит от самой архитектуры микросервисов.
Для получения полной информации о RabbitMQ стоит посетить официальный сайт: https://www.rabbitmq.com/. В данной же статье будет описание простого проекта на Java с двумя микросервисами, взаимодействующими посредством RabbitMQ.
Развёртывание RabbitMQ
Первым делом я взялся за развёртывание RabbitMQ. Благо для этого на Windows нужен лишь предустановленный Docker и интернет. Чтобы поднять контейнер с RabbitMQ на своём компьютере необходимо выполнить следующую команду:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
После успешного выполнения этой команды перейдя по ссылке http://localhost:15672 можно увидеть следующую картину:
По дефолту логин и пароль: guest
После авторизации можно увидеть следующую чудесную картину:
При должном знании английского разобраться в UI не составит труда.
UI предоставляет всю информацию о состоянии RabbitMQ, о её очередях, пользователях, использовании, готовых к обработке сообщений и так далее…
Дальше перейдём к написанию проекта
RabbitMQProducer
Данный сервис отправляет сообщения в одну из двух очередей RabbitMQ.
Для начала подключим необходимые библиотеки:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.4.1</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>RabbitMQProducer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQProducer</name> <description>RabbitMQProducer</description> <url/> <licenses> <license/> </licenses> <developers> <developer/> </developers> <scm> <connection/> <developerConnection/> <tag/> <url/> </scm> <properties> <java.version>21</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springdoc</groupId> <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId> <version>2.0.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Среди них Spring Web для написания REST, Spring RabbitMQ для работы с RabbitMQ и Srping Openapi для подключения Swagger.
Сперва сконфигурируем наше приложение с помощью файла application.properties:
spring.application.name=RabbitMQProducer spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 server.port=8081 rabbitmq.queue.name=message_queue rabbitmq.queue_with_delay.name=delay_queue
Под spring.rabbitmq указываем подключения к rabbitmq, server.port для того, чтобы открыть веб приложение на определённом порту (нужно учитывать, что он может быть попросту занят другим приложением). Две последних строки указывают названия очередей (де-факто их можно указать и напрямую в Java Code, но такой подход не совсем правильный, как мне кажется, да и менять названия очередей затем проще из application.properties, чем потом копаться в коде и менять там).
Затем укажем конфигурации для очередей:
package org.example.rabbitmqproducer.config; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Value("${rabbitmq.queue.name}") private String queueName; @Value("${rabbitmq.queue_with_delay.name}") private String queueWithDelayName; /** * Конфигурируем очередь * @return Очередь RabbitMQ */ @Bean public Queue queue() { return new Queue(queueName, false); } @Bean public Queue queue2() { return new Queue(queueWithDelayName, false); } }
Здесь с помощью аннотации @Value забираем значения из application.properties.
Далее напишем небольшой сервис, который будет просто посылать сообщения в брокер:
package org.example.rabbitmqproducer.service; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Service public class MessageService { private final RabbitTemplate template; @Value("${rabbitmq.queue.name}") private String messageQueue; @Value("${rabbitmq.queue_with_delay.name}") private String messageWithDelayQueue; @Autowired public MessageService(RabbitTemplate template) { this.template = template; } /** * Отправляет сообщение в очередь * @param message Сообщение */ public void sendMessage(String message) { template.convertAndSend(messageQueue, message); } /** * Посылает сообщение в очередь, которая обрабатывается слушателем с некоторой задержкой * @param message Сообщение */ public void sendMessageToQueueWithDelay(String message) { template.convertAndSend(messageWithDelayQueue, message); } }
И теперь напишем контроллер, где метод POST по пути /message отправит сообщение в message_queue очередь, а метод POST по пути /message/with_delay отправит сообщение в очередь delay_queue:
package org.example.rabbitmqproducer.controller; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import org.example.rabbitmqproducer.service.MessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; @Controller @Tag(name = "Message Controller", description = "Controller to send message to RabbitMQ") @RequestMapping("/message") public class MessageController { private final MessageService service; @Autowired public MessageController(MessageService service) { this.service = service; } @Operation(description = "Send message to RabbitMQ") @ApiResponse(responseCode = "204") @PostMapping public ResponseEntity<Void> sendMessage(String message) { service.sendMessage(message); return ResponseEntity.noContent().build(); } @Operation(description = "Send message to RabbitMQ in queue with delay") @ApiResponse(responseCode = "204") @PostMapping("/with_delay") public ResponseEntity<Void> sendMessageWithDelay(String message) { service.sendMessageToQueueWithDelay(message); return ResponseEntity.noContent().build(); } }
Запустим приложение.
Если всё правильно написано и сконфигурировано, то по ссылке http://localhost:8081/swagger-ui/index.html#/ можно увидеть поднятый Swagger:
Воспользуемся эндпоинтом /message два раза и эндпоинтом /message/with_delay один раз:
Здесь мы можем увидеть, что в очереди пришли сообщения. Теперь обработаем их.
RabbitMQConsumer
Данный сервис отвечает за обработку сообщений из очереди.
Подключим нужные библиотеки:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.4.1</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>RabbitMQConsumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQProducer</name> <description>RabbitMQConsumer</description> <url/> <licenses> <license/> </licenses> <developers> <developer/> </developers> <scm> <connection/> <developerConnection/> <tag/> <url/> </scm> <properties> <java.version>21</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Сконфигурируем в application.properties:
spring.application.name=RabbitMQConsumer spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 server.port=8082 rabbitmq.queue.name=message_queue rabbitmq.queue_with_delay.name=delay_queue
Здесь из Spring библиотек нужна только Spring RabbitMQ
Напишем слушатель сообщений:
package org.example.rabbitmqconsumer.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Random; @Component public class RabbitMQListener { private final static Random RANDOM = new Random(); @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue.name')}") public void receiveMessage(String message) { System.out.println("Received message: " + message); } @RabbitListener(queues = "#{@environment.getProperty('rabbitmq.queue_with_delay.name')}") public void receiveMessageWithDelay(String message) throws InterruptedException { var delay = RANDOM.nextInt(10000); Thread.sleep(delay); System.out.println("Received message: " + message + " with delay: " + delay); } }
Особым образом здесь подтягиваются названия очередей из application.properties.
В методе receiveMessage сообщение сразу печатается в консоле. В методе receiveMessageWithDelay симулируется задержка до 10 секунд. Можно увидеть, что сообщения постепенно вытаскиваются из очереди delay_queue.
Запустим приложение и увидим вывод в консоле:
Приложение запустилось и само забрало из очереди сообщение. Можно ещё «побаловаться» и поотправлять сообщений. Посмотрим, что в мониторинге очереди delay_queue RabbitMQ:
Также на message_queue:
Видим разницу. В message_queue сообщения не задерживаются. А вот в delay_queue задерживаются из-за случайной задержки.
GitHub проекта: https://github.com/3abubenni/rabbitmq
ссылка на оригинал статьи https://habr.com/ru/articles/873450/
Добавить комментарий