Привет, Хабр!
Меня зовут Марк Порошин, в DV Group я занимаюсь Data Science. Мы работаем с большим количеством данных, на данный момент приближаемся к 10тб данных на нашем кластере Greenplum. Так как бизнес достаточно молодой, требования заказчиков, аналитиков постоянно меняются, да и сама структура данных периодически дополняется, поэтому мы выбрали достаточно современную технологию построения Data Warehouse — DataVault. Данные методологии очень привлекателны своей гибкостью, однако ценой за эту гибкость будет огромное количество таблиц. Это приводит сразу к двух основным проблемам:
-
Нужна база данных, которая поддерживает и хорошо справляется с большим количеством join-ов;
-
Способ автоматизации инкрементального наполнения таблиц, поскольку руками прописывать SQL запросы очень трудоемко, а еще это чревато ошибками.
Здесь я расскажу про технологию, которую мы используем в DV Group — dbt(data build tool), она позволяет во многом справиться со второй проблемой и очень хорошо себя зарекомендовала в нашем проекте.
Настройка проекта
Знакомство с dbt начнем с тестового проекта. В качество целевой базы данных будем использовать postgres, которую я настроил локально на своей машине. Создаем папку проекта, я буду работать в PyCharm, это вовсе необязательно, тут каждый выбирает сам. Необходимо настроить окружение python3 и установить необходимые зависимости.
pip install dbt-core==1.1.0 dbt-postgres==1.1.0
После этого инициализируем dbt проект:
(venv) ➜ PostgresDBTIntro dbt init 11:32:18 Running with dbt=1.1.0 Enter a name for your project (letters, digits, underscore): dbt_postgres_intro Which database would you like to use? [1] postgres (Don\'t see the one you want? https://docs.getdbt.com/docs/available-adapters) Enter a number: 1 11:33:04 Your new dbt project "dbt_postgres_intro" was created!
На данный момент у вас в проекте должна появиться папка с таким же названием, что вы указали в качестве имени проекта.
Первые шаги в dbt
Давайте немного пройдем по файлам, которые появились после инициализации проекта.
Под номером один находится файл dbt_project.yml, в котором мы описываем структуру проекта, переменные(vars), дефолтные типы материализаций моделей. Также здесь можно прописать хуки on-run-start, on-run-end. К этим тонкостям мы вернемся позже, а сейчас рассмотрим файл под номером 2 my_first_dbt_model.sql
/* Welcome to your first dbt model! Did you know that you can also configure models directly within SQL files? This will override configurations stated in dbt_project.yml Try changing "table" to "view" below */ {{ config(materialized='table') }} with source_data as ( select 1 as id union all select null as id ) select * from source_data /* Uncomment the line below to remove records with null `id` values */ -- where id is not null
Пропускаем блок комментариев, экранированных с помощью /* ... */
и видим:
{{ config(materialized='table') }}
DBT построен на основе Jinja, поэтому {{ ... }}
используются для экранирования кода. В нем вызываем macro(читай “функцию”) — config, в который передаем аргументы для конфигурации нашей модели. В данном случае у нас всего лишь один аргумент materialized
со значением 'table'.
Это значит, что в результате запуска модели “my_first_dbt_model”, должна быть создана (пересоздана) таблица с таким же названием, как и название файла.
Следом идет sql код для выбора данных:
select * from source_data
Прежде чем запускать модель, нужно разобраться с еще одним моментом.Пока мы еще нигде не прописали креденшены для подключения к инстансу нашего Postgres’a. Это делается с помощью файла profiles.yml
. В моем случае он выглядит следующим образом:
config: send_anonymous_usage_stats: False use_colors: True partial_parse: True dbt_postgres_intro: outputs: dev: type: postgres threads: 3 host: localhost port: 5432 user: markporoshin pass: "<password>" dbname: dbt_intro_db schema: public target: dev
Я разместил этот файл на одном уровне с файлом dbt_project.yml
, сделано это для удобства дальнейшего деплоя. DBT предлагает стандартное расположение файла со всеми конфигурациями (/Users/<user>/.dbt/profiles.yml
на mac os). Чтобы узнать ваше дефолтное расположение, можно просто попробовать запустить модель, и в логах dbt напишет, где он по дефолту ищет файл с конфигами подключения:
dbt run --project-dir ./ -m my_first_dbt_model
Если же вы расположите profiles.yml
также как я, вызов модели будет выглядеть следующим образом:
dbt run --project-dir ./ --profiles-dir ./ --profile dbt_postgres_intro -m my_first_dbt_model
Здесь мы указываем расположением dbt проекта --project-dir ./
; путь к папке с файлом profiles.yml — --profiles-dir ./
; название профиля --profile dbt_postgres_intro
, поскольку у вас может быть несколько профилей в одном файле profiles.yml для разных проектов или разных окружений (например DEV, PROD)
При запуске модели для базы данных Postgres dbt дополнит его create table ... as ...
и мы получим следующий sql код для создания таблицы:
create table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" as ( with source_data as ( select 1 as id union all select null as id ) select * from source_data );
Остановимся здесь чуть подробнее. DBT создал нам табличку, но в названии почему-то присутствует постфикс __dbt_tmp
. Это связано с тем, что dbt создает таблицу в несколько этапов:
-- создание новой таблицы create table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" as ( with source_data as ( select 1 as id union all select null as id ) select * from source_data ); -- если целевая таблица уже есть, переименуем ее в backup alter table "dbt_intro_db"."public"."my_first_dbt_model" rename to "my_first_dbt_model__dbt_backup"; -- теперь переименуем новую таблицу в целевую alter table "dbt_intro_db"."public"."my_first_dbt_model__dbt_tmp" rename to "my_first_dbt_model" -- после того, как все предыдущие этапы прошли успешно, можем удалять backup drop table if exists "dbt_intro_db"."public"."my_first_dbt_model__dbt_backup" cascade
dbt отслеживает успешность обновления таблицы, а если что-то пошло не так, возвращает все к “статусу кво”.
Проследить за тем, что именно делает dbt, при вызове модели, можно добавлением флага -d
:
dbt -d run --profiles-dir ./ --profile dbt_postgres_intro -m my_first_dbt_model
Как работает dbt
Чуть подробнее остановимся на том, что происходит, когда вы запускаете модель. При выполнении dbt run
dbt выполняет следующие действия:
-
Парсит модели, макросы, тесты итд. На этом этапе не выполняются никакие sql запросы;
-
Компилирует и запускает файлы уже не содержащие Jinja код.
Это важно понимать, чтобы избегать ошибок. Полезно прочитать статью в документации.
Понимание этого факта может помочь в дебаге запуска моделей. После успешной компиляции, скомпилированный файл можно найти в папке target/compiled
(генерируется автоматически), а если была успешно пройдена стадия run в папке target/run
можно найти sql код который будет выполнен.
Магия jinja
Наконец-то мы можем перейти к самому “вкусному” в dbt, тому, что помогает избавиться от написания boilerplate кода и сильно упростить жизнь data engineer =).
Для начала создадим новую dbt модель, чтобы немного наполнить нашу базу данными:
{{ config( materialized='table', ) }} select 1 as id, 'Nikita' as name, 'Analytics' as type union select 2 as id, 'Stanislav' as name, 'Analytics' as type union select 3 as id, 'Alex' as name, 'CTO' as type union select 4 as id, 'Artem' as name, 'DevOps' as type union select 5 as id, 'Artem' as name, 'DataScience' as type union select 6 as id, 'Victor' as name, 'Backend' as type union select 7 as id, 'Mark' as name, 'DataEngineer' as type
Переменные
В dbt существует два способа работать с переменными.
Во-первых, вы можете их указать в файле dbt_projects.yml:
vars: developer_name: "Nikita"
Дальше использовать в модели:
{{ config( materialized='view', ) }} select id, type from {{ ref('developers') }} d where d.name = '{{ var('developer_name') }}'
Здесь мы видим сразу несколько новых моментов. В качестве материализации мы выбрали тип 'view'
, это приводит к созданию не таблицы, а view. Дальше мы берем в качестве источника данных {{ ref('developers') }}
, то есть мы хотим, чтобы dbt нашел модель developers и сам подставил путь к ней (возможно, что модель лежит не в дефолтной схеме или для нее задан alias
, это все можно настроить в macro config). И последнее, в условии where
с помощью макроса var
обращаемся к глобальным переменным dbt и вытягиваем значение переменной developer_name
.
Во втором случае мы можем использовать локальные переменные:
{{ config( materialized='view', ) }} {% set type = 'DevOps' %} select id, name from {{ ref('developers') }} d where type = '{{ type }}'
Создаем переменную с помощью ключевого слова set
и экранизируем это все с помощью {% … %}
.
Сразу зафиксируем, что в dbt по документации существует три типа “экранизации”:
-
{{ ... }}
— для вывода переменных или результатов выполнения макросов в скомпилированный файл; -
{% ... %}
— для объявления переменных, циклов, условных операторов и т.д.; -
{# ... #}
— комментарии.
Я встречал использование {%- ... -%}
, кажется это тоже самое, что и обычные скобки с процентами.
Циклы
Я думаю уже примерно понятна логика и структура Jinja инъекций в dbt. Ниже приведен пример модели, в которой используются массив и цикл:
{{ config( materialized='table', ) }} {%- set types = ['Analytics', 'DataScience'] -%} select id, name from {{ ref('developers') }} where type in ( {%- for type in types -%} '{{ type }}' {%- if not loop.last %},{% endif -%} {%- endfor -%} )
Использование вспомогательных запросов
Зачастую хочется выполнить какой-то вспомогательный запрос, прежде чем запускать саму модель. Например, в контексте наших данных, мы хотим сначала получить разработчиков, наименования которых начинаются с буквы ‘а’, сохранить их в переменную, а потом использовать в целевом запросе. Понятно, что это все можно прописать в самом запросе, но существуют задачи, когда такое решение получается либо не оптимальным, либо громоздким, а иногда и вовсе невозможным. Рассмотрим использование вспомогательных запросов на примере:
{{ config( materialized='table' ) }} {% set names_start_with_a_query %} select name from {{ ref('developers') }} where lower(name) like 'a%' {% endset %} {% set names_start_with_a = [] %} {% if execute %} {% set names_start_with_a = run_query(names_start_with_a_query).columns[0].values() %} {% endif %} {{ log(names_start_with_a, info=True) }} select id, name, type from {{ ref('developers') }} {% if names_start_with_a != () %} where name in ( {%- for name in names_start_with_a %} '{{ name }}' {%- if not loop.last %},{% endif -%} {%- endfor -%} ) {% endif %}
После блока с конфигурацией модели, мы определяем переменные, names_start_with_a_query
, names_start_with_a
в которые записываем вспомогательный запрос и пустой массив.
Следом идет условный оператор, где мы выполняем запрос находящийся в переменной names_start_with_a_query
и записываем результат в переменную names_start_with_a
. Однако необходимо чуть подробнее остановиться на том, зачем нам нужна обертка выполнения запроса в условный оператор. Все дело в уже упомянутом жизненном цикле выполнения модели. execute
— специальная переменная, которая имеет значение True, если выполнение модели(макроса и тд) в “execute” моде, это значит, что в данный момент уже прошла стадия парсинга и можно выполнять sql запросы.
Пользовательские macro
Необходимость написания собственных macro объясняется несколькими причинами: во-первых, это уменьшение дублирования кода, во-вторых декомпозиция и на самом деле теми же аргументами, зачем нужны функции во всех языках программирования.
Создадим в папке macros файл so_important_macro.sql
:
{% macro so_important_macro(number) %} {% set so_important_query %} select 1 as info union select 2 as info {% endset %} {%- set info = run_query(so_important_query).columns[0].values() -%} {{ log('number ' + number|string, info=True) }} {{ return(info) }} {% endmacro %}
И дальше можем использовать его в нашей модели:
{{ config( materialized='table' ) }} {% if execute %} {% set info = so_important_macro(4) %} {{ log(' info: ' + info|string, info=True) }} {% endif %} select 1 as id
В результате в логах мы получим следующее:
Running with dbt=1.1.0 Found 8 models, 4 tests, 0 snapshots, 0 analyses, 168 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics Concurrency: 3 threads (target='dev') 1 of 1 START table model public.test_macro ..................................... [RUN] number 4 info: (Decimal('1'), Decimal('2')) 1 of 1 OK created table model public.test_macro ................................ [SELECT 1 in 0.15s] Finished running 1 table model in 0.24s. Completed successfully Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
Инкрементальная материализация
Наконец мы перешли к самому интересному=)
Такой тип материализации позволяет инкрементально наполнять таблицу. Рассмотрим сначала данный тип на синтетическом примере. Предположим, что мы хотим на каждый запуск модели добавлять в нее максимальное значение в таблице +1, если в таблице нет данных, тогда вставляем 1.
{{ config( materialized='incremental' ) }} {% set data_to_insert = 1 %} {% if is_incremental() %} {% set max_number_query %} select max(num) from {{ this }} {% endset %} {% set data_to_insert = run_query(max_number_query).columns[0].values()[0]|int + 1 %} {% endif %} {{ log('number to insert: ' + data_to_insert|string, info=True)}} select {{ data_to_insert }} as num
Макро is_incremental
доступен для моделей с типом incremental
и он возвращает True, если таблица уже существует. Это необходимо в случае наличия рекурсии в запросе, например при дедубликации данных.
Рассмотрим, что произойдет, если мы запустим модель в первый раз. Макро is_incremental()
вернет False и в итоге будет создана таблица с одной строчкой со значением 1
.
Если после этого мы попробуем запустить модель еще раз, тогда is_incremental()
вернет True. Внутри условного оператора мы определяем sql запрос, который возвращает максимальное значение из текущей таблицы(this
— специальная переменная dbt, которая возвращает Relation
на текущую таблицу). Таким образом при втором запуске в таблицу будет вставлено значение 2
, в третий раз 3
и так далее.
Теперь рассмотрим реальный пример использования инкрементальной материализации с дедубликацией. Предположим, что у вас есть таблица-источник raw_source
, в которую периодически вставляются данные, но там могут встречаться дубликаты строчек. Для удобства, предположим, что существует поле id
, которое уникально для набора остальных атрибутов, т.е по этому полю можно дедублицировать. Мы же хотим создать таблицу, в которой будут храниться только уникальные значения.
Для начала создадим в папке models
файл source.yml
в котором мы опишем источники данных (таблицы, которые наполняются из внешних источников и не являются моделями dbt):
version: 2 sources: - name: raw schema: public tables: - name: raw_source
И опишем модель stage_source.sql
:
{{ config( materialized='incremental' ) }} select distinct on (src.id) src.* from {{ source('raw', 'raw_source') }} src {% if is_incremental() %} left join {{ this }} dst on src.id = dst.id where dst.id is null {% endif %}
При первичным запуске итоговый select
запрос будет выглядеть следующим образом:
select distinct on (src.id) src.* from "dbt_intro_db"."public"."raw_source" src
Видно, что мы выбираем все данные из raw_source
и дедублицируем их по src.id
Если же мы попробуем запустить второй раз:
select distinct on (src.id) src.* from "dbt_intro_db"."public"."raw_source" src left join "dbt_intro_db"."public"."stage_source" dst on src.id = dst.id where dst.id is null
Теперь же мы сначала пытаемся найти данные, которых еще нет в stage_source
и после этого дедублицируем их по ключу src.id
Документация
Очень приятным дополнением в dbt является автоматическая генерация документации. Если вы активно используете ref
, source
dbt может автоматически построить DAG связей. Сгенерировать документацию и запустить сервер с ui можно следующим образом:
dbt docs generate --profiles-dir ./ --profile dbt_postgres_intro dbt docs serve --profiles-dir ./ --profile dbt_postgres_intro
Так же можно писать документацию моделей в файле schema.yml
лежащим на уровне моделей, тогда все это тоже будет красиво оформлено в ui:
Заключение
Надеюсь мне удалось вас заинтересовать замечательной технологией dbt, если вы о ней еще не слышали или рассказать что-то новое для тех, кто уже присматривался к ней.
В dbt есть возможность писать свои плагины, это значительно расширяет потенциал. Пишите в комментариях ваши замечания и предложения.
В мыслях есть планы рассказать, как с помощью dbt можно строить datavault на базе greenplum и не испытывать боль =) На хабре уже есть статья на эту тему, но я бы хотел ее расширить уделить внимание деталям, тому как оркестрировать это все с помощью Dagster и ошибкам, которые мы совершили:
Исходники: ссылка.
P.S.
В качестве бонуса, мы в DV Group немного доработали адаптер dbt-postgres
для greenplum, чтобы можно было выбирать поле дистрибьюции, сжатие и патриционирование: ссылка на GitHub.
ссылка на оригинал статьи https://habr.com/ru/post/670062/
Добавить комментарий