
Привет! Меня зовут Артемий Козырь, и я Analytics Engineer в Wheely.
Популярность массивно-параллельных СУБД (MPP) для решения аналитических задач неукоснительно растет. Сегодня хотел бы поговорить о широко распространенной СУБД Greenplum и, в частности, о Platform Extension Framework (PXF) — расширении, с помощью которого открываются почти неограниченные возможности интеграции с множеством внешних систем и форматов данных.
В этой публикации Вас ждет:
-
Основные возможности PXF, конфигурация, способы оптимизации
-
Организация Extract — Load с помощью PXF (Data Lake / OLTP)
-
Объединение локальных и внешних таблиц в запросах (Federated queries)
-
Запись данных во внешние системы (Clickhouse)
Базовая идея Greenplum PXF
Заключается в том, чтобы задействовать обработку данных на стороне внешних систем, тем самым обеспечивая пользователю возможность обращения ко всей истории данных вне зависимости от места хранения, но без дополнительных трат ресурсов и времени на полную репликацию.
Демонстрационный сценарий описывает источники, отражающие данные по продажам за период в несколько лет. Операционные данные (например, текущий месяц — hot data) хранятся в OLTP MySQL, в Greenplum хранятся данные для аналитической отчетности (например, последний год-два – warm data), а в AWS S3 архивируются данные за более ранние периоды (cold data).

Таким образом, запрос на вывод суммы продаж с группировкой помесячно может быть распределен на 3 системы и выполнен параллельно. При этом пользователь обращается к одной таблице.
Заявлена поддержка доступа к данным следующих систем:
-
Hadoop / Hive / HBase
-
AWS S3 / Google Cloud Storage / Azure Blob Storage / MinIO
-
Реляционные СУБД (через JDBC)
-
Network file systems
И следующих форматов хранения:
-
Text (plain, delimited, embedded line feeds)
-
JSON
-
Avro, AvroSequenceFile
-
SequenceFile, RCFile
-
ORC / Parquet
В рамках этой публикации я буду использовать управляемый сервис от Я.Облака – Yandex Managed Service for Greenplum®. Конфигурация кластера: 2 x Master + 2 x Segment хоста на базе s2.micro (2 vCPU, 100% vCPU rate, 8 GB RAM).

Managed Service for Greenplum уже включает расширение PXF и его базовую конфигурацию. В случае использования Self-hosted Greenplum, все шаги установки и конфигурации придется проделать самостоятельно:
-
Соблюдение ряда требований перед установкой
-
Загрузка PXF Package
-
Установка Package на хостах
-
Инициализация и запуск сервиса PXF
Пошаговые действия описаны в официальной документации, и я не буду останавливаться на этом подробно.
PXF — расширение для работы с внешними данными через EXTERNAL TABLEs
PXF реализует протокол для СУБД Greenplum, который можно использовать при создании внешних таблиц.
Синтаксис комадны CREATE EXTERNAL TABLE с протоколом pxf выглядит следующим образом:
CREATE [WRITABLE] EXTERNAL TABLE <table_name> ( <column_name> <data_type> [, ...] | LIKE <other_table> ) LOCATION('pxf://<path-to-data>?PROFILE=<profile_name>[&SERVER=<server_name>][&<custom-option>=<value>[...]]') FORMAT '[TEXT|CSV|CUSTOM]' (<formatting-properties>) ;
Краткое пояснение к ключевым параметрам конфигурации:

С помощью PXF можно осуществлять как чтение из внешних источников, так и запись данных во внешние системы (ключевое слово WRITABLE). Также возможно объединение данных из разных источников в одном запросе (т.н. federated queries).
Отдельно хочу заострить внимание на поддержке Filter pushdown и Column Projection.
Filter pushdown позволяет применить ограничение на читаемые строки из выражения WHERE запроса SELECT на стороне источника данных, тем самым значительно снижая нагрузку и объем данных, передаваемых по сети. Например, это может быть использовано в обращениях к внешним СУБД, исключению партиций в Hive (partition pruning), чтению групп строк в колоночных форматах (ORC, Parquet).
Column Projection означает то, что только запрошенные колонки SELECT-запроса буду возвращены из внешних систем. Например, при запросе 5 колонок из 100 возможных в файлах формата Parquet в результате запроса вернутся (будут переданы по сети) только 5, что кратное уменьшает объем данных.
Интеграция с Data Lake (S3 / GCS / HDFS / MinIO)
Создадим EXTERNAL TABLE, указывающую на данные в объектном хранилище:
-- 1. Create EXTERNAL TABLE pointing to S3 (Text file) DROP EXTERNAL TABLE IF EXISTS src_orders ; CREATE EXTERNAL TABLE src_orders( O_ORDERKEY BIGINT, O_CUSTKEY INT, O_ORDERSTATUS CHAR(1), O_TOTALPRICE DECIMAL(15,2), O_ORDERDATE DATE, O_ORDERPRIORITY CHAR(15), O_CLERK CHAR(15), O_SHIPPRIORITY INTEGER, O_COMMENT VARCHAR(79) ) LOCATION ('pxf://otus-dwh/tpch-dbgen/orders.csv?PROFILE=s3:csv&accesskey=<>&secretkey=<>&endpoint=storage.yandexcloud.net' ) FORMAT 'TEXT' (DELIMITER '|') ;
Явно подчеркну важность корректной конфигурации для чтения файла, а именно:
-
Адрес в бакете –
otus-dwh/tpch-dbgen/orders.csv -
Профиль для чтения формата файла –
PROFILE=s3:csv -
Наличие ключей для доступа к бакету –
accesskey=<>&secretkey=<> -
Указание endpoint для S3-подобных хранилищ, например
endpoint=storage.yandexcloud.net -
Параметры для чтения конкретных форматов – для текста это
(DELIMITER '|')
В случае некорректной конфигурации, получить данные из внешней таблицы не удастся.
Всего в файле содержится ровно 15М строк.
-- 2. Count rows SELECT count(1) FROM src_orders ; -- 15000000 ROWS
Теперь пробуем запустить аналитический запрос и посмотреть план его выполнения:
-- 3. Run OLAP query EXPLAIN ANALYZE SELECT DATE_TRUNC('month', O_ORDERDATE) AS order_year ,count(1) AS num_orders FROM src_orders WHERE O_ORDERSTATUS = 'P' GROUP BY 1 ORDER BY 1 ASC ;

На выполнение запроса потребовалось 21.5 секунды, львиная доля времени была затрачена на чтение данных в S3. Даже при наличии фильтра WHERE потребовалось полное чтение файла, так как текстовые файлы не поддерживают predicate pushdown.
Далее с помощью WRITEABLE таблицы запишем эти же данные обратно в S3, но уже в колоночном формате Parquet:
-- 4. Write data back to S3 in columnar format (Parquet) DROP EXTERNAL TABLE IF EXISTS trg_orders ; CREATE WRITABLE EXTERNAL TABLE trg_orders( O_ORDERKEY BIGINT, O_CUSTKEY INT, O_ORDERSTATUS CHAR(1), O_TOTALPRICE DECIMAL(15,2), O_ORDERDATE DATE, O_ORDERPRIORITY CHAR(15), O_CLERK CHAR(15), O_SHIPPRIORITY INTEGER, O_COMMENT VARCHAR(79) ) LOCATION ('pxf://otus-dwh/tpch-dbgen-parquet/orders?PROFILE=s3:parquet&accesskey=<>&secretkey=<>&endpoint=storage.yandexcloud.net' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); INSERT INTO trg_orders SELECT * FROM src_orders ORDER BY O_ORDERSTATUS, O_ORDERDATE ;
Запустим тот же самый OLAP-запрос к новой таблице и сравним результаты:

В этот раз запрос выполнился за 4 секунды (в 5 раз быстрее), с учетом predicate pushdown (WHERE O_ORDERSTATUS = 'P') и column projection (фактически читали 1 колонку — O_ORDERDATE), поддерживаемых форматом Parquet.
Интеграция с OLTP СУБД (PostgreSQL)
В этом сценарии перейдем к работе с внешними OLTP-базами, которые динамично наполняются новыми данными и меняют уже имеющиеся. Предположим, что в нашем случае это статистические данные о рекламных кампаниях:
-- 1. Create EXTERNAL TABLE pointing to PostgreSQL table DROP EXTERNAL TABLE IF EXISTS src_direct_ads_facts ; CREATE EXTERNAL TABLE src_direct_ads_facts ( id serial , account_id int4 , dates_id int4 , sites_id int4 , traffic_id int4 , device varchar(16) , impressions_context int4 , impressions_search int4 , impressions int4 , clicks_context int4 , clicks_search int4 , clicks int4 , cost_context numeric(18, 2) , cost_search numeric(18, 2) , "cost" numeric(18, 2) , average_position numeric(18, 2) , average_position_clicks numeric(18, 2) , campaigns_id int4 , ads_id int4 , adgroups_id int4 , bounces int4 ) LOCATION ('pxf://public.direct_ads_facts?PROFILE=JDBC&JDBC_DRIVER=org.postgresql.Driver&DB_URL=jdbc:postgresql://<hostname>:6432/<database>&USER=<username>&PASS=<password>') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import') ;
В случае JDBC видим уже несколько другой набор параметров:
-
Схема и имя таблицы –
public.direct_ads_facts -
Профиль для чтения –
PROFILE=JDBC -
Класс JDBC-драйвера –
JDBC_DRIVER=org.postgresql.Driver -
DB URI –
DB_URL=jdbc:postgresql://<hostname>:6432/<database>&USER=<username>&PASS=<password>'
Будем считать, что необходимо регулярно получать данные из системы-источника в Greenplum. Сделать это можно двумя способами:
-
Копирование всей таблицы при каждом запросе
-
Организация инкрементальной загрузки
Первый способ обладает очевидными недостатками – чрезмерная нагрузка на источник и копирование одних и тех же данных каждый раз. Инкрементальный способ загрузки представляется гораздо более эффективным, но для него понадобятся способы выделения дельты (изменений), например, монотонно возрастающий ключ:
-- 1. Initialize CREATE TABLE direct_ads_facts AS SELECT * FROM src_direct_ads_facts ; -- 2. Incremental load INSERT INTO direct_ads_facts SELECT * FROM src_direct_ads_facts WHERE id > (SELECT MAX(id) FROM direct_ads_facts) ;
Использование федеративных запросов (Federated Queries)
Теперь представьте ситуацию, когда огромную таблицу фактов нужно склеить с маленьким, но часто меняющимся справочником. Реальный пример — это справочник названий рекламных кампаний, которые всегда имеют одинаковый идентификатор, но периодически меняют свои названия (label).
Для этого идеально подойдет PXF с возможностью обращения к источнику src_direct_campaigns для получения самых актуальных значений из справочника:
-- 2.7. Create Data Mart CREATE TABLE direct_search_facts AS SELECT cf.account_id AS account_id , MD5(CONCAT(CAST(cf.id AS varchar), 'yandex.search')) AS id , ga.caption AS caption , gd.simple_date AS dt , cf.device AS device , 'yandex.search' AS SOURCE , gt.medium AS medium , CAST(cp.campaign_id AS Int) AS campaign_id , cp.name AS campaign -- Always correct and up-to-date , gt.campaign AS traffic_campaign , gt.content AS CONTENT , gt.keyword AS keyword , gs.domain AS DOMAIN , gt.landing_page AS landing_page , cf.impressions_search AS impressions , cf.clicks_search AS clicks , cf.cost_search AS COST FROM direct_ads_facts AS cf LEFT JOIN general_accounts AS ga ON ga.account_id = cf.account_id LEFT JOIN general_dates AS gd ON gd.id = cf.dates_id LEFT JOIN src_direct_campaigns AS cp -- ! From PostgreSQL directly ON cp.id = cf.campaigns_id LEFT JOIN general_sites AS gs ON gs.id = cf.sites_id LEFT JOIN general_traffic AS gt ON gt.id = cf.traffic_id ;
Запись данных в Clickhouse для сверхбыстрого чтения
Полученная широкая таблица-витрина содержит все данные, чтобы задавать многочисленные аналитические вопросы. Однако, порой даже Greenplum может не справляться с множеством сложных аналитических запросов, существенно теряя в скорости и интерактивности. На помощь приходит сверхбыстрый Clickhouse! Наша задача — переместить готовую витрину данных туда, где с ней можно будет работать в режиме пинг-понг, вне зависимости от объема данных.
Для начала на стороне приемника необходимо создать пустую таблицу (схему):
CREATE TABLE direct_search( account_id Int8 , id String , caption String , dt String , device String , "source" String , medium String , campaign_id Int8 , campaign String , traffic_campaign String , "content" String , keyword String , "domain" String , landing_page String , impressions Int8 , clicks Int8 , "cost" Float32 ) ENGINE = MergeTree() ORDER BY (dt) ;
После этого мы готовы к записи данных в Clickhouse с помощью PXF:
DROP EXTERNAL TABLE IF EXISTS trg_ch_direct_search ; CREATE WRITABLE EXTERNAL TABLE trg_ch_direct_search( account_id int4 , id varchar(128) , caption varchar(128) , dt varchar(128) , device varchar(16) , "source" varchar(1024) , medium varchar(1024) , campaign_id int4 , campaign varchar(1024) , traffic_campaign varchar(1024) , "content" varchar(1024) , keyword varchar(1024) , "domain" varchar(1024) , landing_page varchar(1024) , impressions int4 , clicks int4 , "cost" numeric(18, 2) ) LOCATION ('pxf://direct_search?PROFILE=JDBC&JDBC_DRIVER=ru.yandex.clickhouse.ClickHouseDriver&DB_URL=jdbc:clickhouse://<hostname>:8123/default') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export') ; INSERT INTO trg_ch_direct_search SELECT account_id ,id ,caption ,dt ,device ,"source" ,medium ,campaign_id ,campaign ,traffic_campaign ,"content" ,keyword ,"domain" ,landing_page ,impressions ,clicks ,"cost" FROM direct_search ;
Ничего сложного, несколько простых действий, немного терпения и готово. Теперь с этой витриной можно работать в Clickhouse, который, как известно, не тормозит!
Проблемы и трудности
В рамках исследования и подготовки материала я столкнулся с несколькими трудностями, на которые хотел бы обратить ваше внимание:
-
В случае работы с S3 необходимо указывать ключи доступа
accesskey=<>&secretkey=<>даже для публично доступных buckets -
Для работы PXF в Yandex Managed Service for Greenplum необходимо включить Egress NAT для подсети хостов.
С недавнего времени на сайте с документацией появилось релевантное сообщение:
Для подключения к внешним источникам необходимо включить NAT в интернет для подсети, в которой расположен кластер Managed Service for Greenplum®.
В противном случае вы будете получать ошибку с таймаутом SQL Error [08000]: ERROR: PXF server error : Could not obtain datasource for server default and PoolDescriptor
-
Отсутствие SSL-соединения для JDBC
«PXF» не поддерживает «SSL-соединение» на уровне параметров драйвера ClickHouse-JDBC.
Workaround: Вы можете оставить в кластере ClickHouse один хост без публичного доступа и к нему обращаться из Greenplum.
Умение строить комплексные решения, отвечающие на запросы бизнеса
Это то, что хотят видеть нанимающие менеджеры. Специалисты широкого профиля, мультиинструменталисты, обладающие автономностью и способные самостоятельно решать задачи и создавать ценность для бизнеса нужны на рынке как никогда.
Именно эти аспекты я держал в уме, когда работал над программами курсов Analytics Engineer, Data Engineer, DataOps Engineer в OTUS.
Это не просто набор занятий по темам, а единая, связная история, в которой акцент делается на понимание потребностей заказчиков. На live-сессиях я и мои коллеги делимся своим опытом и реальными кейсами.
В комментарии поделитесь, с каким кейсом использования PXF удалось поработать, либо как планируете применять?
ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/682990/
Добавить комментарий