Уже более двух лет data build tool активно используется в компании Wheely для управления Хранилищем Данных. За это время накоплен немалый опыт, мы на тернистом пути проб и ошибок к совершенству в Analytics Engineering.
Несмотря на то, что в русскоязычном сегменте уже есть несколько публикаций, посвященных применению dbt, всё ещё нельзя говорить о широкой популярности и интересе, которые продукт стремительно обретает на Западе.
Поэтому сегодня я предлагаю вам экскурсию по Хранилищу Данных Wheely. В формат публикации я попытался уложить самые яркие моменты и впечатления от использования dbt, снабдив реальными примерами, практиками и опытом. Добро пожаловать под кат.
Структура превыше всего
Измерять сложность Хранилища Данных в количестве гигабайт сегодня — дурной тон
Налить кучу тяжело интерпретируемых данных без метаинформации (читай мусора) не составит большого труда. Гораздо сложнее из этих данных получить что-то осмысленное. То, на что с уверенностью могут опираться business stakeholders, принимая решения. То, что регулярно измеряется на предмет качества и актуальности. Наконец, то, что соответствует принципам Keep it simple (KISS) и Don’t repeat yourself (DRY).
Первостепенным элементом я считаю прозрачность структуры Хранилища Данных. Чаще всего DWH выстраивается согласно многослойной логике, где каждому этапу соответствует набор преобразований, детали реализации которого скрыты для последующих слоев (элемент абстракции).
Зеленым цветом – слой источников данных sources. Это реплики структур и таблиц из исходных систем, которые поддерживаются ELT-сервисом. Данные синхронизируются 1:1 с источником, без каких-либо преобразований. Опциональный слой flatten позволяет вложенные иерархические структуры (JSON) превратить в плоские таблицы.
Слой staging предназначен для простых преобразований: переименование полей, преобразование типов, расчет новых колонок с помощью конструкции case. На этом этапе мы готовим почву для дальнейших преобразований, приводим всё к единому виду и неймингу.
Intermediate или промежуточный слой отвечает за формирование предварительных таблиц и агрегатов, где происходит обогащение данных. Для ряда бизнес-областей мы не используем этот слой, для других логика может насчитывать до 5-10 взаимосвязанных моделей.
Кульминацией являются data marts или Витрины Данных, которые используются Data Scientists / Business Users / BI tools. Слой, в свою очередь, делится на:
-
dimensions: пользователи, компании, машины, водители, календарь
-
facts: поездки, транзакции, сеансы, продвижения, коммуникации
-
looker: материализованные представления и витрины, оптимизированные под чтение из BI-системы
Число 120 из заголовка публикации относится только к витринам данных:
Running with dbt=0.19.0 Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures
На текущий момент в проекте:
-
273 модели во всех перечисленных слоях
-
493 теста на эти модели, включая not null, unique, foreign key, accepted values
-
6 снапшотов для ведения истории SCD (slowly changing dimensions)
-
532 макроса (большая часть из которых импортирована из сторонних модулей)
-
7 operations включая vacuum + analyze
-
81 источник данных
Помимо разбиения на логические слои, Хранилище можно нарезать по бизнес-областям. В случае необходимости есть возможность пересчитать или протестировать витрины, относящиеся к вертикалям Marketing / Supply / Growth / B2B. Например, в случае late arriving data или ручных корректировках маппингов/справочников.
Осуществляется это за счет присвоения моделям и витринам тегов, а также за счет богатых возможностей синтаксиса выбора моделей. Запустить расчет всех витрин вертикали Marketing и их вышестоящие зависимости:
dbt run -m +tag:marketing
Этот же принцип лежит в основе организации кодой базы. Все скрипты объединены в директории с общей логикой и понятными наименованиями. Сложно потеряться даже при огромном количестве моделей и витрин:
Иерархия проекта dbt
. |____staging | |____webhook | |____receipt_prod | |____core | |____wheely_prod | |____flights_prod | |____online_hours_prod | |____external | |____financial_service |____marts | |____looker | |____dim | |____snapshots | |____facts |____flatten | |____webhook | |____receipt_prod | |____wheely_prod | |____communication_prod |____audit |____sources |____aux | |____dq | | |____marts | | |____external |____intermediate
Оптимизация физической модели
Логическое разделение на слои и области — это замечательно. Но не менее важно и то, как эта логика ложится на конкретную СУБД. В случае Wheely это Amazon Redshift.
Подход с декомпозицией позволит разбить логику на понятные части, которые можно рефакторить по отдельности. Одновременно это помогает оптимизатору запросов подобрать лучший план выполнения. По такому принципу реализована одна из центральных витрин – journeys (поездки).
На этапе обогащения данных важна скорость склейки таблиц (join performance), поэтому данные сегментированы и отсортированы в одинаковом ключе, начиная с sources. Это позволит использовать самый быстрый вид соединения — sort merge join:
Конфигурация для оптимального соединения – sort merge join
{{ config( materialized='table', unique_key='request_id', dist="request_id", sort="request_id" ) }}
Витрина же хранится отсортированной по самым популярным колонкам доступа: city, country, completed timestamp, service group. В случае правильного подбора колонок Interleaved key позволяет значительно оптимизировать I/O и ускорить отрисовку графиков в BI-системах.
Конфигурация для быстрого чтения витрины – interleaved sortkey
{{ config( materialized='table', unique_key='request_id', dist="request_id", sort_type='interleaved', sort=["completed_ts_loc" , "city" , "country" , "service_group" , "is_airport" , "is_wheely_journey"] ) }}
При этом часть моделей есть смысл материализовать в виде views (виртуальных таблиц), не занимающих дисковое пространство в СУБД. Так, слой staging, не содержащий сложных преобразований, конфигурируется на создание в виде представлений на уровне проекта:
staging: +materialized: view +schema: staging +tags: ["staging"]
Другой интересный пример – результаты проверки качества данных. Выбранный тип материализации – ephemeral, т.е. на уровне СУБД не будет создано ни таблицы, ни представления. При каждом обращении к такой модели будет выполнен лишь запрос. Результат такого запроса является слагаемым в суммарной таблице, содержащей метрики всех проверяемых объектов.
В свою очередь большие таблицы фактов имеет смысл наполнять инкрементально. Особенно при условии того, что факт, случившийся однажды, больше не меняет своих характеристик. Таким образом мы процессим только изменения (delta) – новые факты, произошедшие после последнего обновления витрины. Обратите внимание на условие where:
Пример инкрементального наполнения витрины
{{ config( materialized='incremental', sort='metadata_timestamp', dist='fine_id', unique_key='id' ) }} with fines as ( select fine_id , city_id , amount , details , metadata_timestamp , created_ts_utc , updated_ts_utc , created_dt_utc from {{ ref('stg_fines') }} where true -- filter fines arrived since last processed time {% if is_incremental() -%} and metadata_timestamp > (select max(metadata_timestamp) from {{ this }}) {%- endif %} ), ...
Кстати, о принципах MPP и о том, как выжать максимум из аналитических СУБД я рассказываю на курсах Data Engineer и Data Warehouse Analyst (скоро первый запуск!).
SQL + Jinja = Flexibility
Высокоуровневый декларативный язык SQL прекрасен сам по себе, но вкупе с движком шаблонизации Jinja он способен творить чудеса.
Любой код, который вы используете с dbt проходит этапы compile & run. На этапе компиляции интерпретируются все шаблонизированные выражения и переменные. На этапе запуска код оборачивается в конструкцию CREATE в зависимости от выбранного типа материализации и фишек используемой СУБД: clustered by / distributed by / sorted by. Рассмотрим пример:
Model code:
{{ config( materialized='table', dist="fine_id", sort="created_ts_utc" ) }} with details as ( select {{ dbt_utils.star(from=ref('fine_details_flatten'), except=["fine_amount", "metadata_timestamp", "generated_number"] ) }} from {{ ref('fine_details_flatten') }} where fine_amount > 0 ) select * from details
Compiled code:
with details as ( select "id", "fine_id", "city_id", "amount", "description", "created_ts_utc", "updated_ts_utc", "created_dt_utc" from "wheely"."dbt_test_akozyr"."fine_details_flatten" where fine_amount > 0 ) select * from details
Run code:
create table "wheely"."dbt_test_akozyr"."f_chauffeurs_fines" diststyle key distkey (fine_id) compound sortkey(created_ts_utc) as ( with details as ( select "id", "fine_id", "city_id", "amount", "description", "created_ts_utc", "updated_ts_utc", "created_dt_utc" from "wheely"."dbt_test_akozyr"."fine_details_flatten" where fine_amount > 0 ) select * from details );
Ключевым моментом является тот факт, что пишете вы только лаконичный шаблонизированный код, а остальным занимается движок dbt. Написание boilerplate code сведено к минимуму. Фокус инженера или аналитика остается преимущественно на реализуемой логике.
Во-вторых, как происходит выстраивание цепочки связей и очередности создания витрин, продемонстрированные на картинках выше? Внимательный читатель уже заметил, что в рамках написания кода при ссылках на другие модели нет хардкода, но есть конструкция {{ ref('fine_details_flatten') }}
– ссылка на наименование другой модели. Она и позволяет распарсить весь проект и построить граф связей и зависимостей. Так что это тоже делается абсолютно прозрачным и органичным способом.
С помощью шаблонизации Jinja в проекте Wheely мы гибко управляем схемами данных и разделением сред dev / test / prod. В зависимости от метаданных подключения к СУБД будет выбрана схема и период исторических данных. Продакшн модели создаются в целевых схемах под технической учетной записью. Аналитики же ведут разработку каждый в своей личной песочнице, ограниченной объемом данных в 3-е последних суток. Это реализуется с помощью макроса:
Макрос управления схемами для подключений:
{% macro generate_schema_name_for_env(custom_schema_name, node) -%} {%- set default_schema = target.schema -%} {%- if target.name == 'prod' and custom_schema_name is not none -%} {{ custom_schema_name | trim }} {%- else -%} {{ default_schema }} {%- endif -%} {%- endmacro %}
Еще одним важным преимуществом является самодокументируемый код. Иными словами, из репозитория проекта автоматически можно собрать статический сайт с документацией: перечень слоев, моделей, атрибутный состав, метаинформацию о таблицах в СУБД и даже визуализировать граф зависимостей (да-да, картинки выше именно оттуда).
Не повторяйся – лучше подготовь макрос
Однотипный код, повторяющиеся обращения и действия, зачастую реализуемые по принципу copy-paste нередко являются причиной ошибок и багов. В Wheely мы придерживаемся принципа Do not repeat yourself и любой сколько-нибудь похожий код шаблонизируем в макрос с параметрами. Писать и поддерживать такой код становится сплошным удовольствием.
Простой пример с конвертацией валют:
-- currency conversion macro {% macro convert_currency(convert_column, currency_code_column) -%} ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd {%- endmacro %}
Вызов макроса из модели:
select ... -- price_details , r.currency , {{ convert_currency('price', 'currency') }} , {{ convert_currency('transfer_min_price', 'currency') }} , {{ convert_currency('discount', 'currency') }} , {{ convert_currency('insurance', 'currency') }} , {{ convert_currency('tips', 'currency') }} , {{ convert_currency('parking', 'currency') }} , {{ convert_currency('toll_road', 'currency') }} , {{ convert_currency('pickup_charge', 'currency') }} , {{ convert_currency('cancel_fee', 'currency') }} , {{ convert_currency('net_bookings', 'currency') }} , {{ convert_currency('gross_revenue', 'currency') }} , {{ convert_currency('service_charge', 'currency') }} ... from {{ ref('requests_joined') }} r
По большому счету, макрос это просто вызов функции с передачей аргументов, на уже знакомом вам диалекте Jinja. Результатом работы макроса является готовый к исполнению SQL-скрипт. Макрос для кросс-сверки значений в колонках:
Сравнить значения двух колонок
-- compare two columns {% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%} {%- if is_numeric == true -%} {%- set src_column = 'round(' + src_column + ', 2)' -%} {%- set trg_column = 'round(' + trg_column + ', 2)' -%} {%- endif -%} CASE WHEN {{ src_column }} = {{ trg_column }} THEN 'match' WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null' WHEN {{ src_column }} IS NULL THEN 'missing in source' WHEN {{ trg_column }} IS NULL THEN 'missing in target' WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch' ELSE 'unknown' END {%- endmacro %}
В макрос можно запросто записать даже создание UDF-функций:
Создать UDF
-- cast epoch as human-readable timestamp {% macro create_udf() -%} {% set sql %} CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT) RETURNS VARCHAR(512) STABLE AS $$ # Convert column to binary, strip "0b" prefix, pad out with zeroes if bitwise_column is not None: b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1] return b else: None $$ LANGUAGE plpythonu ; CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP) RETURNS VARCHAR(128) STABLE AS $$ SELECT nvl( DECODE($2, null, null, 'deleted') , DECODE(LEN(analytics.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown') , DECODE(analytics.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null) ) $$ LANGUAGE SQL ; {% endset %} {% set table = run_query(sql) %} {%- endmacro %}
Параметризовать можно и довольно сложные вещи, такие как работа с nested structures (иерархическими структурами) и выгрузка во внешние таблицы (external tables) в S3 в формате parquet. Эти примеры вполне достойны отдельных публикаций.
Не изобретай велосипед – импортируй модули
Модуль или package — это набор макросов, моделей, тестов, который можно импортировать в свой проект в виде готовой к использованию библиотеки. На портале dbt hub есть неплохая подборка модулей на любой вкус, и, что самое главное, их список постоянно пополняется.
С помощью модуля логирования и добавления 2 простых hooks на каждый запуск dbt у меня как на ладони появляется статистическая информация о времени, продолжительности, флагах и параметрах развертывания. Я наглядно вижу модели анти-лидеры по потребляемым ресурсам (первые кандидаты на рефакторинг):
models: +pre-hook: "{{ logging.log_model_start_event() }}" +post-hook: "{{ logging.log_model_end_event() }}"
Измерение календаря собирается в одну строку, при этом набор колонок поражает:
{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
С помощью модуля dbt_external_tables я уже выстраиваю полноценный Lakehouse, обращаясь из Хранилища к данным, расположенным в файловом хранилище S3. К примеру, самые свежие курсы валют, получаемые через API Open Exchange Rates в формате JSON:
External data stored in S3 accessed vith Redshift Spectrum
- name: external schema: spectrum tags: ["spectrum"] description: "External data stored in S3 accessed vith Redshift Spectrum" tables: - name: currencies_oxr description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org" freshness: error_after: {count: 15, period: hour} loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second' external: location: "s3://data-analytics.wheely.com/dwh/currencies/" row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'" columns: - name: timestamp data_type: bigint - name: base data_type: varchar(3) - name: rates data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
Ну и, конечно, ночью по расписанию работает VACUUM + ANALYZE, ведь Redshift это форк PostgreSQL. Дефрагментация, сортировка данных в таблицах, сбор статистик. Иначе говоря поддержание кластера в тонусе, пока dba спит.
dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
Running in production: используем dbt Cloud в Wheely
dbt Cloud это платный сервис для управления проектами, основанными на движке dbt. За небольшие деньги команда получает возможность создавать окружения, конфигурировать джобы и таски, устанавливать расписание запусков, и даже полноценную IDE (среду разработки!) в браузере.
Прежде всего речь идет об удобстве использования: приятный и понятный визуальный интерфейс, простота поиска и ориентирования, акцентирование ключевой информации при разборе ошибок и чтении логов:
Во-вторых, это гибкие настройки условий запуска джобов. Начиная от простых условий с выбором дня недели и времени, продолжая кастомными cron-выражениями, и заканчивая триггером запуска через webhook. Например, именно через вебхук мы связываем в цепочку завершение выгрузок для кросс-сверки и начало расчета соответствующих витрин в Хранилище (kicked off from Airflow):
В третьих, это консолидация всех важных уведомлений одном месте. Для нашей команды это канал в Slack и любые проблемы связанные с Production-запусками. В режиме реального времени мы получаем все уведомления об инцидентах с деталями и ссылками на подробный лог.
Сам dbt является проектом с открытым исходным кодом, и использование продукта dbt Cloud представляется очень удобным, но не обязательным. В качестве альтернативных способов можно выбрать любой другой оркестратор: Airflow, Prefect, Dagster, и даже просто cron. В своем проекте Сквозная Аналитика я организую оркестрацию при помощи Github Actions. Выходит очень занятно.
Вместо заключения
В команде аналитики Wheely мы стремимся к тому, чтобы работа была наполнена смыслом и приносила удовлетворение и пользу, но не раздражение и негодование. Все перечисленные в публикации достоинства не могут не вызвать симпатию со стороны новых членов команды и значительно ускоряют процессы адаптации и onboarding.
Сегодня бизнес и команда активно растут. Доступен ряд интересных позиций:
-
Head of Data Insights — https://wheely.com/ru/careers/4425384003
-
Product Analyst, Backoffice — https://wheely.com/ru/careers/4308521003
-
Product Analyst, Business — https://wheely.com/ru/careers/4425290003
-
Product Analyst, Chauffeur growth — https://wheely.com/ru/careers/4185132003
-
Product Analyst, Marketplace — https://wheely.com/ru/careers/4425328003
-
Product Analyst, Passenger growth — https://wheely.com/ru/careers/4194291003
У тебя есть возможность узнать детали из первых уст и получить прямую рекомендацию.
Также время от времени я провожу вебинары и выступления, на которых подробнее рассказываю о своей работе и проектах. Следить за моими публикациями можно в телеграм-канале Technology Enthusiast – https://t.me/enthusiastech
Пишите, задавайте вопросы и, конечно, пробуйте dbt в своих проектах!
ссылка на оригинал статьи https://habr.com/ru/company/wheely/blog/549614/
Добавить комментарий