Автор работал в различных дата-инженерных проектах и иногда проекты представляют собой набор модулей без логики и без общего подхода. Поэтому цель статьи — разработать этот общий подход и заодно поупражняться вместе с читателем в его создании.
Подводящие идеи
В Enterprise мы работаем с трансформациями данных, поэтому:
-
Краеугольный камень — это бизнес-процесс, некий Flow, который преобразует одну или несколько таблиц.
-
Таблица — это вспомогательная сущность, обслуживающая бизнес. Попытка сделать таблицы основными сущностями приведет к размыванию бизнес-логики.
-
Потоки и таблицы удобно реализовывать как классы.
-
Так как бизнес-логика может быть сложной, её разумно разбить на шаги (steps) — функции.
Важно: за классом таблицы может стоять что угодно — например, Spark DataFrame. В этой статье будем ориентироваться именно на Spark.
Основные идеи подхода
-
Создать базовый класс Flow, в котором будет реализована логика фреймворка
-
Описывать конкретные пайплайны через наследование:
class MyFlow(Flow): ... -
Разбивать процесс на шаги — методы класса, которые можно переиспользовать
-
Делать описание шагов через декораторы
Подход — Class-based pipeline orchestration на декораторах
Рассмотрим пример описания. У нас есть поток MyFlow, унаследованный от Flow, который мы опишем далее, пока это не важно. И есть одна функция, для которой мы делаем описание, что это: шаг потока, что она принимает на вход таблицу MyTable, и выходом является таблица MyTable2:
class MyFlow(Flow): @classmethod @Flow.step() @Flow.input([MyTable]) @Flow.output(MyTable2) def step_one(cls, context: Context) -> DataFrame: print(f" Step 1: создаём/обновляем MyTable,{context.id}")
Итого: у нас есть один шаг step_one. Что делают декораторы:
-
@Flow.step()— помечает метод как шаг pipeline -
@Flow.input([MyTable])— описывает входные данные -
@Flow.output(MyTable2)— описывает результат
Важно: сами декораторы не выполняют логику, они только добавляют мета-информацию.
Context — контейнер состояния
Наша функция-шаг имеет на входе некий context:Context и возвращаемое значение DataFrame — это стандартный тип спарка для табличек. Что такое Context?
Тут надо решить — как мы будем работать с аргументами. Лично мне нравится некий обобщенный вид, когда не надо писать длинные списки параметров. Это имеет свои плюсы и минусы, решать вам. Но для фреймворка это имеет смысл.
Возможная реализация:
from pydantic import BaseModelclass Context(BaseModel): # допустим глобальный конфиг config: Dict[Type[Config], Config] = {} # текущие таблички, с которыми работаем data: Dict[Union[str, Type[Table]], DataFrame] = {} # какие-то другие переменные diff: Dict[Any, Any] = {}
Плюсы такого подхода:
-
меньше аргументов в функциях
-
легко расширять
-
единая точка передачи состояния
Минусы:
-
менее явные зависимости
Таблицы
В Spark схема описывается через StructType:
from pyspark.sql.types import StructType, StructField, IntegerType, StringTypeschema = StructType([ StructField("item", StringType(), nullable=False), StructField("loc", StringType(), nullable=False), StructField("qty", IntegerType(), nullable=True)])df = spark.createDataFrame([], schema=schema)
Часто для описания табличек используется SQLAlchemy, поэтому может быть хорошей идеей использовать именно его, и только конвертировать это описание. Будем использовать декларативную нотацию версий 1.x,
from sqlalchemy.ext.declarative import declarative_baseBase = declarative_base()class Table(Base): __abstract__ = True # чтобы не создавать таблицу для этого классаclass MyTable(Table): __tablename__ = "my_table" id = Column(Integer, primary_key=True, nullable=True) name = Column(String(50), nullable=True) age = Column(Integer, nullable=True)
Объявляем табличку с 3 колонками, первая — primary key
Теперь надо определить класс Table, чтобы получать описание в spark-стиле, для этого нам нужен конвертер
def _convert_type(sqlalchemy_type) -> T.DataType: # Упрощённая карта типов if isinstance(sqlalchemy_type, Integer): return T.IntegerType() if isinstance(sqlalchemy_type, String): return T.StringType() if isinstance(sqlalchemy_type, Float): return T.FloatType() if isinstance(sqlalchemy_type, Boolean): return T.BooleanType() if isinstance(sqlalchemy_type, Date): return T.DateType() if isinstance(sqlalchemy_type, DateTime): return T.TimestampType() if isinstance(sqlalchemy_type, Numeric): return T.DoubleType() raise ValueError(f"Unsupported type: {sqlalchemy_type}")class Table(Base): __abstract__ = True # чтобы не создавать таблицу для этого класса @classmethod def get_schema(cls) -> T.StructType: fields = [] for column in cls.__table__.columns: spark_type = _convert_type(column.type) fields.append( T.StructField(column.name, spark_type, column.nullable) ) return T.StructType(fields)
можно использовать код вида MyTable.get_schema(), чтобы достать описание таблицы в спарк-стиле. Естественно можем использовать полиморфизм и делать свои кастомные таблички
Можно полностью переопределить метод:
class MyTable(Table): __tablename__ = "my_table" id = Column(Integer) name = Column(String) @classmethod def get_schema(cls) -> T.StructType: return T.StructType([ T.StructField("id", T.LongType(), False), T.StructField("name", T.StringType(), True), T.StructField("extra_col", T.StringType(), True), ])
здесь мы напрямую возвращаем StructType. А можно использовать дефолтную логику и добавлять/менять поля поверх неё.
class MyExtendedTable(Table): __tablename__ = "ext" id = Column(Integer) name = Column(String) @classmethod def get_schema(cls): # возьмем SQLAlchemy-версию schema = super().get_schema() # добавим вычисляемую колонку fields = schema.fields + [ T.StructField("sys_load_ts", T.TimestampType(), False) ] return T.StructType(fields)
Крупноблочно мы описали, теперь займемся его реализацией. Основа фреймворка — декораторы, давайте немного вспомним теорию. Конструкция:
@classmethod@Flow.step(order=1)def step_one(cls, context): pass
Разворачивается так:
-
Flow.step(order=1)→ возвращает декоратор -
декоратор применяется к функции
-
результат оборачивается в
classmethod
Важно: порядок применения — снизу вверх
Основные идеи:
-
Сами декораторы только навешивают некую внутреннюю мета-информацию _step_meta на функцию, в которой будем определять порядок, таблицы и т.д.
-
Сборка пайплайна в специальном методе __init_subclass__, запускается в последний момент, после всех декораторов.
-
Ключевая идея — собрать шаги автоматически при создании класса.
Напомню, что __init_subclass__ это специальный метод введенный в Python 3.6 (PEP 487). Это позволяет базовому классу настраивать инициализацию своего класса.
from functools import wrapsclass Flow: def __init_subclass__(cls): # 1) переопределяем переменные для каждого подкласса, важно! cls.steps = [] cls.step_meta = {} # 2) обходим все атрибуты класса, ищем наши шаги for name, attr in cls.__dict__.items(): func = None if isinstance(attr, classmethod): func = attr.__func__ elif callable(attr): func = attr # 3) добавляем мета-инфорцацию if func and hasattr(func, "_step_meta"): cls.steps.append(name) cls.step_meta[name] = func._step_meta # 4) и напоследок сортируем по order cls.steps.sort(key=lambda n: cls.step_meta[n]["order"])
Плюс:
-
каждый Flow получает свой список шагов
-
нет смешивания состояния между классами
Определим наконец наш декоратор, который превращает функцию в шаг. Основная наша задача — добавить мета-информацию на функцию
@classmethoddef step(cls, order): def decorator(func): func._step_meta = getattr(func, "_step_meta", {}) func._step_meta["order"] = order return func return decorator
И запуск реализуем так — проходимся по всем шагам, получаем функцию по имени шага, и запускаем ее:
@classmethoddef run(cls, context): print(f"\n=== RUN FLOW {cls.__name__} ===") for step in cls.steps: print("META:", step, cls.step_meta[step]) # получаем функцию и вызываем ее с context getattr(cls, step)(context)
Файл конфигурации
Используем pydantic. Для работы нам нужно наследоваться от класса , который реализует всю магию.
from pydantic import BaseModelclass Config(BaseModel): param1: str = "A"
Плюсы:
-
валидация типов
-
автодополнение в IDE
-
удобная сериализация
Удобно использовать yaml-формат для параметром, рекомендую использовать его, функции для сохранения и загрузки в yaml конфигурационного файла:
def save_config(config: Config, filepath: str): yaml.dump(config.dict(), open(filepath, 'w'))def load_config(filepath: str) -> Config: return Config(**yaml.safe_load(open(filepath)))
SQL как основной инструмент трансформаций
Идея:
-
сначала SQL
-
если не хватает — DataFrame API
Плюсы:
-
быстрее писать
-
понятнее аналитикам
Для спарка, как мы знаем, есть возможность выполнять непосредственно SQL
df = spark.range(10)df.createOrReplaceTempView("my_view")df = spark.sql("select id, 1 as asd, 2 as ewq from my_view")df.show()
Основная идея — отдельный шаг, в котором либо автоматически, либо мы сами вызовем некоторую функцию, которая загрузит параметризованный SQL, подставит параметры и выполнит.
class MyFlow(Flow): @classmethod @Flow.step(order=3) @Flow.input([MyTable]) @Flow.output(AnotherTable) # Либо декоратор для исполнения файла @Flow.sql("step_three.sql") def step_three(cls, context: Context): context.data[MyTable].show() # либо сами вызываем df = cls.execute_sql(context, "step_three.sql",vars={"id":1}) df.show()
И сам файл для выполнения step_three.sql:
select * from my_tablewhere id={{id}}
Первое, что нам нужно — это декоратор, в котором мы только передаем мета-информацию в функцию
@classmethoddef sql(cls, file: str, vars: Dict[str, Any] = None): """ Вешается на шаг: @Flow.sql("step1.sql", vars={"x":1}) """ def decorator(func): func._step_meta = getattr(func, "_step_meta", {}) func._step_meta["sql"] = { "file": file, "vars": vars } return func return decorator
Понадобится вспомогательная функция создания временных view:
def camel_to_snake(name: ...) -> str: """ Приведения названия таблицы в нужный вид. Пример: Abc -> abc """ @classmethoddef create_temp_views(cls, context: Context, tables: List[Any]): """ Создаёт временные spark view для SQL """ for table_class in tables: df = context.data.get(table_class, None) table_name = table_class if isinstance(table_class, str) else camel_to_snake(table_class.__name__) if df is not None: df.createOrReplaceTempView(table_name)
Функция исполнения sql-кода из файла, используем подход jinja2 для подстановки переменных
from jinja2 import Template@classmethoddef execute_sql(cls, context: Context, *, file: str, vars: Dict[str, Any]): # читаем файл sql_query = cls.read_sql(file) # Подставляем переменные if vars: sql_query = Template(sql_query).render(vars) result_df = context.spark.sql(sql_query) return result_df
И блок исполнения теперь выглядит так:
@classmethod def run(cls, context:Context): print(f"\n=== RUN FLOW {cls.__name__} ===") for i, step in enumerate(cls.steps): print("META:", step, cls.step_meta[step], i) cls.current_step = i meta = cls.step_meta[step] # ---------- SQL ---------- if "sql" in meta: sql_meta = meta["sql"] cls.create_temp_views(context, meta["input"]) result = cls.execute_sql( context, file=sql_meta["file"], vars=sql_meta.get("vars") ) result.show() # сохраняем результат result_table = meta["output"][0] context.data[result_table] = result # Выполняем шаг getattr(cls, step)(context)
Итог
Мы получили простой декларативный фреймворк:
-
Flow = бизнес-процесс
-
Step = шаг трансформации
-
Context = состояние
-
Таблицы = декларативные модели
При этом:
-
структура единообразная
-
легко расширять
Что дальше
В статье разобраны базовые идеи. Возможные улучшения:
-
dependency graph вместо order
-
валидация входов/выходов
-
retry и error handling
-
execution backend (Spark / SQL / Pandas)
-
реализация через дескрипторы вместо
__init_subclass__
Немного про dependency graph
Текущая реализация использует явный порядок выполнения шагов (order). Однако в более зрелых системах orchestration используется dependency graph (DAG), где шаги определяются через зависимости друг от друга. Это позволяет:
-
автоматически вычислять порядок выполнения
-
запускать независимые шаги параллельно
-
упростить поддержку пайплайна
Можно будет делать так:
@Flow.step(depends_on=["step_one", "step_two"])def step_three(cls, context): ...
ссылка на оригинал статьи https://habr.com/ru/articles/1025014/