Как я собрал эталонный Data Engineering проект: ClickHouse, Kafka, Spark, dbt, Airflow и Superset за одну команду

от автора

Когда я искал учебные проекты по data engineering, картина была примерно одинаковой: либо туториал на два инструмента («пишем в Kafka, читаем в Spark»), либо enterprise-схема без единой строчки кода. Мне хотелось чего-то среднего — реальный стек, реальные данные, реальные проблемы, но при этом всё поднимается одной командой make deploy без предварительной настройки.

Результат: платформа сбора и анализа криптовалютных данных, которую можно склонировать и запустить на любой машине с Docker. В этой статье расскажу об архитектуре, интересных технических решениях и граблях, которые встретились по пути.


Что получилось

Перечислю сначала, что делает система:

  • Скачивает 30 дней истории по 5 символам (BTC/ETH/SOL/BNB/XRP) в момент деплоя

  • Стримит live-данные с Binance через WebSocket в реальном времени

  • Обнаруживает аномалии: всплески объёма и аномально крупные свечи

  • Вычисляет rolling volatility и классифицирует рыночные режимы через Apache Spark

  • Собирает всё на аналитическом дашборде — тоже автоматически

И весь этот стек поднимается командой:

git clone https://github.com/andreivolokhovskii-coder/Crypto-Research-Workbench.gitcd Crypto-Research-Workbenchmake deploy

Пароли генерируются автоматически, все сервисы настраиваются сами.


Технологический стек и почему именно он

Слой

Инструмент

Зачем

Аналитическая СУБД

ClickHouse 23.8

Колоночная БД, агрегации в 10–100× быстрее PostgreSQL

Объектное хранилище

MinIO

S3-совместимое хранилище — тот же код работает в AWS

Очередь

Apache Kafka

At-least-once delivery, replay данных, развязка producer/consumer

Batch-обработка

Apache Spark 3.5

Горизонтальное масштабирование для тяжёлых вычислений

Трансформации

dbt 1.7 + ClickHouse

SQL с тестами, lineage, документацией

Оркестрация

Apache Airflow 2.8

Расписание, мониторинг, retry

BI

Apache Superset 3.0

Дашборды

Контейнеризация

Docker Compose v2

Весь стек одной командой

Почему ClickHouse, а не PostgreSQL для аналитики?

PostgreSQL — строчная СУБД. На запросе SELECT AVG(close) FROM klines GROUP BY symbol, date он читает все колонки каждой строки. ClickHouse читает только колонки close, symbol, open_time — остальные физически не трогает. На таблице в 200k строк разница незаметна. На таблице в 200 млн — принципиальна.

Почему MinIO, а не просто диск?

MinIO реализует S3 API. Код, который пишет в MinIO, без изменений запишет в AWS S3 — достаточно поменять endpoint в .env. Это принципиально для production-ready архитектуры.

Почему dbt И Spark одновременно?

Это не дублирование, а разделение ответственности:

  • dbt — SQL-трансформации в ClickHouse: ETL, агрегации, обогащение данных. Просто, тестируемо, быстро разрабатывается.

  • Spark — для задач, которые горизонтально масштабируются: rolling window по 90 дням минутных свечей по всем символам. При росте данных — добавляешь workers, время вычисления не меняется.


Медальонная архитектура: bronze, silver, gold

Это стандарт индустрии. Данные проходят три уровня очистки:

BRONZE (сырые)  →  SILVER (нормализованные)  →  GOLD (аналитические)

Bronze — принцип неизменности

Данные как пришли с биржи, так и лежат. Временные метки в миллисекундах, raw JSON-поля. Никогда не удаляем, никогда не изменяем.

CREATE TABLE bronze_klines (    ingested_at  DateTime DEFAULT now(),    exchange     LowCardinality(String),    symbol       LowCardinality(String),    open_time    Int64,        -- миллисекунды, как в API Binance    open         Float64,    ...    _source_file String        -- путь к Parquet в MinIO) ENGINE = MergeTree()PARTITION BY toYYYYMM(_partition_date)ORDER BY (exchange, symbol, interval, open_time);

Зачем хранить «сырьё»? Если трансформация оказалась неверной — перезапускаешь её с нуля. Bronze — источник правды, из которого восстанавливается весь pipeline.

Silver — идемпотентность через ReplacingMergeTree

Здесь происходит нормализация: Int64 миллисекунды → DateTime UTC, правильные типы. Ключевое решение — движок таблицы:

CREATE TABLE silver_klines (    exchange    LowCardinality(String),    symbol      LowCardinality(String),    open_time   DateTime,    -- нормализованный UTC    close       Float64,    ...) ENGINE = ReplacingMergeTree(ingested_at)ORDER BY (exchange, symbol, interval, open_time);

ReplacingMergeTree(ingested_at) — при наличии дублей по ключу (exchange, symbol, interval, open_time) движок оставляет строку с максимальным ingested_at. Это значит: пайплайн можно запустить дважды — лишних данных не появится. SELECT ... FINAL форсирует слияние дублей на лету.

Gold — dbt строит аналитику

silver_klines    └── stg_klines (view, тонкая обёртка с FINAL)            ├── fact_candles    (enriched: price_change_pct, is_bullish)            └── mart_volatility (rolling vol 7d/30d, ATR-14)

dbt строит граф зависимостей автоматически из {{ ref('stg_klines') }} и запускает модели в правильном порядке. Плюс встроенные тесты:

- name: is_bullish  tests:    - accepted_values:        values: [0, 1]  # dbt сгенерирует SQL-проверку автоматически

Стриминг: Kafka + WebSocket

Производитель

Подключаемся к Binance combined stream — один WebSocket на все символы:

uri = "wss://stream.binance.com:9443/stream?streams=" + \      "/".join(f"{s.lower()}@kline_1m" for s in SYMBOLS)

Каждое событие валидируется через Pydantic перед публикацией в Kafka. Невалидные сообщения уходят в Dead Letter Queue, а не теряются. Reconnect с exponential backoff: 1s, 2s, 4s… до 60s.

Потребитель

Ключевой паттерн — at-least-once delivery с ручным commit:

# Накапливаем батч: 50 закрытых свечей ИЛИ 10 секундwhile True:    msg = consumer.poll(timeout=1.0)    if msg and msg.value().is_closed:        batch.append(msg)        if len(batch) >= 50 or time_since_last_flush > 10:        clickhouse.insert(batch)          # пишем в ClickHouse        consumer.commit()                  # ТОЛЬКО ПОСЛЕ успешной записи        batch.clear()

Если ClickHouse недоступен — offset не коммитится, сообщения будут обработаны повторно после восстановления.

Параллельно с записью в ClickHouse — детекция аномалий в реальном времени. Для каждого символа держим rolling window из 60 свечей:

# Volume spike: z-score объёма > 2.5σz_score = (current_volume - mean_volume) / std_volumeif z_score > 2.5:    emit_signal("volume_spike", z_score)# Large candle: размах > 3× ATR-14candle_range = abs(high - low)if candle_range > 3.0 * atr_14:    emit_signal("large_candle", candle_range / atr_14)

Сигналы пишутся в rt_signals с TTL 7 дней — ClickHouse автоматически удаляет старые.


Spark: вычисление рыночных режимов

Это самая интересная часть с точки зрения data engineering. Читаем silver_klines через JDBC, агрегируем по дням, вычисляем метрики через Spark window functions:

# Rolling realized volatility — стандарт в quantitative financewindow_7d = Window.partitionBy("exchange", "symbol") \                  .orderBy("trade_date") \                  .rowsBetween(-6, 0)df = df.withColumn(    "vol_7d",    F.stddev_samp("log_return").over(window_7d) * F.sqrt(F.lit(365))    # аннуализация: умножаем на √365 для calendar days volatility)

Классификация рыночного режима:

regime = F.when(F.col("vol_7d") > F.lit(1.5) * F.col("vol_30d"), "volatile") \          .when((F.col("close") - F.col("sma_20")) / F.col("sma_20") > 0.02, "trending_up") \          .when((F.col("sma_20") - F.col("close")) / F.col("sma_20") > 0.02, "trending_down") \          .otherwise("ranging")

Результат в mart_market_regime — каждый день по каждому символу имеет метку режима.

Почему именно Spark, а не ClickHouse SQL? Потому что при росте данных Spark горизонтально масштабируется: добавляем worker-ноды в docker-compose и время вычисления не растёт.


Оркестрация: Airflow без ручной настройки

Стандартная проблема с Airflow: после деплоя нужно вручную создать Connection для Spark, заполнить Variables с паролями. Я решил это в airflow-init:

# Создаём Spark connection автоматическиairflow connections add spark_default \    --conn-type spark \    --conn-host "spark://spark-master" \    --conn-port 7077 || true# Заполняем Variables (на случай если DAG использует var.value.get)airflow variables set CLICKHOUSE_PASSWORD "${CLICKHOUSE_PASSWORD}"

Это часть команды в docker-compose.yml для airflow-init контейнера. || true — если connection уже существует (redeploy), не падаем с ошибкой.

DAGs используют os.environ.get() вместо Jinja {{ var.value.get() }}:

# Плохо: читает из Airflow Variables (нужен ручной шаг или Variables seeding)"CLICKHOUSE_PASSWORD": "{{ var.value.get('CLICKHOUSE_PASSWORD', '') }}"# Хорошо: читает из env контейнера при загрузке DAG"CLICKHOUSE_PASSWORD": os.environ.get("CLICKHOUSE_PASSWORD", "")

Расписание построено с умыслом:

00:00, 06:00, 12:00, 18:00  →  daily_pipeline (backfill + metadata + dbt)00:30, 06:30, 12:30, 18:30  →  spark_batch (volatility + market regime)02:00 ежедневно             →  data_quality (freshness + dbt tests + row counts)

+30 минут offset у Spark гарантирует что dbt уже положил свежие данные в silver_klines.


Superset: дашборд из коробки

Проблема

После деплоя Superset абсолютно пустой. Нужно: добавить Database connection, зарегистрировать датасеты, создать чарты, собрать дашборд. Это 20+ кликов в UI — неприемлемо для “одна команда деплой”.

Решение

Весь этот код живёт в docker/superset/entrypoint.sh и выполняется при каждом старте контейнера:

app = create_app()with app.app_context():    # 1. Пересоздаём DB connection (пароль мог измениться при redeploy)    database = Database(        database_name="ClickHouse",        sqlalchemy_uri=f"clickhouse+http://{user}:{pw}@{host}:{port}/{db}"    )    # 2. Регистрируем все датасеты    for table_name in TABLES:        tbl = SqlaTable(table_name=table_name, database_id=database.id)        db.session.add(tbl)        tbl.fetch_metadata()  # синхронизируем схему колонок    # 3. Создаём 6 чартов и дашборд    # (версионируем через json_metadata.init_version — пересоздаём только при смене версии)

Баг с двойным time grain в ClickHouse

Это заняло несколько часов дебага. Superset генерировал GROUP BY с двойным оборачиванием:

-- Ожидалось:GROUP BY toStartOfDay(open_time), symbol-- Получалось (ошибка ClickHouse NOT_AN_AGGREGATE):GROUP BY toStartOfDay(toDateTime(toStartOfDay(toDateTime(open_time)))), symbol

Причина: clickhouse-sqlalchemy==0.2.5 применяет grain-функцию к уже алиасированному столбцу в GROUP BY, а не к исходному выражению. ClickHouse strict mode считает это разными выражениями.

Решение: создаём pre-aggregated view в ClickHouse и выставляем time_grain_sqla: None в чартах:

# Создаём view при старте Superset через HTTP API ClickHouse_ch_exec("""    CREATE OR REPLACE VIEW crypto.v_daily_klines AS    SELECT exchange, symbol,           toDate(open_time)            AS trade_date,  -- уже Date, не DateTime           argMin(open,  open_time)     AS day_open,           max(high)                    AS day_high,           min(low)                     AS day_low,           argMax(close, open_time)     AS day_close,           sum(volume)                  AS day_volume    FROM crypto.silver_klines    WHERE interval = '1m'    GROUP BY exchange, symbol, trade_date""")

Теперь Superset делает простой GROUP BY без grain-функций:

SELECT trade_date, symbol, AVG(day_close)FROM v_daily_klinesGROUP BY trade_date, symbol  -- валидный ClickHouse GROUP BY

clickhouse-sqlalchemy vs clickhouse-connect

Ещё один подводный камень: Superset 3.x работает на SQLAlchemy 1.4. clickhouse-connect >= 0.7 требует SQLAlchemy 2.0. При установке clickhouse-connect==0.7+ Superset падает при любом запросе к ClickHouse.

Решение в docker/superset/Dockerfile:

# clickhouse-connect 0.7+ требует SQLAlchemy 2.0, Superset 3.x использует 1.4RUN pip install --no-cache-dir \    clickhouse-sqlalchemy==0.2.5 \    clickhouse-connect==0.6.23   # последняя совместимая версия

И URI вида clickhouse+http:// вместо clickhouse+connect://.


Деплой: как это работает изнутри

Генерация секретов

Первое, что делает make deploy — запускает setup.sh:

gen_pass()   { openssl rand -hex 24; }    # 48 hex-символов, без спецсимволовgen_fernet() { python3 -c "import base64, os;    print(base64.urlsafe_b64encode(os.urandom(32)).decode())"; }CLICKHOUSE_PASSWORD=$(gen_pass)AIRFLOW_FERNET_KEY=$(gen_fernet)# ... ещё 7 паролей# sed с разделителем | — base64 содержит слэши, они сломают s/.../sed -i "s|change_me_clickhouse_password|${CLICKHOUSE_PASSWORD}|g" .env

Каждый деплой — уникальные пароли. .env в .gitignore. В git хранится только .env.example с заглушками.

Порядок запуска сервисов

# Webserver стартует только после успешного airflow-initairflow-webserver:  depends_on:    postgres:      condition: service_healthy                  # ждёт pg_isready    airflow-init:      condition: service_completed_successfully   # ждёт exit 0

service_completed_successfully — контейнер может быть завершён (одноразовые init-контейнеры). service_healthy — контейнер должен быть запущен и отвечать на healthcheck.

Makefile: явное ожидание зависимостей

Makefile ждёт ClickHouse и MinIO перед запуском backfill:

deploy:    @bash setup.sh    DOCKER_BUILDKIT=0 $(COMPOSE) up --build -d    # DOCKER_BUILDKIT=0 — обходит баг с IPv6 DNS в некоторых Linux-окружениях    @echo "Waiting for ClickHouse..."    @until docker inspect workbench-clickhouse \        --format='{{.State.Health.Status}}' | grep -q healthy; do sleep 2; done    @echo "Waiting for MinIO buckets..."    @until docker inspect workbench-minio-init \        --format='{{.State.Status}}' | grep -qE 'exited'; do sleep 2; done    $(COMPOSE) run --rm app python ingestion/historical/klines_backfill.py    $(COMPOSE) run --rm app python ingestion/metadata/coingecko_dims.py \        || echo "[warn] CoinGecko unavailable — dim_coin will be empty"    $(COMPOSE) run --rm dbt dbt deps && dbt build    $(COMPOSE) exec spark-master spark-submit volatility_batch.py \        || echo "[warn] Spark failed — run 'make spark-volatility' manually"

|| echo "[warn]" — graceful degradation. Если CoinGecko недоступен или Spark упал, деплой не прерывается. Кritичные данные (klines) загружены, дашборд работает.


Что оказалось сложнее, чем казалось

1. Stale URI в Superset volume

При redeploy с новым паролем Superset брал URI из зашифрованного volume. Database.sqlalchemy_uri хранится зашифрованным через Fernet — обновление не помогало. Решение: всегда удалять и пересоздавать запись при старте:

existing = db.session.query(Database).filter_by(database_name="ClickHouse").first()if existing:    db.session.delete(existing)    db.session.commit()# Создаём заново с актуальным паролемdatabase = Database(database_name="ClickHouse", sqlalchemy_uri=uri)

2. Права доступа в dbt/dbt_packages

dbt deps запускается внутри Docker-контейнера от root. Файлы dbt_packages/ создаются с owner=root. При попытке rm -rf от обычного пользователя — Permission denied.

Решение: при полной переустановке использовать sudo rm -rf.

3. DOCKER_BUILDKIT=0

На некоторых Linux-системах BuildKit пытается разрешить registry-1.docker.io через IPv6, получает NXDOMAIN, и сборка падает на шаге FROM. Отключение BuildKit (DOCKER_BUILDKIT=0) переключает на legacy builder с IPv4-only DNS.

4. Spark JDBC authentication

Spark-мастер не получал CLICKHOUSE_PASSWORD из .env — в docker-compose для spark-сервисов прописывались только SPARK_* переменные. .env монтировался как файл (/app/.env), но Python через os.getenv() читает окружение контейнера, а не файл.

Решение: явно прокинуть все CLICKHOUSE_* в environment spark-master и spark-worker.


Архитектурные принципы

Идемпотентность везде: ReplacingMergeTree, dbt build с atomic swap через EXCHANGE TABLES, CREATE OR REPLACE VIEW, версионирование дашборда. Любой шаг можно повторить.

Lambda Architecture: batch и streaming пишут в silver_klines. dbt и Spark читают из одного источника независимо от происхождения данных.

Infrastructure as Code: весь стек описан декларативно. Новый разработчик клонирует репо — всё работает.

Graceful degradation: необязательные шаги (метаданные, Spark) не блокируют деплой при сбое.


Дашборд

После make deploy открываем http://localhost:8088:

  • Ряд 1: Price History (все 5 символов, дневные свечи) + Volume History

  • Ряд 2: Realized Volatility 7d (BTC vs ETH vs SOL) + Market Regime по символам

  • Ряд 3: Live Prices (текущие цены, обновляются каждую минуту) + Trading Signals

Всё это без единого клика — только make deploy.


Исходный код

Репозиторий: github.com/andreivolokhovskii-coder/Crypto-Research-Workbench

Что есть в репо:

  • Полный docker-compose стек (14 сервисов)

  • Ingestion: REST backfill + WebSocket streaming + CoinGecko metadata

  • dbt модели с тестами

  • Spark job для market regime classification

  • Airflow DAGs с автосозданными connections и variables

  • Superset с автонастроенным дашбордом

  • Документация архитектуры в ARCHITECTURE.md


Что дальше

Проект намеренно оставлен на уровне development-деплоя. Для production потребуется:

  • Multi-node Kafka с replication_factor ≥ 2

  • CeleryExecutor в Airflow вместо LocalExecutor

  • ClickHouse Keeper вместо single-node для репликации

  • Centralized logging (Grafana Loki или ELK)

  • Secrets management (Vault, AWS Secrets Manager) вместо .env

Если статья окажется полезной — в следующей части разберу, как перевести это в production на Kubernetes.


Теги: data engineering, clickhouse, kafka, apache spark, dbt, airflow, superset, docker, python

ссылка на оригинал статьи https://habr.com/ru/articles/1047304/