Статья за авторством Александра Романова, разработчика интеграционных решений.
В процессе интеграции систем мы часто сталкиваемся с необходимостью гарантированной доставки сообщений между системами. В таких случаях на помощь нам приходят очереди. Но не все задачи так просты, как доставка сообщений из системы А в систему Б. Бывают случаи, когда нужно обогатить доставляемые сообщения данными из смежных, участвующих в интеграции систем. Которые не всегда могут интегрироваться через очереди, а имеют лишь синхронные сервисы. И вот уже в нашей интеграции возникают такие явления, как недоступность, отказы и другие «приятные» особенности использования «синхронов». Можно было бы переложить обработку промежуточных отказов на систему-источник, но это некультурно, да и невозможно, если мы публикуем события сразу для нескольких систем (в топик).
Удобным и работающим решением проблемы, с нашей точки зрения, является асинхронная пошаговая обработка сообщения через внутреннюю очередь с вызовом внешнего сервиса на каждом шаге. В случае отказа сервиса по ошибке или из-за
его временной неработоспособности сообщение попадает во внутреннюю очередь отказов и отправляется повторно после разборов с возникшими в сервисе проблемами.
Данное решение так же устраняет проблему невозможности отката транзакции при работе с внешними сервисами. Никакой вызов не пройдет дважды — обработка начинается именно с того шага, на котором произошел сбой.
Всё вышеописанное очень легко реализуется на интеграционной шине, в которой асинхронное взаимодействие между компонентами через внутренние очереди идёт «из коробки». Но слишком высокие цены за «коробку» могут сильно затруднить использование интеграционной шины. Мы приведем пример реализации простого приложения на Spring Integration (далее SI) + Rabbit MQ. Оговоримся, что Rabbit MQ у себя в production мы не используем из-за невозможности его работы с XA.
Сердцем всего приложения является spring-integration-context.xml. Там описана компонентная модель, инициализируются ресурсные бины и менеджер транзакций для работы с MQ. Опишем его подробнее.
Подключаем встроенный в SI драйвер и прописываем ресурсы
<rabbit:queue name="si.test.queue.to"/> <rabbit:queue name="si.test.queue.from"/>
Нам необходим низкоуровневый бин amqpTemplate, через который осуществляется взаимодействие с ресурсами. Данный бин мы используем напрямую в тестах, и он требуется для компонент SI, которые работают с Rabbit MQ. ConnectionFactory, необходимый для подключения к ресурсам, конфигурит Spring Boot по настройкам из application.yml (см. org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration).
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" mandatory="true"/>
Для транзакционной работы с Rabbit MQ требуется TransactionManager (нужен для отката сообщения обратно в очередь, если в процессе работы произойдет ошибка). К сожалению, Rabbit MQ не поддерживает XA-транзакции, иначе менеджер транзакций сконфигурил бы Spring Boot. Конфигурим предоставляемый Spring-ом вручную.
<bean id="rabbitTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <constructor-arg name="connectionFactory" ref="rabbitConnectionFactory"/> </bean>
А теперь самое приятное. «Рисуем» flow! В кавычках, потому что пишем в виде xml, что менее приятно.
Flow
Нам потребуются:
- inbound-adapter для чтения из входной очереди;
- асинхронные компоненты для вызовов REST-сервисов;
- outbound-channel-adapter для записи в выходную очередь.
Для организации гарантированной доставки в данном приложении мы используем асинхронные вызовы через внутреннюю очередь. Так как компонент много, требуется несколько очередей, что неудобно ни с точки зрения разработки, ни администратору. Постараемся обойтись одной.
Рассмотрим сценарий взаимодействия между двумя компонентами. SomeComponentOne получает сообщение из канала, вызывает некий синхронный REST-сервис (работает с БД, пишет в файл и т.п.) и отправляет сообщение на дальнейшую обработку, которой должна заниматься SomeComponentTwo. Если SomeComponentOne не смогла выполнить порученный ей кусок работы, она должна откатить транзакцию и вернуть полученное сообщение туда, откуда она его забрала. Если всё хорошо — отправить сообщение во внутреннюю очередь и завершить транзакцию. SomeComponentOne забирает сообщение из внутренней очереди и отправляет сообщение в неё же, при этом не обязательно в том же виде, в котором получила. Сообщение может быть обогащено или изменено, нам не важно. Оно предназначено для работы компоненты SomeComponentTwo. Возникает проблема роутинга. Сообщение попадает во внутреннюю очередь и должно забираться оттуда нужной в данный момент времени компонентой. Другими словами, необходим роутинг.
В данном приложении продемонстрирован роутинг, основанный на заголовках сообщения. У сообщения заполняется кастомный заголовок PartnerComponent, указывающий на компоненту, которая должна работать с сообщением.
Распишем технические детали представленного flow.
Адаптер для чтения из входной очереди. Получает сообщение и в транзакции бросает его сразу во внутреннюю очередь.
<int-amqp:inbound-channel-adapter channel="innerChannel" queue-names="si.test.queue.to" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/> <int-amqp:channel id="innerChannel" queue-name="si.test.queue.inner" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/>
Мы использовали специализированный для работы с очередями асинхронный канал, предоставляемый Spring-ом. Получили интерфейс SI-channel, а хранение сообщений непосредственно в очереди, в нашем случае во внутренней mq-очереди приложения. При получении сообщения из данного канала-очереди будет открываться транзакция, т.к. мы подключили наш менеджер транзакций.
К данному каналу-очереди подключаем SI-роутер, работающий на заголовках сообщений.
<int:header-value-router id="wireRouter" input-channel="innerChannel" header-name="PartnerComponent" default-output-channel="component1Channel"> <int:mapping value="ComponentTwo" channel="component2Channel"/> <int:mapping value="ComponentThree" channel="component3Channel"/> <int:mapping value="OutboundComponent" channel="outboundRabbitChannel"/> </int:header-value-router>
Новое для flow сообщение не имеет технического заголовка PartnerComponent, поэтому по-умолчанию будет обрабатываться компонентой someComponentOne, обязанностью которой является указание в заголовке сообщения PartnerComponent следующей компоненты и отправка сообщения во внутреннюю очередь. Роутер вновь забирает сообщение из внутренней очереди и отправляет его на обработку в указанную компоненту.
Описание компонент, в которые отправляются сообщения из роутера.
<int:channel id="component1Channel"/> <int:service-activator input-channel="component1Channel" ref="someComponentOne" method="process"/> <int:channel id="component2Channel"/> <int:service-activator input-channel="component2Channel" ref="someComponentTwo" method="process"/> <int:channel id="component3Channel"/> <int:service-activator input-channel="component3Channel" ref="someComponentThree" method="process"/> <int:channel id="outboundRabbitChannel"/> <int:service-activator input-channel="outboundRabbitChannel" ref="outboundRabbitComponent" method="process"/
Адаптер для отправки в выходную очередь.
<int:channel id="toRabbit"/> <int-amqp:outbound-channel-adapter channel="toRabbit" amqp-template="amqpTemplate" routing-key="si.test.queue.from"/>
Сборка (pom.xml)
Старый добрый Maven.
Стандартная сборка от Spring Boot. Зависимости от SI и AMQP предоставляют все необходимые библиотеки. Также подключаем spring-boot-starter-test для реализации проверочных кейсов на JUnit.
Работа SomeComponent*.java
Транзакционные бины, подключенные как service-activator к flow SI. Вызов REST через RestTemplate и отправка во внутреннюю очередь через innerChannel. Достаточно, чтобы продемонстрировать работу с сервисом и удобно за-mock-ить в тестах.
Тестируем
В тесте testHappyPath мы проверили работоспособность flow, когда нет сбоев при вызове REST. Mock-аем все вызовы REST-сервисов без сбоев, бросаем сообщение во входную очередь, ждем в выходной, проверяем прохождение всех компонент по контенту тела полученного сообщения.
В тесте testGuaranteedDelivery мы проверили гарантированную доставку при сбое в одном из REST. Эмулируем однократный сбой при вызове сервиса из третьей компоненты, ждём доставку сообщения до выходной очереди, проверяем тело полученного сообщения.
Заключение
Мы сделали приложение с гарантированной доставкой. Остался ряд мелких нерешенных вопросов: повторная отправка сообщений происходит бесконечно и неуправляемо, кто поддерживает данное решение — администратор или эту поддержку можно автоматизировать? Мы решаем данные вопросы при помощи самописных приложений и индивидуальных настроек для каждого типа сообщения. Возможно, в будущем мы опишем эти решения.
приложение целиком на git-hub
ссылка на оригинал статьи https://habrahabr.ru/post/315786/
Добавить комментарий