Привет, Хабр!
Асинхронная обработка событий – один из базовых инструментов на сегодняшний день, позволяющий создавать масштабируемые и отзывчивые приложения. Сегодня мы рассмотрим два инструмента из Spring Framework – Spring Events и Spring AMQP, которые помогают управлять асинхронными задачами.
Spring Events
Spring Events – это встроенный механизм для публикации и обработки событий внутри приложения. Spring Events основаны на паттерне проектирования «издатель-подписчик«.
![Издатель (writer) -подписчик(reader) Издатель (writer) -подписчик(reader)](https://habrastorage.org/getpro/habr/upload_files/a94/d0d/431/a94d0d431079c505d833c26b3d8b13ad.png)
Для работы с событиями потребуется три основных компонента:
-
Класс события: описывает само событие.
-
Издатель события: компонент, который генерирует и публикует события.
-
Подписчик на событие: компонент, который обрабатывает опубликованные события.
Начнем с создания класса события. Этот класс должен наследоваться от ApplicationEvent
:
public class CustomSpringEvent extends ApplicationEvent { private String message; public CustomSpringEvent(Object source, String message) { super(source); this.message = message; } public String getMessage() { return message; } }
Для публикации события потребуется класс-издатель, который будет использовать ApplicationEventPublisher
для публикации событий:
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.stereotype.Component; @Component public class CustomSpringEventPublisher implements ApplicationEventPublisherAware { private ApplicationEventPublisher applicationEventPublisher; @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; } public void publishEvent(final String message) { System.out.println("Publishing custom event. "); CustomSpringEvent customSpringEvent = new CustomSpringEvent(this, message); applicationEventPublisher.publishEvent(customSpringEvent); } }
Теперь создадим компонент, который будет обрабатывать события. Для этого можно использовать аннотацию @EventListener
:
import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @Component public class CustomSpringEventListener { @EventListener public void handleCustomSpringEvent(CustomSpringEvent event) { System.out.println("Received spring custom event - " + event.getMessage()); } }
Чтобы события обрабатывались асинхронно нужно включить поддержку асинхронности в конфигурации Spring и пометить метод-обработчик аннотацией @Async
.
Для этого необходимо добавить аннотацию @EnableAsync
в конфигурационный класс:
import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; @Configuration @EnableAsync public class AsyncConfig { }
Теперь изменим слушатель событий, чтобы он обрабатывал события асинхронно:
import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.context.event.EventListener; @Component public class CustomSpringEventListener { @Async @EventListener public void handleCustomSpringEvent(CustomSpringEvent event) { System.out.println("Received spring custom event - " + event.getMessage()); // доп. логика обработки события } }
Примеры использования
Кэширование
Предположим, есть приложение, которое кэширует данные. Когда данные изменяются, нужно обновить кэш. Можно юзать событие для уведомления всех компонентов о необходимости обновления кэша:
// событие изменения данных public class DataChangeEvent extends ApplicationEvent { private String dataId; public DataChangeEvent(Object source, String dataId) { super(source); this.dataId = dataId; } public String getDataId() { return dataId; } } // публикация события изменения данных @Component public class DataService { @Autowired private ApplicationEventPublisher eventPublisher; public void updateData(String dataId) { // логика обновления данных eventPublisher.publishEvent(new DataChangeEvent(this, dataId)); } } // обработчик события изменения данных @Component public class CacheService { @EventListener @Async public void handleDataChangeEvent(DataChangeEvent event) { System.out.println("Updating cache for dataId: " + event.getDataId()); // логика обновления кэша } }
Уведомление о завершении задач
Для этой задачи можно использовать событие для уведомления других компонентов о завершении длительных задач:
// событие завершения задачи public class TaskCompleteEvent extends ApplicationEvent { private String taskId; public TaskCompleteEvent(Object source, String taskId) { super(source); this.taskId = taskId; } public String getTaskId() { return taskId; } } // публикация события завершения задачи @Component public class TaskService { @Autowired private ApplicationEventPublisher eventPublisher; @Async public void executeTask(String taskId) { // логика выполнения задачи eventPublisher.publishEvent(new TaskCompleteEvent(this, taskId)); } } // обработчик события завершения задачи @Component public class NotificationService { @EventListener @Async public void handleTaskCompleteEvent(TaskCompleteEvent event) { System.out.println("Task completed: " + event.getTaskId()); // логика уведомления } }
Организация обработки сообщений с помощью Spring AMQP
AMQP — это протокол уровня приложений для мидлвэр-систем, предназначенных для передачи сообщений. Имеет функциональные возможности для межпрограммного взаимодействия и включает концепции очередей, маршрутизации, обменов и привязок.
RabbitMQ — это брокер сообщений, реализующий AMQP. Он позволяет обмениваться сообщениями и управлять их очередями.
Для старта работ с Spring AMQP в проекте Spring Boot необходимо добавить соответствующую зависимость в файл pom.xml
:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Далее необходимо настроить параметры подключения к RabbitMQ в файле application.properties
.
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
Создание и отправка сообщений с помощью AmqpTemplate
Для отправки сообщений используется AmqpTemplate
.
Для начала создадим конфигурационный класс, который определит все необходимые бины:
import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { static final String queueName = "myQueue"; @Bean Queue queue() { return new Queue(queueName, false); } @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } }
Теперь создадим компонент, который будет отправлять сообщения:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Sender { private final RabbitTemplate rabbitTemplate; @Autowired public Sender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void send(String message) { rabbitTemplate.convertAndSend(RabbitConfig.queueName, message); System.out.println("Sent: " + message); } }
Для обработки сообщений, поступающих в очередь, используется аннотация @RabbitListener
.
Добавим компонент, который будет обрабатывать входящие сообщения:
import org.springframework.stereotype.Component; @Component public class Receiver { public void receiveMessage(String message) { System.out.println("Received: " + message); } }
С аннтоацией @RabbitListener
можно декларативно указать методы, которые будут обрабатывать сообщения из очередей:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class MessageListener { @RabbitListener(queues = RabbitConfig.queueName) public void processMessage(String message) { System.out.println("Received message: " + message); } }
Пример
Полный пример с отправкой и получением сообщений:
import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication @EnableRabbit public class SpringAmqpApplication { public static void main(String[] args) { SpringApplication.run(SpringAmqpApplication.class, args); } @Bean CommandLineRunner runner(Sender sender) { return args -> { System.out.println("Sending message..."); sender.send("Hello, RabbitMQ!"); }; } }
Примеры использования двух инструментов вместе
Обработка заказов с локальными событиями и асинхронной передачей
В этом примере система обрабатывает заказы локально, публикует событие о новом заказе с помощью Spring Events и затем отправляет заказ на дальнейшую обработку в другую службу через RabbitMQ:
// событие заказа public class OrderCreatedEvent extends ApplicationEvent { private String orderId; public OrderCreatedEvent(Object source, String orderId) { super(source); this.orderId = orderId; } public String getOrderId() { return orderId; } } // публикация события заказа @Component public class OrderService { @Autowired private ApplicationEventPublisher eventPublisher; public void createOrder(String orderId) { // логика создания заказа System.out.println("Order created: " + orderId); eventPublisher.publishEvent(new OrderCreatedEvent(this, orderId)); } } // обработка локального события и отправка в RabbitMQ @Component public class OrderCreatedListener { @Autowired private AmqpTemplate amqpTemplate; @EventListener @Async public void handleOrderCreatedEvent(OrderCreatedEvent event) { System.out.println("Handling order created event for order: " + event.getOrderId()); // доп. логика обработки amqpTemplate.convertAndSend("orderQueue", event.getOrderId()); } } // конфигурация RabbitMQ @Configuration public class RabbitConfig { @Bean public Queue orderQueue() { return new Queue("orderQueue", false); } @Bean public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
Обновление состояния задач и уведомление через RabbitMQ
При обновлении состояния задачи генерируется локальное событие, которое затем отправляется в очередь RabbitMQ для уведомления других микросервисов о статусе задачи:
// событие обновления задачи public class TaskStatusUpdateEvent extends ApplicationEvent { private String taskId; private String status; public TaskStatusUpdateEvent(Object source, String taskId, String status) { super(source); this.taskId = taskId; this.status = status; } public String getTaskId() { return taskId; } public String getStatus() { return status; } } // публикация события обновления задачи @Component public class TaskService { @Autowired private ApplicationEventPublisher eventPublisher; public void updateTaskStatus(String taskId, String status) { System.out.println("Updating task status: " + taskId + " to " + status); eventPublisher.publishEvent(new TaskStatusUpdateEvent(this, taskId, status)); } } // обработка события обновления задачи и отправка в RabbitMQ @Component public class TaskStatusUpdateListener { @Autowired private AmqpTemplate amqpTemplate; @EventListener @Async public void handleTaskStatusUpdateEvent(TaskStatusUpdateEvent event) { System.out.println("Sending task status update to RabbitMQ: " + event.getTaskId() + " - " + event.getStatus()); amqpTemplate.convertAndSend("taskStatusQueue", event.getTaskId() + " - " + event.getStatus()); } } // конфигурация RabbitMQ @Configuration public class RabbitConfig { @Bean public Queue taskStatusQueue() { return new Queue("taskStatusQueue", false); } @Bean public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
В заключение напоминаю об открытых уроках, которые пройдут в рамках курса «Разработчик на Spring Framework» в Otus:
-
1 июля: Тестирование Spring приложений. Интеграционные тесты с контекстом. Тестирование слоя репозиториев и сервисов. Запись по ссылке
-
16 июля: Тестирование Spring приложений. Интеграционные тесты контроллеров, интеграций с внешними API и безопасности. Запись по ссылке
ссылка на оригинал статьи https://habr.com/ru/articles/825528/
Добавить комментарий