Разработка фреймворка для автоматизации загрузок данных из источников: Case Study для металлургической компании

от автора

Всем привет! Меня зовут Амир Розикзода, я Data Engineer в компании “ДЮК Технологии”. Хочу рассказать о кейсе, в котором я участвовал в качестве соавтора под руководством моего коллеги Хуснутдинова Артура, Data Engineer в компании “ДЮК Технологии”.

Актуальность

Аналитика данных и витрины аналитики — источники аналитической отчетности, на основе которой принимаются стратегические управленческие решения. Однако на рынке нет готовых фреймворков, которые полностью удовлетворяют потребности в извлечении данных.

Конечно, есть Apache NiFi, но с ним возникает много проблем при работе с большими объемами данных. Связка Python и Apache Airflow на сегодняшний день является одной из лучших практик в области управления данными не только для оркестрации данных, но и для извлечения, поэтому логично разрабатывать ETL-систему (Extract, Transform, Load) поверх Airflow. Это позволяет эффективно управлять процессами извлечения, преобразования и загрузки данных, обеспечивая надежность и гибкость в аналитической инфраструктуре.

Нашему заказчику, крупной металлургической компании с большим количеством филиалов, нужно было простое решение, ускоряющее работу с аналитикой данных и извлечением из различных гетерогенных источников. При этом требовалось достаточно гибкое к расширению функционала загрузок решение.

Можно было решать задачу клиента классическим образом — написанием кода, но тогда разработка заняла бы около трех месяцев. У нас не было столько времени, поэтому приняли решение разработать фреймворк, чтобы ускорить и упростить разработку.

Фреймворк должен был объединить разную кодовую базу и создать универсальный подход к решению схожих задач, при этом уменьшить количество дублей кода.

Сформулирую две проблемы, с которыми компания к нам обратилась.

Проблема 1: У компании много филиалов, в филиалах есть руководители, у руководителей есть KPI и динамика этих KPI, которые обсуждаются на планерках. 

Руководителям нужны данные. Но проблема с консолидацией данных в том, что нет единого хранилища, чтобы на их основе создавать аналитические витрины. Например, витрину с количеством выплавленной за смену меди. Или витрину по персоналу: кто сейчас работает, кто в отпуске, какая текучесть кадров. Или витрину по количеству вагонов железа, отгруженных по разным филиалам.

Проблема 2: Множество разрозненных источников данных. 

Например, у одного завода данные по выплавке металлов лежат в MySQL, данные по сотрудникам хранятся в 1С:ЗиК. У другого завода система ручного ввода данных по выплавке металлов, а личные дела сотрудников ведутся в БД Postgres. Кроме того, все заводы вносят данные по инициативам в самописную систему ручного ввода с выгрузкой по API.

На основе проблем мы сформулировали задачу из двух составных частей. 

Собрать данные, очистить, обогатить для аналитики в BI и принятия управленческих решений:

  1. Создать пайплайны по загрузке данных из множества разных источников.

  2. Создать витрины для руководителей и стейкхолдеров.

Был выбран такой стек технологий:

База данных — Greenplum, open-source massive parallel processing БД, специально разработанная для создания DWH (Data Warehouse).

Оркестратор — Apache Airflow.

Для хранения промежуточных данных при загрузке был поднят инстанс Minio — это self-hosted объектное хранилище. 

Извлечение данных — c помощью кода, написанного на Python, это нативное и гибкое решение в связке с Apache Airflow.

После анализа задачи решили создать фреймворк для генерации ДАГов (DAG — directed acyclic graph), чтобы один раз разработать функции по извлечению и трансформации данных из конкретного источника. И в дальнейшем с помощью фреймворка переиспользовать эти функции.

Какие разработали функции:

  1. Универсальная функция для извлечения из API: авторизация по токену, или двухступенчатая, или нет авторизации.

  2. Универсальная функция для извлечения из любых БД. Извлечение конкретной таблицы или столбцов из таблицы или передачу Raw SQL, который выдает ответ.

  3. Универсальная функция извлечения и записи из объектного хранилища S3. Также S3 используется для хранения промежуточных файлов.

  4. Функции для разворачивания массивов и объектов в плоскую структуру для записи их в реляционную БД.

  5. Функция для обогащения техническими метаданными: код источника, ID загрузки записи, время загрузки записи.

  6. Функция по генерации из полученных плоских данных файла в формате parquet

  7. Функция по загрузке плоских данных на слой STG (staging data) с помощью утилиты Greenplum PXF.

  8. Набор функций по загрузке данных из слоя STG (слой временного хранения данных) в слой ODS (слой для постоянного хранения данных в структуре источника).

Также была разработана конфигурация описания пайплайна и конфигуратор, который валидирует и генерирует код пайплайна.

Разработанные функции покрывают все кейсы клиента и позволяют быстро создавать пайплайны для загрузки данных. Мы провели типизацию большинства бизнес-задач, их дальнейшее переиспользование сокращает время разработки.

Описание модулей архитектуры

Framework

ДАГ-синхронизатор парсит файл-конфигуратор ДАГа в Git, валидирует корректность файла на наличие ошибок, автоматически генерирует на основе файла конфигурации новый ДАГ, состоящий из шагов, описанных в файле конфигурации. Новый созданный ДАГ кладется в Git в ветку develop папку со всеми ДАГами/airflow-dags, откуда он реплицируется на веб-сервер Airflow и воркеры, становится видимым в Airflow Web UI.

Типовые функции

Был реализован набор необходимых функций по извлечению (извлечение из API, БД, S3, FTP), трансформации и обогащения данных, которые будут использоваться сгенерированным ДАГом.

Формат конфигурации ДАГа

Была разработана конфигурация ДАГа, с помощью которой можно описать пайплайн по извлечению данных из какого-либо источника. Каждая конфигурация описывает один ДАГ для извлечения данных из одного источника, в нем мы описываем последовательность шагов, с помощью которых будем эти данные извлекать и трансформировать. Каждый шаг имеет множество вариантов и параметров в зависимости от того, с каким источником работаем и каким способом будем извлекать данные.

Валидация конфигурации

На основе формата конфигурации были созданы модели из библиотеки Pydantic, это позволило валидировать полученную конфигурацию. 

Генерация ДАГа по конфигурации

На основе моделей и шаблонизатора Jinja была реализована генерация итоговых ДАГов.

Модуль хранения метаданных генерации ETL-процессов

В модуле хранится JSON-файл конфигуратора ETL-fw, файл маппер типов данных источника, набор типовых функций фреймворка, который будет использован в процессе обработки данных (созданном ДАГом). Набор типовых функций будет пополняться по мере развития фреймворка. JSON-файл конфигуратор содержит основное описание ДАГа, параллельный и последовательные шаги ДАГа, ссылку на файл-маппер типов данных для обработки сущностей системы-источника, а также ссылки на используемые типовые функции в данном ДАГе (функции извлечения данных из АПИ источников, JDBC-подобных интеграций, загрузка в S3, конвертация типов данных, разворачивание parquet в плоскую структуру, создание STG-таблицы в Greenplum, загрузка в слой ODS в Greenplum).

Модуль исполнения ETL-процессов

Здесь происходит основной запуск и исполнение экземпляров ДАГа, в рамках которого загружаются данные из источников в витрины данных (или в слой ODS, в зависимости от цели). ДАГ-синхронизатор использует созданные функции, ставит их в нужном порядке с требуемыми параметрами.

Модуль хранения метаданных запуска ETL-процессов

В рамках модуля хранятся все метаданные запуска ДАГов и шагов и возникающие ошибки. Каждый ДАГ и шаг фреймворка логирует метаданные запуска процесса (ДАГа,

шага). Метаданные всех процессов далее будут визуализированы для последующего анализа ошибок и алертинга с целью оперативного мониторинга

Модуль визуализации и анализа запуска и ошибок ETL-процессов

Модуль отвечает за визуализацию и анализ запущенных экземпляров ДАГов и шагов,

ошибок исполнения процессов. Предполагается, что для оперативного мониторинга

потребуется настроить алертинг.

Тестирование фреймворка

Харакеристики стенда, на котором проводили тестирование:

  • 1 scheduler (4 vCPU / 8 Gb RAM)

  • 3 worker (6 vCPU / 28 Gb RAM)

  • Meta DB Postgres (8 vCPU / 8 GB RAM).

В рамках фреймворка мы ввели несколько сущностей: идея, теги, KPI, цели:

  1. Идея — бизнес-предложение и гипотеза от руководителей. У идеи есть набор значений, месячные эффекты: сколько выплавили, сколько потратили, какой ожидаем план, какой у идеи срок, какие затрагивает филиалы. 

  2. Теги — к чему относится идея. 

  3. Цели — какие предполагаются цели по этой идее. 

  4. KPI — численные значения показателей цели.

Процесс заполнения метаданных на основе фреймворка и генерации ДАГа:

  1. Извлечение данных из API: заполнили конфигурацию или использовали кастомную функцию для расширения функционала.

  2. Разворачивание полученных объектов и обогащение техническими метаданными.

  3. Загрузка на слой хранения временных данных.

  4. Загрузка в слой постоянного хранения данных.

  5. Генерация ДАГа.

Результат генерации ДАГа:

Процесс прогонки ДАГа:

  1. api_s3_raw_ideas_json: 180-240 секунд — шаг извлечения сырых данных из источника и приземление их в хранилище s3.

  2. s3_raw_s3_convflat_ideas_parquet: 5-10 секунд — шаг преобразования в плоскую структуру и обогащения метаданными.

  3. s3_convflat_stg_ideas: 5-10 секунд — загрузка плоских данных в временный слой хранения данных с помощью утилиты PXF.

  4. stg_ods_ideas: 20-30 секунд — загрузка данных из временного слоя в постоянный по различным методикам (scd0-2, snapshots).

Результаты работы ДАГа: 

  • Время разработки: было 5-6 дней на один источник, новое время разработки 1 источника — от 2-5 часов.

  • Время работы: 10-15 минут в зависимости от количества данных на источнике.

PXF как самый быстрый способ параллельной загрузки в Greenplum

Без PXF Greenplum мы бы хранили все в памяти, генерировали insert. Основная проблема этого способа в том, что python-скрипт на паре гигабайт данных гарантированно зависнет. 

Если извлечение делать частями, разделив трансформацию и загрузку, то этап загрузки может занять 30-60 минут. У нас для некоторых источников было требование от заказчика, чтобы загрузка была не больше 30 минут.

Поэтому приняли решение использовать сочетание PXF Greenplum и parquet-файлов. PXF оптимизирован для работы с Greenplum с JDBC, так как загружает данные в параллель на сегменты напрямую из источника. Это позволило снизить время загрузки до 2-3 минут вместо 30-60 минут.

Валидация конфигурации

Для валидации конфигурации использовали библиотеку Pydantic: она валидирует любые данные, но ее основная специализация — валидация JSON. Большой плюс Pydantic — она позволяет писать кастомную валидацию.

Пример модели конфигурации ДАГа

class DagConfig(pydantic.BaseModel):

    schedule_interval: str

    tasks_flow: List[str]

    dag_type: DagType

    dag_id: str

    types_path: str

    env: str

    source_code_id: int

    source_system_integration_type_id: int

    tasks: List[DagTask]

class DagTask(pydantic.BaseModel):

    name: str

    type: str

    trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS

    input: Union[ApiInput, DBInput, S3Input]

    output: Union[DBOutput, S3Output]

    transformation: Optional[Transformation] = None

Результаты и метрики производительности

Первая разработка ДАГа для источника — процесс трудоемкий. Затем готовый фреймворк значительно ускоряет разработку.

Какие результаты после внедрения фреймворка:

  1. Уменьшилось количество ошибок. Теперь есть валидация конфигурации: базовые ошибки отсекаются на уровне генерации конфигурации. За счет валидации мы освободили у разработчиков в среднем по 4 часа на каждую интеграцию, которые раньше тратились на отладку.

  2. Повысилось единообразие кода за счет типизации, он теперь весь генерируется конфигуратором.

  3. Сохранена возможность легкого расширения фреймворка: можно создавать новые функции или расширять существующие.

  4. Упростились тестирование, отладка, поддержка текущего функционала: исправление ошибки в одном месте исправляет его во всех остальных.

  5. Улучшен developer UX.

В результате получили экономию времени дата-инженеров: среднее время разработки интеграции с источником было 24-48 часа, стало 6-12 часов. Как следствие, сократилось время на разработку, поддержку и расширение функционала потоков данных.

Какие дальше планы развития:

  1. Создать веб-интерфейс, который позволит создавать ДАГи из набора выпадающих списков. Цель — чтобы системные аналитики с помощью фреймворка без помощи дата-инженеров смогли по структуре параметров создавать пайплайны по загрузке данных.

  2. Добавить функции для извлечения из других источников данных: FTP, загрузка CSV с компьютера клиента.

  3. Добавить функции уведомлений на почту или в мессенджеры: успешные и неуспешные завершения.


ссылка на оригинал статьи https://habr.com/ru/articles/819109/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *