Создание компонента Apache Camel

от автора

Приветствую, сообщество!

Меня зовут Александр, я java разработчик в компании БФТ-Холдинг. Тружусь я на проекте СМЭВ-адаптера, где мы занимаемся транзитивной обработкой сообщений. В нашу зону ответственности входит несколько микросервисов, которые обрабатывают очень много сообщений, почти ничего не пишут в БД, но часто обращаются в сторонние прикладные сервисы.

Для отслеживания пути сообщения через наши микросервисы мы используем Zipkin. Помимо этого в проекте задействован Apache Camel, с помощью которого мы выстраиваем цепочку обработки сообщения в одном конкретном микросервисе. Стандартные средства для работы с Zipkin обычно позволяют легко добавить к трассе вход, выход в сервис и запись в БД, но,
т.к. к нас не совсем стандартное поведение у сервисов, нам хотелось выделять в Zipkin и обращение в сторонние сервисы.

Хотелось эту логику как-то элегантно встроить в роут Camel, но существующие средства такой возможности не предоставляли.

Было принято решение написать свой компонент для Apache Camel. Делали мы это впервые и, к сожалению, полноценного гайда в интернетах найти не удалось…

Встречайте! Гайд по написанию собственного Camel-компонента!

Основные составляющие camel-компонента:

* Component — отвечает за создание Endpointa и является входной точкой в ваш компонент.
* Endpoint — отвечает за создание Producer и Consumer. Также хранит в себе параметры из урла.
* Producer — принимает запросы к вашему компоненту (`to(«ref»)`).
* Consumer — отправляет сообщения для слушателей вашего компонента (`from(«ref»)`).

Теперь подробнее про каждого.

Component

Класс компонента необходимо аннотировать @Component и передать имя вашего camel-компонента. Также отнаследовать его от абстрактного класса DefaultComponent и переопределить метод createEndpoint. Как можно было догадаться этот метод отвечает за создание Endpoint и важно позаботиться о том, чтобы все необходимые зависимости попали в него, если вы не желаете их получать потом обходными путями. Кроме этого, в этом методе определяются параметры из урла.

@Component("zipkintrace") public class ZipkinTraceComponent extends DefaultComponent {      // Зависимости     private final ZipkinTraceProperties zipkinTraceProperties;     private final ZipkinTraceCache zipkinTraceCache;       public ZipkinTraceComponent(         CamelContext context, ZipkinService zipkinService, ZipkinTraceProperties zipkinTraceProperties,         ZipkinTraceCache zipkinTraceCache     ) {         super(context);         this.zipkinService = zipkinService;         this.zipkinTraceProperties = zipkinTraceProperties;         this.zipkinTraceCache = zipkinTraceCache;     }      @Override     protected ZipkinTraceEndpoint createEndpoint(         String uri, String remaining, Map<String, Object> parameters     ) throws Exception {          ZipkinTraceEndpoint endpoint = new ZipkinTraceEndpoint(             uri, this, zipkinTraceProperties, zipkinTraceCache         );          // Сохранения параметров из урла         setProperties(endpoint, parameters);         endpoint.setAction(remaining);          return endpoint;     } }

Endpoint

Этот класс поинтереснее. Тут описывается вся необходимая информация для Apache Camel.

Прежде всего аннотируем его @UriEndpoint. Аннотация принимает множество параметров, описание которых вы найдёте в javaDoc её файла.

Если вы не хотите полностью настраивать Endpoint для сamel, наследуемся от DefaultEndpoint и имплементируем AsyncEndpoint, чтобы дать понять фреймворку, что Endpoint поддерживает асинхронную обработку сообщений.

В полях класса определяем все возможные параметры, которые можно передать в урле и помечаем их соответствующими аннотациями.

Важно! У каждого такого поля должен быть геттер и сеттер с описанным JavaDoc для них. Иначе camel-компонент не собрать.

В этом же классе переопределяем методы создания Producer и Consumer

@UriEndpoint(     firstVersion = "3.21.0",     scheme = "zipkintrace",     syntax = "zipkintrace:action",     title = "zipkintrace",     category = Category.LOG,     producerOnly = true,     headersClass = ZipkinTraceConstants.class ) public class ZipkinTraceEndpoint extends DefaultEndpoint implements AsyncEndpoint {     private final ZipkinTraceProperties zipkinTraceProperties;     private final ZipkinTraceCache zipkinTraceCache;      @UriPath     @Metadata(required = true)     private String action;     @UriParam     private String route;     @UriParam     private String processor;     @UriParam     private String messageId;     @UriParam     private String originalMessageId;     @UriParam     private String iisId;     @UriParam     private boolean buildTraceContext;     @UriParam(description = "Трасса, которую необходимо продолжить")     private String traceContext;      public ZipkinTraceEndpoint(String endpointUri, Component component,                                ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache) {         super(endpointUri, component);         this.zipkinTraceProperties = zipkinTraceProperties;         this.zipkinTraceCache = zipkinTraceCache;     }      @Override     public Producer createProducer() throws Exception {         return new ZipkinTraceProduces(this, zipkinTraceProperties, zipkinTraceCache);     }          @Override     public Consumer createConsumer(Processor processor) throws Exception {         throw new IllegalArgumentException("zipkintraser has no consumer, so you cannot use get any data from him");     }          /**      * Действие относительно трассы zipkin.      * Перечень в ZipkinTraceAction      */     public String getAction() {         return action;     }          /**      * Действие относительно трассы zipkin.      * Перечень в ZipkinTraceAction      */     public void setAction(String action) {         this.action = action;     }          // остальные геттеры и сеттеры } 

Producer

Тут всё проще. Как и раньше, если не хотим полностью настраивать Producer, используем стандартный абстрактный класс — DefaultProducer или DefaultAsyncProducer. Переопределяем getEndpoint, чтобы не получать стандартный интерфейс, и метод полезный работы process. В асинхронном варианте последний метод будет иметь в параметрах callback для завершения потока.

public class ZipkinTraceProduces extends DefaultAsyncProducer {        private final ZipkinTraceProperties zipkinTraceProperties;     private final ZipkinTraceCache zipkinTraceCache;          public ZipkinTraceProduces(ZipkinTraceEndpoint endpoint,                                ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache     ) {         super(endpoint);         this.zipkinTraceProperties = zipkinTraceProperties;         this.zipkinTraceCache = zipkinTraceCache;     }          @Override     public ZipkinTraceEndpoint getEndpoint() {         return (ZipkinTraceEndpoint) super.getEndpoint();     }          @Override     public boolean process(Exchange exchange, AsyncCallback callback) {              if (!isRunAllowed()) {             return shutDownWithException(exchange, callback);         }              try {                  // полезная работа                          callback.done(true);             return true;         } catch (Throwable e) {             exchange.setException(e);             callback.done(true);                  return true;         }     }          private boolean shutDownWithException(Exchange exchange, AsyncCallback callback) {              if (isNull(exchange.getException())) {             exchange.setException(new RejectedExecutionException());         }         callback.done(true);         return true;     }  } 

Consumer

В этом классе определяется логика, которая будет отправлять сообщения слушателям. Например, может запускаться слушатель очереди или какая-то крон-задача.

Для этого используем класс DefaultConsumer в качестве родительского и переопределяем методы doStartdoStop.

Если поток сообщений может быть приостановлен (не полное отключение), нужно пометить класс интерфейсом Suspendable. Методы для обработки этого поведения doSuspend и doResume

public class ZipkinTraceConsumer extends DefaultConsumer {        public ZipkinTraceConsumer(Endpoint endpoint, Processor processor) {         super(endpoint, processor);     }          @Override     protected void doStart() throws Exception {         super.doStart();     }          @Override     protected void doStop() throws Exception {         super.doStop();     }          @Override     protected void doSuspend() {     }          @Override     protected void doResume() throws Exception {     }  }

Теперь неочевидное

К сожалению, чтобы фреймворк заметил ваш компонент и позволил его использовать, действий, описанных выше, недостаточно. Помимо всего этого нужно добавить плагин, который сгенерирует метаинформацию по вашему компоненту во время компиляции кода.

<plugins>     <plugin>         <groupId>org.apache.camel</groupId>         <artifactId>camel-component-maven-plugin</artifactId>         <version>${camel-version}</version>         <executions>             <execution>                 <id>generate</id>                 <goals>                     <goal>generate</goal>                 </goals>                 <phase>process-classes</phase>             </execution>         </executions>     </plugin> </plugins>

Также создать сам файл мета информации в пакете

resources/META-INF/services/org/apache/camel/component/

Файл назвать по имени компонента.

Содержимое файла

class=ru.gov.pfr.ecp.iis.smev.adapter.zipkin.camel.component.ZipkinTraceComponent

И вот теперь camel признает все ваши труды.

Что получилось у нас

 from("direct:" + FSSP_REPORT_ARREST_PROCESSING_ROUTE)     .routeId(FSSP_REPORT_ARREST_PROCESSING_ROUTE)     .log(LoggingLevel.INFO, FSSP_REPORT_ARREST_PROCESSING_ROUTE + ".start")     .to("zipkintrace:scoped?processor=ReportArrestXmlEACreateProcessor") <-- Обращение в условный S3     .process(reportArrestXmlEACreateProcessor)     .to("zipkintrace:scoped?processor=ReportArrestXmlSignProcessor") <-- Обращение в прикладной сервис     .process(reportArrestXmlSignProcessor)     .to("zipkintrace:scoped?processor=ReportArrestArchiveCreateProcessor") <-- Сохранение результата в S3     .process(reportArrestArchiveCreateProcessor)     .to("zipkintrace:end")     .process(convertProcessor);

Надеюсь, эта статья будет полезна и убережёт вас от подводных каменей Apache Camel.

Полезные материалы

  1. Код примера

  2. Документация фреймворка

  3. Другие компоненты в открытом доступе

  4. Stackoverflow


ссылка на оригинал статьи https://habr.com/ru/articles/796607/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *