Введение
Привет, Хабр!
Меня зовут Марк Порошин, я занимаюсь DataScience в DV Group. Недавно я уже рассказывал про то, как начать трансформировать данные с помощью dbt. Сегодня я решил поделиться, как мы в DV Group поженили dbt, Greenplum и DataVault, собрали все грабли, что могли; немного поконтрибьютили в open-source, но по итогу остались очень довольны результатом.
Расскажу сначала пару слов о том, что такое DataVault. DataVault — методология построения хранилища, предполагающая высокую нормализацию данных (3ая нормальная форма). Основными ее компонентами являются:
-
hub — “сущность” хранит только первичный и бизнес-ключ;
-
satellite — “свойства сущности”, относятся многие к одному с хабом и хранит свойства сущности;
-
link — “связь между сущностями” — отношение многие ко многим между сущностями (не обязательно двумя).
Чтобы стало чуть понятнее, давайте рассмотрим пример. Предположим мы хотим хранить информацию о запусках рекламных кампаний. У нас есть данные о том, когда клиенты запускали кампанию для каких-то товаров. Как же в этом случае может выглядеть ER диаграмма?
Можно заметить, что в сателлитах есть поле effective_from
и <entity>_hashdiff
, благодаря которому в DavaVault реализуется SCD2, это дает возможность реализовывать “версионность” данных.
Больше почитать про Data Vault можно здесь:
Прежде чем переходить к основной части, хочу дать поделиться статьей, потому что я начинал изучать эту тему именно с нее и во многом статьи будут пересекаться, но я хочу больше сконцентрироваться на деталях, а еще обсудить ошибки, которые мы совершили.
Постановка задачи
Из внешних источников данных мы периодически загружаем историю покупок пользователей в таблицу pure.pure_transactions
на Greenplum и хотим преобразовать ее в структуру Data Vault, т.е. разбить данные на хабы, линки и сателлиты. Преобразование происходит в 3 этапа.
-
Сначала нужно подготовить таблицу с данными, которые будут загружаться (мы будем выбирать данные за 1 день).
-
Далее необходимо обогатить данные всеми необходимыми хешами, но об этом дальше.
-
И, наконец, расщепить данные на сущности.
Таблица pure.pure_transactions
описывает историю покупок пользователей с некоторой метаинформацией. К сожалению, показать ее полностью я не могу, но в рамках статьи нам необходимы только следующие поля:
-
id транзакции(
transaction_id
); -
дата транзакции(
transaction_date
); -
цена товара(
price
); -
количество купленного товара(
quantity
); -
наименование товара(
product_name
); -
id категории товара(
cat_id
);
Мы выделили из этих данных две сущности
-
транзакция или строчка в чеке (
transaction_id
); -
товар (
product_name
);
Теперь когда у нас есть представление о том, чего мы хотим и какие у нас данные, перейдем к самому интересному.
Адаптер для greenplum
Прежде чем начать писать dbt-код, хочу немного рассказать про особенности работы с Greenplum. Greenplum — база построена на основе Postgres, поэтому синтаксис SQL запросов практически полностью совпадает, но есть ряд значительных отличий, которые будут использоваться в дальнейшем, и которые стали причиной реализации отдельного адаптера для dbt. Подробнее про это можно почитать здесь. А еще хочу поделиться интересным докладом, он будет полезен всем, кто начинает работать Greenplum.
Функциональность адаптера
Важная особенность Greenplum — возможность указать поле дистрибьюции. По этому полю Greenplum будет “раскладывать” данные по сегментам и по этому же полю будут самые эффективные join-ы. Указать параметр можно следующим образом:
{{ config( ... distributed_by='<field_name>' ... ) }}
Сжатие и колоночная ориентация
Greenplum предназначен для работы с большими данными, уменьшение времени на чтение/запись за счет сжатия является значительным фактором, который позволяет сократить время выполнения запроса. В dbt при использовании адаптера для Greenplum, это можно имплементировать следующим образом:
{{ config( ... appendonly='true', orientation='column', compresstype='ZSTD', compresslevel=4, blocksize=32768 ... ) }}
Здесь мы указали параметр appendonly='true'
, он позволяет Greenplum создать таблицу оптимизированную для вставок. А еще мы добавили, что хотим использовать колоночную ориентацию orientation='column'
. И, наконец, указывали тип сжатия compresstype='ZSTD'
, который хотим использовать и его параметры compresslevel=4
, blocksize=32768
. Указанные значения являются параметрами по умолчанию — их можно не прописывать отдельно, если они для вас подходят.
Партиционирование
Последней важной особенностью является партиционирование, в postgres тоже есть эта возможность, но такой возможности нет у адаптера dbt-postgres(прошу поправить меня, если я ошибаюсь). Партиционирование позволяет разбить таблицу на несколько физических файлов по некоторому условию и читать только необходимые партиции. Из-за того что в Greenplum нельзя настраивать партиционирование, во время создания таблицы с помощью create table as select
, реализация этой фичи получилась не очень симпатичной. Требуется указать строчку с определением полей и строчку с определением партиционирования:
{% set fields_string %} id int4 null, incomingdate timestamp NULL {% endset %} {% set raw_partition %} PARTITION BY RANGE (incomingdate) ( START ('2021-01-01'::timestamp) INCLUSIVE END ('2023-01-01'::timestamp) EXCLUSIVE EVERY (INTERVAL '1 day'), DEFAULT PARTITION extra ); {% endset %} {{ config( ... fields_string=fields_string, raw_partition=raw_partition, default_partition_name='other_data' ... ) }}
Построение DataVault
Raw
Сперва нужно выделить данные за день, для этого создали модель raw.raw_transaction
:
{{ config( schema='raw', materialized='table' ) }} with transaction_day_dedup as ( select * from ( select *, row_number() over ( partition by pa."transaction_id" order by pa."savetime" asc ) as rn from {{ source('pure', 'pure_transactions') }} pa where '{{ var('raw_transactions')['start_date'] }}' <= transaction_date and transaction_date < '{{ var('raw_transactions')['end_date'] }}' ) as h where rn = 1 ) select "transaction_id" as transaction_id, "transaction_date" as transaction_date, "price" as price, "quantity" as quantity, "product_name" as product_name, "cat_id" as cat_id, ... 'PURE_TRANSACTIONS' as record_source from transaction_day_dedup ra
Здесь мы с помощью CTE выбирали данные за один день и дедублицировали по полю transaction_id
. После запуска модели в таблице raw.raw_transaction
у нас оказались данные за 1 день, если указать соответствующие переменные var('raw_transactions')['start_date']
и var('raw_transactions')['end_date']
:
vars: raw_transactions: start_date: '2022-01-01 00:00:00.0' end_date: '2022-01-02 00:00:00.0'
Stage
К данным в таблице raw.raw_transaction
добавили первичные ключи, которые будут использоваться в сущностях DataVault.
Мы использовали пакет dbtvault (его пришлось немного доработать, чтобы он поддерживал последнюю версию dbt). Он позволяет сократить количество boilerplate кода.
Чтобы установить необходимые зависимости мы добавили в корень dbt проекта файл package.yml
со следующим содержанием:
packages: - git: "https://github.com/markporoshin/dbtvault.git" revision: develop
и вызвать команду:
dbt deps
После этого у вас появится папка dbt_packages, в которой будут находиться исходники установленных пакетов.
В модели stage_transactions
мы завели переменную yaml_metadata
и указываем поля, которые станут основой для ключей. Их существует два типа:
-
Первичные ключи сущностей: хабы и линки;
-
HASHDIFF — хеши для отслеживания изменений в свойствах сущности, которые строятся из полей сателлита.
{{ config( schema='stage', materialized='table', ) }} {%- set yaml_metadata -%} source_model: 'raw_transactions' derived_columns: LOAD_DATE: (SAVETIME + 1 * INTERVAL '1 day') EFFECTIVE_FROM: 'SAVETIME' hashed_columns: TRANSACTION_PK: - 'transaction_id' TRANSACTION_HASHDIFF: is_hashdiff: true columns: - 'price' - 'quantity' - 'transaction_date' PRODUCT_PK: - 'product_name' PRODUCT_HASHDIFF: is_hashdiff: true columns: - 'cat_id' LINK_TRANSACTION_PRODUCT_PK: - 'transaction_id' - 'product_name' ... {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {% set source_model = metadata_dict['source_model'] %} {% set derived_columns = metadata_dict['derived_columns'] %} {% set hashed_columns = metadata_dict['hashed_columns'] %} {{ dbtvault.stage(include_source_columns=true, source_model=source_model, derived_columns=derived_columns, hashed_columns=hashed_columns, ranked_columns=none) }}
Результатом вызова модели получится таблица stage.stage_transactions
в базе данных со следующими полями:
-
transaction_id
-
transaction_date
-
price
-
quantity
-
product_name
-
cat_id
-
transaction_pk
-
transaction_hashdiff
-
product_pk
-
product_hashdiff
-
link_transaction_product_pk
-
load_date
-
effective_from
-
record_source
-
…
Теперь у нас есть все необходимые хеши для того чтобы строить хранилище в методологии DataVault.
Создание Хаба
Рассмотрим создание хаба на примере сущности “продукт”, из исходных данных у нас есть его название cleanedname
, которое является бизнес-ключом(natural key), на stage стадии мы создали первичный ключ product_pk
, а также поле classid
, которое является его свойством.
Модель хаба product будет выглядеть следующий образом:
{{ config( schema='raw_vault', materialized='incremental', distributed_by='product_pk', ) }} {%- set source_model = "stage_transactions" -%} {%- set src_pk = "product_pk" -%} {%- set src_nk = "cleanedname" -%} {%- set src_ldts = "load_date" -%} {%- set src_source = "record_source" -%} {{ config(schema='raw_vault') }} {{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts, src_source=src_source, source_model=source_model) }}
В начале мы описываем конфигурацию модели указываем схему, тип материализации и ключ дистрибьюции. После этого определяем переменные:
-
source_model
— таблицу источник, из которой будет происходить выгрузка данных для пополнения хаба; -
src_pk
— название поля, в котором хранится первичный ключ хаба; -
src_nk
— название поля, в котором хранится бизнес-ключ хаба; -
load_date
— название поля с датой загрузки данных; -
src_source
— название поля, в котором хранится наименование источника данных.
И вызываем макрос для генерации кода. В результате появится таблица с следующим DDL:
CREATE TABLE raw_vault.h_product ( product_pk text NULL, cleanedname text NULL, load_date text NULL, record_source unknown NULL ) WITH ( appendonly=true, blocksize=32768, orientation=column, compresstype=zstd, compresslevel=4 ) DISTRIBUTED BY (product_pk);
Рассмотрим еще один пример модели хаба. Дело в том, что сущность «транзакция»(сточка чека), в отличие от продукта однозначно относится со временем, когда совершили покупку, и хочется добавить поле incomingdate
для того чтобы реализовать партиционирование по нему. У нас на данный момент число транзакций превысило миллиард и обновление хаба без партиционирования занимает несколько часов.
{% set fields_string %} transaction_pk text NULL, load_date text NULL, record_source text NULL, transaction_id text NULL, transaction_date timestamp NULL {% endset %} {% set raw_partition %} PARTITION BY RANGE (transaction_date) ( START ('2020-01-01'::timestamp) INCLUSIVE END ('2028-01-01'::timestamp) EXCLUSIVE EVERY (INTERVAL '1 day'), DEFAULT PARTITION extra ); {% endset %} {{ config( schema='raw_vault', materialized='incremental', compresslevel=4, distributed_by='transaction_pk', fields_string=fields_string, raw_partition=raw_partition ) }} {%- set source_model = "stage_transactions" -%} {%- set src_pk = "transaction_pk" -%} {%- set src_nk = "transaction_date" -%} {%- set src_ldts = "load_date" -%} {%- set src_source = "record_source" -%} {%- set src_extra = ["transaction_date"] -%} {%- set partition_cause = "'" + var('h_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('h_transaction')['start_date'] + "'" -%} {{ config(schema='raw_vault') }} {{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts, src_source=src_source, source_model=source_model, src_extra=src_extra, partition_cause=partition_cause) }}
Это уже не совсем соответствует подходу DataVault, но следование ему в точности обходилось бы слишком дорого.
DDL модели h_transaction
:
CREATE TABLE raw_vault.h_transaction ( transaction_pk text NULL, load_date text NULL, record_source text NULL, transaction_id text NULL, transaction_date timestamp NULL ) WITH ( appendonly=true, blocksize=32768, orientation=column, compresstype=zstd, compresslevel=4 ) DISTRIBUTED BY (transaction_pk) PARTITION BY RANGE(transaction_date) ( START ('2020-01-01 00:00:00'::timestamp without time zone) END ('2028-01-01 00:00:00'::timestamp without time zone) EVERY ('1 day'::interval) WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4') COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4), DEFAULT PARTITION extra WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4') COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) );
Создание сателлита
У хаба продуктов есть свойство cat_id
, поэтому мы создали сателлит для его хранения:
{{ config( schema='raw_vault', materialized='incremental', distributed_by='product_pk', ) }} {%- set source_model = "stage_transactions" -%} {%- set src_pk = "product_pk" -%} {%- set src_hashdiff = "product_hashdiff" -%} {%- set src_payload = ["cat_id, productname"] -%} {%- set src_eff = "effective_from" -%} {%- set src_ldts = "load_date" -%} {%- set src_source = "record_source" -%} {{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff, src_payload=src_payload, src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, source_model=source_model) }}
Модель выглядит практически также, как их хаб за исключением трех дополнительных переменных:
-
src_hashdiff
— название поля, хранящие хеш данного набора свойств; -
src_eff
— название поля, хранящее дату, с которой данный кортеж актуален; -
src_payload
— список полей, которые составляют свойства данной сущности.
Аналогично с хабом нам потребовалось внедрить партиционирования для сателлитов, для этого рассмотрим сателлит для сущности транзакция:
{% set fields_string %} transaction_pk text NULL, transaction_id text NULL, transaction_hashdiff text NULL, price float4 NULL, quantity float4 NULL, transaction_date timestamp NULL, load_date text NULL, record_source text NULL {% endset %} {% set raw_partition %} PARTITION BY RANGE (transaction_date) ( START ('2020-01-01'::timestamp) INCLUSIVE END ('2028-01-01'::timestamp) EXCLUSIVE EVERY (INTERVAL '1 day'), DEFAULT PARTITION extra ); {% endset %} {{ config( schema='raw_vault', materialized='incremental', compresslevel=4, distributed_by='transaction_pk', fields_string=fields_string, raw_partition=raw_partition ) }} {%- set source_model = "stage_transactions" -%} {%- set src_pk = "transaction_pk" -%} {%- set src_hashdiff = "transaction_hashdiff" -%} {%- set src_payload = [ "price", "quantity", "itemsum", "transaction_id", "transaction_date" ] -%} {%- set src_eff = "EFFECTIVE_FROM" -%} {%- set src_ldts = "LOAD_DATE" -%} {%- set src_source = "RECORD_SOURCE" -%} {%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%} {{ config(schema='raw_vault') }} {{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff, src_payload=src_payload, src_eff=src_eff, src_ldts=src_ldts, src_source=src_source, source_model=source_model, partition_cause=partition_cause) }}
Создание линки
Теперь, когда у нас есть два хаба и сателлиты к ним, осталось только создать таблицу, связывающую их. В роли такой сущности в DavaVault служат ссылки.
{% set fields_string %} link_transaction_product_pk text NULL, transaction_pk text NULL, product_pk text NULL, load_date text NULL, record_source text NULL, transaction_date timestamp NULL {% endset %} {% set raw_partition %} PARTITION BY RANGE (transaction_date) ( START ('2020-01-01'::timestamp) INCLUSIVE END ('2028-01-01'::timestamp) EXCLUSIVE EVERY (INTERVAL '1 day'), DEFAULT PARTITION extra ); {% endset %} {{ config( schema='raw_vault', materialized='incremental', compresslevel=4, distributed_by='link_transaction_product_pk', fields_string=fields_string, raw_partition=raw_partition ) }} {%- set source_model = "stage_transactions" -%} {%- set src_pk = "link_transaction_product_pk" -%} {%- set src_fk = ["transaction_pk", "product_pk"] -%} {%- set src_ldts = "load_date" -%} {%- set src_source = "record_source" -%} {%- set src_extra = ["incomingdate"] -%} {%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%} {{ dbtvault.link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts, src_source=src_source, source_model=source_model, src_extra=src_extra, partition_cause=partition_cause) }}
В методологии DavaVault есть возможность создавать сателлиты для link, однако кажется, что если у связи есть свойства, значит можно выделить еще одну сущность. Кстати, насчет связей между более чем двумя сущностями — лично я пришел к выводу, что лучше избегать подобные структуры без острой необходимости, поскольку последующие запросы получаются нетривиальными особенно если в подобной связи встречаются NULL.
Выводы
Я надеюсь, что мне удалось вас убедить, что применение dbt в связке с datavault позволяет сильно облегчить построение хранилища DavaVault. Я буду очень рад замечаниям, вопросам и комментариям, а так же приглашаю присоединиться к улучшению адаптера для Greenplum =)
Хочу подвести небольшой итог и выделить основные рекомендации, которые я могу дать на основе совершенных ошибок:
-
Используйте партиционирование везде, где это возможно;
-
Изучите SQL запросы, которые компилирует dbtvault, поскольку иногда их можно оптимизировать в контексте вашей задачи;
-
В greenplum есть потрясающая фича
external table
так что можно отказаться от хранения исходных данных (pure
схема в статье).
У меня осталось непокрытой последняя тема о том, как автоматизировать процесс наполнения хранилища с помощью Dagster(развитие Airflow от его же создателей). Если тема актуальна, пишите в комментарии, я расскажу о ней.
ссылка на оригинал статьи https://habr.com/ru/post/671836/
Добавить комментарий