Декларативный Data Pipeline

от автора

Автор работал в различных дата-инженерных проектах и иногда проекты представляют собой набор модулей без логики и без общего подхода. Поэтому цель статьи — разработать этот общий подход и заодно поупражняться вместе с читателем в его создании.

Подводящие идеи

В Enterprise мы работаем с трансформациями данных, поэтому: 

  1. Краеугольный камень — это бизнес-процесс, некий Flow, который преобразует одну или несколько таблиц. 

  2. Таблица — это вспомогательная сущность, обслуживающая бизнес. Попытка сделать таблицы основными сущностями приведет к размыванию бизнес-логики.

  3. Потоки и таблицы удобно реализовывать как классы.

  4. Так как бизнес-логика может быть сложной, её разумно разбить на шаги (steps) — функции.

Важно: за классом таблицы может стоять что угодно — например, Spark DataFrame. В этой статье будем ориентироваться именно на Spark.

Основные идеи подхода

  1. Создать базовый класс Flow, в котором будет реализована логика фреймворка

  2. Описывать конкретные пайплайны через наследование:

    class MyFlow(Flow):      ...
  3. Разбивать процесс на шаги — методы класса, которые можно переиспользовать

  4. Делать описание шагов через декораторы

Подход — Class-based pipeline orchestration на декораторах

Рассмотрим пример описания. У нас есть поток MyFlow, унаследованный от Flow, который мы опишем далее, пока это не важно. И есть одна функция, для которой мы делаем описание, что это: шаг потока, что она принимает на вход таблицу MyTable, и выходом является таблица MyTable2:

class MyFlow(Flow):    @classmethod    @Flow.step()    @Flow.input([MyTable])    @Flow.output(MyTable2)    def step_one(cls, context: Context) -> DataFrame:        print(f" Step 1: создаём/обновляем MyTable,{context.id}")

Итого: у нас есть один шаг step_one. Что делают декораторы:

  • @Flow.step() — помечает метод как шаг pipeline

  • @Flow.input([MyTable]) — описывает входные данные

  • @Flow.output(MyTable2) — описывает результат

Важно: сами декораторы не выполняют логику, они только добавляют мета-информацию.

Context — контейнер состояния

Наша функция-шаг имеет на входе  некий  context:Context и возвращаемое значение DataFrame — это стандартный тип спарка для табличек. Что такое Context?

Тут надо решить — как мы будем работать с аргументами. Лично мне нравится некий обобщенный вид, когда не надо писать длинные списки параметров. Это имеет свои плюсы и минусы, решать вам. Но для фреймворка это имеет смысл.

Возможная реализация:

from pydantic import BaseModelclass Context(BaseModel):    # допустим глобальный конфиг    config: Dict[Type[Config], Config] = {}    # текущие таблички, с которыми работаем    data: Dict[Union[str, Type[Table]], DataFrame] = {}    # какие-то другие переменные    diff: Dict[Any, Any] = {}

Плюсы такого подхода:

  • меньше аргументов в функциях

  • легко расширять

  • единая точка передачи состояния

Минусы:

  • менее явные зависимости

Таблицы

В Spark схема описывается через StructType:

from pyspark.sql.types import StructType, StructField, IntegerType, StringTypeschema = StructType([    StructField("item", StringType(), nullable=False),    StructField("loc", StringType(), nullable=False),    StructField("qty", IntegerType(), nullable=True)])df = spark.createDataFrame([], schema=schema)

Часто для описания табличек используется SQLAlchemy, поэтому может быть хорошей идеей использовать именно его, и только конвертировать это описание. Будем использовать декларативную нотацию версий 1.x, 

from sqlalchemy.ext.declarative import declarative_baseBase = declarative_base()class Table(Base):    __abstract__  = True  # чтобы не создавать таблицу для этого классаclass MyTable(Table):    __tablename__  = "my_table"    id = Column(Integer, primary_key=True, nullable=True)    name = Column(String(50), nullable=True)    age = Column(Integer, nullable=True)

Объявляем табличку с 3 колонками, первая — primary key

Теперь надо определить класс Table, чтобы получать описание в spark-стиле, для этого нам нужен конвертер

