
Современные Data Pipelines – это как вода в кране
Если она есть – всё замечательно, можно мыть руки, приготовить еду и постирать вещи. Как только вода отключается, либо идёт слабый напор – проблема становится весьма ощутима. Ту же аналогию сегодня можно провести относительно потоков интеграции данных.
Data Integration / Data Pipelines сегодня стали commodity – они просто должны быть и функционировать, обеспечивая базовые потребности, при этом основной фокус работы Аналитиков и Инженеров приходится на моделирование данных, трансформацию, обогащение, агрегирование, а также визуальную подачу выводов.
Второй важной особенностью является смена парадигмы от ETL к EL(T). Я попробую изложить ключевые идеи в паре тезисов.
Исторический подход ETL предполагал последовательность Extract – Transform – Load, что выявило ряд проблем:
-
Нет прозрачности – в Хранилище попадают уже трансформированные данные, без возможности восстановить историю и исходные данные
-
Отсутствие гибкости – трансформации должны быть известны и разработаны заранее, любые изменения и дополнительные требования могли стоить дорого
-
Зависимость Аналитиков от Инженеров (иногда очень скилованных) – сложные варианты интеграции с рядом источников, высокий порог сложности решений
Современный подход EL(T) предполагает независимые этапы Extract – Load и Transform:
-
Гибкость – из сырых исходных данных можно собрать что угодно, какие бы идеи у вас не возникали, даже если они часто меняются
-
Вычислительные ресурсы и хранение данных доступны как никогда – Облачные сервисы хранить все данные без необходимости экономить
-
Разделение этапов EL и T – вы больше не завязаны на один инструмент, но вправе использовать любые тулзы для трансформации данных, такие как dbt, Airflow.

Множество компаний сегодня предоставляют Data Pipelines / Integration как сервис. Перечислю те, с которыми мне доводилось сталкиваться: Fivetran, Hevo, Alooma, Stitch.
Их основные преимущества:
-
Надежность и поддержка от вендора
-
Полностью управляемый сервис – минимум забот на вашей стороне
-
Легкая конфигурация pipelines – все стремятся упростить настройку
Но есть и ряд недостатков:
-
Это закрытый код – вы ограничены возможностями которые поддерживает вендор
-
Могут найтись специфические коннекторы (или способы подключения), которые вендор не поддерживает
-
И конечно это стоимость – чек может быть очень большим

Альтернативно, существуют класс современных и удобных решений для управления потоками интеграции данных с открытым исходным кодом: Airbyte, Meltano, Singer. И вот одно из таких решений сегодня я и предлагаю рассмотреть.
И да, честь и хвала разработчикам и контрибьюторам таких решений.
Airbyte – простота и гибкость в интеграции данных
Airbyte – это проект с открытым исходным кодом, который стремительно набирает популярность. Проект доступен на GitHub (3.800+ stars), а сообщество в Slack насчитывает 2.500+ человек. По сути это современный стандарт для выстраивания потоков интеграции данных из всевозможных приложений, баз данных и API в аналитические хранилища данных, озера данных. Ниже я коротко рассмотрю ключевые преимущества инструмента.
Обширный набор коннекторов, доступных для подключения в считанные минуты. В списке все самые популярные СУБД, а также огромное количество популярных сегодня приложений: Intercom, Zendesk, Stripe, Salesforce, Jira. Усилиями сообщества пользователей список коннекторов постоянно растет. Добавление новых коннекторов сведено к простому конфигурированию – оркестрацией и спосбоами репликации займется Airbyte.

Понятная и масштабируемая архитектура. Хранилище метаданных, в качестве которого можно использовать внешнюю СУБД (Postgres), веб-интерфейс, набор рабочих лошадок (Workers), число которых можно гибко регулировать, а также полноценный scheduler с возможностью гибко регулировать частоту репликации данных.

Различные варианты установки приложения: AWS, Azure, GCP, K8s, Docker. Подходящий вариант для компаний, которым необходимо разместить приложение на своих мощностях в связи с требованиями к безопасности и compliance. При размещении в облаке – данные хранятся в вашем облаке и стоимость ресурсов остается прозрачной.

Различные стратегии синхронизации данных – Sync strategies:
-
Full Refresh Overwrite: полная выгрузка всего объема данных и перезапись на приемнике
-
Full Refresh Append: полная выгрузка всего объема данных и добавление на приемнике
-
Incremental Append: инкрементальное чтение записей и добавление на приемнике
-
Incremental Deduped History: инкрементальное чтение записей, добавление на приемнике, а также формирование дедуплицированной версии представления
-
Manual full refresh: в случае необходимости провести полную репликацию данных из источника
Нормализация данных и преобразование типов. Также Airbyte может быть полезен в переводе массивов и вложенных (nested) коллекций в плоские структуры.

И, пожалуй, одно из самых главных – вы платите только за используемые вычислительные мощности (в случае использования облака). Никакой платы за количество коннекторов и объем реплицируемых строк, как в сервисах типа Hevo, Fivetran.
Развертывание Airbyte в Yandex.Cloud
Предлагаю попробовать пощупать Airbyte своими руками, развернув приложение в Яндекс.Облаке. Для этого нам понадобится проделать ряд шагов:
-
Регистрация в Облаке
-
Создание виртуальной машины
-
Создание пары ключей и подключение по SSH
-
Установка Docker + Docker compose
-
Запуск Airbyte
-
Конфигурирование Yandex Object Storage (S3) – опционально
Развертывание Airbyte в Yandex.Cloud
# Начало работы с интерфейсом командной строки # Install: https://cloud.yandex.ru/docs/cli/quickstart#install # Init profile: https://cloud.yandex.ru/docs/cli/quickstart#initialize yc config list # check # Подключиться к виртуальной машине Linux по SSH # Создание пары ключей SSH: https://cloud.yandex.ru/docs/compute/operations/vm-connect/ssh#creating-ssh-keys ls ~/.ssh/id_rsa.pub # check # Создать виртуальную машину из публичного образа Linux # https://cloud.yandex.ru/docs/compute/operations/vm-create/create-linux-vm # проверьте путь до файла с публичным ключом yc compute instance create \ --name airbyte-vm \ --ssh-key ~/.ssh/key.pub \ --create-boot-disk image-folder-id=standard-images,image-family=ubuntu-1804-lts,size=10,auto-delete=true \ --network-interface subnet-name=default-ru-central1-a,nat-ip-version=ipv4 \ --memory 2G \ --cores 2 \ --hostname airbyte-vm yc compute instance list # check # Запишите публичный адрес ВМ – EXTERNAL IP # Подключение к виртуальной машине # https://cloud.yandex.ru/docs/compute/operations/vm-connect/ssh#vm-connect ssh -L 8000:localhost:8000 -i ~/.ssh/key yc-user@<external-ip> # укажите свой EXTERNAL IP # Install docker sudo apt-get update sudo apt-get install -y apt-transport-https ca-certificates curl gnupg2 software-properties-common curl -fsSL https://download.docker.com/linux/debian/gpg | sudo apt-key add -- sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/debian buster stable" sudo apt-get update sudo apt-get install -y docker-ce docker-ce-cli containerd.io sudo usermod -a -G docker $USER # Install docker-compose sudo apt-get -y install wget sudo wget https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m) -O /usr/local/bin/docker-compose sudo chmod +x /usr/local/bin/docker-compose docker-compose --version # Install Airbyte git clone https://github.com/airbytehq/airbyte.git cd airbyte git checkout v0.29.2-alpha sudo docker-compose up -d # Access at http://localhost:8000/preferences
В случае необходимости, Airbyte можно масштабировать (Scaling) следующими способами:
-
Увеличение количества Workers – scaling out
-
Использование более мощного типа VM (CPU, Memory, Disk) – scaling up
Подключим источник данных – Yandex.Metrika
В качестве источника интересных данных для нашего пайплайна я предложу использовать API сервиса Яндекс.Метрика. Это веб-счетчик который позволяет собирать огромное количество поведенческой информации с вашего сайта. У Я.Метрики есть демо-счётчик, который не требует аутентификации при обращении; им мы и воспользуемся.
Интерфейс позволяет выбрать интересующие метрики, разрезы и сегменты (фильтры) и представить данные в наглядном виде в интерактивном режиме (под капотом Clickhouse, который не тормозит!). Ознакомиться с ним можно по ссылке Metrika Live Demo.

Но прелесть в том, что все исходные данные доступны к выгрузке в сыром виде через API сервиса. То есть у вас появляется возможность собрать из этих данных что-то своё, например Сквозную аналитику.
Итак, в базовом виде обращение к API конфигурируется следующим образом:
-
Тип выгружаемого отчета – таблица, drill-down, time-series
-
Набор интересующих измерений, метрик, сегментов
-
Формат ответа и семплирование
-
Параметризация (валюта, атрибуция, цели)

В качестве обращения предлагаю формировать отчет Источники, сводка: https://api-metrika.yandex.net/stat/v1/data?preset=sources_summary&id=44147844
Для этого в Airbyte необходимо создать новый источник (source):
-
Тип – HTTP Request
-
Url – API Endpoint
-
Метод – GET

Конфигурируем простой pipeline и устанавливаем расписание
В качестве приемника данных будем использовать объектное хранилище Yandex Object Storage (совместимое с S3).
Для конфигурации бакета проделаем следующие шаги:
Конфигурация S3 в Яндекс.Облаке
## Configure S3 Access # Creating a service account: https://cloud.yandex.com/en-ru/docs/iam/operations/sa/create # Assign a role to service account: https://cloud.yandex.ru/docs/iam/operations/sa/assign-role-for-sa # Create access key: https://cloud.yandex.com/en-ru/docs/iam/operations/sa/create-access-key # Access with AWS S3 CLI: https://cloud.yandex.ru/docs/storage/tools/aws-cli yc iam service-account create --name airbyte-s3 yc iam service-account list yc iam service-account get airbyte-s3 yc iam service-account add-access-binding airbyte-s3 \ --role storage.admin \ --subject serviceAccount:<id> yc iam access-key create --service-account-name airbyte-s3 aws configure --profile yc aws --profile=yc --endpoint-url=https://storage.yandexcloud.net s3 mb s3://<bucket-name> aws --profile=yc --endpoint-url=https://storage.yandexcloud.net s3 ls
Настроим Destination в Airbyte:
-
Destination type – S3
-
Endpoint – https://storage.yandexcloud.net
-
Имя Bucket и путь
-
S3 Key Id & Access Key полученные при настройке технической учетной записи

Обратите внимание на то, что помимо JSON из коробки доступен ряд других файловых форматов:
-
CSV
-
AVRO (binary) – гибкий, для schema evolution
-
Parquet – бинарный, колоночный, оптимизированный под чтение

Для финализации пайплайна выберем расписание и тип репликации:
-
Раз в сутки
-
Full refresh – Append
После сохраним и запустим репликацию.

Изучим выгруженные данные
Выгрузка представляет собой JSON-документ с:
-
параметрами обращения
-
результатами запроса
-
метаданными о выгрузке
Параметры обращения:
{ "query": { "ids": [ 44147844 ], "preset": "sources_summary", "dimensions": [ "ym:s:lastSignTrafficSource", "ym:s:lastSignSourceEngine" ], "metrics": [ "ym:s:visits", "ym:s:users", "ym:s:bounceRate", "ym:s:pageDepth", "ym:s:avgVisitDurationSeconds" ], "sort": [ "-ym:s:visits" ], "date1": "2021-08-19", "date2": "2021-08-25", "limit": 100, "offset": 1, "group": "Week", "auto_group_size": "1", "attr_name": "", "quantile": "50", "offline_window": "21", "attribution": "LastSign", "currency": "RUB", "adfox_event_id": "0" } }
Метаданные о выгрузке:
{ "total_rows": 116, "total_rows_rounded": false, "sampled": false, "contains_sensitive_data": false, "sample_share": 1.0, "sample_size": 2698, "sample_space": 2698, "data_lag": 184, "totals": [ 2694.0, 2363.0, 36.82256867, 1.63437268, 85.72345954 ], "min": [ 1.0, 1.0, 0.0, 1.0, 0.0 ], "max": [ 833.0, 736.0, 100.0, 9.0, 1201.0 ] }
Результаты запроса:
{ "data": [ { "dimensions": [ { "icon_id": "0", "icon_type": "traffic-source", "name": "Direct traffic", "id": "direct" }, { "name": null, "id": null, "favicon": null, "url": null } ], "metrics": [ 833.0, 736.0, 28.81152461, 1.7334934, 105.03601441 ] }, ... ] }
Ради интереса можно попытаться найти соответствие чисел в веб-интерфейсе Я.Метрики и выгрузке через API.
Понравилось – хочу еще
Браво! Поздравляю с первым потоком интеграции на реальных данных и использованием Modern Data Stack.
Еще больше современных инструментов мы изучаем на занятиях курсов Data Engineer и Analytics Engineer в OTUS: dbt, Clickhouse, Dataproc, Airflow. Но главное – то, что я и мои коллеги стремимся дать полноценную картинку и практические навыки, так необходимые для работы.
Главные преимущества такого подхода:
-
Живое общение на регулярных вебинарах
-
Пошаговые практические инструкции и домашние задания
-
Обратная связь и возможность получить консультации к своим решениям
Data Engineer – один из самых успешных тиражных курсов, в запусках которого я участвую уже более 2-х лет. К новому старту готовы кардинальные обновления по содержанию, используемым инструментам, инфраструктуре, включая выделенные вебинары на разбор домашних заданий.
Analytics Engineer – попытка закрыть потребность на людей-мультиинструменталистов, которые сильны и в понимании специфики бизнеса, моделировании и в инженерной части. Львиная доля курса посвящена современным аналитическим СУБД, BI-инструментам, практикам продвинутой аналитики и моделирования в dbt.
Спасибо за внимание!
ссылка на оригинал статьи https://habr.com/ru/articles/574704/
Добавить комментарий