DataVault на Greenplum с помощью DBT

от автора

Введение

Привет, Хабр!

Меня зовут Марк Порошин, я занимаюсь DataScience в DV Group. Недавно я уже рассказывал про то, как начать трансформировать данные с помощью dbt. Сегодня я решил поделиться, как мы в DV Group поженили dbt, Greenplum и DataVault, собрали все грабли, что могли; немного поконтрибьютили в open-source, но по итогу остались очень довольны результатом.

Расскажу сначала пару слов о том, что такое DataVault. DataVault — методология построения хранилища, предполагающая высокую нормализацию данных (3ая нормальная форма). Основными ее компонентами являются:

  • hub — “сущность” хранит только первичный и бизнес-ключ;

  • satellite — “свойства сущности”, относятся многие к одному с хабом и хранит свойства сущности;

  • link — “связь между сущностями” — отношение многие ко многим между сущностями (не обязательно двумя).

Чтобы стало чуть понятнее, давайте рассмотрим пример. Предположим мы хотим хранить информацию о запусках рекламных кампаний. У нас есть данные о том, когда клиенты запускали кампанию для каких-то товаров. Как же в этом случае может выглядеть ER диаграмма?

er диаграмма
er диаграмма

Можно заметить, что в сателлитах есть поле effective_from и <entity>_hashdiff, благодаря которому в DavaVault реализуется SCD2, это дает возможность реализовывать “версионность” данных.

Больше почитать про Data Vault можно здесь:

  1. Документация

  2. статья на Хабре 1

  3. статья на Хабре 2

Прежде чем переходить к основной части, хочу дать поделиться статьей, потому что я начинал изучать эту тему именно с нее и во многом статьи будут пересекаться, но я хочу больше сконцентрироваться на деталях, а еще обсудить ошибки, которые мы совершили.

Постановка задачи

Из внешних источников данных мы периодически загружаем историю покупок пользователей в таблицу pure.pure_transactions на Greenplum и хотим преобразовать ее в структуру Data Vault, т.е. разбить данные на хабы, линки и сателлиты. Преобразование происходит в 3 этапа.

  1. Сначала нужно подготовить таблицу с данными, которые будут загружаться (мы будем выбирать данные за 1 день).

  2. Далее необходимо обогатить данные всеми необходимыми хешами, но об этом дальше.

  3. И, наконец, расщепить данные на сущности.

pipeline построения datavault
pipeline построения datavault

Таблица pure.pure_transactions описывает историю покупок пользователей с некоторой метаинформацией. К сожалению, показать ее полностью я не могу, но в рамках статьи нам необходимы только следующие поля:

  1. id транзакции(transaction_id);

  2. дата транзакции(transaction_date);

  3. цена товара(price);

  4. количество купленного товара(quantity);

  5. наименование товара(product_name);

  6. id категории товара(cat_id);

Мы выделили из этих данных две сущности

  1. транзакция или строчка в чеке (transaction_id);

  2. товар (product_name);

Теперь когда у нас есть представление о том, чего мы хотим и какие у нас данные, перейдем к самому интересному.

Адаптер для greenplum

Прежде чем начать писать dbt-код, хочу немного рассказать про особенности работы с Greenplum. Greenplum — база построена на основе Postgres, поэтому синтаксис SQL запросов практически полностью совпадает, но есть ряд значительных отличий, которые будут использоваться в дальнейшем, и которые стали причиной реализации отдельного адаптера для dbt. Подробнее про это можно почитать здесь. А еще хочу поделиться интересным докладом, он будет полезен всем, кто начинает работать Greenplum.

Функциональность адаптера

Важная особенность Greenplum — возможность указать поле дистрибьюции. По этому полю Greenplum будет “раскладывать” данные по сегментам и по этому же полю будут самые эффективные join-ы. Указать параметр можно следующим образом:

{{     config( ...         distributed_by='<field_name>' ...     ) }}

Сжатие и колоночная ориентация

Greenplum предназначен для работы с большими данными, уменьшение времени на чтение/запись за счет сжатия является значительным фактором, который позволяет сократить время выполнения запроса. В dbt при использовании адаптера для Greenplum, это можно имплементировать следующим образом:

{{     config(         ...         appendonly='true',         orientation='column',         compresstype='ZSTD',         compresslevel=4,         blocksize=32768 ...     ) }}

Здесь мы указали параметр appendonly='true', он позволяет Greenplum создать таблицу оптимизированную для вставок. А еще мы добавили, что хотим использовать колоночную ориентацию orientation='column'. И, наконец, указывали тип сжатия compresstype='ZSTD', который хотим использовать и его параметры compresslevel=4blocksize=32768. Указанные значения являются параметрами по умолчанию — их можно не прописывать отдельно, если они для вас подходят.

Партиционирование

Последней важной особенностью является партиционирование, в postgres тоже есть эта возможность, но такой возможности нет у адаптера dbt-postgres(прошу поправить меня, если я ошибаюсь). Партиционирование позволяет разбить таблицу на несколько физических файлов по некоторому условию и читать только необходимые партиции. Из-за того что в Greenplum нельзя настраивать партиционирование, во время создания таблицы с помощью create table as select, реализация этой фичи получилась не очень симпатичной. Требуется указать строчку с определением полей и строчку с определением партиционирования:

{% set fields_string %}      id int4 null,      incomingdate timestamp NULL {% endset %}   {% set raw_partition %}     PARTITION BY RANGE (incomingdate)     (         START ('2021-01-01'::timestamp) INCLUSIVE         END ('2023-01-01'::timestamp) EXCLUSIVE         EVERY (INTERVAL '1 day'),         DEFAULT PARTITION extra     ); {% endset %}  {{     config( ...         fields_string=fields_string,         raw_partition=raw_partition,         default_partition_name='other_data' ...     ) }}

Построение DataVault

Raw

Сперва нужно выделить данные за день, для этого создали модель raw.raw_transaction:

{{     config(         schema='raw',         materialized='table'     ) }}   with transaction_day_dedup as ( select * from ( select *,    row_number() over (       partition by pa."transaction_id"                order by pa."savetime" asc            ) as rn from {{ source('pure', 'pure_transactions') }} pa where  '{{ var('raw_transactions')['start_date'] }}' <= transaction_date and  transaction_date < '{{ var('raw_transactions')['end_date'] }}' ) as h where rn = 1 ) select "transaction_id" as transaction_id, "transaction_date" as transaction_date, "price" as price, "quantity" as quantity, "product_name" as product_name, "cat_id" as cat_id,     ...     'PURE_TRANSACTIONS' as record_source from transaction_day_dedup ra

Здесь мы с помощью CTE выбирали данные за один день и дедублицировали по полю transaction_id. После запуска модели в таблице raw.raw_transaction у нас оказались данные за 1 день, если указать соответствующие переменные 
var('raw_transactions')['start_date'] и 
var('raw_transactions')['end_date']:

vars:   raw_transactions:     start_date: '2022-01-01 00:00:00.0'     end_date: '2022-01-02 00:00:00.0'

Stage

К данным в таблице raw.raw_transaction добавили первичные ключи, которые будут использоваться в сущностях DataVault.

Мы использовали пакет dbtvault (его пришлось немного доработать, чтобы он поддерживал последнюю версию dbt). Он позволяет сократить количество boilerplate кода.

Чтобы установить необходимые зависимости мы добавили в корень dbt проекта файл package.yml со следующим содержанием:

packages:   - git: "https://github.com/markporoshin/dbtvault.git"     revision: develop

и вызвать команду:

dbt deps

После этого у вас появится папка dbt_packages, в которой будут находиться исходники установленных пакетов.

В модели stage_transactions мы завели переменную yaml_metadata и указываем поля, которые станут основой для ключей. Их существует два типа:

  1. Первичные ключи сущностей: хабы и линки;

  2. HASHDIFF — хеши для отслеживания изменений в свойствах сущности, которые строятся из полей сателлита.

{{     config(         schema='stage',         materialized='table',     ) }}   {%- set yaml_metadata -%} source_model: 'raw_transactions' derived_columns:   LOAD_DATE: (SAVETIME + 1 * INTERVAL '1 day')   EFFECTIVE_FROM: 'SAVETIME' hashed_columns:   TRANSACTION_PK:     - 'transaction_id'   TRANSACTION_HASHDIFF:     is_hashdiff: true     columns:       - 'price'       - 'quantity'       - 'transaction_date'   PRODUCT_PK:     - 'product_name'   PRODUCT_HASHDIFF:     is_hashdiff: true     columns:     - 'cat_id'   LINK_TRANSACTION_PRODUCT_PK:     - 'transaction_id'     - 'product_name' ... {%- endset -%}  {% set metadata_dict = fromyaml(yaml_metadata) %}  {% set source_model = metadata_dict['source_model'] %}  {% set derived_columns = metadata_dict['derived_columns'] %}  {% set hashed_columns = metadata_dict['hashed_columns'] %}  {{ dbtvault.stage(include_source_columns=true,                   source_model=source_model,                   derived_columns=derived_columns,                   hashed_columns=hashed_columns,                   ranked_columns=none) }}

Результатом вызова модели получится таблица stage.stage_transactions в базе данных со следующими полями:

  1. transaction_id

  2. transaction_date

  3. price

  4. quantity

  5. product_name

  6. cat_id

  7. transaction_pk

  8. transaction_hashdiff

  9. product_pk

  10. product_hashdiff

  11. link_transaction_product_pk

  12. load_date

  13. effective_from

  14. record_source

Теперь у нас есть все необходимые хеши для того чтобы строить хранилище в методологии DataVault.

Создание Хаба

Рассмотрим создание хаба на примере сущности “продукт”, из исходных данных у нас есть его название cleanedname, которое является бизнес-ключом(natural key), на stage стадии мы создали первичный ключ product_pk, а также поле classid, которое является его свойством.

Модель хаба product будет выглядеть следующий образом:

{{     config(         schema='raw_vault',         materialized='incremental',         distributed_by='product_pk',     ) }}   {%- set source_model = "stage_transactions" -%} {%- set src_pk = "product_pk" -%} {%- set src_nk = "cleanedname" -%} {%- set src_ldts = "load_date" -%} {%- set src_source = "record_source" -%}  {{ config(schema='raw_vault') }}  {{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,                 src_source=src_source, source_model=source_model) }}

В начале мы описываем конфигурацию модели указываем схему, тип материализации и ключ дистрибьюции. После этого определяем переменные:

  1. source_model — таблицу источник, из которой будет происходить выгрузка данных для пополнения хаба;

  2. src_pk — название поля, в котором хранится первичный ключ хаба;

  3. src_nk — название поля, в котором хранится бизнес-ключ хаба;

  4. load_date — название поля с датой загрузки данных;

  5. src_source — название поля, в котором хранится наименование источника данных.

И вызываем макрос для генерации кода. В результате появится таблица с следующим DDL:

CREATE TABLE raw_vault.h_product ( product_pk text NULL, cleanedname text NULL, load_date text NULL, record_source unknown NULL ) WITH ( appendonly=true, blocksize=32768, orientation=column, compresstype=zstd, compresslevel=4 ) DISTRIBUTED BY (product_pk);

Рассмотрим еще один пример модели хаба. Дело в том, что сущность «транзакция»(сточка чека), в отличие от продукта однозначно относится со временем, когда совершили покупку, и хочется добавить поле incomingdate для того чтобы реализовать партиционирование по нему. У нас на данный момент число транзакций превысило миллиард и обновление хаба без партиционирования занимает несколько часов.

{% set fields_string %}     transaction_pk text NULL,     load_date text NULL,     record_source text NULL,     transaction_id text NULL,     transaction_date timestamp NULL {% endset %}   {% set raw_partition %}     PARTITION BY RANGE (transaction_date)     (         START ('2020-01-01'::timestamp) INCLUSIVE         END ('2028-01-01'::timestamp) EXCLUSIVE         EVERY (INTERVAL '1 day'),         DEFAULT PARTITION extra     ); {% endset %}  {{     config(         schema='raw_vault',         materialized='incremental',         compresslevel=4,         distributed_by='transaction_pk',         fields_string=fields_string,         raw_partition=raw_partition     ) }}   {%- set source_model = "stage_transactions" -%} {%- set src_pk = "transaction_pk" -%} {%- set src_nk = "transaction_date" -%} {%- set src_ldts = "load_date" -%} {%- set src_source = "record_source" -%} {%- set src_extra = ["transaction_date"] -%} {%- set partition_cause = "'" + var('h_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('h_transaction')['start_date'] + "'" -%}   {{ config(schema='raw_vault') }}  {{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,                 src_source=src_source, source_model=source_model,                 src_extra=src_extra, partition_cause=partition_cause) }}

Это уже не совсем соответствует подходу DataVault, но следование ему в точности обходилось бы слишком дорого.

DDL модели h_transaction:

CREATE TABLE raw_vault.h_transaction ( transaction_pk text NULL, load_date text NULL, record_source text NULL, transaction_id text NULL, transaction_date timestamp NULL ) WITH ( appendonly=true, blocksize=32768, orientation=column, compresstype=zstd, compresslevel=4 ) DISTRIBUTED BY (transaction_pk) PARTITION BY RANGE(transaction_date)  ( START ('2020-01-01 00:00:00'::timestamp without time zone) END ('2028-01-01 00:00:00'::timestamp without time zone) EVERY ('1 day'::interval) WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4')            COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)            COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)            COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)            COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)            COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4),  DEFAULT PARTITION extra  WITH (appendonly='true', blocksize='32768', orientation='column', compresstype=zstd, compresslevel='4')            COLUMN transaction_pk ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)            COLUMN load_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)            COLUMN record_source ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)            COLUMN transaction_id ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4)            COLUMN transaction_date ENCODING (blocksize=32768, compresstype=zstd, compresslevel=4) );

Создание сателлита

У хаба продуктов есть свойство cat_id, поэтому мы создали сателлит для его хранения:

{{     config(         schema='raw_vault',         materialized='incremental',         distributed_by='product_pk',     ) }}   {%- set source_model = "stage_transactions" -%} {%- set src_pk = "product_pk" -%} {%- set src_hashdiff = "product_hashdiff" -%} {%- set src_payload = ["cat_id, productname"] -%} {%- set src_eff = "effective_from" -%} {%- set src_ldts = "load_date" -%} {%- set src_source = "record_source" -%}   {{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,                 src_payload=src_payload, src_eff=src_eff,                 src_ldts=src_ldts, src_source=src_source,                 source_model=source_model) }}

Модель выглядит практически также, как их хаб за исключением трех дополнительных переменных:

  1. src_hashdiff — название поля, хранящие хеш данного набора свойств;

  2. src_eff — название поля, хранящее дату, с которой данный кортеж актуален;

  3. src_payload — список полей, которые составляют свойства данной сущности.

Аналогично с хабом нам потребовалось внедрить партиционирования для сателлитов, для этого рассмотрим сателлит для сущности транзакция:

{% set fields_string %}   transaction_pk text NULL, transaction_id text NULL, transaction_hashdiff text NULL, price float4 NULL, quantity float4 NULL, transaction_date timestamp NULL, load_date text NULL, record_source text NULL {% endset %}   {% set raw_partition %}     PARTITION BY RANGE (transaction_date)     (         START ('2020-01-01'::timestamp) INCLUSIVE         END ('2028-01-01'::timestamp) EXCLUSIVE         EVERY (INTERVAL '1 day'),         DEFAULT PARTITION extra     ); {% endset %}  {{     config(         schema='raw_vault',         materialized='incremental',         compresslevel=4,         distributed_by='transaction_pk',         fields_string=fields_string,         raw_partition=raw_partition     ) }}  {%- set source_model = "stage_transactions" -%} {%- set src_pk = "transaction_pk" -%} {%- set src_hashdiff = "transaction_hashdiff" -%} {%- set src_payload = [     "price",     "quantity",     "itemsum",     "transaction_id",     "transaction_date" ] -%} {%- set src_eff = "EFFECTIVE_FROM" -%} {%- set src_ldts = "LOAD_DATE" -%} {%- set src_source = "RECORD_SOURCE" -%} {%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%}   {{ config(schema='raw_vault') }}  {{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,                 src_payload=src_payload, src_eff=src_eff,                 src_ldts=src_ldts, src_source=src_source,                 source_model=source_model, partition_cause=partition_cause) }}

Создание линки

Теперь, когда у нас есть два хаба и сателлиты к ним, осталось только создать таблицу, связывающую их. В роли такой сущности в DavaVault служат ссылки.

{% set fields_string %}     link_transaction_product_pk text NULL,     transaction_pk text NULL,     product_pk text NULL,     load_date text NULL,     record_source text NULL,     transaction_date timestamp NULL {% endset %}   {% set raw_partition %}     PARTITION BY RANGE (transaction_date)     (         START ('2020-01-01'::timestamp) INCLUSIVE         END ('2028-01-01'::timestamp) EXCLUSIVE         EVERY (INTERVAL '1 day'),         DEFAULT PARTITION extra     ); {% endset %}  {{     config(         schema='raw_vault',         materialized='incremental',         compresslevel=4,         distributed_by='link_transaction_product_pk',         fields_string=fields_string,         raw_partition=raw_partition     ) }}   {%- set source_model = "stage_transactions" -%} {%- set src_pk = "link_transaction_product_pk" -%} {%- set src_fk = ["transaction_pk", "product_pk"] -%} {%- set src_ldts = "load_date" -%} {%- set src_source = "record_source" -%} {%- set src_extra = ["incomingdate"] -%} {%- set partition_cause = "'" + var('hs_transaction')['start_date'] + "' <= transaction_date and transaction_date < '" + var('hs_transaction')['start_date'] + "'" -%}  {{ dbtvault.link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,                  src_source=src_source, source_model=source_model,                 src_extra=src_extra, partition_cause=partition_cause) }}

В методологии DavaVault есть возможность создавать сателлиты для link, однако кажется, что если у связи есть свойства, значит можно выделить еще одну сущность. Кстати, насчет связей между более чем двумя сущностями — лично я пришел к выводу, что лучше избегать подобные структуры без острой необходимости, поскольку последующие запросы получаются нетривиальными особенно если в подобной связи встречаются NULL.

Выводы

Я надеюсь, что мне удалось вас убедить, что применение dbt в связке с datavault позволяет сильно облегчить построение хранилища DavaVault. Я буду очень рад замечаниям, вопросам и комментариям, а так же приглашаю присоединиться к улучшению адаптера для Greenplum =)

Хочу подвести небольшой итог и выделить основные рекомендации, которые я могу дать на основе совершенных ошибок:

  1. Используйте партиционирование везде, где это возможно;

  2. Изучите SQL запросы, которые компилирует dbtvault, поскольку иногда их можно оптимизировать в контексте вашей задачи;

  3. В greenplum есть потрясающая фича external table так что можно отказаться от хранения исходных данных (pure схема в статье).

У меня осталось непокрытой последняя тема о том, как автоматизировать процесс наполнения хранилища с помощью Dagster(развитие Airflow от его же создателей). Если тема актуальна, пишите в комментарии, я расскажу о ней.


ссылка на оригинал статьи https://habr.com/ru/post/671836/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *