Организация обработки асинхронных событий с Spring Events и Spring AMQP

от автора

Привет, Хабр!

Асинхронная обработка событий – один из базовых инструментов на сегодняшний день, позволяющий создавать масштабируемые и отзывчивые приложения. Сегодня мы рассмотрим два инструмента из Spring Framework – Spring Events и Spring AMQP, которые помогают управлять асинхронными задачами.

Spring Events

Spring Events – это встроенный механизм для публикации и обработки событий внутри приложения. Spring Events основаны на паттерне проектирования «издатель-подписчик«.

Издатель (writer) -подписчик(reader)

Издатель (writer) -подписчик(reader)

Для работы с событиями потребуется три основных компонента:

  1. Класс события: описывает само событие.

  2. Издатель события: компонент, который генерирует и публикует события.

  3. Подписчик на событие: компонент, который обрабатывает опубликованные события.

Начнем с создания класса события. Этот класс должен наследоваться от ApplicationEvent:

public class CustomSpringEvent extends ApplicationEvent {     private String message;      public CustomSpringEvent(Object source, String message) {         super(source);         this.message = message;     }      public String getMessage() {         return message;     } }

Для публикации события потребуется класс-издатель, который будет использовать ApplicationEventPublisher для публикации событий:

import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.stereotype.Component;  @Component public class CustomSpringEventPublisher implements ApplicationEventPublisherAware {      private ApplicationEventPublisher applicationEventPublisher;      @Override     public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {         this.applicationEventPublisher = applicationEventPublisher;     }      public void publishEvent(final String message) {         System.out.println("Publishing custom event. ");         CustomSpringEvent customSpringEvent = new CustomSpringEvent(this, message);         applicationEventPublisher.publishEvent(customSpringEvent);     } }

Теперь создадим компонент, который будет обрабатывать события. Для этого можно использовать аннотацию @EventListener:

import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component;  @Component public class CustomSpringEventListener {      @EventListener     public void handleCustomSpringEvent(CustomSpringEvent event) {         System.out.println("Received spring custom event - " + event.getMessage());     } }

Чтобы события обрабатывались асинхронно нужно включить поддержку асинхронности в конфигурации Spring и пометить метод-обработчик аннотацией @Async.

Для этого необходимо добавить аннотацию @EnableAsync в конфигурационный класс:

import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync;  @Configuration @EnableAsync public class AsyncConfig { }

Теперь изменим слушатель событий, чтобы он обрабатывал события асинхронно:

import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.context.event.EventListener;  @Component public class CustomSpringEventListener {      @Async     @EventListener     public void handleCustomSpringEvent(CustomSpringEvent event) {         System.out.println("Received spring custom event - " + event.getMessage());         // доп. логика обработки события     } }

Примеры использования

Кэширование

Предположим, есть приложение, которое кэширует данные. Когда данные изменяются, нужно обновить кэш. Можно юзать событие для уведомления всех компонентов о необходимости обновления кэша:

// событие изменения данных public class DataChangeEvent extends ApplicationEvent {     private String dataId;      public DataChangeEvent(Object source, String dataId) {         super(source);         this.dataId = dataId;     }      public String getDataId() {         return dataId;     } }  // публикация события изменения данных @Component public class DataService {     @Autowired     private ApplicationEventPublisher eventPublisher;      public void updateData(String dataId) {         // логика обновления данных         eventPublisher.publishEvent(new DataChangeEvent(this, dataId));     } }  // обработчик события изменения данных @Component public class CacheService {      @EventListener     @Async     public void handleDataChangeEvent(DataChangeEvent event) {         System.out.println("Updating cache for dataId: " + event.getDataId());         // логика обновления кэша     } }

Уведомление о завершении задач

Для этой задачи можно использовать событие для уведомления других компонентов о завершении длительных задач:

// событие завершения задачи public class TaskCompleteEvent extends ApplicationEvent {     private String taskId;      public TaskCompleteEvent(Object source, String taskId) {         super(source);         this.taskId = taskId;     }      public String getTaskId() {         return taskId;     } }  // публикация события завершения задачи @Component public class TaskService {     @Autowired     private ApplicationEventPublisher eventPublisher;      @Async     public void executeTask(String taskId) {         // логика выполнения задачи         eventPublisher.publishEvent(new TaskCompleteEvent(this, taskId));     } }  // обработчик события завершения задачи @Component public class NotificationService {      @EventListener     @Async     public void handleTaskCompleteEvent(TaskCompleteEvent event) {         System.out.println("Task completed: " + event.getTaskId());         // логика уведомления     } }

Организация обработки сообщений с помощью Spring AMQP

AMQP — это протокол уровня приложений для мидлвэр-систем, предназначенных для передачи сообщений. Имеет функциональные возможности для межпрограммного взаимодействия и включает концепции очередей, маршрутизации, обменов и привязок.

RabbitMQ — это брокер сообщений, реализующий AMQP. Он позволяет обмениваться сообщениями и управлять их очередями.

Для старта работ с Spring AMQP в проекте Spring Boot необходимо добавить соответствующую зависимость в файл pom.xml:

<dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

Далее необходимо настроить параметры подключения к RabbitMQ в файле application.properties.

spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest

Создание и отправка сообщений с помощью AmqpTemplate

Для отправки сообщений используется AmqpTemplate.

Для начала создадим конфигурационный класс, который определит все необходимые бины:

import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  @Configuration public class RabbitConfig {      static final String queueName = "myQueue";      @Bean     Queue queue() {         return new Queue(queueName, false);     }      @Bean     RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {         return new RabbitTemplate(connectionFactory);     }      @Bean     SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,             MessageListenerAdapter listenerAdapter) {         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();         container.setConnectionFactory(connectionFactory);         container.setQueueNames(queueName);         container.setMessageListener(listenerAdapter);         return container;     }      @Bean     MessageListenerAdapter listenerAdapter(Receiver receiver) {         return new MessageListenerAdapter(receiver, "receiveMessage");     } }

Теперь создадим компонент, который будет отправлять сообщения:

import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;  @Component public class Sender {      private final RabbitTemplate rabbitTemplate;      @Autowired     public Sender(RabbitTemplate rabbitTemplate) {         this.rabbitTemplate = rabbitTemplate;     }      public void send(String message) {         rabbitTemplate.convertAndSend(RabbitConfig.queueName, message);         System.out.println("Sent: " + message);     } }

Для обработки сообщений, поступающих в очередь, используется аннотация @RabbitListener.

Добавим компонент, который будет обрабатывать входящие сообщения:

import org.springframework.stereotype.Component;  @Component public class Receiver {      public void receiveMessage(String message) {         System.out.println("Received: " + message);     } }

С аннтоацией @RabbitListener можно декларативно указать методы, которые будут обрабатывать сообщения из очередей:

import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;  @Service public class MessageListener {      @RabbitListener(queues = RabbitConfig.queueName)     public void processMessage(String message) {         System.out.println("Received message: " + message);     } }

Пример

Полный пример с отправкой и получением сообщений:

import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean;  @SpringBootApplication @EnableRabbit public class SpringAmqpApplication {      public static void main(String[] args) {         SpringApplication.run(SpringAmqpApplication.class, args);     }      @Bean     CommandLineRunner runner(Sender sender) {         return args -> {             System.out.println("Sending message...");             sender.send("Hello, RabbitMQ!");         };     } }

Примеры использования двух инструментов вместе

Обработка заказов с локальными событиями и асинхронной передачей

В этом примере система обрабатывает заказы локально, публикует событие о новом заказе с помощью Spring Events и затем отправляет заказ на дальнейшую обработку в другую службу через RabbitMQ:

// событие заказа public class OrderCreatedEvent extends ApplicationEvent {     private String orderId;      public OrderCreatedEvent(Object source, String orderId) {         super(source);         this.orderId = orderId;     }      public String getOrderId() {         return orderId;     } }  // публикация события заказа @Component public class OrderService {     @Autowired     private ApplicationEventPublisher eventPublisher;      public void createOrder(String orderId) {         // логика создания заказа         System.out.println("Order created: " + orderId);         eventPublisher.publishEvent(new OrderCreatedEvent(this, orderId));     } }  // обработка локального события и отправка в RabbitMQ @Component public class OrderCreatedListener {     @Autowired     private AmqpTemplate amqpTemplate;      @EventListener     @Async     public void handleOrderCreatedEvent(OrderCreatedEvent event) {         System.out.println("Handling order created event for order: " + event.getOrderId());         // доп. логика обработки         amqpTemplate.convertAndSend("orderQueue", event.getOrderId());     } }  // конфигурация RabbitMQ @Configuration public class RabbitConfig {     @Bean     public Queue orderQueue() {         return new Queue("orderQueue", false);     }      @Bean     public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {         return new RabbitTemplate(connectionFactory);     } }

Обновление состояния задач и уведомление через RabbitMQ

При обновлении состояния задачи генерируется локальное событие, которое затем отправляется в очередь RabbitMQ для уведомления других микросервисов о статусе задачи:

// событие обновления задачи public class TaskStatusUpdateEvent extends ApplicationEvent {     private String taskId;     private String status;      public TaskStatusUpdateEvent(Object source, String taskId, String status) {         super(source);         this.taskId = taskId;         this.status = status;     }      public String getTaskId() {         return taskId;     }      public String getStatus() {         return status;     } }  // публикация события обновления задачи @Component public class TaskService {     @Autowired     private ApplicationEventPublisher eventPublisher;      public void updateTaskStatus(String taskId, String status) {         System.out.println("Updating task status: " + taskId + " to " + status);         eventPublisher.publishEvent(new TaskStatusUpdateEvent(this, taskId, status));     } }  // обработка события обновления задачи и отправка в RabbitMQ @Component public class TaskStatusUpdateListener {     @Autowired     private AmqpTemplate amqpTemplate;      @EventListener     @Async     public void handleTaskStatusUpdateEvent(TaskStatusUpdateEvent event) {         System.out.println("Sending task status update to RabbitMQ: " + event.getTaskId() + " - " + event.getStatus());         amqpTemplate.convertAndSend("taskStatusQueue", event.getTaskId() + " - " + event.getStatus());     } }  // конфигурация RabbitMQ @Configuration public class RabbitConfig {     @Bean     public Queue taskStatusQueue() {         return new Queue("taskStatusQueue", false);     }      @Bean     public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {         return new RabbitTemplate(connectionFactory);     } }

В заключение напоминаю об открытых уроках, которые пройдут в рамках курса «Разработчик на Spring Framework» в Otus:

  • 1 июля: Тестирование Spring приложений. Интеграционные тесты с контекстом. Тестирование слоя репозиториев и сервисов. Запись по ссылке

  • 16 июля: Тестирование Spring приложений. Интеграционные тесты контроллеров, интеграций с внешними API и безопасности. Запись по ссылке


ссылка на оригинал статьи https://habr.com/ru/articles/825528/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *