Представьте, что у вас есть многослойный пайплайн обработки данных.
-
Слой 0 — сырые события: цена тика, действие пользователя, показание датчика.
-
Слой 1 — агрегаты по инструментам: дельта, гамма, скор.
-
Слой 2 — агрегаты по секторам: риск на сектор, общая экспозиция.
-
Слой 3 — портфельные метрики: VaR, ожидаемая прибыль.
-
Слой 4 — enterprise-лимиты и алерты.
Ширина слоя — 5000 узлов. Количество слоёв — 60. Общее число узлов — 300 000.
Каждую секунду приходит 10 новых событий (изменений на входе). Наивный подход — пересчитать всё с нуля — будет перебирать все 300 000 узлов на каждое обновление. При 10 обновлениях в секунду это 3 млн вычислений узлов в секунду. А если ширина слоя 100 000 и слоёв 100? Получаем 10 млн узлов на пересчёт. Компьютер не справляется.
Классические подходы и их ограничения
|
Подход |
Проблема |
|---|---|
|
Полный пересчёт (recompute everything) |
Экспоненциальный рост времени при увеличении графа |
|
Триггеры в БД |
Не работают для многослойных in-memory графов |
|
Stream-процессоры (Flink, Kafka Streams) |
Тяжёлые, не для in-memory иерархий |
|
Кастомный кэш инвалидации |
Сложно реализовать корректно, легко ошибиться |
Требования к решению
-
Инкрементальный пересчёт: только затронутые узлы, а не все.
-
Минимальные аллокации в горячем пути.
-
Точные индексы для быстрых запросов (Fenwick, гистограммы, суммы).
-
Поддержка двух режимов обновления: SetValue (production) и Mutate (симуляции).
Решение: PhiFlow
PhiFlow — библиотека для .NET 8.0+, реализующая инкрементальные вычисления на слоистых графах фиксированной ширины.
dotnet add package PhiFlow --version 0.1.3
Ключевые идеи
-
Interval Cone-of-Influence — для каждого обновления вычисляется минимальное множество затронутых узлов в виде непрерывных интервалов на каждом слое.
-
Фиксированная ширина слоёв — упрощает индексацию и ускоряет доступ.
-
Точные индексы (Fenwick, гистограммы, суммы) — без приближений и с вероятностными структурами вроде HyperLogLog.
-
Два режима обновления — SetValue (замена значения) для бизнес-логики и Mutate (дельта) для симуляций.
Быстрый старт
Шаг 1. Создание графа
Определяем параметры:
-
width— количество узлов в каждом слое. -
layers— количество слоёв. -
domain— диапазон дискретных значений (0..DomainSize-1).
using PhiFlow;int width = 5000;int layers = 60;int domain = 1024;var rt = new PhiFlowRuntime(width, layers, domain);// Рекомендуется: зарезервировать место под дельтыrt.Reserve(maxDeltaCount: 16);
Шаг 2. Подключение индексов
Индексы ускоряют запросы (CountGreater, RangeCount, Sum, TopKSum). Подключаются к нужному слою.
int lastLayer = layers - 1;// Fenwick-индекс для быстрых CountGreater и RangeCountrt.AttachIndex(lastLayer, new FenwickCountIndex(domain));// Индекс для суммы значенийrt.AttachIndex(lastLayer, new SumIndex(width));// Индекс для Top-K через гистограммыrt.AttachIndex(lastLayer, new HistogramTopKIndex(domain, width));
Шаг 3. Инициализация входного слоя
Заполняем слой 0 начальными значениями.
var rnd = new Random(1);int[] input = new int[width];for (int i = 0; i < width; i++){ input[i] = rnd.Next(domain);}rt.SetInput(input);rt.BuildAll(kWork: 50); // полная сборка графа
Шаг 4. Применение обновлений
Вместо пересчёта всего графа передаём только изменившиеся входы.
var updates = new InputUpdate[]{ new InputUpdate(index: 10, value: 512), new InputUpdate(index: 123, value: 7), new InputUpdate(index: 2048, value: 999)};rt.ApplyInputUpdates(updates, kWork: 50);
Библиотека сама определяет, какие узлы затронуты (Interval Cone-of-Influence), и пересчитывает только их.
Шаг 5. Выполнение запросов
Благодаря индексам запросы выполняются мгновенно.
// Количество элементов на последнем слое > 500long countGt = rt.CountGreater(lastLayer, threshold: 500);// Количество элементов в диапазоне [100, 200)long rangeCount = rt.RangeCount(lastLayer, loInclusive: 100, hiExclusive: 200);// Сумма всех значений на слоеlong sum = rt.Sum(lastLayer);// Среднее значениеlong avg = rt.Avg(lastLayer);// Сумма топ-50 значенийlong topKSum = rt.TopKSum(lastLayer, k: 50);
Полный рабочий пример
using PhiFlow;public class RealTimeAnalyticsPipeline{ private readonly PhiFlowRuntime _runtime; private readonly int _lastLayer; public RealTimeAnalyticsPipeline(int width, int layers, int domain) { _runtime = new PhiFlowRuntime(width, layers, domain); _runtime.Reserve(maxDeltaCount: 32); _lastLayer = layers - 1; // Подключаем индексы для аналитики _runtime.AttachIndex(_lastLayer, new FenwickCountIndex(domain)); _runtime.AttachIndex(_lastLayer, new SumIndex(width)); _runtime.AttachIndex(_lastLayer, new HistogramTopKIndex(domain, width)); } public void Initialize(int[] initialData) { _runtime.SetInput(initialData); _runtime.BuildAll(kWork: 50); } public void ProcessEvents(IEnumerable<InputUpdate> events) { var updates = events.ToArray(); _runtime.ApplyInputUpdates(updates, kWork: 50); } public AnalyticsSnapshot GetSnapshot() { return new AnalyticsSnapshot { TotalCount = _runtime.Sum(_lastLayer), HighThresholdCount = _runtime.CountGreater(_lastLayer, 800), MidRangeCount = _runtime.RangeCount(_lastLayer, 200, 600), Top10Sum = _runtime.TopKSum(_lastLayer, 10) }; }}public class AnalyticsSnapshot{ public long TotalCount { get; set; } public long HighThresholdCount { get; set; } public long MidRangeCount { get; set; } public long Top10Sum { get; set; }}
Сценарии использования
1. FinTech: управление рисками
4 слоя: инструменты → сектора → портфель → enterprise-лимиты.
int width = 10000; // 10k инструментовint layers = 4;int domain = 100000; // дискретные уровни экспозицииvar riskRuntime = new PhiFlowRuntime(width, layers, domain);riskRuntime.AttachIndex(3, new FenwickCountIndex(domain)); // лимиты// Пришло обновление цены на инструмент 42var tickUpdate = new InputUpdate(42, newExposureValue);riskRuntime.ApplyInputUpdates(new[] { tickUpdate }, kWork: 50);// Мгновенный запрос: сколько секторов превысили лимит?long breachedCount = riskRuntime.CountGreater(2, threshold: 10000);
2. GameDev: экономическая симуляция
6 слоёв: налог провинции → доход провинции → счастье → производительность → военная сила → обслуживание армии.
int provinces = 5000;int layers = 6;int domain = 1000;var ecoRuntime = new PhiFlowRuntime(provinces, layers, domain);ecoRuntime.AttachIndex(5, new SumIndex(provinces)); // общая сила армииecoRuntime.AttachIndex(5, new HistogramTopKIndex(domain, provinces)); // топ-провинции// Игрок повысил налог в провинции 123ecoRuntime.ApplyInputUpdates(new[] { new InputUpdate(123, 75) }, kWork: 50);// Как изменилась общая сила армии?long totalArmyPower = ecoRuntime.Sum(5);
3. IIoT: предиктивная аналитика
Датчики → станки → линии → заводы → регион.
int sensors = 20000;int layers = 5;int domain = 4096; // показания датчиков 0..4095var iotRuntime = new PhiFlowRuntime(sensors, layers, domain);iotRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // алерты по регионам// Датчик 5001 показал аномалиюvar anomalyUpdate = new InputUpdate(5001, 3800);iotRuntime.ApplyInputUpdates(new[] { anomalyUpdate }, kWork: 50);// Сколько заводов в аномальной зоне?long anomalousPlants = iotRuntime.CountGreater(3, threshold: 3500);
4. AdTech: real-time bidding
Импрессия → пользователь → сегмент → кампания → бюджет.
int users = 100000;int layers = 5;int domain = 100; // скор пользователя 0..99var adRuntime = new PhiFlowRuntime(users, layers, domain);adRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // бюджетные лимиты// Пользователь 42 совершил конверсиюvar conversionUpdate = new InputUpdate(42, 95);adRuntime.ApplyInputUpdates(new[] { conversionUpdate }, kWork: 50);// Сколько сегментов превысили бюджетный порог?long budgetBreached = adRuntime.CountGreater(3, threshold: 80);
Производительность
Бенчмарки на Intel Core i5-11400F, Windows 11, .NET 8.0, BenchmarkDotNet 0.15.8.
Параметры графа: ширина 5000, слоёв 60, домен 1024.
|
Сценарий |
Без PhiFlow (полный пересчёт) |
С PhiFlow |
Ускорение |
|---|---|---|---|
|
1 дельта, KWork=50 |
10.7 с |
0.195 с |
~55x |
|
4 дельты, KWork=50 |
10.7 с |
0.75 с |
~14x |
|
1 дельта, KWork=10 |
1.8 с |
0.034 с |
~53x |
Что означают эти цифры:
-
При одном изменении на входе библиотека пересчитывает не все 300 000 узлов, а только интервал затронутых.
-
Чем меньше дельт относительно общего объёма графа, тем больше выигрыш.
-
Индексы добавляют ускорение для запросов: CountGreater, RangeCount, TopKSum выполняются за O(log domain) или O(1).
Сравнение с альтернативами
|
Характеристика |
PhiFlow |
Полный пересчёт |
Stream processor (Flink) |
ClickHouse |
|---|---|---|---|---|
|
Инкрементальный пересчёт |
✅ (interval cone) |
❌ |
✅ |
❌ |
|
Точные индексы |
✅ |
❌ |
❌ |
✅ (но не для per-event) |
|
In-memory |
✅ |
✅ |
❌ (Java/JVM) |
❌ (диск) |
|
Многослойные графы |
✅ (родной) |
❌ |
❌ |
❌ |
|
Латентность на запрос |
микросекунды |
зависит |
миллисекунды+ |
миллисекунды |
|
Сложность внедрения |
низкая |
высокая |
очень высокая |
средняя |
Пошаговая интеграция в проект
Шаг 1. Моделирование пайплайна
Определите, сколько у вас слоёв и какова ширина каждого. PhiFlow требует фиксированной ширины для всех слоёв — это упрощает индексацию.
Шаг 2. Выбор домена
Домен — это диапазон дискретных значений (0..DomainSize-1). Чем меньше домен, тем компактнее индексы Fenwick и гистограммы.
Шаг 3. Инициализация runtime
var runtime = new PhiFlowRuntime(width, layers, domain);runtime.Reserve(maxDeltaCount: expectedUpdatesPerBatch);
Шаг 4. Подключение индексов к слоям, которые часто запрашиваются
if (needThresholdQueries) runtime.AttachIndex(layer, new FenwickCountIndex(domain));if (needSumQueries) runtime.AttachIndex(layer, new SumIndex(width));if (needTopKQueries) runtime.AttachIndex(layer, new HistogramTopKIndex(domain, width));
Шаг 5. Загрузка начальных данных
runtime.SetInput(initialData);runtime.BuildAll(kWork: 50); // 50 — эвристика, подбирается под вашу топологию
Шаг 6. Приём обновлений
void OnInputChanged(int index, int newValue){ var update = new InputUpdate(index, newValue); runtime.ApplyInputUpdates(new[] { update }, kWork: 50);}
Шаг 7. Маршрутизация запросов через индексы
public long GetHighRiskCount(int threshold) => runtime.CountGreater(riskLayer, threshold);
Два режима обновлений
SetValue (рекомендуется для production)
Заменяет значение узла на новое.
var update = new InputUpdate(index: 42, value: 512);runtime.ApplyInputUpdates(new[] { update }, kWork: 50);
Mutation (для симуляций и тестирования)
Детерминированная мутация значения. Полезно, когда нужно воспроизвести последовательность изменений.
var mutation = new InputMutation(index: 42, delta: +5);runtime.ApplyInputMutations(new[] { mutation }, kWork: 50);
Бесплатное тестирование — в рамках Community Edition. Коммерческое использование требует лицензии.
Где взять
NuGet: dotnet add package PhiFlow
GitHub (бенчмарки): https://github.com/likeslines-maker/PhiFlow
PhiFlow — это библиотека для инкрементальных вычислений на слоистых графах фиксированной ширины.
Она решает конкретную задачу: когда в многослойный пайплайн приходит небольшое количество обновлений, а вам нужно мгновенно получать точные агрегаты (CountGreater, RangeCount, Sum, TopK) на любом слое.
Библиотека не пытается заменить полноценные stream-процессоры или OLAP-базы. Она занимает свою нишу: in-memory, микросекундные латентности, точные индексы, минимальные аллокации.
Если ваш пайплайн из 60 слоёв пересчитывается за 10 секунд вместо 0.2 — возможно, вы просто считали не тем способом.
ссылка на оригинал статьи https://habr.com/ru/articles/1040918/