Когда я искал учебные проекты по data engineering, картина была примерно одинаковой: либо туториал на два инструмента («пишем в Kafka, читаем в Spark»), либо enterprise-схема без единой строчки кода. Мне хотелось чего-то среднего — реальный стек, реальные данные, реальные проблемы, но при этом всё поднимается одной командой make deploy без предварительной настройки.
Результат: платформа сбора и анализа криптовалютных данных, которую можно склонировать и запустить на любой машине с Docker. В этой статье расскажу об архитектуре, интересных технических решениях и граблях, которые встретились по пути.
Что получилось
Перечислю сначала, что делает система:
-
Скачивает 30 дней истории по 5 символам (BTC/ETH/SOL/BNB/XRP) в момент деплоя
-
Стримит live-данные с Binance через WebSocket в реальном времени
-
Обнаруживает аномалии: всплески объёма и аномально крупные свечи
-
Вычисляет rolling volatility и классифицирует рыночные режимы через Apache Spark
-
Собирает всё на аналитическом дашборде — тоже автоматически
И весь этот стек поднимается командой:
git clone https://github.com/andreivolokhovskii-coder/Crypto-Research-Workbench.gitcd Crypto-Research-Workbenchmake deploy
Пароли генерируются автоматически, все сервисы настраиваются сами.
Технологический стек и почему именно он
|
Слой |
Инструмент |
Зачем |
|---|---|---|
|
Аналитическая СУБД |
ClickHouse 23.8 |
Колоночная БД, агрегации в 10–100× быстрее PostgreSQL |
|
Объектное хранилище |
MinIO |
S3-совместимое хранилище — тот же код работает в AWS |
|
Очередь |
Apache Kafka |
At-least-once delivery, replay данных, развязка producer/consumer |
|
Batch-обработка |
Apache Spark 3.5 |
Горизонтальное масштабирование для тяжёлых вычислений |
|
Трансформации |
dbt 1.7 + ClickHouse |
SQL с тестами, lineage, документацией |
|
Оркестрация |
Apache Airflow 2.8 |
Расписание, мониторинг, retry |
|
BI |
Apache Superset 3.0 |
Дашборды |
|
Контейнеризация |
Docker Compose v2 |
Весь стек одной командой |
Почему ClickHouse, а не PostgreSQL для аналитики?
PostgreSQL — строчная СУБД. На запросе SELECT AVG(close) FROM klines GROUP BY symbol, date он читает все колонки каждой строки. ClickHouse читает только колонки close, symbol, open_time — остальные физически не трогает. На таблице в 200k строк разница незаметна. На таблице в 200 млн — принципиальна.
Почему MinIO, а не просто диск?
MinIO реализует S3 API. Код, который пишет в MinIO, без изменений запишет в AWS S3 — достаточно поменять endpoint в .env. Это принципиально для production-ready архитектуры.
Почему dbt И Spark одновременно?
Это не дублирование, а разделение ответственности:
-
dbt — SQL-трансформации в ClickHouse: ETL, агрегации, обогащение данных. Просто, тестируемо, быстро разрабатывается.
-
Spark — для задач, которые горизонтально масштабируются: rolling window по 90 дням минутных свечей по всем символам. При росте данных — добавляешь workers, время вычисления не меняется.
Медальонная архитектура: bronze, silver, gold
Это стандарт индустрии. Данные проходят три уровня очистки:
BRONZE (сырые) → SILVER (нормализованные) → GOLD (аналитические)
Bronze — принцип неизменности
Данные как пришли с биржи, так и лежат. Временные метки в миллисекундах, raw JSON-поля. Никогда не удаляем, никогда не изменяем.
CREATE TABLE bronze_klines ( ingested_at DateTime DEFAULT now(), exchange LowCardinality(String), symbol LowCardinality(String), open_time Int64, -- миллисекунды, как в API Binance open Float64, ... _source_file String -- путь к Parquet в MinIO) ENGINE = MergeTree()PARTITION BY toYYYYMM(_partition_date)ORDER BY (exchange, symbol, interval, open_time);
Зачем хранить «сырьё»? Если трансформация оказалась неверной — перезапускаешь её с нуля. Bronze — источник правды, из которого восстанавливается весь pipeline.
Silver — идемпотентность через ReplacingMergeTree
Здесь происходит нормализация: Int64 миллисекунды → DateTime UTC, правильные типы. Ключевое решение — движок таблицы:
CREATE TABLE silver_klines ( exchange LowCardinality(String), symbol LowCardinality(String), open_time DateTime, -- нормализованный UTC close Float64, ...) ENGINE = ReplacingMergeTree(ingested_at)ORDER BY (exchange, symbol, interval, open_time);
ReplacingMergeTree(ingested_at) — при наличии дублей по ключу (exchange, symbol, interval, open_time) движок оставляет строку с максимальным ingested_at. Это значит: пайплайн можно запустить дважды — лишних данных не появится. SELECT ... FINAL форсирует слияние дублей на лету.
Gold — dbt строит аналитику
silver_klines └── stg_klines (view, тонкая обёртка с FINAL) ├── fact_candles (enriched: price_change_pct, is_bullish) └── mart_volatility (rolling vol 7d/30d, ATR-14)
dbt строит граф зависимостей автоматически из {{ ref('stg_klines') }} и запускает модели в правильном порядке. Плюс встроенные тесты:
- name: is_bullish tests: - accepted_values: values: [0, 1] # dbt сгенерирует SQL-проверку автоматически
Стриминг: Kafka + WebSocket
Производитель
Подключаемся к Binance combined stream — один WebSocket на все символы:
uri = "wss://stream.binance.com:9443/stream?streams=" + \ "/".join(f"{s.lower()}@kline_1m" for s in SYMBOLS)
Каждое событие валидируется через Pydantic перед публикацией в Kafka. Невалидные сообщения уходят в Dead Letter Queue, а не теряются. Reconnect с exponential backoff: 1s, 2s, 4s… до 60s.
Потребитель
Ключевой паттерн — at-least-once delivery с ручным commit:
# Накапливаем батч: 50 закрытых свечей ИЛИ 10 секундwhile True: msg = consumer.poll(timeout=1.0) if msg and msg.value().is_closed: batch.append(msg) if len(batch) >= 50 or time_since_last_flush > 10: clickhouse.insert(batch) # пишем в ClickHouse consumer.commit() # ТОЛЬКО ПОСЛЕ успешной записи batch.clear()
Если ClickHouse недоступен — offset не коммитится, сообщения будут обработаны повторно после восстановления.
Параллельно с записью в ClickHouse — детекция аномалий в реальном времени. Для каждого символа держим rolling window из 60 свечей:
# Volume spike: z-score объёма > 2.5σz_score = (current_volume - mean_volume) / std_volumeif z_score > 2.5: emit_signal("volume_spike", z_score)# Large candle: размах > 3× ATR-14candle_range = abs(high - low)if candle_range > 3.0 * atr_14: emit_signal("large_candle", candle_range / atr_14)
Сигналы пишутся в rt_signals с TTL 7 дней — ClickHouse автоматически удаляет старые.
Spark: вычисление рыночных режимов
Это самая интересная часть с точки зрения data engineering. Читаем silver_klines через JDBC, агрегируем по дням, вычисляем метрики через Spark window functions:
# Rolling realized volatility — стандарт в quantitative financewindow_7d = Window.partitionBy("exchange", "symbol") \ .orderBy("trade_date") \ .rowsBetween(-6, 0)df = df.withColumn( "vol_7d", F.stddev_samp("log_return").over(window_7d) * F.sqrt(F.lit(365)) # аннуализация: умножаем на √365 для calendar days volatility)
Классификация рыночного режима:
regime = F.when(F.col("vol_7d") > F.lit(1.5) * F.col("vol_30d"), "volatile") \ .when((F.col("close") - F.col("sma_20")) / F.col("sma_20") > 0.02, "trending_up") \ .when((F.col("sma_20") - F.col("close")) / F.col("sma_20") > 0.02, "trending_down") \ .otherwise("ranging")
Результат в mart_market_regime — каждый день по каждому символу имеет метку режима.
Почему именно Spark, а не ClickHouse SQL? Потому что при росте данных Spark горизонтально масштабируется: добавляем worker-ноды в docker-compose и время вычисления не растёт.
Оркестрация: Airflow без ручной настройки
Стандартная проблема с Airflow: после деплоя нужно вручную создать Connection для Spark, заполнить Variables с паролями. Я решил это в airflow-init:
# Создаём Spark connection автоматическиairflow connections add spark_default \ --conn-type spark \ --conn-host "spark://spark-master" \ --conn-port 7077 || true# Заполняем Variables (на случай если DAG использует var.value.get)airflow variables set CLICKHOUSE_PASSWORD "${CLICKHOUSE_PASSWORD}"
Это часть команды в docker-compose.yml для airflow-init контейнера. || true — если connection уже существует (redeploy), не падаем с ошибкой.
DAGs используют os.environ.get() вместо Jinja {{ var.value.get() }}:
# Плохо: читает из Airflow Variables (нужен ручной шаг или Variables seeding)"CLICKHOUSE_PASSWORD": "{{ var.value.get('CLICKHOUSE_PASSWORD', '') }}"# Хорошо: читает из env контейнера при загрузке DAG"CLICKHOUSE_PASSWORD": os.environ.get("CLICKHOUSE_PASSWORD", "")
Расписание построено с умыслом:
00:00, 06:00, 12:00, 18:00 → daily_pipeline (backfill + metadata + dbt)00:30, 06:30, 12:30, 18:30 → spark_batch (volatility + market regime)02:00 ежедневно → data_quality (freshness + dbt tests + row counts)
+30 минут offset у Spark гарантирует что dbt уже положил свежие данные в silver_klines.
Superset: дашборд из коробки
Проблема
После деплоя Superset абсолютно пустой. Нужно: добавить Database connection, зарегистрировать датасеты, создать чарты, собрать дашборд. Это 20+ кликов в UI — неприемлемо для “одна команда деплой”.
Решение
Весь этот код живёт в docker/superset/entrypoint.sh и выполняется при каждом старте контейнера:
app = create_app()with app.app_context(): # 1. Пересоздаём DB connection (пароль мог измениться при redeploy) database = Database( database_name="ClickHouse", sqlalchemy_uri=f"clickhouse+http://{user}:{pw}@{host}:{port}/{db}" ) # 2. Регистрируем все датасеты for table_name in TABLES: tbl = SqlaTable(table_name=table_name, database_id=database.id) db.session.add(tbl) tbl.fetch_metadata() # синхронизируем схему колонок # 3. Создаём 6 чартов и дашборд # (версионируем через json_metadata.init_version — пересоздаём только при смене версии)
Баг с двойным time grain в ClickHouse
Это заняло несколько часов дебага. Superset генерировал GROUP BY с двойным оборачиванием:
-- Ожидалось:GROUP BY toStartOfDay(open_time), symbol-- Получалось (ошибка ClickHouse NOT_AN_AGGREGATE):GROUP BY toStartOfDay(toDateTime(toStartOfDay(toDateTime(open_time)))), symbol
Причина: clickhouse-sqlalchemy==0.2.5 применяет grain-функцию к уже алиасированному столбцу в GROUP BY, а не к исходному выражению. ClickHouse strict mode считает это разными выражениями.
Решение: создаём pre-aggregated view в ClickHouse и выставляем time_grain_sqla: None в чартах:
# Создаём view при старте Superset через HTTP API ClickHouse_ch_exec(""" CREATE OR REPLACE VIEW crypto.v_daily_klines AS SELECT exchange, symbol, toDate(open_time) AS trade_date, -- уже Date, не DateTime argMin(open, open_time) AS day_open, max(high) AS day_high, min(low) AS day_low, argMax(close, open_time) AS day_close, sum(volume) AS day_volume FROM crypto.silver_klines WHERE interval = '1m' GROUP BY exchange, symbol, trade_date""")
Теперь Superset делает простой GROUP BY без grain-функций:
SELECT trade_date, symbol, AVG(day_close)FROM v_daily_klinesGROUP BY trade_date, symbol -- валидный ClickHouse GROUP BY
clickhouse-sqlalchemy vs clickhouse-connect
Ещё один подводный камень: Superset 3.x работает на SQLAlchemy 1.4. clickhouse-connect >= 0.7 требует SQLAlchemy 2.0. При установке clickhouse-connect==0.7+ Superset падает при любом запросе к ClickHouse.
Решение в docker/superset/Dockerfile:
# clickhouse-connect 0.7+ требует SQLAlchemy 2.0, Superset 3.x использует 1.4RUN pip install --no-cache-dir \ clickhouse-sqlalchemy==0.2.5 \ clickhouse-connect==0.6.23 # последняя совместимая версия
И URI вида clickhouse+http:// вместо clickhouse+connect://.
Деплой: как это работает изнутри
Генерация секретов
Первое, что делает make deploy — запускает setup.sh:
gen_pass() { openssl rand -hex 24; } # 48 hex-символов, без спецсимволовgen_fernet() { python3 -c "import base64, os; print(base64.urlsafe_b64encode(os.urandom(32)).decode())"; }CLICKHOUSE_PASSWORD=$(gen_pass)AIRFLOW_FERNET_KEY=$(gen_fernet)# ... ещё 7 паролей# sed с разделителем | — base64 содержит слэши, они сломают s/.../sed -i "s|change_me_clickhouse_password|${CLICKHOUSE_PASSWORD}|g" .env
Каждый деплой — уникальные пароли. .env в .gitignore. В git хранится только .env.example с заглушками.
Порядок запуска сервисов
# Webserver стартует только после успешного airflow-initairflow-webserver: depends_on: postgres: condition: service_healthy # ждёт pg_isready airflow-init: condition: service_completed_successfully # ждёт exit 0
service_completed_successfully — контейнер может быть завершён (одноразовые init-контейнеры). service_healthy — контейнер должен быть запущен и отвечать на healthcheck.
Makefile: явное ожидание зависимостей
Makefile ждёт ClickHouse и MinIO перед запуском backfill:
deploy: @bash setup.sh DOCKER_BUILDKIT=0 $(COMPOSE) up --build -d # DOCKER_BUILDKIT=0 — обходит баг с IPv6 DNS в некоторых Linux-окружениях @echo "Waiting for ClickHouse..." @until docker inspect workbench-clickhouse \ --format='{{.State.Health.Status}}' | grep -q healthy; do sleep 2; done @echo "Waiting for MinIO buckets..." @until docker inspect workbench-minio-init \ --format='{{.State.Status}}' | grep -qE 'exited'; do sleep 2; done $(COMPOSE) run --rm app python ingestion/historical/klines_backfill.py $(COMPOSE) run --rm app python ingestion/metadata/coingecko_dims.py \ || echo "[warn] CoinGecko unavailable — dim_coin will be empty" $(COMPOSE) run --rm dbt dbt deps && dbt build $(COMPOSE) exec spark-master spark-submit volatility_batch.py \ || echo "[warn] Spark failed — run 'make spark-volatility' manually"
|| echo "[warn]" — graceful degradation. Если CoinGecko недоступен или Spark упал, деплой не прерывается. Кritичные данные (klines) загружены, дашборд работает.
Что оказалось сложнее, чем казалось
1. Stale URI в Superset volume
При redeploy с новым паролем Superset брал URI из зашифрованного volume. Database.sqlalchemy_uri хранится зашифрованным через Fernet — обновление не помогало. Решение: всегда удалять и пересоздавать запись при старте:
existing = db.session.query(Database).filter_by(database_name="ClickHouse").first()if existing: db.session.delete(existing) db.session.commit()# Создаём заново с актуальным паролемdatabase = Database(database_name="ClickHouse", sqlalchemy_uri=uri)
2. Права доступа в dbt/dbt_packages
dbt deps запускается внутри Docker-контейнера от root. Файлы dbt_packages/ создаются с owner=root. При попытке rm -rf от обычного пользователя — Permission denied.
Решение: при полной переустановке использовать sudo rm -rf.
3. DOCKER_BUILDKIT=0
На некоторых Linux-системах BuildKit пытается разрешить registry-1.docker.io через IPv6, получает NXDOMAIN, и сборка падает на шаге FROM. Отключение BuildKit (DOCKER_BUILDKIT=0) переключает на legacy builder с IPv4-only DNS.
4. Spark JDBC authentication
Spark-мастер не получал CLICKHOUSE_PASSWORD из .env — в docker-compose для spark-сервисов прописывались только SPARK_* переменные. .env монтировался как файл (/app/.env), но Python через os.getenv() читает окружение контейнера, а не файл.
Решение: явно прокинуть все CLICKHOUSE_* в environment spark-master и spark-worker.
Архитектурные принципы
Идемпотентность везде: ReplacingMergeTree, dbt build с atomic swap через EXCHANGE TABLES, CREATE OR REPLACE VIEW, версионирование дашборда. Любой шаг можно повторить.
Lambda Architecture: batch и streaming пишут в silver_klines. dbt и Spark читают из одного источника независимо от происхождения данных.
Infrastructure as Code: весь стек описан декларативно. Новый разработчик клонирует репо — всё работает.
Graceful degradation: необязательные шаги (метаданные, Spark) не блокируют деплой при сбое.
Дашборд
После make deploy открываем http://localhost:8088:
-
Ряд 1: Price History (все 5 символов, дневные свечи) + Volume History
-
Ряд 2: Realized Volatility 7d (BTC vs ETH vs SOL) + Market Regime по символам
-
Ряд 3: Live Prices (текущие цены, обновляются каждую минуту) + Trading Signals
Всё это без единого клика — только make deploy.
Исходный код
Репозиторий: github.com/andreivolokhovskii-coder/Crypto-Research-Workbench
Что есть в репо:
-
Полный docker-compose стек (14 сервисов)
-
Ingestion: REST backfill + WebSocket streaming + CoinGecko metadata
-
dbt модели с тестами
-
Spark job для market regime classification
-
Airflow DAGs с автосозданными connections и variables
-
Superset с автонастроенным дашбордом
-
Документация архитектуры в
ARCHITECTURE.md
Что дальше
Проект намеренно оставлен на уровне development-деплоя. Для production потребуется:
-
Multi-node Kafka с replication_factor ≥ 2
-
CeleryExecutor в Airflow вместо LocalExecutor
-
ClickHouse Keeper вместо single-node для репликации
-
Centralized logging (Grafana Loki или ELK)
-
Secrets management (Vault, AWS Secrets Manager) вместо
.env
Если статья окажется полезной — в следующей части разберу, как перевести это в production на Kubernetes.
Теги: data engineering, clickhouse, kafka, apache spark, dbt, airflow, superset, docker, python
ссылка на оригинал статьи https://habr.com/ru/articles/1047304/