Trino — это популярный SQL-движок для выполнения аналитических запросов к озерам данных и виртуализации. Наша команда создает коммерческий форк Trino, который называется CedrusData. В России Trino набирает популярность как compute движок для новых платформ данных на основе архитектуры lakehouse и замещения Greenplum.
В статье я расскажу, как мы ускорили запросы, научив оптимизатор Trino удалять из плана ненужные Join операторы. Подобная оптимизация (также называемая join elimination, join pruning или join culling) реализована в большом количестве аналитических продуктов, но отсутствует в ванильном Trino.
Мы обсудим, почему в аналитических запросах часто возникают избыточные Join, почему это плохо для SQL-движков, какие эквивалентные преобразования позволяют избавиться от ненужных Join, и с какими проблемами мы столкнулись при интеграции данного функционала в оптимизатор Trino.
Мотивация
Аналитические запросы обычно представляют собой расчеты на основе нескольких таблиц. При этом пользователи могут обращаться не только к целевым таблицам, но и виртуальным представлениям (view). View повышают удобство, упрощают управление безопасностью, а в ряде случае и вовсе являются неотъемлемой частью архитектуры (например, data vault). BI-инструменты часто используют семантические модели для генерации SQL-запросов к источнику данных. С некоторой натяжкой мы можем рассматривать семантические модели как view, которые определены на уровне BI инструмента. Таким образом, пользователи аналитических систем часто используют view в явном или неявном виде.
Рассмотрим следующую схему данных, состояющую из таблицы фактов sales
и нескольких измерений.
Мы можем определить view, который использует все связанные таблицы:
CREATE VIEW sales_view AS SELECT * FROM sales INNER JOIN store ON sales.store_id = store.id INNER JOIN item ON sales.item_id = item.id INNER JOIN employee ON sales.employee_id = employee.id INNER JOIN campaign ON sales.campaign_id = campaign.id
Теперь мы можем использовать view для выполнения широкого спектра запросов. Например, мы можем посчитать продажи по магазинам:
SELECT store.name, SUM(sales.amount) FROM sales_view GROUP BY store.name
Проблема заключается в том, что при обращении к sales_view
, движку придется сделать Join всех таблиц, хотя запрос использует только две. Это так, потому что любой Join в общем случае может изменить набор возвращаемых записей.
Таким образом, при использовании view мы меняем удобство пользователя на скорость. Не самый приятный компромисс. Если мы научим движок определять, что в запросе присутствуют операторы Join, которые не влияют на конечный результат, и удалять их из плана, то мы сможем существенно увеличить производительность. Соответствующая оптимизация называется join pruning.
Когда Join pruning возможен?
Немного скучной теории. Для простоты мы будем рассматривать только INNER JOIN
. Для OUTER JOIN
правила немного меняются, но мы их опустим, чтобы не перегружать статью.
В большинстве движков Join является бинарным оператором, который имеет два входа. Join возвращает все пары строк из обоих входов, которые удовлетворяют заданному условию. Таким образом, строка каждого из входов может быть возвращена ноль, один или несколько раз.
Интуитивно, Join может быть заменен на один из своих входов, если запрос использует колонки только этого входа, и мы можем доказать, что будет возвращен тот же самый набор записей, как и до удаления Join.
Рассмотрим следующий запрос, в котором мы используем только колонки таблицы sales
:
SELECT SUM(sales.amount) FROM sales INNER JOIN store ON sales.store_id = store.id
Чтобы безопасно удалить Join с таблицей store
, мы должны быть уверены, что каждая запись из sales
будет возвращена ровно столько же раз, как и в присутствии Join. В общем случае мы не можем это гарантировать:
-
Некоторые записи таблицы
sales
могут быть отфильтрованы оператором Join, если для них нет соответствующих записей в таблицеstore
. -
Некоторые записи таблицы
sales
могут быть возвращены несколько раз, если колонкаstore.id
не является уникальной.
Чтобы безопасно удалить Join нам нужно доказать, что описанные выше ситуации невозможны.
Во-первых, если существует внешний ключ FOREIGN KEY (sales.store_id) REFERENCES (store.id)
, то у нас точно не будет записей в таблице sales
, которые были бы отфильтрованы из-за отсутствия соответствующей записий в таблице store
.
Во-вторых, если колонка store.id
уникальна, ни одна запись из таблицы sales
не будет возвращена более, чем один раз.
В-третьих, мы должны гарантировать, что sales.store_id
не имеет NULL
значений, так как наш Join их также отфильтровывает. Альтернативно, мы можем вручную добавить предикат store_id IS NOT NULL
после удаления Join.
При выполнении всех трех условий, мы можем удалить Join с таблицей store
. При этом результат запроса останется неизменным.
На основе данных правил join pruning реализован в различных продуктах. Например: Databricks, SAP HANA, Snowflake, Tableau.
Стоит отметить, что аналитические системы и BI инструменты, как правило, не могут гарантировать, что заданные constraint действительно выполняются. Вместо этого вы как пользователь явно уведомляете движок о том, что доверяете тем или иным constraint.
Как мы добавляли Join pruning в Trino
Данный параграф содержит скучные детали того, как мы добавляли join pruning в Trino. Если вы не являетесь фанатом внутренностей СУБД, можете сразу перейти к следующему параграфу.
Теоретические основы join pruning понятны. Но интеграция изменений в конкретный продукт зачастую сопряжена с рядом сложностей, обусловленных архитектурными особенностями системы. Trino не является исключением.
Первым делом нам потребовалось добавить понятие constraint (PRIMARY KEY
, UNIQUE
, FOREIGN KEY
, NOT NULL
) в оптимизатор Trino. К несчастью, в Trino в принципе нет понятия constraint.
Мы предложили сообществу Trino добавить constraint в оптимизатор, но получили отказ. Мотивация: Trino не сможет гарантировать, что constraint действительно выполняется. Это действительно так, но в мире аналитических систем отсутствие валидации constraint давно не является проблемой — см. ключевое слово RELY
в документации аналитических движков. Тем не менее мы не смогли оперативно переубедить сообщество. Это означало, что нам придется добавить понятие constraint в свой форк, а не в Trino. Сказано — сделано. Мы расширили парсер Trino, и добавили возможность задания нужных нам constraint через параметры конфигурации или свойства сессии (чтобы управлять оптимизацией в рамках отдельного запроса). Например:
SET SESSION cedrusdata_constraints = ARRAY[ 'PRIMARY KEY example.tpch.part(partkey)', 'PRIMARY KEY example.tpch.supplier(suppkey)', 'FOREIGN KEY example.tpch.partsupp(partkey) REFERENCES example.tpch.part(partkey)', 'FOREIGN KEY example.tpch.partsupp(suppkey) REFERENCES example.tpch.supplier(suppkey)' ];
Далее нам необходимо было понять, на каком этапе оптимизации делать join pruning. Планировщик Trino состоит из нескольких десятков фаз, называемых «optimizers», которые следуют друг за другом в строгом порядке, см. класс PlanOptimizers. Мы подобрали подходящее место для встраивания нашей оптимизации. Например, мы точно знали, что наш optimizer должен работать после PredicatePushDown, но до PushPredicateIntoTableScan. Почему? Потому что оптимизатор PredicatePushDown
умеет преобразовывать некоторые CROSS JOIN
в INNER JOIN
, что крайне важно для join pruning. А PushPredicateIntoTableScan
наоборот может существенно затруднить join pruning, так как в некоторых случаях он безвозвратно скрывает некоторую важную для нас информацию о предикатах. Все это тонкие нюансы работы оптимизатора Trino, которые не слишком интересны конечному пользователю, но крайне важны для корректной реализации сложных улучшений.
Оставалось лишь дорисовать сову — реализовать сам join pruning. Сначала мы добавили специальную механику, которая позволяет определить для каждого оператора набор текущих constraint и предикатов. Технически это паттерн visitor, который обходит дерево операторов снизу вверх и рассчитывает текущие ограничения оператора.
Далее мы реализовали логику, которая для каждого оператора Join определяет на основе доступных constraint, возможно ли удалить одну из его сторон. Если да, то мы просто заменяем Join на один из его входов, и опционально добавляем сверху NOT NULL
фильтр для колонок, участвующих в join condition.
Далее мы добавили ряд точечных улучшений, чтобы наш оптимизатор срабатывал для большего количества запросов. Например:
-
Добавили учет транзитивности. Например, мы можем переписать
GROUP BY store.id
наGROUP BY store.store_id
, так как условиеsales INNER JOIN store ON sales.store_id = store.id
гарантирует эквивалентность данных колонок. Данное переписывание позволяет в некоторых случаях удалить Join, даже если в запросе мы ссылаемся на колонки как правого, так и левого входов. -
Добавили поддержку
OUTER JOIN
. Данное улучшение требует некоторой модификации правил переписывания дерева операторов, но в целом реализуется даже проще, чем поддержкаINNER JOIN
. Вместе с тем,OUTER JOIN
встречается достаточно часто, поэтому без его поддержки мы не смогли бы в должной степени раскрыть потенциал нашей оптимизации.
Все выглядело достаточно неплохо, пока мы не занялись тестированием. По какой-то причине не всегда удавалось удалить ненужные цепочки Join. Например, у нас есть три таблицы:
Создаем view, который объединяет все три таблицы:
CREATE VIEW sales_view AS SELECT * FROM sales INNER JOIN store ON sales.store_id = store.id INNER JOIN city ON store.city_id = city.id
Обращаемся к колонкам sales
, но join pruning не происходит. В чем проблема?
SELECT sales.amount FROM sales_view
В процессе анализа мы быстро поняли, что виноват порядок Join. Если оптимизатор принимал решение расположить Join в порядке sales JOIN (store JOIN city)
, то join pruning отрабатывал успешно:
Если же порядок Join был (sales JOIN store) JOIN city
, то возникала неприятная ситуация: (sales JOIN store)
обязан вернуть колонку store.city
для выполнения вышестоящего Join. Таким образом, оптимизатор считал, что нам нужны колонки с обоих входов нижнего Join, и не выполнял join pruning.
Иными словами, промежуточный Join может использовать колонки, которые нужны только вышестоящим Join, но не нужны пользователю. Обработать данную ситуацию с помощью бинарных Join затруднительно. Мы поняли, что вместо бинарных Join (2-way) нам нужно использовать N-way Join.
Многие оптимизаторы умеют объединять соседние операторы Join в один большой N-way Join. В Trino есть несколько реализаций подобной логики, одна из которых называется JoinGraph. Мы переписали наш join pruning optimizer таким образом, чтобы он находил соседние Join, объединял их в JoinGraph
, и уже после этого определял, какие входы могут быть удалены.
Теперь наша реализация join pruning корректно определяла все таблицы, которые могут быть безопасно удалены из плана.
Но возникла новая проблема! В Trino JoinGraph
является промежуточным представлением плана, которое обязательно должно быть преобразовано обратно в набор бинарных Join по окончании оптимизации. В процессе тестирования мы обнаружили, что порядок Join до преобразования в JoinGraph
может отличаться от порядка Join, который получается после «разворачивания» JoinGraph
обратно в бинарные Join. Это обусловлено тем, что JoinGraph
был добавлен в Trino для реализации одной конкретной оптимизации (см. EliminateCrossJoins), в которой в силу ряд особенностей финальный порядок Join не имеет значения. В нашем же случае мы обязаны были сохранить оригинальный порядок Join, так как этот порядок может быть задан пользователем сознательно. В противном случае пользователь мог бы столкнуться с ситуацией, когда наш новый модный функционал многократно замедляет выполнение запроса из-за нарушения порядка Join. От обилия подобных нюансов у нас начинало немного подгорать.
Для преодоления данной проблемы нам пришлось отказаться от идеи использования уже существующего JoinGraph
, и написать свою реализацию, которая гарантирует сохранение оригинального порядка Join.
Был еще ряд неожиданных мелких проблем, которые я не буду описывать. В конце концов мы убедились, что все целевые сценарии join pruning (топологии chain, start, starflake) работают, и запросы возвращают корректные результаты.
Производительность
Проверим оптимизацию в действии. Мы будем использовать схему данных TPC-DS, scale factor 1000. Данные хранятся в S3, формат Parquet, коннектор Hive. Два узла CedrusData по 32 логических ядра и 128 Gb оперативной памяти.
Создадим view, который объединяет таблицу фактов store_sales
(~2.7 млрд строк) с четырьмя измерениями:
CREATE VIEW store_sales_view AS SELECT * FROM store_sales INNER JOIN customer_address ON ss_addr_sk = ca_address_sk INNER JOIN customer_demographics ON ss_cdemo_sk = cd_demo_sk INNER JOIN date_dim on ss_sold_date_sk = d_date_sk INNER JOIN item ON ss_item_sk = i_item_sk
Попробуем посчитать сумму продаж по годам без join pruning.
SELECT d_year, SUM(ss_quantity * ss_list_price) FROM store_sales_view GROUP BY d_year ORDER BY d_year
Время выполнения запроса: 49 секунд. Посмотрим на план:
EXPLAIN (TYPE LOGICAL, FORMAT GRAPHVIZ) SELECT d_year, SUM(ss_quantity * ss_list_price) FROM store_sales_view GROUP BY d_year ORDER BY d_year
Видим, что все четыре Join из оригинального view остались на своих местах:
Включим join pruning, задав в явном виде constraints (некоторые детали опущены для простоты):
SET SESSION cedrusdata_constraints = ARRAY[ 'PRIMARY KEY customer_address(ca_address_sk)', 'PRIMARY KEY customer_demographics(cd_demo_sk)', 'PRIMARY KEY date_dim(d_date_sk)', 'PRIMARY KEY item(i_item_sk)', 'FOREIGN KEY store_sales(ss_addr_sk) REFERENCES customer_address(ca_address_sk)', 'FOREIGN KEY store_sales(ss_cdemo_sk) REFERENCES customer_demographics(cd_demo_sk)', 'FOREIGN KEY store_sales(ss_sold_date_sk) REFERENCES date_dim(d_date_sk)', 'FOREIGN KEY store_sales(ss_item_sk) REFERENCES item(i_item_sk)' ]
Запустим запрос еще раз. Время выполнения с join pruning: 24 секунды. План запроса показывает, что остался только один Join между store_sales
и date_dim
:
Нам удалось ускорить запрос в два раза. Достаточно неплохой результат! Значит, наши мучения с оптимизатором Trino не были напрасными.
На широком наборе запросов данная оптимизация может давать прирост производительности от нескольких десятков до сотен процентов.
Вместо заключения
В CedrusData мы постоянно добавляем важные улучшения производительности, которые позволяют нашим клиентам быстрее решать свои аналитические задачи. Join pruning — лишь одна из многих оптимизаций, которые мы уже реализовали в нашем форке Trino. Получить пробную версию CedrusData можно по ссылке.
Если вы столкнулись с проблемами производительности Trino, расскажите об этом в комментариях. Подумаем вместе, как их можно решить.
Ну а если вы хотите вместе с нами работать над современными open-source проектами, такими как Trino, Apache Iceberg или DuckDB, то мы всегда будем рады получить ваше резюме!
В следующей статье мы расскажем, как нам в CedrusData удалось кратно ускорить чтение и запись данных из/в Greenplum через протокол GPFDIST.
Не забывайте подписаться на наш Telegram-канал о Trino и CedrusData (без пяти минут 500 участников!), YouTube-канал c докладами о Trino, и блог на Хабре.
Связь.
ссылка на оригинал статьи https://habr.com/ru/articles/843882/
Добавить комментарий