Инфраструктура для Data-Engineer DBT

от автора

Введение

dbt является мощным фреймворком, который включает в себя два популярных языка: SQL + Python.

При помощи dbt можно создавать разные «слои» данных или выделить dbt только под один слой, к примеру dm.

При помощи понятного и всем известного SQL интерфейса можно создавать консистентные модели, которыми смогут пользоваться и понимать все участники data-команды. Ну также по моему мнению плюс dbt – возможность дать аналитикам самостоятельно создавать dm слой со своей логикой, что освобождает ресурс data-команды. Но стоит иметь ввиду, что нужен хороший процесс доставки кода: Code Review, паттерны, линтеры, принципы и прочее.

Стоит также упомянуть, что сервис может работать как отдельный инстанс через докер, так и питон пакетом. Все дальнейшие примеры я буду проводить через питон пакет.

Инициализация проекта

Код всего проекта вы можете найти в моём репозитории.

В начале поднимем нужные нам сервисы используя команду: docker-compose up -d

После этого давайте настроим локальное Python окружение для дальнейшей работы:

python3.12 -m venv venv && \ source venv/bin/activate && \ pip install --upgrade pip && \ pip install poetry && \ poetry lock && \ poetry install

После того, когда мы установили все зависимости, мы можем проверить версию dbt командой: dbt --version.

Следующим этапом при работе с dbt является – инициализация проекта, поэтому выполним команду: dbt init pg_analytics --profiles-dir . и укажем все необходимые параметры, которые он будет спрашивать во время создания проекта. Наш проект я назвал: pg_analytics.

Важно: При инициализации проекта вы можете не указывать параметр --profiles-dir.

Если не указать --profiles-dir, то при инициализации проекта у нас создатся глобальный profiles.yml, который будет хранится в ~/.dbt/.

Но для демонстрации мы не будем его использовать, а воспользуемся локальным profiles.yml внутри самого проекта. Поэтому мы выполним команду dbt init pg_analytics --profiles-dir . чтобы создать profiles.yml внутри нашего проекта.

Важно: рекомендуется создавать profiles.yml вне проекта, чтобы все параметры для подключения держать вне проекта и также profiles.yml будет содержать информацию по всем проектам, если у вас их больше одного.

После инициализации мы получим следующий profiles.yml:

pg_analytics:     outputs:       dev:         dbname: dbt         host: localhost         pass: postgres         port: 5432         schema: dbt         threads: 4         type: postgres         user: postgres     target: dev

Проверить коннект можно командой dbt debug (предварительно зайдя в проект pg_analytics).

Также стоит обратить, что когда мы создали проект, то он содержит стандартный набор директорий:

  • analyses

  • logs

  • macros

  • models

  • seeds

  • snapshots

  • target

  • tests Более подробно о логике и работе директорий вы можете узнать из документации. Ниже мы рассмотрим часть из них на примерах

И файлов:

Стандартные примеры dbt

Все настройки завершены и поэтому мы можем запустить наш проект. В начале у нас в папке models хранятся модели, которые будут созданы изначально. На них проверим корректность работы dbt.

Для этого выполним команду: dbt run и увидим логи нашей команды:

08:01:36  Running with dbt=1.8.8 08:01:36  Registered adapter: postgres=1.8.2 08:01:37  Unable to do partial parsing because saved manifest not found. Starting full parse. 08:01:37  Found 2 models, 4 data tests, 423 macros 08:01:37   08:01:37  Concurrency: 4 threads (target='dev') 08:01:37   08:01:37  1 of 2 START sql table model dbt.my_first_dbt_model ............................ [RUN] 08:01:37  1 of 2 OK created sql table model dbt.my_first_dbt_model ....................... [SELECT 2 in 0.07s] 08:01:37  2 of 2 START sql view model dbt.my_second_dbt_model ............................ [RUN] 08:01:37  2 of 2 OK created sql view model dbt.my_second_dbt_model ....................... [CREATE VIEW in 0.06s] 08:01:37   08:01:37  Finished running 1 table model, 1 view model in 0 hours 0 minutes and 0.27 seconds (0.27s). 08:01:37   08:01:37  Completed successfully 08:01:37   08:01:37  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

И если сейчас подключиться к нашему PostgreSQL, то мы сможем увидеть, что у нас создалась схема dbt и в ней создалась таблица my_first_dbt_model и представление my_second_dbt_model.

Также мы можем «скомпилировать» наши модели и сразу провести тестирование.

А сейчас давайте выполним команду dbt build и мы увидим, что у нас часть процессов не прошла, потому что один из тестов провален.

Мы видим, что у нас провален этап not_null_my_first_dbt_model_id чтобы это исправить, уберём комментарий в модели where id is not null и если заново выполнить build проекта, то у нас пройдут все тесты и все этапы.

Практика

Всё, мы убедились, что наш dbt работает, поэтому давайте создадим нашу первую модель.

Но в начале нам необходимо запустить файл create_and_fill_users.py из папки py_files.

После создания и заполнения таблицы мы можем над ней работать при помощи dbt-моделей.

Для этого нам нужно создать новую папку dm, которая будет содержать dm-таблицы, затем создадим fct_registred_by_date.sql, в котором напишем простой запрос:

SELECT       created_at::date AS registred_date,       count(*)   FROM       users AS u   GROUP BY       1

Важно: Как вы назовёте файл, так и будет называться ваша таблица, поэтому выбирайте правильно.

Стоит отметить, что основной dbt-проекта является dbt_project.yml и в нём указывается вся информация по проекту: настройки компиляции, материализация таблиц, какие папки включать и пр.

И так как мы добавили новую директорию dm, то её необходимо добавить в проект, чтобы модели внутри директории запускались.

Для этого необходимо в dbt_project.yml добавить нашу директорию и указать то, как мы хотим видеть наши модели в БД. Я сделаю таблицы с уровнем материализации table:

... dm:     +materialized: table

Более подробно о материализации.

Дополнительно мы можем прописать мета-данные для нашей модели, чтобы в дальнейшем было удобнее ориентироваться в ней.

Для этого создадим файл schema.yaml:

models:     - name: fct_registred_by_date       description: "Количество регистраций за день"       columns:         - name: registred_date           description: "Дата регистрации"         - name: count           description: "Количество регистраций"

Важно: Если вы некорректно укажите имя столбца, к примеру вместо name укажите namefoo, то у вас не отобразится никаких исключений.

Теперь если выполнить команду dbt run, то получим следующее сообщение:

... OK created sql table model dbt.fct_registred_by_date ...[SELECT 365 in 0.37s] ...

И теперь наша таблица fct_registred_by_date готова к использованию.

Стоит обратить внимание, что таблица заполнила значения за текущий момент и если в таблице users появятся новые записи, то мы в нашей витрине этого не увидим. Поэтому это всё нужно оркестрировать.

Оркестрация доступна в dbt Cloud или сторонними решениями. Как одно из популярных решений – это объединение dbt + Airflow. Об одной из таких реализаций вы можете ознакомиться из доклада: dbt — ядро современной платформы данных.

Теперь давайте посмотрим как будет отрабатывать модель, если в таблице users появятся новые записи.

Для этого выполните файл add_new_month.py и у нас появятся записи за январь 2025-го.

И если после этого снова выполнить команду dbt run, то получим следующее сообщение:

... OK created sql table model dbt.fct_registred_by_date ... [SELECT 396 in 0.08s] ...

Важно: он перезаписывает всю таблицу, в дальнейшем мы сделаем заполнение недостающих данных.

Для того, чтобы исключить полное перезаписывание витрины в dbt существует формат материализации incremental. Давайте им воспользуемся и создадим для этого новый файл fct_registred_by_date_incremental.sql:

{{       config(           materialized='incremental',           unique_key='registred_date'       )   }}      SELECT       created_at::date AS registred_date,       count(*)   FROM       users AS u      {% if is_incremental() %}        WHERE created_at >= (SELECT coalesce(max(registred_date), '1900-01-01') FROM {{ this }})      {% endif %}      GROUP BY       1

На что здесь стоит обратить внимание:

  1. Ранее в dbt_project.yml мы указали, что все модели в директори dm будут создаваться как table. Но в данном случае мы переопределяем основной config на incremental.

  2. Для incremental является обязательным указанием unique_key, по которому он как раз и будет выполнять вставки.

Теперь если мы повторим действия над таблицей users как ранее, то получим корректный результат. Наша таблица после каждого выполнения команды dbt run будет инкрементально заполняться.

Больше подробностей и тонкостей инкрементальной материализации описано в документации.

Source

Для корректного обращения к источникам давайте их пропишем в нашей папке models, чтобы мы могли на них ссылаться через Jinja-шаблоны, а не прописывая их самостоятельно.

Для этого необходимо создать файл source.yml и в нём прописать конфигурацию для источников:

version: 2      sources:     - name: public       database: dbt       schema: public       tables:         - name: users

И теперь давайте попробуем воспользоваться данным источником, чтобы скопировать таблицу users к себе в схему dbt.

Для этого создадим файл dbt_source_users.sql в новой директории as_is внутри models и добавим такой код:

SELECT * FROM {{ source("public", "users") }}

Данный код на первый взгляд может показаться не понятным, но сейчас мы его скомпилируем и получим привычный нам SQL.

Давайте выполним команду: dbt compile и если у нас всё будет хорошо, то в папке target мы сможем найти наш скомпилированный файл, который выглядит следующим образом:

SELECT * FROM "dbt"."public"."users"

Если мы возьмём этот запрос и выполним его в нашей БД, то получим корректный результат. Теперь мы можем запустить эту модель в БД, чтобы она выполнила заложенную логику.

Важно: компилятор вызовет исключение и укажет на ошибку, если ваш запрос составлен некорректно.

Выполним команду: dbt run и увидим, что у нас создалось представление, а это потому, что мы не указали как материализовывать наши модели из папки as_is, давайте укажем в dbt_project.yml, по какой логике должна работать наша модель:

as_is:     +materialized: table

И если ещё раз выполнить команду dbt run, то мы увидим, что у нас создалась таблица, а не представление (в логах будет указано SELECT *** а не CREATE VIEW).

В дальнейшем все наши источники, которые будут использованы во время проекта будем указывать внутри source.yml

Snapshots

Для аналитики довольным важным моментом является – «путешествие во времени«. Что это означает? Это означает, что аналитик желает посмотреть какое у нас было состояние по пользователям/продажам/прочее на тот или иной момент.

Поэтому наша задача как дата-инженеров организовать такую возможность и её можно реализовать при помощи SCD2. Более подробно о SCD можете почитать по ссылке.

При инициализации проекта у нас создалась папка snapshots и мы в ней будем создавать нашу SCD2 таблицу.

В начале нам нужно создать файл users_scd2.sql:

{% snapshot users_timestamp %}          {{           config(               target_database='dbt',               target_schema='snapshots',               strategy='timestamp',               unique_key='id',               updated_at='updated_at',           )       }}          SELECT * FROM {{ source('public', 'users') }}      {% endsnapshot %} 

Здесь мы также используем Jinja-шаблоны:

  • Указываем, что это таблица типа snapshot.

  • Указываем все необходимые атрибуты для работы с таблицей.

  • Указываем запрос на извлечение данных из таблицы. Здесь также используем наш source, который мы определили ранее.

После создания данного файла мы можем выполнить команду: dbt snapshot и увидим в логе SELECT *** – это означает, что таблица создана и перенесла все записи из таблицы users.

Давайте теперь изменим какую-нибудь запись, чтобы увидеть работу SCD2.

Так как в поле у нас хранится uuid и он каждый раз различный, вы можете выбрать любого «пользователя» для изменения.

   UPDATE users SET (first_name, updated_at) = ('dummy', now())    WHERE id = '86433147-fafe-4d74-b67c-59c3a283974c'

Мы изменили имя пользователя и чтобы создать новую запись в SCD-таблице необходимо снова выполнить команду: dbt snapshot. После выполнения мы увидим, что у нас добавилась новая строка в таблицу:

id

created_at

updated_at

first_name

dbt_scd_id

dbt_updated_at

dbt_valid_from

dbt_valid_to

86433147-fafe-4d74-b67c-59c3a283974c

2024-08-16 17:31:30.670 +0000

2024-02-29 14:53:29.191 +0000

Алина

0cea94932fa3921c59e771f5c0b53bb4

2024-02-29 14:53:29.191 +0000

2024-02-29 14:53:29.191 +0000

2024-10-26 03:07:44.519 +0000

86433147-fafe-4d74-b67c-59c3a283974c

2024-08-16 17:31:30.670 +0000

2024-10-26 03:07:44.519 +0000

dummy

cf61eb52bac629a0af70cd2c46e63f8a

2024-10-26 03:07:44.519 +0000

2024-10-26 03:07:44.519 +0000

Теперь наша таблица users будет версионироваться после каждого запуска команды dbt snapshot.

Более подробно о возможностях snapshots описано в официальной документации.

Функция ref

Ref помогает нам выстроить data lineage, для того чтобы он корректно отображался на графе и также мы могли правильно запускать/перезапускать наши модели.

Важным моментом здесь является то, что мы должны «затянуть» к себе в проект какую-то модель, чтобы ей в дальней пользоваться.

Основное отличие от source здесь в том, что source – это просто ссылка на объект в БД, а ref – это ссылка на объект внутри dbt-проекта.

Давайте рассмотрим это на примере и для этого создадим файл count_users_by_city.sql в папке dm со следующим кодом:

SELECT       city,       count(*)   FROM       {{ ref('dbt_source_users') }}   GROUP BY       1 

В данном примере мы используем ранее созданную таблицу dbt_source_users с пользователями. Если мы захотим обратиться изначально к нашей таблице указав {{ ref('users') }}, то получим исключение:

Compilation Error  Model 'model.pg_analytics.count_users_by_city' (models/dm/count_users_by_city.sql) depends on a node named 'users' which was not found

Важно: Во время работы с ref мы должны использовать только dbt-модели.

Более подробно о ref функции описано в документации.

WEB UI

Мы сейчас с вами построили несколько моделей и теперь мы можем посмотреть взаимосвязи между ними.

Для этого нам сначала нужно выполнить команду: dbt docs generate, которая выстроит зависимости между моделями и создаст документацию по нашему проекту.

Для того, чтобы запустить наш веб-сервер необходимо выполнить команду: dbt docs serve. После этого страница откроется автоматически, а если не открылась, то будет доступна по адресу: http://localhost:8080/#!/overview

После запуска нашего сервера мы можем изучить список взаимосвязей в проекте. Что мы имеем сейчас:

Здесь стоит обратить внимание на висящую ноду fct_registred_by_date, потому что она у нас была создана таким запросом:

SELECT       created_at::date AS registred_date,       count(*)   FROM       users AS u   GROUP BY       1

И как мы видим в данном запросе у нас не используется ни source ни ref. И поэтому в случае дебага или проверки зависимостей нужно будет самостоятельно изучать dbt-модели.

При работе с графом зависимостей часто говорят о запуске модели и её зависимостей. В dbt это реализовано довольно гибко. О чём описано в документации.

Но из основного я бы отметил, что мы можем регулировать что запускать при запуске модели. К примеру:

  • Запустить саму модель: dbt run -m name_of_model

  • Запустить саму модель и зависимости ДО: dbt run -m +name_of_model

  • Запустить саму модель и зависимости ПОСЛЕ: dbt run -m name_of_model+

  • Запустить саму модель и зависимости ДО и ПОСЛЕ: dbt run -m +name_of_model+

Теги

Помимо деления по папкам мы также можем добавить деление по tags. Для этого необходимо внутри самого главного конфиг файла dbt_project.yml задать tags:

models:     pg_analytics:       example:         +materialized: view       dm:         +materialized: table         tags: "marts"       as_is:         +materialized: table

И теперь благодаря такой настройке мы можем запускать модели по tags такой командой: dbt run -m tag:marts

Более подробно о работе tags описано в документации.

Тесты

Тесты – это неотъемлемая часть в дата-инженерии и аналитике. Наша задача – сделать так, чтобы данным могли доверять.

И один из способов обеспечить доверие данным – это покрыть их тестами и проверками.

Давайте начнём со стандартных тестов, который предоставляет dbt «из коробки«:

  • unique: проверяет, что значения должны быть уникальным, без дублей.

  • not_null: проверяет, что в колонке нет NULL значений.

  • accepted_values: проверяет, что колонка содержит нужные значения. К примеру: для колонки orders должны быть значения: 'placed''shipped''completed''returned'

  • relationships: Проверяет, что внешний ключ целостный и консистентный и у нас в витрине или другой модели не появилось «новых» ключей.

Все тесты указываются для колонок, к примеру так можно указать тесты для нашей витрины fct_registred_by_date:

version: 2      models:     - name: fct_registred_by_date       description: "Количество регистраций за день"       columns:         - name: registred_date           description: "Дата регистрации"           data_tests:             - unique             - not_null         - name: count           description: "Количество регистраций"

Все тесты указываются в schema.yml для нужной нам модели и нужной колонки

Но помимо стандартных тестов мы можем самостоятельно писать тесты для наших моделей. Основная логика dbt тестов – запрос ничего не должен возвращать. Если тест вернул хотя бы одну строку, то тест считается проваленным.

Давайте сделаем свой not_null для колонки count таблицы fct_registred_by_date.

Для этого создадим файл not_null_count_fct_registred_by_date.sql в папке tests со следующим кодом:

SELECT       *   FROM       {{ ref('fct_registred_by_date') }}   WHERE       "count" IS NULL

И если мы снова запустим наши тесты командой dbt test, то сможем увидеть, что наш тест запустился и прошёл успешно.

... test not_null_count_fct_registred_by_date ... [RUN] ... not_null_count_fct_registred_by_date ... [PASS in 0.06s] ...

Макросы

Макросы – это создание некого шаблона, который мы сможем использовать в нашем коде при помощи Jinja.

Давайте создадим наш первый макрос. Для этого создадим файл cast_to_date.sql в папке macros:

{% macro cast_to_date(column_name) %}       {{ column_name }}::date   {% endmacro %}

И теперь давайте создадим ещё раз нашу таблицу fct_registred_by_date, но уже с использованием макроса:

SELECT       {{ cast_to_date('created_at') }} AS registred_date,       count(*)   FROM       users AS u   GROUP BY       1

И если сейчас мы выполним команду: dbt compile, то получим такой скомпилированный файл (форматирование оставил, как сделал dbt):

SELECT       created_at::date    AS registred_date,       count(*)   FROM       users AS u   GROUP BY       1

Важно: Не стоит делать сильно сложные макросы, потому что их довольно сложно дебажить и также стоит учитывать, что макрос будет использоваться в разных частях проектах и при дальнейших его изменениях можно сломать часть бизнес-логики.

Более подробно о работе и возможностях макросов описано в документации.

Пакеты

Для dbt существует пакетный менеджер, как и для Python. Со всеми пакетами вы можете ознакомиться на официальном dbt-hub.

Но мы сейчас рассмотрим самый популярный пакет для dbt – dbt_utils.

Данный пакет предоставляет: новый тесты для наших моделей, макросы для создания моделей и разные полезные инструменты для более комфортной работы с dbt.

Для того, чтобы установить dbt_utils необходимо в корне нашего проекта создать файл packages.yml и указать следующие параметры:

packages:   - package: dbt-labs/dbt_utils     version: 1.3.0

И затем необходимо выполнить команду: dbt deps

Теперь мы имеем возможность использовать функции dbt_utils, давайте рассмотрим одну из – dbt_utils.star.

Для этого примера мы продублируем таблицу fct_registred_by_date. Давайте создадим файл dbt_utils_source_fct_registred_by_date.sql и напишем такой код:

SELECT       {{ dbt_utils.star(ref("fct_registred_by_date")) }}   FROM       {{ ref("fct_registred_by_date") }}

И если сейчас выполнить команду: dbt compile, то получим такой код:

SELECT   "registred_date",   "count"   FROM       "dbt"."dbt"."fct_registred_by_date"

Он сделал нам перечисление всех колонок из таблицы, а не использовал стандартную команду: *.

Если мы хотим получать не все колонки, то можно их исключить следующим образом:

SELECT       {{ dbt_utils.star(ref("fct_registred_by_date"), except=["count"]) }}   FROM       {{ ref("fct_registred_by_date") }}

И после компиляции получим такой код:

SELECT       "registred_date"   FROM       "dbt"."dbt"."fct_registred_by_date"

Понятное дело, что я привожу простой пример, на практике таблицы могут иметь более одной колонки. В данном случае я хотел продемонстрировать возможности dbt_utils, которых великое множество.

Прочие коннекты

На текущий момент dbt имеет множество коннектов к различным системам: OLAP БД, OLTP БД, Data Lake.

Более подробно о подключении дата платформ в dbt описано в документации.

Давайте попробуем создать подключение к DuckDB.

Если вы ничего не знаете о DuckDB, то рекомендую ознакомиться с моей статьей – Всё что нужно знать про DuckDB.

Так как dbt не позволяет держать несколько разных типов коннекта нам необходимо будет создать новый dbt-проект и для этого выполним команду: dbt init duckdb_analytics --profiles-dir .

После инициализации проекта необходимо внутри него создать profiles.yml:

duckdb_analytics:     outputs:       dev:         type: duckdb         path: dev.duckdb         threads: 4         extensions:           - httpfs           - parquet           - postgres     target: dev

Я создал также папку duckdb_example в models и поэтому нам необходимо указать как материализовывать модели из неё.

... models:     duckdb_analytics:       duckdb_example:         +materialized: view

После данной подготовки мы готовы создать нашу первую модель с использованием duckdb, для этого создадим файл users_from_pg_extension.sql:

SELECT       *   FROM       postgres_scan(           'host=localhost port=5432 dbname=dbt user=postgres password=postgres',           'public',           'users'       )

Важно: данный пример является учебным. Поэтому не стоит в открытую использовать параметры для подключения. Их лучше прятать в окружение или при cicd процессе проставлять. Подробнее о том, как скрыть параметры для подключения описано в документации.

Также стоит отметить, что я в примере использую старый способ подключения к PostgreSQL, так как новый через ATTACH в dbt не работает. Подробнее о работе с PostgreSQL через DuckDB описано в документации. И ещё можно под себя настроить коннект, с указанием конкретных плагинов и их настроек, подробнее описано в Configuring dbt-duckdb Plugins.

Так как мы создали нашу первую модель мы можем делать ref на неё, поэтому давайте повторим задачу, которую мы делали в PostgreSQL – посчитаем количество регистраций за каждый день.

Для этого cоздадим fct_registred_by_date.sql:

SELECT       created_at::date AS registred_date,       count(id) AS count   FROM        {{ ref("users_from_pg_extension") }}   GROUP BY       1

И если выполнить команду dbt run, то у нас создастся ещё одно представление в DuckDB. И эта информация также отобразится в графе:

Давайте теперь поработаем с S3 через DuckDB.

Если вы не знаете что такое S3 и как его можно использовать, то можете ознакомиться с моей статьей – Инфраструктура для data engineer S3.

Добавим новый сервис в наш изначальный docker-compose.yaml:

minio:       image: minio/minio:RELEASE.2024-10-13T13-34-11Z       restart: always       command: server /data --console-address ":9001"       volumes:         - ./data:/data       environment:         - MINIO_ROOT_USER=minioadmin         - MINIO_ROOT_PASSWORD=minioadmin         - MINIO_DOMAIN=minio       ports:         - "9000:9000"  # MinIO S3 API         - "9001:9001"  # MinIO Console

Важно: создание бакета, генерацию ключей я пропущу, так как она описана в статье выше.

В самом начале нашей работы S3 + DuckDB необходимо создать параметры для подключения. Поэтому мы немного изменим profiles.yml:

duckdb_analytics:     outputs:       dev:         type: duckdb         path: dev.duckdb         threads: 4         extensions:           - httpfs           - parquet           - postgres         settings:           s3_access_key_id: "GYZDiyIV4wnNoNTzp2Q5"           s3_secret_access_key: "zHOERNJFxcBYXXsa4XUXXSSy0apl8OED7BVp1A3Q"           s3_endpoint: "localhost:9000"           s3_use_ssl: "FALSE"           s3_url_style: "path"     target: dev

Теперь при работе DuckDB у нас будут использоваться настройки, которые мы прописали.

Давайте в начале запишем пользователей в S3, для этого необходимо выполнить скрипт create_and_fill_users_s3.py.

После этого у нас появится .parquet с пользователями в нашем бакете prod.

Теперь мы можем создать первую модель, которая будет читать файл из S3, создадим файл users_from_s3_extension.sql:

SELECT * FROM read_parquet('s3://prod/ods/users.parquet')

Если выполнить команду dbt run, то в нашей БД будет создано представление.

Теперь давайте выполним наш пример с подсчётом регистраций и для этого создадим модель fct_registred_by_date_from_s3.sql:

{{ config(materialized='table') }}      SELECT       created_at::date AS registred_date,       count(id) AS count   FROM        {{ ref("users_from_s3_extension") }}   GROUP BY       1

Важно: Я изменил основной конфиг материализации на таблицу, чтобы проверить корректность работы модели.

Стоит также отметить, что когда мы создаём таблицы/представления при помощи dbt, те настройки, которые мы указали в profiles.yml создаются на сессию.

Это означает то, что если мы подключимся к нашей физической dev.duckdb и попробуем выполнить запрос к S3 у нас не будет работать коннект, потому что мы создали новую сессию и в ней нужно заново инициализировать подключение к S3.

Повторюсь, если вы не знаете о DuckDB и том как создавать подключение к S3, то рекомендую ознакомиться с моей статьей – Всё что нужно знать про DuckDB.

И теперь после создания всех нужных нам моделей мы можем выполнить dbt run и подключиться к DuckDB чтобы увидеть результат.

И также все наши модели отображаются в графе зависимостей:

Также данный пакет позволяет писать в S3. Для того чтобы совершить запись в S3 необходимо обратиться к документации – Writing to external files. Запись во внешние таблицы имеет специфический синтаксис и о нём стоит знать.

Давайте запишем результат нашего привычного запроса в S3 и для этого создадим файл fct_registred_by_date_from_s3_to_s3.sql:

{{ config(materialized='external', location='s3://prod/dm/fct_registred_date.gzip.parquet') }}   SELECT       created_at::date AS registred_date,       count(id) AS count   FROM       {{ ref("users_from_s3_extension") }}   GROUP BY       1

И теперь помимо того, что мы сделали запись в S3 так это ещё отображается корректно в нашем графе:

Важно: я показал только пример использования dbt + DuckDB, но таких интеграций может быть множество: ClickHouse, MySQL и прочие коннекторы.

Заключение

Как я сказал в начале – dbt является мощным инструментом.

Я постарался показать основные моменты, но в dbt есть множество тонкостей и особенностей работы, с которыми вы сможете познакомиться на практике.

dbt также является довольно популярным инструментом, вы его можете найти у многих компаний. Поэтому знание dbt вам точно пригодится.

На что я бы обратил внимание:

  • Необходимо организовать хранение моделей так, чтобы было удобно всем участникам data-команды, но и было понятно как происходит деление моделей (по слоям, по логике и пр.).

  • Не стоит избегать тегов при работе с моделями.

  • Заполняйте мета-данные по моделям сразу.

  • Определите владельца мета-данных, чтобы не было свалки по моделям и колонкам.

  • Покрывайте модели тестами и не только стандартными, но из модуля dbt_utils.

  • Осторожнее с макросами.

  • Старайтесь использовать ref и source, чтобы выстраивать корректный data lineage и иметь возможность прогрузить всю цепочку.

  • У dbt хорошая документация, не стоит ей пренебрегать.

  • Если вы не можете найти ответ на свой вопрос, то вы всегда можете обратиться в официальный Slack dbt.


Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.


ссылка на оригинал статьи https://habr.com/ru/articles/854990/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *