Доброго дня. Меня зовут Иван Клименко, я разработчик потоков обработки данных в компании Аскона. В этом цикле статей я расскажу опыт внедрения инструмента Apache Nifi для формирования DWH.
Данная статья посвящена первому этапу внедрения Apache NIFI — начальным потокам выгрузки, внедрению инкрементальной загрузки, и описанию существующей архитектуры.
Цели, задачи
Я вхожу в группу обработки данных подразделения “Платформа данных”. Пришёл в команду в начале прошлого года с задачей организовать сбор данных от источников и заполнение DWH с применением Apache NIFI. Так как некая структура уже существовала, то появились обозначения:
-
Старая архитектура — сбор данных от источников выполняется раз в сутки, ночью, инструмент — SSIS на MsSqlServer (да, в качестве хранилища применяется MSSql, но мы планируем переход на GreenPlum), все инициализируется джобами через Sql Server Agent.
-
Новая архитектура — сбор данных от источников выполняется раз в сутки с помощью Apache NIFI, заполняются стейджинговые таблицы, потом все по старому — джоб, пакеты, SSIS…
Какие были трудности в старой архитектуре:
-
Загрузка осуществлялась раз в сутки и в случае большого объема данных SSIS не справлялся с выгрузкой данных от источника, и джоб падал.
-
С увеличением объема данных и количества выгружаемых сущностей времени на выгрузку и перестройку витрин стало не хватать.
-
Не выполнялась инкрементальная загрузка.
Задачи новой архитектуры:
-
Обеспечить загрузку данных по мере их изменения на источнике.
-
Масштабировать нагрузку на большое количество таблиц.
-
Обеспечить формирование витринного слоя на основе загружаемого инкремента.
-
Разделить процессы загрузки и пересчета витрин.
-
Обеспечить легкую миграцию на GreenPlum.
Поток ETL
Основным источником данных является ЕРП “Галактика”, СУБД — Oracle. У нашей команды есть доступ к источнику и есть возможность разработки View. Так как в NIFI есть встроенный механизм сохранения инкрементального поля при запросе (числовое поле или дата-время), решено было для каждой таблицы источника создавать два View — исторический и инкрементальный. Структура View одинаковая, отличие только в том, что для инкрементального представления данные извлекаются из журналов, срок журналирования от 2 недель до 1 месяца. В каждой таблице есть два числовых поля — дата и время изменения записи. Так как NIFI умеет хранить только возрастающие поля, а время является циклично зависимым, решили сделать инкрементальный ключ, объединив в одном поле дату и время изменения записи в числовом представлении.
Был разработан поток загрузки данных, который извлекает данные из таблицы источника, выполняет ряд преобразований и выполняет загрузку в целевую таблицу. Отмечу, что в основном поток разработал мой коллега Фархад, я подключился после решения инфраструктурных задач.
![Рис. 1. Общий вид потока. Дает представление о направлении движении данных. Рис. 1. Общий вид потока. Дает представление о направлении движении данных.](https://habrastorage.org/getpro/habr/upload_files/6a4/3b1/7cd/6a43b17cdf5fbe0dae4bb1ee6e82a697.png)
Рассмотрим подробнее, от запуска и до завершения.
На начальном этапе происходит извлечение данных с помощью процессора QueryDatabaseTable.
![Рис. 2. Процессор QueryDatabaseTable - порождает поток и выполняет запрос к источнику. Рис. 2. Процессор QueryDatabaseTable - порождает поток и выполняет запрос к источнику.](https://habrastorage.org/getpro/habr/upload_files/532/634/fd6/532634fd672025bc38ee9cbda8834a82.png)
![Рис. 3. Натсройки QueryDatabaseTable Рис. 3. Натсройки QueryDatabaseTable](https://habrastorage.org/getpro/habr/upload_files/8e1/ce3/422/8e1ce3422e6b0416c856bb62c9e0abca.png)
В настройках указываем соединение, схему и таблицу, перечисляем поля (либо оставляем поле пустым, тогда будут выбраны все поля), и указываем имя поля в настройке Maximum-value Columns. NIFI при запросе вычислит максимум и сохранит его в своем состоянии. При следующей генерации запроса NIFI добавит условие “WHERE DT_KEY > 1923929292929”, то есть подставит и имя поля и его предыдущее значение. Таким образом реализуется инкрменетальная выгрузка.
Далее необходимо выполнить ряд преобразований. Так как могут поступать данные, которые уже были введены в систему (например, при редактировании), надо понять, произошли ли изменения данных. В целях отслеживания мы рассчитываем хеш каждой записи, и если он будет отличаться, то требуется обновить данные в витринном слое.
![Рис. 4. Обновление данных и вычисление хэша Рис. 4. Обновление данных и вычисление хэша](https://habrastorage.org/getpro/habr/upload_files/eaf/94d/59c/eaf94d59cd13847620f3b8a801a3858b.png)
Сначала идет подготовка данных. Формируем новое поле с помощью Updaterecord, и вычисляем его хэш с помощью стороннего процессора HashColumn (спасибо автору за разработку, исходник в github). При дальнейшей разработки я решил максимально использовать штатные процессоры. В следующей части я покажу, как именно я заменил эту связку.
![Рис. 5. Настройки UpdateRecord Рис. 5. Настройки UpdateRecord](https://habrastorage.org/getpro/habr/upload_files/8ee/0f7/0e9/8ee0f70e9828e55f7009bac06de18041.png)
В настройках UpdateAttribute указываем имя нового поля — HASH_DIFF, и правило его формирования — конкатенация значимых полей. В результате формируется новое поле, в котором будет строка, содержащая все поля. И процессор HashColumn вычислит хеш по заданному алгоритму и поместит значение в это же поле.
![Рис. 6. Последовательность действия при преобразовании к CSV Рис. 6. Последовательность действия при преобразовании к CSV](https://habrastorage.org/getpro/habr/upload_files/f10/22e/e3b/f1022ee3b371cfaf74d177e8678e535d.png)
Сначала идет формирование атрибутов, где атрибуту $filename присваивается текущее имя файла с расширением «.csv». Этот атрибут понадобится при сохранении файла. Далее добавляется мета-информация — в каждую запись вносим время выгрузки. Запись преобразуется к CVS-формату с помощью процессора ConvertRecord. В результате контент из формата «Avro» будет преобразован к «CSV», экранирование происходит в зависимости от настройки сервиса записи.
![Рис. 7. Запись файла в сетевую папку и очистка контента Рис. 7. Запись файла в сетевую папку и очистка контента](https://habrastorage.org/getpro/habr/upload_files/212/0bb/8a7/2120bb8a7a26c663a32d00bfdf10108c.png)
Полученный файл сохраняется на сетевую папку на сервере, где крутится целевой MS Sql с помощью PutSmbFile, и выполняем очистку контента. Это необязательный шаг, однако при работе было замечено, что при наличии контента последующий запрос сильно тормозит.
После записи на сервере выполняется запрос для вставки данных в таблицу с помощью компонента ExecuteSQL.
![Рис. 8. Последовательность действий для внесения и удаления данных на сервере Рис. 8. Последовательность действий для внесения и удаления данных на сервере](https://habrastorage.org/getpro/habr/upload_files/f3e/2d0/000/f3e2d0000876a9c6481f4e62a4918607.png)
Сам запрос имеет следующий вид:
BULK INSERT #{tgt.sql.schema}.#{tgt.sql.table.name.customer_orders} FROM '#{local.folder.Bulk.Insert}${filename}' WITH(FIRSTROW = 2, FIELDTERMINATOR = '~',ROWTERMINATOR = '0x0a', CODEPAGE = 65001, TABLOCK )
Как видно, на сервер передается имя файла для внесения в заданную таблицу, где схема и таблица определена в параметрах группы, а имя файла извлекается из атрибута.
В случае, если произошел сбой, поток отправляется на повтор, и в случае трех неудачных внесений формируется письмо об ошибке.
При удачном внесении выполняется хранимая процедура, удаляющая загруженный файл:
DECLARE @deletefile NVARCHAR(MAX) , @cmd NVARCHAR(MAX) set @deletefile = '${filename}' set @cmd = 'xp_cmdshell ''del "#{local.folder.Bulk.Insert}\' + @deletefile + '"'''; EXEC (@cmd)
Файл можно удалить и средствами NIFI, например через GetSmbFile, но в этом случае контент передается по сети обратно. Можно вызвать скрипт SSH или PowerShell, но это выдача дополнительных разрешений для пользователя NIFI.
Дальнейшие шаги являются служебными — формирование и запись лога, отправка уведомлений и т.д.
Заключение.
Данный поток позволил нашей команде обкатать инкрементальную загрузку, исключить загрузку данных от источника средствами SSIS, разделить процесс загрузки данных от процесса расчета витрин.
Достоинства:
-
Простой поток, в котором отражены все этапы ETL — выгрузка, преобразования, загрузка.
-
Легко масштабировать — сохранив в виде шаблона или положив в Registry, меняя переменные и список полей можно легко добавить новую таблицу.
-
Легко мониторить — видно, какой поток упал, где ошибка.
Недостатки:
-
Когда количество таблиц превысило 20, стало неудобно поддерживать это решение.
-
Все запуски выполнялись примерно в одно и то же время, создавая сильную нагрузку на источник.
-
При необходимости модификации приходилось вносить изменения во все потоки, в одном и том же месте.
В следующих частях я расскажу о развитии потока выгрузки, его параметризации, как решалась задача информирования о завершении загрузки и уведомления об ошибках, и к чему мы пришли в итоге.
ссылка на оригинал статьи https://habr.com/ru/post/599911/
Добавить комментарий