Приветствую, сообщество!
Меня зовут Александр, я java разработчик в компании БФТ-Холдинг. Тружусь я на проекте СМЭВ-адаптера, где мы занимаемся транзитивной обработкой сообщений. В нашу зону ответственности входит несколько микросервисов, которые обрабатывают очень много сообщений, почти ничего не пишут в БД, но часто обращаются в сторонние прикладные сервисы.
Для отслеживания пути сообщения через наши микросервисы мы используем Zipkin. Помимо этого в проекте задействован Apache Camel, с помощью которого мы выстраиваем цепочку обработки сообщения в одном конкретном микросервисе. Стандартные средства для работы с Zipkin обычно позволяют легко добавить к трассе вход, выход в сервис и запись в БД, но,
т.к. к нас не совсем стандартное поведение у сервисов, нам хотелось выделять в Zipkin и обращение в сторонние сервисы.
Хотелось эту логику как-то элегантно встроить в роут Camel, но существующие средства такой возможности не предоставляли.
Было принято решение написать свой компонент для Apache Camel. Делали мы это впервые и, к сожалению, полноценного гайда в интернетах найти не удалось…
Встречайте! Гайд по написанию собственного Camel-компонента!
Основные составляющие camel-компонента:
* Component — отвечает за создание Endpointa и является входной точкой в ваш компонент.
* Endpoint — отвечает за создание Producer и Consumer. Также хранит в себе параметры из урла.
* Producer — принимает запросы к вашему компоненту (`to(«ref»)`).
* Consumer — отправляет сообщения для слушателей вашего компонента (`from(«ref»)`).
Теперь подробнее про каждого.
Component
Класс компонента необходимо аннотировать @Component
и передать имя вашего camel-компонента. Также отнаследовать его от абстрактного класса DefaultComponent
и переопределить метод createEndpoint
. Как можно было догадаться этот метод отвечает за создание Endpoint и важно позаботиться о том, чтобы все необходимые зависимости попали в него, если вы не желаете их получать потом обходными путями. Кроме этого, в этом методе определяются параметры из урла.
@Component("zipkintrace") public class ZipkinTraceComponent extends DefaultComponent { // Зависимости private final ZipkinTraceProperties zipkinTraceProperties; private final ZipkinTraceCache zipkinTraceCache; public ZipkinTraceComponent( CamelContext context, ZipkinService zipkinService, ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache ) { super(context); this.zipkinService = zipkinService; this.zipkinTraceProperties = zipkinTraceProperties; this.zipkinTraceCache = zipkinTraceCache; } @Override protected ZipkinTraceEndpoint createEndpoint( String uri, String remaining, Map<String, Object> parameters ) throws Exception { ZipkinTraceEndpoint endpoint = new ZipkinTraceEndpoint( uri, this, zipkinTraceProperties, zipkinTraceCache ); // Сохранения параметров из урла setProperties(endpoint, parameters); endpoint.setAction(remaining); return endpoint; } }
Endpoint
Этот класс поинтереснее. Тут описывается вся необходимая информация для Apache Camel.
Прежде всего аннотируем его @UriEndpoint
. Аннотация принимает множество параметров, описание которых вы найдёте в javaDoc её файла.
Если вы не хотите полностью настраивать Endpoint для сamel, наследуемся от DefaultEndpoint
и имплементируем AsyncEndpoint
, чтобы дать понять фреймворку, что Endpoint поддерживает асинхронную обработку сообщений.
В полях класса определяем все возможные параметры, которые можно передать в урле и помечаем их соответствующими аннотациями.
Важно! У каждого такого поля должен быть геттер и сеттер с описанным JavaDoc для них. Иначе camel-компонент не собрать.
В этом же классе переопределяем методы создания Producer и Consumer
@UriEndpoint( firstVersion = "3.21.0", scheme = "zipkintrace", syntax = "zipkintrace:action", title = "zipkintrace", category = Category.LOG, producerOnly = true, headersClass = ZipkinTraceConstants.class ) public class ZipkinTraceEndpoint extends DefaultEndpoint implements AsyncEndpoint { private final ZipkinTraceProperties zipkinTraceProperties; private final ZipkinTraceCache zipkinTraceCache; @UriPath @Metadata(required = true) private String action; @UriParam private String route; @UriParam private String processor; @UriParam private String messageId; @UriParam private String originalMessageId; @UriParam private String iisId; @UriParam private boolean buildTraceContext; @UriParam(description = "Трасса, которую необходимо продолжить") private String traceContext; public ZipkinTraceEndpoint(String endpointUri, Component component, ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache) { super(endpointUri, component); this.zipkinTraceProperties = zipkinTraceProperties; this.zipkinTraceCache = zipkinTraceCache; } @Override public Producer createProducer() throws Exception { return new ZipkinTraceProduces(this, zipkinTraceProperties, zipkinTraceCache); } @Override public Consumer createConsumer(Processor processor) throws Exception { throw new IllegalArgumentException("zipkintraser has no consumer, so you cannot use get any data from him"); } /** * Действие относительно трассы zipkin. * Перечень в ZipkinTraceAction */ public String getAction() { return action; } /** * Действие относительно трассы zipkin. * Перечень в ZipkinTraceAction */ public void setAction(String action) { this.action = action; } // остальные геттеры и сеттеры }
Producer
Тут всё проще. Как и раньше, если не хотим полностью настраивать Producer, используем стандартный абстрактный класс — DefaultProducer
или DefaultAsyncProducer
. Переопределяем getEndpoint
, чтобы не получать стандартный интерфейс, и метод полезный работы process
. В асинхронном варианте последний метод будет иметь в параметрах callback
для завершения потока.
public class ZipkinTraceProduces extends DefaultAsyncProducer { private final ZipkinTraceProperties zipkinTraceProperties; private final ZipkinTraceCache zipkinTraceCache; public ZipkinTraceProduces(ZipkinTraceEndpoint endpoint, ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache ) { super(endpoint); this.zipkinTraceProperties = zipkinTraceProperties; this.zipkinTraceCache = zipkinTraceCache; } @Override public ZipkinTraceEndpoint getEndpoint() { return (ZipkinTraceEndpoint) super.getEndpoint(); } @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (!isRunAllowed()) { return shutDownWithException(exchange, callback); } try { // полезная работа callback.done(true); return true; } catch (Throwable e) { exchange.setException(e); callback.done(true); return true; } } private boolean shutDownWithException(Exchange exchange, AsyncCallback callback) { if (isNull(exchange.getException())) { exchange.setException(new RejectedExecutionException()); } callback.done(true); return true; } }
Consumer
В этом классе определяется логика, которая будет отправлять сообщения слушателям. Например, может запускаться слушатель очереди или какая-то крон-задача.
Для этого используем класс DefaultConsumer
в качестве родительского и переопределяем методы doStart
, doStop
.
Если поток сообщений может быть приостановлен (не полное отключение), нужно пометить класс интерфейсом Suspendable
. Методы для обработки этого поведения doSuspend
и doResume
public class ZipkinTraceConsumer extends DefaultConsumer { public ZipkinTraceConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); } @Override protected void doStart() throws Exception { super.doStart(); } @Override protected void doStop() throws Exception { super.doStop(); } @Override protected void doSuspend() { } @Override protected void doResume() throws Exception { } }
Теперь неочевидное
К сожалению, чтобы фреймворк заметил ваш компонент и позволил его использовать, действий, описанных выше, недостаточно. Помимо всего этого нужно добавить плагин, который сгенерирует метаинформацию по вашему компоненту во время компиляции кода.
<plugins> <plugin> <groupId>org.apache.camel</groupId> <artifactId>camel-component-maven-plugin</artifactId> <version>${camel-version}</version> <executions> <execution> <id>generate</id> <goals> <goal>generate</goal> </goals> <phase>process-classes</phase> </execution> </executions> </plugin> </plugins>
Также создать сам файл мета информации в пакете
resources/META-INF/services/org/apache/camel/component/
Файл назвать по имени компонента.
Содержимое файла
class=ru.gov.pfr.ecp.iis.smev.adapter.zipkin.camel.component.ZipkinTraceComponent
И вот теперь camel признает все ваши труды.
Что получилось у нас
from("direct:" + FSSP_REPORT_ARREST_PROCESSING_ROUTE) .routeId(FSSP_REPORT_ARREST_PROCESSING_ROUTE) .log(LoggingLevel.INFO, FSSP_REPORT_ARREST_PROCESSING_ROUTE + ".start") .to("zipkintrace:scoped?processor=ReportArrestXmlEACreateProcessor") <-- Обращение в условный S3 .process(reportArrestXmlEACreateProcessor) .to("zipkintrace:scoped?processor=ReportArrestXmlSignProcessor") <-- Обращение в прикладной сервис .process(reportArrestXmlSignProcessor) .to("zipkintrace:scoped?processor=ReportArrestArchiveCreateProcessor") <-- Сохранение результата в S3 .process(reportArrestArchiveCreateProcessor) .to("zipkintrace:end") .process(convertProcessor);
Надеюсь, эта статья будет полезна и убережёт вас от подводных каменей Apache Camel.
Полезные материалы
ссылка на оригинал статьи https://habr.com/ru/articles/796607/
Добавить комментарий