Кто такой Analytics Engineer – E2E-решение с использованием bash + dbt + Looker

Привет! Меня зовут Артемий Козырь, и я Analytics Engineer в Wheely.

Мы могли бы долго и нудно обсуждать, кто такой Analytics (Data / Backend) Engineer, какими инструментами он должен владеть, какие buzzwords в тренде и ценятся в CV, однако, на мой взгляд, гораздо интереснее рассмотреть процесс и результаты его деятельности в рамках конкретной прикладной задачи.

В этой публикации:

  • Что значит решение End-to-End и в чем его ценность?

  • Организация Extract & Load данных из асинхронного API MaestroQA

  • Моделирование витрин данных с помощью dbt

  • Поставка ценности для пользователей с помощью Looker

Решение End-to-End — от идеи до создания ценности

В двух словах, End-to-End – это поставка полноценного функционального решения, включающая все детали пазла.

Предлагаю переходить к реальному сценарию — это работа с приложением MaestroQA, автоматизирующим мониторинг и оценку клиентского сервиса (Customer Support).

Одна из самых важных идей заключается в том, что заказчик, кем бы он ни был (Manager, Product Owner, CEO), почти никогда не ставит задачу в инженерных терминах:

  • Налить 100500 гигабайт в Хранилище

  • Добавить multithreading в код

  • Написать супероптимальный запрос

  • Создать 15 dbt-моделей

За любой инженерной задачей стоит решение конкретных бизнес-проблем. Для нас это:

  • Прозрачность Customer Support (фиксируем все оценки, инциденты)

  • Результативность на ладони (отслеживаем динамику показателей во времени)

  • Отчитываемся о KPI команд поддержки (агрегирующие показатели по командам, городам, странам и т.д.)

  • Получаем обратную связь и исправляем ошибки (идентификация слабых/проблемных мест и быстрый feedback)

  • Постоянно учимся и разбираем кейсы (категоризация тем, организация тренингов и разборов)

И это ключевой фокус, который отличает Analytics Engineer от, например, классических Data Engineer, Backend Engineer. Обладая всем спектром инженерных навыков и практик, Analytics Engineer создает ценность для бизнеса и решает прикладные задачи. Говорит на одном языке с заказчиком решений и мыслит в терминах бизнес-показателей.

Получим исходные данные – Extract & Load Data

Окей, теперь ближе к делу. Я выбрал MaestroQA намеренно — это источник, для которого нет готовых коннекторов в SaaS-решениях.

Поэтому нам придется реализовать эту интеграцию и установить ее на расписание самостоятельно.

Выбор инструментов для интеграции данных — во многом вопрос вкуса, но я предпочитаю использовать простые shell-скрипты и оркестрировать их с помощью Airflow.

1. Начнем с изучения документации к API сервиса:

  • Нам доступен ряд методов: request-raw-export, request-groups-export, request-audit-log-export, get-export-data

  • Методы принимают набор параметров: apiToken, startDate, endDate, exportId

  • Результирующие отчеты формируются асинхронно

Асинхронный API означает то, что в ответ на запрос той или иной выгрузки вы получите не саму выгрузку а номер в очереди. Предъявив этот номер в другое окно (метод get-export-data), вы получите выгрузку, как только она будет готова.

Использование Async API несколько усложняет задачу, а именно использованием нескольких методов и сохранением exportId, но так задача становится даже интереснее.

2. Получим API Token.

Это секретный ключ, который позволит выгружать данные, связанные с вашим аккаунтом.

3. Готовим скрипты для выгрузки.

Шаг 1. Запросить выгрузку сырых данных (total_scores.sh):

  • Сформировать запрос к API (JSON_DATA)

  • Получить exportId (талон в очереди)

  • Проверить значение exportId и если ОК, то перейти к выгрузке результата (retrieve.sh)

# 1. Prepare request JSON_DATA=$(jq -n \               --arg maestroqa_token "$MAESTROQA_TOKEN" \               --arg start_date "$START_DATE" \               --arg end_date "$END_DATE" \               --arg single_file_export "$SINGLE_FILE_EXPORT" \               --arg name "$FILE_NAME" \               '{apiToken: $maestroqa_token, startDate: $start_date, endDate: $end_date, singleFileExport: $single_file_export, name: $name }' )  # 2. Get exportId EXPORT_ID=$(curl -s -X POST $ENDPOINT \     -H 'Content-Type: application/json' \     -d "${JSON_DATA}" \     | jq -r '.exportId')  # 3. Retrieve data by exportId if [ -z "$EXPORT_ID" ] then       echo "EXPORT_ID is empty"       exit 1 else       echo "EXPORT_ID=$EXPORT_ID"       EXPORT_ID=$EXPORT_ID bash retrieve.sh fi

Шаг 2. Получить готовую выгрузку (retrieve.sh):

  • Определить функцию для запроса статуса готовности выгрузки (get_status())

  • Сформировать запрос к API (JSON_DATA)

  • Опрашивать API о статусе готовности каждые 10 секунд

  • По готовности (complete) получить файл с результатами и сохранить в S3

# 1. function used to poll to get current status get_status() {     curl -s -X GET $RETRIEVE_ENDPOINT \     -H 'Content-Type: application/json' \     -d "${JSON_DATA}" \     | jq -r '.status' \     | cat }  # 2. prepare request JSON_DATA=$(jq -n \               --arg maestroqa_token "$MAESTROQA_TOKEN" \               --arg export_id "$EXPORT_ID" \               '{apiToken: $maestroqa_token, exportId: $export_id }' )  # 3. get current status ("in progress" / "complete") STATUS="$(get_status)" printf "STATUS=$STATUS\n"  # 4. poll every 10 seconds while [ "$STATUS" != "complete" ]; do   printf "STATUS=$STATUS\n"   sleep 10   STATUS="$(get_status)" done  # 5. Store resulting file to S3 curl -s -X GET $RETRIEVE_ENDPOINT \   -H 'Content-Type: application/json' \   -d "${JSON_DATA}" \   | jq -r '.dataUrl' \   | xargs curl -s \   | aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$FILE_NAME/$FILE_NAME-$START_DATE-to-$END_DATE.csv  echo "UPLOADED TO s3://$BUCKET/$BUCKET_PATH/$FILE_NAME/$FILE_NAME-$START_DATE-to-$END_DATE.csv"
  1. Автоматизация на Airflow.

Отлично! После успешного формирования выгрузки в ручном режиме возникает необходимость задать расписание выполнения. Удобнее всего это сделать через Airflow, с возможностью осуществлять повторные попытки, мониторинг, получения уведомлений.

Пример DAG:

from airflow import DAG from airflow.operators.bash_operator import BashOperator  import os import yaml from datetime import datetime, timedelta from slack.notifications import failed_task_slack_notification  ### INIT DAG DAG_NAME = "maestroqa_api" SCHEDULE_INTERVAL = '0 0 * * *'  DAG_PATH = os.path.dirname(__file__) CONFIG_FILE_NAME = "config.yml" CONFIG_PATH = os.path.join(DAG_PATH, CONFIG_FILE_NAME) CONFIG = yaml.safe_load(open(CONFIG_PATH))["endpoints"]  default_args = {     "owner": "airflow",     "depends_on_past": False,     "start_date": datetime(2022, 2, 1),     "retries": 1,     "retry_delay": timedelta(minutes=3),     "catchup": False,     "dagrun_timeout": timedelta(minutes=5),     'on_failure_callback': failed_task_slack_notification }  dag = DAG(     DAG_NAME,     default_args=default_args,      schedule_interval=SCHEDULE_INTERVAL )  os.environ["START_DATE"] = "{{ execution_date.isoformat() }}" os.environ["END_DATE"] = "{{ next_execution_date.isoformat() }}"  groups = BashOperator(             task_id=CONFIG['groups']['FILE_NAME'],             bash_command=f"cd {DAG_PATH}/ && bash {CONFIG['groups']['FILE_NAME']}.sh ",             env={ **os.environ.copy(), **CONFIG['groups'] },             trigger_rule="all_done",             dag=dag     )  total_scores = BashOperator(             task_id=CONFIG['total_scores']['FILE_NAME'],             bash_command=f"cd {DAG_PATH}/ && bash {CONFIG['total_scores']['FILE_NAME']}.sh ",             env={ **os.environ.copy(), **CONFIG['total_scores'] },             trigger_rule="all_done",             dag=dag     )  groups >> total_scores

Смоделируем витрины данных – Transform Data

К данному этапу у нас ежедневно работает выгрузка новых данных и файлы накапливаются в S3.

Зарегистрируем файлы в S3 в качестве EXTERNAL TABLE

Чтобы иметь возможность обращаться к данным с помощью SELECT-запросов. В этом нам поможет package dbt-labs/dbt_external_tables:

version: 2  sources:    - name: maestroqa     database: wheely     schema: spectrum     tags: ["sources", "maestroqa"]     loader: Airflow (S3 via External Tables)     description: "MaestroQA – customer service quality assurance software"      tables:          - name: groups           identifier: maestroqa_groups           description: "Agent Groups"           external:             location: "s3://{{ var('s3_bucket_name') }}/maestroqa/GROUPS/"             row_format: serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'             table_properties: "('skip.header.line.count'='1')"           columns:             - name: group_name               data_type: varchar               description: "Group / Team name: All Agents, UK Team, Ru Team, FR Team, UK Team"             - name: group_id               data_type: varchar             - name: agent_name               data_type: varchar             - name: agent_email               data_type: varchar             - name: agent_ids               data_type: varchar               description: "List of semicolon separated Agent IDs. Used to link with MaestroQA Total Scores table"             - name: available               data_type: bool               description: "Flag indicating if agent is available at the moment"

Результирующие EXTERNAL TABLES будут использоваться в dbt в качестве sources (источников данных)

Моделирование витрины данных

Предлагаю взглянуть на то, как выглядят исходные данные, полученные из ответов API. Выгружаются 2 типа файлов:

  • Groups — справочник агентов, команд

  • Scores — факты оценок и скорингов коммуникаций

Особое внимание обратим на колонку agent_ids, представляющиую собой массив идентификаторов. С этим атрибутом придется повозиться — необходимо разбить массив на элементы и придать таблице плоский вид, добавив суррогатные ключи.

Наша задача — собрать широкую витрину, на основе которой впоследствии можно находить ответы на любые вопрос. Для этого объединим таблицы:

{{     config (       materialized='table',       dist='auto',       sort=['graded_dt', 'country'],       tags=['maestroqa']     ) }}  SELECT      -- IDs   scores.gradable_id , scores.agent_id       -- dimensions     , scores.grader , scores.agent_name , scores.agent_email , groups.group_name , CASE groups.group_name       WHEN 'Ru Team' THEN 'RU'       WHEN 'FR Team' THEN 'FR'       WHEN 'UK Team' THEN 'GB'       WHEN 'All Agents' THEN 'All'     END AS country              -- dates     , scores.date_graded::DATE AS graded_dt      -- measures     , scores.rubric_score , scores.max_rubric_score  FROM {{ ref('stg_maestroqa_total_scores') }} AS scores     LEFT JOIN {{ ref('flt_maestroqa_groups') }} AS groups ON groups.agent_id = scores.agent_id          AND groups.group_name IN ('All Agents', 'Ru Team', 'FR Team', 'UK Team')

Итоговый граф зависимостей моделей dbt (DAG) выглядит следующим образом:

Обеспечим доступ к данным через BI – Deliver value

Отлично, к этому этапу помимо набора файлов в S3 у нас есть постоянно обновляющаяся широкая таблица в СУБД (витрина), обращаясь к которой мы можем получить ответы на любые вопросы.

Однако не все пользователи одинаково способны формулировать свои вопросы на чистом SQL. Этот барьер призваны устранить BI-инструменты, основные задачи которых сводятся к:

  • Предоставлению визуального конструктора запросов к данным

  • Формированию набора измерений, метрик и фильтров для последующего использования бизнес-пользователями

  • Группировке ряда визуализаций и ответов в пользовательские дашборды

  • Настройке рассылок данных и уведомлений по определенным правилам

В Looker каждой колонке исходной таблицы можно присвоить статус измерения или метрики, задать правила агрегации, добавить комментарий. Это делается на языке LookML, напоминающем Javascript:

Далее любой пользователь может воспользоваться конструктором для визуального формирования ответов на свои вопросы – в Looker это называется Explore:

Готовые плиточки (tiles) можно группировать в дашборды, рассылать всем заинтересованным пользователям, устанавливать уведомления при достижении пороговых значений метрик:

В целом — это вершина айсберга, относительно проделанной работы, но именно на этом этапе создается основная ценность и польза для компании.

Умение строить комплексные решения, отвечающие на запросы бизнеса

Это то, что хотят видеть нанимающие менеджеры. Специалисты широкого профиля, мультиинструменталисты, обладающие автономностью и способные самостоятельно решать задачи и создавать ценность для бизнеса нужны на рынке как никогда.

Именно эти аспекты я держал в уме, когда работал над программами курсов Analytics Engineer и Data Engineer в OTUS.

Это не просто набор занятий по темам, а единая, связная история, в которой акцент делается на понимание потребностей заказчиков. На live-сессиях я и мои коллеги делимся своим опытом и реальными кейсами:

  • Продвинутое моделирование в dbt

  • Развертывание и особенности работы c BI-инструментами

  • Аналитические паттерны и SQL

  • Кейсы: Сквозная аналитика, Company’s KPI, Timeseries analysis

Также своими наблюдениями, опытом и практиками я делюсь в ТГ-канале Technology Enthusiast.

Напишите комментарий, если сталкивались с потребностью строить подобные решения, и какой подход применяли?

Спасибо!


ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/665642/

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *