Терабайты данных из Teradata в Trino — эффективный способ передачи

от автора

Архитектурный принцип Lakehouse предполагает, что вы оперируете всеми данными, загруженными в систему. Но иногда нужно выполнить ad hoc анализ за ее периметром, потому что необходимых данных по каким-либо причинам нет в Lakehouse-платформе. В этом случае на помощь приходит федеративный доступ. Стандартом для такой задачи является движок Trino. Он умеет извлекать данные из внешних СУБД и даже в некоторых случаях может делать push-down определенных вычислений на сторону системы-источника. Главное, чтобы под рукой был подходящий connector для нужной СУБД, который умеет эффективно с ней работать.

Недавно в состав Data Ocean Nova был добавлен новый Trino Teradata Connector. Он позволяет пользователям «подтягивать» необходимые срезы данных из Teradata в рамках ad hoc запросов и решает задачу эффективной передачи данных: можно передавать терабайты в несколько потоков без существенного увеличения нагрузки на источник.

В данной статье разберем:

  • Как организовать эффективную многопоточную работу с Teradata:

    • Где часто допускают ошибки;

    • Как должно выглядеть правильное решение;

  • Какие возможности дает Nova Trino Teradata Connector:

    • Многопоточная передача;

    • Push-down оптимизации.

В качестве вступления

Для начала стоит отметить, что мы не рекомендуем использовать функционал Nova Trino Teradata Connector именно для задач миграции данных в Lakehouse-платформу или для регулярной загрузки. Для этого у нас есть специализированный инструмент эффективной пакетной передачи данных Data Ocean Flex Loader, который при работе с Teradata использует (по выбору пользователя или администратора) либо механизм Native Object Storage (NOS), либо Teradata Parallel Transporter (TPT). Проблема доступа посредством Trino у наших клиентов, как правило, возникает в федеративных прикладных задачах вроде реконсиляции данных, реализации приложений проверки качества, профилирования данных системы-источника и так далее.

Эффективный клиентский доступ к Teradata

Небольшое погружение в архитектуру Teradata

Среди целей этой статьи нет детального погружения в СУБД Teradata, но, чтобы остальные технические детали были понятны и общий пазл по итогам прочтения сложился в понятную картину, кратко затронем ключевые архитектурные аспекты. Думаю, гораздо больше читателей могут быть знакомы с Greenplum, поэтому также проведем параллели с ним.

Teradata — это MPP-СУБД по схеме shared-nothing: каждый рабочий узел имеет свой выделенный объем вычислительных ресурсов (CPU, RAM, Disk) для решения задач. Рабочие узлы здесь именуются AMP’ами (Access Module Processor, полное имя используется редко), записи в таблицах распределяются между AMP’ами для хранения и обработки. AMP’ы объединены в общую interconnect-сеть BYNET. Пользователи подключаются и отправляют запросы к компонентам Parsing Engine (PE) — именно здесь происходит парсинг запросов, построение плана запроса, формирование задач для AMP’ов, контроль выполнения, формирование и отдача ответа клиенту.

Teradata проектировалась очень давно, оперативной памяти было значительно меньше, и она была значительно дороже (хотя, публикуя эту статью в начале 2026 г., рассуждать о дешевой памяти, к сожалению, не приходится). Поэтому СУБД изначально активно использовала дисковую подсистему для сохранения промежуточных вычислений и подготовленных данных (result sets) — это решение работает до сих пор. В планах запросов можно увидеть, что очень часто СУБД будет писать в Spool — это оно и есть.

Для тех, кто знаком с Greenplum, быстро проведем аналогию:

  • Master ServerParsing Engine (PE), которых может быть несколько; каждый PE может обрабатывать порядка 100+ сессий;

  • SegmentAMP. Обычно на одном достаточно мощном физическом Segment Host поднимают несколько Segment Server’ов, но на аналогичном по мощности Teradata Node всегда больше AMP’ов, поэтому обычно в Teradata общее количество AMP’ов больше, и счет идет на сотни и тысячи. Но и каждый AMP имеет в своем распоряжении меньше ресурсов и хранит/обрабатывает меньше данных, чем один Segment;

  • Distributed KeyPrimary Index. Данные распределяются между рабочими узлами по hash’у от полей, входящих в ключ дистрибуции. Обычно стараются добиться равномерного распределения данных между рабочими узлами на больших таблицах (для маленьких таблиц обычно выбирается другая стратегия хранения данных). В деталях реализации механизма дистрибуции данных (как и хранения) есть немало отличий, но в них сейчас погружаться не будем. Поставим в приоритет основные цели статьи;

  • SpillSpool. В отличие от Greenplum, который старается оставить вычисления в RAM, пока не исчерпает ее, Teradata сразу будет писать в Spool.

Где часто допускают ошибку

За свою практику мы сталкивались с несколькими решениями для работы с Teradata и последующей загрузки в Hadoop или Lakehouse, и у многих были одни и те же проблемы с эффективностью извлечения данных из Teradata. И не только потому, что некоторые выгружали сотни ГБ в один поток. Критичнее, когда выгрузка в N потоков создает N-кратную нагрузку на исходную систему.

Для параллельной выгрузки в таких случаях формировалось несколько запросов, каждый из которых должен обработать свою часть записей (есть две ключевые версии):

SELECT ... FROM table WHERE hashbucket(hashrow(key_field)) MOD 4 = 0;SELECT ... FROM table WHERE hashbucket(hashrow(key_field)) MOD 4 = 1;SELECT ... FROM table WHERE hashbucket(hashrow(key_field)) MOD 4 = 2;SELECT ... FROM table WHERE hashbucket(hashrow(key_field)) MOD 4 = 3;
SELECT ... FROM table WHERE hashamp(hashbucket(hashrow(key_field))) MOD 4 = 0;SELECT ... FROM table WHERE hashamp(hashbucket(hashrow(key_field))) MOD 4 = 1;SELECT ... FROM table WHERE hashamp(hashbucket(hashrow(key_field))) MOD 4 = 2;SELECT ... FROM table WHERE hashamp(hashbucket(hashrow(key_field))) MOD 4 = 3;

И каждый из этих запросов действительно вернет свою часть записей, которые не будут пересекаться между собой (сейчас оставим за скобками кейс с параллельными изменениями для упрощения). Но что для этого пришлось сделать СУБД, какую работу проделать?

Для обработки каждый AMP сканирует все записи и проверяет, для каких выполняется условие hash(key_field) MOD 4 = X. Такие записи каждый AMP записывает в Spool. Затем каждый AMP отдает эти записи из Spool’а клиенту. Другими словами, во время одного такого запроса мы сканируем таблицу целиком, тратим дополнительное CPU на вычисления предикатов для всех строк, и только потом начинается хоть какая-то экономия ресурсов.

Но у нас будет не один такой запрос, их будет N штук. Т.е. выгрузка в N потоков приведет к N full table scan: увеличивая количество потоков, мы будем линейно увеличивать нагрузку на исходную систему. Хотим выгрузить 10 ТБ таблицу из Teradata в Lakehouse, но в 1 поток выходит как-то медленно? Поставим 10 потоков и заставим сделать работы в 10 раз больше, т.е. вместо сканирований на 10 ТБ через I/O-подсистему прогоним 100 ТБ + потратим кучу CPU для вычислений предикатов.

Как итог, данный метод очень плохо масштабируется, выгрузка в N потоков дает:

  • 🆘 N-кратное увеличение I/O-операций в Teradata;

  • 🆘 N-кратное увеличение нагрузки по CPU в Teradata;

  • Слабое уменьшение времени выгрузки. Время подготовки данных на стороне Teradata уменьшаться не будет, выигрыш возможен только за счет уменьшения объема данных на 1 поток на принимающей стороне.

Что требуется для эффективной выгрузки

Давайте разберем, что мы имеем к моменту, когда хотим выгрузить данные из Teradata, и что о них знаем:

  • Данные таблицы уже разложены по сотням AMP’ов (а в крупных инсталляциях – и нескольким тысячам);

  • На каждом AMP’е лежит примерно равная порция данных;

  • Т.к. порций достаточно много (сотни-тысячи), в момент выгрузки не требуется их разбивать, а, скорее, наоборот, данные из нескольких AMP’ов можно выгружать в рамках одного потока (подключения/сессии).

Можно провести параллель с партиционированием таблиц, которое часто используется в разных СУБД, включая Teradata. Партиционирование позволяет разложить данные в отдельные порции (партиции) и потом читать только одну конкретную, а не сканировать все целиком. Но сейчас мы хотим читать порцию на одном конкретном AMP’е в рамках одного потока выгрузки.

В Teradata как раз есть механизм, позволяющий получить данные с конкретного AMP’а, необходимо только указать его порядковый номер. Можно указать и сразу несколько AMP’ов — очень удобно, можно выстраивать различные workload’ы под разные сценарии.

Зная количество AMP’ов, мы можем распределить задачу выгрузки на N потоков. Например, имея 10 запущенных процессов на стороне Lakehouse, можно распределить задачу выгрузки из Teradata с 1000 AMP’ами таким образом:

  • AMP’ы [1, 2, 3, … 100] → worker 1

  • AMP’ы [101, 102, 103, … 200] → worker 2

  • AMP’ы [901, 902, 903, … 1000] → worker 10

При этом каждый AMP прочитает свою порцию данных всего один раз, независимо от количества потоков, соответственно, здесь не будет тех проблем с масштабированием в N потоков, которые были ранее:

  • N-кратное увеличение I/O-операций в Teradata → ✅ Количество I/O-операций будет константой для таблицы, независимо от количества потоков;

  • N-кратное увеличение нагрузки по CPU в Teradata → ✅ Нагрузка по CPU на AMP’ах будет константой для таблицы, независимо от количества потоков;

  • Слабое уменьшение времени выгрузки → ✅ Время уменьшается линейно при условии фактического запуска в параллель всех запросов.

В завершение этого блока стоит сделать ремарку по вопросу Skew (перекоса) вычислений. В пределе один запрос будет отдавать данные с одного AMP’а, т.е. все вычисления будут производиться на одном AMP’е – это соответствует 100% Skew. Обычно таких сильных перекосов стараются избегать, поэтому иногда администраторы системы приходят с вопросами по таким запросам, и приходится объяснять, что в них проблемы нет: если просуммировать все запросы, каждый из которых вытащил свою порцию данных с одного AMP’а (и поэтому имел метрики со 100% Skew), то общая сумма по всем запросам покажет 0–1–2% Skew— это соответствует Skew исходной таблицы.

Trino Teradata Connector в Data Ocean Nova

Теперь правильный Teradata Connector идет в составе дистрибутива Data Ocean Nova. Для его установки не требуется дополнительных действий, сразу можно приступить к конфигурации каталога, который позволит обращаться Trino к одному из кластеров Teradata:

CREATE CATALOG teradata_cluster USING teradataWITH (  "connection-url" = 'jdbc:teradata://hostname/DATABASE=mydb,DBS_PORT=1025',  -- Используем проброс учетных данных (credential passthrough)  "user-credential-name" = 'td_user',  "password-credential-name" = 'td_password'  -- Альтернативный вариант с без проброса пользовательской УЗ,  -- задаем параметры общей технической УЗ  -- "connection-user" = 'user',  -- "connection-password" = 'password');

Сразу после этого мы можем посылать запросы из Trino, которые уже будут взаимодействовать с Teradata:

SELECT * FROM teradata_cluster.db_name.table_name WHERE ...;SELECT ...FROM lakehouse_db.lakehouse_tableLEFT JOIN teradata_cluster.db_name.table_name ON ...WHERE ...;

В процессе выполнения этих запросов Trino будет читать данные из Teradata. По умолчанию чтение будет происходить в один поток, но это легко поменять:

SET SESSION teradata_cluster.max_scan_parallelism = 4;

Connector самостоятельно определит количество AMP’ов в Teradata и распределит выгрузку с них между Trino Worker’ами:

Каждый AMP будет сканировать имеющиеся у него данные только один раз при любом количестве потоков выгрузки. Если задать max_scan_parallelism больше, чем есть AMP‘ов, то количество потоков будет равно количеству AMP’ов — не больше. Соответственно, один поток будет выгружать один AMP. На практике для быстрой выгрузки достаточно 4–8–16 потоков в зависимости от размера таблицы — до описанного ограничения очень далеко.

На схеме выше можно увидеть дополнительный шаг с фильтрацией — в этом примере в Trino нужны данные только за один день. Логично отсечь ненужные данные на стороне источника и не передавать их вовсе —Trino умеет делать push-down определенных выражений к источнику для оптимизации передаваемых объемов.

Поддерживаемые push-down оптимизации

Push-down where-предикатов

Для отсечения выборки на стороне источника Trino умеет «спускать» некоторые операторы и функции (и их комбинации через AND/ OR). Сейчас Teradata Connector поддерживает:

  • Операторы сравнения: =, <>, <, <=, >, >= ;

  • Арифметические операции над целочисленными типами: +, -, *, /, MOD ;

  • Унарный минус для целочисленных типов;

  • Оператор LIKE (с опциональным ESCAPE);

  • Проверки на NULL: IS NULL, IS NOT NULL ;

  • Логическое отрицание: NOT ;

  • Функция NULLIF ;

Например:

SELECT dt, client_id, phone_numberFROM teradata_cluster.db_name.table_nameWHERE dt = '2026-01-01' AND phone_number IS NOT NULL;

Push-down агрегирующих функций

При агрегации мы обычно уменьшаем размер выборки, поэтому push-down может также уменьшить объем передаваемых данных. В итоге нужно будет существенно меньше датасета, что может благоприятно сказаться на времени выполнения — особенно когда итоговая выдача уменьшается на порядки (10x, 100x, 1000x). 

Но, т.к. изначальный пользовательский запрос в Trino может содержать любые агрегации, которые знает Trino, важно, чтобы их аналоги были и в СУБД, и connector к СУБД понимал, что и как транслировать. Сейчас Teradata Connector поддерживает «джентльменский набор» в виде COUNT/ COUNT(DISTINCT), SUM, MIN, MAX, AVG и несколько менее знаменитые STDDEV_SAMP, STDDEV_POP, VARIANCE , VAR_POP. Поэтому следующий запрос Trino сможет отправить вычисляться в Teradata:

SELECT report_dt, COUNT(*), AVG(salary)FROM teradata_cluster.db_name.table_nameGROUP BY report_dt;

Важно отметить, что если хотя бы одна агрегирующая функция в запросе не поддерживается для push-down’а, то все агрегации в данном запросе будут выполнены в Trino. Если для расчетов нужно тащить детальные данные, то будет эффективнее сделать один проход вычислений по ним в Trino, чем делать два прохода (и в Trino, и в другой СУБД).

Если push-down агрегации не нужен, то его можно выключить:

SET SESSION teradata_cluster.aggregation_pushdown_enabled = false;

Push-down Join-операций

Сразу важный дисклеймер: push-down join’а работает только для объектов одного и того же каталога. Если в запросе используются объекты из разных каталогов, то Trino загрузит данные каждого объекта к себе и выполнит join самостоятельно. Магии здесь пока ждать не стоит.

Также важно, чтобы в условиях join’а не было каких-то Trino-specific функций, аналогов которых вообще нет в Teradata.

В остальном это тоже хороший вариант для уменьшения объема итоговой выборки перед передачей в Trino. Особенно когда данных становится кратно меньше.

Но если по итогам join’а количество записей должно увеличиться — лучше не делать push-down такого join’а. Дернуть стоп-кран можно вот так:

SET SESSION teradata_cluster.join_pushdown_enabled = false;

Ограничения push-down вычислений в Teradata

Выше мы говорили про быструю выгрузку результирующего датасета из Teradata в N потоков. Для этого каждый поток должен забрать уже готовые данные с одного или нескольких AMP’ов. К этому не проблема добавить дополнительное условие выборки (where-предикат) — его можно применять на каждом AMP’е независимо.

Но когда речь идет про join’ы или агрегацию, то исходные условия задачи оказываются другими. Системе уже нужно не просто отдать сохраненные данные с приведением простых скалярных вычислений, а сделать гораздо более сложную работу, которая может включать:

  • Redistribution одной или сразу нескольких таблиц между AMP’ами;

  • Duplicate таблицы на все AMP’ы;

  • Дополнительную сортировку записей на каждом AMP’е одной или нескольких таблиц;

  • Построение Hash-таблицы.

Для этих вычислений уже нельзя ограничиться только одним AMP’ом — для корректной операции нужны данные с других AMP’ов. Поэтому каждый поток выгрузки будет задействовать не один или несколько указанных AMP’ов, а сразу все. В начале статьи мы как раз разбирали связанную с этим проблематику.

Какие в этом случае есть варианты?

  1. «Приоритет Push-down’а». Push-down join’а / агрегации + выгрузка в один поток;

  2. «Многопоточная выгрузка». Без push-down’а — их отключаем;

  3. «Материализация». Сделать CREATE TABLE AS SELECT с нужным join’ом / агрегацией в Teradata, затем — многопоточная выгрузка.

Точный рецепт зависит от множества частных условий, но постараюсь изложить общие рекомендации для нескольких кейсов:

  • Если итоговая выборка небольшая — «Приоритет Push-down’а»;

  • Если итоговая выборка существенно сокращается в результате, чаще всего можно придерживаться «Приоритета Push-down’а» или «Материализации», когда в Teradata есть соответствующее место в sandbox’е;

  • Если итоговая выборка будет примерно такой же после join’а / агрегации, вариант — «Многопоточная выгрузка», т.к. тащить все равно столько же.

Заключение

В завершение сформулируем ключевые идеи и выводы:

  • Для эффективной многопоточной передачи данных на клиентскую сторону из Teradata каждый поток должен работать со своим подмножеством AMP’ов. Каждый AMP должен быть задействован лишь один раз, независимо от количества потоков;

  • В ряде случаев может быть более эффективно выполнить вычисления на стороне источника для уменьшения размера выборки. Trino умеет делать push-down определенных вычислений, но их должен поддерживать и connector;

  • При push-down’е вычислений важно понимать, как они будут выполняться на источнике, — некоторые из них не стоит совмещать с многопоточной выгрузкой. Но это может быть все равно лучше, особенно когда в результате получим существенно меньшую выборку.

Подписывайтесь на блог Data Sapience на Habr, чтобы узнавать о наших публикациях первыми.

ссылка на оригинал статьи https://habr.com/ru/articles/1024690/