def _convert_type(sqlalchemy_type) -> T.DataType:    # Упрощённая карта типов       if isinstance(sqlalchemy_type, Integer):        return T.IntegerType()    if isinstance(sqlalchemy_type, String):        return T.StringType()    if isinstance(sqlalchemy_type, Float):        return T.FloatType()    if isinstance(sqlalchemy_type, Boolean):        return T.BooleanType()    if isinstance(sqlalchemy_type, Date):        return T.DateType()    if isinstance(sqlalchemy_type, DateTime):        return T.TimestampType()    if isinstance(sqlalchemy_type, Numeric):        return T.DoubleType()    raise ValueError(f"Unsupported type: {sqlalchemy_type}")class Table(Base):    __abstract__ = True  # чтобы не создавать таблицу для этого класса    @classmethod    def get_schema(cls) -> T.StructType:        fields = []        for column in cls.__table__.columns:            spark_type = _convert_type(column.type)            fields.append(                T.StructField(column.name, spark_type, column.nullable)            )        return T.StructType(fields)

можно использовать код вида MyTable.get_schema(), чтобы достать описание таблицы в спарк-стиле. Естественно можем использовать полиморфизм и делать свои кастомные таблички

Можно полностью переопределить метод:

class MyTable(Table):    __tablename__ = "my_table"    id = Column(Integer)    name = Column(String)    @classmethod    def get_schema(cls) -> T.StructType:        return T.StructType([            T.StructField("id", T.LongType(), False),            T.StructField("name", T.StringType(), True),            T.StructField("extra_col", T.StringType(), True),        ])

 здесь мы напрямую возвращаем StructType. А можно использовать дефолтную логику и добавлять/менять поля поверх неё.

class MyExtendedTable(Table):    __tablename__ = "ext"    id = Column(Integer)    name = Column(String)    @classmethod    def get_schema(cls):              # возьмем SQLAlchemy-версию        schema = super().get_schema()           # добавим вычисляемую колонку        fields = schema.fields + [            T.StructField("sys_load_ts", T.TimestampType(), False)        ]        return T.StructType(fields)

Крупноблочно мы описали, теперь займемся его реализацией. Основа фреймворка — декораторы, давайте немного вспомним теорию. Конструкция:

@classmethod@Flow.step(order=1)def step_one(cls, context):    pass

Разворачивается так:

  1. Flow.step(order=1) → возвращает декоратор

  2. декоратор применяется к функции

  3. результат оборачивается в classmethod

Важно: порядок применения — снизу вверх

Основные идеи:

  1. Сами декораторы только навешивают некую внутреннюю мета-информацию _step_meta на функцию, в которой будем определять порядок, таблицы и т.д.

  2. Сборка пайплайна в специальном методе __init_subclass__, запускается в последний момент, после всех декораторов.

  3. Ключевая идея — собрать шаги автоматически при создании класса.

Напомню, что __init_subclass__ это специальный метод введенный в Python 3.6 (PEP 487). Это позволяет базовому классу настраивать инициализацию своего класса.

from functools import wrapsclass Flow:    def __init_subclass__(cls):        # 1) переопределяем переменные для каждого подкласса, важно!        cls.steps = []        cls.step_meta = {}        # 2)  обходим все атрибуты класса, ищем наши шаги        for name, attr in cls.__dict__.items():            func = None            if isinstance(attr, classmethod):                func = attr.__func__            elif callable(attr):                func = attr            # 3) добавляем мета-инфорцацию            if func and hasattr(func, "_step_meta"):                cls.steps.append(name)                cls.step_meta[name] = func._step_meta        # 4) и напоследок сортируем по order        cls.steps.sort(key=lambda n: cls.step_meta[n]["order"])

Плюс:

  • каждый Flow получает свой список шагов

  • нет смешивания состояния между классами

Определим наконец наш декоратор, который превращает функцию в шаг. Основная наша задача — добавить мета-информацию на функцию

@classmethoddef step(cls, order):    def decorator(func):        func._step_meta = getattr(func, "_step_meta", {})        func._step_meta["order"] = order        return func    return decorator

И запуск реализуем так — проходимся по всем шагам, получаем функцию по имени шага, и запускаем ее:

@classmethoddef run(cls, context):    print(f"\n=== RUN FLOW {cls.__name__} ===")    for step in cls.steps:        print("META:", step, cls.step_meta[step])        # получаем функцию и вызываем ее с context        getattr(cls, step)(context)

Файл конфигурации

Используем pydantic. Для работы нам нужно наследоваться от класса , который реализует всю магию.

from pydantic import BaseModelclass Config(BaseModel):    param1: str = "A"

Плюсы:

  • валидация типов

  • автодополнение в IDE

  • удобная сериализация

Удобно использовать yaml-формат для параметром, рекомендую использовать его, функции для сохранения и загрузки в yaml конфигурационного файла: 

def save_config(config: Config, filepath: str):    yaml.dump(config.dict(), open(filepath, 'w'))def load_config(filepath: str) -> Config:    return Config(**yaml.safe_load(open(filepath)))

SQL как основной инструмент трансформаций

Идея:

  • сначала SQL

  • если не хватает — DataFrame API

Плюсы:

  • быстрее писать

  • понятнее аналитикам

Для спарка, как мы знаем, есть возможность выполнять непосредственно SQL

df = spark.range(10)df.createOrReplaceTempView("my_view")df = spark.sql("select id, 1 as asd, 2 as ewq from my_view")df.show()

Основная идея — отдельный шаг, в котором либо автоматически, либо мы сами вызовем некоторую функцию, которая загрузит параметризованный SQL, подставит параметры и выполнит.

class MyFlow(Flow):    @classmethod    @Flow.step(order=3)    @Flow.input([MyTable])    @Flow.output(AnotherTable)    # Либо декоратор для исполнения файла    @Flow.sql("step_three.sql")    def step_three(cls, context: Context):        context.data[MyTable].show()        # либо сами вызываем         df = cls.execute_sql(context, "step_three.sql",vars={"id":1})        df.show()

И сам файл для выполнения step_three.sql:

select * from my_tablewhere id={{id}}

Первое, что нам нужно — это декоратор, в котором мы только передаем мета-информацию в функцию

@classmethoddef sql(cls, file: str, vars: Dict[str, Any] = None):    """    Вешается на шаг:    @Flow.sql("step1.sql", vars={"x":1})    """    def decorator(func):        func._step_meta = getattr(func, "_step_meta", {})        func._step_meta["sql"] = {            "file": file,            "vars": vars        }        return func    return decorator

Понадобится вспомогательная функция создания временных view:

def camel_to_snake(name: ...) -> str:    """    Приведения названия таблицы в нужный вид.        Пример:        Abc -> abc    """    @classmethoddef create_temp_views(cls, context: Context, tables: List[Any]):    """    Создаёт временные spark view  для SQL    """    for table_class in tables:        df = context.data.get(table_class, None)        table_name = table_class if isinstance(table_class, str) else camel_to_snake(table_class.__name__)        if df is not None:                df.createOrReplaceTempView(table_name)

Функция исполнения sql-кода из файла, используем подход jinja2 для подстановки переменных

from jinja2 import Template@classmethoddef execute_sql(cls, context: Context, *, file: str, vars: Dict[str, Any]):    # читаем файл     sql_query = cls.read_sql(file)    # Подставляем переменные    if vars:        sql_query = Template(sql_query).render(vars)    result_df = context.spark.sql(sql_query)    return result_df

И блок исполнения теперь выглядит так:

@classmethod    def run(cls, context:Context):        print(f"\n=== RUN FLOW {cls.__name__} ===")        for i, step in enumerate(cls.steps):            print("META:", step, cls.step_meta[step], i)            cls.current_step = i                       meta = cls.step_meta[step]            # ---------- SQL ----------            if "sql" in meta:                sql_meta = meta["sql"]                 cls.create_temp_views(context, meta["input"])                        result = cls.execute_sql(                    context,                    file=sql_meta["file"],                    vars=sql_meta.get("vars")                )                result.show()                # сохраняем результат                result_table =  meta["output"][0]                context.data[result_table] = result            # Выполняем шаг            getattr(cls, step)(context)

Итог

Мы получили простой декларативный фреймворк:

  • Flow = бизнес-процесс

  • Step = шаг трансформации

  • Context = состояние

  • Таблицы = декларативные модели

При этом:

  • структура единообразная

  • легко расширять

Что дальше

В статье разобраны базовые идеи. Возможные улучшения:

  • dependency graph вместо order

  • валидация входов/выходов

  • retry и error handling

  • execution backend (Spark / SQL / Pandas)

  • реализация через дескрипторы вместо __init_subclass__

Немного про dependency graph

Текущая реализация использует явный порядок выполнения шагов (order). Однако в более зрелых системах orchestration используется dependency graph (DAG), где шаги определяются через зависимости друг от друга. Это позволяет:

  • автоматически вычислять порядок выполнения

  • запускать независимые шаги параллельно

  • упростить поддержку пайплайна

Можно будет делать так:

@Flow.step(depends_on=["step_one", "step_two"])def step_three(cls, context):    ...

ссылка на оригинал статьи https://habr.com/ru/articles/1025014/