Livy — недостающее звено цепи Hadoop Spark Airflow Python

от автора

Привет всем, немного информации «из под капота» дата инженерного цеха Альфастрахования — что будоражит наши технические умы.

image

Apache Spark — замечательный инструмент, позволяющий просто и очень быстро обрабатывать большие объемы данных на достаточно скромных вычислительных ресурсах (я имею в виду кластерную обработку).

Традиционно, в процессе ad hoc обработки данных используется jupyter notebook. В комбинации со Spark-ом это позволяет нам манипулировать долго живущими дата фреймами (распределением ресурсов занимается Spark, дата фреймы «живут» где-то в кластере, время их жизни ограничено временем жизни Spark контекста).

После переноса обработки данных в Apache Airflow время жизни дата фреймов сильно сокращается — Spark контекст «живет» в пределах одного оператора Airflow. Как это обойти, зачем обходить и при чем здесь Livy — читайте под катом.

Давайте рассмотрим совсем-совсем простой пример: предположим нам нужно денормализовать данные в большой таблице и сохранить результат в другой таблице для дальнейшей обработки (типичный элемент конвейера обработки данных).

Как бы мы это делали:

  • загрузили данные в dataframe (выборка из большой таблицы и справочников)
  • посмотрели «глазами» на результат (правильно ли получилось)
  • сохранили dataframe в таблицу Hive (например)

По результатам анализа нам может потребоваться вставить на втором шаге какую-то специфическую обработку (словарную замену или еще что-то). С точки зрения логики мы имеем три шага

  • шаг 1: загрузка
  • шаг 2: обработка
  • шаг 3: сохранение

В jupyter notebook у нас так и получается — мы можем сколь угодно долго обрабатывать загруженные данные, отдав Spark-у управление ресурсами.

Вполне логично ожидать, что такое разбиение удастся перенести в Airflow. То есть иметь граф примерно такого вида

image

К сожалению, это невозможно при использовании комбинации Airflow + Spark: каждый оператор Airflow исполняется в своем python интерпретаторе, поэтому кроме всего прочего каждый оператор должен как-то «персистить» результаты своей деятельности. Тем самым наша обработка «сжимается» в один шаг — «денормализовать данные».

Как можно «вернуть» в Airflow гибкость jupyter notebook? Понятно, что приведенный пример «того не стоит» (может быть, даже наоборот — получается хороший понятный шаг обработки). Но все же — как сделать так, чтобы операторы Airflow могли выполняться в одном Spark контексте над общим пространством dataframe-ов?

Приветствуем Livy

На помощь приходит еще один продукт экосистемы Hadoop — Apache Livy.

Не буду пытаться здесь описать — что это за «зверь» такой. Если совсем кратко и черно-бело — Livy позволяет «инжектить» python код в программу, которую исполняет driver:

  • сначала мы создаем сессию работы с Livy
  • после этого у нас есть возможность исполнять произвольный код на python-е в этой сессии (очень похоже на идеологию jupyter/ipython)

И к всему этому есть REST API.

Возвращаясь к нашей простенькой задаче: с помощью Livy мы можем сохранить изначальную логику нашей денормализации

  • на первом шаге (первом операторе нашего графа) мы загрузим и выполним код загрузки данных в dataframe
  • на втором шаге (втором операторе) — выполним код необходимой дополнительной обработки этого dataframe
  • на третьем шаге — код сохранения dataframe-а в таблицу

Что в терминах Airflow может выглядеть так:

image

(поскольку картинка вполне реальный скриншот, то добавились дополнительные «реалии» — создание Spark контекста стало отдельной операцией со странным названием, «обработка» данных пропала, потому что не нужна, и т.п.)

Если обобщить — мы получаем

  • универсальный Airflow оператор, который выполняет код на python-е в сессии Livy
  • возможность «организовывать» код на python-е в достаточно сложные графы (на то и Airflow)
  • возможность заняться оптимизациями более высокого уровня, например, в каком порядке нужно выполнять наши преобразования с тем, чтобы Spark смог максимально долго держать общие данные в памяти кластера

Типичный конвейер подготовки данных для моделирования содержит порядка 25 запросов по 10 таблицам, очевидно, что некоторые таблицы используются чаще других (те самые «общие данные») и есть что пооптимизировать.

Что дальше

Техническая возможность опробована, думаем дальше — как технологичнее перевести наши трансформации в эту парадигму. И как подступиться к упомянутой выше оптимизации. Мы еще в начале этой части нашего пути — когда будет что-то интересное, обязательно поделимся.


ссылка на оригинал статьи https://habr.com/ru/company/alfastrah/blog/466017/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *