Использование Trino для построения ETL-процессов

от автора

1.     Введение. Trino: ключевые задачи и главные преимущества

В современной архитектуре управления данными ETL-процессы рассматриваются не как вспомогательный инструмент, а как базовый механизм интеграции, трансформации и подготовки данных, поступающих из множества гетерогенных источников. Ключевая цель этих процессов — избавиться от хаоса и разрозненности данных, которые почти всегда появляются в больших распределенных компаниях [1].

В рамках ETL-конвейера выполняется автоматизированное извлечение данных из различных источников, их очистка, нормализация и приведение к единой модели, после чего подготовленные данные загружаются в централизованное аналитическое хранилище. Это даёт три главных преимущества: обеспечивает высокое качество и согласованность данных, структурирует информацию под нужды бизнес-отчетности, а также отделяет аналитическую нагрузку от операционных систем, повышая таким образом производительность системы в целом.

ETL возник как вынужденная мера, так как во время его появления (1970–1990-е) не было ни высокоскоростных сетей, ни мощных распределенных движков аналитики, ни концепции Data Lake. Единственным надёжным способом построить аналитическую отчетность было физически извлекать данные из операционных систем и копировать их в отдельную специализированную базу. Именно поэтому ETL закрепился как основной архитектурный паттерн аналитических систем на долгие десятилетия.

Увы, такой подход породил и массу проблем: это дублирование данных, долгие пайплайны, сложные зависимости, задержки обновления и огромные затраты на поддержку. Традиционным ETL-процессам становится всё труднее справляться с постоянно растущим объемом поступающих данных. Более того, большие сложности возникают при работе с уже накопленной информацией, ведь её требуется хранить на протяжении многих лет, а значит — сохранять возможность глубокого анализа по всей доступной истории.

Сложные аналитические инструменты стали необходимы из-за экспоненциального роста объёмов данных и их разнородности Анализ данных в наше время требует мощных вычислений для обработки огромных объемов данных в реальном времени. Основные причины роста объема данных – бум искусственного интеллекта (ИИ), облачные вычисления, рост числа IoT-устройств и необходимость обработки неструктурированных данных из социальных сетей и бизнеса. Рынок data analytics растёт на 30% ежегодно [2]

Одним из ответов на эти вызовы стал Trino [3] — распределенный SQL-движок для интерактивной аналитики по большим данным. Trino позволяет выполнять быстрые запросы к данным, хранящимся в разных системах (data lake, объектные хранилища, OLTP-БД, стриминг-источники). Высокая скорость обработки данных достигается за счет нескольких архитектурных решений, положенных в основу данного инструмента. Во-первых, обработка запроса в оперативной памяти (насколько это возможно), во-вторых, наличие продвинутого оптимизатора (CBO – Cost-Based Optimizer) и в-третьих, MPP- архитектура, которая позволяет распределять выполнение одного запроса между многими узлами вычислений (Workers).

Trino решает ключевую проблему масштабируемой аналитики больших данных без традиционных ограничений классического ETL-подхода, так как является распределенным SQL-движком. Он превращает сложные трансформации, ранее требовавшие отдельной Big Data-платформы, в обычные SQL-операции. При выполнении запросов Trino опирается преимущественно на свои собственные ресурсы, а значит сводит к минимуму дополнительную нагрузку на источник. В классическом ETL логика живёт в скриптах, пайплайнах и конфигурациях, которые сложно разрабатывать и сопровождать. Trino позволяет выполнять SQL-запросы напрямую поверх разных источников и data lake форматов. Это делает ненужными длинные цепочки из источников данных, промежуточных загрузок только ради того, чтобы объединить данные из разных систем. Теперь появляется возможность джойнить совершенно разные источники в одном запросе, тем самым устраняется необходимость физически перемещать данные, прежде чем их можно анализировать или объединять.

Trino при определённых ограничениях может частично подменить классический ETL-процесс. В современном Data Lakehouse он выходит за рамки простого SQL-движка, беря на себя функции слоя трансформации данных в реальном времени.

2. Основная часть. Строим ETL процесс на Trino

2.1  Настройка Docker desktop для работы

Для работы нам потребуется развернуть набор контейнеров из публичного реестра Docker Hub. Большой набор представленных контейнеров обусловлен необходимостью продемонстрировать взаимодействие Trino с разнородными внешними источниками и приемниками данных. Полный список необходимых контейнеров представлен на рис. 1. Назначение отдельных компонентов будет раскрыто в соответствующих разделах по мере погружения в сценарии использования.

Рис.1. Список Docker контейнеров

Рис.1. Список Docker контейнеров

2.2 Extract. Работа Trino с источниками данных

            Очевидно, что чем больше широким является выбор источников данных, тем больший интерес представляет и сам Trino. Ключевым механизмом, позволяющим Trino подключаться к источникам, являются коннекторы (connectors). Каждый коннектор — это плагин, который реализует специфический интерфейс Trino для работы с определенным типом источника. Коннекторы представляют собой набор конфигурационных файлов в папке catalogs. Это специальные файлы свойств (например, hive.properties, postgresql.properties), где указываются тип коннектора, адрес источника, учетные данные и специфические настройки.

Количество доступных коннекторов для Trino постоянно растет. А поскольку система построена на модульной архитектуре, это позволяет сообществу и вендорам легко добавлять новые коннекторы. Официально на начало 2026 года в стандартную поставку Trino входит более 40 коннекторов. Они позволяют получить доступ к таким источникам как Data Lakes (Hive, Iceberg, Delta Lake, Hudi), реляционные БД (MySQL, PostgreSQL, Oracle, MSSQL и т.д.), NoSQL (Cassandra, MongoDB, Elasticsearch, OpenSearch и т.д.) стриминговые и другие источники.

Вместе с тем, открытая архитектура дает возможность разрабатывать коннекторы компаниям-партнерам, благодаря которым появились коннекторы к целому ряду коммерческих продуктов (SAP HANA, Teradata, NetSuite, Salesforce и др.). Таким образом общее количество коннекторов уже стремится к 100+.

Архитектура Trino позволяет подключать неограниченное количество экземпляров одного и того же типа источника. Например, в одном кластере Trino вы можете одновременно работать с несколькими разными базами (допустим PostgreSQL), просто настроив для каждой свой файл конфигурации — postgresqlХХ.properties.

Важно понимать, что Trino – это только Query Engine (движок запросов), а не база данных. Поддержка стандартов записи (INSERT, UPDATE, DELETE, MERGE) зависит от конкретного коннектора. Например, коннектор к Iceberg или Delta Lake поддерживает полноценный ANSI SQL MERGE и DELETE, а коннектор к обычному S3 (Hive) может иметь ограничения на удаление или обновление отдельных строк.

Когда мы говорим о SQL базах данных, то тут все сравнительно просто. Trino дает вам возможность писать запросы на своем собственном диалекте SQL, такой диалект позиционируется как очень близкий к ANSI SQL движок [4], т.е. официально заявлено о полной совместимости с ANSI SQL, а также реализована значительная часть стандартов SQL от SQL-92 до SQL-2023. Это означает, что несмотря что существуют отличия и особенности Trino диалекта, большинство «обычного» SQL, которое вы пишете для PostgreSQL/MySQL/Hive/Oracle или современных версий других СУБД, будет работать в Trino почти без изменений. 

2.2.1  Работа с PostgreSQL

На данном этапе нам потребуются два контейнера — postgres и trino. А значит наш docker-compose.yaml файл будет содержать две секции.

pg-data:    image: postgres:16    container_name: pg-data    restart: unless-stopped    environment:      POSTGRES_DB: data             POSTGRES_USER: trino      POSTGRES_PASSWORD: trino123    volumes:      - ./stack/pg-data:/var/lib/postgresql/data    ports:      - "5434:5432"    networks:      - trino-network

Листинг 1.
Часть файла docker-compose.yaml, отвечающего за внешний источник данных — postgres

trino:    image: trinodb/trino:latest    container_name: trino    restart: unless-stopped    depends_on:      - pg-data    ports:      - "8082:8080"    volumes:      - ./stack/trino/catalogs:/etc/trino/catalog      - ./stack/trino/kafka_tbls:/etc/trino/kafka      - ./stack/trino/data:/var/trino    networks:      - trino-network

Листинг 2.
Часть файла docker-compose.yaml, отвечающего за Trino

После запуска docker контейнеров, мы можем подключиться к Postgres используя подходящий sql клиент (в моем случае это будет dbeaver). URL подключения будет выглядеть следующим образом:

jdbc:postgresql://localhost:5434/data

Порт, имя базы, логин и пароль берем из Листинг 1.

После настройки соединения создадим простую таблицу и наполним ее данными.

CREATE TABLE public.weather_forecast (  forecast_id  INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, location     VARCHAR NOT NULL, forecast_dt  TIMESTAMP NOT NULL, temp_c       DECIMAL(4,1), weather_desc TEXT);INSERT INTO public.weather_forecast (location, forecast_dt, temp_c, weather_desc) VALUES ('Moscow', '2026-02-04 06:00', 4.2, 'cloudy'),('Moscow', '2026-02-04 09:00', 5.1, 'rain'),('Moscow', '2026-02-04 12:00', 6.5, 'partly cloudy'),('Saint Petersburg', '2026-02-04 06:00', 3.2, 'snow'),('Saint Petersburg', '2026-02-04 09:00', 4.5, 'cloudy'),('Saint Petersburg', '2026-02-04 12:00', 3.8, 'rain');

Листинг 3.
Создание и наполнение таблицы weather_forecast в postgres.

Теперь научимся работать с данной таблицей из Trino, для этого нам потребуется настроить postgres connector. Местоположение файлов настройки коннекторов определено как ./stack/trino/catalogs (см. Листинг 2), туда мы положим файл pg_data.properties следующего содержания:

connector.name=postgresqlconnection-url=jdbc:postgresql://pg-data:5432/dataconnection-user=trinoconnection-password=trino123

Строго говоря, пароли лучше не хранить в открытом виде [5]. Trino поддерживает secrets management для подстановки переменных. Т.е. можно использовать синтаксис ${ENV:VARIABLE_NAME} прямо в .properties файлах. Тогда строка connection-password будет выглядеть так:

connection-password=${ENV:DB_PASSWORD}

В этом случае получаем пароль прямо из переменных окружения. Т.е. потребуется заранее установить необходимый пароль.

export DB_PASSWORD=my-super-secret-pwd

Теперь можем настроить в dbeaver доступ к trino, где уже проброшен коннект в postgres. Теперь в Dbeaver скачаем jdbc драйвер для Trino, а URL подключения будет выглядеть так

jdbc:trino://localhost:8082/pg_data 

Порт берем из Листинга 2, каталог = pg_data, user = trino, пароль оставим пустым (!)

После настройки соединения выполним запрос к таблице weather_forecast

Рис.2. Dbeaver, панель database navigator, запрос к данным postgres через Trino connector

Рис.2. Dbeaver, панель database navigator, запрос к данным postgres через Trino connector

Все данные на месте. Обратите внимание, что для обращения к нужной таблице в Trino всегда используется трехуровневая иерархия: catalog.schema.table.

Стоит заметить: для того, чтобы выполнить запрос к данным, Trino не требует обязательного JDBC-подключения. Так как он работает поверх HTTP протокола и предоставляет данные по REST API, можно написать простую программу на Python (см. Листинг 4). Здесь мы будем отправлять SQL-запрос напрямую через HTTP и получим результат без использования JDBC-драйвера.

import requestsTRINO_URL = "http://localhost:8082/v1/statement"HEADERS = {    "X-Trino-User": "admin",    "X-Trino-Catalog": "pg_data",    "X-Trino-Schema": "public"}QUERY = "SELECT * FROM pg_data.public.weather_forecast"# Инициируем запросresponse = requests.post(TRINO_URL, headers=HEADERS, data=QUERY)result = response.json()all_data = []# Цикл опроса (пока есть тэг nextUri)while "nextUri" in result:    # Если в этой порции есть данные, сохраняем их    if "data" in result:        all_data.extend(result["data"])        # Переходим к следующей ссылке    next_url = result["nextUri"]    result = requests.get(next_url).json()# Финальная порция данных (когда пропал тэг nextUri)if "data" in result:    all_data.extend(result["data"])print(f"Получено строк: {len(all_data)}")for row in all_data:    print(row)—---------------------------------------------------------Получено строк: 6 [1, 'Moscow', '2026-02-04 06:00:00.000', '4.2', 'cloudy'] [2, 'Moscow', '2026-02-04 09:00:00.000', '5.1', 'cloudy'] [3, 'Moscow', '2026-02-04 12:00:00.000', '6.5', 'partly cloudy'] [4, 'Saint Petersburg', '2026-02-04 06:00:00.000', '3.2', 'snow'] [5, 'Saint Petersburg', '2026-02-04 09:00:00.000', '3.8', 'snow'] [6, 'Saint Petersburg', '2026-02-04 12:00:00.000', '4.5', 'cloudy']

Листинг 4.
Python программа для получения данных из Trino по http протоколу и результат.

2.2.2  Работа с Kafka

Поддержка SQL-совместимых БД не единственный источник данных для Trino. И хотя это может показаться неочевидным, но Trino позволяет написать sql запросы к таким источникам, для которых такой формат взаимодействия с данными не предполагается изначально. Здесь кратко зафиксируем, что nosql базы данных и стриминговые хранилища также могут служить для Trino источниками данных. Теперь рассмотрим пример совместной работы с одним из наиболее популярных брокеров сообщений — Kafka.

Для этого нам потребуется еще один контейнер в Docker — apache Kafka.

  kafka:    image: apache/kafka:3.7.0    container_name: kafka-broker    ports:      - "9092:9092"    environment:      KAFKA_NODE_ID: 1      KAFKA_PROCESS_ROLES: broker,controller      KAFKA_LISTENERS: CONTROLLER://0.0.0.0:29093,BROKER://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:29092,PLAINTEXT_HOST://localhost:9092      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1      KAFKA_LOG_DIRS: /var/lib/kafka/data    volumes:      - ./stack/kafka_data:/var/lib/kafka/data    networks:      - trino-network

Листинг 5. Часть файла docker-compose.yaml, отвечающего за Kafka.

А в папку ./stack/trino/catalogs добавлю файл kafka.properties следующего содержания:

connector.name=kafkakafka.nodes=kafka:29092kafka.table-names=tbl_persons,tbl_addresskafka.default-schema=default

Содержание строки с параметром kafka.table-names будет объяснено позже. 

Теперь создадим несколько topic-ов в Kafka и наполним их данными.

1. Для того чтобы работать в контейнере kafka-broker интерактивно подключимся к соответствующему контейнеру

docker exec -it kafka-broker bash

2. Создадим топики tbl_persons и tbl_address

/opt/kafka/bin/kafka-topics.sh —create —topic tbl_persons —bootstrap-server localhost:9092

и

/opt/kafka/bin/kafka-topics.sh —create —topic tbl_address —bootstrap-server localhost:9092

3. Наполним topic-и данными в формате json. Json является самым простым но не единственным форматом, с которым умеет работать Trino. Согласно документации Trino поддерживает следующий список форматов: JSON, CSV, Avro, Protobuf, Raw bytes [6]

Topic tbl_persons

/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic tbl_persons{"person_id":1, "firstname":"John", "lastname":"Smit", "age":30, "create_at":"2026-01-28 15:00:00.000"}{"person_id":2, "firstname":"Anna", "lastname":"Brown", "age":25, "create_at":"2026-01-29 17:00:00.000"}{"person_id":3, "firstname":"Mike", "lastname":"Taylor", "age":42, "create_at":"2026-01-29 12:00:00.000"}

Topic tbl_address

/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic tbl_address{"address_id":1, "city":"Moscow", "street": "Arbat", "house":27, "create_at":"2026-01-28 14:00:00.000"}{"address_id":2, "city":"Saint Petersburg", "street": "Nevsky Prospect", "house":12, "create_at":"2026-01-29 16:00:00.000"}{"address_id":3, "city":"Ekaterinburg", "street": "8 Marta", "house":46, "create_at":"2026-01-29 11:00:00.000"}

Для того чтобы превратить «сырые байты» из Kafka-топика в привычную нам таблицу, на каждый topic потребуются два конфигурационных файла в формате json. Они необходимы для того, чтобы коннектор мог прочитать схему и способ декодирования сообщений из Kafka. Такие два файла должны находиться на локальной машине по пути ./stack/trino/kafka_tbls (см. Листинг 2)

{  "tableName": "tbl_persons",  "schemaName": "default",  "topicName": "tbl_persons",  "key": {    "dataFormat": "raw",    "fields": []  },  "message": {    "dataFormat": "json",    "fields": [      {"name": "id", "type": "integer",   "mapping": "person_id"},      {"name": "firstname", "type": "varchar",   "mapping": "firstname"},      {"name": "lastname",  "type": "varchar",   "mapping": "lastname"},      {"name": "age",       "type": "integer",   "mapping": "age"},      {"name": "create_dt", "type": "TIMESTAMP", "mapping": "create_at", "dataFormat": "custom-date-time", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS"}    ]  }}

Листинг 6.
Конфигурационный файл tbl_persons.json

{  "tableName": "tbl_address",  "schemaName": "default",  "topicName": "tbl_address",  "key": {    "dataFormat": "raw",    "fields": []  },  "message": {    "dataFormat": "json",    "fields": [      {"name": "id",         "type": "integer",   "mapping": "address_id"},      {"name": "city",       "type": "varchar",   "mapping": "city"},      {"name": "street",     "type": "varchar",   "mapping": "street"},      {"name": "house",      "type": "integer",   "mapping": "house"},      {"name": "create_dt",  "type": "timestamp", "mapping": "create_at", "dataFormat": "custom-date-time", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS"}    ]  }}

Листинг 7. Конфигурационный файл tbl_address.json

Теперь мы можем выполнить запрос к данным в topic-ах Kafka как будто работаем с  обычными таблицами.

Рис 3. Dbeaver, панель database navigator, Kafka catalog c подключенными таблицами основанными на соответствующих topic-ах.

Рис 3. Dbeaver, панель database navigator, Kafka catalog c подключенными таблицами основанными на соответствующих topic-ах.

Теперь можем за-join-ить две таблицы т.е. topic-а.

select tp.id     , tp.firstname     , tp.lastname     , tp.age     , tp.create_dt     , ta.city  from kafka.default.tbl_persons tp      , kafka.default.tbl_address ta where tp.id = ta.id   —----------------------id|firstname|lastname|age|create_dt              |city            |--+---------+--------+---+-----------------------+----------------+ 1|John     |Smit    | 30|2026-01-28 15:00:00.000|Moscow          | 2|Anna     |Brown   | 25|2026-01-29 17:00:00.000|Saint Petersburg| 3|Mike     |Taylor  | 42|2026-01-29 12:00:00.000|Ekaterinburg    |

Листинг 8. Join двух таблиц tbl_persons и tbl_address, построенных на соответствующих Kafka topic-ах.

Для тех, кто ранее работал с Kafka важно знать, что SQL-запрос в Trino к Kafka не использует и не сохраняет consumer offset так, как это делает обычный Kafka-consumer. Здесь каждый SQL-запрос — это stateless scan topic-а.

Если мы хотим забирать данные из Kafka источника, то тот факт, что Trino не управляет offset-ами может создать дополнительные риски забрать одни и те же данные дважды или что-то пропустить. Чтобы не попасть в такую ситуацию при выборке данных достаточно воспользоваться служебными полями.

К примеру команда

desc kafka.default.tbl_persons

покажет нам только те поля, которые мы написали при создании таблицы, но «секрет» в том, что это не все поля, к которым вы можете обратиться. Из всего списка служебных полей для создания нужного окна можно воспользоваться timestamp и/или partition_offset.

select *  from kafka.default.tbl_persons  where _timestamp between timestamp '2026-02-13 13:33:15'                       and timestamp '2026-02-13 13:33:21'—--------------------------------------------id|firstname|lastname|age|create_dt              |--+---------+--------+---+-----------------------+ 2|Anna     |Brown   | 25|2026-01-29 17:00:00.000|

Листинг 9. Запрос и выборка данных из Kafka по окну времени загрузки.

Таким образом с помощью «скользящего окна» можно выбирать только те данные, которые соответствуют необходимому вам временному диапазону. А корректно смещая окно, можно получить все необходимые данные ровно один раз.

А чтобы увидеть данные в kafka в неизменном виде (в нашем случае это json формат) достаточно выполнить следующий запрос:

 

select _message

  from kafka.default.tbl_persons

В данном примере была рассмотрена задача чтения данных из Apache Kafka с использованием Trino. Однако Kafka-коннектор Trino поддерживает и обратный сценарий — запись данных в Kafka-топики. Для этого достаточно выполнить оператор INSERT в таблицу, определенную в конфигурации Kafka-коннектора. В процессе выполнения запроса Trino формирует сообщение в соответствии с указанным форматом сериализации (например, JSON) и публикует его в соответствующий Kafka-topic. Структура и формат выходного сообщения определяются конфигурационным файлом описания таблицы (в рассматриваемом примере — tbl_persons.json и tbl_address.json).

Такой механизм позволяет реализовать полноценные конвейеры обработки данных непосредственно средствами SQL. Например, Trino может считывать сырые события из одного Kafka-topic, выполнять агрегирование или другие трансформации за заданный временной интервал и публиковать результат обработки в другой Kafka-topic. Таким образом, Trino может выступать не только как инструмент аналитического доступа к потоковым данным, но и как слой трансформации и маршрутизации данных между Kafka-топиками.

2.2.3 Федеративные запросы к источникам

Федеративный запрос в Trino — это такой SQL-запрос, который объединяет данные из нескольких независимых источников.  Запрос реализуется через разные коннекторы и позволяют превращать разрозненные источники данных в единую виртуальную базу данных для аналитики.

Теперь рассмотрим, как работает федеративный запрос. Поскольку у нас есть два типа источников postgres и kafka мы можем написать такой select, который будет использовать сразу два коннектора.

 В postgres создадим таблицу со структурой аналогичной tbl_persons и зальем данные.

create table public.tbl_persons (  id           integer, firstname    varchar, lastname     varchar, age          integer, create_dt    timestamp);—-------------------------insert into public.tbl_persons (id, firstname, lastname, age, create_dt)  values (4, 'Maria', 'Garcia', 28, cast('2026-01-29 09:15:30.000' as timestamp)),(5, 'David', 'Lee', 35, cast('2026-01-28 23:45:12.000' as timestamp)),(6, 'Sophie', 'Wilson', 19, cast('2026-01-30 08:30:00.000' as timestamp));

Теперь напишем федеративный запрос в Trino

select tp.*, ta.city   from (select id             , firstname             , lastname             , age             , create_dt            from kafka.default.tbl_persons tp2       --FROM KAFKA         union all        select id             , firstname             , lastname             , age             , create_dt          from pg_data.public.tbl_persons tp1     --FROM PG        ) as tp   left outer join kafka.default.tbl_address ta        on tp.id = ta.id   order by id—------------------------------------id|firstname|lastname|age|create_dt              |city            |--+---------+--------+---+-----------------------+----------------+ 1|John     |Smit    | 30|2026-01-28 15:00:00.000|Moscow          | 2|Anna     |Brown   | 25|2026-01-29 17:00:00.000|Saint Petersburg| 3|Mike     |Taylor  | 42|2026-01-29 12:00:00.000|Ekaterinburg    | 4|Maria    |Garcia  | 28|2026-01-29 09:15:30.000|                | 5|David    |Lee     | 35|2026-01-28 23:45:12.000|                | 6|Sophie   |Wilson  | 19|2026-01-30 08:30:00.000|                |

Листинг 10. Пример федеративного запроса в Trino. Используются connector-ы к Kafka и Postgres.

Данный пример демонстрирует, что при реализации Lambda-архитектуры Trino может использоваться как объединяющий слой, выполняющий федеративные SQL-запросы к различным источникам данных.

Такая архитектура может быть реализована в среде современной data-platform, например в инфраструктуре data lake, облачных хранилищах данных или гибридных аналитических системах, где данные поступают из потоковых и пакетных источников. В подобных системах Trino может выступать в роли универсального SQL-движка, обеспечивающего единый доступ к данным, независимо от способа их обработки и хранения.

В рамках Lambda-архитектуры быстрый слой (Speed layer) отвечает за обработку данных в реальном времени и обычно реализуется на основе потоковых платформ, таких как Apache Kafka. Медленный слой (Batch layer) хранит исторические данные, прошедшие полную обработку и проверку консистентности, и может быть реализован на базе аналитических хранилищ данных или data lake-решений.

В такой конфигурации Trino выступает в роли логического моста между слоями архитектуры, выполняя объединение и обработку данных непосредственно во время выполнения запроса. Быстрый слой предоставляет актуальные потоковые данные из Kafka, медленный слой — исторические данные из хранилищ, а итоговый результат формируется динамически посредством единого SQL-запроса, выполняемого Trino.

 

2.3 Transform. Возможности Trino по преобразованию данных

SQL сегодня — это не просто язык запросов к базе данных, это полноценный стандарт для описания логики Data Transformation. Если раньше для сложных преобразований требовался код на Python или Java, то такие инструменты как Trino превратили SQL в мощный инструмент обработки данных.

Основные типы трансформаций в SQL это и структурные изменения, такие как переименование полей, приведение типов (CAST), преобразование данных с помощью функций CONCAT, SUBSTRING, REGEXP_REPLACE и др. Агрегация и группировка – схлопывание сырых транзакций в витрины данных с помощью GROUP BY и функций SUM, AVG, COUNT, денормализация — объединение десятков таблиц через JOIN для создания плоских моделей, Очистка данных: Обработка пустых значений (COALESCE, NULLIF), фильтрация шума через WHERE и HAVING.

Даже на фоне современных SQL-движков Trino уверенно выделяется за счёт набора продвинутых возможностей. Его SQL-диалект не ограничивается классическим ANSI-синтаксисом: он расширен конструкциями, которые либо полностью отсутствуют в большинстве популярных СУБД, либо реализованы в значительно более упрощённом виде. Многие из этих инструментов позволяют решать аналитические задачи «на лету», без предварительного ETL и промежуточных преобразований данных.

2.3.1 Примеры продвинутых SQL запросов в Trino

Далее рассмотрим несколько нетипичных примеров, которые показывают возможности SQL в Trino. Они демонстрируют, как с помощью встроенных функций и расширенных возможностей SQL можно выполнять сложную обработку и преобразование данных прямо в запросах. В обычных реляционных СУБД для таких задач часто требуется дополнительный код, процедурная логика или отдельный этап подготовки данных.

1. Функция transform в Trino — это одна из самых часто используемых функций при работе с массивами (array). Она позволяет применить произвольное преобразование (lambda-выражение) к каждому элементу массива и вернуть новый массив с результатами [7]. 

WITH data AS (-- Преобразует строку в тип JSON    SELECT json_parse('[1,2,3,4]') AS js  -- создает временную таблицу data с одним столбцом js)  -- применяет лямбда-функцию к каждому элементу массиваSELECT transform( --преобразование в полноценный ARRAY            CAST(data.js AS ARRAY(INTEGER)), -- лямбда-функция           x -> x * 10       ) AS multiplied  from data;—---------------------------------------multiplied   |-------------+{10,20,30,40}|

Листинг 11. Пример использования SQL функции transform в Trino.

2. MATCH_RECOGNIZE является частью SQL стандарта, который был официально включён в SQL:2016. Выражение MATCH_RECOGNIZE позволяет выполнять поиск паттернов во временных рядах и находить определенные последовательности событий. [8]. Для тех, кто пользовался услугами web-маркетов будет понятна следующая задача. Допустим, в базе данных есть таблица order_status_log (order_id — id заказа, status — статус заказа и status_time — время изменения статуса), где фиксируется статус заказа. Заказ должен проходить последовательность статусов «Создан» → «Оплачен» → «Отсортирован» → «В пути на СЦ»→ «Доставлен». При этом, часть цепочки «Отсортирован» → «В пути на СЦ» может повторяться несколько раз. Нужно написать такой селект, который найдет те заказы, у которых эта последовательность нарушена. Такая задача в Trino может быть решена следующим образом.

WITH order_status_log AS (    SELECT * FROM (VALUES      (10, 10, 'Создан', '2026-01-01')    , (20, 10, 'Оплачен', '2026-02-01')    , (30, 10, 'Отсортирован', '2026-03-01')    , (40, 10, 'В пути на СЦ', '2026-04-01')    , (50, 10, 'Отсортирован', '2026-05-01')    , (60, 10, 'В пути на СЦ', '2026-06-01')    , (70, 10, 'Доставлен', '2026-07-01')    ) AS t(id, order_id, status, status_time))    , correct_orders AS (    SELECT order_id    FROM order_status_log        MATCH_RECOGNIZE (        PARTITION BY order_id        ORDER BY status_time        ONE ROW PER match        PATTERN (^ A B (C D)+ E $)        DEFINE            A AS status = 'Создан',            B AS status = 'Оплачен',            C AS status = 'Отсортирован',            D AS status = 'В пути на СЦ',            E AS status = 'Доставлен'    ))SELECT DISTINCT l.order_idFROM order_status_log lWHERE NOT EXISTS (    SELECT 1    FROM correct_orders c    WHERE c.order_id = l.order_id)

Листинг 12.

MATCH_RECOGNIZE проверяет соответствует ли лог заказа шаблону (^ A B (C D)+ E $) «Создан» (один)-> «Оплачен» (один)-> («Отсортирован» → «В пути») (один или более) -> ‘Доставлен’(один) Если нет, выводит orderid неправильной последовательности

2.3.2 SQL-процедуры в Trino

В Trino есть поддержка SQL-процедур, что можно рассматривать как упрощённый аналог PL/SQL. Хотя по уровню зрелости и глубине функционала это не полная замена PL/SQL (В Trino процедуры не могут использовать SELECT запросы для извлечения данных или любые другие запросы для обработки данных внутри UDF), сам факт появления процедур существенно расширяет модель использования Trino. Это означает, что сложную цепочку действий можно оформить как единый исполняемый блок. Процедуры позволяют перенести часть бизнес-логики внутрь SQL-слоя, улучшить повторное использование логики [9].

Пример. Создаем функцию calc_discount, которая на основе суммы покупки (purchase_amount) и категории клиента (customer_category) рассчитывает итоговую скидку. Функция использует логику CASE внутри блока BEGIN … END.

CREATE FUNCTION memory.default.calc_discount(  purchase_amount   double, customer_category varchar)RETURNS doubleBEGIN    DECLARE discount double DEFAULT 0.0;    -- Базовая скидка в зависимости от категории    SET discount = CASE customer_category        WHEN 'VIP' THEN 0.20        WHEN 'REGULAR' THEN 0.10        WHEN 'NEW' THEN 0.05        ELSE 0.0    END;    -- Дополнительная скидка, если сумма покупки большая    IF purchase_amount > 1000 THEN        SET discount = discount + 0.05; -- дополнительные 5%    END IF;    -- Максимальная скидка не может превышать 25%    IF discount > 0.25 THEN        SET discount = 0.25;    END IF;    -- Возвращаем сумму скидки, округленную до копеек    RETURN ROUND(purchase_amount * discount, 2); END

Теперь применим только что созданную функцию для написания запроса

WITH purchases AS (    SELECT * FROM (VALUES      (1, 200, 'NEW'),      (1, 500, 'REGULAR'),      (1, 1500, 'VIP'),      (2, 1300, 'NEW'),      (2, 800, 'REGULAR'),      (2, 200, 'VIP')   ) AS t(customer_id, purchase_amount, category))SELECT customer_id     , purchase_amount     , category     , memory.default.calc_discount(purchase_amount, category) AS discount_amount  FROM purchases;—---------------------------------------customer_id|purchase_amount|category|discount_amount|-----------+---------------+--------+---------------+          1|            200|NEW     |           10.0|          1|            500|REGULAR |           50.0|          1|           1500|VIP     |          375.0|          2|           1300|NEW     |          130.0|          2|            800|REGULAR |           80.0|          2|            200|VIP     |           40.0|

Листинг 13. Пример использования пользовательской функции calc_discount.

Обратите внимание что пользовательская функция calc_discount была создана в каталоге memory (см. рис. 2). Для подключения данного коннектора потребуется memory.properties файл, расположенный по тому же пути где и другие конфиг файлы достаточно простого содержания.

connector.name=memory

Особенность в том, что не каждый коннектор поддерживает функционал создания кастомных функций. На практике чаще всего используется именно memory, потому что он поставляется «из коробки». Но у данного коннектора есть одна особенность о которой нужно помнить. Все объекты, созданные в каталоге memory, хранятся исключительно в оперативной памяти координатора Trino. Это относится к данным таблиц, SQL-функциям и всем связанным метаданным. Такие объекты существуют только в течение времени работы кластера. После перезапуска Trino содержимое каталога memory полностью очищается.

2.3.3 UDF функции на языках Python и Java

Python UDF (user-defined functions) в Trino — это относительно новая функция, позволяющая писать скалярные функции на Python 3.13.0 (версия Trino 479). Они выполняются в sandboxed-окружении на базе WebAssembly (WASM) внутри JVM, что обеспечивает безопасность, но вводит некоторые ограничения. Производительность такого подхода ниже, чем при использовании sql или java UDF.  Из sandboxed нет доступа к внешним ресурсам: сетевым операциям, файловой системе, подпроцессам или внешним API. Нет возможности устанавливать пакеты через pip или использовать внешние инструменты, доступен только ряд предустановленных библиотек [10]. Такой подход делает непригодным python UDF для задач, требующих интеграции с внешними сервисами.

Вместе с тем если ваши вычисления требуют сложных алгоритмов, которые на SQL превращаются в крайне перегруженный код или вообще не реализуемы Python придет на помощь. Вы просто пишете функцию на привычном языке Python.

CREATE FUNCTION memory.default.validate_json (a_schema json, a_data json)RETURNS booleanLANGUAGE PYTHONWITH (handler='validate_json')AS $$import jsonfrom jsonschema import validate, ValidationErrordef validate_json(a_schema: str, a_data: str) -> bool:  l_schema = json.loads(a_schema)  l_data = json.loads(a_data)  try:    validate(instance=l_data, schema=l_schema)    return True  except ValidationError as e:    return False$$

Листинг 14. Пример создания UDF функции на языке Python.

Функция validate() из пакета jsonschema используется для проверки JSON-данных на соответствие JSON Schema. Это одна из самых часто используемых функций библиотеки при работе с валидацией структурированных данных в Python. Она удобна тем, можно динамически менять правила записанные в JSON Schema в зависимости от того с какой таблицей мы работаем или какой набор правил актуален на сегодня. Написание такой валидации на чистом sql задача совсем не простая, да и нет в этом необходимости, так как можно воспользоваться уже готовым и проверенным решением.

WITH json_samples AS (    SELECT * FROM (VALUES     -- набор JSON файлов для проверки      (json_parse('{"id":1,"name":"Konstantin","email":"konstantin@example.com","age":35}')),      (json_parse('{"id":2,"name":"K","email":"konstantin@example.com","age":15}')),      (json_parse('{"id":-3,"name":"Konstantin","email":"not-an-email","age":19}'))   ) AS t(json_data) ) , json_rules AS (   SELECT * FROM (VALUES     -- схема (правила проверки)     (json_parse('{ "type": "object",                       "required": ["id", "name", "email", "age"],                       "properties": {                         "id": {"type": "integer", "minimum": 1 },                         "name": {"type": "string", "minLength": 2},                         "email": {"type": "string", "format": "email"},                         "age": {"type": "integer", "minimum": 18, "maximum": 100}                       },                       "additionalProperties": false                   }'))   ) AS t(json_rule) ) SELECT json_data        , memory.default.validate_json(json_rule, json_data) validate     FROM json_samples        , json_rules—-------------------------------------------------------------------------------json_data                                                             |validate|----------------------------------------------------------------------+--------+{"id":1,"name":"Konstantin","email":"konstantin@example.com","age":35}|true    |{"id":2,"name":"K","email":"konstantin@example.com","age":15}         |false   |{"id":-3,"name":"Konstantin","email":"not-an-email","age":19}         |false   |

Листинг 15. Пример использования UDF функции на языке Python.

Python UDF — это отличное дополнение для быстрого экспериментирования и менее нагруженных сценариев, но не замена Java в продакшене на больших данных. Java UDF по-прежнему самый правильный и рекомендуемый подход для серьёзных пользовательских функций [11]. такой подход обеспечивает наиболее высокую производительность и рациональное потребление ресурсов. С примерами написания UDF на Java можно ознакомиться здесь [12, 13, 14].

 3. Load. Загрузка данных

3.1 Создание таблицы из результата запроса

Загрузка (Load) — это заключительный этап ETL-процесса, на котором преобразованные данные физически записываются в целевую систему хранения: базу данных, хранилище данных, витрину или потоковую платформу. Именно на этапе загрузки данные становятся доступными для аналитики.

            Самый быстрый способ загрузки данных в Trino это CTAS (CREATE TABLE AS SELECT). В этом случае, во-первых, Trino сразу знает весь объем работы и схему данных, а во-вторых, при выполнении задачи записи данных будут задействованы все Worker-ноды. Каждая нода возьмет свою порцию данных из источника, трансформирует её и запишет в свой собственный файл в целевое хранилище (допустим MinIO/S3).

Trino действительно считается одним из самых удачных и часто рекомендуемых инструментов для работы с современным табличным форматом Apache Iceberg [15]

Для работы с Iceberg добавим в файл docker-compose.yaml несколько ключевых сервисов:

1) minio — объектное хранилище, совместимое с S3, куда будут записываться сами данные таблиц Iceberg и их метаданные;

2) minio/mc (MinIO Client) — инструмент для создания бакетов и начальной инициализации хранилища;

3) pg-metastore (PostgreSQL) — база данных для хранения самого метахранилища Hive.

4) hive-metastore — служба, которая хранит метаданные таблиц Iceberg, схемы, партиции и т.д. Trino использует Hive Metastore как каталог для Iceberg;

  # ---------------- MinIO ----------------  minio:    image: minio/minio:latest    container_name: minio    command: server /data --console-address ":9001"    restart: unless-stopped    environment:      MINIO_ROOT_USER: minioadmin      MINIO_ROOT_PASSWORD: minioadmin123    ports:      - "9500:9000"      - "9501:9001"    volumes:      - ./stack/minio_data:/data    networks:      - trino-network  # ---------------- Create warehouse bucket ----------------  mc:    image: minio/mc:latest    container_name: mc    depends_on:      - minio    entrypoint: >      /bin/sh -c "      sleep 10 &&      mc alias set minio http://minio:9000 minioadmin minioadmin123 &&      mc mb minio/warehouse --ignore-existing &&      mc anonymous set public minio/warehouse &&      tail -f /dev/null      "    networks:      - trino-network  # ---------------- PostgreSQL (Hive Metastore DB) ----------------  pg-metastore:    image: postgres:16    container_name: pg-metastore    restart: unless-stopped    environment:      POSTGRES_DB: metastore      POSTGRES_USER: hive      POSTGRES_PASSWORD: hive123    ports:      - "5433:5432"    volumes:      - ./stack/pg-metastore:/var/lib/postgresql/data    networks:      - trino-network  # ---------------- Hive Metastore ----------------  hive-metastore:    image: apache/hive:3.1.3    container_name: hive-metastore    restart: unless-stopped    depends_on:      - pg-metastore    environment:      SERVICE_NAME: metastore      HIVE_METASTORE_DB_TYPE: postgres      DB_DRIVER: postgres      DB_HOST: pg-metastore      DB_PORT: 5432      DB_NAME: metastore      DB_USER: hive      DB_PASSWORD: hive123    ports:      - "9083:9083"    networks:      - trino-network

Листинг 16. Часть файла docker-compose.yaml, отвечающего за работу с Apache Iceberg.

Нам потребуется настроить коннектор к Apache Iceberg, за это отвечает файл iceberg.properties (полный путь .\stack\trino\catalogs\iceberg.properties), содержание некоторых настроек будет понятно позже

connector.name=icebergiceberg.catalog.type=nessieiceberg.nessie-catalog.uri=http://nessie:19120/api/v2iceberg.nessie-catalog.ref=mainiceberg.nessie-catalog.default-warehouse-dir=s3://warehouse/fs.native-s3.enabled=trues3.endpoint=http://minio:9000s3.path-style-access=trues3.aws-access-key=minioadmins3.aws-secret-key=minioadmin123s3.region=us-east-1

Листинг 17. Содержание конфиг файла — для connector-а iceberg — iceberg.properties.

Теперь можем создать таблицу в формате Iceberg в соответствующей схеме (ее нужно будет предварительно создать). Используем механизм CTAS для создания таблицы и наполнения ее данными.

Во-первых создадим схему wh в каталоге iceberg

CREATE SCHEMA iceberg.wh;

Во вторых, выполним следующий скрипт

CREATE TABLE iceberg.wh.tbl_personsWITH (    format = 'PARQUET',    partitioning = ARRAY['age'])ASSELECT *  FROM pg_data.public.tbl_persons;

Листинг 18. Простейший пример создания таблицы в формате Apache Iceberg с одновременным наполнением ее данными из postgres источника.

И хотя сам вид селекта в данном конкретном примере выглядит максимально просто, в реальности он может включать в себя все то, что было описано ранее, а именно  федеративные запросы к разным источникам и комплексные join-ы и сложные преобразования данных как с помощью UDF sql/python/java функций.

 

3.2 Важные настройки Trino для записи данных

Для просмотра текущих значений параметров сеанса и конфигурации в Trino используется следующий запрос.

 

SHOW SESSION;

 

Или с фильтром:

SHOW SESSION LIKE ‘%memory%’;

 

В рамках сессии некоторые из параметров могут быть модифицированы, так команда SET SESSION param_key = param_value, позволит вам максимально эффективно настроить Trino под нужную вам задачу.  

При записи данных (INSERT) в формате Apache Iceberg важно обратить внимание на некоторые параметры Trino. Некоторые из них имеют принципиальное значение влияющее на скорость записи данных. iceberg.target_max_file_size — данный параметр определяет размер итоговых data files. Рекомендуемое значение этого параметра — 256MB – 512MB, это позволит избежать создания большого количества малых файлов.

 

Пример. SET SESSION iceberg.target_max_file_size = ‘512MB’

 

В распределенных системах хранения данных, размер файла имеет критическое значение для производительности: если этот параметр будет слишком маленьким (~ 1 MB), Trino создаст тысячи крошечных файлов. Тогда, при чтении такой таблицы Trino (и другие движки) будет тратить больше времени на открытие/закрытие файлов и чтение метаданных, чем на обработку самих данных. Это будет резко замедлять выборку (SELECT). Форматы вроде Parquet и ORC используют колоночное сжатие. В больших файлах данных больше повторяющихся значений, а значит и алгоритмы сжатия работают более эффективно. Вместе с тем, очень большое значение параметра iceberg.target_max_file_size может привести к проблеме, так как памяти может просто не хватить на такую операцию и все закончится ошибкой Out of Memory.

            После загрузки большого объема данных желательно выполнить оптимизацию. [16]

 

Пример

ALTER TABLE iceberg.wh.tbl_persons

EXECUTE optimize(file_size_threshold => ‘512MB’);

 

Еще один набор параметров на который стоит обратить внимание при настройке производительности Load операций — это task_min_writer_count — минимальное количество writer-ов  на одном worker-е на одну задачу, task_max_writer_count максимальное количество writer-ов на задачу. Данные параметры влияют на сценарии параллельной записи только при task_scale_writers_enabled = true

Рекомендуемые параметры для типичных сценарием записи данных такой

task_min_writer_count от 1-2

task_max_writer_count не более 64 (зависит от CPU, часто 16 — хороший компромисс)

task_scale_writers_enabled = true

 

Еще два параметра важны при записи (CTAS, Insert)

redistribute_writes = false — если неравномерность распределения данных по нодам минимальна

use_preferred_write_partitioning = false — если происходит запись в непартицированую таблицу.

3.3 Системные таблицы метаданных в Trino

При загрузке данных в целевую таблицу очень важно понимать её внутреннюю структуру: какие есть партиции, сколько файлов, какие snapshot-ы активны, как устроен branch-инг и т.д. Trino с коннектором Iceberg предоставляет удобный и мощный способ получить всю эту информацию — через системные таблицы метаданных. Каждая таблица Iceberg в Trino имеет набор специальных «виртуальных» таблиц метаданных. Чтобы к ним обратиться, достаточно добавить к имени основной таблицы знак $ и название нужной таблицы метаданных.

Приведу несколько примеров.

В Trino вы можете посмотреть структуру таблицы

SELECT * FROM iceberg.wh.»tbl_persons$properties»

key                 |value                                                           |--------------------+----------------------------------------------------------------+format              |iceberg/PARQUET                                                 |provider            |iceberg                                                         |current-snapshot-id |3717634582152414119                                             |location            |s3://warehouse/wh/tbl_persons-0271f7868e1e497fac2facb0788e03c9  |format-version      |2                                                               |nessie.commit.id    |5514a32d98ff6eb39b71098aad2d3e323486035f68d68499c7a7d9b26398a09c|write.format.default|PARQUET                                                         |nessie.gc.no-warning|true                                                            

В $history таблице представлен журнал изменений метаданных, внесенных в таблицу Iceberg

SELECT * FROM iceberg.wh.»tbl_persons$history»

made_current_at              |snapshot_id        |parent_id|is_current_ancestor|-----------------------------+-------------------+---------+-------------------+2026-03-21 16:24:32.401 +0300|3717634582152414119|         |true               |

Ряд других важных данных о структуре таблице и ее истории можно посмотреть соответствующими запросами:

  • метаданные таблицы — iceberg.wh.»tbl_persons$metadata_log_entries»

  • snapshot-ы таблицы — iceberg.wh.»tbl_persons$snapshots»

  • манифесты таблицы  — iceberg.wh.»tbl_persons$manifests»

  • партиции таблицы — iceberg.wh.»tbl_persons$partitions»

  • файлы данных — iceberg.wh.»tbl_persons$files»

И так далее. Подробней вы можете прочитать в официальной документации — [17]

 

3.4 Использование branching (ветвления) в Apache Iceberg с точки зрения Trino

В Apache Iceberg механизм branching (ветвления) был введен для того, чтобы работать с таблицами по принципу, похожему на работу с кодом в системах контроля версий, например в Git. В аналитических системах часто существуют большие таблицы с ценными данными, которые необходимо регулярно обновлять, но при этом важно не повредить основную версию данных. Один из распространенных подходов — сначала загружать данные во временные таблицы, проверять их, а затем переносить в основную таблицу. Однако использование веток позволяет сделать этот процесс более безопасным и удобным: можно создать branch от основной таблицы и выполнять все загрузки, преобразования и проверки данных в изолированной ветке. Если проверка не проходит, ветку можно просто удалить, и основная таблица останется без изменений.

У такого подхода есть несколько важных преимуществ. Во-первых, создание ветки происходит очень быстро и почти не требует дополнительного места, поскольку создается лишь новая ссылка на текущий snapshot таблицы, а сами файлы данных не копируются. Во-вторых, изменения в ветке полностью изолированы: основная таблица не изменяется до тех пор, пока изменения из ветки явно не будут объединены с основной веткой с помощью операции merge. Кроме того, ветки позволяют строить более надежные ETL-процессы: данные можно полностью подготовить и проверить в отдельной ветке, а затем атомарно применить изменения к основной таблице. Это позволяет обновлять большие таблицы без использования временных staging-таблиц и снижает риск повреждения данных в production-системах.

Чтобы воспользоваться механизмом branching (ветвления) в Apache Iceberg, потребуется дополнительный компонент — Project Nessie [18].

На март 2026 года Trino не поддерживает создание, слияние, удаление или прямое управление ветками и тегами Project Nessie на уровне SQL при использовании Nessie-каталога. Поэтому все операции с ветками (создание, слияние, удаление и т.д.) выполняются вне Trino — с использованием внешних инструментов, таких как CLI или REST API Nessie. Для автоматизации этих процессов можно использовать оркестратор, например Apache Airflow. В Airflow выполнение SQL-запросов в Trino осуществляется с помощью оператора TrinoOperator, входящего в официальный provider-пакет для интеграции с Trino. Взаимодействие с Nessie при этом может быть организовано через его REST API, например с использованием операторов HttpOperator или PythonOperator. Таким образом, управление версиями данных (ветками и тегами) выносится за пределы Trino и реализуется на уровне оркестрации, что позволяет строить полноценные пайплайны обработки данных с использованием Trino, Nessie и Airflow.

Trino может только читать данные из определенной ветки (допустим main), если в свойствах каталога указать iceberg.nessie-catalog.ref=main (см. Листинг 17).

Чтобы мы могли видеть ветку «etl» в Trino и Dbeaver нам потребуется файл аналогичный iceberg.properties. Для этого, в той же папке (.\stack\trino\catalogs) создадим файл iceberg_etl.properties, а единственным отличием от iceberg.properties будет строка с параметром iceberg.nessie-catalog.ref — указателем на etl ветку, т.е. теперь он будет выглядеть следующим образом iceberg.nessie-catalog.ref=etl.

  nessie:    image: ghcr.io/projectnessie/nessie:0.107.3    container_name: nessie    ports:      - "19120:19120"    environment:      - QUARKUS_HTTP_PORT=19120      - NESSIE_VERSION_STORE_TYPE=ROCKSDB      - NESSIE_VERSION_STORE_PERSIST_ROCKS_DATABASE_PATH=/nessie/data    volumes:      - ./stack/nessie-data:/nessie/data    networks:      - trino-network  nessie-cli:    image: ghcr.io/projectnessie/nessie-cli:0.107.3    container_name: nessie-cli    stdin_open: true    tty: true           networks:      - trino-network    depends_on:      - nessie    command: >      sh -c "      echo 'Connecting to Nessie...' &&      echo 'CONNECT TO http://nessie:19120/api/v2;' | /opt/nessie-cli/bin/nessie-cli      "

Листирнг 19. Часть файла docker-compose.yaml, отвечающего за работу с Nessie.

В главе 3.1 мы создали таблицу iceberg.wh.tbl_persons, по умолчанию она находится в ветке main. Теперь создадим ее копию — ветку etl при помощи nessie-cli. Для этого подключимся к контейнеру nessie-cli

docker run -it —rm —network trino-shared-net ghcr.io/projectnessie/nessie-cli:0.107.3,

здесь trino-shared-net имя сети.

Рис. 4. Окно приветствия Nessie cli в случае удачного подключения к Nessie 

Рис. 4. Окно приветствия Nessie cli в случае удачного подключения к Nessie 

Попадаем в интерактивную оболочку контейнера Nessie CLI, где можно подключиться с Nessie.

Nessie> CONNECT TO http://nessie:19120/api/v2;

После подключения попадаем в клиентскую консоль, где уже и можно создать новую ветку etl.

main> CREATE BRANCH etl FROM main

Теперь вернемся в dbeaver, и если раньше при подключении к коннектору iceberg_etl мы видели что никаких данных нет, теперь там те же схемы и таблицы что и в каталоге iceberg. Да, select к таблицах iceberg.wh.tbl_persons и iceberg_etl.wh.tbl_persons показывает что это одни и те же данные (см. рис. 5).

Рис. 5. Dbeaver, панель database navigator, iceberg и iceberg_etl catalog-и c ветками main и etl для таблицы tbl_persons.

Рис. 5. Dbeaver, панель database navigator, iceberg и iceberg_etl catalog-и c ветками main и etl для таблицы tbl_persons.

Теперь добавим данные в ветку etl

INSERT INTO iceberg_etl.wh.tbl_persons

SELECT *

  FROM kafka.default.tbl_persons;

Теперь таблица iceberg_etl.wh.»tbl_persons$history» выглядит по другому

made_current_at              |snapshot_id        |parent_id          |is_current_ancestor|-----------------------------+-------------------+-------------------+-------------------+2026-03-21 16:24:32.401 +0300|3717634582152414119|                   |true               |2026-03-21 17:00:20.422 +0300|8493246017612782479|3717634582152414119|true               |

Видим, что в историю добавилась запись, ответственная за новый snapshot. Таких операций по добавлению/изменению может быть несколько. После того как данные в полном объеме будут добавлены в ветку etl можно приступать к их проверке. Если проверка правильности/консистентности не будет пройдена мы должны будем удалить branch, т.е. выполнить команду

main> drop branch etl

После чего начать все сначала: создание branch, наполнение данными, проверка. Если же все ок, то следующим шагом выполняем команду merge.

main> MERGE BRANCH etl INTO main;

Теперь у нас в основной таблице iceberg.wh.tbl_persons полный набор проверенных данных. Важно, что пока в брaнч-таблице выполняются запись и проверка, таблица в ветке main должна оставаться без изменений. Если допустить что в основную таблицу за время работы с branch тоже будут добавлены данные, то при попытке выполнить команду MERGE вы получите ошибку — конфликт (values of existing and expected content for key ‘wh.tbl_persons’ are different).  Здесь полный аналог с работой git.

Как правильно поступать в случае конфликта — вопрос, требующий отдельного рассмотрения. Скажу только, что самым простым вариантом является игнорирование записей в основной таблице появившиеся после create branch, тогда можно принудительно обновить основную таблицу данными из branch.

main> MERGE BRANCH etl INTO main BEHAVIOR FORCE;

Итак, команда MERGE позволяет безопасно и атомарно менять данные в таблице Iceberg. Благодаря механизму snapshot isolation (изоляции снимков) всё происходит без блокировок и почти без остановки работы. В этом случае данные не копируются, а создается лишь снимок (snapshot) с изменениями. Новый snapshot станет частью основной таблицы только в случае успешно выполненной команды commit (merge), а пока этого не случилось все пользователи будут работать со старой версией.

4. Заключение

Чаще всего Trino используется как инструмент для решения аналитических задач, что не требует радикальной трансформации существующего ИТ-ландшафта. Он может быть интегрирован в уже сформированную архитектуру хранения и обработки данных, выступая в роли дополнительного слоя доступа. В таком подходе Trino обеспечивает высокопроизводительную обработку аналитических запросов и повышает качество отчетности. Однако подобный сценарий лишь частично раскрывает потенциал данного инструмента.

Богатые функциональные возможности Trino формируют новые подходы к реализации процессов извлечения, преобразования и загрузки данных (ETL/ELT). В рамках статьи были рассмотрены лишь некоторые задачи, которые он способен решать, и это далеко не полный перечень его возможностей. Нативная поддержка современных форматов хранения данных, таких как Apache Iceberg, позволяет эффективно работать с архитектурами Data Lakehouse. Поддержка современных оркестраторов упрощает интеграцию Trino в ETL/ELT-процессы, а наличие широкого набора коннекторов, включая интеграции с AI-сервисами, открывает возможности для решения более продвинутых задач.

В то же время Trino не всегда является оптимальным решением для реализации сложной кастомной логики обработки данных, особенно в случаях, когда требуется процедурное программирование, реализация сложных алгоритмов или интеграция с внешними библиотеками. В таких сценариях более подходящим выбором может стать Apache Spark.

При этом Trino активно развивается, а его экосистема поддерживается большим и активным сообществом. Регулярные обновления расширяют возможности SQL-движка, улучшают механизмы оптимизации запросов и средства расширения функциональности. В результате можно утверждать, что Trino постепенно эволюционирует от распределённого SQL-движка к универсальному вычислительному слою аналитических платформ.

ссылка на оригинал статьи https://habr.com/ru/articles/1031326/