Свойство типа Controller Service в кастомном процессоре NiFi

от автора

Преамбула

В одном из самодельных процессоров Apache NiFi у меня возникла необходимость работать с файлами сертификатов. Так как в проекте уже были настроенные службы сертификатов (типа StandardRestrictedSSLContextService), я решил сделать у процессора свойство с типом SSLContextService, чтобы можно было подставлять уже настроенные в NiFi службы контроллера и брать данные сертификатов оттуда. Изучил матчасть, ничего сложного. Добавил свойство в процессор, закинул его на flow. Но… процессор не видит мои StandardRestrictedSSLContextService. Пересмотрел и перечитал множество статей, в том числе от уважаемого Pierre Villard, но никак. Пока не наткнулся на реализацию похожего кейса на Github.

Возможно, это банальная проблема, но у меня она вызвала приличные затруднения, поэтому я решил об этом написать. Ниже приведу примеры кода, а так же покажу, как подсунуть свою заглушку в интеграционный тест процессора (что тоже оказалось не совсем очевидной задачей).

Добавление свойства в процессор

Добавляем в процессор свойство, через которое можно будет указывать стандартные службы, обеспечивающие доступ к сертификатам.

public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()     .name("SSL Context Service")     .description("SSL Context Service provides trusted certificates and client certificates for TLS communication.")     .required(false)     .identifiesControllerService(SSLContextService.class)     .build();

В коде метода @OnTrigger процессора доступ к значению свойства будем получать вот так:

SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

Теперь наверно самый важный участок этой маленькой статьи, ради которого все и затевалось. Правильное добавление зависимостей!

1) В pom.xml основного модуля процессора добавим две зависимости:

<dependency>     <groupId>org.apache.nifi</groupId>     <artifactId>nifi-ssl-context-service-api</artifactId>     <version>${nifi.version}</version> </dependency>   <dependency>     <groupId>org.apache.nifi</groupId>     <artifactId>nifi-standard-services-api-nar</artifactId>     <version>${nifi.version}</version>     <type>nar</type> </dependency>

2) В pom.xml nar-модуля добавим одну зависимость:

<dependency>     <groupId>org.apache.nifi</groupId>     <artifactId>nifi-standard-services-api-nar</artifactId>     <version>${nifi.version}</version>     <type>nar</type> </dependency>

nifi-ssl-context-service-api — с помощью этой зависимости в модуле процессора у нас появляется возможность использовать интерфейс SSLContextService.class в коде основного модуля процессора

nifi-standard-services-api-nar — эта зависимость в обоих модулях позволит процессору использовать стандартные службы NiFi

Собственно, вот и весь фокус.

Если надо понять, какую зависимость придется использовать, когда нужно добавить в процессор свойство, отличное по типу от SSLContextService, то помочь в этом может анализ исходников NiFi.

Тестирование процессора, в котором используется служба в качестве одного из свойств

Небольшая проблема заключается в том, что для тестирования процессора, у которого одно из свойств это Controller Service, нам надо обязательно создать некую заглушку этого сервиса. А потом и указать ее в качестве одного из свойств.

Подготовим вспомогательные тестовые данные:

DummySSLContextService.class — заглушка для службы
import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; import lombok.Builder; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.security.util.TlsConfiguration; import org.apache.nifi.ssl.SSLContextService;  @Builder public class DummySSLContextService extends AbstractControllerService implements SSLContextService {     private final String keyStoreFile;     private final String keyStorePassword;     private final String keyStoreType;     private final String trustStoreFile;     private final String trustStorePassword;     private final String trustStoreType;      @Override     public TlsConfiguration createTlsConfiguration() {         throw new RuntimeException("Method not implemented");     }      @Override     public SSLContext createContext() {         throw new RuntimeException("Method not implemented");     }      @Override     public SSLContext createSSLContext(org.apache.nifi.security.util.ClientAuth clientAuth) throws ProcessException {         throw new RuntimeException("Method not implemented");     }      @Override     public SSLContext createSSLContext(ClientAuth clientAuth) throws ProcessException {         throw new RuntimeException("Method not implemented");     }      @Override     public X509TrustManager createTrustManager() {         throw new RuntimeException("Method not implemented");     }      @Override     public String getTrustStoreFile() {         return trustStoreFile;     }      @Override     public String getTrustStoreType() {         return trustStoreType;     }      @Override     public String getTrustStorePassword() {         return trustStorePassword;     }      @Override     public boolean isTrustStoreConfigured() {         throw new RuntimeException("Method not implemented");     }      @Override     public String getKeyStoreFile() {         return keyStoreFile;     }      @Override     public String getKeyStoreType() {         return keyStoreType;     }      @Override     public String getKeyStorePassword() {         return keyStorePassword;     }      @Override     public String getKeyPassword() {         throw new RuntimeException("Method not implemented");     }      @Override     public boolean isKeyStoreConfigured() {         throw new RuntimeException("Method not implemented");     }      @Override     public String getSslAlgorithm() {         throw new RuntimeException("Method not implemented");     }      @Override     public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {         // ничего не делаем     } }

TestData.class — константы и экземпляр службы-заглушки для тестов
import java.nio.file.Path;  import org.apache.nifi.ssl.SSLContextService;  public class TestData {     static final String CLIENT_CERT_FILE_NAME = Path.of("src/test/resources/client-identity.jks").toAbsolutePath().toString();     static final String CLIENT_TRUST_FILE_NAME = Path.of("src/test/resources/truststore.jks").toAbsolutePath().toString();     public static final String KEYSTORE_COMMON_PASSWORD = "changeit";     public static final String KEYSTORE_COMMON_TYPE = "JKS";      public static final SSLContextService TEST_SSL_CONTEXT_SERVICE = DummySSLContextService.builder()             .keyStoreFile(CLIENT_CERT_FILE_NAME)             .keyStorePassword(KEYSTORE_COMMON_PASSWORD)             .keyStoreType(KEYSTORE_COMMON_TYPE)             .trustStoreFile(CLIENT_TRUST_FILE_NAME)             .trustStorePassword(KEYSTORE_COMMON_PASSWORD)             .trustStoreType(KEYSTORE_COMMON_TYPE)             .build(); }

И собственно тест (тут SSL_CONTEXT_SERVICE это дескриптор свойства процессора):

    @Test     void should_run_with_service_property() throws InitializationException {         final TestRunner testRunner = TestRunners.newTestRunner(MyProcessor.class);          // устанавливаем значение для одного из свойств тестового процессора         testRunner.setProperty(SOME_PROPERTY_NAME, SOME_PROPERTY_VALUE);                // устанавливаем для тестового процессора свойство, в котором указывается Controller Service         // это делается в три шага         testRunner.addControllerService("TestSSLContextService", TEST_SSL_CONTEXT_SERVICE);         testRunner.setProperty(SSL_CONTEXT_SERVICE, "TestSSLContextService");         testRunner.enableControllerService(TEST_SSL_CONTEXT_SERVICE);          // создаем набор аттрибутов для тестового flow-файла         Map<String, String> attributes = new HashMap<>();         attributes.put(SOME_ATTR_NAME, SOME_ATTR_VALUE);          // добавляем контент и аттрибуты в тестовый flow-файл         // вместо строки сюда может быть передан поток         testRunner.enqueue("Flowfile content", attributes);          // When         testRunner.run();          // Then         List<MockFlowFile> originalFlowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);         List<MockFlowFile> failureFlowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);          assertThat(originalFlowFiles.size()).isEqualTo(1);         assertThat(failureFlowFiles.size()).isZero();          Map<String, String> actualAttributes = originalFlowFiles.get(0).getAttributes();         assertThat(actualAttributes)                 .isNotNull()                 .containsEntry(SOME_ATTR_NAME, SOME_ATTR_VALUE);     }

Заключение

Вот и все. Надеюсь эта маленькая статья будет полезна. Замечания и указания на неточности в комментариях очень приветствуются))

Спасибо, что дочитали до конца.


ссылка на оригинал статьи https://habr.com/ru/articles/752690/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *