Всем привет! Меня зовут Амир Розикзода, я Data Engineer в компании “ДЮК Технологии”. Хочу рассказать о кейсе, в котором я участвовал в качестве соавтора под руководством моего коллеги Хуснутдинова Артура, Data Engineer в компании “ДЮК Технологии”.
Актуальность
Аналитика данных и витрины аналитики — источники аналитической отчетности, на основе которой принимаются стратегические управленческие решения. Однако на рынке нет готовых фреймворков, которые полностью удовлетворяют потребности в извлечении данных.
Конечно, есть Apache NiFi, но с ним возникает много проблем при работе с большими объемами данных. Связка Python и Apache Airflow на сегодняшний день является одной из лучших практик в области управления данными не только для оркестрации данных, но и для извлечения, поэтому логично разрабатывать ETL-систему (Extract, Transform, Load) поверх Airflow. Это позволяет эффективно управлять процессами извлечения, преобразования и загрузки данных, обеспечивая надежность и гибкость в аналитической инфраструктуре.
Нашему заказчику, крупной металлургической компании с большим количеством филиалов, нужно было простое решение, ускоряющее работу с аналитикой данных и извлечением из различных гетерогенных источников. При этом требовалось достаточно гибкое к расширению функционала загрузок решение.
Можно было решать задачу клиента классическим образом — написанием кода, но тогда разработка заняла бы около трех месяцев. У нас не было столько времени, поэтому приняли решение разработать фреймворк, чтобы ускорить и упростить разработку.
Фреймворк должен был объединить разную кодовую базу и создать универсальный подход к решению схожих задач, при этом уменьшить количество дублей кода.
Сформулирую две проблемы, с которыми компания к нам обратилась.
Проблема 1: У компании много филиалов, в филиалах есть руководители, у руководителей есть KPI и динамика этих KPI, которые обсуждаются на планерках.
Руководителям нужны данные. Но проблема с консолидацией данных в том, что нет единого хранилища, чтобы на их основе создавать аналитические витрины. Например, витрину с количеством выплавленной за смену меди. Или витрину по персоналу: кто сейчас работает, кто в отпуске, какая текучесть кадров. Или витрину по количеству вагонов железа, отгруженных по разным филиалам.
Проблема 2: Множество разрозненных источников данных.
Например, у одного завода данные по выплавке металлов лежат в MySQL, данные по сотрудникам хранятся в 1С:ЗиК. У другого завода система ручного ввода данных по выплавке металлов, а личные дела сотрудников ведутся в БД Postgres. Кроме того, все заводы вносят данные по инициативам в самописную систему ручного ввода с выгрузкой по API.
На основе проблем мы сформулировали задачу из двух составных частей.
Собрать данные, очистить, обогатить для аналитики в BI и принятия управленческих решений:
-
Создать пайплайны по загрузке данных из множества разных источников.
-
Создать витрины для руководителей и стейкхолдеров.
Был выбран такой стек технологий:
База данных — Greenplum, open-source massive parallel processing БД, специально разработанная для создания DWH (Data Warehouse).
Оркестратор — Apache Airflow.
Для хранения промежуточных данных при загрузке был поднят инстанс Minio — это self-hosted объектное хранилище.
Извлечение данных — c помощью кода, написанного на Python, это нативное и гибкое решение в связке с Apache Airflow.
После анализа задачи решили создать фреймворк для генерации ДАГов (DAG — directed acyclic graph), чтобы один раз разработать функции по извлечению и трансформации данных из конкретного источника. И в дальнейшем с помощью фреймворка переиспользовать эти функции.
Какие разработали функции:
-
Универсальная функция для извлечения из API: авторизация по токену, или двухступенчатая, или нет авторизации.
-
Универсальная функция для извлечения из любых БД. Извлечение конкретной таблицы или столбцов из таблицы или передачу Raw SQL, который выдает ответ.
-
Универсальная функция извлечения и записи из объектного хранилища S3. Также S3 используется для хранения промежуточных файлов.
-
Функции для разворачивания массивов и объектов в плоскую структуру для записи их в реляционную БД.
-
Функция для обогащения техническими метаданными: код источника, ID загрузки записи, время загрузки записи.
-
Функция по генерации из полученных плоских данных файла в формате parquet
-
Функция по загрузке плоских данных на слой STG (staging data) с помощью утилиты Greenplum PXF.
-
Набор функций по загрузке данных из слоя STG (слой временного хранения данных) в слой ODS (слой для постоянного хранения данных в структуре источника).
Также была разработана конфигурация описания пайплайна и конфигуратор, который валидирует и генерирует код пайплайна.
Разработанные функции покрывают все кейсы клиента и позволяют быстро создавать пайплайны для загрузки данных. Мы провели типизацию большинства бизнес-задач, их дальнейшее переиспользование сокращает время разработки.
Описание модулей архитектуры
![](https://habrastorage.org/getpro/habr/upload_files/b3b/1af/04c/b3b1af04c91335ecabb0ae9fe8135a52.jpeg)
Framework
ДАГ-синхронизатор парсит файл-конфигуратор ДАГа в Git, валидирует корректность файла на наличие ошибок, автоматически генерирует на основе файла конфигурации новый ДАГ, состоящий из шагов, описанных в файле конфигурации. Новый созданный ДАГ кладется в Git в ветку develop папку со всеми ДАГами/airflow-dags, откуда он реплицируется на веб-сервер Airflow и воркеры, становится видимым в Airflow Web UI.
Типовые функции
Был реализован набор необходимых функций по извлечению (извлечение из API, БД, S3, FTP), трансформации и обогащения данных, которые будут использоваться сгенерированным ДАГом.
Формат конфигурации ДАГа
Была разработана конфигурация ДАГа, с помощью которой можно описать пайплайн по извлечению данных из какого-либо источника. Каждая конфигурация описывает один ДАГ для извлечения данных из одного источника, в нем мы описываем последовательность шагов, с помощью которых будем эти данные извлекать и трансформировать. Каждый шаг имеет множество вариантов и параметров в зависимости от того, с каким источником работаем и каким способом будем извлекать данные.
Валидация конфигурации
На основе формата конфигурации были созданы модели из библиотеки Pydantic, это позволило валидировать полученную конфигурацию.
Генерация ДАГа по конфигурации
На основе моделей и шаблонизатора Jinja была реализована генерация итоговых ДАГов.
Модуль хранения метаданных генерации ETL-процессов
В модуле хранится JSON-файл конфигуратора ETL-fw, файл маппер типов данных источника, набор типовых функций фреймворка, который будет использован в процессе обработки данных (созданном ДАГом). Набор типовых функций будет пополняться по мере развития фреймворка. JSON-файл конфигуратор содержит основное описание ДАГа, параллельный и последовательные шаги ДАГа, ссылку на файл-маппер типов данных для обработки сущностей системы-источника, а также ссылки на используемые типовые функции в данном ДАГе (функции извлечения данных из АПИ источников, JDBC-подобных интеграций, загрузка в S3, конвертация типов данных, разворачивание parquet в плоскую структуру, создание STG-таблицы в Greenplum, загрузка в слой ODS в Greenplum).
Модуль исполнения ETL-процессов
Здесь происходит основной запуск и исполнение экземпляров ДАГа, в рамках которого загружаются данные из источников в витрины данных (или в слой ODS, в зависимости от цели). ДАГ-синхронизатор использует созданные функции, ставит их в нужном порядке с требуемыми параметрами.
Модуль хранения метаданных запуска ETL-процессов
В рамках модуля хранятся все метаданные запуска ДАГов и шагов и возникающие ошибки. Каждый ДАГ и шаг фреймворка логирует метаданные запуска процесса (ДАГа,
шага). Метаданные всех процессов далее будут визуализированы для последующего анализа ошибок и алертинга с целью оперативного мониторинга
Модуль визуализации и анализа запуска и ошибок ETL-процессов
Модуль отвечает за визуализацию и анализ запущенных экземпляров ДАГов и шагов,
ошибок исполнения процессов. Предполагается, что для оперативного мониторинга
потребуется настроить алертинг.
Тестирование фреймворка
Харакеристики стенда, на котором проводили тестирование:
-
1 scheduler (4 vCPU / 8 Gb RAM)
-
3 worker (6 vCPU / 28 Gb RAM)
-
Meta DB Postgres (8 vCPU / 8 GB RAM).
В рамках фреймворка мы ввели несколько сущностей: идея, теги, KPI, цели:
-
Идея — бизнес-предложение и гипотеза от руководителей. У идеи есть набор значений, месячные эффекты: сколько выплавили, сколько потратили, какой ожидаем план, какой у идеи срок, какие затрагивает филиалы.
-
Теги — к чему относится идея.
-
Цели — какие предполагаются цели по этой идее.
-
KPI — численные значения показателей цели.
Процесс заполнения метаданных на основе фреймворка и генерации ДАГа:
-
Извлечение данных из API: заполнили конфигурацию или использовали кастомную функцию для расширения функционала.
-
Разворачивание полученных объектов и обогащение техническими метаданными.
-
Загрузка на слой хранения временных данных.
-
Загрузка в слой постоянного хранения данных.
-
Генерация ДАГа.
Результат генерации ДАГа:
![](https://habrastorage.org/getpro/habr/upload_files/629/b4d/b42/629b4db42eca0032ff823dfa89eb7e0a.png)
Процесс прогонки ДАГа:
-
api_s3_raw_ideas_json: 180-240 секунд — шаг извлечения сырых данных из источника и приземление их в хранилище s3.
-
s3_raw_s3_convflat_ideas_parquet: 5-10 секунд — шаг преобразования в плоскую структуру и обогащения метаданными.
-
s3_convflat_stg_ideas: 5-10 секунд — загрузка плоских данных в временный слой хранения данных с помощью утилиты PXF.
-
stg_ods_ideas: 20-30 секунд — загрузка данных из временного слоя в постоянный по различным методикам (scd0-2, snapshots).
Результаты работы ДАГа:
-
Время разработки: было 5-6 дней на один источник, новое время разработки 1 источника — от 2-5 часов.
-
Время работы: 10-15 минут в зависимости от количества данных на источнике.
PXF как самый быстрый способ параллельной загрузки в Greenplum
Без PXF Greenplum мы бы хранили все в памяти, генерировали insert. Основная проблема этого способа в том, что python-скрипт на паре гигабайт данных гарантированно зависнет.
Если извлечение делать частями, разделив трансформацию и загрузку, то этап загрузки может занять 30-60 минут. У нас для некоторых источников было требование от заказчика, чтобы загрузка была не больше 30 минут.
Поэтому приняли решение использовать сочетание PXF Greenplum и parquet-файлов. PXF оптимизирован для работы с Greenplum с JDBC, так как загружает данные в параллель на сегменты напрямую из источника. Это позволило снизить время загрузки до 2-3 минут вместо 30-60 минут.
Валидация конфигурации
Для валидации конфигурации использовали библиотеку Pydantic: она валидирует любые данные, но ее основная специализация — валидация JSON. Большой плюс Pydantic — она позволяет писать кастомную валидацию.
Пример модели конфигурации ДАГа
class DagConfig(pydantic.BaseModel):
schedule_interval: str
tasks_flow: List[str]
dag_type: DagType
dag_id: str
types_path: str
env: str
source_code_id: int
source_system_integration_type_id: int
tasks: List[DagTask]
class DagTask(pydantic.BaseModel):
name: str
type: str
trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS
input: Union[ApiInput, DBInput, S3Input]
output: Union[DBOutput, S3Output]
transformation: Optional[Transformation] = None
Результаты и метрики производительности
Первая разработка ДАГа для источника — процесс трудоемкий. Затем готовый фреймворк значительно ускоряет разработку.
Какие результаты после внедрения фреймворка:
-
Уменьшилось количество ошибок. Теперь есть валидация конфигурации: базовые ошибки отсекаются на уровне генерации конфигурации. За счет валидации мы освободили у разработчиков в среднем по 4 часа на каждую интеграцию, которые раньше тратились на отладку.
-
Повысилось единообразие кода за счет типизации, он теперь весь генерируется конфигуратором.
-
Сохранена возможность легкого расширения фреймворка: можно создавать новые функции или расширять существующие.
-
Упростились тестирование, отладка, поддержка текущего функционала: исправление ошибки в одном месте исправляет его во всех остальных.
-
Улучшен developer UX.
В результате получили экономию времени дата-инженеров: среднее время разработки интеграции с источником было 24-48 часа, стало 6-12 часов. Как следствие, сократилось время на разработку, поддержку и расширение функционала потоков данных.
Какие дальше планы развития:
-
Создать веб-интерфейс, который позволит создавать ДАГи из набора выпадающих списков. Цель — чтобы системные аналитики с помощью фреймворка без помощи дата-инженеров смогли по структуре параметров создавать пайплайны по загрузке данных.
-
Добавить функции для извлечения из других источников данных: FTP, загрузка CSV с компьютера клиента.
-
Добавить функции уведомлений на почту или в мессенджеры: успешные и неуспешные завершения.
ссылка на оригинал статьи https://habr.com/ru/articles/819109/
Добавить комментарий