Курсы валют и аналитика – использование обменных курсов в Хранилище Данных

от автора

Привет! На связи Артемий – Analytics Engineer из Wheely.

Сегодня хотел бы поговорить о вопросах конвертирования финансовых показателей в разные валюты. Вопрос достаточно актуальный, так как большое количество компаний имеют мультинациональные зоны присутствия, строят аналитику глобального масштаба, готовят отчетность по международным стандартам.

Покажу как этот вопрос решается с помощью современных подходов на примере кейса Wheely:

  • Расширение списка базовых валют

  • Регулярное обновление и получения актуальных курсов

  • Обеспечение корректности исторических показателей

  • Максимальное удобство и простота использования в аналитических инструментах

Велком под кат для разбора решения проблемы учета мультивалютных метрик и показателей: Open Exchange Rate, Airflow, Redshift Spectrum, dbt.


Новые требования к сервису валютных курсов

В качестве legacy-источника использовался веб-сервис ЦБ РФ. Однако с изменяющимися требованиями и расширением зон присутствия компании его стало недостаточно. Например, по причине отсутствия котировки AED (дирхам ОАЭ). Для кого-то могут быть актуальны курсы криптовалют BTC, ETH, которые в веб-сервисе ЦБ РФ тоже отсутствуют.

Новые требования можно суммировать следующим образом:

  • Поддержка расширенного набора базовых валют, которые отсутствуют в API ЦБ РФ

  • Получение самых актуальных котировок, включая внутридневные курсы

  • Минимизация трансформаций данных вне Хранилища Данных (лучше если их вообще нет)

Матрица новых требований к работе с курсами валют
Матрица новых требований к работе с курсами валют

Задачи, которые предстоит решить легко визуализировать в виде матрицы. Красным помечены области, поддержку которых предстоит добавить:

  • Интеграция нового API для уже использующихся курсов

  • Добавление новых базовых валют в выгрузку

  • Получение ретроспективных (исторических) данных по новым валютам за прошлые периоды

  • Архивирование курсов из legacy-источника

Легаси приложение по выгрузке курсов валют формировало pivot-таблицу с коэффициентом для каждой пары в отдельном столбце. Это удобно, когда у нас есть строго фиксированный набор валют и наименования колонок, но превращается в головную боль если список валют необходимо расширить. 

Появилось желание уйти от всех трансформаций и формирований таблиц в pandas до того как данные попадают в Хранилище. Здесь я придерживаюсь принципа применения всех трансформаций (T в ELT) в одном месте, и помогает мне в этом замечательный инструмент dbt.

Интеграция с новым поставщиком данных

Как уже стало понятно, без внешнего поставщика данных обойтись не получится, поэтому предлагаю рассмотреть один из ряда провайдеров курсов валют – https://openexchangerates.org/

Минимальный необходимый план Developer включает в себя:

  • 10.000 запросов ежемесячно (более чем достаточно)

  • Ежечасные внутридневные обновления курсов

  • Широкий набор базовых валют, включая криптовалюты

Доступные методы API:

Для получения актуальных курсов валют воспользуемся API endpoint /latest.json

Простой запрос-ответ может выглядеть следующим образом:

Установка на расписание в Airflow

Для регулярного получения актуальных курсов валют я воспользуюсь инструментом Airflow. Apache Airflow – де-факто стандарт в области оркестрации данных, data engineering и управления пайплайнами. 

Смысловая составляющая графа задачи (DAG):

  • Сделать запрос к API

  • Сохранить полученный ответ (например, в виде уникального ключа на S3)

  • Уведомить в Slack в случае ошибки

Конфигурация DAG:

  • Базовые валюты (base currency), от которых отсчитываем курсы

  • Синхронизация расписание запусков с расчетом витрин в Хранилище Данных

  • Токен доступа к сервису

Самый простой DAG состоит из одного таска с вызовом простого shell-скрипта:

TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"`   curl -H "Authorization: Token $OXR_TOKEN" \  "https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \  | aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json

Вот как выглядит результат регулярной работы скрипта в S3:

Сегодня в штатном режиме выполняется около 25 обращений к сервису в сутки, статистика выглядит следующим образом:

Выгрузка истории по новым валютам

После обеспечения регулярной выгрузки всех необходимых валют, можно приступить к формированию истории по новым базовым валютам (которой, очевидно, нет). Это позволит переводить в новые валюты суммы транзакций прошлых периодов.

К сожалению, план Developer не включает обращения к API endpoint /time-series.json, и только ради этой разовой задачи не имеет смысла делать upgrade на более дорогостоящую версию.

Воспользуемся методом /historical/*.json и простым опросом API в цикле для формирования исторической выгрузки:

#!/bin/bash   d=2011-01-01 while [ "$d" != 2021-02-19 ]; do  echo $d  curl -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json  d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d) done

Пиковая нагрузка вызвала вопросы у коллег, которые тоже пользуются сервисом, но это была разовая акция:

Архивирование исторических курсов валют

Вся история обменных курсов полученная из legacy-источника ЦБ РФ до даты X (перехода на новый сервис-провайдер) подлежит архивированию в неизменном виде.

Я хочу сохранить все те курсы, которые мы показывали в своих аналитических инструментах без изменений. То есть чтобы суммы в дашбордах и отчетах бизнес-пользователей не были изменены ни на копейку.

Для этого я выполню выгрузку накопленных значений обменных курсов за весь исторический период в Data Lake. Более детально, я произведу:

  • Трансформацию legacy pivot-таблицы в двумерную

  • Запись в колоночный формат PARQUET в AWS S3

Формирование архива в S3 в формате PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrf STORED AS PARQUET LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS WITH base AS (    SELECT 'EUR' AS base_currency    UNION ALL    SELECT 'GBP'    UNION ALL    SELECT 'RUB'    UNION ALL    SELECT 'USD' ) SELECT    "day" AS business_dt    ,b.base_currency    ,CASE b.base_currency        WHEN 'EUR' THEN 1        WHEN 'GBP' THEN gbp_to_eur        WHEN 'RUB' THEN rub_to_eur        WHEN 'USD' THEN usd_to_eur        ELSE NULL      END AS eur    ,CASE b.base_currency        WHEN 'EUR' THEN eur_to_gbp        WHEN 'GBP' THEN 1        WHEN 'RUB' THEN rub_to_gbp        WHEN 'USD' THEN usd_to_gbp        ELSE NULL      END AS gbp    ,CASE b.base_currency        WHEN 'EUR' THEN eur_to_rub        WHEN 'GBP' THEN gbp_to_rub        WHEN 'RUB' THEN 1        WHEN 'USD' THEN usd_to_rub        ELSE NULL      END AS rub    ,CASE b.base_currency        WHEN 'EUR' THEN eur_to_usd        WHEN 'GBP' THEN gbp_to_usd        WHEN 'RUB' THEN rub_to_usd        WHEN 'USD' THEN 1        ELSE NULL      END AS usd      FROM ext.currencies c    CROSS JOIN base b ;

Таким образом, в хранилище S3 у меня теперь есть статический снимок всех обменных курсов, когда-либо использованных в аналитических приложениях, сериализованный в оптимизированный колоночный формат со сжатием. В случае необходимости пересчета витрин и исторических данных я запросто смогу воспользоваться этими курсами.

Доступ к данным из DWH через S3 External Table

А теперь самое интересное – из своего аналитического движка Amazon Redshift я хочу иметь возможность просто и быстро обращаться к самым актуальным курсам валют, использовать их в своих трансформациях.

Оптимальное решение – создание внешних таблиц EXTERNAL TABLE, которые обеспечивают SQL-доступ к данным, хранящимся в S3. При этом нам доступно чтение полуструктурированных данных в формате JSON, бинарных данных в форматах AVRO, ORC, PARQUET и другие опции. Продукт имеет название Redshift Spectrum и тесно связан с SQL-движком Amazon Athena, который имеет много общего с Presto.

CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (    "timestamp" bigint    , base varchar(3)    , rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8> ) ROW format serde 'org.openx.data.jsonserde.JsonSerDe' LOCATION 's3://<BUCKET>/dwh/currencies/' ; 

Обратите внимание на обращение ко вложенному документу rates с помощью создания типа данных struct.

Теперь добавим к этой задаче секретную силу dbt. Модуль dbt-external-tables позволяет автоматизировать создание EXTERNAL TABLES и зарегистрировать их в качестве источников данных:

   - name: external      schema: spectrum      tags: ["spectrum"]      loader: S3      description: "External data stored in S3 accessed vith Redshift Spectrum"      tables:        - name: currencies_oxr          description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"          freshness:            error_after: {count: 15, period: hour}          loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'          external:            location: "s3://<BUCKET>/dwh/currencies/"            row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"          columns:            - name: timestamp              data_type: bigint            - name: base              data_type: varchar(3)            - name: rates              data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>

Немаловажным элементом является проверка своевременности данных – source freshness test на курсы валют. Тем самым мы будем постоянно держать руку на пульсе поступления актуальных данных в Хранилище. Очень важно рассчитывать все финансовые метрики корректно и в срок, а без актуальных значений курсов задачу решить невозможно.

В случае отставания данных – более 15 часов без свежих обменных курсов – мы тут же получаем уведомление в Slack.

Для прозрачности и простоты пользователей объединим исторические данные (архив) и постоянно поступающие актуальные курсы (новый API) в одну модель currencies:

Объединение исторических и новых данных в единый справочник
{{    config(        materialized='table',        dist='all',        sort=["business_dt", "base_currency"]    ) }}   with cbrf as (    select        business_dt    , null as business_ts    , base_currency    , aed    , eur    , gbp    , rub    , usd    from {{ source('external', 'currencies_cbrf') }}  where business_dt <= '2021-02-18'  ),   oxr_all as (      select        (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt    , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts    , o.base as base_currency    , o.rates.aed::decimal(10,4) as aed    , o.rates.eur::decimal(10,4) as eur    , o.rates.gbp::decimal(10,4) as gbp    , o.rates.rub::decimal(10,4) as rub    , o.rates.usd::decimal(10,4) as usd    , row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn      from {{ source('external', 'currencies_oxr') }} as o    where business_dt > '2021-02-18'   ),   oxr as (    select        business_dt    , business_ts    , base_currency    , aed    , eur    , gbp    , rub    , usd    from {{ ref('stg_currencies_oxr_all') }}  where rn = 1  ),   united as (    select        business_dt    , business_ts    , base_currency    , aed    , eur    , gbp    , rub    , usd   from cbrf    union all    select        business_dt    , business_ts    , base_currency    , aed    , eur    , gbp    , rub    , usd   from oxr   )   select      business_dt  , business_ts  , base_currency  , aed  , eur  , gbp  , rub  , usd   from united

При этом физически справочник с курсами валют копируется на каждую ноду аналитического кластера Redshift и хранится в отсортированном по дате и базовой валюте  виде для ускорения работы запросов.

Использование курсов в моделировании данных

В целом, работа с курсами валют для аналитиков и инженеров, которые развивают Хранилище Данных не изменилась и осталась весьма простой. Все детали использования нового API, обращения к внешним полу-структурированным документам JSON в S3, объединению с архивными данными скрыты  . В своих трансформациях достаточно сделать простой джоин на таблицу с курсами валют:

   select          -- price_details        , r.currency        , {{ convert_currency('price', 'currency') }}        , {{ convert_currency('discount', 'currency') }}        , {{ convert_currency('insurance', 'currency') }}        , {{ convert_currency('tips', 'currency') }}        , {{ convert_currency('parking', 'currency') }}        , {{ convert_currency('toll_road', 'currency') }}      from {{ ref('requests') }} r        left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt            and r.currency = currencies.base_currency 

Сами метрики конвертируются при помощи простого макроса, который на вход принимает колонку с исходной суммой и колонку с исходным кодом валюты:

-- currency conversion macro {% macro convert_currency(convert_column, currency_code_column) -%}        ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed    , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur    , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp    , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub    , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd   {%- endmacro %} 

Практико-ориентированное развитие

Работа с данными – одно из наиболее востребованных и бурно развивающихся направлений. Каждый день я нахожу новые интересные задачи и придумываю решения для них. Это захватывающий и интересный путь, расширяющий горизонты.

В конце мая состоится юбилейный запуск курса Data Engineer в OTUS, в котором я принимаю участие в роли преподавателя.

По прошествии двух лет программа постоянно менялась, адаптировалась. Ближайший запуск принесет ряд нововведений и будет построен вокруг кейсов – реальных прикладных проблем инженеров:

  • Data Architecture

  • Data Lake

  • Data Warehouse

  • NoSQL / NewSQL

  • MLOps

Детально с программой можно ознакомиться на лендинге курса.

Также я делюсь своими авторскими заметками и планами в телеграм-канале Technology Enthusiast.

Благодарю за внимание.

ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/558238/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *