Как мы переехали с Greenplum на Delta Table

от автора

У нас было 2 кластера Clickhouse, 1 кластер GreenPlum’a, 1 кластер Kubernetes’a, DataVault 2.0, гора dbt витрин и всего такого, а еще Dagster. Не то чтобы это все было нужно в архитектуре, но раз начал коллекционировать сервисы, то иди в своем увлечении до конца. Единственное, что нас беспокоило — это стоимость. 

Решение о полной переделке архитектуры пришло внезапно, как галлюцинация в ночной пустыне — мы поняли, что больше нельзя ждать. Теперь наши данные обрабатываются быстрее, чем мысли в голове на ЛСД, и мы можем персонализировать клиентский опыт так, что он становится почти реальным. 


Всем привет! На связи Артем, и если вы готовы погрузиться в мир цифрового безумия и увидеть, как мы превратили хаос в порядок, то держитесь крепче и читайте дальше.

Когда-то мы уже писали о том, как построить DataVault на Greenplum, со статьей можно ознакомиться здесь. Кратко расскажем о причинах, подтолкнувших нас пересмотреть подход — это оптимизация расходов и отсутствие экспертов в администрировании GreenPlum.

В первом случае нам бы хотелось платить только за время фактического использования ресурсов, а во втором, несмотря на то, что наш кластер был управляемым, мы регулярно сталкивались с проблемами поддержки и масштабирования системы, что также приводило к непредсказуемым затратам.

Выбор нового решения

При выборе нового решения мы учитывали несколько ограничений:

  • Dagster: для управления задачами

  • Kubernetes: для оркестрации контейнеров

  • Yandex Cloud: как основная облачная платформа

Основные компоненты нового решения:

  • Yandex Object Storage: для хранения

  • Delta Table: для управления

  • Apache Spark: для обработки

Почему именно эти технологии?

Переход с архитектуры Data Vault 2.0 на архитектуру Delta Lake (S3 + Delta Table) может предложить ряд существенных преимуществ:

  • Транзакционная целостность и ACID-свойства — Delta Lake обеспечивает надежность и целостность данных при параллельных операциях

  • Производительность и оптимизация запросов — структура Data Vault 2.0 может приводить к сложным и менее оптимизированным запросам, требующим значительных усилий для оптимизации

  • Гибкость и масштабируемость — Data Vault 2.0 хорошо подходит для интеграции из  различных источников, но может потребовать значительных ресурсов для масштабирования под высокие нагрузки, тогда как Delta Lake спроектирован для работы с большими объемами данных в облачной среде и легко масштабируется в зависимости от нагрузки без серьезных затрат

  • Упрощенное управление схемой данных — delta table поддерживают автоматическую эволюцию схемы и принудительное соблюдение схемы, что упрощает внесение изменений в структуру данных и управление ими

  • Экосистема и интеграция с другими инструментами — delta table легко интегрируется с apache spark, что упрощает разработку и эксплуатацию аналитических приложений

Apache Spark

Одним из преимуществ Apache Spark является возможность его локального развертывания, что особенно удобно для разработки и тестирования. На текущий момент обработка наших данных происходит именно таким образом.

Как это работает?

В самом простом виде нашу архитектуру можно представить следующим образом:

Архитектура данных

Архитектура данных

Данные хранятся в Yandex Object Storage в формате Delta Table. Это позволяет нам воспользоваться преимуществами версионирования данных и транзакционной целостности. Помимо этого, мы используем S3 для хранения артефактов ML моделей.

В работе с форматом Delta Table правильное распределение данных по партициям играет ключевую роль. Учитывая разнообразие наших источников данных, мы внедрили партиционирование не только по дате, но и по источнику. Это значительно повышает скорость и эффективность чтения данных. При грамотном выборе партиции Spark применяет partition pruning, избегая необходимости читать все файлы в таблице, что оптимизирует производительность системы.

Для нашего MVP мы выбрали использование кластера Spark на одном узле, настроив SparkSession следующим образом:

SparkSession.builder.appName().master(f"local[*]")

Полная конфигурация Spark’a для работы с S3 может выглядеть следующим образом:

from delta import configure_spark_with_delta_pip from pyspark.sql import SparkSession  packages = [     "io.delta:delta-core_2.12:2.0.0",     "org.apache.hadoop:hadoop-aws:3.3.1", ]  builder = (SparkSession.builder.appName().master(f"local[*]")     .config("spark.driver.memory", "16g")     .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.access.key", "<backet_access_key>")     .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.secret.key", "<backet_secret_key>")     .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.endpoint", "<backet_endpoint>")     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")     .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")     .config("spark.hadoop.fs.s3a.fast.upload", "true")     .config("spark.sql.ui.explainMode", "extended")     .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")     .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")     .config("spark.sql.sources.partitionOverwriteMode", "dynamic")     .config("spark.driver.extraJavaOptions", _spark_with_newer_jvm_compatibility_options) )  spark = builder.getOrCreate() spark.sparkContext.setLogLevel("ERROR")

Такой подход позволяет запускать Spark локально на той же машине, где создается сессия. Благодаря этому, мы можем запускать Spark непосредственно в подах Dagster и удобно отлаживать запросы локально.

В работе с Dagster мы используем ассеты и менеджеры. Вся бизнес-логика размещается внутри ассета, а управление вводом-выводом передается менеджеру. Таким образом, нам было достаточно написать Spark менеджер и адаптировать запросы в коде для работы со Spark. 

Для экономии ресурсов во время простоя мы внедрили автоскейл группы прерываемых узлов в Managed Service for Kubernetes от Яндекс Облака. Развернули с помощью Terraform, используя готовый модуль. Мы настроили группы так, чтобы они могли масштабироваться до нуля узлов при отсутствии нагрузки. Для подов Dagster у нас выделена отдельная группа узлов, где всегда работает хотя бы один узел. Таким образом, система практически не потребляет ресурсы в периоды простоя.

При необходимости также можно запросить чуть больше ресурсов для конкретных задач, настроив конфигурацию для каждого задания Dagster:

@job(         tags={         "dagster-k8s/config": {             "container_config": {                 "resources": {                     "requests": {"cpu": "250m", "memory": "64Mi"},                     "limits": {"cpu": "500m", "memory": "2560Mi"},                 },             },     }, ) def my_job():     my_op()  Либо, при использовании ассетов: my_job = define_asset_job( name=’my_job’, selection=AssetSelection.assets(my_asset), tags={         "dagster-k8s/config": {             "container_config": {                 "resources": {                     "requests": {"cpu": "250m", "memory": "64Mi"},                     "limits": {"cpu": "500m", "memory": "2560Mi"},                 },             },     }, )

Для записи данных в Clickhouse мы используем библиотеку clickhouse-connect, так как наш исходный код уже был написан с ее помощью. Однако стоит отметить, что для Spark есть драйвер для работы с Clickhouse, позволяя использовать Spark SQL для выполнения запросов и обработки данных в Clickhouse.

Важно отметить, что для контроля расходов на использование хранилища необходимо регулярно обслуживать все таблицы с помощью команд VACUUM и OPTIMIZE. Команда VACUUM удаляет старые, неиспользуемые данные и освобождает пространство, что помогает снизить затраты на хранение. Команда OPTIMIZE, в свою очередь, не только уменьшает размер таблиц, но и значительно улучшает производительность запросов, ускоряя чтение данных. 

Важное замечание: Spark + Delta Table по умолчанию поддерживают механизм оптимистичных блокировок для write-запросов, выполняемых в одном Spark кластере, но не для write-запросов из разных Spark кластеров. Чтобы включить такую поддержку, необходимо добавить в конфигурацию DynamoDB. В Яндекс Облаке YDB реализует интерфейс DynamoDB, но его интеграция оказалась для нас сложной задачей. Поэтому мы решили использовать механизм пессимистичных блокировок, встроенный в Dagster, через функцию tag_concurrency_limits. Все джобы, записывающие данные в таблицу, мы помечаем тегом с названием этой таблицы. Dagster, в свою очередь, предотвращает одновременное выполнение таких джоб, что обеспечивает корректное управление блокировками и исключает конфликтные write-запросы. 

Таким образом, конфигурация вашего dagsterDaemon будет выглядеть как-то так:

dagsterDaemon:  enabled: true    image:    repository: "docker.io/dagster/dagster-celery-k8s"    tag: ~    pullPolicy: Always    heartbeatTolerance: 300    runCoordinator:    enabled: true    type: QueuedRunCoordinator    config:      queuedRunCoordinator:        maxConcurrentRuns: 10        tagConcurrencyLimits:          - key: "single-thread"            value:              applyLimitPerUniqueValue: true            limit: 1

После чего в настройки вашего задания можно добавить:

my_job = define_asset_job( name=’my_job’, selection=AssetSelection.assets(my_asset), tags={         “single-thread”: “my_job”,         "dagster-k8s/config": {             "container_config": {                 "resources": {                     "requests": {"cpu": "250m", "memory": "64Mi"},                     "limits": {"cpu": "500m", "memory": "2560Mi"},                 },             },     }, )

Преимущества нового подхода:

  • Экономия расходов

Мы платим только за фактическое время работы, что позволяет эффективно управлять бюджетом и уменьшить издержки.

  • Гибкость и масштабируемость

Мы легко адаптируемся под изменяющиеся нагрузки, обеспечивая высокую производительность и надежность

  • Поддержка существующих процессов

Мы по-прежнему поставляем готовые витрины в Clickhouse, что позволяет сохранить существующие процессы для аналитиков и избежать значительных изменений.

Заключение

Переход на новую архитектуру оказался успешным и принес значительную экономию расходов, улучшение гибкости и масштабируемости. Наш опыт показывает, что такие изменения могут быть менее сложными, чем ожидается, и приносить значительные выгоды. 

А в данный момент мы подготавливаем инфраструктуру для внедрения горизонтально масштабируемого Spark’a и поддержки Hive Metastore, о чем постараемся рассказать в следующей статье.


Ну вот и все!) Благодарю за внимание и буду рад услышать ваше мнение) Поделитесь своим опытом, задавайте вопросы и расскажите, как вы справляетесь с подобными задачами)


ссылка на оригинал статьи https://habr.com/ru/articles/838112/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *