Dagster и Great Expectations: Интеграция без боли

от автора

Меня зовут Артем Шнайдер, и я занимаюсь DataScience в Бланке. Сегодня я хочу рассказать вам о том, как можно интегрировать два мощных инструмента – Dagster и Great Expectations.

Great Expectations позволяет определить так называемые ожидания от ваших данных, то есть задать правила и условия, которым данные должны соответствовать. 

Dagster, с другой стороны, это платформа с открытым исходным кодом для управления данными, которая позволяет создавать, тестировать и развертывать пайплайны данных. Написан на python, что позволяет пользователям гибко настраивать и расширять его функциональность.

Исходный код к этой статье на GitHub.

Давайте начнем?)

Подготовка

Нам понадобятся какие-нибудь данные, которые мы загрузим в базу данных. Я решил взять набор данных транзакций с Kaggle.

Собрал для вас готовый репозиторий, который вы можете клонировать по этой ссылке

Перейдем в директорию /simple‑postgres‑container и запустим нашу базу данных:

docker compose -f "docker-compose.yaml" up -d --build

Теперь у нас есть локальный экземпляр базы данных Postgres с табличкой transactions

Идем дальше)

1. Great Expectations шаг за шагом

В этой части мы установим пакет great-expectations, подключимся к нашей базе данных  и создадим простой набор ожиданий.

Выполним установку great-expectationsи инициализируем проект:

pip install great-expectations great_expectations init

Команда создаст директорию со следующей структурой:

great_expectations/
├── checkpoints
├── expectations
├── great_expectations.yml
├── plugins
│ └── custom_data_docs
├── profilers
└── uncommitted
├── config_variables.yml
├── data_docs
└── validations

Дальнейшие команды будем выполнять из директории /great_expectations:

cd great_expectations

Great Expectations предлагает  использовать интерфейс командной строки для автоматического создания предварительно настроенных блокнотов Jupyter, которые проведут через конкретные этапы работы с Great Expectations.

1.1 Настроим источник данных

Выполним команду:

great_expectations datasource new

GE предложит варианты подключения к данным, выбираем Relational database (SQL):

P.S. Может потребоваться установка дополнительных пакетов — их GE установит сам в процессе.

Далее выбираем нашу базу данных:

GE сгенерирует Jupyter ноутбук, в котором мы должны заполнить учетные данные для подключения:

Больше ничего менять не потребуется, выполняем ноутбук до конца, закрываем и прерываем команду в терминале. Конфигурацию нашего источника данных можно увидеть в файле great_expectations.yaml.

great_expectations.yaml
# Welcome to Great Expectations! Always know what to expect from your data. # # Here you can define datasources, batch kwargs generators, integrations and # more. This file is intended to be committed to your repo. For help with # configuration please: #   - Read our docs: https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview/#2-configure-your-datasource #   - Join our slack channel: http://greatexpectations.io/slack  # config_version refers to the syntactic version of this config file, and is used in maintaining backwards compatibility # It is auto-generated and usually does not need to be changed. config_version: 3.0  # Datasources tell Great Expectations where your data lives and how to get it. # You can use the CLI command `great_expectations datasource new` to help you # add a new datasource. Read more at https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview datasources:   my_datasource:     class_name: Datasource     module_name: great_expectations.datasource     data_connectors:       default_runtime_data_connector_name:         batch_identifiers:           - default_identifier_name         class_name: RuntimeDataConnector         module_name: great_expectations.datasource.data_connector       default_inferred_data_connector_name:         include_schema_name: true         class_name: InferredAssetSqlDataConnector         module_name: great_expectations.datasource.data_connector         introspection_directives:           schema_name: public       default_configured_data_connector_name:         class_name: ConfiguredAssetSqlDataConnector         assets:           transactions:             class_name: Asset             schema_name: public             module_name: great_expectations.datasource.data_connector.asset         module_name: great_expectations.datasource.data_connector     execution_engine:       class_name: SqlAlchemyExecutionEngine       credentials:         host: localhost         port: '5432'         username: postgres         password: postgres         database: postgres         drivername: postgresql       module_name: great_expectations.execution_engine  # This config file supports variable substitution which enables: 1) keeping # secrets out of source control & 2) environment-based configuration changes # such as staging vs prod. # # When GX encounters substitution syntax (like `my_key: ${my_value}` or # `my_key: $my_value`) in the great_expectations.yml file, it will attempt # to replace the value of `my_key` with the value from an environment # variable `my_value` or a corresponding key read from this config file, # which is defined through the `config_variables_file_path`. # Environment variables take precedence over variables defined here. # # Substitution values defined here can be a simple (non-nested) value, # nested value such as a dictionary, or an environment variable (i.e. ${ENV_VAR}) # # # https://docs.greatexpectations.io/docs/guides/setup/configuring_data_contexts/how_to_configure_credentials   config_variables_file_path: uncommitted/config_variables.yml  # The plugins_directory will be added to your python path for custom modules # used to override and extend Great Expectations. plugins_directory: plugins/  stores: # Stores are configurable places to store things like Expectations, Validations # Data Docs, and more. These are for advanced users only - most users can simply # leave this section alone. # # Three stores are required: expectations, validations, and # evaluation_parameters, and must exist with a valid store entry. Additional # stores can be configured for uses such as data_docs, etc.   expectations_store:     class_name: ExpectationsStore     store_backend:       class_name: TupleFilesystemStoreBackend       base_directory: expectations/    validations_store:     class_name: ValidationsStore     store_backend:       class_name: TupleFilesystemStoreBackend       base_directory: uncommitted/validations/    evaluation_parameter_store:     class_name: EvaluationParameterStore   checkpoint_store:     class_name: CheckpointStore     store_backend:       class_name: TupleFilesystemStoreBackend       suppress_store_backend_id: true       base_directory: checkpoints/    profiler_store:     class_name: ProfilerStore     store_backend:       class_name: TupleFilesystemStoreBackend       suppress_store_backend_id: true       base_directory: profilers/  expectations_store_name: expectations_store validations_store_name: validations_store evaluation_parameter_store_name: evaluation_parameter_store checkpoint_store_name: checkpoint_store  data_docs_sites:   # Data Docs make it simple to visualize data quality in your project. These   # include Expectations, Validations & Profiles. The are built for all   # Datasources from JSON artifacts in the local repo including validations &   # profiles from the uncommitted directory. Read more at https://docs.greatexpectations.io/docs/terms/data_docs   local_site:     class_name: SiteBuilder     show_how_to_buttons: true     store_backend:       class_name: TupleFilesystemStoreBackend       base_directory: uncommitted/data_docs/local_site/     site_index_builder:       class_name: DefaultSiteIndexBuilder  anonymous_usage_statistics:   enabled: true   data_context_id: ffd790a7-4454-4b53-b9b1-428bfcfb4e64 notebooks: include_rendered_content:   globally: false   expectation_validation_result: false   expectation_suite: false 

1.2 Создадим набор ожиданий

Возвращаемся в командую строку и выполняем:

great_expectations suite new

GE на выбор предоставит несколько способов создания наборов проверок. В документации рекомендуется автоматическое создание с использованием Data Assistant — предварительно настроенной утилиты, упрощающей создание ожиданий.

Также GE предлагает три типа DataConnector классов. Про выбор коннекторов можно почитать здесь, а нам нужен default_configured_data_connector_name:

Выбираем ассет данных и задаем имя нашему набору проверок:

Наборы ожиданий, созданные с помощью ноутбука, не предназначены для использования в производственной среде. Мы можем отредактировать сгенерированный файл/expectations/transactions/suite.json и добавить туда еще пару ожиданий: проверим, что все транзакции имеют ID, а количество купленных товаров больше 0.

Итоговый suite.json
{   "data_asset_type": null,   "expectation_suite_name": "transactions.suite",   "expectations": [     {       "expectation_type": "expect_table_row_count_to_be_between",       "kwargs": {         "max_value": 2000000,         "min_value": 0       },       "meta": {         "profiler_details": {           "metric_configuration": {             "domain_kwargs": {},             "metric_name": "table.row_count",             "metric_value_kwargs": null           },           "num_batches": 1         }       }     },     {       "expectation_type": "expect_table_columns_to_match_set",       "kwargs": {         "column_set": [           "ItemCode",           "UserId",           "NumberOfItemsPurchased",           "Country",           "TransactionTime",           "ItemDescription",           "TransactionId",           "CostPerItem"         ],         "exact_match": null       },       "meta": {         "profiler_details": {           "success_ratio": 1.0         }       }     },     {       "expectation_type": "expect_column_values_to_not_be_null",       "kwargs": {         "column": "TransactionId"       }     },     {       "expectation_type": "expect_column_values_to_be_between",       "kwargs": {         "column": "NumberOfItemsPurchased",         "min_value": 1       }     }   ],   "ge_cloud_id": null,   "meta": {     "citations": [       {         "citation_date": "2023-07-06T00:08:01.886955Z",         "comment": "Created by effective Rule-Based Profiler of OnboardingDataAssistant with the configuration included.\n"       }     ],     "great_expectations_version": "0.17.2"   } }

С множеством доступных ожиданий можно ознакомиться в Expectations Store.

Таким образом, у нас есть две проверки на уровне таблицы, и еще две — на уровне колонок. Идем дальше.

1.3 Создание Checkpoint

Checkpoint (контрольная точка) — это основное средство проверки данных при развертывании Great Expectations в производственной среде. Выполним команду:

great_expectations checkpoint new my_checkpoint

После выполнения всех шагов в ноутбуке в директории /checkpoints появится файл my_checkpoint.yaml:

my_checkpoint.yaml
name: my_checkpoint config_version: 1.0 template_name: module_name: great_expectations.checkpoint class_name: Checkpoint run_name_template: '%Y%m%d-%H%M%S-my-run-name-template' expectation_suite_name: batch_request: {} action_list:   - name: store_validation_result     action:       class_name: StoreValidationResultAction   - name: store_evaluation_params     action:       class_name: StoreEvaluationParametersAction   - name: update_data_docs     action:       class_name: UpdateDataDocsAction       site_names: [] evaluation_parameters: {} runtime_configuration: {} validations:   - batch_request:       datasource_name: my_datasource       data_connector_name: default_inferred_data_connector_name       data_asset_name: public.transactions       data_connector_query:         index: -1     expectation_suite_name: transactions.suite profilers: [] ge_cloud_id: expectation_suite_ge_cloud_id: 

Если в ноутбуке вы не проверяли работоспособность чекпоинта, это можно сделать в терминале:

great_expectations checkpoint run my_checkpoint

Мы должны увидеть результаты проверки:

В терминале не будет отображаться информация о том, какая именно проверка не прошла, однако имеется возможность автоматически сгенерировать документацию в виде файлов HTML, что позволит посмотреть результаты в любом браузере. Подробнее об использовании Data Docs можно почитать здесь.

great_expectations docs build

Команда автоматически откроет окно в браузере, чтобы мы смогли увидеть резултаты валидации:

Набор ожиданий готов, а значит, мы готовы запускать наши чекпоинты в Dagster!

2. Работа с Dagster

Прежде, чем мы создадим наш проект, поговорим об основных концепциях Dagster.

Asset (активы) — это объект, который фиксирует результат выполнения некоторого этапа в пайплайне dagster. Ассеты могут любого типа, например, файлами, таблицами базы данных или моделью машинного обучения.

Ops (операции) — являются основной единицей вычисления в Dagster, и должны выполнять относительно простые задачи, например, какие-нибудь вычисления, обращения к базе данных, вызовы API и т.д.

Таким образом, операции — это действия, которые выполняются в пайплайне, а активы — это данные, которые обрабатываются в процессе выполнения этих операций. 

Jobs (задания) — основная единица исполнения и мониторинга в Dagster. Задание может материализовать набор активов, или выполнить граф операций.

Resources (ресурсы) — это объекты, которые являются общими для нескольких активов, операций, расписаний или датчиков. Например, ресурсом может быть подключение к базе данных или сервису.

2.1 Создание проекта

Самый простой способ начать работу с Dagster — использовать шаблон проекта по умолчанию с помощью интерфейса командной строки:

pip install dagster dagster project scaffold --name great-expectations-dagster

Команда dagster project создаст необходимую структуру папок:

great-expectations-dagster/
├── great_expectations_dagster
│ ├── assets.py
│ └── init.py
├── great_expectations_dagster_tests/
├── pyproject.toml
├── README.md
├── setup.cfg
└── setup.py

Файлы, с которыми мы будем работать:

  • __init__.py — содержит все определения, созданные в нашем проекте. Это могут активы, задания, расписания, датчики и ресурсы;

  • assets.py — модуль Python, содержащий программно-определяемые активы;

  • setup.py — содержит сценарий сборки, используем его для указания зависимостей для нашего проекта.

Подробнее о структуре проекта Dagster можно прочитать здесь.

2.2 Создание программно-определяемого актива

Мы создадим ассет, который провалидилирует наши данные и запишет результаты проверки в постоянное хранилище.

P.S. В Dagster есть возможность использовать фабрику ge_validation_op_factory из пакета dagster-ge для создания операций Dagster, которые интегрируются с Great Expectations. Однако это потребует создания операции, которая выгрузит нам наш набор данных в виде датафрейма и передаст его в качестве входных данных операции, созданной с помощью ge_validation_op_factory.

Отредактируем файл assets.py:

from dagster import MetadataValue, Output, asset from dagster_ge.factory import GEContextResource from great_expectations.render.renderer import ValidationResultsPageRenderer from great_expectations.render.view import DefaultMarkdownPageView   @asset def validate_data(data_context: GEContextResource):      # предоставим контекст данных GE     context = data_context.get_data_context()      # запустим проверку на соответствие набору ожиданий     results = context.run_checkpoint('my_checkpoint')      # визуазлиция результатов проверки в Dagster требует вывода метаданных       validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)      rendered_document_content_list = (         validation_results_page_renderer.render_validation_operator_result(results)     )     md_str = " ".join(DefaultMarkdownPageView().render(rendered_document_content_list))      return Output(         value=results['success'],         metadata={             "Expectation Results": MetadataValue.md(md_str)     })

Далее идем в __init__.py и редактируем наше определение:

from dagster import (Definitions, define_asset_job, file_relative_path,                      load_assets_from_modules) from dagster_ge.factory import GEContextResource from great_expectations_dagster.assets import validate_data    defs = Definitions(     assets=[         validate_data     ],     jobs=[         # задание, которое запустит материализацию нашего ассета         define_asset_job(             name='validate_data_job',             selection=[validate_data]         )     ],     resources={         # для настройки контекста GE необходимо указать корень каталога GE         # (путь к файлу great_exepctations.yaml)         'data_context': GEContextResource(             ge_root_dir=file_relative_path(__file__, '../../great_expectations')         )     } )

Теперь нам останется поправить файл setup.py и добавить туда пакет dagster-ge:

from setuptools import find_packages, setup  setup(     name="great_expectations_dagster",     packages=find_packages(exclude=["great_expectations_dagster_tests"]),     install_requires=[         "dagster",         "dagster-ge"     ],     extras_require={"dev": ["dagit", "pytest"]}, ) 

Теперь все готово, чтобы мы могли запустить наш маленький паплайн!

В корне директории great-expectations-dagster/ выполним следующие команды:

pip install -e “.[dev]” dagit

Команда dagit распечатает URL-адрес, по которому мы сможем получить доступ к интерфейсу в браузере, обычно через порт 3000. Мы должны увидеть наше задание validate_data_job:

Теперь нам достаточно нажать на кнопку Materialize, подождать, пока задание материализует актив, и мы сможем увидеть метаданные нашей материализации:

Для того, чтобы увидеть панель справа, необходимо кликнуть на икону ассета

Для того, чтобы увидеть панель справа, необходимо кликнуть на икону ассета

На вкладке мы видим поле Expectation Results и ссылку [Show Markdown], нам сюда)

Вот и всё 🙂

Интеграция между Dagster и Great Expectations представляет собой мощный инструмент для обеспечения качества данных в конвейерах данных, с возможностью проверки и визуализации ожиданий на каждом этапе контейнеров данных.

Надеюсь, этот туториал будет полезен 🙂


ссылка на оригинал статьи https://habr.com/ru/articles/746874/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *