К маю 2026 года средний сбор аудитории в нашей системе занимал 2 часа 50 минут. Проблема состояла из двух частей. Первая: 96.5% этого времени процесс стоял в очереди. Вторая: даже когда процесс добирался до исполнения, расчёт занимал 6 минут — десятки задач внутри DAG выполнялись над множествами в миллионы строк, и основным инструментом были JOIN.
Мы последовательно устранили обе проблемы — 20 изменений, разбитых на шесть этапов. Ниже описан каждый шаг и итоговый результат.
Все числа «до» и «после» получены одним и тем же SQL-запросом по Postgres — это воспроизводимый замер.
Домен обобщён: конкретная компания не имеет значения. Это платформа, которая по булевым условиям («траты > 5000 ₽ за последние 3 месяца» И «покупки в категории Рестораны») собирает множество клиентских идентификаторов из 10-миллионной базы и доставляет его во внешние системы.
Глоссарий
|
Термин |
Определение |
|---|---|
|
Аудитория (сбор аудитории) |
Множество клиентов, удовлетворяющих набору условий. «Собрать аудиторию» — вычислить это множество и доставить наружу. |
|
DAG |
Направленный ациклический граф задач: пересечение ждёт, пока досчитают финансы и гео. |
|
TQ (движок очереди задач) |
Планировщик поверх Postgres: хранит граф, состояния, зависимости, ретраи и результат каждой задачи. |
|
Воркер |
Обработчик одного типа задачи: |
|
Колоночная СУБД |
Аналитическая база данных, где происходит вся работа над множествами клиентов. |
|
Процесс vs задача |
Один сбор аудитории — один процесс, который разворачивается в десятки задач (узлов DAG). |
|
Очередь (queue) vs исполнение (exec) |
Задача сначала ждёт свободного слота (queue), потом выполняется (exec). |
Основные типы воркеров, упоминаемые в статье:
|
Воркер |
Что делает |
|---|---|
|
|
Собирает клиентов по финансовым условиям: сумма трат, доходов, количество операций за период. Самый тяжёлый. |
|
|
Сопоставляет внешний файл хешей (SHA512 телефонов, MD5 почт) с контактами клиентов в базе. |
|
|
Фильтрует клиентов по кодам категорий торговых точек (merchant category codes). |
|
|
Пересекает множества клиентов из разных веток: клиент должен пройти все фильтры одновременно. |
|
|
Объединяет множества клиентов: клиент должен пройти хотя бы один из фильтров. |
|
|
Вычитает одно множество из другого — например, исключает клиентов, попавших в стоп-лист. |
|
|
Вычисляет разницу между старой и новой версией аудитории при пересборе. |
|
|
Материализует итоговое множество и отправляет его в Kafka / S3. |
|
|
Удаляет временные таблицы после завершения процесса. |
1. Архитектура
Система построена на четырёх компонентах: REST-сервис, движок очереди задач (TQ), колоночная СУБД и транспортный слой (Kafka + S3).

Ключевое архитектурное решение — сбор аудитории устроен как DAG. Под параметры аудитории строится граф задач, сохраняется в Postgres и исполняется воркерами. DAG естественно описывает и параллелизм (финансы и гео считаются одновременно), и зависимости (пересечение ждёт завершения всех веток). Эту модель мы не меняли — она работает. Изменения касались всего, что построено вокруг.
Сквозной путь задачи
Граф строится DSL:

tqueue(type = "AUDITORY_GATHERING", continuous = true, priority = 1) { val fin = task<FinancialParams, SetResult>("FINANCIAL") { params { /* период, порог */ } } val mcc = task<MccParams, SetResult>("MCC") { params { /* коды, период */ } } val hash = task<HashParams, SetResult>("HASH") { params { /* файл хешей */ } } val intersect = task<IntersectParams, SetResult>("INTERSECT") { dependsOn(fin, mcc, hash) } val export = task<ExportParams, Unit>("EXPORT") { dependsOn(intersect) } task<CleanUpParams, Unit>("CLEAN_UP") { dependsOn(export) }}
Цикл планировщика — исходная версия
Каждые 5 секунд планировщик исполнял фиксированную последовательность:
каждые 5 секунд: 1. обработать периодические задачи 2. сторож: пометить TIMEOUT просроченные задачи 3. сторож: пометить TIMEOUT слишком старые NEW 4. сторож: освободить устаревшие резервации 5. раздать готовые задачи + назначить ноду 6. прочитать назначённые этой ноде задачи 7. исполнить: IN_PROGRESS → воркер → успех/ошибка
Половина проблем находилась в шагах 1–4 — они выполнялись до того, как очередь доходила до полезной работы.
Исходная конфигурация
|
Параметр |
Значение |
|---|---|
|
Реплики сервиса |
2 |
|
CPU / память на реплику |
500m / 2 GiB (JVM |
|
Максимум одновременных задач |
5 |
|
Пул соединений к колоночной СУБД |
7 |
|
Интервал опроса планировщика |
5 с |
|
Планировщик запуска сборов |
каждые 15 с |
Эти значения объясняют бóльшую часть того, что показали метрики. Особенно — лимит в 5 одновременных задач.
2. Диагностика
Как получены числа
Отдельной системы метрик для сборов не было — ни Grafana-дашборда, ни мониторинга времени аудиторий. Однако весь жизненный цикл задачи находился в Postgres: у каждой строки tq_process и tq_task есть date_created (попала в очередь), date_started (взята в работу) и date_finished (завершена). Этих трёх меток достаточно, чтобы разложить полное время на ожидание и исполнение одной агрегацией:
SELECT type AS process_type, COUNT(*) AS total_runs, ROUND(AVG(EXTRACT(EPOCH FROM (date_finished - date_created)))) AS avg_total_sec, ROUND(PERCENTILE_CONT(0.95) WITHIN GROUP ( ORDER BY EXTRACT(EPOCH FROM (date_finished - date_created)))) AS p95_total_sec, ROUND(AVG(EXTRACT(EPOCH FROM (date_finished - date_started)))) AS avg_exec_sec, ROUND(AVG(EXTRACT(EPOCH FROM (date_started - date_created)))) AS avg_queue_secFROM tq_processWHERE date_created >= NOW() - INTERVAL '14 days' AND status = 'COMPLETED'GROUP BY type;
2.1. 96.5% времени — ожидание
|
Тип процесса |
Запусков |
Полное время (avg) |
Исполнение (avg) |
Ожидание (avg) |
|---|---|---|---|---|
|
|
3 765 |
10 229 с |
361 с |
9 868 с |
|
|
16 800 |
2 939 с |
1 с |
2 938 с |
|
|
8 955 |
8 с |
2 с |
6 с |

Средний сбор шёл 2 часа 50 минут. Расчёт занимал 6 минут. Всё остальное — ожидание в очереди. Вывод, определивший стратегию: оптимизация SQL в воркерах затрагивает только 3.5% времени процесса. Основная проблема — не скорость вычислений, а то, что очередь почти не доходит до них.
2.2. Одна очередь на всех
Лимит «5 одновременных задач» был общим на весь контур. Разброс стоимости воркеров:
|
Тип задачи |
Исполнение (avg) |
Ожидание (avg) |
|---|---|---|
|
|
198 с |
272 с |
|
|
92 с |
12 708 с |
|
|
48 с |
12 606 с |
|
|
38 с |
10 610 с |
|
|
13 с |
12 187 с |
|
|
2 с |
12 146 с |

FINANCIAL (198 с) и UNION (2 с) конкурировали за одни и те же пять слотов. Выдача задач — ORDER BY priority, date_created — означала, что один процесс с десятком готовых задач мог занять все слоты, пока остальные ждали. Отсюда queue ≈ 27 × exec.
2.3. Ночная волна
В 22:00 система запускала массу динамических пересборов — одновременно. Самые медленные процессы создавались около 22:00:40 и ждали до 32 000 секунд (≈ 9 часов). К графу добавлялись дельта-задачи:

Дельта-задача исполнялась 92 секунды, ожидание старта — 12 708 секунд. Соотношение 138:1.
2.4. Узкие места движка очереди
-
Опрос между уровнями DAG.
delay(5s)в конце каждой итерации. Каждый следующий уровень графа ждал до 5 секунд даже при свободных ресурсах. -
Сторожевые проверки на критическом пути. Три
UPDATE-запроса выполнялись до раздачи задач. Замедление любого из них задерживало старт полезной работы. -
Резервация в два шага. Сначала
node_id(статусNEW), затем отдельноIN_PROGRESS. Между ними — окноNEW + node_id, открытое для состояния гонки. -
Приоритет выставлялся повторно.
continuous = trueвызывалUPDATE tq_task SET priority = -1при старте каждого воркера. Для графа из 50 задач — 50 одинаковых UPDATE. -
NOT EXISTSпо зависимостям на каждом цикле. Готовность задачи вычислялась вложенным запросом со сканированием тысяч строкtq_task_dependenciesкаждые 5 секунд.
2.5. Слишком мелкая нарезка DAG
Финансовая ветка дробилась по одному месяцу (режим EACH_MONTH), MCC и терминалы также разрезались по периодам. Результаты сводились дополнительными уровнями агрегации и пересечения:

Для 12 месяцев это давало 16 TQ-задач и 3 уровня DAG только на финансовую ветку. Каждый уровень — дополнительное ожидание в очереди.
2.6. Повторный счёт стабильных данных
Для каждой аудитории строился отдельный DAG с нуля. Сотня процессов с идентичными FINANCIAL(январь, порог 5000) — это сотня одинаковых запросов в колоночную СУБД.
3. Решения
Мы реализовали 20 изменений, сгруппированных по слоям. Ниже — что именно сделано. В следующем разделе — как это разворачивалось и какие дало результаты.
3.1. Ресурсы и конфигурация
CPU throttling → 2000m
Grafana показывала 54 000+ throttled periods: JVM регулярно приостанавливалась Kubernetes CFS-контроллером. Операции JDBI, корутины и сборщик мусора работали на 70–80% от возможной скорости.
# k8s values — единственное изменение:resources: limits: cpu: 2000m # было 500m requests: cpu: 500m # не меняли — не влияет на scheduling
Эффект: исполнение ускорилось на 20–30%. Увеличение CPU limit не несёт риска.
Расширение пула и лимитов
Пул соединений к колоночной СУБД: 7 → 15. max-concurrent-tasks: 5 → 10. Добавлен явный лимит на очередь колоночной СУБД — ранее он отсутствовал.
database.clickhouse.datasource: maximumPoolSize: 15 # было 7tqueue: max-concurrent-tasks: 10 # было 5 queue-limits: KAFKA: 2 CLICKHOUSE: 2 COLUMNAR_DB: 8 # новый лимит
Разделение очередей по типу нагрузки
FINANCIAL (198 с) и UNION (2 с) больше не стояли в одной очереди:
tqueue: queue-limits: HEAVY: 3 # FINANCIAL, AGGREGATION — тяжёлая аналитика LIGHT: 10 # INTERSECT, UNION, HASH, MCC — лёгкие операции FILE_OP: 5 # EXPORT, CLEAN_UP — операции с файлами AUTO_GATHER: 2 # фоновые пересборы

// FINANCIAL, FINANCIAL_AGGREGATION → HEAVYtask.queueType = "HEAVY"// INTERSECT, UNION, HASH, MCC → LIGHTtask.queueType = "LIGHT"// EXPORT, CLEAN_UP → FILE_OPtask.queueType = "FILE_OP"
Эффект: UNION перестал ждать за FINANCIAL. Критический путь DAG сократился на 15–20%.
3.2. Справедливое планирование
Fair scheduling: ROW_NUMBER по процессам
Старый запрос ORDER BY priority, date_created LIMIT 5 отдавал все слоты одному процессу. Новый ранжирует задачи внутри процесса и раздаёт по одной на процесс:
WITH ranked AS ( SELECT t.*, ROW_NUMBER() OVER ( PARTITION BY t.process_id ORDER BY t.priority NULLS LAST, t.date_created) AS rn FROM tq_task t WHERE t.status = 'NEW' AND t.node_id IS NULL AND NOT EXISTS (/* незавершённые зависимости */) AND EXISTS (/* активный процесс */))SELECT * FROM rankedORDER BY rn, priority NULLS LAST, date_createdLIMIT :fetchLimit FOR UPDATE SKIP LOCKED

Эффект: 9-часовой хвост в SLOW-бакете исчез. Добавлен индекс по (status, node_id, process_id).
Ограничение одновременных процессов
tqueue: max-concurrent-processes-by-type: AUDITORY_GATHERING: 5 # не более 5 процессов одновременно
Вместо 100 параллельных процессов — группы по 5, каждая завершается предсказуемо.
3.3. Устранение накладных расходов движка
Приоритет — один раз
UPDATE tq_task SET priority = -1 перенесён из execute() каждого воркера в момент создания процесса. Для графа из 50 задач — 1 UPDATE вместо 50.
Сторож — в отдельную корутину
Три сторожевых UPDATE-запроса вынесены в независимый цикл с интервалом 60 секунд:
// Отдельный корутин, не блокирует выдачу задач:config.schedulingScope.launch { while (isActive) { delay(60.seconds) tqTaskWatchdog.checkForTimeouts(config.taskExecutionTimeout) tqTaskWatchdog.checkForStaleNewTasks(config.taskStaleNewTimeout) tqTaskWatchdog.checkForStaleReservedTasks(config.taskReservationTimeout) }}// Основной цикл — только scheduling:while (isActive) { recurringTaskService.runSchedulingIteration() launchProcessingTasks() delay(config.interval)}
Shared Task: одна задача на всех
В TQ добавлено понятие task_key = SHA256(type + canonical_params). Если два процесса создают задачу с одинаковым ключом — физически исполняется одна, результат подписывается обоим:
tq_shared_task id | task_key | owner_task_id | status | resulttq_task_shared_subscription process_id | task_id | shared_task_id
Эффект: количество строк в tq_task при ночной волне сократилось кратно, конкуренция за слоты — пропорционально.
3.4. Параллелизм
Параллельный consumer в сервисе доставки
Kafka consumer на стороне доставки был последовательным: FetchMessage → Consume (S3 + HTTP + DB, 10–30 с) → CommitOffset → следующий. При пачке из 10 сообщений в 22:00 это давало 100–300 секунд задержки.
Переписан на worker pool из N горутин с отслеживанием минимального неподтверждённого смещения:

jobs := make(chan kafkaMsg.Message[T], bufferSize)for i := 0; i < workerCount; i++ { go func() { for msg := range jobs { consumer.Consume(msg) } }()}// reader loop: FetchMessage → jobs ← parseMessage// offset tracker: commit(min uncommitted offset)
Эффект: пачка сообщений обрабатывается за время самого долгого, а не суммы всех.
Внутренний параллелизм для помесячных сборов
Режим EACH_MONTH на 12 месяцев создавал 16 TQ-задач (12 FINANCIAL + 3 INTERSECT + 1 финальный). Заменён одним воркером FINANCIAL_MONTHLY_ALL с параллельными корутинами и семафором:
@Worker("FINANCIAL_MONTHLY_ALL")class FinancialMonthlyAllWorker : TQWorker<...> { override suspend fun run(task, tqApi) { val semaphore = Semaphore(5) // не перегрузить пул колоночной СУБД val monthResults = coroutineScope { months.map { monthParams -> async(Dispatchers.IO) { semaphore.withPermit { columnarRepository.gatherAuditory(fileId, monthParams) } } }.awaitAll() } val final = intersectionRepository.intersectViaBitmap(monthResults) task.result = AuditoryGatheringResult(bitmapLocation = final, ...) }}

Эффект: 16 TQ-задач → 1 задача. 3 уровня DAG → 0. Критический путь: ~600 с → ~210 с.
3.5. Батчинг фоновых задач
MATCHING_DELTA_EXCLUSION создавал 16 800 процессов за две недели — каждый с одной секундой работы и 49 минутами ожидания. Упаковано по 50 задач в один процесс:
// Было: 50 процессов по 1 задаче// Стало: 1 процесс с 50 задачами внутриval batchSize = 50clientIds.chunked(batchSize).forEach { batch -> process.addTask(MatchingDeltaExclusionsTask(batch))}
Эффект: количество записей в tq_task и tq_process сократилось на порядок.
3.6. Битовые карты и ранний выход
Roaring Bitmaps в колоночной СУБД
Ранее листовые воркеры записывали промежуточные результаты как строки (auditory_load_id, client_id), а INTERSECT/SUBTRACT/UNION выполняли JOIN по миллионам строк. Мы перевели операции над множествами на roaring bitmaps:

Листовой воркер пишет битовое множество параллельно со строками:
CREATE TABLE tmp_auditory_bitmap ( auditory_load_id String, bitmap AggregateFunction(groupBitmap, UInt64), created_at DateTime DEFAULT now()) ENGINE = AggregatingMergeTree ORDER BY auditory_load_id TTL created_at + INTERVAL 2 HOUR;INSERT INTO tmp_auditory_bitmapSELECT :fileId, groupBitmapState(toUInt64(client_id))FROM financial_clients WHERE ...;
INTERSECT — bitmapAnd вместо JOIN:
-- Было: JOIN по client_id, 38 секунд-- Стало: bitmapAnd, ~200 миллисекундINSERT INTO tmp_auditory_bitmapSELECT :resultId, bitmapAnd(b1.bitmap, b2.bitmap)FROM tmp_auditory_bitmap b1 CROSS JOIN tmp_auditory_bitmap b2WHERE b1.auditory_load_id = :id1 AND b2.auditory_load_id = :id2;
SUBTRACT — bitmapAndnot, UNION — bitmapOr. Экспорт: arrayJoin(bitmapToArray(client_bitmap)).
Процессор обрабатывает 64 бита за инструкцию. Для 10 млн клиентов пересечение — ~156 000 операций. Сжатое множество на 10 млн клиентов — 5–15 МБ. client_id хеширован в UInt64 через cityHash64; коллизии на реальных данных — ниже 0.0001%, проверено до внедрения.
Эффект: INTERSECT 38 с → < 1 с; SUBTRACT 48 с → < 1 с; UNION 2 с → < 100 мс.
Cardinality-aware DAG
Если один из фильтров возвращает пустой результат, дальнейшие операции над множеством не нужны: A ∩ ∅ = ∅. Добавлено две оптимизации:
Preflight count() перед полным gather:
val count = repository.estimateCount(params)if (count == 0L) { task.result = AuditoryGatheringResult(isEmpty = true, rowCount = 0, ...) return // полный gather пропущен}
Short-circuit в INTERSECT и UNION:
// INTERSECT:if (deps.any { it.isEmpty }) { task.result = AuditoryGatheringResult(isEmpty = true) return}// UNION:val nonEmpty = deps.filterNot { it.isEmpty }if (nonEmpty.size == 1) { task.result = nonEmpty.single(); return }
Внедрение шло осторожно: shadow mode (логирование без изменения поведения), затем включение под feature flag.
Эффект: разреженные аудитории (редкие MCC, специфическая география) — 198 с → 5–10 с. Плотные аудитории — без изменений, кроме накладных расходов на estimateCount.
3.7. Кеширование результатов
Content-Addressable Segment Cache
Кеш по каноническому хешу параметров задачи:
CREATE TABLE tq_segment_cache ( params_hash VARCHAR PRIMARY KEY, file_id VARCHAR NOT NULL, table_name VARCHAR NOT NULL DEFAULT 'tmp_auditory', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), expires_at TIMESTAMPTZ NOT NULL);
val hash = params.canonicalHash() // SHA256 от нормализованных параметровval cached = segmentCacheRepository.findValid(hash)if (cached != null) { task.result = AuditoryGatheringResult(fileId = cached.fileId, ...) return // 5 мс вместо 198 с}val location = gatherAuditory(task)segmentCacheRepository.save(hash, location.fileId, ttl = 24.hours)
Ключ включает версию исходных данных, чтобы исключить возврат устаревшего результата:
cache_key = SHA256(task_type + canonical_params + source_data_version)
Ночной прогрев кеша
Ленивый кеш не помогает первому запросу. Добавлен проактивный прогрев: фоновая задача в 02:00 вычисляет 50 наиболее востребованных параметров за последние 14 дней:
@Recurring(id = "PrecomputeSegmentsJob", cron = "0 2 * * *")fun precomputePopularSegments() { val popular = segmentCacheRepository.findMostUsed(limit = 50, since = 14.days) popular.forEach { (hash, params) -> val fileId = repository.gatherAuditory(generateFileId(), params) segmentCacheRepository.save(hash, fileId, ttl = 25.hours) }}
segment-precompute: maxSegments: 50 maxDuration: 2h maxParallelism: 2 ttl: 25h
Эффект: повторяющиеся параметры FINANCIAL получают результат из кеша мгновенно. Нагрузка перенесена с пикового окна на ночь.
3.8. Динамические пересборы
Автоматические пересборы запускались планировщиком каждые 15 секунд и конкурировали с ручными за одни слоты. Внедрено пять изменений:

-
Отдельная очередь
AUTO_GATHERс 2 слотами. Фоновые пересборы не вытесняются ручными. -
Deadline-aware приоритет:
val urgency = when { deadline < now + 1.hours -> -20 // критично deadline < now + 6.hours -> -10 // срочно deadline < now + 24.hours -> -5 // планово else -> 2 // фоново}
-
Ночное окно. Пересборы с deadline > 24 ч — только в 22:00–06:00. Дневные слоты остаются ручным и срочным.
-
Pre-gather. За 6 часов до активации оффера аудитория вычисляется заранее:
fun preGatherBeforeActivation() { offerRepository.findActivatingWithin(6.hours) .filter { !auditoryRepository.isFreshEnough(it.auditoryId, 12.hours) } .forEach { offer -> gatheringService.run(AuditoryGatheringRequest(fromJob = true, deadline = offer.activateAt)) }}
-
Пропуск дельты при малом изменении:
val deltaSize = estimateDelta(currentAuditory, newParams)if (deltaSize < 0.01 * currentSize) return currentAuditory
4. Как разворачивали и что получилось
20 изменений нельзя внедрять одновременно — при ошибке придётся откатывать всё. Мы разбили работу на шесть этапов. Каждый этап: деплой → суточный замер тем же measurement-запросом → сравнение с предыдущим baseline.
Этап 1: Ресурсы
Что деплоили: CPU limit 500m → 2000m.
Почему первым: увеличение CPU limit не может ничего сломать.
Через 24 часа: throttled periods на Grafana упали на 85%. avg_exec_sec снизился на 20–30% по всем типам задач. avg_queue_sec практически не изменился — сама по себе мощность не лечит очередь.
Этап 2: Конфигурация очереди
Что деплоили: пул соединений 7 → 15, max-concurrent-tasks 5 → 10, лимит на очередь колоночной СУБД.
Риск: низкий. Удвоение одновременных задач могло перегрузить базу. Отслеживали connection errors и latency.
Через 24 часа: avg_queue_sec упал с 9 868 до ~5 000 с. Ошибок соединений — ноль.
Этап 3: Разделение очередей + справедливое планирование
Что деплоили: четыре типа очередей, fair scheduling SQL, ограничение в 5 одновременных процессов.
Риск: средний. Оконная функция — новый запрос на критическом пути. Потребовался индекс по (status, node_id, process_id) и EXPLAIN ANALYZE на прод-объёме.
Через 24 часа: avg_queue_sec упал с ~5 000 до ~1 800 с. P95 > 20 000 с перестал появляться. Лёгкие задачи начали стартовать сразу после готовности.
Этап 4: Расчистка критического пути движка
Что деплоили: приоритет один раз, сторож в отдельную корутину, Shared Task с дедупликацией.
Риск: средний. Shared Task — новое понятие в ядре TQ.
Через 24 часа: накладные расходы планировщика снизились. getAvailableTasks — быстрее, сторожевые UPDATE не блокируют выдачу. Эффект ~10–15% к avg_total.
Этап 5: Параллелизм + батчинг
Что деплоили: параллельный Kafka consumer, FINANCIAL_MONTHLY_ALL, батчинг MATCHING_DELTA_EXCLUSION.
Риск: выше предыдущих. Offset tracking при параллельном consume и внутренний параллелизм требовали аккуратного тестирования.
Через 24 часа: EACH_MONTH — критический путь ~600 → ~210 с. Пачка из 10 сообщений Kafka обрабатывается за время самого долгого. tq_process — на порядок меньше строк.
Этап 6: Битовые карты + кеш + динамические пересборы
Что деплоили: roaring bitmaps, cardinality-aware DAG, segment cache, ночной прогрев, пять улучшений пересборов.
Риск: самый высокий. Требовалась миграция схемы колоночной СУБД и cityHash64. Строковый fallback оставлен под feature flag.
Через 24 часа: INTERSECT 38 с → < 1 с, SUBTRACT 48 с → < 1 с, UNION 2 с → < 100 мс. Повторяющиеся FINANCIAL — попадание в кеш. Динамические пересборы больше не конкурируют с ручными.
Итоговые цифры
|
Метрика |
Было (май 2026) |
Стало (после этапа 6) |
|---|---|---|
|
Среднее полное время |
10 229 с (≈ 2 ч 50 мин) |
~10 с |
|
Среднее исполнение |
361 с (≈ 6 мин) |
~8 с |
|
Среднее ожидание в очереди |
9 868 с (≈ 2 ч 44 мин) |
~2 с |
|
P95 полного времени |
28 189 с |
~30 с |
|
Максимальное (ночной хвост) |
32 851 с |
~60 с |

По типам сборов
|
Тип сбора |
Было |
Стало |
|---|---|---|
|
Простой MCC/TERMINAL |
~200 с |
~3 с |
|
FINANCIAL WHOLE_PERIOD |
~600 с |
~10 с |
|
FINANCIAL EACH_MONTH 12 мес |
~800 с |
~20 с |
|
Самый большой сбор + пересбор |
10 000–30 000 с |
~60 с |
Вклад по слоям
Слои влияют на одни и те же участки критического пути, поэтому их вклад нельзя механически перемножить. Каждый слой подтверждён независимым замером:
-
Ресурсы + конфигурация:
queue_sec: 9 868 → ~5 000 с. ~×2. -
Разделение очередей + fair scheduling:
avg_total: 10 229 → ~3 000 с. ~×1.7. -
Расчистка движка:
avg_totalещё −10–15%. -
Параллелизм + батчинг: критический путь финансов ~600 → ~210 с.
-
Битовые карты + кеш + пересборы: INTERSECT 38 → < 1 с, FINANCIAL → кеш.
avg_exec: 361 → ~8 с. -
Совокупно: исполнение ускорено ~×45 (361 → ~8 с), очередь практически устранена (9 868 → ~2 с). Полное время — с 2 ч 50 мин до нескольких секунд.
Замечание про выгрузку
Физическая выгрузка многомиллионной аудитории в CSV занимает больше нескольких секунд. Все числа выше относятся к вычислению множества клиентов и его размера. Материализация и доставка идут асинхронно.
5. Что из этого можно применить в других системах
-
Измеряйте queue‑vs‑exec до оптимизации SQL. 96.5% времени — ожидание. Пока очередь не починена, ускорение воркеров даёт эффект лишь на 3.5% бюджета.
-
Разделяйте приём нагрузки и исполнение. Неконтролируемый залп запросов создаёт длинный хвост независимо от скорости отдельных задач. Порции, окна и backpressure — самостоятельный слой защиты.
-
Убирайте дорогие проверки с горячего пути.
NOT EXISTSна каждом тике планировщика → явный статусREADYпо событию. Сторожевые проверки — в отдельный цикл. -
Используйте битовое представление множеств. Операции над аудиториями — объединение, пересечение, разность. Roaring bitmaps: десятки секунд → миллисекунды. Плата — хеширование идентификаторов и поддержка со стороны СУБД.
-
Считайте стабильное заранее. Content‑addressable cache и ночной прекомпьют устраняют повторный счёт одинаковых условий. Плата — свежесть данных и TTL.
-
Внедряйте слоями и измеряйте каждый этап. Никакого «big bang». Каждое изменение — отдельный деплой, суточный замер, сравнение с baseline. 20 изменений — 20 замеров.
ссылка на оригинал статьи https://habr.com/ru/articles/1045282/