Автоматизация доставки flow в Apache NiFi

от автора

Всем привет!

Задача заключается в следующем — есть flow, представленный на картинке выше, который надо раскатить на N серверов с Apache NiFi. Flow тестовый — идет генерация файла и отправка в другой инстанс NiFi. Передача данных происходит с помощью протокола NiFi Site to Site.


NiFi Site to Site (S2S) — безопасный, легко настраиваемый способ передачи данных между инстансами NiFi. Как работает S2S смотрите в документации и важно не забыть настроить инстанс NiFi, чтобы разрешить S2S смотрите тут.

В тех случаях, когда речь идет о передачи данных с помощью S2S — один инстанс называется клиентским, второй серверным. Клиентский отправляет данные, серверный — принимает. Два способа настроить передачу данных между ними:

  1. Push. С клиентского инстанса данные отправляются с помощью Remote Process Group (RPG). На серверном инстансе данные принимаются с помощью Input Port
  2. Pull. Сервер принимает данные с помощью RPG, клиент отправляет с помощью Output port.


Flow для раскатки храним в Apache Registry.


Apache NiFi Registry — подпроект Apache NiFi, представляющий инструмент для хранения flow и управления версиями. Этакий GIT. Информацию об установке, настройке и работе с registry можно найти в официальной документации. Flow для хранения объединяется в process group и в таком виде хранится в registry. Далее в статье к этому еще вернемся.


На старте, когда N малое число, flow доставляется и актуализируется руками за приемлемое время.

Но с ростом N, проблем становится больше:

  1. на актуализацию flow уходит больше времени. Надо зайти на все сервера
  2. возникают ошибки актуализации шаблонов. Вот тут обновили, а тут забыли
  3. ошибки человека при выполнении большого количества однотипных операций

Всё это подводит нас к тому, что надо автоматизировать процесс. Я пробовал следующие способы решения этой задачи:

  1. Использовать MiNiFi вместо NiFi
  2. NiFi CLI
  3. NiPyAPI

Использование MiNiFi

Apache MiNiFy — подпроект Apache NiFi. MiNiFy — компактный агент, использующий те же самые процессоры, что и NiFi, позволяющий создавать те же flow, что и в NiFi. Легковесность агента достигается в том числе за счет того, что у MiNiFy нет графического интерфейса для конфигурации flow. Отсутствие графического интерфейса у MiNiFy означает, что необходимо решать проблему доставки flow в minifi. Поскольку, MiNiFy активно используется в IOT, компонентов много и процесс доставки flow до конечных экземпляров minifi надо автоматизировать. Знакомая задача, правда?

Решить такую задачу поможет еще один подпроект — MiNiFi C2 Server. Этот продукт предназначен для того, чтобы быть центральной точкой в архитектуре раскатки конфигураций. Как сконфигурировать окружение — описано в этой статье на Хабре и информации достаточно для решения поставленной задачи. MiNiFi в связке с C2 server автоматическом режиме обновляет конфигурацию у себя. Единственный недостаток такого подхода — приходится создавать шаблоны на C2 Server, простого коммита в registry не достаточно.

Вариант, описанный в статье выше рабочий и не сложный для реализации, но надо не забывать следующее:

  1. В minifi есть не все процессоры из nifi
  2. Версии процессоров в Minifi отстают от версий процессоров в NiFi.

На момент написания публикации последняя версия NiFi — 1.9.2. Версия процессоров последней версии MiNiFi — 1.7.0. Процессоры можно добавлять в MiNiFi, но из-за расхождения версий между процессорами NiFi и MiNiFi это может не сработать.

NiFi CLI

Судя по описанию инструмента на официальном сайте, это инструмент для автоматизации взаимодействия NiFI и NiFi Registry в области доставки flow или управления процессами. Для начала работы этот инструмент необходимо скачать отсюда.

Запускаем утилиту

./bin/cli.sh            _     ___  _  Apache   (_)  .' ..](_)   ,  _ .--.   __  _| |_  __    )\ [ `.-. | [  |'-| |-'[  |  /  \ |  | | |  | |  | |   | | '    ' [___||__][___][___] [___]',  ,'                            `'           CLI v1.9.2  Type 'help' to see a list of available commands, use tab to auto-complete. 

Для того, чтобы нам подгрузить необходимый flow из registry, нам надо знать идентификаторы корзины (bucket identifier) и самого flow (flow identifier). Эти данные можно получить либо через cli, либо в веб-интерфейсе NiFi registry. В веб интерфейсе выглядит так:

С помощью CLI делается так:

#> registry list-buckets -u http://nifi-registry:18080  #   Name             Id                                     Description -   --------------   ------------------------------------   ----------- 1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)  #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080  #   Name           Id                                     Description -   ------------   ------------------------------------   ----------- 1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85 

Запускаем импорт process group из registry:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080  7f522a13-016e-1000-e504-d5b15587f2f3 

Важный момент — в качестве хоста, на который мы накатываем process group может быть указан любой инстанс nifi.

Process group добавлен со стопнутыми процессорами, их надо запустить

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080 

Отлично, процессоры стартанули. Однако, нам по условиям задачи надо, чтобы инстансы NiFi отправляли данные на другие инстансы. Предположим, что для передачи данных на сервер выбрали способ Push. Для того, чтобы организовать передачу данных, надо на добавленном Remote Process Group (RPG), который уже включен в наш flow включить передачу данных (Enable transmitting).

В документации в CLI и других источниках я не нашел способа включить передачу данных. Если вы знаете как это сделать — напишите, пожалуйста, в комментариях.

Раз уж у нас bash и мы готовы идти до конца — найдем выход! Можно воспользоваться NiFi API для решения этой проблемы. Воспользуемся следующим методом, ID берем из примеров выше (в нашем случае это 7f522a13-016e-1000-e504-d5b15587f2f3). Описание методов NiFi API тут.


В body надо передать JSON, следующего вида:

{     "revision": { 	    "clientId": "value", 	    "version": 0, 	    "lastModifier": "value" 	},     "state": "value",     "disconnectedNodeAcknowledged": true } 

Параметры, которые надо заполнить, чтобы “заработало”:
state — статус передачи данных. Доступно TRANSMITTING для включения передачи данных, STOPPED для выключения
version — версия процессора

version по умолчанию будет 0 при создании, но эти параметры можно получить используя метод

Для любителей bash скриптов данный метод может показаться пригодным, но мне тяжеловато — bash скрипты не самое мое любимое. Следующий способ поинтереснее и поудобнее на мой взгляд.

NiPyAPI

NiPyAPI — библиотека для языка Python для взаимодействия с инстансами NiFi. Страница с документацией содержит необходимую информацию для работы с библиотекой. Quick start описан в проекте на github.

Наш скрипт для раскатки конфигурации — программка на языке Python. Переходим к кодингу.
Настраиваем конфиги для дальнейшей работы. Нам понадобятся следующие параметры:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем 

Дальше буду вставлять названия методов этой библиотеки, которые описаны тут.

Подключаем registry к инстансу nifi с помощью

nipyapi.versioning.create_registry_client

На этом шаге можно еще добавить проверку того, что registry уже к инстансу добавлен, для этого можно воспользоваться методом

nipyapi.versioning.list_registry_clients

Находим bucket для дальнейшего поиска flow в корзине

nipyapi.versioning.get_registry_bucket

По найденному bucket ищем flow

nipyapi.versioning.get_flow_in_bucket

Дальше важно понять а не добавлен ли уже этот process group. Process group размещается по координатам и может сложиться ситуация, когда поверх одного компонента наложится второй. Я проверял, такое может быть 🙂 Чтобы получить все добавленные process group используем метод

nipyapi.canvas.list_all_process_groups

и дальше можем поискать, например по имени.

Я не буду описывать процесс обновления шаблона, скажу лишь, что если в новой версии шаблона процессоры добавляются, то проблем с наличием сообщений в очередях нет. А вот если процессоры удаляются, то проблемы могут возникнуть (nifi не дает удалять процессор, если перед ним скопилась очередь сообщений). Если вам интересно как я решил эту проблему — напишите мне, пожалуйста, обсудим этот момент. Контакты в конце статьи. Перейдем к шагу добавления process group.

При отладке скрипта я столкнулся с особенностью, что не всегда подтягивается последняя версия flow, поэтому рекомендую сначала эту версию уточнить:

nipyapi.versioning.get_latest_flow_ver

Деплоим process group:

nipyapi.versioning.deploy_flow_version

Запускаем процессоры:

nipyapi.canvas.schedule_process_group

В блоке про CLI было написано, что в remote process group автоматически не включается передача данных? При реализации скрипта я столкнулся с этой проблемой тоже. На тот момент, запустить передачу данных с помощью API у меня не получилось и я решил написать разработчику библиотеки NiPyAPI и спросить совета/помощи. Разработчик мне ответил, мы обсудили проблему и он написал, что ему надо время “проверить кое-что”. И вот, спустя пару дней приходит письмо, в котором написана функция на Python, решающая мою проблему запуска!!! На тот момент версия NiPyAPI была 0.13.3 и в ней, конечно же, ничего такого не было. А вот в версию 0.14.0, которая вышла совсем недавно, эта функция уже вошла в составе библиотеки. Встречайте,

nipyapi.canvas.set_remote_process_group_transmission

Итак, с помощью библиотеки NiPyAPI подключили registry, накатили flow и даже запустили процессоры и передачу данных. Дальше можно причесывать код, добавлять всевозможные проверки, логирование и вот это всё. Но это уже совсем другая история.

Из рассмотренных мною вариантов автоматизации последний мне показался самым работоспособным. Во-первых, это все же код на python, в который можно встраивать вспомогательный программный код и пользоваться всеми преимуществами языка программирования. Во-вторых, проект NiPyAPI активно развивается и в случае проблем можно написать разработчику. В-третьих, NiPyAPI все же более гибкий инструмент для взаимодействия с NiFi в решении сложных задач. Например, в определении того пустые ли очереди сообщений сейчас в flow и можно ли обновлять process group.

На этом все. Я описал 3 подхода к автоматизации доставки flow в NiFi, подводные камни, с которыми может столкнуться разработчик и привел рабочий код для автоматизации доставки. Если вас так же, как и меня интересует эта тема — пишите!


ссылка на оригинал статьи https://habr.com/ru/post/476722/