Как мы попробовали Apache Iceberg в связке со Spark и что из этого вышло

от автора

Тема преимуществ открытых табличных форматов при работе с озерами данных всё чаще поднимается в среде дата-инженеров. Предполагается, что их использование способно устранить недостатки популярного Apache Hive. Но так ли это на практике?

Меня зовут Иван Биленко, я инженер данных в команде дата-платформы Циан. В этой статье я хочу немного познакомить вас с процессами и стеком внутри нашей платформы, рассказать, почему мы решили попробовать Iceberg, с какими проблемами столкнулись при тестировании и какие преимущества Iceberg может дать тем, кто еще только задумывается о переходе. Дисклеймер: статья носит обзорный характер.


Описание стека и процессов с данными

Циан с точки зрения обработки данных прошел долгий путь. В 2001 году мы были обычной Excel-таблицей.

Сегодня же наша инфраструктура включает в себя DWH на базе Greenplum и Data Lake на основе Yandex S3.

Основные источники данных — это сайт и мобильные приложения, откуда информация поступает в Data Lake через Kafka — наш основной канал передачи данных. После этого данные распределяются по различным потребителям:

Аналитики получают данные через DWH для проведения исследований и анализа.

Дата-сайентисты используют данные из фича-стора для построения моделей машинного обучения.

Есть и другие потребители данных: например, разработчики, а также прочие системы, участвующие в процессе передачи данных. Но на этом этапе мы не будем углубляться в их роль. Остановимся на процессах вокруг Data Lake (на схеме A и B) и связанных задачах.

Боли в процессах и потребности

Процесс A: работа с данными через Kafka

На этом этапе мы обрабатываем более 400 таблиц, загружая и парся данные из Kafka по заранее заданным слоям. У себя внутри мы называем этот процесс раскладкой. Каждую неделю добавляются 2-3 новые таблицы. 

Основная задача нашей команды — обеспечить доступность и актуальность данных к назначенному сроку, а также предоставить бизнесу инструмент для самостоятельного управления раскладкой без необходимости привлекать дата-инженеров. 

Какие проблемы мы здесь встречаем:

  1. Ограниченные DDL-операции в Apache Hive: Например, у нас нет доступа к операциям с колонками, таким как ALTER TABLE RENAME, REPLACE, или DROP. Это создает сложности, поскольку источники данных меняют свои форматы, и нам приходится перезаписывать таблицы целиком, что усложняет автоматизацию раскладки.

  2. Отсутствие транзакций: Если на этапе перезаписи происходит ошибка, мы можем потерять данные. Чтобы избежать этого, мы используем костыли с временными таблицами, записывая данные сначала в них. Но эти костыли не решают проблему временной недоступности таблиц. Кроме того, нам не удается проводить операции с таблицами одновременно, что сильно замедляет процесс.

Процесс B: работа с фича-стором

Мы собираем около 40 фича-таблиц для нашего фича-стора. Возникает потребность в хранении исторических версий данных. Поскольку речь идет о таблицах, которые могут весить сотни гигабайт, нам нужен эффективный механизм Time Travel, чтобы избежать создания копий данных для разных версий.

Какое решение

В процессе поиска решения на горизонте сразу появились несколько популярных табличных форматов: Delta Lake, Apache Iceberg и Apache Hudi. Все они нацелены на расширение возможностей реляционных баз данных для Data Lake, что должно было полностью покрыть наши запросы. Что касается минусов перехода — на первый взгляд, кроме возможных сложностей с настройкой, их будто бы и нет. Если вы наткнулись на другую информацию, буду рад ссылке в комментариях к статье или в личном сообщении.

При выборе между табличными форматами Delta Lake сразу отпал, поскольку поддерживает только формат Parquet, а полноценное production ready решение с его полным функционалом доступно лишь в коммерческой версии Databricks. Нам же было нужно опенсорсное решение.

Из решений от Apache выбрали Iceberg благодаря его простой настройке — Yandex Cloud предлагает готовый набор файлов jar и подробный мануал, — и активному сообществу.

Сравнение форматов Iceberg и Hive

А теперь возьмём основные фишки Apache Iceberg, изучим, как они должны работать, и проверим их на практике.

Снэпшоты

После каждой операции над таблицей Iceberg фиксирует ее состояние в виде снимка и ведёт лог. Это даёт возможность перемещаться между снэпшотами, делать Time Travel запросы не только по id снимка, но и по заданному тегу. 

На практике это оказалось полезным: так можно фиксировать продовые версии датасетов. А вот функция разделения снэпшотов по веткам (например, для экспериментов, чтобы не трогать продовые данные) пока не пригодилась.

Производительность

Netflix, как создатель формата Apache Iceberg, делится впечатляющим кейсом его использования.

Данные

  • 2,7 млн файлов в 2688 партициях

Запрос

SELECT distinct tags[‘type’] as type 

FROM iceberg.atlas 

WHERE name = ‘metric-name’ 

AND date > 20180222 

AND date <= 20180228 

ORDER BY type;

Результат

EXPLAIN — 9 мин., query — не отрабатывает.

query — 42 сек. при условии min/max filtering.

Почему такой скачок в производительности?

Основной ответ — Iceberg оптимизирует запросы за счёт метаданных, которые позволяют фильтровать файлы, не соответствующие условиям запроса, исключая их из сканирования. 

Также Iceberg избавляет от необходимости листинга директорий по каждой партиции, ведь все метаданные хранятся отдельно.

Структура директории Iceberg таблицы

Структура директории Iceberg таблицы

Уровни метаданных

1. Для каждого дата-файла: статистики по колонкам хранятся в manifest files.

 select * from iceberg_catalog.feature_store.test.files

 select * from iceberg_catalog.feature_store.test.files

2. На уровне снэпшота таблицы: агрегированные статистики в manifest list.

select * from iceberg_catalog.feature_store.test.snapshots

select * from iceberg_catalog.feature_store.test.snapshots

Однако, на практике всё оказалось не так гладко.

Данные

  • 400 Гб, 31 тысяча файлов в 820 партициях, формат Parquet.

Условия

  • Движок Spark 3.1.2, Iceberg 0.13.1 и фиксированное количество ресурсов.

Запрос

SELECT count(*)

FROM iceberg_catalog.feature_store.user_search_history_features

Результат

EXPLAIN — 5 сек., query — 1 мин. 7 сек.

EXPLAIN — 58 сек., query — 4 мин. 32 сек.

Ожидалось, что подсчёт количества строк займет десятки секунд, но реальность оказалась другой. Возник вопрос: почему так?

Первое предположение: Iceberg не использует метаданные для оптимизации. Логично было бы взять информацию из summary с указанием total-records, но, глядя на план выполнения, видно, что таблица сначала полностью сканируется.

== Physical Plan ==

* HashAggregate (5)

+- Exchange (4)

   +- * HashAggregate (3)

      +- * Project (2)

         +- BatchScan (1)

Нет отдельного шага, где бы происходило чтение метаданных.

Второе предположение: слишком много метаданных, так как на каждую операцию создается новый файл. 

2823 — количество файлов метаданных в тестируемой таблице

2823 — количество файлов метаданных в тестируемой таблице

В документации действительно есть раздел Maintenance, где рекомендуется регулярно чистить метаданные. Но какое количество является допустимым для сохранения производительности — вопрос остаётся открытым.

Наличие транзакций

Транзакционная согласованность в Iceberg дает возможность параллельно выполнять операции записи без влияния на операции чтения. Проблема потери данных при перезаписи больше не актуальна. Также ушла ошибка Cannot overwrite a path that is also being read from — теперь не нужно сбрасывать план запроса или создавать временные таблицы, если пайплайн одновременно читает и записывает в одно и то же место.

Эволюция данных

Наша первая боль — ограниченные операции с колонками — решена благодаря возможности эволюции схемы данных без перезаписи. Теперь доступны операции Add, Drop, Rename, Update, Reorder. Возможность менять схему партиционирования (на текущий момент стабильно по дням) и порядка сортировки таблиц пока не использовали, но опция есть.

Расширенный SQL

Операцию DELETE пока не применяли, зато UPDATE используем регулярно в связке с INSERT через MERGE INTO. Важно помнить, что архитектурно в S3 сами файлы не изменяются и не удаляются — создаются дополнительные файлы, которые объединяются при чтении. Со временем это может сказаться на производительности.

Результаты и выводы

Мы начали частичную миграцию на Iceberg при создании нового процесса для сохранения исторических фичей в фича-сторе (процесс B). Нам удалось решить многие из упомянутых болей, но возникло снижение производительности на «жирных» таблицах, которое нужно проработать для полноценного перехода (процесс A). А полноценная миграция — тема для следующей статьи.


ссылка на оригинал статьи https://habr.com/ru/articles/859484/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *