Не зря технология называется Iceberg, потому что с первого взгляда кажется – простой инструмент, а оказывается что столько всего скрыто, что можно изучать и изучать.
В данной статье я хотел бы пробежаться по основным моментам, которые будут интересны дата-инженерам.
В статье мы рассмотрим зачем это нужно, когда это нужно, как это можно применить на практике и конечно же рассмотрим несколько примеров использования Apache Iceberg.
Введение
Apache Iceberg является более лучшей версией Hive Meta Store (HMS).
HMS имеет множество проблем, которые уже давно известны в дата-инженерии:
-
Привязка к HDFS
-
Имеет трудности при изменении модели данных
-
etc
Так как Apache Iceberg является улучшением концепции HMS, то он имеет ряд преимуществ:
-
Изменение модели данных «на лету«, без лишних перегрузок данных.
-
Версионирование данных после каждого изменения.
-
Ветки при работе с данными.
-
Путешествие «во времени» в данных.
-
Можно использовать почти любой вычислитель (compute).
Я думаю, что на этом моменте стоить закончить с теорией и перейти к практике. Но если у вас есть какие-то вопросы по инструменту, то рекомендую обращаться как всегда к документации.
Стоит держать в уме, что там не описаны все краевые случаи, но точно сможете найти ответы на общие вопросы.
Также стоит отметить, что Iceberg является продуктом Apache и поэтому все инструменты, которые используются в нём тоже поддерживаются Apache (ну почти): .avro
, .parquet
, Spark
, etc.
По каждому из инструментов вы сможете также легко найти документацию. А я постараюсь ниже обратить внимание на какие-то моменты, которые нашёл при исследовании Apache Iceberg.
Поднятие сервисов
Для поднятия всех сервисов я использовал docker-compose из — Quickstart. Я его немного модифицировал под себя, но об этом ниже.
Весь код также доступен в моём репозитории.
docker-compose.yaml
version: '3.8' services: postgres: image: postgres:13 environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: iceberg ports: - "5432:5432" networks: - iceberg_net rest: image: tabulario/iceberg-rest:1.6.0 container_name: iceberg-rest networks: iceberg_net: ports: - "8181:8181" environment: - AWS_ACCESS_KEY_ID=minioadmin - AWS_SECRET_ACCESS_KEY=minioadmin - AWS_REGION=us-east-1 - CATALOG_WAREHOUSE=s3://warehouse/ - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO - CATALOG_S3_ENDPOINT=http://minio:9000 - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg?user=postgres&password=postgres - CATALOG_JDBC_DRIVER=org.postgresql.Driver - CATALOG_JDBC_USER=postgres - CATALOG_JDBC_PASSWORD=postgres depends_on: - postgres minio: image: minio/minio:RELEASE.2024-07-04T14-25-45Z restart: always command: server /data --console-address ":9001" volumes: - ./data:/data environment: - MINIO_ROOT_USER=minioadmin - MINIO_ROOT_PASSWORD=minioadmin - MINIO_DOMAIN=minio ports: - "9000:9000" # MinIO S3 API - "9001:9001" # MinIO Console networks: iceberg_net: aliases: - warehouse.minio mc: depends_on: - minio image: minio/minio:RELEASE.2024-07-04T14-25-45Z container_name: mc networks: iceberg_net: environment: - AWS_ACCESS_KEY_ID=minioadmin - AWS_SECRET_ACCESS_KEY=minioadmin - AWS_REGION=us-east-1 entrypoint: > /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minioadmin minioadmin) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null " spark-iceberg: image: tabulario/spark-iceberg:3.5.1_1.5.0 container_name: spark-iceberg build: spark/ networks: iceberg_net: depends_on: - rest - minio volumes: - ./warehouse:/home/iceberg/warehouse - ./notebooks:/home/iceberg/notebooks/notebooks environment: - AWS_ACCESS_KEY_ID=minioadmin - AWS_SECRET_ACCESS_KEY=minioadmin - AWS_REGION=us-east-1 ports: - "8888:8888" - "8080:8080" - "10000:10000" - "10001:10001" networks: iceberg_net:
На что здесь стоит обратить внимание:
-
Я указал версию для образов (на октябрь 2024).
-
AWS_ACCESS_KEY_ID
иAWS_SECRET_ACCESS_KEY
заменил наminioadmin
для удобства. -
Убрал из скрипта команду:
/usr/bin/mc rm -r --force minio/warehouse;
Если данную строку вернуть в docker-compose, то при каждому перезапуске проекта будет очищаться хранилище. -
Добавил сохранение мета-данных в PostgreSQL.
Meta-store
Как я указал в последнем пункте: «Добавил сохранение мета-данных в PostgreSQL«. В оригинальном docker-compose
создания мета-стора выглядит так:
... rest: image: tabulario/iceberg-rest container_name: iceberg-rest networks: iceberg_net: ports: - 8181:8181 environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 - CATALOG_WAREHOUSE=s3://warehouse/ - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO - CATALOG_S3_ENDPOINT=http://minio:9000 ...
Если создать с указанным environment
, то ваш meta-store будет создаваться в формате memory
.
Данная команда будет выполнена в оригинальном образе:
CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
И ваш meta-store будет доступен внутри контейнера в папке tmp/
iceberg@e2d60003b684:/tmp$ ls -lah total 1.1M drwxrwxrwt 1 root root 4.0K Oct 7 10:58 . drwxr-xr-x 1 root root 4.0K Oct 7 06:43 .. drwxr-xr-x 2 iceberg iceberg 4.0K Oct 7 06:43 hsperfdata_iceberg -rw-r--r-- 1 iceberg iceberg 20K Oct 7 10:58 'iceberg_rest_mode=memory' -rwxr--r-- 1 iceberg iceberg 1.1M Oct 7 06:43 sqlite-3.46.0.0-be26ebff-c4de-43c2-bd44-71c205b8c5bd-libsqlitejdbc.so -rw-r--r-- 1 iceberg iceberg 0 Oct 7 06:43 sqlite-3.46.0.0-be26ebff-c4de-43c2-bd44-71c205b8c5bd-libsqlitejdbc.so.lck
Я хотел контролировать meta-store и беспрепятственно его просматривать, поэтому изменил сборку образа на такую:
... postgres: image: postgres:13 environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: iceberg ports: - "5432:5432" networks: - iceberg_net rest: image: tabulario/iceberg-rest:1.6.0 container_name: iceberg-rest networks: iceberg_net: ports: - "8181:8181" environment: - AWS_ACCESS_KEY_ID=minioadmin - AWS_SECRET_ACCESS_KEY=minioadmin - AWS_REGION=us-east-1 - CATALOG_WAREHOUSE=s3://warehouse/ - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO - CATALOG_S3_ENDPOINT=http://minio:9000 - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg?user=postgres&password=postgres - CATALOG_JDBC_DRIVER=org.postgresql.Driver - CATALOG_JDBC_USER=postgres - CATALOG_JDBC_PASSWORD=postgres depends_on: - postgres ...
Важно: Логин и пароль в сборке открыты, но для pet-задач этого будет достаточно.
Взаимодействие с Apache Iceberg
Основным способом для взаимодействия с Apache Iceberg является Apache Spark, но я хотел бы уделить немного времени ещё парочки инструментов.
Информация про Spark будет ниже.
API
Начнём с того что с Iceberg можно взаимодействовать при помощи официального API.
Вся спецификация API доступна по ссылке – Iceberg REST Open API specification.
Получить список namespaces
:
curl http://localhost:8181/v1/namespaces
Получить список таблиц:
curl http://localhost:8181/v1/namespaces/default/tables
DuckDB
У DuckDB существует расширение Iceberg Extension.
Стоит сразу отметить несколько моментов при работе DuckDB с Iceberg:
-
Расширение находится в «сыром» виде и имеет множество багов при работе.
-
Расширение не позволяет в «полную» силу работать с Data Lake, разрешено проводить только операцию
SELECT
и то с ограничениями. Надеюсь, что это доработают для дальнейшей интеграции DuckDB в Data Lake, а пока (октябрь 2024) не доступны операции DDL, DML. Но если вы всё таки решитесь на использование DuckDB + Iceberg, то рекомендую обращаться к официальному репозиторию duckdb_iceberg.
Ниже пару примеров того с чем я столкнулся и как это можно исправить.
Если вы ничего не знаете о DuckDB, то рекомендую ознакомиться с моей статьей – Всё что нужно знать про DuckDB
В начале предлагаю сконфигурировать нашу сессию:
INSTALL iceberg; LOAD iceberg; INSTALL httpfs; LOAD httpfs; SET s3_url_style = 'path'; SET s3_endpoint = 'localhost:9000'; SET s3_access_key_id = 'minioadmin'; SET s3_secret_access_key = 'minioadmin'; SET s3_use_ssl = FALSE;
Теперь если выполнить простой запрос как указано в документации:
SELECT * FROM iceberg_metadata( 's3://warehouse/default/animals/', allow_moved_paths = TRUE );
То мы получим ошибку:
SQL Error: java.sql.SQLException: HTTP Error: HTTP GET error on 'http://localhost:9000/warehouse/default/animals//metadata/version-hint.text' (HTTP 400)
Решение данной проблемы можно найти в данном issues. Но есть НО.
Можно читать определённый кусок мета-данных или проще говоря снэпшот:
SELECT * FROM iceberg_scan('s3://warehouse/default/animals/metadata/00001-f0d7c171-b179-4857-8eec-078f0108c1a9.metadata.json');
Но если сделать операцию INSERT
/UPDATE
/DELETE
, то прошлые мета-даннные будут ссылаться на прошлый снэпшот и чтобы получить «актуальные» данные необходимо найти новые мета-данные и выполнить запрос снова:
SELECT * FROM iceberg_scan('s3://warehouse/default/animals/metadata/00002-ff3b9eaf-3952-4494-a2c0-b29a78cc6bb7.metadata.json');
Важно: существует «маска» для создания новых снепшотов: 00001-*
, 00002-*
, etc
И теперь НО, про которое я сказал ранее. Если читать первый файл метаднных, то получим ошибку:
SQL Error: java.sql.SQLException: IO Error: No snapshots found
Чтобы корректно работать с Iceberg через DuckDB необходимо ждать исправления багов или вставлять костыли.
ClickHouse
ClickHouse тоже поддерживает Iceberg, но также только на уровне чтения. Об этом можно почитать в официальной документации.
Если вы ничего не знаете про ClickHouse, то рекомендую ознакомиться с моей статьей – Инфраструктура для Data-Engineer ClickHouse.
PyIceberg
Теперь давайте перейдём к одному из основных способов взаимодействия с Iceberg – это PyIceberg.
Доступ к каталогу
В начале рекомендую настроить доступ к нашему каталогу.
Из документации: This information must be placed inside a file called .pyiceberg.yaml
located either in the $HOME
or %USERPROFILE%
directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the $PYICEBERG_HOME
directory (if the corresponding environment variable is set).
Я создал файл в .pyiceberg.yaml
в корне своей учётки:
catalog: s3_warehouse: uri: http://127.0.0.1:8181 py-io-impl: pyiceberg.io.pyarrow.PyArrowFileIO s3.endpoint: http://127.0.0.1:9000 s3.access-key-id: minioadmin s3.secret-access-key: minioadmin
И теперь мы можем код вызывать таким образом:
from pyiceberg.catalog import load_catalog catalog = load_catalog("s3_warehouse") catalog.create_namespace("default")
Если не создать .pyiceberg.yaml
, то необходимо будет каждый раз инициализировать конфиг для подключения к каталогу таким образом:
from pyiceberg.catalog import load_catalog catalog = load_catalog( name="warehouse", **{ "uri": "http://127.0.0.1:8181", "s3.endpoint": "http://127.0.0.1:9000", "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO", "s3.access-key-id": "minioadmin", "s3.secret-access-key": "minioadmin", }, ) catalog.create_namespace("default")
Я думаю, что лучше создать .pyiceberg.yaml
, чем дублировать код.
Создание namespace
Для дальнейшей работы нам необходимо создать namespace
– это как схема в базе данных. Для этого выполним код:
from pyiceberg.catalog import load_catalog catalog = load_catalog("s3_warehouse") catalog.create_namespace("default")
Создание таблицы
Таблицы можно создавать несколькими способами:
-
С использованием
pyiceberg.types
. -
С использованием
pa.field
. Второй вариант как по мне удобнее и проще. Но давайте рассмотрим каждый.
Для того чтобы создать таблицу с использованием pyiceberg.types
необходимо:
-
Импортировать нужные типы из пакета
pyiceberg.types
. -
Создать схему.
-
Создать таблицу с созданной схемой. Пример:
from pyiceberg.catalog import load_catalog from pyiceberg.schema import Schema from pyiceberg.types import BinaryType, LongType, NestedField, StringType, TimestamptzType schema = Schema( fields=[ NestedField(field_id=1, name="id", field_type=LongType(), required=False), NestedField(field_id=2, name="uuid", field_type=BinaryType(), required=False, doc="Binary -> UUID"), NestedField(field_id=3, name="name", field_type=StringType(), required=False), NestedField(field_id=4, name="created_at", field_type=TimestamptzType(), required=False), ], ) catalog = load_catalog("s3_warehouse") # Comment if Table does not exist catalog.drop_table("default.custom_table_pyiceberg_fields") catalog.create_table( identifier="default.custom_table_pyiceberg_fields", schema=schema, )
Для того чтобы создать таблицу с использованием pa.field
необходимо:
-
Импортировать
pyarrow
. -
Создать схему.
-
Создать таблицу. Пример:
import pyarrow as pa from pyiceberg.catalog import load_catalog schema = pa.schema( [ pa.field(name="id", type=pa.int64(), nullable=True), pa.field(name="uuid", type=pa.binary(), nullable=True), pa.field(name="name", type=pa.string(), nullable=True), pa.field(name="created_at", type=pa.timestamp(unit="s", tz="UTC"), nullable=True), ], ) catalog = load_catalog("s3_warehouse") # Comment if Table does not exist catalog.drop_table("default.custom_table_pyarrow_fields") catalog.create_table( identifier="default.custom_table_pyarrow_fields", schema=schema, )
Пример попроще:
import pyarrow as pa from pyiceberg.catalog import load_catalog catalog = load_catalog("s3_warehouse") table = pa.table( { "year": [2028, 2022, 2021, 2022, 2019, 2021], "n_legs": [2, 2, 4, 4, 5, 100], "animal": ["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"], }, ) tbl = catalog.create_table( identifier="default.animals", schema=table.schema, ) tbl.append(table)
Вставка данных в таблицы
Для вставки данных в таблицу их необходимо привести к формату pyarrow.lib.Table
.
Пример:
import datetime import uuid from random import randint import pyarrow as pa from faker import Faker from pyiceberg.catalog import load_catalog fake = Faker(locale="ru_RU") catalog = load_catalog("s3_warehouse") tbl = catalog.load_table("default.custom_table_pyiceberg_fields") pa_table = pa.table( { "id": [randint(a=1, b=9223372036854775806)], # noqa: S311 "uuid": [uuid.uuid4().bytes], "name": [fake.first_name()], "created_at": [fake.date_time_ad(tzinfo=datetime.UTC)], }, ) tbl.append(pa_table) print( tbl.scan().to_arrow(), )
Также, если вы привыкли работать с pandas.DataFrame
, то его также необходимо трансформировать в pyarrow.lib.Table
.
Пример:
import pandas as pd import pyarrow as pa from pyiceberg.catalog import load_catalog catalog = load_catalog("s3_warehouse") catalog.drop_table("default.yellow_taxi") df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet") table = pa.Table.from_pandas(df=df) tbl = catalog.create_table( identifier="default.yellow_taxi", schema=table.schema, ) tbl.append(table)
Удаление данных из таблицы
Для удаления данных из таблицы необходимо воспользоваться методом tbl.delete()
предварительно инициализировав tbl
.
Пример:
from pyiceberg.catalog import load_catalog catalog = load_catalog("s3_warehouse") tbl = catalog.load_table("default.animals") tbl.delete(delete_filter="animal == 'Bear'")
Чтение таблицы
Для чтения необходимо вызвать метод tbl.scan()
предварительно инициализировав catalog
и tbl
.
Пример:
from pyiceberg.catalog import load_catalog catalog = load_catalog("s3_warehouse") tbl = catalog.load_table( "default.animals", ) print( tbl.scan().to_arrow(), )
Тут стоит обратить внимание, что после вызова метода tbl.scan()
мы получаем объект типа <class 'pyiceberg.table.DataScan'>
и можем преобразовать в удобный нам формат:
-
.to_pandas()
-
.to_arrow()
-
etc
Удаление таблицы
Для удаления таблицы необходимо вызвать метод catalog.drop_table()
предварительно инициализировав catalog
.
Пример:
from pyiceberg.catalog import load_catalog catalog = load_catalog("s3_warehouse") catalog.drop_table("default.animals")
Важно: При удалении таблицы происходит удаление из мета-стора, а файлы остаются на месте. Таблица в данном случае – это просто ссылка на какой-то объект в S3.
Если вы не знаете что такое S3 или хотите самостоятельно удалить файлы из S3, то можете ознакомиться с моей статьей – Инфраструктура для data engineer S3.
Time Travel
Отличительной особенностью Iceberg является – это создание snapshots
на каждое изменение таблицы и поэтому мы можем между ними перемещаться.
Для того, чтобы получить текущий список snapshots
для таблицы необходимо выполнить следующий код:
from pyiceberg.catalog import load_catalog import pandas as pd pd.set_option("display.max_columns", None) catalog = load_catalog("s3_warehouse") table = catalog.load_table("default.custom_table_pyarrow_fields") print(table.inspect.snapshots())
И в выводе мы увидим все id
наших snapshots
, которые сможем считать следующим образом:
from pyiceberg.catalog import load_catalog catalog = load_catalog("s3_warehouse") table = catalog.load_table("default.custom_table_pyarrow_fields") print(table.scan(snapshot_id=3826389439422852561))
Spark
Вот мы добрались и до Spark. На самом деле Spark – это на текущий момент единственный инструмент, который позволяет производить DDL и DML операции в Apache Iceberg и также позволяет использовать привычный для всех SQL-интерфейс и также Spark-синтаксис.
Вообще, все примеры работы Spark + Iceberg находятся в блокнотах, которые были предварительно созданы командой Iceberg. Для того чтобы посмотреть примеры на Spark перейдите по ссылке http://localhost:8888/.
Я постараюсь не дублировать весь код, а показать только самое важное, чтобы это можно было использовать в дальнейшем как «справочник«.
Доступ к каталогу
Во время сборки контейнера уже произошла инициализация подключения к нашему хранилищу, поэтому мы можем сразу же обращаться запросами к нему, исключая инициализацию.
Для начала давайте посмотрим какие таблицы нам доступны в namespace
default
, который мы создавали ранее. Для этого выполним следующий скрипт:
%%sql show tables in default;
Создание namespace
Для создания namespace
необходимо выполнить команду:
%%sql CREATE DATABASE IF NOT EXISTS foo;
После этого в наш мета-стор запишется информация о создании данного namespace
, но физически он не создастся. Данный namespace
появится только после создания в нём первой таблицы.
Создание таблицы
Как я говорил выше после создания namespace
foo
мы его можем видеть в нашем мета-сторе, но физически его ещё не существует, поэтому давайте создадим таблицу следующим кодом:
%%sql CREATE TABLE foo.bar( id bigint );
Также мы можем создать таблицу с партиционированием по выбранному полю:
CREATE TABLE foo.bar_partition ( id bigint, created_at timestamp, ) USING iceberg PARTITIONED BY (days(created_at))
Данный код нам создаст таблицу и при каждой вставке данных в неё он будет создавать партицию за нужный день. Вот так это выглядит в самом хранилище:
. ├── data │ ├── created_at=YYYY-MM-DD │ ├── created_at=YYYY-MM-DD │ ├── created_at=YYYY-MM-DD │ ├── ... │ ├── created_at=YYYY-MM-DD
Вы можете создавать партиции не только по дням, но и по часам, а также по перечислениям. К примеру по продавцам/покупателям/гендеру и прочее.
Важно: если вы некорректно создали партиции или вам не достаточно текущего уровня партиционирования, то можете его изменить через удаление партиций и создание новых.
Удаление текущих партиций:
%%sql ALTER TABLE foo.bar_partition DROP PARTITION FIELD days(created_at)
Создание новых партиций:
%%sql ALTER TABLE foo.bar_partition ADD PARTITION FIELD hours(created_at)
И после этого необходимо вызвать метод rewrite_data_files
, чтобы перезаписать партиции.
%%sql CALL system.rewrite_data_files('foo.bar_partition')
Вставка данных в таблицы
Так как мы работаем через Spark, то у нас есть два варианта вставки данных:
-
через
dataframe
:
df = spark.read.parquet(".parquet") df.writeTo("foo.bar").append()
-
Через привычный
INSERT
:
%%sql INSERT INTO foo.bar(id) VALUES (1)
Для примера не важен формат вставки, но вы можете без проблем совершать вставки на основании данных из Data Lake или из внешних источников.
Если вы хотите сгенерировать данные или поработать с реальными данными, то можете ознакомиться с моей статьей – Pet-проекты и данные для Data-Engineer.
Удаление данных из таблицы
Для удаления данных из таблицы необходимо выполнить следующий код:
%%sql DELETE FROM foo.bar WHERE id = 1
Чтение таблицы
Чтобы просто прочитать данные из таблицы foo.bar
необходимо выполнить код:
%%sql SELECT * FROM foo.bar
Для того чтобы прочитать данные для дальнейшей работы над ними, необходимо выполнить код:
df = spark.sql('SELECT * FROM foo.bar') df.show()
Удаление таблицы
Удаление таблицы происходит стандартной командой:
%%sql DROP TABLE IF EXISTS foo.bar;
Time Travel
Как я говорил ранее, отличительной способность Iceberg является – путешествие «во времени«. Поэтому каждый раз, когда мы изменяли нашу таблицу foo.bar
, при помощи INSERT
, DELETE
, UPDATE
, то у нас создавался snapshot
, к которому мы можем обратиться или даже «откатиться«.
Давайте рассмотрим это по порядку.
В начале давайте посмотрим список доступных нам snapshots
по таблице:
%%sql SELECT * FROM foo.bar.snapshots ORDER BY committed_at DESC
Теперь мы знаем все snapshot_id
для нашей таблицы и можем прочитать любое состояние таблицы по snapshot_id
:
%%sql SELECT count(*) as c FROM foo.bar FOR VERSION AS OF 3331575308018494635
И также можем это сделать по полю committed_at
:
%%sql SELECT count(*) as c FROM foo.bar FOR TIMESTAMP AS OF TIMESTAMP 'YYYY-MM-DD HH:MM:SS.000000'
Ну и конечно же мы можем «откатиться» на любое прошлое состояние. Для этого необходимо выполнить команду:
CALL system.rollback_to_snapshot('foo.bar', 3331575308018494635)
Я считаю, что это одна из уникальных особенностей Iceberg, который позволяет не только путешествовать во времени, но и соблюдать идемпотентность при построении pipeline.
Branching and Tagging DDL
Также важным свойством Iceberg является – возможность создавать ветки и теги.
Создание веток позволяет нам версионировать данные, не меняя продовые данные.
Давайте рассмотрим это на примере. Мы ранее с вами создали таблицу foo.bar
, теперь если вставить в неё несколько значений, от одного до пяти при помощи следующего кода:
%%sql INSERT INTO foo.bar(id) VALUES (1)
Затем мы можем проверить количество данных в нашей таблице:
SELECT count(*) AS cnt FROM foo.bar
Теперь нам необходимо установить возможность версионирования через ветки для таблицы, для этого выполним следующий код:
%%sql ALTER TABLE foo.bar SET TBLPROPERTIES ( 'write.wap.enabled'='true' )
Теперь мы можем создавать ветки для нашей таблицы, давайте создадим ветку delete_one_value
:
%%sql ALTER TABLE foo.bar CREATE BRANCH delete_one_value
После этого нам нужно переключиться на неё:
spark.conf.set('spark.wap.branch', 'delete_one_value')
И теперь выполним нашу задачу в нужной ветке:
%%sql DELETE FROM foo.bar WHERE id = 1
И для проверки того, что мы всё сделали корректно можем выполнить следующий код:
%%sql SELECT count(*) AS cnt -- FROM foo.bar VERSION AS OF 'main' FROM foo.bar.branch_main
И здесь мы получим то количество строк, который у нас находятся в main
.
А при выполнении следующего кода мы сможем получить то количество строк, которое соответствует ветке delete_one_value
.
%%sql SELECT count(*) AS cnt -- FROM foo.bar VERSION AS OF 'delete_one_value' FROM foo.bar.branch_delete_one_value
Важно: В коде указано две версии того, как мы можем обращаться к нашим веткам.
-
Через указание ветки:
... FROM foo.bar VERSION AS OF 'main' ...
-
Через точку и добавлением префикса:
... FROM foo.bar.branch_main ...
Ну и теперь самое главное – это публикация изменений в main
.
Для начала нам нужно узнать какой snapshot_id
использовать для публикации.
%%sql SELECT * FROM foo.bar.refs
Важно: мы можем опубликовать любой snapshot_id
из нашей ветки delete_one_value
, но рекомендуется использоваться последний, чтобы было всё логично.
И чтобы сделать саму публикацию необходимо выполнить код:
%%sql CALL system.cherrypick_snapshot('foo.bar', 5235792610289419748)
Ну и в конце нужно убрать за собой, поэтому удалим ветку delete_one_value
при помощи следующего кода:
%%sql ALTER TABLE foo.bar DROP BRANCH delete_one_value
И чтобы проверить, что всё удалено корректно мы можем выполнить данный код:
%%sql SELECT * FROM foo.bar.refs
Заключение
Я постарался показать основные возможности для построения Data Lake на базе Apache Iceberg.
Вообще, у этого инструмента довольно много применений. Можно как строить полноценный аналитический продукт на его базе, так и использовать как «холодное» или «тёплое» хранилище. Над «горячим» я бы подумал, как его организовать, с учётом постоянных изменений модели данных и частых изменений dds, dm слоя.
Iceberg по своей сути является только абстракцией, предлагая удобный интерфейс для взаимодействия с данными. Он не является серебряной пулей, он имеет свои ограничения и тонкости.
На что я бы обратил внимание при работе с Iceberg:
-
Качественный DDL ваших таблиц: правильная модель данных, правильное партиционирование, название таблицы и нужная схема (
namespace
). -
Следить за изменениями таблицы:
INSERT
,UPDATE
,DELETE
,DROP
. В частности надDROP
, потому что при удалении таблицы вы удаляете ссылку на файл из мета-стора, а физически файлы хранятся в вашем S3. Если для вас вопрос storage важен, то это стоит контролировать. -
Вопрос мета-данных на уровне дата-каталога, а не на уровне DDL. Важно за данными следить, отслеживать источники, pipeline загрузки и про данные не забывать. Стоит продумать варианты того, как информировать пользователей о наличии тех или иных данных.
-
Бэкап meta-store, чтобы не потерять ссылки на текущие «таблицы«.
Если резюмировать, то Iceberg является хорошим продуктом, который позволит вам разделить compute и storage при работе с данными. Его стоит попробовать, если в вашей инфраструктуре есть Apache Spark или есть опыт работы с Spark, чтобы его правильно «сварить» с Apache Iceberg.
Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.
ссылка на оригинал статьи https://habr.com/ru/articles/850674/
Добавить комментарий