Сценарии применения Greenplum PXF для интеграции с Data Lake, OLTP, Clickhouse

от автора

Привет! Меня зовут Артемий Козырь, и я Analytics Engineer в Wheely.

Популярность массивно-параллельных СУБД (MPP) для решения аналитических задач неукоснительно растет. Сегодня хотел бы поговорить о широко распространенной СУБД Greenplum и, в частности, о Platform Extension Framework (PXF) — расширении, с помощью которого открываются почти неограниченные возможности интеграции с множеством внешних систем и форматов данных.

В этой публикации Вас ждет:

  • Основные возможности PXF, конфигурация, способы оптимизации

  • Организация Extract — Load с помощью PXF (Data Lake / OLTP)

  • Объединение локальных и внешних таблиц в запросах (Federated queries)

  • Запись данных во внешние системы (Clickhouse)

Базовая идея Greenplum PXF

Заключается в том, чтобы задействовать обработку данных на стороне внешних систем, тем самым обеспечивая пользователю возможность обращения ко всей истории данных вне зависимости от места хранения, но без дополнительных трат ресурсов и времени на полную репликацию.

Демонстрационный сценарий описывает источники, отражающие данные по продажам за период в несколько лет. Операционные данные (например, текущий месяц — hot data) хранятся в OLTP MySQL, в Greenplum хранятся данные для аналитической отчетности (например, последний год-два – warm data), а в AWS S3 архивируются данные за более ранние периоды (cold data).

Таким образом, запрос на вывод суммы продаж с группировкой помесячно может быть распределен на 3 системы и выполнен параллельно. При этом пользователь обращается к одной таблице.

Заявлена поддержка доступа к данным следующих систем:

  • Hadoop / Hive / HBase

  • AWS S3 / Google Cloud Storage / Azure Blob Storage / MinIO

  • Реляционные СУБД (через JDBC)

  • Network file systems

И следующих форматов хранения:

  • Text (plain, delimited, embedded line feeds)

  • JSON

  • Avro, AvroSequenceFile

  • SequenceFile, RCFile

  • ORC / Parquet

В рамках этой публикации я буду использовать управляемый сервис от Я.Облака – Yandex Managed Service for Greenplum®. Конфигурация кластера: 2 x Master + 2 x Segment хоста на базе s2.micro (2 vCPU, 100% vCPU rate, 8 GB RAM).

Managed Service for Greenplum уже включает расширение PXF и его базовую конфигурацию. В случае использования Self-hosted Greenplum, все шаги установки и конфигурации придется проделать самостоятельно:

  • Соблюдение ряда требований перед установкой

  • Загрузка PXF Package

  • Установка Package на хостах

  • Инициализация и запуск сервиса PXF

Пошаговые действия описаны в официальной документации, и я не буду останавливаться на этом подробно.

PXF — расширение для работы с внешними данными через EXTERNAL TABLEs

PXF реализует протокол для СУБД Greenplum, который можно использовать при создании внешних таблиц.

Синтаксис комадны CREATE EXTERNAL TABLE с протоколом pxf выглядит следующим образом:

CREATE [WRITABLE] EXTERNAL TABLE <table_name> ( <column_name> <data_type> [, ...] | LIKE <other_table> ) LOCATION('pxf://<path-to-data>?PROFILE=<profile_name>[&SERVER=<server_name>][&<custom-option>=<value>[...]]') FORMAT '[TEXT|CSV|CUSTOM]' (<formatting-properties>) ;

Краткое пояснение к ключевым параметрам конфигурации:

С помощью PXF можно осуществлять как чтение из внешних источников, так и запись данных во внешние системы (ключевое слово WRITABLE). Также возможно объединение данных из разных источников в одном запросе (т.н. federated queries).

Отдельно хочу заострить внимание на поддержке Filter pushdown и Column Projection.

Filter pushdown позволяет применить ограничение на читаемые строки из выражения WHERE запроса SELECT на стороне источника данных, тем самым значительно снижая нагрузку и объем данных, передаваемых по сети. Например, это может быть использовано в обращениях к внешним СУБД, исключению партиций в Hive (partition pruning), чтению групп строк в колоночных форматах (ORC, Parquet).

Column Projection означает то, что только запрошенные колонки SELECT-запроса буду возвращены из внешних систем. Например, при запросе 5 колонок из 100 возможных в файлах формата Parquet в результате запроса вернутся (будут переданы по сети) только 5, что кратное уменьшает объем данных.

Интеграция с Data Lake (S3 / GCS / HDFS / MinIO)

Создадим EXTERNAL TABLE, указывающую на данные в объектном хранилище:

-- 1. Create EXTERNAL TABLE pointing to S3 (Text file) DROP EXTERNAL TABLE IF EXISTS src_orders ; CREATE EXTERNAL TABLE src_orders( O_ORDERKEY BIGINT, O_CUSTKEY INT, O_ORDERSTATUS CHAR(1), O_TOTALPRICE DECIMAL(15,2), O_ORDERDATE DATE, O_ORDERPRIORITY CHAR(15), O_CLERK  CHAR(15), O_SHIPPRIORITY INTEGER, O_COMMENT VARCHAR(79) ) LOCATION ('pxf://otus-dwh/tpch-dbgen/orders.csv?PROFILE=s3:csv&accesskey=<>&secretkey=<>&endpoint=storage.yandexcloud.net' ) FORMAT 'TEXT' (DELIMITER '|') ; 

Явно подчеркну важность корректной конфигурации для чтения файла, а именно:

  • Адрес в бакете – otus-dwh/tpch-dbgen/orders.csv

  • Профиль для чтения формата файла – PROFILE=s3:csv

  • Наличие ключей для доступа к бакету – accesskey=<>&secretkey=<>

  • Указание endpoint для S3-подобных хранилищ, например endpoint=storage.yandexcloud.net

  • Параметры для чтения конкретных форматов – для текста это (DELIMITER '|')

В случае некорректной конфигурации, получить данные из внешней таблицы не удастся.

Всего в файле содержится ровно 15М строк.

-- 2. Count rows SELECT count(1) FROM src_orders ; -- 15000000 ROWS

Теперь пробуем запустить аналитический запрос и посмотреть план его выполнения:

-- 3. Run OLAP query EXPLAIN ANALYZE SELECT     DATE_TRUNC('month', O_ORDERDATE) AS order_year     ,count(1) AS num_orders FROM src_orders WHERE O_ORDERSTATUS = 'P' GROUP BY 1 ORDER BY 1 ASC  ;

На выполнение запроса потребовалось 21.5 секунды, львиная доля времени была затрачена на чтение данных в S3. Даже при наличии фильтра WHERE потребовалось полное чтение файла, так как текстовые файлы не поддерживают predicate pushdown.

Далее с помощью WRITEABLE таблицы запишем эти же данные обратно в S3, но уже в колоночном формате Parquet:

-- 4. Write data back to S3 in columnar format (Parquet) DROP EXTERNAL TABLE IF EXISTS trg_orders ; CREATE WRITABLE EXTERNAL TABLE trg_orders(     O_ORDERKEY BIGINT,     O_CUSTKEY INT,     O_ORDERSTATUS CHAR(1),     O_TOTALPRICE DECIMAL(15,2),     O_ORDERDATE DATE,     O_ORDERPRIORITY CHAR(15),      O_CLERK  CHAR(15),      O_SHIPPRIORITY INTEGER,     O_COMMENT VARCHAR(79) ) LOCATION ('pxf://otus-dwh/tpch-dbgen-parquet/orders?PROFILE=s3:parquet&accesskey=<>&secretkey=<>&endpoint=storage.yandexcloud.net'     ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); INSERT INTO trg_orders SELECT * FROM src_orders ORDER BY O_ORDERSTATUS, O_ORDERDATE ;

Запустим тот же самый OLAP-запрос к новой таблице и сравним результаты:

В этот раз запрос выполнился за 4 секунды (в 5 раз быстрее), с учетом predicate pushdown (WHERE O_ORDERSTATUS = 'P') и column projection (фактически читали 1 колонку — O_ORDERDATE), поддерживаемых форматом Parquet.

Интеграция с OLTP СУБД (PostgreSQL)

В этом сценарии перейдем к работе с внешними OLTP-базами, которые динамично наполняются новыми данными и меняют уже имеющиеся. Предположим, что в нашем случае это статистические данные о рекламных кампаниях:

-- 1. Create EXTERNAL TABLE pointing to PostgreSQL table DROP EXTERNAL TABLE IF EXISTS src_direct_ads_facts ; CREATE EXTERNAL TABLE src_direct_ads_facts (     id serial ,     account_id int4 ,     dates_id int4 ,     sites_id int4 ,     traffic_id int4 ,     device varchar(16) ,     impressions_context int4 ,     impressions_search int4 ,     impressions int4 ,     clicks_context int4 ,     clicks_search int4 ,     clicks int4 ,     cost_context numeric(18, 2) ,     cost_search numeric(18, 2) ,     "cost" numeric(18, 2) ,     average_position numeric(18, 2) ,     average_position_clicks numeric(18, 2) ,     campaigns_id int4 ,     ads_id int4 ,     adgroups_id int4 ,     bounces int4 ) LOCATION ('pxf://public.direct_ads_facts?PROFILE=JDBC&JDBC_DRIVER=org.postgresql.Driver&DB_URL=jdbc:postgresql://<hostname>:6432/<database>&USER=<username>&PASS=<password>') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import') ;

В случае JDBC видим уже несколько другой набор параметров:

  • Схема и имя таблицы – public.direct_ads_facts

  • Профиль для чтения – PROFILE=JDBC

  • Класс JDBC-драйвера – JDBC_DRIVER=org.postgresql.Driver

  • DB URI – DB_URL=jdbc:postgresql://<hostname>:6432/<database>&USER=<username>&PASS=<password>'

Будем считать, что необходимо регулярно получать данные из системы-источника в Greenplum. Сделать это можно двумя способами:

  • Копирование всей таблицы при каждом запросе

  • Организация инкрементальной загрузки

Первый способ обладает очевидными недостатками – чрезмерная нагрузка на источник и копирование одних и тех же данных каждый раз. Инкрементальный способ загрузки представляется гораздо более эффективным, но для него понадобятся способы выделения дельты (изменений), например, монотонно возрастающий ключ:

-- 1. Initialize CREATE TABLE direct_ads_facts AS SELECT * FROM src_direct_ads_facts ; -- 2. Incremental load INSERT INTO direct_ads_facts SELECT * FROM src_direct_ads_facts WHERE id > (SELECT MAX(id) FROM direct_ads_facts) ;

Использование федеративных запросов (Federated Queries)

Теперь представьте ситуацию, когда огромную таблицу фактов нужно склеить с маленьким, но часто меняющимся справочником. Реальный пример — это справочник названий рекламных кампаний, которые всегда имеют одинаковый идентификатор, но периодически меняют свои названия (label).

Для этого идеально подойдет PXF с возможностью обращения к источнику src_direct_campaigns для получения самых актуальных значений из справочника:

-- 2.7. Create Data Mart CREATE TABLE direct_search_facts AS SELECT           cf.account_id AS account_id     , MD5(CONCAT(CAST(cf.id AS varchar), 'yandex.search')) AS id     , ga.caption AS caption     , gd.simple_date AS dt     , cf.device AS device     , 'yandex.search' AS SOURCE     , gt.medium AS medium     , CAST(cp.campaign_id AS Int) AS campaign_id     , cp.name AS campaign -- Always correct and up-to-date     , gt.campaign AS traffic_campaign     , gt.content AS CONTENT     , gt.keyword AS keyword     , gs.domain AS DOMAIN     , gt.landing_page AS landing_page     , cf.impressions_search AS impressions     , cf.clicks_search AS clicks     , cf.cost_search AS COST FROM direct_ads_facts AS cf     LEFT JOIN general_accounts AS ga             ON ga.account_id = cf.account_id     LEFT JOIN general_dates AS gd             ON gd.id = cf.dates_id     LEFT JOIN src_direct_campaigns AS cp -- ! From PostgreSQL directly             ON cp.id = cf.campaigns_id     LEFT JOIN general_sites AS gs             ON gs.id = cf.sites_id     LEFT JOIN general_traffic AS gt             ON gt.id = cf.traffic_id ;

Запись данных в Clickhouse для сверхбыстрого чтения

Полученная широкая таблица-витрина содержит все данные, чтобы задавать многочисленные аналитические вопросы. Однако, порой даже Greenplum может не справляться с множеством сложных аналитических запросов, существенно теряя в скорости и интерактивности. На помощь приходит сверхбыстрый Clickhouse! Наша задача — переместить готовую витрину данных туда, где с ней можно будет работать в режиме пинг-понг, вне зависимости от объема данных.

Для начала на стороне приемника необходимо создать пустую таблицу (схему):

CREATE TABLE direct_search(   account_id Int8     , id String , caption String , dt String , device String , "source" String , medium String , campaign_id Int8 , campaign String , traffic_campaign String , "content" String , keyword String , "domain" String , landing_page String , impressions Int8 , clicks Int8 , "cost" Float32 ) ENGINE = MergeTree()   ORDER BY (dt) ;

После этого мы готовы к записи данных в Clickhouse с помощью PXF:

DROP EXTERNAL TABLE IF EXISTS trg_ch_direct_search ;  CREATE WRITABLE EXTERNAL TABLE trg_ch_direct_search( account_id int4 , id varchar(128) , caption varchar(128) , dt varchar(128) , device varchar(16) , "source" varchar(1024) , medium varchar(1024) , campaign_id int4 , campaign varchar(1024) , traffic_campaign varchar(1024) , "content" varchar(1024) , keyword varchar(1024) , "domain" varchar(1024) , landing_page varchar(1024) , impressions int4 , clicks int4 , "cost" numeric(18, 2) ) LOCATION ('pxf://direct_search?PROFILE=JDBC&JDBC_DRIVER=ru.yandex.clickhouse.ClickHouseDriver&DB_URL=jdbc:clickhouse://<hostname>:8123/default') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export') ;  INSERT INTO trg_ch_direct_search SELECT account_id ,id ,caption ,dt ,device ,"source" ,medium ,campaign_id ,campaign ,traffic_campaign ,"content" ,keyword ,"domain" ,landing_page ,impressions ,clicks ,"cost" FROM direct_search ;

Ничего сложного, несколько простых действий, немного терпения и готово. Теперь с этой витриной можно работать в Clickhouse, который, как известно, не тормозит!

Проблемы и трудности

В рамках исследования и подготовки материала я столкнулся с несколькими трудностями, на которые хотел бы обратить ваше внимание:

  1. В случае работы с S3 необходимо указывать ключи доступа accesskey=<>&secretkey=<> даже для публично доступных buckets

  2. Для работы PXF в Yandex Managed Service for Greenplum необходимо включить Egress NAT для подсети хостов.

С недавнего времени на сайте с документацией появилось релевантное сообщение:

Для подключения к внешним источникам необходимо включить NAT в интернет для подсети, в которой расположен кластер Managed Service for Greenplum®.

В противном случае вы будете получать ошибку с таймаутом SQL Error [08000]: ERROR: PXF server error : Could not obtain datasource for server default and PoolDescriptor

  1. Отсутствие SSL-соединения для JDBC

«PXF» не поддерживает «SSL-соединение» на уровне параметров драйвера ClickHouse-JDBC.

Workaround: Вы можете оставить в кластере ClickHouse один хост без публичного доступа и к нему обращаться из Greenplum.

Умение строить комплексные решения, отвечающие на запросы бизнеса

Это то, что хотят видеть нанимающие менеджеры. Специалисты широкого профиля, мультиинструменталисты, обладающие автономностью и способные самостоятельно решать задачи и создавать ценность для бизнеса нужны на рынке как никогда.

Именно эти аспекты я держал в уме, когда работал над программами курсов Analytics EngineerData EngineerDataOps Engineer в OTUS.

Это не просто набор занятий по темам, а единая, связная история, в которой акцент делается на понимание потребностей заказчиков. На live-сессиях я и мои коллеги делимся своим опытом и реальными кейсами.

В комментарии поделитесь, с каким кейсом использования PXF удалось поработать, либо как планируете применять?


ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/682990/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *