
Всем привет! Меня зовут Алексей Николаев, я работаю дата-инженером в команде ETL-платформы MWS Data (ex DataOps). Часто сталкиваюсь с тем, что в сложной инфраструктуре и больших проектах простые, на первый взгляд, задачи по работе с данными очень сильно усложняются. В результате возникают ситуации, когда хорошие практики превращаются в плохие решения, а плохие практики как раз могут дать хороший результат.
Мои коллеги уже рассказывали про нашу платформу, ее внедрение внутри экосистемы и наши инструменты для работы с данными. В процессе развития продукта перед нами встала проблема массовых регламентных загрузок данных из реляционных источников. Для этого мы создали внутренний инструмент — библиотеку d-van. В качестве движка в ней используется Apache Spark, с которым она взаимодействует через библиотеку onETL. На примере d-van я покажу нестандартный подход к использованию возможностей Apache Spark. Расскажу, какие задачи можно решить с помощью режима master=local и как свой инструмент может стать альтернативой Apache Nifi или Debezium.
За что мы так сильно любим Spark
Apache spark — достаточно универсальный инструмент для обработки данных. Решить с его помощью задачу батчевой загрузки из реляционных источников в hadoop-кластер — легко и просто даже junior-дата-инженеру:
df = spark.read.format("jdbc").options(...).load() df.write.format("orc").saveAsTable(...)
И дальше orc/parquet/hive-table или кому что милее.
В целом сама задача и ее решение просты и незатейливы как грабли. Надо загрузить одну таблицу — пишешь скрипт в 2 строки. Надо десять — пишем чуть более сложный скрипт, куда передаем названия таблиц списком. Абсолютно никаких проблем, пока код крутится в юпитер-ноутбуках.
Когда дело доходит до продакшна, появляются обычные рутинные проблемы:
-
ВНЕЗАПНО источников бывает больше одного.
-
Есть разные типы СУБД.
-
Данные из разных СУБД без явных кастов к типам не хотят укладываться в таргет-таблицу корректно.
-
Для разработки и продакшна используются разные окружения.
-
Некоторые источники состоят из нескольких инстансов, в результате кодовая база уверенно пухнет от копипасты.
-
Тяжелые таблицы не грузятся одним куском, надо делить их на батчи, иначе мы вообще никогда не дождемся окончания загрузки.
-
Огромные батчи тоже не хотят грузиться в один поток — их надо параллелить.
-
Нужно прикрутить инкрементальную выгрузку, так как снапшотами все не перетаскать.
-
Источники раскиданы по разным часовым поясам, нужно синхронизировать их в момент выгрузки.
-
ИБ закрыло доступы к источникам с кластера, оставив только выделенные хосты.
-
Одну половину загрузок нужно перенести на другой кластер, а вторую оставить на старом.
-
В таргет-таблице лежит половина данных, потому что процесс упал на половине загрузки. После ее перезапуска в таблицах откуда-то появляются дубли.
-
Нашлись кривые данные: нужно перезагрузить только их, не трогая нормальные.
-
Кто-то грохнул партицию, и теперь в таблице с сырьем дыра.
-
…

И это не весь список — много еще чего можно написать. Но уже очевидно, что в продакшне задача решается не так же легко и бодро, как в юпитер-тетрадке у дата-аналитика. Все поставленные проблемы — это примеры задач, которые приходится решать при выстраивании процессов загрузки данных на большинстве проектов. Из-за этого в командах появляется человек, который в основном сидит возле своих загрузок и колдует над сотней скриптов на spark, пытаясь внести в них необходимые изменения, раскатить все на прод и разгрести последствия постоянно возникающих инцидентов со сбоями в загрузках.
Реляционные источники, из которых данные передаются в хранилища батчами, по-прежнему составляют существенную долю от всего объема загружаемых данных. Если список таких источников и таблиц измеряется сотнями и тысячами, то ручная работа тут неприемлема. Продуктовая команда, у которой в работе всего несколько таблиц, наверное, может тратить ресурсы на такие активности. Но для инфраструктурной команды, отвечающей за поставку данных в корпоративное хранилище из десятков, а то и сотен источников, это перебор. Даже если сейчас такая возможность есть, то рано или поздно поддержка этого добра сожрет все доступные ресурсы и разработка новых загрузок в хранилище просто встанет.
Проблемы универсальных решений
Эта история началась еще в те далекие времена, когда версия spark и год рождения дата-инженеров начинались с единицы, в инфраструктурных командах пилить новые загрузки было практически некому и некогда. Понятно, что для массовых загрузок из реляционных БД нужен инструмент, который может:
-
работать с разными типами БД;
-
обеспечивать надежность, стабильность и повторяемость загрузок;
-
восстанавливать потерянные в хранилище данные;
-
закрывать различные требования безопасности;
-
поддерживать легкое добавление новых источников, мониторинг и все прочее.
Закрывая эти задачи, продуктовые команды массово создавали собственные кастомные решения для загрузок, а у инфраструктурных команд последовательно рождались и умирали универсальные «комбайны». Причины их преждевременной смерти были одинаковыми:
-
Решения были монолитными, и их нельзя было отделить от инфраструктуры, а код ядра и сами скрипты загрузки хранились в одном монорепозитории.
-
Естественно, отсутствовала возможность использования отдельных сред для разработки и отладки.
-
Релизы, которые планировались «каждый вторник», на деле со скрипом удавалось выпускать раз в несколько месяцев.
-
Остановка текущих процессов у всех команд, обновление сервиса и последующий перезапуск загрузок были похожи на похороны генерального секретаря ЦК КПСС: много подготовки и суеты, массовая печаль, иногда истерика, а потом обязательно что-то переставало работать.
-
Конфликт зависимостей между командами приводил к тому, что апгрейдить сборки на новые версии библиотек было невозможно: всегда находился кто-то, чьи загрузки не могли быть переделаны под новые версии зависимостей.
-
Разговоры про обновление пайплайнов под новый релиз длились дольше, чем он создавался. Сами доработки велись больше времени, чем жили сами решения, а обновлялись они не больше пары раз.
-
Добавляли счастья конфиги разной степени упоротости. Несколько сотен строк в xml/json/yaml-файлах для настройки одной новой выгрузки вряд ли можно считать понятной и простой историей с низким порогом входа для пользователя.
-
Движок сервиса или кодогенератора допилить под нестандартный запрос было почти нереально.
-
Еще можно вспомнить различные «метасторы», системы мониторинга, управления логами, зависимостями и расписанием, которые падали чаще самих загрузок.
Для людей, владевших всей этой магией, ситуация выглядела не так трагично. Остальным пользователям такие решения напоминали велосипед без сиденья, с тремя передними колесами и одной левой педалью. Как-то они работали, но создавали проблем не меньше, чем решали.
Большинство продуктовых команд не могло ими пользоваться и пилило свои собственные инструменты. Инфраструктурная команда страдала и пользовалась сразу всем набором решений одновременно, так как собрать все загрузки было невозможно из-за разной функциональности.
Как мы наступили на грабли универсальности
Нашей лебединой песней было создание на java самописной утилиты загрузки по типу Scoop с запуском через CLI. Решив максимально упростить пользователям конфигурацию загрузок, мы сделали новые конфиги на yaml с возможностью инкапсуляции общих настроек. Было понятно, что львиная доля загрузок отличается друг от друга слабо, иногда только названием таблицы. А пайплайн для одного источника нет смысла конфигурировать для каждой таблицы отдельно, так как он почти не меняется в пределах способа загрузки.
Далее как обычно: сделали собственный мониторинг и набор зависимостей под одну конкретную среду (без учета того, где и что будут использовать разные команды). Бодро прикрутили поддержку Oracle в качестве источника. Дальше уже со скрипом добавили поддержку Postgres, скопировав и доработав ветку Oracle.
Усилия на создание загрузок сократились в разы. Чего не скажешь о трудностях в сопровождении и тестировании этого черного ящика. Инструмент остался заточенным под одно окружение, тестировать загрузки приходилось практически на проде, а команды поддержки не было. Большая часть функций была не стандартной, а имела авторскую реализацию, развивать их было достаточно сложно. По-прежнему это был монорепозиторий, в который коммитили несколько разных продуктовых команд. Нельзя было взять инструмент и добавить его в свое решение. Нужно было часть своего функционала делать на сторонней площадке.
Умерло все на задаче переноса загрузок с одного кластера на другой. Монорепозиторий нельзя было раздеплоить на два разных hadoop-кластера, пришлось создать отдельные форки для каждого кластера. Потребовалось пересобрать утилиту с другими версиями зависимостей, так как в старых имелись баги, которые сильно мешали жить. Но пересобирать было уже некому — автор решения потерял интерес к своему творению и уволился.

Создание нового велосипеда
После такого, пусть и не самого удачного, проекта мы обдумали совершенные ошибки и поняли, как действовать дальше:
-
Собрать основные боли пользователей и сконцентрироваться на них, а не на создании «интересных технических решений».
-
Взять готовый и предсказуемый движок в качестве «грузилки», а не изобретать и поддерживать свой.
-
Сделать максимально простое управление функциональностью, чтобы пользователи не страдали при написании развесистых yaml-конфигов.
-
Добавить возможность сделать загрузки частью любого продуктового решения. Пользователи должны самостоятельно добавлять загрузки в любой свой проект.
-
Реализовать запуск загрузок из кода, а не интерфейсов или консолей.
-
Сделать возможным запуск в любом месте на любом окружении без зависимостей от среды. Чтобы код, запускаемый в процессе разработки и отладки, не отличался от того, что потом будет крутиться на проде.
-
Предотвратить неуправляемые кодогенерации.
-
Избежать ограничений по зависимостям от библиотек, используемых командой.
-
Реализовать возможность интеграции в используемые сервисы в компании: не должно быть никаких «внешних» поделок — мониторинга, хранения метаданных и управления регламентом — которые необходимо везде таскать за собой прицепом.
-
И еще важно было сделать так, чтобы решение легко и просто развивалось и сопровождалось, при этом никак не ломая уже работающие загрузки. А сложность его развертывания должна быть не выше, чем pip install.
Вроде понятно, что делать дальше: нужно было выбрать движок. Может быть… spark? У него отличный python-api — поверх можно накрутить любую функциональность. Под капотом есть все, что нужно для мониторинга и управления ресурсами.
Использование библиотеки onETL сильно облегчило разработку нового инструмента благодаря высокоуровневой реализации взаимодействия с источниками — почитать документацию можно здесь, посмотреть доклады тут и тут. Это позволило минимизировать количество создаваемых функций, не пилить все с нуля и максимально сконцентрироваться на самом решении.
Процесс разработки и деплоя создаваемых spark-приложений выстраивали на основе EverProject — внутреннего инструмента команды МТС BigData, являющегося шаблоном ETL-проектов на python. EverProject содержит базовые конфигурации ETL-приложения, преднастроенный ci/cd-пайплайн, включающий сборку docker-образов и деплой создаваемых приложений на нашу инфраструктуру hadoop-airflow-spark. Так как это де-факто стандарт для наших продуктовых команд, то проблем с порогом вхождения у инженеров не возникает.
Так мы собрали low-code-библиотеку с названием d-van.
Что умеет d-van
Посмотрим, что в итоге у нас получилось:
-
мы реализовали пайплайн для загрузки батчей снапшотами или инкрементально и укладыванием данных по партициям на кластере;
-
он безопасно грузит батчи в файлы на HDFS с заданными уровнем параллелизма и форматом;
-
по окончании загрузок разом переносит все сохраненные файлы в hive-партиции, чтобы пользователи в моменте не получили доступ к неконсистентным данным;
-
если в процессе загрузки что-то упало, перезапуск продолжит работу пайплайна с момента падения;
-
если в таргет-таблице потерялись или были удалены какие то данные, то отсутствующие диапазоны данных автоматом будут добавлены в загрузку;
-
здесь же решаются вопросы управления партициями в таргет-таблицах, синхронизации данных из разных часовых поясов, преобразования типов источника к таргету, оптимального разделения загрузки на батчи и все прочее, о чем я рассказывал в первом разделе.
Примеры
Пример конфига (при условии, что region, sources и target настроены на уровне проекта):
defaults: - regions: ../regions - target: ../hive - _self_ load_strategy: snapshot source_table: foo source_columns: '*' table_name: mycompany__{dbcode}__{schema}_${.source_table} sources: - dbcode: msk_somedb schema: spam connection: ${connections.${.dbcode}} - dbcode: nvsb_somedb schema: spam connection: ${connections.${.dbcode}}
Пример части кода для вызова загрузки:
import hydra from pyspark.sql import SparkSession from d_van.dvan_master import DVanMaster task = "snapshot_foo.yaml" with hydra.initialize(version_base="1.3", config_path="/relative/path/to/config/folder"): app_conf = hydra.compose(config_name="config", overrides=[f"task={task}"]) spark = SparkSession.builder.appName("my_app").getOrCreate() dvan = DVanMaster(spark, app_conf.task) dvan.execute_parallel() spark.stop()
Возможности d-van
На тип источника внутри пайплайна не завязывались, так как, используя под капотом onEtl, можно единообразно общаться с реляционными БД любого типа. Для учета особенностей механики конкретных СУБД создали в d-van классы-адаптеры, в которых нет лишней логики и минимум кода (поэтому поддержка 4 реляционных источников Oracle, Postgres, MSSQL и MySQL была готова уже в первых релизах и при необходимости можно добавить любые из поддерживаемых onETL). Сделали конкурентный запуск джобов внутри поднятой spark-сессии, чтобы не поднимать их десятками при загрузке одной таблицы с разных инстансов источника и/или одновременной загрузке нескольких батчей параллельно.
Пайплайн загрузки при работе берет настройки из конфига всего ETL-проекта. EverProject использует иерархические конфиги OmegaConf фреймворка hydra. Благодаря этому большая часть необходимых параметров для работы пайплайна по умолчанию есть во всех ETL-проектах: параметры для dev-test- и prod-сред и все нужное для подключения к используемым командой источникам.
Докидываем параметры для задач загрузки, инкапсулируем и переиспользуем все, что уже есть в конфиге проекта, — этого достаточно для запуска. EverProject решает вопросы зависимостей, изолированности кодовых баз и привязки к инфраструктуре. Можно безопасно использовать любую версию spark и других библиотек на любом окружении.
onETL позволяет запускать загрузки в любом своем окружении и разрабатывать, тестировать и отлаживать их безопасно. Если ты видел, что код работает в локальном окружении, то его можно смело деплоить куда угодно. Логи и мониторинг у нас от Spark, метрики и статистика тоже из его комплекта (Graphite и Grafana). Для отслеживания происхождения данных можно легко прикрутить openLineage к Spark — и нам будут видны все загрузки.
Пишем кучу тестов, собираем и публикуем в артифактори свою библиотеку — и у нас есть готовый загрузчик, который можно добавить как зависимость в любой из своих ETL-проектов и использовать его, чтобы переносить данные на кластер. Работать все будет и без EverProject: импортировать библиотеку можно в любом py-скрипте, а конфиг в загрузчик прокинуть обычным python dict. Просто dict-конфигами не так удобно управлять, как с hydra. И деплой в продакшн придется использовать свой.
Запуск spark-приложения в режиме master=local
Это одна из возможностей, которую нам дает Spark. Ее удобно использовать для разработки и тестирования, но у приличных людей тянуть ее в прод не принято. Однако в случае с одновременной загрузкой нескольких батчей ситуация иная. В режиме yarn на каждую таску нам нужен один worker (пусть и с минимальным объемом памяти, но минимум с 1 ядром). Worker не утилизирует на хосте целое ядро CPU, но из пула ресурс-менеджера каждый работающий worker выдергивает одно ядро. Если разом грузим 1 000 батчей, то 1 000 ядер пропадет из пула yarn-очереди. А это будет чувствительно для кластера.
Этого можно избежать в режиме local: spark даже в нем остается многопоточным движком и позволяет запустить одновременно загрузку нескольких батчей, которые работают параллельно и потребляют от силы 1 ядро CPU на всех. В режиме local нам не требуется запуск worker — мы сразу экономим кучу времени, особенно когда речь идет о небольших батчах, которые грузятся около одной минуты. А еще мы в этом режиме не используем при загрузках wide-трансформации. Таким образом, нам не нужны RAM и распределенные вычисления: RAM для работы загрузок нужна только Spark-драйверу и для fetchsize. Еще режим master=local решает приколы с информационной безопасностью, когда все загрузки должны идти только через разрешенные хосты.
Что мы получили с d-van
Это библиотека, которая одной из первых решает задачи загрузки не только для инфраструктурных, но и для продуктовых команд. Постепенно она заменяет самописные решения. Благодаря использованию более ранних наработок и EverProject нам удалось собрать d-van быстро и без необходимости писать много кода. И feature-реквесты от пользователей говорят о том, что библиотека востребована и будет развиваться. И получают они свои фичи максимально быстро. А могут и сами запилить что-то под свои нужды — код библиотеки в открытом доступе внутри компании.
C докладом про d-van мы подавались на одну конференцию и получили фидбэк за «продвижение худших практик использования Spark». Но факт остается фактом: их использование успешно закрыло нам болезненную тему батчевых загрузок. Python и pуspark в команде знают все инженеры, аналитики и DS, а прикрутить NiFi или Debezium к каждому проекту — задача не из легких.
OnETL давно доступен в open source, любой желающий может использовать эту библиотеку для самостоятельного создания собственных решений. Разработанный нами d-van — универсальный инструмент для батчевой загрузки из реляционных СУБД — сейчас доступен только внутри нашей компании. Он содержит в себе некоторые решения, завязанные на нашей внутренней инфраструктуре, но их несложно выпилить и сделать ее более универсальной. Поделитесь своим мнением, будет ли интересна такая тулза в open source?
ссылка на оригинал статьи https://habr.com/ru/articles/919490/
Добавить комментарий