Всем доброго дня. Я Иван Клименко, и я разработчик потоков данных в компании Аскона. Данная статья посвящена параметризации NIFI-потока и информированию СУБД об окончании загрузки.
В первой части я описал начальный поток, на котором отладили основную модель заполнения stage слоя. Основные принципы: инкрементальная загрузка, формирование служебного поля — хеша для сравнения изменений, загрузка в БД батчем (применительно к MSSQL через BULK INSERT).
Недостатки потока, выяснение в процессе эксплуатации:
-
При росте количества источников сложность поддерживать актуальную версию возрастает.
-
Начальным процессором является QueryDatabasetable, который не поддерживает входящие соединения. Таким образом, этот процессор мог работать только по одному заданному расписанию.
-
Необходимость удаления CSV файла на сервере, откуда MSSQL может забирать данные с помощью BULK INSERT. В мем случае это локальная папка на сервере с MSSQL.
-
Экранирование в CSV. В некоторых источниках были поля с очень объемными блоками текста, и экранирование фактически искажало информацию. Стали поступать жалобы от аналитиков о расхождениях при сверках. Само содержание по смыслу не менялось, но автоматические тесты выявляли расхождения с источником.
Для устранения был разработан один общий поток, принимающий на вход FlowFile, в атрибутах которого содержались сведения об исходной таблице и таблице назначения:
|
src.schema |
Исходная схема |
|
src.table |
Исходная таблица/представление |
|
src.table.incrementkey |
Имя поля с инкрементальным ключом |
|
tgt.schema |
Целевая схема |
|
tgt.table |
Целевая таблица |
Получение имен полей
Так как теперь неизвестно, какая именно таблица будет обрабатываться, потребовалось реализовать динамическое извлечение списка имен полей в таблице, для последующего составления правила конкатенации перед расчетом хеша.

Процесс получения имен полей
Первым этапом формируется запрос к Oracle:
SELECT OWNER as TABLE_SCHEMA,TABLE_NAME as TABLE_NAME,COLUMN_NAME as COLUMN_NAME, Concat(DATA_TYPE,DATA_LENGTH) AS DATA_TYPE FROM ALL_TAB_COLS WHERE TABLE_NAME IN ('${src.table}') ORDER BY COLUMN_ID
В результате получаем набор записей, содержащих имена колонок. После проверки, что запрос вернул записи, переходим к извлечению данных с помощью RouteOnAttribute.
${executesql.row.count:equals(0)}
Данные преобразуются к Json извлекаются в два Jolt этапа.
Первый «Shift» преобразует имена полей в массив «data»:
{"*": {"@COLUMN_NAME": "data"}}
Второй «Modify-override» формирует нужную строку:
{"datanew": "=join(',',@(1,data))"}
Далее с помощью EvaluateJsonPath колонки помещаются в атрибут ${src.table.columnnames}
Формирование и выполнение запроса
Для того, чтобы сгенерировать запрос, учитывающий наличие списка колонок и инкрементальный ключ удобно применять GenerateTableFetch. Для выполнения запроса применяется ExecuteSQLRecord. Проверка аналогично указаной ранее.

Настройки процессоров
Для процессора GenerateTableFetch требуется задать параметры подключения и указать, по какой таблице формировать запрос.

Если атрибут ${src.table.incrementkey} не задан, то при генерации запроса он игнорируется. Если же задан, то при генерации его последнее значение будет извлечено из хранилища состояний процессора и добавлено в запрос в виде блока » WHERE»
Сформированный запрос попадает на ExecSQLRecord.

Я указываю параметр Max Rows Per Flow File равным 5000. Это означает, что каждые 5000 записей, получаемых от источника NIFI будет формировать новый файл. Это позволит выгрузить большой общем данных и не упасть по памяти, так как данные будут уходит в контент. Весь общем записей по одному запросу составит батч.
Формирование служебных полей для stage слоя
На следующем этапе происходит заполнение служебных полей.

Этот этап по сути такой же, как и в первой части статьи.
Однако, не зря же извлекали список колонок…
При формировании поля, по которому будет произведён расчет хеша я применяю полученный список имен полей. Но, в исходном состоянии это список, с разделенными запятой именами. А требуется привести к RecordPath. Для этого написал выражение:
concat(${src.table.columnnames:substring(0,${src.table.columnnames:lastIndexOf(",")}):substring(${src.table.columnnames:indexOf(",")}):replace(" ",""):replace(",",",/"):substring(1)})
Код меняет сивмолы «,» на «/», и загоняет полученную строку в функцию конкатенции. Т.о. в результирующем поле будет строка, с объединением всех полей.
Внесение в целевую БД
После формирования служебных полей требуется внести данные. Для этого в NIFI есть прекрасный процессор — PutDatabaseRecord. Он берет записи, и применяя JDBC-соединение формирует запрос на вставку данных.

Настройка процессоров
Для PutDatabaserecord требуется указать целевую схему и таблицу.

Следующим этапов надо выделить последний файл в батче. Это нужно для того, чтобы проинформировать целевую систему, что таблица загружена и ее можно брать в обработку. Если возникнет ситуация, в которой NIFI еще грузит данные, чтобы расчет витринного слоя не запускался, так как результирующие данные могут быть не консистентными.
Последний файл в батче легко определяется выражением:
${fragment.count:equals(${fragment.index:plus(1)})}
Выражение сравнивает индекс фрагмента с количеством фрагментов. Данные атрибуты формируются при разбиении выгрузки на файлы процессором ExecuteSQLRecord.
При обнаружении последнего файла в батче, считаем, что все данные были внесены и выполняем процедуру:
DECLARE @RC int DECLARE @tablenameNiFi sysname set @tablenameNiFi = '[${tgt.shema}].[${tgt.table}]' EXECUTE @RC = #{tgt.sql.cognos.switchprocedure} @tablenameNiFi
Процедура выполняет простую функцию — таблица из текущей схемы полностью переключается в другую партицию и другую схему. Таким образом, NIFI может опять начать заполнять текущую таблицу, а уже заполненные данные пойдут в расчеты.
Стоит отметить, что PutDatabaseRecord пробует сформировать батчевую вставку средствами драйвера, т.е. данные будут идти пачкой, а не одной записью.
Замечено, что для корректной работы батчевой вставки требуется, чтобы порядок полей в записи совпадал с порядком полей в таблице. Также для MSSQL, если заменить тип «datetime» в таблице на «datetime2″, то профайлере отображается, что батчевая вставка меняется на » BULK» вставку, то есть идет с той же скоростью, что и BULK INSERT, но по сети, без промежуточного файла.
Следующие этапы являются служебными — формирование логов, информирование и т.д.
Заключение
Итак, в результате у меня получился поток, способный принимать на вход имена таблиц в источнике, самостоятельно формировать правило для расчета хеша, и вносить в целевую таблицу со скоростью, сравнимую с BULK INSERT, и информировать целевую систему о завершении загрузки батча.
Достоинства:
-
Поддержка потока стала гораздо проще.
-
Внедрение новой таблицы — создание во внешней группе процессора GenerateFlowFile с заданными атрибутами и распсианием.
-
Скорость внесения сопоставима с BULK INSERT.
Не обошлось и без неприятностей, которые выявились в процессе:
-
Появился новый источник — MySQL, а поток заточен под Oracle.
-
При многопоточном запуске обработки последний файл мог внестись раньше, чем все остальные. Это связано с тем, что в последнем файле обычно содержится меньше записей, чем порядок разбиения батча, и при расчете служебных полей он успевал проскочить. Партиции переключались, когда не все данные были залиты.
-
В некоторых случаях от источника приходили поля типа FLOAT, и они неверно оторажались в Avro, то есть либо округлялись, либо сдвигались.
О том, как я двигался дальше и к чему пришел на сегодня, я расскажу в следующей статье.
ссылка на оригинал статьи https://habr.com/ru/post/646403/
Добавить комментарий