Инфраструктура для Data-Engineer Apache Iceberg

от автора

Не зря технология называется Iceberg, потому что с первого взгляда кажется – простой инструмент, а оказывается что столько всего скрыто, что можно изучать и изучать.

В данной статье я хотел бы пробежаться по основным моментам, которые будут интересны дата-инженерам.

В статье мы рассмотрим зачем это нужно, когда это нужно, как это можно применить на практике и конечно же рассмотрим несколько примеров использования Apache Iceberg.

Введение

Apache Iceberg является более лучшей версией Hive Meta Store (HMS).

HMS имеет множество проблем, которые уже давно известны в дата-инженерии:

  • Привязка к HDFS

  • Имеет трудности при изменении модели данных

  • etc

Так как Apache Iceberg является улучшением концепции HMS, то он имеет ряд преимуществ:

  • Изменение модели данных «на лету«, без лишних перегрузок данных.

  • Версионирование данных после каждого изменения.

  • Ветки при работе с данными.

  • Путешествие «во времени» в данных.

  • Можно использовать почти любой вычислитель (compute).

Я думаю, что на этом моменте стоить закончить с теорией и перейти к практике. Но если у вас есть какие-то вопросы по инструменту, то рекомендую обращаться как всегда к документации.

Стоит держать в уме, что там не описаны все краевые случаи, но точно сможете найти ответы на общие вопросы.

Также стоит отметить, что Iceberg является продуктом Apache и поэтому все инструменты, которые используются в нём тоже поддерживаются Apache (ну почти): .avro, .parquet, Spark, etc.

По каждому из инструментов вы сможете также легко найти документацию. А я постараюсь ниже обратить внимание на какие-то моменты, которые нашёл при исследовании Apache Iceberg.

Поднятие сервисов

Для поднятия всех сервисов я использовал docker-compose из — Quickstart. Я его немного модифицировал под себя, но об этом ниже.

Весь код также доступен в моём репозитории.

docker-compose.yaml
version: '3.8'  services:   postgres:     image: postgres:13     environment:       POSTGRES_USER: postgres       POSTGRES_PASSWORD: postgres       POSTGRES_DB: iceberg     ports:       - "5432:5432"     networks:       - iceberg_net    rest:     image: tabulario/iceberg-rest:1.6.0     container_name: iceberg-rest     networks:       iceberg_net:     ports:       - "8181:8181"     environment:       - AWS_ACCESS_KEY_ID=minioadmin       - AWS_SECRET_ACCESS_KEY=minioadmin       - AWS_REGION=us-east-1       - CATALOG_WAREHOUSE=s3://warehouse/       - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO       - CATALOG_S3_ENDPOINT=http://minio:9000       - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg?user=postgres&password=postgres       - CATALOG_JDBC_DRIVER=org.postgresql.Driver       - CATALOG_JDBC_USER=postgres       - CATALOG_JDBC_PASSWORD=postgres     depends_on:       - postgres    minio:     image: minio/minio:RELEASE.2024-07-04T14-25-45Z     restart: always     command: server /data --console-address ":9001"     volumes:       - ./data:/data     environment:       - MINIO_ROOT_USER=minioadmin       - MINIO_ROOT_PASSWORD=minioadmin       - MINIO_DOMAIN=minio     ports:       - "9000:9000"  # MinIO S3 API       - "9001:9001"  # MinIO Console     networks:       iceberg_net:         aliases:           - warehouse.minio    mc:     depends_on:       - minio     image: minio/minio:RELEASE.2024-07-04T14-25-45Z     container_name: mc     networks:       iceberg_net:     environment:       - AWS_ACCESS_KEY_ID=minioadmin       - AWS_SECRET_ACCESS_KEY=minioadmin       - AWS_REGION=us-east-1     entrypoint: >       /bin/sh -c "       until (/usr/bin/mc config host add minio http://minio:9000 minioadmin minioadmin) do echo '...waiting...' && sleep 1; done;       /usr/bin/mc mb minio/warehouse;       /usr/bin/mc policy set public minio/warehouse;       tail -f /dev/null       "   spark-iceberg:     image: tabulario/spark-iceberg:3.5.1_1.5.0     container_name: spark-iceberg     build: spark/     networks:       iceberg_net:     depends_on:       - rest       - minio     volumes:       - ./warehouse:/home/iceberg/warehouse       - ./notebooks:/home/iceberg/notebooks/notebooks     environment:       - AWS_ACCESS_KEY_ID=minioadmin       - AWS_SECRET_ACCESS_KEY=minioadmin       - AWS_REGION=us-east-1     ports:       - "8888:8888"       - "8080:8080"       - "10000:10000"       - "10001:10001"  networks:   iceberg_net:

На что здесь стоит обратить внимание:

  • Я указал версию для образов (на октябрь 2024).

  • AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY заменил на minioadmin для удобства.

  • Убрал из скрипта команду: /usr/bin/mc rm -r --force minio/warehouse; Если данную строку вернуть в docker-compose, то при каждому перезапуске проекта будет очищаться хранилище.

  • Добавил сохранение мета-данных в PostgreSQL.

Meta-store

Как я указал в последнем пункте: «Добавил сохранение мета-данных в PostgreSQL«. В оригинальном docker-compose создания мета-стора выглядит так:

... rest:     image: tabulario/iceberg-rest     container_name: iceberg-rest     networks:       iceberg_net:     ports:       - 8181:8181     environment:       - AWS_ACCESS_KEY_ID=admin       - AWS_SECRET_ACCESS_KEY=password       - AWS_REGION=us-east-1       - CATALOG_WAREHOUSE=s3://warehouse/       - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO       - CATALOG_S3_ENDPOINT=http://minio:9000 ...

Если создать с указанным environment, то ваш meta-store будет создаваться в формате memory.

Данная команда будет выполнена в оригинальном образе:

CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory

И ваш meta-store будет доступен внутри контейнера в папке tmp/

iceberg@e2d60003b684:/tmp$ ls -lah total 1.1M drwxrwxrwt 1 root    root    4.0K Oct  7 10:58  . drwxr-xr-x 1 root    root    4.0K Oct  7 06:43  .. drwxr-xr-x 2 iceberg iceberg 4.0K Oct  7 06:43  hsperfdata_iceberg -rw-r--r-- 1 iceberg iceberg  20K Oct  7 10:58 'iceberg_rest_mode=memory' -rwxr--r-- 1 iceberg iceberg 1.1M Oct  7 06:43  sqlite-3.46.0.0-be26ebff-c4de-43c2-bd44-71c205b8c5bd-libsqlitejdbc.so -rw-r--r-- 1 iceberg iceberg    0 Oct  7 06:43  sqlite-3.46.0.0-be26ebff-c4de-43c2-bd44-71c205b8c5bd-libsqlitejdbc.so.lck 

Я хотел контролировать meta-store и беспрепятственно его просматривать, поэтому изменил сборку образа на такую:

...   postgres:     image: postgres:13     environment:       POSTGRES_USER: postgres       POSTGRES_PASSWORD: postgres       POSTGRES_DB: iceberg     ports:       - "5432:5432"     networks:       - iceberg_net    rest:     image: tabulario/iceberg-rest:1.6.0     container_name: iceberg-rest     networks:       iceberg_net:     ports:       - "8181:8181"     environment:       - AWS_ACCESS_KEY_ID=minioadmin       - AWS_SECRET_ACCESS_KEY=minioadmin       - AWS_REGION=us-east-1       - CATALOG_WAREHOUSE=s3://warehouse/       - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO       - CATALOG_S3_ENDPOINT=http://minio:9000       - CATALOG_URI=jdbc:postgresql://postgres:5432/iceberg?user=postgres&password=postgres       - CATALOG_JDBC_DRIVER=org.postgresql.Driver       - CATALOG_JDBC_USER=postgres       - CATALOG_JDBC_PASSWORD=postgres     depends_on:       - postgres ...

Важно: Логин и пароль в сборке открыты, но для pet-задач этого будет достаточно.

Взаимодействие с Apache Iceberg

Основным способом для взаимодействия с Apache Iceberg является Apache Spark, но я хотел бы уделить немного времени ещё парочки инструментов.

Информация про Spark будет ниже.

API

Начнём с того что с Iceberg можно взаимодействовать при помощи официального API.

Вся спецификация API доступна по ссылке – Iceberg REST Open API specification.

Получить список namespaces:

curl http://localhost:8181/v1/namespaces

Получить список таблиц:

curl http://localhost:8181/v1/namespaces/default/tables

DuckDB

У DuckDB существует расширение Iceberg Extension.

Стоит сразу отметить несколько моментов при работе DuckDB с Iceberg:

  • Расширение находится в «сыром» виде и имеет множество багов при работе.

  • Расширение не позволяет в «полную» силу работать с Data Lake, разрешено проводить только операцию SELECT и то с ограничениями. Надеюсь, что это доработают для дальнейшей интеграции DuckDB в Data Lake, а пока (октябрь 2024) не доступны операции DDL, DML. Но если вы всё таки решитесь на использование DuckDB + Iceberg, то рекомендую обращаться к официальному репозиторию duckdb_iceberg.

Ниже пару примеров того с чем я столкнулся и как это можно исправить.

Если вы ничего не знаете о DuckDB, то рекомендую ознакомиться с моей статьей – Всё что нужно знать про DuckDB

В начале предлагаю сконфигурировать нашу сессию:

INSTALL iceberg; LOAD iceberg; INSTALL httpfs; LOAD httpfs; SET s3_url_style = 'path'; SET s3_endpoint = 'localhost:9000'; SET s3_access_key_id = 'minioadmin'; SET s3_secret_access_key = 'minioadmin'; SET s3_use_ssl = FALSE;

Теперь если выполнить простой запрос как указано в документации:

SELECT * FROM iceberg_metadata( 's3://warehouse/default/animals/', allow_moved_paths = TRUE );

То мы получим ошибку:

SQL Error: java.sql.SQLException: HTTP Error: HTTP GET error on 'http://localhost:9000/warehouse/default/animals//metadata/version-hint.text' (HTTP 400)

Решение данной проблемы можно найти в данном issues. Но есть НО.

Можно читать определённый кусок мета-данных или проще говоря снэпшот:

SELECT * FROM iceberg_scan('s3://warehouse/default/animals/metadata/00001-f0d7c171-b179-4857-8eec-078f0108c1a9.metadata.json'); 

Но если сделать операцию INSERT/UPDATE/DELETE, то прошлые мета-даннные будут ссылаться на прошлый снэпшот и чтобы получить «актуальные» данные необходимо найти новые мета-данные и выполнить запрос снова:

SELECT * FROM iceberg_scan('s3://warehouse/default/animals/metadata/00002-ff3b9eaf-3952-4494-a2c0-b29a78cc6bb7.metadata.json'); 

Важно: существует «маска» для создания новых снепшотов: 00001-*, 00002-*, etc

И теперь НО, про которое я сказал ранее. Если читать первый файл метаднных, то получим ошибку:

SQL Error: java.sql.SQLException: IO Error: No snapshots found

Чтобы корректно работать с Iceberg через DuckDB необходимо ждать исправления багов или вставлять костыли.

ClickHouse

ClickHouse тоже поддерживает Iceberg, но также только на уровне чтения. Об этом можно почитать в официальной документации.

Если вы ничего не знаете про ClickHouse, то рекомендую ознакомиться с моей статьей – Инфраструктура для Data-Engineer ClickHouse.

PyIceberg

Теперь давайте перейдём к одному из основных способов взаимодействия с Iceberg – это PyIceberg.

Доступ к каталогу

В начале рекомендую настроить доступ к нашему каталогу.

Из документации: This information must be placed inside a file called .pyiceberg.yaml located either in the $HOME or %USERPROFILE% directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the $PYICEBERG_HOME directory (if the corresponding environment variable is set).

Я создал файл в .pyiceberg.yaml в корне своей учётки:

catalog:   s3_warehouse:     uri: http://127.0.0.1:8181     py-io-impl: pyiceberg.io.pyarrow.PyArrowFileIO     s3.endpoint: http://127.0.0.1:9000     s3.access-key-id: minioadmin     s3.secret-access-key: minioadmin

И теперь мы можем код вызывать таким образом:

from pyiceberg.catalog import load_catalog        catalog = load_catalog("s3_warehouse")      catalog.create_namespace("default")

Если не создать .pyiceberg.yaml, то необходимо будет каждый раз инициализировать конфиг для подключения к каталогу таким образом:

from pyiceberg.catalog import load_catalog      catalog = load_catalog(       name="warehouse",       **{           "uri": "http://127.0.0.1:8181",           "s3.endpoint": "http://127.0.0.1:9000",           "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",           "s3.access-key-id": "minioadmin",           "s3.secret-access-key": "minioadmin",       },   )    catalog.create_namespace("default")

Я думаю, что лучше создать .pyiceberg.yaml, чем дублировать код.

Создание namespace

Для дальнейшей работы нам необходимо создать namespace – это как схема в базе данных. Для этого выполним код:

from pyiceberg.catalog import load_catalog        catalog = load_catalog("s3_warehouse")      catalog.create_namespace("default")

Создание таблицы

Таблицы можно создавать несколькими способами:

  • С использованием pyiceberg.types.

  • С использованием pa.field. Второй вариант как по мне удобнее и проще. Но давайте рассмотрим каждый.

Для того чтобы создать таблицу с использованием pyiceberg.types необходимо:

  1. Импортировать нужные типы из пакета pyiceberg.types.

  2. Создать схему.

  3. Создать таблицу с созданной схемой. Пример:

from pyiceberg.catalog import load_catalog   from pyiceberg.schema import Schema   from pyiceberg.types import BinaryType, LongType, NestedField, StringType, TimestamptzType      schema = Schema(       fields=[           NestedField(field_id=1, name="id", field_type=LongType(), required=False),           NestedField(field_id=2, name="uuid", field_type=BinaryType(), required=False, doc="Binary -> UUID"),           NestedField(field_id=3, name="name", field_type=StringType(), required=False),           NestedField(field_id=4, name="created_at", field_type=TimestamptzType(), required=False),       ],   )      catalog = load_catalog("s3_warehouse")      # Comment if Table does not exist   catalog.drop_table("default.custom_table_pyiceberg_fields")      catalog.create_table(       identifier="default.custom_table_pyiceberg_fields",       schema=schema,   ) 

Для того чтобы создать таблицу с использованием pa.field необходимо:

  1. Импортировать pyarrow.

  2. Создать схему.

  3. Создать таблицу. Пример:

import pyarrow as pa   from pyiceberg.catalog import load_catalog      schema = pa.schema(       [           pa.field(name="id", type=pa.int64(), nullable=True),           pa.field(name="uuid", type=pa.binary(), nullable=True),           pa.field(name="name", type=pa.string(), nullable=True),           pa.field(name="created_at", type=pa.timestamp(unit="s", tz="UTC"), nullable=True),       ],   )      catalog = load_catalog("s3_warehouse")      # Comment if Table does not exist   catalog.drop_table("default.custom_table_pyarrow_fields")      catalog.create_table(       identifier="default.custom_table_pyarrow_fields",       schema=schema,   ) 

Пример попроще:

import pyarrow as pa from pyiceberg.catalog import load_catalog  catalog = load_catalog("s3_warehouse")  table = pa.table(     {         "year": [2028, 2022, 2021, 2022, 2019, 2021],         "n_legs": [2, 2, 4, 4, 5, 100],         "animal": ["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"],     }, )  tbl = catalog.create_table(     identifier="default.animals", schema=table.schema, )  tbl.append(table) 

Вставка данных в таблицы

Для вставки данных в таблицу их необходимо привести к формату pyarrow.lib.Table.

Пример:

import datetime import uuid from random import randint  import pyarrow as pa from faker import Faker from pyiceberg.catalog import load_catalog  fake = Faker(locale="ru_RU")  catalog = load_catalog("s3_warehouse")  tbl = catalog.load_table("default.custom_table_pyiceberg_fields")  pa_table = pa.table(         {             "id": [randint(a=1, b=9223372036854775806)],  # noqa: S311             "uuid": [uuid.uuid4().bytes],             "name": [fake.first_name()],             "created_at": [fake.date_time_ad(tzinfo=datetime.UTC)],         },     )  tbl.append(pa_table)  print(     tbl.scan().to_arrow(), ) 

Также, если вы привыкли работать с pandas.DataFrame, то его также необходимо трансформировать в pyarrow.lib.Table.

Пример:

import pandas as pd import pyarrow as pa from pyiceberg.catalog import load_catalog  catalog = load_catalog("s3_warehouse")  catalog.drop_table("default.yellow_taxi")  df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet") table = pa.Table.from_pandas(df=df)  tbl = catalog.create_table(     identifier="default.yellow_taxi", schema=table.schema, )  tbl.append(table) 

Удаление данных из таблицы

Для удаления данных из таблицы необходимо воспользоваться методом tbl.delete() предварительно инициализировав tbl.

Пример:

from pyiceberg.catalog import load_catalog      catalog = load_catalog("s3_warehouse")      tbl = catalog.load_table("default.animals")      tbl.delete(delete_filter="animal == 'Bear'") 

Чтение таблицы

Для чтения необходимо вызвать метод tbl.scan() предварительно инициализировав catalog и tbl.

Пример:

from pyiceberg.catalog import load_catalog  catalog = load_catalog("s3_warehouse")  tbl = catalog.load_table(     "default.animals", )  print(     tbl.scan().to_arrow(), ) 

Тут стоит обратить внимание, что после вызова метода tbl.scan() мы получаем объект типа <class 'pyiceberg.table.DataScan'> и можем преобразовать в удобный нам формат:

  • .to_pandas()

  • .to_arrow()

  • etc

Удаление таблицы

Для удаления таблицы необходимо вызвать метод catalog.drop_table() предварительно инициализировав catalog.

Пример:

from pyiceberg.catalog import load_catalog      catalog = load_catalog("s3_warehouse")      catalog.drop_table("default.animals") 

Важно: При удалении таблицы происходит удаление из мета-стора, а файлы остаются на месте. Таблица в данном случае – это просто ссылка на какой-то объект в S3.

Если вы не знаете что такое S3 или хотите самостоятельно удалить файлы из S3, то можете ознакомиться с моей статьей – Инфраструктура для data engineer S3.

Time Travel

Отличительной особенностью Iceberg является – это создание snapshots на каждое изменение таблицы и поэтому мы можем между ними перемещаться.

Для того, чтобы получить текущий список snapshots для таблицы необходимо выполнить следующий код:

from pyiceberg.catalog import load_catalog import pandas as pd  pd.set_option("display.max_columns", None) catalog = load_catalog("s3_warehouse")  table = catalog.load_table("default.custom_table_pyarrow_fields")  print(table.inspect.snapshots())

И в выводе мы увидим все id наших snapshots, которые сможем считать следующим образом:

from pyiceberg.catalog import load_catalog    catalog = load_catalog("s3_warehouse")      table = catalog.load_table("default.custom_table_pyarrow_fields")    print(table.scan(snapshot_id=3826389439422852561))

Spark

Вот мы добрались и до Spark. На самом деле Spark – это на текущий момент единственный инструмент, который позволяет производить DDL и DML операции в Apache Iceberg и также позволяет использовать привычный для всех SQL-интерфейс и также Spark-синтаксис.

Вообще, все примеры работы Spark + Iceberg находятся в блокнотах, которые были предварительно созданы командой Iceberg. Для того чтобы посмотреть примеры на Spark перейдите по ссылке http://localhost:8888/.

Я постараюсь не дублировать весь код, а показать только самое важное, чтобы это можно было использовать в дальнейшем как «справочник«.

Доступ к каталогу

Во время сборки контейнера уже произошла инициализация подключения к нашему хранилищу, поэтому мы можем сразу же обращаться запросами к нему, исключая инициализацию.

Для начала давайте посмотрим какие таблицы нам доступны в namespace default, который мы создавали ранее. Для этого выполним следующий скрипт:

%%sql  show tables in default;

Создание namespace

Для создания namespace необходимо выполнить команду:

%%sql  CREATE DATABASE IF NOT EXISTS foo;

После этого в наш мета-стор запишется информация о создании данного namespace, но физически он не создастся. Данный namespace появится только после создания в нём первой таблицы.

Создание таблицы

Как я говорил выше после создания namespace foo мы его можем видеть в нашем мета-сторе, но физически его ещё не существует, поэтому давайте создадим таблицу следующим кодом:

%%sql  CREATE TABLE foo.bar(     id bigint );

Также мы можем создать таблицу с партиционированием по выбранному полю:

CREATE TABLE foo.bar_partition (     id bigint,     created_at timestamp, ) USING iceberg PARTITIONED BY (days(created_at))

Данный код нам создаст таблицу и при каждой вставке данных в неё он будет создавать партицию за нужный день. Вот так это выглядит в самом хранилище:

. ├── data │  ├── created_at=YYYY-MM-DD │  ├── created_at=YYYY-MM-DD │  ├── created_at=YYYY-MM-DD │  ├── ... │  ├── created_at=YYYY-MM-DD

Вы можете создавать партиции не только по дням, но и по часам, а также по перечислениям. К примеру по продавцам/покупателям/гендеру и прочее.

Важно: если вы некорректно создали партиции или вам не достаточно текущего уровня партиционирования, то можете его изменить через удаление партиций и создание новых.

Удаление текущих партиций:

%%sql  ALTER TABLE foo.bar_partition DROP PARTITION FIELD days(created_at)

Создание новых партиций:

%%sql  ALTER TABLE foo.bar_partition ADD PARTITION FIELD hours(created_at)

И после этого необходимо вызвать метод rewrite_data_files, чтобы перезаписать партиции.

%%sql  CALL system.rewrite_data_files('foo.bar_partition')

Вставка данных в таблицы

Так как мы работаем через Spark, то у нас есть два варианта вставки данных:

  1. через dataframe:

df = spark.read.parquet(".parquet") df.writeTo("foo.bar").append()
  1. Через привычный INSERT:

%%sql   INSERT INTO foo.bar(id) VALUES (1)

Для примера не важен формат вставки, но вы можете без проблем совершать вставки на основании данных из Data Lake или из внешних источников.

Если вы хотите сгенерировать данные или поработать с реальными данными, то можете ознакомиться с моей статьей – Pet-проекты и данные для Data-Engineer.

Удаление данных из таблицы

Для удаления данных из таблицы необходимо выполнить следующий код:

%%sql  DELETE FROM foo.bar WHERE id = 1

Чтение таблицы

Чтобы просто прочитать данные из таблицы foo.bar необходимо выполнить код:

%%sql  SELECT * FROM foo.bar

Для того чтобы прочитать данные для дальнейшей работы над ними, необходимо выполнить код:

df = spark.sql('SELECT * FROM foo.bar') df.show()

Удаление таблицы

Удаление таблицы происходит стандартной командой:

%%sql  DROP TABLE IF EXISTS foo.bar;

Time Travel

Как я говорил ранее, отличительной способность Iceberg является – путешествие «во времени«. Поэтому каждый раз, когда мы изменяли нашу таблицу foo.bar, при помощи INSERT, DELETE, UPDATE, то у нас создавался snapshot, к которому мы можем обратиться или даже «откатиться«.

Давайте рассмотрим это по порядку.

В начале давайте посмотрим список доступных нам snapshots по таблице:

%%sql  SELECT * FROM foo.bar.snapshots ORDER BY committed_at DESC

Теперь мы знаем все snapshot_id для нашей таблицы и можем прочитать любое состояние таблицы по snapshot_id:

%%sql  SELECT     count(*) as c FROM     foo.bar FOR VERSION AS OF 3331575308018494635

И также можем это сделать по полю committed_at:

%%sql  SELECT     count(*) as c FROM     foo.bar FOR TIMESTAMP AS OF TIMESTAMP 'YYYY-MM-DD HH:MM:SS.000000'

Ну и конечно же мы можем «откатиться» на любое прошлое состояние. Для этого необходимо выполнить команду:

CALL system.rollback_to_snapshot('foo.bar', 3331575308018494635)

Я считаю, что это одна из уникальных особенностей Iceberg, который позволяет не только путешествовать во времени, но и соблюдать идемпотентность при построении pipeline.

Branching and Tagging DDL

Также важным свойством Iceberg является – возможность создавать ветки и теги.

Создание веток позволяет нам версионировать данные, не меняя продовые данные.

Давайте рассмотрим это на примере. Мы ранее с вами создали таблицу foo.bar, теперь если вставить в неё несколько значений, от одного до пяти при помощи следующего кода:

%%sql   INSERT INTO foo.bar(id) VALUES (1)

Затем мы можем проверить количество данных в нашей таблице:

SELECT count(*) AS cnt FROM foo.bar

Теперь нам необходимо установить возможность версионирования через ветки для таблицы, для этого выполним следующий код:

%%sql  ALTER TABLE foo.bar SET TBLPROPERTIES (     'write.wap.enabled'='true' )

Теперь мы можем создавать ветки для нашей таблицы, давайте создадим ветку delete_one_value:

%%sql  ALTER TABLE foo.bar  CREATE BRANCH delete_one_value

После этого нам нужно переключиться на неё:

spark.conf.set('spark.wap.branch', 'delete_one_value')

И теперь выполним нашу задачу в нужной ветке:

%%sql  DELETE FROM foo.bar WHERE id = 1

И для проверки того, что мы всё сделали корректно можем выполнить следующий код:

%%sql      SELECT count(*) AS cnt -- FROM foo.bar VERSION AS OF 'main' FROM foo.bar.branch_main

И здесь мы получим то количество строк, который у нас находятся в main.

А при выполнении следующего кода мы сможем получить то количество строк, которое соответствует ветке delete_one_value.

%%sql      SELECT count(*) AS cnt -- FROM foo.bar VERSION AS OF 'delete_one_value' FROM foo.bar.branch_delete_one_value

Важно: В коде указано две версии того, как мы можем обращаться к нашим веткам.

  1. Через указание ветки:

... FROM foo.bar VERSION AS OF 'main' ...
  1. Через точку и добавлением префикса:

... FROM foo.bar.branch_main ...

Ну и теперь самое главное – это публикация изменений в main.

Для начала нам нужно узнать какой snapshot_id использовать для публикации.

%%sql  SELECT * FROM foo.bar.refs

Важно: мы можем опубликовать любой snapshot_id из нашей ветки delete_one_value, но рекомендуется использоваться последний, чтобы было всё логично.

И чтобы сделать саму публикацию необходимо выполнить код:

%%sql      CALL system.cherrypick_snapshot('foo.bar', 5235792610289419748)

Ну и в конце нужно убрать за собой, поэтому удалим ветку delete_one_value при помощи следующего кода:

%%sql      ALTER TABLE foo.bar DROP BRANCH delete_one_value

И чтобы проверить, что всё удалено корректно мы можем выполнить данный код:

%%sql  SELECT * FROM foo.bar.refs

Заключение

Я постарался показать основные возможности для построения Data Lake на базе Apache Iceberg.

Вообще, у этого инструмента довольно много применений. Можно как строить полноценный аналитический продукт на его базе, так и использовать как «холодное» или «тёплое» хранилище. Над «горячим» я бы подумал, как его организовать, с учётом постоянных изменений модели данных и частых изменений dds, dm слоя.

Iceberg по своей сути является только абстракцией, предлагая удобный интерфейс для взаимодействия с данными. Он не является серебряной пулей, он имеет свои ограничения и тонкости.

На что я бы обратил внимание при работе с Iceberg:

  • Качественный DDL ваших таблиц: правильная модель данных, правильное партиционирование, название таблицы и нужная схема (namespace).

  • Следить за изменениями таблицы: INSERT, UPDATE, DELETE, DROP. В частности над DROP, потому что при удалении таблицы вы удаляете ссылку на файл из мета-стора, а физически файлы хранятся в вашем S3. Если для вас вопрос storage важен, то это стоит контролировать.

  • Вопрос мета-данных на уровне дата-каталога, а не на уровне DDL. Важно за данными следить, отслеживать источники, pipeline загрузки и про данные не забывать. Стоит продумать варианты того, как информировать пользователей о наличии тех или иных данных.

  • Бэкап meta-store, чтобы не потерять ссылки на текущие «таблицы«.

Если резюмировать, то Iceberg является хорошим продуктом, который позволит вам разделить compute и storage при работе с данными. Его стоит попробовать, если в вашей инфраструктуре есть Apache Spark или есть опыт работы с Spark, чтобы его правильно «сварить» с Apache Iceberg.


Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.


ссылка на оригинал статьи https://habr.com/ru/articles/850674/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *