Реальный time series агрегатор: как обрабатывать 10 событий/сек на графе из 300k узлов

от автора

Представьте, что у вас есть многослойный пайплайн обработки данных.

  • Слой 0 — сырые события: цена тика, действие пользователя, показание датчика.

  • Слой 1 — агрегаты по инструментам: дельта, гамма, скор.

  • Слой 2 — агрегаты по секторам: риск на сектор, общая экспозиция.

  • Слой 3 — портфельные метрики: VaR, ожидаемая прибыль.

  • Слой 4 — enterprise-лимиты и алерты.

Ширина слоя — 5000 узлов. Количество слоёв — 60. Общее число узлов — 300 000.

Каждую секунду приходит 10 новых событий (изменений на входе). Наивный подход — пересчитать всё с нуля — будет перебирать все 300 000 узлов на каждое обновление. При 10 обновлениях в секунду это 3 млн вычислений узлов в секунду. А если ширина слоя 100 000 и слоёв 100? Получаем 10 млн узлов на пересчёт. Компьютер не справляется.

Классические подходы и их ограничения

Подход

Проблема

Полный пересчёт (recompute everything)

Экспоненциальный рост времени при увеличении графа

Триггеры в БД

Не работают для многослойных in-memory графов

Stream-процессоры (Flink, Kafka Streams)

Тяжёлые, не для in-memory иерархий

Кастомный кэш инвалидации

Сложно реализовать корректно, легко ошибиться

Требования к решению

  • Инкрементальный пересчёт: только затронутые узлы, а не все.

  • Минимальные аллокации в горячем пути.

  • Точные индексы для быстрых запросов (Fenwick, гистограммы, суммы).

  • Поддержка двух режимов обновления: SetValue (production) и Mutate (симуляции).

Решение: PhiFlow

PhiFlow — библиотека для .NET 8.0+, реализующая инкрементальные вычисления на слоистых графах фиксированной ширины.

dotnet add package PhiFlow --version 0.1.3

Ключевые идеи

  1. Interval Cone-of-Influence — для каждого обновления вычисляется минимальное множество затронутых узлов в виде непрерывных интервалов на каждом слое.

  2. Фиксированная ширина слоёв — упрощает индексацию и ускоряет доступ.

  3. Точные индексы (Fenwick, гистограммы, суммы) — без приближений и с вероятностными структурами вроде HyperLogLog.

  4. Два режима обновления — SetValue (замена значения) для бизнес-логики и Mutate (дельта) для симуляций.

Быстрый старт

Шаг 1. Создание графа

Определяем параметры:

  • width — количество узлов в каждом слое.

  • layers — количество слоёв.

  • domain — диапазон дискретных значений (0..DomainSize-1).

using PhiFlow;int width = 5000;int layers = 60;int domain = 1024;var rt = new PhiFlowRuntime(width, layers, domain);// Рекомендуется: зарезервировать место под дельтыrt.Reserve(maxDeltaCount: 16);

Шаг 2. Подключение индексов

Индексы ускоряют запросы (CountGreater, RangeCount, Sum, TopKSum). Подключаются к нужному слою.

int lastLayer = layers - 1;// Fenwick-индекс для быстрых CountGreater и RangeCountrt.AttachIndex(lastLayer, new FenwickCountIndex(domain));// Индекс для суммы значенийrt.AttachIndex(lastLayer, new SumIndex(width));// Индекс для Top-K через гистограммыrt.AttachIndex(lastLayer, new HistogramTopKIndex(domain, width));

Шаг 3. Инициализация входного слоя

Заполняем слой 0 начальными значениями.

var rnd = new Random(1);int[] input = new int[width];for (int i = 0; i < width; i++){    input[i] = rnd.Next(domain);}rt.SetInput(input);rt.BuildAll(kWork: 50); // полная сборка графа

Шаг 4. Применение обновлений

Вместо пересчёта всего графа передаём только изменившиеся входы.

var updates = new InputUpdate[]{    new InputUpdate(index: 10, value: 512),    new InputUpdate(index: 123, value: 7),    new InputUpdate(index: 2048, value: 999)};rt.ApplyInputUpdates(updates, kWork: 50);

Библиотека сама определяет, какие узлы затронуты (Interval Cone-of-Influence), и пересчитывает только их.

Шаг 5. Выполнение запросов

Благодаря индексам запросы выполняются мгновенно.

// Количество элементов на последнем слое > 500long countGt = rt.CountGreater(lastLayer, threshold: 500);// Количество элементов в диапазоне [100, 200)long rangeCount = rt.RangeCount(lastLayer, loInclusive: 100, hiExclusive: 200);// Сумма всех значений на слоеlong sum = rt.Sum(lastLayer);// Среднее значениеlong avg = rt.Avg(lastLayer);// Сумма топ-50 значенийlong topKSum = rt.TopKSum(lastLayer, k: 50);

Полный рабочий пример

using PhiFlow;public class RealTimeAnalyticsPipeline{    private readonly PhiFlowRuntime _runtime;    private readonly int _lastLayer;        public RealTimeAnalyticsPipeline(int width, int layers, int domain)    {        _runtime = new PhiFlowRuntime(width, layers, domain);        _runtime.Reserve(maxDeltaCount: 32);        _lastLayer = layers - 1;                // Подключаем индексы для аналитики        _runtime.AttachIndex(_lastLayer, new FenwickCountIndex(domain));        _runtime.AttachIndex(_lastLayer, new SumIndex(width));        _runtime.AttachIndex(_lastLayer, new HistogramTopKIndex(domain, width));    }        public void Initialize(int[] initialData)    {        _runtime.SetInput(initialData);        _runtime.BuildAll(kWork: 50);    }        public void ProcessEvents(IEnumerable<InputUpdate> events)    {        var updates = events.ToArray();        _runtime.ApplyInputUpdates(updates, kWork: 50);    }        public AnalyticsSnapshot GetSnapshot()    {        return new AnalyticsSnapshot        {            TotalCount = _runtime.Sum(_lastLayer),            HighThresholdCount = _runtime.CountGreater(_lastLayer, 800),            MidRangeCount = _runtime.RangeCount(_lastLayer, 200, 600),            Top10Sum = _runtime.TopKSum(_lastLayer, 10)        };    }}public class AnalyticsSnapshot{    public long TotalCount { get; set; }    public long HighThresholdCount { get; set; }    public long MidRangeCount { get; set; }    public long Top10Sum { get; set; }}

Сценарии использования

1. FinTech: управление рисками

4 слоя: инструменты → сектора → портфель → enterprise-лимиты.

int width = 10000;  // 10k инструментовint layers = 4;int domain = 100000; // дискретные уровни экспозицииvar riskRuntime = new PhiFlowRuntime(width, layers, domain);riskRuntime.AttachIndex(3, new FenwickCountIndex(domain)); // лимиты// Пришло обновление цены на инструмент 42var tickUpdate = new InputUpdate(42, newExposureValue);riskRuntime.ApplyInputUpdates(new[] { tickUpdate }, kWork: 50);// Мгновенный запрос: сколько секторов превысили лимит?long breachedCount = riskRuntime.CountGreater(2, threshold: 10000);

2. GameDev: экономическая симуляция

6 слоёв: налог провинции → доход провинции → счастье → производительность → военная сила → обслуживание армии.

int provinces = 5000;int layers = 6;int domain = 1000;var ecoRuntime = new PhiFlowRuntime(provinces, layers, domain);ecoRuntime.AttachIndex(5, new SumIndex(provinces)); // общая сила армииecoRuntime.AttachIndex(5, new HistogramTopKIndex(domain, provinces)); // топ-провинции// Игрок повысил налог в провинции 123ecoRuntime.ApplyInputUpdates(new[] { new InputUpdate(123, 75) }, kWork: 50);// Как изменилась общая сила армии?long totalArmyPower = ecoRuntime.Sum(5);

3. IIoT: предиктивная аналитика

Датчики → станки → линии → заводы → регион.

int sensors = 20000;int layers = 5;int domain = 4096; // показания датчиков 0..4095var iotRuntime = new PhiFlowRuntime(sensors, layers, domain);iotRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // алерты по регионам// Датчик 5001 показал аномалиюvar anomalyUpdate = new InputUpdate(5001, 3800);iotRuntime.ApplyInputUpdates(new[] { anomalyUpdate }, kWork: 50);// Сколько заводов в аномальной зоне?long anomalousPlants = iotRuntime.CountGreater(3, threshold: 3500);

4. AdTech: real-time bidding

Импрессия → пользователь → сегмент → кампания → бюджет.

int users = 100000;int layers = 5;int domain = 100; // скор пользователя 0..99var adRuntime = new PhiFlowRuntime(users, layers, domain);adRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // бюджетные лимиты// Пользователь 42 совершил конверсиюvar conversionUpdate = new InputUpdate(42, 95);adRuntime.ApplyInputUpdates(new[] { conversionUpdate }, kWork: 50);// Сколько сегментов превысили бюджетный порог?long budgetBreached = adRuntime.CountGreater(3, threshold: 80);

Производительность

Бенчмарки на Intel Core i5-11400F, Windows 11, .NET 8.0, BenchmarkDotNet 0.15.8.

Параметры графа: ширина 5000, слоёв 60, домен 1024.

Сценарий

Без PhiFlow (полный пересчёт)

С PhiFlow

Ускорение

1 дельта, KWork=50

10.7 с

0.195 с

~55x

4 дельты, KWork=50

10.7 с

0.75 с

~14x

1 дельта, KWork=10

1.8 с

0.034 с

~53x

Что означают эти цифры:

  • При одном изменении на входе библиотека пересчитывает не все 300 000 узлов, а только интервал затронутых.

  • Чем меньше дельт относительно общего объёма графа, тем больше выигрыш.

  • Индексы добавляют ускорение для запросов: CountGreater, RangeCount, TopKSum выполняются за O(log domain) или O(1).

Сравнение с альтернативами

Характеристика

PhiFlow

Полный пересчёт

Stream processor (Flink)

ClickHouse

Инкрементальный пересчёт

✅ (interval cone)

Точные индексы

✅ (но не для per-event)

In-memory

❌ (Java/JVM)

❌ (диск)

Многослойные графы

✅ (родной)

Латентность на запрос

микросекунды

зависит

миллисекунды+

миллисекунды

Сложность внедрения

низкая

высокая

очень высокая

средняя

Пошаговая интеграция в проект

Шаг 1. Моделирование пайплайна

Определите, сколько у вас слоёв и какова ширина каждого. PhiFlow требует фиксированной ширины для всех слоёв — это упрощает индексацию.

Шаг 2. Выбор домена

Домен — это диапазон дискретных значений (0..DomainSize-1). Чем меньше домен, тем компактнее индексы Fenwick и гистограммы.

Шаг 3. Инициализация runtime

var runtime = new PhiFlowRuntime(width, layers, domain);runtime.Reserve(maxDeltaCount: expectedUpdatesPerBatch);

Шаг 4. Подключение индексов к слоям, которые часто запрашиваются

if (needThresholdQueries)    runtime.AttachIndex(layer, new FenwickCountIndex(domain));if (needSumQueries)    runtime.AttachIndex(layer, new SumIndex(width));if (needTopKQueries)    runtime.AttachIndex(layer, new HistogramTopKIndex(domain, width));

Шаг 5. Загрузка начальных данных

runtime.SetInput(initialData);runtime.BuildAll(kWork: 50); // 50 — эвристика, подбирается под вашу топологию

Шаг 6. Приём обновлений

void OnInputChanged(int index, int newValue){    var update = new InputUpdate(index, newValue);    runtime.ApplyInputUpdates(new[] { update }, kWork: 50);}

Шаг 7. Маршрутизация запросов через индексы

public long GetHighRiskCount(int threshold) =>    runtime.CountGreater(riskLayer, threshold);

Два режима обновлений

SetValue (рекомендуется для production)

Заменяет значение узла на новое.

var update = new InputUpdate(index: 42, value: 512);runtime.ApplyInputUpdates(new[] { update }, kWork: 50);

Mutation (для симуляций и тестирования)

Детерминированная мутация значения. Полезно, когда нужно воспроизвести последовательность изменений.

var mutation = new InputMutation(index: 42, delta: +5);runtime.ApplyInputMutations(new[] { mutation }, kWork: 50);

Бесплатное тестирование — в рамках Community Edition. Коммерческое использование требует лицензии.

Где взять

NuGet: dotnet add package PhiFlow

GitHub (бенчмарки): https://github.com/likeslines-maker/PhiFlow

PhiFlow — это библиотека для инкрементальных вычислений на слоистых графах фиксированной ширины.

Она решает конкретную задачу: когда в многослойный пайплайн приходит небольшое количество обновлений, а вам нужно мгновенно получать точные агрегаты (CountGreater, RangeCount, Sum, TopK) на любом слое.

Библиотека не пытается заменить полноценные stream-процессоры или OLAP-базы. Она занимает свою нишу: in-memory, микросекундные латентности, точные индексы, минимальные аллокации.

Если ваш пайплайн из 60 слоёв пересчитывается за 10 секунд вместо 0.2 — возможно, вы просто считали не тем способом.

ссылка на оригинал статьи https://habr.com/ru/articles/1040918/