Автор работал в различных дата-инженерных проектах и иногда проекты представляют собой набор модулей без логики и без общего подхода. Поэтому цель статьи - разработать этот общий подход и заодно поупражняться вместе с читателем в его создании.
Подводящие идеи
В Enterprise мы работаем с трансформациями данных, поэтому:
-
Краеугольный камень - это бизнес-процесс, некий Flow, который преобразует одну или несколько таблиц.
-
Таблица - это вспомогательная сущность, обслуживающая бизнес. Попытка сделать таблицы основными сущностями приведет к размыванию бизнес-логики.
-
Потоки и таблицы удобно реализовывать как классы.
-
Так как бизнес-логика может быть сложной, её разумно разбить на шаги (steps) — функции.
Важно: за классом таблицы может стоять что угодно — например, Spark DataFrame. В этой статье будем ориентироваться именно на Spark.
Основные идеи подхода
-
Создать базовый класс Flow, в котором будет реализована логика фреймворка
-
Описывать конкретные пайплайны через наследование:
class MyFlow(Flow): ... -
Разбивать процесс на шаги — методы класса, которые можно переиспользовать
-
Делать описание шагов через декораторы
Подход - Class-based pipeline orchestration на декораторах
Рассмотрим пример описания. У нас есть поток MyFlow, унаследованный от Flow, который мы опишем далее, пока это не важно. И есть одна функция, для которой мы делаем описание, что это: шаг потока, что она принимает на вход таблицу MyTable, и выходом является таблица MyTable2:
class MyFlow(Flow):
@classmethod
@Flow.step()
@Flow.input([MyTable])
@Flow.output(MyTable2)
def step_one(cls, context: Context) -> DataFrame:
print(f" Step 1: создаём/обновляем MyTable,{context.id}")
Итого: у нас есть один шаг step_one. Что делают декораторы:
-
@Flow.step()— помечает метод как шаг pipeline -
@Flow.input([MyTable])— описывает входные данные -
@Flow.output(MyTable2)— описывает результат
Важно: сами декораторы не выполняют логику, они только добавляют мета-информацию.
Context — контейнер состояния
Наша функция-шаг имеет на входе некий context:Context и возвращаемое значение DataFrame - это стандартный тип спарка для табличек. Что такое Context?
Тут надо решить - как мы будем работать с аргументами. Лично мне нравится некий обобщенный вид, когда не надо писать длинные списки параметров. Это имеет свои плюсы и минусы, решать вам. Но для фреймворка это имеет смысл.
Возможная реализация:
from pydantic import BaseModel
class Context(BaseModel):
# допустим глобальный конфиг
config: Dict[Type[Config], Config] = {}
# текущие таблички, с которыми работаем
data: Dict[Union[str, Type[Table]], DataFrame] = {}
# какие-то другие переменные
diff: Dict[Any, Any] = {}
Плюсы такого подхода:
-
меньше аргументов в функциях
-
легко расширять
-
единая точка передачи состояния
Минусы:
-
менее явные зависимости
Таблицы
В Spark схема описывается через StructType:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("item", StringType(), nullable=False),
StructField("loc", StringType(), nullable=False),
StructField("qty", IntegerType(), nullable=True)
])
df = spark.createDataFrame([], schema=schema)
Часто для описания табличек используется SQLAlchemy, поэтому может быть хорошей идеей использовать именно его, и только конвертировать это описание. Будем использовать декларативную нотацию версий 1.x,
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Table(Base):
__abstract__ = True # чтобы не создавать таблицу для этого класса
class MyTable(Table):
__tablename__ = "my_table"
id = Column(Integer, primary_key=True, nullable=True)
name = Column(String(50), nullable=True)
age = Column(Integer, nullable=True)
Объявляем табличку с 3 колонками, первая - primary key
Теперь надо определить класс Table, чтобы получать описание в spark-стиле, для этого нам нужен конвертер
def _convert_type(sqlalchemy_type) -> T.DataType:
# Упрощённая карта типов
if isinstance(sqlalchemy_type, Integer):
return T.IntegerType()
if isinstance(sqlalchemy_type, String):
return T.StringType()
if isinstance(sqlalchemy_type, Float):
return T.FloatType()
if isinstance(sqlalchemy_type, Boolean):
return T.BooleanType()
if isinstance(sqlalchemy_type, Date):
return T.DateType()
if isinstance(sqlalchemy_type, DateTime):
return T.TimestampType()
if isinstance(sqlalchemy_type, Numeric):
return T.DoubleType()
raise ValueError(f"Unsupported type: {sqlalchemy_type}")
class Table(Base):
__abstract__ = True # чтобы не создавать таблицу для этого класса
@classmethod
def get_schema(cls) -> T.StructType:
fields = []
for column in cls.__table__.columns:
spark_type = _convert_type(column.type)
fields.append(
T.StructField(column.name, spark_type, column.nullable)
)
return T.StructType(fields)
можно использовать код вида MyTable.get_schema(), чтобы достать описание таблицы в спарк-стиле. Естественно можем использовать полиморфизм и делать свои кастомные таблички
Можно полностью переопределить метод:
class MyTable(Table):
__tablename__ = "my_table"
id = Column(Integer)
name = Column(String)
@classmethod
def get_schema(cls) -> T.StructType:
return T.StructType([
T.StructField("id", T.LongType(), False),
T.StructField("name", T.StringType(), True),
T.StructField("extra_col", T.StringType(), True),
])
здесь мы напрямую возвращаем StructType. А можно использовать дефолтную логику и добавлять/менять поля поверх неё.
class MyExtendedTable(Table):
__tablename__ = "ext"
id = Column(Integer)
name = Column(String)
@classmethod
def get_schema(cls):
# возьмем SQLAlchemy-версию
schema = super().get_schema()
# добавим вычисляемую колонку
fields = schema.fields + [
T.StructField("sys_load_ts", T.TimestampType(), False)
]
return T.StructType(fields)
Крупноблочно мы описали, теперь займемся его реализацией. Основа фреймворка - декораторы, давайте немного вспомним теорию. Конструкция:
@classmethod
@Flow.step(order=1)
def step_one(cls, context):
pass
Разворачивается так:
-
Flow.step(order=1)→ возвращает декоратор -
декоратор применяется к функции
-
результат оборачивается в
classmethod
Важно: порядок применения — снизу вверх
Основные идеи:
-
Сами декораторы только навешивают некую внутреннюю мета-информацию _step_meta на функцию, в которой будем определять порядок, таблицы и т.д.
-
Сборка пайплайна в специальном методе __init_subclass__, запускается в последний момент, после всех декораторов.
-
Ключевая идея — собрать шаги автоматически при создании класса.
Напомню, что __init_subclass__ это специальный метод введенный в Python 3.6 (PEP 487). Это позволяет базовому классу настраивать инициализацию своего класса.
from functools import wraps
class Flow:
def __init_subclass__(cls):
# 1) переопределяем переменные для каждого подкласса, важно!
cls.steps = []
cls.step_meta = {}
# 2) обходим все атрибуты класса, ищем наши шаги
for name, attr in cls.__dict__.items():
func = None
if isinstance(attr, classmethod):
func = attr.__func__
elif callable(attr):
func = attr
# 3) добавляем мета-инфорцацию
if func and hasattr(func, "_step_meta"):
cls.steps.append(name)
cls.step_meta[name] = func._step_meta
# 4) и напоследок сортируем по order
cls.steps.sort(key=lambda n: cls.step_meta[n]["order"])
Плюс:
-
каждый Flow получает свой список шагов
-
нет смешивания состояния между классами
Определим наконец наш декоратор, который превращает функцию в шаг. Основная наша задача - добавить мета-информацию на функцию
@classmethod
def step(cls, order):
def decorator(func):
func._step_meta = getattr(func, "_step_meta", {})
func._step_meta["order"] = order
return func
return decorator
И запуск реализуем так - проходимся по всем шагам, получаем функцию по имени шага, и запускаем ее:
@classmethod
def run(cls, context):
print(f"n=== RUN FLOW {cls.__name__} ===")
for step in cls.steps:
print("META:", step, cls.step_meta[step])
# получаем функцию и вызываем ее с context
getattr(cls, step)(context)
Файл конфигурации
Используем pydantic. Для работы нам нужно наследоваться от класса , который реализует всю магию.
from pydantic import BaseModel
class Config(BaseModel):
param1: str = "A"
Плюсы:
-
валидация типов
-
автодополнение в IDE
-
удобная сериализация
Удобно использовать yaml-формат для параметром, рекомендую использовать его, функции для сохранения и загрузки в yaml конфигурационного файла:
def save_config(config: Config, filepath: str):
yaml.dump(config.dict(), open(filepath, 'w'))
def load_config(filepath: str) -> Config:
return Config(**yaml.safe_load(open(filepath)))
SQL как основной инструмент трансформаций
Идея:
-
сначала SQL
-
если не хватает — DataFrame API
Плюсы:
-
быстрее писать
-
понятнее аналитикам
Для спарка, как мы знаем, есть возможность выполнять непосредственно SQL
df = spark.range(10)
df.createOrReplaceTempView("my_view")
df = spark.sql("select id, 1 as asd, 2 as ewq from my_view")
df.show()
Основная идея - отдельный шаг, в котором либо автоматически, либо мы сами вызовем некоторую функцию, которая загрузит параметризованный SQL, подставит параметры и выполнит.
class MyFlow(Flow):
@classmethod
@Flow.step(order=3)
@Flow.input([MyTable])
@Flow.output(AnotherTable)
# Либо декоратор для исполнения файла
@Flow.sql("step_three.sql")
def step_three(cls, context: Context):
context.data[MyTable].show()
# либо сами вызываем
df = cls.execute_sql(context, "step_three.sql",vars={"id":1})
df.show()
И сам файл для выполнения step_three.sql:
select * from my_table
where id={{id}}
Первое, что нам нужно - это декоратор, в котором мы только передаем мета-информацию в функцию
@classmethod
def sql(cls, file: str, vars: Dict[str, Any] = None):
"""
Вешается на шаг:
@Flow.sql("step1.sql", vars={"x":1})
"""
def decorator(func):
func._step_meta = getattr(func, "_step_meta", {})
func._step_meta["sql"] = {
"file": file,
"vars": vars
}
return func
return decorator
Понадобится вспомогательная функция создания временных view:
def camel_to_snake(name: ...) -> str:
"""
Приведения названия таблицы в нужный вид.
Пример:
Abc -> abc
"""
@classmethod
def create_temp_views(cls, context: Context, tables: List[Any]):
"""
Создаёт временные spark view для SQL
"""
for table_class in tables:
df = context.data.get(table_class, None)
table_name = table_class if isinstance(table_class, str) else camel_to_snake(table_class.__name__)
if df is not None:
df.createOrReplaceTempView(table_name)
Функция исполнения sql-кода из файла, используем подход jinja2 для подстановки переменных
from jinja2 import Template
@classmethod
def execute_sql(cls, context: Context, *, file: str, vars: Dict[str, Any]):
# читаем файл
sql_query = cls.read_sql(file)
# Подставляем переменные
if vars:
sql_query = Template(sql_query).render(vars)
result_df = context.spark.sql(sql_query)
return result_df
И блок исполнения теперь выглядит так:
@classmethod
def run(cls, context:Context):
print(f"n=== RUN FLOW {cls.__name__} ===")
for i, step in enumerate(cls.steps):
print("META:", step, cls.step_meta[step], i)
cls.current_step = i
meta = cls.step_meta[step]
# ---------- SQL ----------
if "sql" in meta:
sql_meta = meta["sql"]
cls.create_temp_views(context, meta["input"])
result = cls.execute_sql(
context,
file=sql_meta["file"],
vars=sql_meta.get("vars")
)
result.show()
# сохраняем результат
result_table = meta["output"][0]
context.data[result_table] = result
# Выполняем шаг
getattr(cls, step)(context)
Итог
Мы получили простой декларативный фреймворк:
-
Flow = бизнес-процесс
-
Step = шаг трансформации
-
Context = состояние
-
Таблицы = декларативные модели
При этом:
-
структура единообразная
-
легко расширять
Что дальше
В статье разобраны базовые идеи. Возможные улучшения:
-
dependency graph вместо order
-
валидация входов/выходов
-
retry и error handling
-
execution backend (Spark / SQL / Pandas)
-
реализация через дескрипторы вместо
__init_subclass__
Немного про dependency graph
Текущая реализация использует явный порядок выполнения шагов (order). Однако в более зрелых системах orchestration используется dependency graph (DAG), где шаги определяются через зависимости друг от друга. Это позволяет:
-
автоматически вычислять порядок выполнения
-
запускать независимые шаги параллельно
-
упростить поддержку пайплайна
Можно будет делать так:
@Flow.step(depends_on=["step_one", "step_two"])
def step_three(cls, context):
...
Автор: eager_igor
