Код в статье иллюстративный, показывает архитектурные решения и объясняет почему именно так. Не предназначен для copy-paste в прод без адаптации под вашу инфраструктуру, мониторинг и требования.
Думал, зайду в крипту и буду просто дёргать API блокчейна. Не вышло.
Захожу в проект. Стек: FastAPI, PostgreSQL, Redis как Celery broker, Celery workers, Docker, Web3. Стартап на хайпе, деньги реальные, архитектура собрана на коленке. Смотрю на архитектуру платёжного процессинга и первая мысль: ребята, вы серьёзно? Финансовые операции с реальными деньгами, без idempotency вообще, Redis как брокер без persistence, Web3.py синхронные вызовы внутри Celery тасков.
Разговор был короткий: задача такая, чини что есть. Сроки горели.
Что было сломано
Первый месяц прода. Пользователь пишет в поддержку: зачислили дважды, вывел двойную сумму. Открываю логи, чисто. Два идентичных события, оба 200, разница четыре секунды. Оба обработаны. Пользователь получил двойной баланс.
Ежедневная сверка с on-chain данными показала расхождение: несколько аккаунтов с балансом больше чем должно быть по confirmed транзакциям. За первый месяц нашли 23 дублирующих зачисления на ~180k транзакций, около 0.013% error rate. 23 двойных зачисления за месяц. Живые деньги, не метрика.
Первое, что вылезло: дубли от провайдера. Alchemy, Infura и все остальные блокчейн-провайдеры работают по at-least-once delivery. При сетевом сбое, рестарте, под нагрузкой провайдер повторяет доставку. Провайдер так и написал в доках. Это не баг, это условия игры. Провайдер повторяет доставку, твой код должен это переживать без последствий. Наш не переживал.
Дальше хуже. Два параллельных запроса на вывод читали баланс одновременно, оба видели достаточно средств, оба проходили валидацию. Два запроса читают баланс одновременно, оба видят что денег хватает, оба списывают. Школьная гонка.
async def withdraw(conn, user_id: int, amount: Decimal): balance = await conn.fetchval( "SELECT balance FROM users WHERE id = $1", user_id ) if balance >= amount: await conn.execute( "UPDATE users SET balance = balance - $1 WHERE id = $2", amount, user_id )
Дальше. Celery с дефолтными настройками подтверждает задачу брокеру в момент получения. Воркер падает в середине обработки, событие подтверждено, до записи в БД не дошло. Никакого retry, никакого DLQ. Воркер упал, задача подтверждена, деньги не пришли. Пользователь ждёт и не понимает что случилось.
И отдельный тихий убийца: amount сериализуется в JSON через Celery broker как float. Decimal("50.1") превращается в JSON float, то есть в 50.099999999999994. На масштабе это копится в реальный убыток. Никто не заметил, пока не посчитали.
Последнее: прямой вызов .delay() из webhook handler создаёт окно между записью в БД и постановкой в очередь. Если процесс упал в этот момент, событие зависнет в pending без автоматического восстановления.
Итого пять проблем. Начал чинить.
Первый инстинкт: Redis distributed lock
SET NX EX на user_id. Паттерн описан у Antirez, реализован за 20 минут. Не взлетело.
Вот конкретный сценарий, который вскрылся в логах. Воркер берёт лок в Redis. Начинает транзакцию в PostgreSQL. Между этими двумя операциями OOM killer убивает процесс. PostgreSQL транзакция откатилась автоматически, баланс не изменился. Redis лок висит 30 секунд до TTL. Через 30 секунд следующий воркер берёт лок, видит что idempotency_key не записан (записать было некому, транзакция откатилась) и обрабатывает событие заново. Двойное зачисление. В логах оба воркера чистые.
Проблема не в размере TTL. Проблема в отсутствии cross-system атомарности между Redis и PostgreSQL. Redis не подходит, нет атомарности с PostgreSQL. Проверка в коде тоже не работает, два воркера оба пройдут SELECT до INSERT. Единственное что атомарно по определению unique constraint. С деньгами нет «почти правильно».
Схема базы данных
CREATE TABLE payment_events ( event_id TEXT PRIMARY KEY, user_id INTEGER NOT NULL REFERENCES users(id), amount NUMERIC(38, 18) NOT NULL, event_type TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', retry_count INTEGER NOT NULL DEFAULT 0, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), CONSTRAINT valid_status CHECK ( status IN ('pending', 'enqueued', 'processing', 'confirmed', 'failed') ));CREATE TABLE balance_events ( id BIGSERIAL PRIMARY KEY, user_id INTEGER NOT NULL REFERENCES users(id), amount NUMERIC(38, 18) NOT NULL, event_type TEXT NOT NULL, source_event_id TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), CONSTRAINT uq_balance_events_source UNIQUE (source_event_id, event_type));CREATE TABLE processed_events ( idempotency_key TEXT PRIMARY KEY, outcome TEXT NOT NULL DEFAULT 'pending', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW());CREATE TABLE dead_letter_queue ( id BIGSERIAL PRIMARY KEY, event_id TEXT NOT NULL, event_type TEXT NOT NULL, user_id INTEGER NOT NULL, amount NUMERIC(38, 18) NOT NULL, error TEXT NOT NULL, attempt INTEGER NOT NULL DEFAULT 1, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW());ALTER TABLE users ADD COLUMN IF NOT EXISTS initial_balance NUMERIC(38, 18) NOT NULL DEFAULT 0, ADD CONSTRAINT balance_non_negative CHECK (balance >= 0);CREATE INDEX idx_payment_events_pending ON payment_events (updated_at, created_at) WHERE status = 'pending';CREATE INDEX idx_payment_events_enqueued ON payment_events (updated_at) WHERE status = 'enqueued';CREATE INDEX idx_payment_events_processing ON payment_events (updated_at) WHERE status = 'processing';CREATE INDEX idx_balance_events_user_id ON balance_events (user_id);CREATE INDEX idx_balance_events_created_at ON balance_events (created_at DESC);CREATE INDEX idx_processed_events_created ON processed_events (created_at);CREATE INDEX idx_processed_events_pending_stale ON processed_events (created_at) WHERE outcome = 'pending';CREATE INDEX idx_dlq_event_id ON dead_letter_queue (event_id);CREATE INDEX idx_dlq_created_at ON dead_letter_queue (created_at DESC);
В схеме три неочевидных решения.
NUMERIC(38, 18), не NUMERIC(20, 8). Колонка amount хранится в ETH, не в wei. Webhook-провайдер присылает уже сконвертированное значение. Если ваш провайдер возвращает wei, конвертируйте на входе: amount_eth = Decimal(wei_str) / Decimal(10**18) до передачи в _validate_amount . ERC-20 сами объявляют decimals(): USDC/USDT — 6, WBTC — 8, DAI/WETH/MKR — 18. ETH в wei тоже 10^18. NUMERIC(20, 8) выдержит USDC/USDT, но физически не вмещает 18-decimal токены, поэтому берём worst case, NUMERIC(38, 18).
initial_balance нужна для сверки. При миграции заполняете её текущим балансом, UPDATE users SET initial_balance = balance WHERE <условие>. Это означает что balance_events начинают наполняться с нуля, и hot_path_balance_check корректно считает только тех пользователей, у которых все операции прошли через balance_events. Для новых систем initial_balance остаётся 0.
Отдельные индексы для pending/enqueued/processing вместо одного status IN (...), так как poller’ы используют разные паттерны доступа. idx_payment_events_pending, partial index с (updated_at, created_at) для ORDER BY created_at в enqueue_pending_events, иначе планировщик сортирует без индекса.
retry_count в payment_events добавлен для предотвращения бесконечного цикла pending -> enqueued при durable outage Redis, об этом подробнее в секции про деградацию.
Как это починилось
Инициализация
import osimport uuidimport jsonimport hmacimport randomimport hashlibimport secretsimport threadingimport structlogimport psycopg2import psycopg2.extrasimport psycopg2.poolimport redis as redis_libfrom typing import Literal, Optionalimport refrom decimal import Decimal, InvalidOperationfrom contextvars import ContextVarfrom dataclasses import dataclass, fieldfrom datetime import datetime, timezone, timedeltafrom celery import shared_taskfrom celery.exceptions import Ignore, MaxRetriesExceededErrorlogger = structlog.get_logger()@dataclass(frozen=True)class Settings: DATABASE_URL: str WEBHOOK_SECRET: str ETH_RPC_URL: str ALERT_EMAIL: str REDIS_URL: str = "redis://localhost:6379/0" @classmethod def from_env(cls) -> "Settings": required = ("DATABASE_URL", "WEBHOOK_SECRET", "ETH_RPC_URL", "ALERT_EMAIL") missing = [k for k in required if not os.environ.get(k)] if missing: raise RuntimeError(f"Missing required env vars: {', '.join(missing)}") return cls( DATABASE_URL = os.environ["DATABASE_URL"], WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"], ETH_RPC_URL = os.environ["ETH_RPC_URL"], ALERT_EMAIL = os.environ["ALERT_EMAIL"], REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0"), )settings = Settings.from_env()_redis_client = redis_lib.Redis.from_url( settings.REDIS_URL, decode_responses=True, socket_connect_timeout=2, socket_timeout=2, retry_on_timeout=False,)
send_alert: rate-limited обёртка над логгером. В проде заменяется на PagerDuty/OpsGenie SDK. Одинаковые alert_key внутри cooldown окна подавляются. Если ключ не задан, отправка без rate-limit, для одноразовых критичных алертов. Не бросает исключений никогда.
_alert_last_sent растёт при уникальных ключах. Если генерировать ключи per-event-id (а мы так и делаем для orphan-алертов), за месяц это несколько миллионов записей. Поэтому при переполнении сначала чистим устаревшие ключи, а если после чистки место всё равно не освободилось, подавляем новые. Костыль, да. Но за восемь месяцев не отвалился.
_alert_lock = threading.Lock()_alert_last_sent: dict = {}MAX_ALERT_KEYS = 1_000def send_alert(message: str, alert_key: Optional[str] = None, cooldown_seconds: int = 300) -> None: try: if alert_key is None: logger.critical("ALERT", message=message) return with _alert_lock: now = datetime.now(timezone.utc) if len(_alert_last_sent) >= MAX_ALERT_KEYS: stale_cutoff = now - timedelta(seconds=cooldown_seconds * 2) stale = [k for k, v in _alert_last_sent.items() if v < stale_cutoff] for k in stale: del _alert_last_sent[k] if len(_alert_last_sent) >= MAX_ALERT_KEYS and alert_key not in _alert_last_sent: logger.warning("send_alert suppressed: rate limit dict full", alert_key=alert_key) return last = _alert_last_sent.get(alert_key) if last and (now - last).total_seconds() < cooldown_seconds: return _alert_last_sent[alert_key] = now logger.critical("ALERT", message=message, alert_key=alert_key) except Exception as e: logger.error("send_alert failed", error=str(e))class ImproperlyConfigured(RuntimeError): pass
Trace ID через всю цепочку
Каждый воркер получает свой ContextVar автоматически, шарить его между потоками невозможно.
_trace_id: ContextVar[str] = ContextVar('trace_id', default='')def get_trace_id() -> str: return _trace_id.get() or 'no-trace'def set_trace_id(tid: str) -> None: _trace_id.set(tid)def new_trace_id() -> str: tid = str(uuid.uuid4()) _trace_id.set(tid) return tidstructlog.configure( processors=[ structlog.processors.add_log_level, lambda _, __, event_dict: {**event_dict, "trace_id": get_trace_id()}, structlog.processors.JSONRenderer(), ])
Idempotency key через DB unique constraint
Ключ строится из event_id и event_type, пишется в отдельную таблицу с unique constraint в той же транзакции что и изменение баланса.
Redis не подходит, нет атомарности с PostgreSQL. Проверка в коде тоже не работает, два воркера оба пройдут SELECT до INSERT. Единственное, что атомарно по определению: unique constraint.
Сначала делал конкатенацию f"{event_id}::{event_type}". Словил коллизию при :: в event_id. Попробовал NUL-разделитель: f"{event_id}\0{event_type}".encode(). Тоже коллизия: _idempotency_key("a\x00b", "c") == _idempotency_key("a", "b\x00c"), оба дают байты b"a\x00b\x00c". Финальный вариант, length-prefix encoding: каждое поле предваряется 4-байтовой длиной, коллизия между полями принципиально невозможна.
class RetryableError(Exception): passclass AlreadyProcessedError(Exception): passMAX_AMOUNT = Decimal("10") ** 20_AMOUNT_RE = re.compile(r"^[0-9]+(\.[0-9]+)?([eE][+-]?[0-9]+)?$")def _validate_amount(amount) -> Decimal: if isinstance(amount, float): raise ValueError( f"float не допускается — передавайте amount как str из JSON payload. " f"Получено: {amount!r}" ) if isinstance(amount, str) and amount != amount.strip(): raise ValueError( f"amount содержит whitespace: {amount!r}. " f"Передавайте amount без пробелов." ) if isinstance(amount, str) and not _AMOUNT_RE.fullmatch(amount): raise ValueError(f"invalid amount format: {amount!r}") try: amount_decimal = Decimal(str(amount)) if not amount_decimal.is_finite(): raise ValueError(f"amount must be finite, got {amount_decimal}") if amount_decimal <= 0: raise ValueError(f"amount must be positive, got {amount_decimal}") if amount_decimal.normalize().as_tuple().exponent < -18: raise ValueError(f"amount precision exceeds 18 decimals: {amount_decimal}") if amount_decimal >= MAX_AMOUNT: raise ValueError( f"amount exceeds NUMERIC(38,18) capacity: {amount_decimal} >= 10^20" ) return amount_decimal except InvalidOperation: raise ValueError(f"invalid amount format: {amount!r}")def _idempotency_key(event_id: str, event_type: str) -> str: a = event_id.encode("utf-8") b = event_type.encode("utf-8") payload = ( len(a).to_bytes(4, "big") + a + len(b).to_bytes(4, "big") + b ) return hashlib.sha256(payload).hexdigest()
Два места в первой версии были сломаны на граничных случаях: MAX_AMOUNT = Decimal("10")**20 - 1 отвергала валидную сумму, а 50.0000000000000000000 уходило в отказ как exponent=-19 хотя значащих цифр там нет. Починено: 10**20 без -1, и normalize() перед проверкой exponent. На граничные значения своих валидаторов стоит писать тесты, узнаёшь интересное.
Ещё один сюрприз из той же серии: _validate_amount("+50.1"),_validate_amount("1_000") и _validate_amount("١٢٣") все возвращают корректный Decimal. Python толерантен к underscore-нотации, leading + и арабо-индийским цифрам. Для финансового валидатора это нежелательное поведение, на входе ожидается строго [цифры].[цифры]. Добавлен regex ^[0-9]+(\.[0-9]+)?([eE][+-]?[0-9]+)?$ перед Decimal(), отклоняет всё нестандартное.
FSM переходов, одна точка правды
Статус события — детерминированный конечный автомат. Сначала было три места с raw UPDATE payment_events SET status = .... Это нарушало инвариант FSM.
Переходы: pending идёт в enqueued через poller. Enqueued в processing когда воркер взял задачу. Из processing только в confirmed или failed. Отдельный edge, enqueued сразу в confirmed, нужен для replay path: когда processed_events уже содержит outcome=success, но воркер упал после того как записал это и до того как успел перевести payment_events в confirmed. Без этого edge события зависали бы в enqueued вечно.
VALID_TRANSITIONS: dict[str, set[str]] = { "pending": {"enqueued", "failed"}, "enqueued": {"processing", "failed", "pending", "confirmed"}, "processing": {"confirmed", "failed"},}TERMINAL_STATUSES = frozenset({"confirmed", "failed"})def transition_event_status(cur, event_id, from_status, to_status): if to_status not in VALID_TRANSITIONS.get(from_status, set()): raise ValueError(f"невалидный переход: {from_status} -> {to_status}") cur.execute( "UPDATE payment_events SET status = %s, updated_at = NOW() " "WHERE event_id = %s AND status = %s", (to_status, event_id, from_status), ) if cur.rowcount == 0: cur.execute("SELECT status FROM payment_events WHERE event_id = %s", (event_id,)) actual = cur.fetchone() actual_status = actual["status"] if actual else "not found" if actual_status == to_status: logger.info("status already set", event_id=event_id, status=to_status) return if actual_status in TERMINAL_STATUSES: raise AlreadyProcessedError(f"event already terminal: {actual_status}") raise RetryableError( f"concurrent status transition event_id={event_id} " f"expected={from_status} actual={actual_status}" )
_mark_event_failed: безопасный перевод в failed из любого нетерминального статуса. Коммитит сама, правило одно, статус failed должен лечь в БД несмотря ни на что. Всё остальное потом. Вызывать только на чистом соединении, после rollback.
_mark_event_failed коммитит сама, если будешь рефакторить, это тебя укусит.
def _mark_event_failed(conn, event_id) -> bool: try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SET LOCAL lock_timeout = '2s'") cur.execute( "SELECT status FROM payment_events WHERE event_id = %s FOR UPDATE NOWAIT", (event_id,) ) row = cur.fetchone() if row is None: conn.rollback() return False current = row["status"] if current in TERMINAL_STATUSES: conn.rollback() return False try: transition_event_status(cur, event_id, current, "failed") conn.commit() return True except (ValueError, RetryableError, AlreadyProcessedError): conn.rollback() return False except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled): try: conn.rollback() except Exception: pass return False except Exception: try: conn.rollback() except Exception: pass raise
SELECT FOR UPDATE NOWAIT + lock_timeout
С обычным FOR UPDATE воркер молча ждёт лок, блокируя thread. NOWAIT это убирает.
Миграция с ALTER TABLE блокирует таблицу целиком и при этом lock_timeout = '2s' не даёт воркеру висеть всё это время.
Про коды ошибок: lock_timeout бросает LockNotAvailable (pgcode 55P03), statement_timeout бросает QueryCanceled (pgcode 57014). Путаница приводит к непойманному исключению в продакшене. DeadlockDetected (pgcode 40P01): транзакция убита PostgreSQL из-за цикла блокировок, тоже transient, тоже retryable. PostgreSQL сам выбирает жертву и откатывает её транзакцию, повтор решает проблему. Все три нужно ловить вместе.
Пул соединений с валидацией
Соединение в пуле может быть мёртвым: PostgreSQL закрывает idle-соединения через tcp_keepalives_idle или idle_in_transaction_session_timeout. Без проверки воркер получит разорванный TCP и упадёт с InterfaceError в случайный момент.
_PooledConn: обёртка над соединением, которая знает как вернуть его владельцу. Через putconn() обратно в пул или через close() если соединение создавалось напрямую. putconn() идемпотентен, второй вызов no-op. getattr не проксирует dunder-методы, поэтому _PooledConn нельзя использовать как context manager. Весь код работает через conn.cursor().
get_validated_conn делает три уровня проверки без I/O в нормальном потоке: сначала conn.closed (in-memory флаг), потом conn.status (грязная транзакция из предыдущего использования), и только если статус не STATUS_READY, делает SELECT 1.
class _PooledConn: def __init__(self, conn, pool=None): self._conn = conn self._pool = pool def putconn(self, close=False): if self._pool is None: try: self._conn.close() except Exception: pass return pool, self._pool = self._pool, None # idempotent: second call is a no-op try: pool.putconn(self._conn, close=close) except Exception: pass def __getattr__(self, name): return getattr(self._conn, name)def get_validated_conn(pool: psycopg2.pool.SimpleConnectionPool) -> "_PooledConn": try: conn = pool.getconn() except psycopg2.pool.PoolError as e: raise RetryableError(f"DB connection pool exhausted: {e}") if conn.closed != 0: try: pool.putconn(conn, close=True) except Exception: pass direct = psycopg2.connect(dsn=settings.DATABASE_URL) return _PooledConn(direct, pool=None) if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION: try: conn.rollback() logger.warning("get_validated_conn: rolled back dirty connection") except Exception: try: pool.putconn(conn, close=True) except Exception: pass direct = psycopg2.connect(dsn=settings.DATABASE_URL) return _PooledConn(direct, pool=None) if conn.status != psycopg2.extensions.STATUS_READY: try: with conn.cursor() as cur: cur.execute("SELECT 1") except Exception: try: pool.putconn(conn, close=True) except Exception: pass direct = psycopg2.connect(dsn=settings.DATABASE_URL) return _PooledConn(direct, pool=None) return _PooledConn(conn, pool)
Deposit и withdrawal
При INSERT INTO processed_events два исхода: успех, идём дальше (first-time path). UniqueViolation, событие уже видели (replay path).
На replay path смотрим outcome. Если success, синхронизируем статус payment_events с реальностью. Если pending, другой воркер ещё в середине транзакции, бросаем RetryableError для немедленного retry вместо ожидания recover_stale_enqueued_events через 3 минуты.
retry_count сбрасывается в 0 при успешной обработке. Без этого: событие попало в retry, обработалось успешно с retry_count=7, через 14+ дней processed_events вычистился по TTL, событие пришло снова через reorg compensation. Стартует уже с 7/10 до DLQ вместо 0/10.
Отдельный случай в replay: processed_events говорит что всё ок, а payment_events об этом событии ничего не знает. Такого быть не должно. Логируем, алертим, не паникуем.
def process_deposit_sync(conn, event_id, event_type, user_id, amount): amount_decimal = _validate_amount(amount) idempotency_key = _idempotency_key(event_id, event_type) with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SET LOCAL lock_timeout = '2s'") cur.execute("SET LOCAL statement_timeout = '5s'") try: cur.execute( "INSERT INTO processed_events (idempotency_key, outcome) " "VALUES (%s, 'pending')", (idempotency_key,), ) except psycopg2.errors.UniqueViolation: conn.rollback() with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur2: cur2.execute("SET LOCAL lock_timeout = '2s'") cur2.execute("SET LOCAL statement_timeout = '5s'") cur2.execute( "SELECT outcome FROM processed_events WHERE idempotency_key = %s", (idempotency_key,), ) row = cur2.fetchone() outcome = row["outcome"] if row else "pending" if outcome == "success": cur2.execute( "SELECT status FROM payment_events WHERE event_id = %s", (event_id,) ) r = cur2.fetchone() current = r["status"] if r else None if current is None: logger.error( "deposit_replay: orphan event, processed_events " "exists but payment_event not found", event_id=event_id, ) send_alert( f"[CRITICAL] orphan deposit event: {event_id}", alert_key=f"orphan_deposit:{event_id}", ) elif current == "enqueued": transition_event_status(cur2, event_id, "enqueued", "confirmed") elif current == "processing": transition_event_status(cur2, event_id, "processing", "confirmed") elif current == "confirmed": pass else: conn.rollback() raise RetryableError( f"deposit_replay FSM violation: " f"payment_event.status={current!r} with outcome=success " f"for event_id={event_id}" ) conn.commit() return conn.rollback() raise RetryableError( f"deposit idempotency hit with outcome={outcome!r} for event_id={event_id}" ) except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled, psycopg2.errors.DeadlockDetected) as e: conn.rollback() raise RetryableError(f"timeout on deposit for user {user_id}: {e}") try: transition_event_status(cur, event_id, "enqueued", "processing") cur.execute("SELECT id FROM users WHERE id = %s", (user_id,)) if cur.fetchone() is None: conn.rollback() raise ValueError(f"user {user_id} not found") cur.execute( "UPDATE users SET balance = balance + %s WHERE id = %s", (amount_decimal, user_id), ) if cur.rowcount == 0: conn.rollback() raise ValueError(f"user {user_id} disappeared between SELECT and UPDATE") except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled, psycopg2.errors.DeadlockDetected) as e: conn.rollback() raise RetryableError(f"lock/timeout on deposit first-time path for user {user_id}: {e}") try: cur.execute( "INSERT INTO balance_events " "(user_id, amount, event_type, source_event_id, created_at) " "VALUES (%s, %s, %s, %s, NOW())", (user_id, amount_decimal, event_type, event_id), ) except psycopg2.errors.UniqueViolation: conn.rollback() raise Exception( f"balance_events duplicate without idempotency key violation " f"event_id={event_id}, investigate immediately" ) cur.execute( "UPDATE processed_events SET outcome = 'success' WHERE idempotency_key = %s", (idempotency_key,), ) cur.execute( "UPDATE payment_events SET retry_count = 0 WHERE event_id = %s", (event_id,), ) transition_event_status(cur, event_id, "processing", "confirmed") conn.commit()WithdrawalOutcome = Literal["success", "insufficient_funds"]def process_withdrawal_sync(conn, event_id, event_type, user_id, amount) -> WithdrawalOutcome: amount_decimal = _validate_amount(amount) idempotency_key = _idempotency_key(event_id, event_type) with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SET LOCAL lock_timeout = '2s'") cur.execute("SET LOCAL statement_timeout = '5s'") try: cur.execute( "INSERT INTO processed_events (idempotency_key, outcome) " "VALUES (%s, 'pending')", (idempotency_key,), ) except psycopg2.errors.UniqueViolation: conn.rollback() with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as lc: lc.execute("SET LOCAL lock_timeout = '2s'") lc.execute("SET LOCAL statement_timeout = '5s'") lc.execute( "SELECT outcome FROM processed_events WHERE idempotency_key = %s", (idempotency_key,), ) stored = (lc.fetchone() or {"outcome": "pending"})["outcome"] if stored == "success": lc.execute( "SELECT status FROM payment_events WHERE event_id = %s", (event_id,) ) r = lc.fetchone() current = r["status"] if r else None if current is None: logger.error( "withdrawal_replay: orphan event, processed_events " "exists but payment_event not found", event_id=event_id, ) send_alert( f"[CRITICAL] orphan withdrawal event: {event_id}", alert_key=f"orphan_withdrawal:{event_id}", ) elif current == "enqueued": transition_event_status(lc, event_id, "enqueued", "confirmed") elif current == "processing": transition_event_status(lc, event_id, "processing", "confirmed") elif current == "confirmed": pass else: conn.rollback() raise RetryableError( f"withdrawal_replay FSM violation: " f"payment_event.status={current!r} with outcome=success " f"for event_id={event_id}" ) conn.commit() return "success" elif stored == "insufficient_funds": lc.execute( "SELECT status FROM payment_events WHERE event_id = %s", (event_id,) ) r = lc.fetchone() current = r["status"] if r else None if current == "enqueued": transition_event_status(lc, event_id, "enqueued", "failed") elif current == "processing": transition_event_status(lc, event_id, "processing", "failed") elif current is None: logger.error( "withdrawal_replay insufficient_funds: orphan event", event_id=event_id, ) send_alert( f"[CRITICAL] orphan withdrawal (insufficient_funds): {event_id}", alert_key=f"orphan_withdrawal_insuf:{event_id}", ) conn.commit() return "insufficient_funds" else: conn.rollback() raise RetryableError( f"withdrawal idempotency hit with outcome={stored!r} " f"for event_id={event_id}" ) except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled, psycopg2.errors.DeadlockDetected) as e: conn.rollback() raise RetryableError(f"lock/timeout on withdrawal INSERT for user {user_id}: {e}") try: cur.execute( "SELECT balance FROM users WHERE id = %s FOR UPDATE NOWAIT", (user_id,), ) except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled, psycopg2.errors.DeadlockDetected) as e: conn.rollback() raise RetryableError(f"lock/timeout/deadlock on user lock for user {user_id}: {e}") row = cur.fetchone() if row is None: conn.rollback() raise ValueError(f"user {user_id} not found") try: if row["balance"] < amount_decimal: cur.execute( "UPDATE processed_events SET outcome = 'insufficient_funds' " "WHERE idempotency_key = %s", (idempotency_key,), ) transition_event_status(cur, event_id, "enqueued", "failed") conn.commit() return "insufficient_funds" transition_event_status(cur, event_id, "enqueued", "processing") cur.execute( "UPDATE users SET balance = balance - %s WHERE id = %s", (amount_decimal, user_id), ) try: cur.execute( "INSERT INTO balance_events " "(user_id, amount, event_type, source_event_id, created_at) " "VALUES (%s, %s, %s, %s, NOW())", (user_id, -amount_decimal, event_type, event_id), ) except psycopg2.errors.UniqueViolation: conn.rollback() raise Exception( f"balance_events duplicate without idempotency key violation " f"event_id={event_id}, investigate immediately" ) cur.execute( "UPDATE processed_events SET outcome = 'success' WHERE idempotency_key = %s", (idempotency_key,), ) cur.execute( "UPDATE payment_events SET retry_count = 0 WHERE event_id = %s", (event_id,), ) transition_event_status(cur, event_id, "processing", "confirmed") conn.commit() return "success" except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled, psycopg2.errors.DeadlockDetected) as e: conn.rollback() raise RetryableError(f"lock/timeout on withdrawal path for user {user_id}: {e}")
Webhook: outbox pattern вместо прямого .delay()
Прямой вызов .delay() из webhook handler создаёт окно между записью в БД и постановкой в очередь. Если процесс упал в этот момент, событие зависнет в pending навсегда.
Решение: webhook только пишет в БД. Отдельный poller каждые 5 секунд берёт pending события, сначала атомарно меняет статусы и коммитит, и только после этого ставит в Celery очередь. Сначала коммитим, потом кладём в очередь, иначе воркер стартует раньше чем БД знает об enqueued.
Alchemy и Infura знают только tx-hash и адрес получателя. Маппинг to_address в user_id делается отдельным запросом к таблице deposit_addresses. Этот слой вынесен из статьи, но без него злоумышленник с HMAC-ключом зачислит деньги на любой user_id. Стоит держать это в голове.
verify_webhook_signature принимает raw_body байтами до JSON-парсинга, подпись считается по исходным байтам. secrets.compare_digest защищает от timing attack.
import asyncpgfrom fastapi import FastAPI, Request, HTTPExceptionfrom slowapi import Limiter, _rate_limit_exceeded_handlerfrom slowapi.util import get_remote_addressfrom slowapi.errors import RateLimitExceededapp = FastAPI()limiter = Limiter(key_func=get_remote_address, storage_uri=settings.REDIS_URL)app.state.limiter = limiterapp.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)ALLOWED_EVENT_TYPES = frozenset({"deposit", "airdrop", "withdrawal", "withdrawal_fee"})def verify_webhook_signature(raw_body, signature_header, signing_key): if not signing_key: raise ImproperlyConfigured("WEBHOOK_SECRET is not set") if len(signing_key) < 32: raise ImproperlyConfigured( f"WEBHOOK_SECRET too short: {len(signing_key)} chars, minimum 32" ) if not signature_header: return False mac = hmac.new( key=signing_key.encode("utf-8"), msg=raw_body, digestmod=hashlib.sha256, ) return secrets.compare_digest(mac.hexdigest(), signature_header)@app.post("/webhook/payments")@limiter.limit("300/minute")@limiter.limit("30/second")async def payment_webhook(request: Request): raw_body = await request.body() signature = request.headers.get("X-Alchemy-Signature", "") if not verify_webhook_signature(raw_body, signature, settings.WEBHOOK_SECRET): raise HTTPException(status_code=401, detail="invalid signature") trace_id = ( request.headers.get("X-Request-ID") or request.headers.get("X-Alchemy-Request-ID") or new_trace_id() ) set_trace_id(trace_id) try: payload = json.loads(raw_body) event_id = payload["event_id"] event_type = payload["event_type"] user_id = payload["user_id"] if not isinstance(payload.get("amount"), str): raise HTTPException(status_code=400, detail="amount must be a JSON string, not a number") amount_str = payload["amount"] except (json.JSONDecodeError, KeyError) as e: raise HTTPException(status_code=400, detail=f"invalid payload: {e}") if event_type not in ALLOWED_EVENT_TYPES: raise HTTPException(status_code=400, detail=f"unknown event_type: {event_type!r}") try: _validate_amount(amount_str) except ValueError as e: raise HTTPException(status_code=400, detail=f"invalid amount: {e}") db = request.app.state.db try: async with db.transaction(): await db.fetchrow( "INSERT INTO payment_events (event_id, user_id, amount, event_type, status) " "VALUES ($1, $2, $3, $4, 'pending') ON CONFLICT (event_id) DO NOTHING", event_id, user_id, amount_str, event_type, ) except asyncpg.exceptions.ForeignKeyViolationError: logger.warning( "orphan webhook event (user not found)", event_id=event_id, user_id=user_id, ) raise HTTPException(status_code=400, detail="user not found") return {"status": "accepted", "trace_id": trace_id}
FastAPI работает в async event loop, блокирующий psycopg2 там убьёт throughput, поэтому asyncpg в webhook. Celery workers, отдельные процессы без event loop, asyncpg там только добавит сложность.
enqueue_pending_events использует тот же паттерн SAVEPOINT что и recover_stale_enqueued_events. Без него один event с AlreadyProcessedError от race с recover’ом откатывал весь батч из 100 событий. Они оставались в pending и подхватывались следующим тиком, но это видно в логах как потерянный тик. SAVEPOINT sp_enq на каждый event изолирует ошибку.
@shared_task(name="enqueue_pending_events")def enqueue_pending_events() -> dict: conn = get_validated_conn(db_pool) events_to_enqueue = [] try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(""" SELECT event_id, event_type, user_id, amount FROM payment_events WHERE status = 'pending' AND updated_at < NOW() - INTERVAL '5 seconds' ORDER BY created_at LIMIT 100 FOR UPDATE SKIP LOCKED """) events = cur.fetchall() events_ok = [] for event in events: try: cur.execute("SAVEPOINT sp_enq") transition_event_status(cur, event['event_id'], "pending", "enqueued") cur.execute("RELEASE SAVEPOINT sp_enq") events_ok.append(event) except (ValueError, RetryableError, AlreadyProcessedError) as e: cur.execute("ROLLBACK TO SAVEPOINT sp_enq") cur.execute("RELEASE SAVEPOINT sp_enq") logger.warning("enqueue: skipped event", event_id=event['event_id'], error=str(e)) conn.commit() events_to_enqueue = list(events_ok) except Exception: try: conn.rollback() except Exception: pass logger.exception("enqueue_pending_events: transition failed") raise finally: try: conn.putconn() except Exception: pass enqueued = 0 for event in events_to_enqueue: try: process_payment_event.apply_async( args=[event['event_id'], event['event_type'], event['user_id'], str(event['amount'])], kwargs={"trace_id": str(uuid.uuid4())}, ) enqueued += 1 except Exception: logger.exception("apply_async failed", event_id=event['event_id']) logger.info("enqueue_pending_events done", enqueued=enqueued) return {"enqueued": enqueued}
recover_stale_enqueued_events каждые 2 минуты находит события, застрявшие в enqueued. После MAX_RECOVERY_ATTEMPTS попыток переводит в failed + DLQ. SAVEPOINT на каждое событие, ошибка в одном не откатывает весь батч.
MAX_RECOVERY_ATTEMPTS = 10@shared_task(name="recover_stale_enqueued_events")def recover_stale_enqueued_events() -> dict: conn = get_validated_conn(db_pool) recovered = 0 dlqed = 0 try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(""" SELECT event_id, event_type, user_id, amount, retry_count FROM payment_events WHERE status = 'enqueued' AND updated_at < NOW() - INTERVAL '3 minutes' ORDER BY created_at LIMIT 50 FOR UPDATE SKIP LOCKED """) stale = cur.fetchall() for event in stale: try: cur.execute("SAVEPOINT sp_recover") if event['retry_count'] >= MAX_RECOVERY_ATTEMPTS: transition_event_status(cur, event['event_id'], "enqueued", "failed") cur.execute( "INSERT INTO dead_letter_queue " "(event_id, event_type, user_id, amount, error) " "VALUES (%s, %s, %s, %s, %s)", (event['event_id'], event['event_type'], event['user_id'], str(event['amount']), f"exhausted recovery attempts ({MAX_RECOVERY_ATTEMPTS})") ) dlqed += 1 else: transition_event_status(cur, event['event_id'], "enqueued", "pending") cur.execute( "UPDATE payment_events SET retry_count = retry_count + 1 " "WHERE event_id = %s", (event['event_id'],) ) recovered += 1 cur.execute("RELEASE SAVEPOINT sp_recover") except Exception as sp_exc: try: cur.execute("ROLLBACK TO SAVEPOINT sp_recover") cur.execute("RELEASE SAVEPOINT sp_recover") except Exception: pass logger.error("recover: event skipped due to error", event_id=event['event_id'], error=str(sp_exc)) conn.commit() if recovered or dlqed: logger.warning("recover_stale_enqueued_events", recovered=recovered, dlqed=dlqed) if dlqed: send_alert( f"[WARNING] {dlqed} events exhausted recovery attempts, check DLQ", alert_key="recovery_exhausted", ) return {"recovered": recovered, "dlqed": dlqed} except Exception: try: conn.rollback() except Exception: pass logger.exception("recover_stale_enqueued_events failed") raise finally: try: conn.putconn() except Exception: pass
Celery: acks_late + reject_on_worker_lost + пул соединений
acks_late=True: подтверждение брокеру после завершения обработки, не в момент получения. По умолчанию Celery подтверждает сразу: воркер падает в середине, задача потеряна.
reject_on_worker_lost=True: при SIGKILL/OOM задача возвращается в очередь.
Если создать пул до fork, все воркеры наследуют одни и те же файловые дескрипторы. Два процесса шлют запросы в один сокет и ответы перемешиваются, поэтому пул создаётся в worker_process_init.
import osfrom celery.signals import worker_process_initdb_pool = None_local_breaker = None@worker_process_init.connectdef init_worker(**kwargs): global db_pool, _local_breaker _local_breaker = _InProcessBreaker() worker_pool = os.environ.get("CELERY_WORKER_POOL", "prefork") is_threaded = worker_pool in ("gevent", "eventlet") pool_class = ( psycopg2.pool.ThreadedConnectionPool if is_threaded else psycopg2.pool.SimpleConnectionPool ) db_pool = pool_class(minconn=2, maxconn=10, dsn=settings.DATABASE_URL) logger.info("worker init done", pool=pool_class.__name__, worker_pool=worker_pool)def _get_local_breaker() -> "_InProcessBreaker": global _local_breaker if _local_breaker is None: _local_breaker = _InProcessBreaker() return _local_breakerBACKOFF_BASE_SEC = 1BACKOFF_CAP_SEC = 60def jittered_backoff(attempt: int) -> float: cap = min(BACKOFF_CAP_SEC, BACKOFF_BASE_SEC * (2 ** attempt)) return random.uniform(0, cap)
Двухуровневый DLQ
Сначала делал LPUSH и EXPIRE двумя отдельными командами. Между ними возможен crash, ключ остаётся без TTL, живёт вечно. Фикс через pipeline.
Про схему DLQ: таблица использует BIGSERIAL PRIMARY KEY, не event_id PRIMARY KEY. Это позволяет хранить несколько попыток для одного event_id. Следствие: ON CONFLICT (event_id) DO NOTHING недопустим, event_id не имеет UNIQUE constraint. Каждый INSERT создаёт новую запись с полной историей попытки.
DLQ_REDIS_KEY = "dlq:payment_events"DLQ_REDIS_TTL = 7 * 24 * 3600 # 7 днейdef save_to_dlq_sync(conn, event_id, event_type, user_id, amount, error): payload = { "event_id": event_id, "event_type": event_type, "user_id": user_id, "amount": str(amount), "error": error, "trace_id": get_trace_id(), } db_ok = False try: with conn.cursor() as cur: cur.execute( "INSERT INTO dead_letter_queue " "(event_id, event_type, user_id, amount, error, created_at) " "VALUES (%s, %s, %s, %s, %s, NOW())", (event_id, event_type, user_id, str(amount), error) ) conn.commit() db_ok = True except Exception as db_exc: logger.error("DLQ postgres write failed", event_id=event_id, error=str(db_exc)) try: conn.rollback() except Exception: pass if db_ok: return try: pipe = _redis_client.pipeline() pipe.lpush(DLQ_REDIS_KEY, json.dumps(payload)) pipe.expire(DLQ_REDIS_KEY, DLQ_REDIS_TTL) pipe.execute() logger.warning("DLQ saved to Redis fallback", event_id=event_id) return except redis_lib.RedisError as e: logger.error("DLQ redis write failed", event_id=event_id, error=str(e)) logger.critical( "DLQ_UNRECOVERABLE", event_id=event_id, event_type=event_type, user_id=user_id, amount=str(amount), error=error, trace_id=get_trace_id(), dlq_payload=payload, )
drain_redis_dlq запускается по расписанию после инцидента с БД. failed сбрасывается в 0 при каждом успешном INSERT, счётчик consecutive, не total. Чередующиеся успехи и ошибки не триггерят аварийный break, но drained > 0 при этом.
DRAIN_BATCH_SIZE = 500@shared_task(name="drain_redis_dlq")def drain_redis_dlq() -> dict: drained = 0 failed = 0 conn = get_validated_conn(db_pool) try: for _ in range(DRAIN_BATCH_SIZE): raw = _redis_client.rpop(DLQ_REDIS_KEY) if raw is None: break try: payload = json.loads(raw) except json.JSONDecodeError: logger.critical( "drain_redis_dlq: malformed JSON in DLQ, item discarded", raw=raw[:200], ) continue try: with conn.cursor() as cur: cur.execute( "INSERT INTO dead_letter_queue " "(event_id, event_type, user_id, amount, error, created_at) " "VALUES (%s, %s, %s, %s, %s, NOW())", (payload['event_id'], payload['event_type'], payload['user_id'], payload['amount'], payload['error']) ) conn.commit() drained += 1 failed = 0 except Exception as e: try: conn.rollback() except Exception: pass pipe = _redis_client.pipeline() pipe.lpush(DLQ_REDIS_KEY, raw) pipe.expire(DLQ_REDIS_KEY, DLQ_REDIS_TTL) pipe.execute() failed += 1 logger.error( "drain failed, requeued to head", event_id=payload.get('event_id'), error=str(e), ) if failed >= 10: logger.error("drain aborted after 10 consecutive failures") break finally: conn.putconn() logger.info("drain_redis_dlq done", drained=drained, failed=failed) return {"drained": drained, "failed": failed}
Celery task: полная версия
@shared_task(name="process_payment_event", bind=True, max_retries=5, acks_late=True, reject_on_worker_lost=True)def process_payment_event(self, event_id, event_type, user_id, amount, trace_id=""): set_trace_id(trace_id or new_trace_id()) conn = get_validated_conn(db_pool) committed = False conn_ok = True try: if event_type in ("deposit", "airdrop"): process_deposit_sync(conn, event_id, event_type, user_id, amount) committed = True elif event_type in ("withdrawal", "withdrawal_fee"): outcome = process_withdrawal_sync(conn, event_id, event_type, user_id, amount) committed = True if outcome == "insufficient_funds": notify_user_insufficient_funds(user_id) else: logger.error("unknown event_type", event_type=event_type, event_id=event_id) try: conn.rollback() except Exception: pass try: _mark_event_failed(conn, event_id) except Exception as mark_exc: logger.error("_mark_event_failed raised", event_id=event_id, error=str(mark_exc)) save_to_dlq_sync( conn, event_id, event_type, user_id, amount, f"unknown event_type: {event_type!r}" ) raise Ignore() except AlreadyProcessedError as exc: logger.info("event already processed", event_id=event_id, reason=str(exc)) raise Ignore() except RetryableError as exc: delay = jittered_backoff(self.request.retries) logger.warning( "retrying", event_id=event_id, attempt=self.request.retries, delay=delay, reason=str(exc), ) try: raise self.retry(exc=exc, countdown=delay) except MaxRetriesExceededError: conn.rollback() _mark_event_failed(conn, event_id) save_to_dlq_sync(conn, event_id, event_type, user_id, amount, f"retries exhausted: {exc}") raise Ignore() except Ignore: raise except Exception as exc: logger.exception("unhandled error", event_id=event_id) try: conn.rollback() except Exception: pass try: _mark_event_failed(conn, event_id) except Exception as mark_exc: logger.error("_mark_event_failed raised in catch-all", event_id=event_id, error=str(mark_exc)) try: save_to_dlq_sync(conn, event_id, event_type, user_id, amount, str(exc)) except Exception as dlq_exc: logger.critical( "DLQ write failed, manual recovery required", event_id=event_id, trace_id=get_trace_id(), original_error=str(exc), dlq_error=str(dlq_exc), ) self.update_state(state="FAILURE", meta={"error": str(exc)}) raise Ignore() finally: if not committed: try: conn.rollback() except Exception as rb_exc: logger.error("rollback failed", event_id=event_id, error=str(rb_exc)) conn_ok = False conn.putconn(close=not conn_ok)def notify_user_insufficient_funds(user_id: int) -> None: pass
notify_user_insufficient_funds: заглушка. В проде нужна outbox-запись внутри process_withdrawal_sync до финального commit. Вызов отсюда (после commit) означает out-of-band: отдельная транзакция, отдельные гарантии доставки.
Здесь скрыта ловушка специфичная именно для at-least-once. Провайдер присылает одно событие трижды process_payment_event отработает трижды. Баланс не изменится (idempotency через unique constraint). Но notify вызовется трижды, пользователь получит три уведомления «недостаточно средств» вместо одного. Идемпотентность БД не распространяется на side-effects вне транзакции автоматически.
Вторая проблема с этим placement: если notify бросит исключение, он попадёт в catch-all except Exception, который запишет событие в DLQ, хотя деньги уже списаны корректно и business-транзакция закоммичена. Это шум в DLQ который будет скрывать реальные инциденты.
Circuit breaker для Web3 RPC
Трёхфазный: closed → open → half-open → closed. Redis как shared state между инстансами, in-process breaker как fallback когда Redis недоступен.
_InProcessBreaker: простой per-process счётчик с lock. Открывается приRPC_ERROR_THRESHOLD ошибках, закрывается автоматически после RPC_COOLDOWN_SEC секунд. Нужен именно как fallback: если Redis сам недоступен, circuit breaker не должен перестать работать.
Нужно запустить только один тестовый запрос пока breaker открыт, и тогда SET nx=True гарантирует, что только первый воркер получит право, а остальные увидят занятый probe key.
import web3from prometheus_client import Counterrpc_errors_total = Counter("web3_rpc_errors_total", "Web3 RPC failures", ["method"])_w3 = web3.Web3(web3.HTTPProvider(settings.ETH_RPC_URL))FINALIZED_CACHE_TTL = 86_400PENDING_CACHE_TTL = 30CACHE_PREFIX = "eth:fin:"RPC_ERROR_THRESHOLD = 5RPC_COOLDOWN_SEC = 60RPC_ERROR_WINDOW_SEC = 30HALF_OPEN_PROBE_KEY = "circuit:web3:half_open_probe"HALF_OPEN_PROBE_TTL = 10@dataclassclass _InProcessBreaker: _lock: threading.Lock = field(default_factory=threading.Lock) _errors: int = 0 _open_until: "datetime | None" = None def is_open(self) -> bool: with self._lock: if self._open_until is None: return False if datetime.now(timezone.utc) > self._open_until: self._open_until = None self._errors = 0 return False return True def record_error(self) -> None: with self._lock: self._errors += 1 if self._errors >= RPC_ERROR_THRESHOLD: self._open_until = datetime.now(timezone.utc) + timedelta(seconds=RPC_COOLDOWN_SEC) def record_success(self) -> None: with self._lock: self._errors = 0 self._open_until = Nonedef _is_circuit_open() -> bool: try: if _redis_client.get("circuit:web3:open"): is_probe = _redis_client.set( HALF_OPEN_PROBE_KEY, "1", nx=True, ex=HALF_OPEN_PROBE_TTL ) if is_probe: return False return True except redis_lib.RedisError: pass return _get_local_breaker().is_open()def _record_rpc_error(method: str) -> None: rpc_errors_total.labels(method=method).inc() _get_local_breaker().record_error() try: pipe = _redis_client.pipeline() pipe.incr("circuit:web3:errors") pipe.expire("circuit:web3:errors", RPC_ERROR_WINDOW_SEC) count, _ = pipe.execute() if count >= RPC_ERROR_THRESHOLD: open_pipe = _redis_client.pipeline() open_pipe.setex("circuit:web3:open", RPC_COOLDOWN_SEC, "1") open_pipe.set(HALF_OPEN_PROBE_KEY, "1", ex=HALF_OPEN_PROBE_TTL) open_pipe.execute() logger.critical("web3 circuit breaker opened", count=count) send_alert( f"[CRITICAL] Web3 RPC circuit breaker OPEN, " f"{count} failures in {RPC_ERROR_WINDOW_SEC}s.", alert_key="web3_breaker_open", ) else: if not _redis_client.exists("circuit:web3:open"): _redis_client.delete(HALF_OPEN_PROBE_KEY) except redis_lib.RedisError as e: logger.warning("circuit breaker state write failed", error=str(e))def _record_rpc_success() -> None: _get_local_breaker().record_success() try: _redis_client.delete("circuit:web3:errors") _redis_client.delete(HALF_OPEN_PROBE_KEY) if _redis_client.delete("circuit:web3:open"): logger.info("web3 circuit breaker closed") send_alert( "[INFO] Web3 RPC circuit breaker CLOSED, recovered", alert_key="web3_breaker_closed", ) except redis_lib.RedisError: passdef is_transaction_finalized(tx_hash: str) -> bool: if _is_circuit_open(): logger.warning("web3 circuit open, skipping", tx_hash=tx_hash) return False cache_key = f"{CACHE_PREFIX}{tx_hash}" try: cached = _redis_client.get(cache_key) if cached == "1": return True if cached == "0": return False except redis_lib.RedisError as e: logger.warning("redis cache unavailable", tx_hash=tx_hash, error=str(e)) method = "get_transaction_receipt" try: receipt = _w3.eth.get_transaction_receipt(tx_hash) if receipt is None: return False method = "get_block_finalized" finalized_block = _w3.eth.get_block("finalized")["number"] result = receipt["blockNumber"] <= finalized_block _record_rpc_success() except Exception as e: logger.error("eth rpc error", tx_hash=tx_hash, error=str(e)) _record_rpc_error(method) return False try: ttl = FINALIZED_CACHE_TTL if result else PENDING_CACHE_TTL _redis_client.setex(cache_key, ttl, "1" if result else "0") except redis_lib.RedisError as e: logger.warning("redis cache write failed", tx_hash=tx_hash, error=str(e)) return result
Мониторинг
hot_path_balance_check: не полноценная финансовая сверка, а только мониторинг горячего пути за последние 10 минут. users.balance и SUM(balance_events) читаются одним JOIN-запросом, снимок данных берётся атомарно. REPEATABLE_READ здесь defensive engineering: гарантирует консистентный снимок на уровне транзакции и защищает от phantom reads если транзакция вырастет до нескольких операторов в будущем. Для полной исторической сверки нужна ночная задача, это в бэклоге.
set_isolation_level может бросить если соединение разорвано. Без try/except вокруг него соединение вернётся в пул с REPEATABLE_READ, следующий пользователь не будет ожидать такого поведения.
from prometheus_client import Counterhot_path_runs = Counter("hot_path_balance_check_runs_total", "Runs of hot path balance check")@shared_task(name="hot_path_balance_check")def hot_path_balance_check() -> None: conn = get_validated_conn(db_pool) conn_ok = True try: conn.set_isolation_level( psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SET LOCAL statement_timeout = '10s'") cur.execute(""" SELECT u.id, u.balance AS actual_balance, u.initial_balance + COALESCE(SUM(be.amount), 0) AS calculated_balance FROM users u INNER JOIN ( SELECT DISTINCT user_id FROM balance_events WHERE created_at > NOW() - INTERVAL '10 minutes' ) recent ON recent.user_id = u.id LEFT JOIN balance_events be ON be.user_id = u.id GROUP BY u.id, u.balance, u.initial_balance HAVING u.balance != u.initial_balance + COALESCE(SUM(be.amount), 0) """) mismatches = cur.fetchall() conn.commit() except Exception: conn.rollback() raise finally: try: conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) except Exception: conn_ok = False hot_path_runs.inc() if mismatches: send_alert( f"[CRITICAL] balance mismatch: {[dict(m) for m in mismatches]}", alert_key="balance_mismatch", ) except Exception: logger.exception("hot_path_balance_check failed") try: conn.rollback() except Exception: pass send_alert( "[WARNING] hot_path_balance_check failed, check worker logs", alert_key="balance_check_failed", ) raise finally: conn.putconn(close=not conn_ok)@shared_task(name="alert_zombie_events")def alert_zombie_events() -> None: conn = get_validated_conn(db_pool) try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(""" SELECT event_id, user_id, status, updated_at FROM payment_events WHERE (status = 'processing' AND updated_at < NOW() - INTERVAL '5 minutes') OR (status IN ('pending', 'enqueued') AND updated_at < NOW() - INTERVAL '15 minutes') """) zombies = cur.fetchall() cur.execute(""" SELECT COUNT(*) AS dlq_size FROM dead_letter_queue WHERE created_at > NOW() - INTERVAL '1 hour' """) recent_dlq = cur.fetchone()['dlq_size'] cur.execute(""" SELECT COUNT(*) AS stuck FROM processed_events WHERE outcome = 'pending' AND created_at < NOW() - INTERVAL '10 minutes' """) stuck_pending = cur.fetchone()['stuck'] if zombies: send_alert( f"[WARNING] zombie events: {[z['event_id'] for z in zombies[:20]]}", alert_key="zombie_events", ) if recent_dlq > 10: send_alert( f"[WARNING] {recent_dlq} events in DLQ last hour, investigate", alert_key="dlq_flood", ) if stuck_pending: send_alert( f"[CRITICAL] {stuck_pending} stuck 'pending' rows in processed_events, " f"architectural invariant broken, investigate urgently", alert_key="processed_events_stuck_pending", ) finally: try: conn.rollback() except Exception: pass conn.putconn()
Prometheus-алерты:
- alert: HotPathBalanceCheckNotRunning expr: increase(hot_path_balance_check_runs_total[6m]) == 0 for: 0m- alert: Web3RPCCircuitOpen expr: increase(web3_rpc_errors_total[5m]) > 5 for: 0m- alert: CeleryHighRetryRate expr: rate(celery_tasks_total{state="retry"}[5m]) / rate(celery_tasks_total{state="success"}[5m]) > 0.1- alert: CeleryQueueDepth expr: celery_queue_depth > 500 for: 5m
Beat расписание:
from celery.schedules import crontabbeat_schedule = { "enqueue-pending-events": {"task": "enqueue_pending_events", "schedule": 5.0}, "recover-stale-enqueued-events": {"task": "recover_stale_enqueued_events", "schedule": 120.0}, "cleanup-processed-events": {"task": "cleanup_processed_events", "schedule": crontab(hour=3, minute=0)}, "hot-path-balance-check": {"task": "hot_path_balance_check", "schedule": 60.0}, "alert-zombie-events": {"task": "alert_zombie_events", "schedule": 60.0}, "drain-redis-dlq": {"task": "drain_redis_dlq", "schedule": crontab(hour=4, minute=0)},}
Cleanup
processed_events чистит только terminal outcomes. outcome='pending' не трогается никогда. Это защита от тихого удаления событий в середине обработки. Соединение берётся и возвращается в пул на каждый батч, а не удерживается на всё время задачи.
TTL 14 дней безопасен благодаря UNIQUE (source_event_id, event_type) на balance_events: даже если провайдер реплеит событие спустя 14+ дней и processed_events уже очищен, повторный INSERT в balance_events отклоняется unique constraint. Даже если приложение облажается, unique constraint на balance_events не пропустит дубль. База будет последним рубежом.
CLEANUP_BATCH_SIZE = 5_000CLEANUP_BATCH_PAUSE = 0.1CLEANUP_MAX_BATCHES = 200CLEANUP_TTL_DAYS = 14CLEANUP_SAFE_STATUSES = ("success", "insufficient_funds")@shared_task(name="cleanup_processed_events")def cleanup_processed_events() -> dict: import time total_deleted = 0 batches = 0 for _ in range(CLEANUP_MAX_BATCHES): conn = get_validated_conn(db_pool) try: with conn.cursor() as cur: cur.execute(""" DELETE FROM processed_events WHERE idempotency_key IN ( SELECT idempotency_key FROM processed_events WHERE outcome = ANY(%s) AND created_at < NOW() - (%s * INTERVAL '1 day') LIMIT %s FOR UPDATE SKIP LOCKED ) """, (list(CLEANUP_SAFE_STATUSES), CLEANUP_TTL_DAYS, CLEANUP_BATCH_SIZE)) deleted = cur.rowcount conn.commit() except Exception: try: conn.rollback() except Exception: pass raise finally: conn.putconn() total_deleted += deleted batches += 1 if deleted < CLEANUP_BATCH_SIZE: break time.sleep(CLEANUP_BATCH_PAUSE) return {"batches": batches, "deleted": total_deleted}
Тесты и что они не покрывают
Idempotency на моках не проверишь. Unique constraint должен работать как в проде, значит нужна реальная БД. threading не подходит, GIL мешает нормально воспроизвести гонку. Поэтому multiprocessing.
Баланс может сойтись, а balance_events при этом задублирован. Проверяй оба. Именно так и бывает, баланс сходится, всё выглядит чисто, а дубли в balance_events всплывают только когда приходит аудитор.
import pytestimport psycopg2import psycopg2.extrasimport multiprocessingfrom decimal import DecimalTEST_DSN = "host=localhost port=5433 dbname=testdb user=testuser password=testuser"@pytest.fixture(scope="session")def db_conn_session(): conn = psycopg2.connect(TEST_DSN) yield conn conn.close()@pytest.fixturedef db_conn(db_conn_session): conn = db_conn_session conn.rollback() with conn.cursor() as cur: cur.execute(""" TRUNCATE balance_events, processed_events, payment_events, dead_letter_queue, users RESTART IDENTITY CASCADE """) cur.execute( "INSERT INTO users (id, balance, initial_balance) VALUES (1, 100.0, 100.0)" ) conn.commit() yield conn try: conn.rollback() except Exception: passdef _deposit_worker(dsn, event_id, event_type, user_id, amount, barrier, q): conn = psycopg2.connect(dsn) try: barrier.wait(timeout=10) process_deposit_sync(conn, event_id, event_type, user_id, amount) q.put(("ok", None)) except Exception as e: q.put(("err", f"{type(e).__name__}: {e}")) finally: conn.close()def test_duplicate_deposits_produce_single_credit(db_conn): """10 воркеров, ОДИН event_id, имитация at-least-once delivery.""" with db_conn.cursor() as cur: cur.execute( "INSERT INTO payment_events (event_id, user_id, amount, event_type, status) " "VALUES ('evt_dup', 1, '50.0', 'deposit', 'enqueued')" ) db_conn.commit() N = 10 barrier = multiprocessing.Barrier(N) q = multiprocessing.Queue() workers = [ multiprocessing.Process( target=_deposit_worker, args=(TEST_DSN, "evt_dup", "deposit", 1, "50.0", barrier, q), ) for _ in range(N) ] for w in workers: w.start() for w in workers: w.join(timeout=20) with db_conn.cursor() as cur: cur.execute("SELECT balance FROM users WHERE id = 1") balance = cur.fetchone()[0] cur.execute("SELECT COUNT(*) FROM balance_events") be_count = cur.fetchone()[0] cur.execute("SELECT COUNT(*) FROM processed_events") pe_count = cur.fetchone()[0] assert balance == Decimal("150.0"), f"дублирование! balance={balance}" assert be_count == 1, f"balance_events дублирован: {be_count}" assert pe_count == 1
Что тесты не покрывают
Throughput под реальной нагрузкой. NOWAIT сериализует доступ, throughput достигается через Celery retry с jittered backoff. Для нагрузочного тестирования нужен отдельный стенд с Celery workers.
Падение воркера посреди транзакции. reject_on_worker_lost=True требует убийства Celery worker процесса и проверки что задача возвращена в брокер. Это integration test, живёт отдельно.
Redis DLQ fallback. Требует реального Redis и имитации падения PostgreSQL. Проверяется через chaos testing.
Backpressure и деградация
Под высокой конкуренцией на одного пользователя RetryableError от NOWAIT накапливаются. max_retries=5 даёт 6 попыток (initial + 5 retry), суммарный worst case delay до ~63с. Если throughput входящих webhooks превышает способность разгребать retry, CeleryQueueDepth начнёт расти.
Когда DLQ начинает заполняться: alert_zombie_events зафиксирует dlq_size > 10 events/hour, первый сигнал. PostgreSQL dead_letter_queue растёт без ограничений, теоретически до размера диска. Дальше ручная работа: разобрать причину, починить, сделать replay из DLQ.
Автореплей не сделан сознательно. Событие попало в DLQ, значит что-то пошло не так. Пусть человек разберётся прежде, чем гнать его обратно. Не хочу, чтобы система сама решала что делать с деньгами, которые уже один раз упали.
Если Redis broker недоступен, apply_async всегда падает. Событие остаётся в enqueued, через 3 минуты recover_stale_enqueued_events переводит обратно в pending и инкрементит retry_count. После MAX_RECOVERY_ATTEMPTS (10 попыток ~ 30 минут) событие уходит в DLQ + алерт.
acks_late и reject_on_worker_lost защищают от падения воркера, не брокера. Если master Redis упадёт, in-flight задачи потеряны. appendonly yes + appendfsync everysec, минимум который должен быть. Если совсем не хочешь терять данные, appendfsync always, но throughput просядет.
Про blockchain reorg
Для Ethereum после перехода на PoS финализация через два checkpoint эпохи (~12-15 минут). Эвристика «12 блоков» из эпохи PoW сейчас не применима. Для L2 (Arbitrum, Optimism) правила другие, здесь только про Ethereum L1.
Текущая реализация reorg не обрабатывает, это скоуп первого релиза. При реорге блокчейн «откатывает» несколько блоков. Транзакция может попасть в новый блок без изменений или не попасть никуда, эффективно отменена.
Самый простой вариант: компенсирующая запись в balance_events со знаком минус, с отдельным idempotency key чтобы не конфликтовать с оригинальным событием:
def handle_reorg_event(original_tx_hash: str, user_id: int, amount: Decimal) -> None: reorg_event_id = f"reorg:{original_tx_hash}" idempotency_key = _idempotency_key(reorg_event_id, "reorg_compensation") # далее стандартный flow через processed_events + balance_events
Что в бэклоге
Самое болезненное прямо сейчас: notify_user_insufficient_funds вызывается вне транзакции, нужен outbox pattern, запись в outbox-таблицу внутри той же транзакции что и UPDATE users. Без этого при at-least-once delivery пользователь получает N уведомлений на один отказ, а при сбое notify в DLQ попадают записи об успешно завершённых транзакциях. При >50 воркеров прямые соединения упираются в max_connections, нужен PgBouncer. Мина которую стоит учесть до его включения в transaction pooling: hot_path_balance_check использует conn.set_isolation_level(REPEATABLE_READ), это session-level команда, PgBouncer в transaction pooling её не сохраняет между транзакциями. SET LOCAL lock_timeout/statement_timeout работают нормально (они tx-scoped), а isolation level придётся переписать на BEGIN ISOLATION LEVEL REPEATABLE READ прямо в запросе или оставить session pooling для этой конкретной таски. Это классическая сеньорская мина: в dev всё работает, ломается только после включения PgBouncer в проде. processed_events растёт, cleanup закрывает проблему на ближайшее время, но партиционирование по дате при переходе за сотни миллионов строк неизбежно.
Дальше: Redis broker с appendfsync always, полная историческая сверка через materialized view, полная реализация reorg handling, Admin UI для DLQ вместо ручного SQL. Одна ловушка в коде которую я пока не трогал: _mark_event_failed коммитит сама, при будущем рефакторинге это тебя укусит.
Результат
Очередь разгребается и суммы сходятся, это разные вещи. Prometheus показывает первое, hot_path_balance_check второе. Нужны оба, одним не обойтись.
8 месяцев в проде. 0 дублирующих зачислений после деплоя фикса, против 23 за первый месяц на ~180k транзакций. Webhook-доставка 100% с учётом retry-логики провайдера.
Здесь нет ни одного решения которое я придумал заранее. Каждое закрывает дыру которая уже стрельнула. Idempotency через DB unique constraint, TOCTOU через SELECT FOR UPDATE NOWAIT, потеря транзакций через acks_late + outbox, FSM через VALID_TRANSITIONS. Каждое из этих решений по отдельности не гарантия. Вместе закрывают друг друга. Восемь месяцев без инцидентов.
ссылка на оригинал статьи https://habr.com/ru/articles/1028708/