redb.Route — Apache Camel для .NET, который мы написали потому что выхода другого не было

от автора

redb

redb

Проблема

У вас не 5 микросервисов — у вас десятки. Бэкенд, который рос три года: монолит, расколотый на куски, GPS-фид от автопарка, мобильное приложение водителя, веб-кабинет диспетчера, интеграции с SAP / 1С / регуляторами / маркетплейсами, отдельный SMTP-воркер, отдельный PDF-генератор, отдельный шедулер ночных пересчётов. Между ними — Kafka (несколько кластеров, по топику на домен), RabbitMQ (RPC + pub/sub + DLQ), Redis (кэш, last-known-state, pub/sub-каналы), пара HTTP-эндпоинтов наружу, SFTP с поставщиком, SQL-polling outbox-таблицы старого монолита, MQTT с трекеров, IBM MQ для одного древнего банковского контура, SignalR-хабы для real-time-дашбордов. На каждом стыке — свой ретрай, свой DLQ (или нет DLQ), своя сериализация, свои метрики (или нет метрик), своя бойлерплейт-обвязка из консьюмеров и try/catch.

Каждый из этих стыков живёт своей жизнью в Program.cs соответствующего сервиса. Каждый — это hand-rolled цикл:

// Где-то в OrdersService:var consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();consumer.Subscribe("orders");while (!stoppingToken.IsCancellationRequested){    try    {        var msg = consumer.Consume(stoppingToken);        var dto = JsonSerializer.Deserialize<OrderDto>(msg.Value);        if (dto.Type != "new") continue;          // filter        var retries = 0;        while (true)                              // retry        {            try { await PublishToRabbit(dto); break; }            catch (Exception ex) when (retries++ < 3)            { await Task.Delay(TimeSpan.FromSeconds(retries * retries)); }        }        consumer.Commit(msg);                     // ack    }    catch (ConsumeException ex) { _logger.LogError(ex, "kafka"); }    catch (Exception ex)        { _logger.LogError(ex, "send"); }}

И таких 80 строк — на каждое направление. У каждого — свой ретрай, свой DLQ (или нет DLQ), своя сериализация, свои метрики (или нет метрик). На code review это «ну да, чё». Через год на онбординге нового разработчика — «а где тут вообще что лежит».

На JVM эту задачу 20 лет назад решили Apache Camel и более общий Enterprise Integration Patterns (книжка Gregor Hohpe и Bobby Woolf, та самая «жёлтая»). Идея простая: маршрут описывается как pipeline From → Process → To, EIP-паттерны (Splitter, Aggregator, Content-Based Router, WireTap, Dead Letter Channel, Idempotent Consumer, Saga) — first-class элементы DSL. Транспортов — 300+: от Kafka до LDAP, от S3 до IBM MQ.

В .NET этого не было.

MassTransit, NServiceBus, Wolverine — отличные message bus‘ы, но это не одно и то же. У них 4–7 транспортов (Kafka, RabbitMQ, Azure SB, SQS, иногда SQL), фокус — durable saga + handler classes. EIP-каталог они закрывают на 3-4 паттерна (Saga, Request/Response, Outbox). А когда нужно «забери XML из SQL stored proc → распарси → положи в RabbitMQ → продублируй в SFTP-архив с retention 30 дней» — пишешь руками всю эту классическую кашу из консьюмеров, продюсеров и try/catch.

И отдельный поворот, который случился буквально сейчас: MassTransit v9 стал коммерческим. Проект переехал к новой компании Massient, Inc., на главной новой документации masstransit.massient.com кнопка «Get a License», в доках — «Configure the license key», Customer Support и Usage Telemetry. Дословно с их сайта: «Massient is the new company behind the commercial release of MassTransit v9.» То есть из «трёх .NET-альтернатив» две — платные (NServiceBus давно, MassTransit v9 теперь), остаётся Wolverine на MIT — и он, опять-таки, не ESB, а mediator + saga, без двух десятков транспортов и без EIP-каталога. Свободного Camel-уровня инструмента под .NET до сих пор не было.

Хотелось: описать маршрут как pipeline на C#, чтоб библиотека сама знала про Kafka, RabbitMQ, SQL, SFTP, HTTP, MQTT, retry, DLQ, transactional ack, и при этом был полный набор EIP-паттернов из канонической книжки.

Написали. Выложили под Apache 2.0.


Production case

Этот текст — не теория. redb.Route — это ESB-каркас уровня Apache Camel, и мы это утверждаем не на бумаге: 22 внешних транспорта + 5 встроенных компонентов, 30+ EIP-паттернов как first-class элементы DSL, compiled expression engine на System.Linq.Expressions, transactional pipelines с ITransactedAction (Kafka EOS, RabbitMQ publisher confirms + tx channels, IBM MQ syncpoint, AMQP 1.0, SQL via TransactionScope), persistent IdempotentConsumerSaga с компенсирующими шагами, CircuitBreakerThrottleAggregatorResequencerScatter-GatherRecipientListDynamicRouterEnrich/PollEnrichClaim Check, OpenTelemetry на каждый шаг, runtime-контейнер с hot-reload и кластером без внешнего координатора. 351 проходящий тест, включая отдельные DSL reference-suites под Camel-семантику Choice/When/Otherwise, TryCatchFinally и Filter (~1500 строк только в спецификациях). Это не «фреймворк-обёртка над IHostedService», это полноценный ESB.

И он работает в проде. Первый — крупная логистическая компания (~150k заказов/мес, ~20k B2B-клиентов, собственный автопарк, 600+ городов). Внутренняя TMS — ~500 водителей + ~50 диспетчеров, 3-нодовый кластер (Xeon, 4 ядра / 8 ГБ / 50 ГБ SSD на ноду), ~3 месяца стабильной работы, 10–15% CPU под полной нагрузкой.

Через redb.Route туда заведены: SAP S/4 (через SQL polling stored procedure → XML → redb), Kafka (фид с GPS-трекеров), RabbitMQ (внутрисервисная шина событий), HTTP API для UI диспетчера, Меркурий / ЕГАИС / Честный ЗНАК / ФГИС Зерно (российские регуляторы), LDAP/AD для авторизации, cron-backup через redb.Export с ротацией. Один InitRoute-файл регистрирует 22 RouteBuilder-класса. Каждый — изолированный pipeline, со своими OnException-обработчиками, своим RouteId, своими метриками OpenTelemetry.

Второй прод — платформа доставки того же заказчика, десятки микросервисов в одном репозитории (~37 .NET-проектов, ~50+ RouteBuilder-классов). Модули общаются через RabbitMQ (RPC + pub/sub), внутренний event-bus, Kafka с Consumer Group для GPS-стрима с мобилок, Redis для last-known-location-кэша, RabbitMQ-DLQ под poison messages, и всё это завязано на единый ESB-каркас. То есть это не «5 микросервисов с REST API» — это десятки независимо деплоящихся модулей, у каждого свой IRouteContext, свой набор endpoint’ов, свой lifecycle. Сравнимая по сложности картинка на JVM была бы — Apache Camel + Karaf. У нас — redb.Route + Tsak.

И всё это — в трёх стендах: dev / test / prod, каждый — 3-нодовый кластер. То есть RouteBuilder-классы, которые вы видите дальше по статье, прямо сейчас крутятся в девяти инстансах Tsak параллельно: три ноды поднимают одинаковые контексты, redb-координатор раздаёт между ними кластерные маршруты (Kafka Consumer Group — балансировкой партиций, RabbitMQ — конкурентные consumer’ы на той же очереди, HTTP-фасады — за L4-балансером, cron — leader-election через redb-объекты, чтобы крон-задача отработала ровно один раз на кластер). Дев-стенд — для разработки и feature-веток, test — приёмочные сценарии и регресс перед релизом, prod — собственно нагрузка. Между стендами разъезжается тот же .tpkg-пакет: меняется только конфиг и строка подключения к БД. Cluster + hot-reload + три параллельных стенда — это и есть «production-ready», а не «у меня на ноуте работает».

Третий прод — аналитическая платформа (~672k объектов, ~8M свойств), там Route играет роль внутренней шины между модулями. Реальный код всех трёх — ниже.

И это не «локальные сборки из репозитория». Вся стопка 3.0.0 опубликована на nuget.org — 43 пакета под префиксом redb.*, суммарно ~20 800 загрузок на момент написания статьи. По убыванию: redb.Route (~1600), redb.Core (~1590), redb.Core.Pro (~1160), redb.Templatesredb.Postgres / redb.MSSql (~1000 каждый), дальше Pro-провайдеры, redb.CLIredb.Export, и весь зоопарк redb.Route.* транспортов (Kafka, RabbitMQ, IBM MQ, MQTT, AMQP, Redis, gRPC, SignalR, WebSocket, TCP, SFTP, FTP, File, S3, Mail, LDAP, Quartz, Elasticsearch, AzureServiceBus, Firebase, Http, Sql) + redb.Tsak.* (Core, Core.Pro, Client, CLI, Contracts, Templates). То есть всё, про что эта статья — реально лежит, версионируется и подтягивается через dotnet add package.


Как выглядит маршрут

Чтобы не врать игрушечным From → Filter → To, сразу боевой shape. Это упрощённый набросок из лога-трекинговой системы: HTTP-фасад принимает массив GPS-точек, throttle на 200 rps, идемпотентность по MessageId, валидация JSON, ветвление по типу события, Multicast параллельно в три транспорта (Kafka на агрегацию, RabbitMQ на DAL-запись, Redis-кэш last-known-location фоном через WireTap), а наружу — JSON-ответ:

public class GpsHttpFacadeRoutes : RouteBuilder{    protected override void Configure()    {        OnException<JsonException>()            .Handled()            .SetHeader("HTTP_SC", Constant(400))            .To("direct://error-handler")        .EndOnException();        From("http://0.0.0.0:5090/integration/gps?methods=POST")            .RouteId("gps-http-facade")            .Throttle(200).Per(TimeSpan.FromSeconds(1))                      // не больше 200 rps            .IdempotentConsumer(Header("MessageId"), repository: "redb")     // дубли отсекаем            .Process(ValidateGpsBatch)                                       // JSON → List<GpsPoint>            .Choice()                .When(Header("eventType").IsEqualTo("location"))                    .Multicast().ParallelProcessing()                        .To("kafka:mobile.trips.location.sync?key=${header.tripId}")                        .To("rabbitmq:?exchange=lt.dal&routingKey=dal.trips.location.sync")                        .WireTap("redis:set:gps:last:location:${header.tripId}?ttl=86400")                    .EndMulticast()                .When(Header("eventType").IsEqualTo("checkin"))                    .To("rabbitmq:?exchange=lt.dal&routingKey=dal.gps.checkin_checkout")                .Otherwise()                    .Log(LogLevel.Warning, "unknown eventType: ${header.eventType}")            .EndChoice()            .SetBody(Constant("""{ "status": "accepted" }"""))            .SetHeader("Content-Type", Constant("application/json"));    }}

Этот один класс заменяет: HTTP-контроллер, middleware-throttle, ручную идемпотентность через Redis, ручной Kafka-продюсер, ручную RabbitMQ-обвязку, try/catch поверх десериализации и логику дёргания Redis из background-сервиса. Всё, что внутри — first-class элементы DSL: ThrottleIdempotentConsumerChoiceMulticastWireTapOnException<T>. Каждый из них — отдельный EIP-паттерн с собственными метриками OpenTelemetry.

Кстати, именно такой Route-фасад в одном из продов сейчас замещает WSO2 Micro Integrator, который до этого играл роль HTTP→Kafka/RabbitMQ-шлюза. WSO2 даёт ту же машинерию через XML-конфиги и Synapse-DSL — но когда вокруг уже C#-стек, держать отдельную JVM ради <switch>/<foreach parallel-execution="true">/<endpoint uri="rabbitmq:/?..."> — лишнее звено. Route даёт те же EIP, но в одном процессе, на одном языке, с C#-типизацией и общими IServiceProvider-зависимостями.

Регистрация:

// Program.csbuilder.Services.AddRedbRoute(route =>    route.AddRouteBuilder<GpsHttpFacadeRoutes>());builder.Services.AddRedbRouteHttp();builder.Services.AddRedbRouteKafka();builder.Services.AddRedbRouteRabbitMQ();builder.Services.AddRedbRouteRedis();

Маршрут стартует вместе с приложением, останавливается на shutdown с graceful drain, метрики OpenTelemetry — по каждому шагу из коробки (route.gps-http-facade.durationmulticast.to[1].failuresidempotent.duplicatesthrottle.rejected).

Полный код такого аггрегатора (только с Kafka на входе, без HTTP) — ниже, в разделе «Второй прод — RabbitMQ как RPC и pub/sub шина». А кому нужен hello world на 15 строк — он в redb.Route.Demo.


Чего хотел Camel — и что из этого есть

EIP-паттернов из канонической книжки в Camel — 80+. Мы закрыли 30+ — самые ходовые. Все они — first-class элементы DSL:

// Content-Based Router.Choice()    .When(Header("priority").isEqualTo("high")).To("direct://fast-lane")    .When(Header("priority").isEqualTo("low")).To("seda://batch")    .Otherwise().To("direct://standard").EndChoice()// Splitter — каждое вложение обрабатываем отдельно.Split(e => e.In.GetBody<Order>().Items)    .Process(async (e, ct) => await ProcessItemAsync(e, ct)).EndSplit()// Aggregator — собираем по correlationId.Aggregate(Header("orderId"),    strategy: new CompletionAggregator(),    completionSize: 10,    completionTimeout: TimeSpan.FromSeconds(30))// WireTap — отправить копию в audit без блокировки основного потока.WireTap("seda://audit")// Recipient List — динамический fan-out.RecipientList(e => ResolveDestinations(e))// Idempotent Consumer — дедупликация по ключу (persistent backend).IdempotentConsumer(Header("messageId"), repository: redbRepository)

И ещё: MulticastDynamic RouterResequencerScatter-GatherClaim CheckThrottleDelayLoopEnrichPollEnrichSaga (без durable state — компенсирующие шаги), Circuit BreakerValidateMarshal/UnmarshalTransactedProcess. И стандартные обвязки — RetryDeadLetterChannelTryCatch/DoCatch/DoFinallyOnException.


Compiled expression engine

В Camel есть Simple Language — ${header.priority}${body.items.size()} и так далее. У него один минус: он интерпретируется в рантайме. На горячем пути это заметно.

У redb.Route есть то же самое, но компилируемое${header.priority}${body.OrderId}, арифметика, JSONPath, XPath — всё это парсится один раз при Build() маршрута, превращается в Expression<Func<IExchange, T>> через System.Linq.Expressions, компилируется в IL и кэшируется. На горячем пути — обычный делегат.

// Предикаты в Choice.When / Filter — три формы:.When(e => e.In.GetHeader<string>("priority") == "high")   // 1. lambda.When(Header("priority").isEqualTo("high"))                // 2. typed builder.When("header.priority == 'high'")                         // 3. string expression// String templates в SetBody / SetHeader / Log:.SetHeader("reply", "${header.orderId}-confirmed").SetBody("Order ${body.Id}: total=${body.Total} for ${header.customerName}").Log("Processed ${header.orderId} in ${header.elapsed}ms")

Можно смешивать. Internally — один и тот же AST с одним и тем же компилятором.


Транспорты

22 внешних + 5 встроенных:

Категория

Транспорты

Очереди / шины

Kafka, RabbitMQ, IBM MQ, MQTT (5.0), AMQP 1.0, Azure Service Bus, Redis

HTTP / RPC

HTTP (in/out), WebSocket, SignalR, gRPC, TCP

Файлы / хранилища

SFTP, FTP, File, S3, Firebase (Firestore + Cloud Storage + FCM), GenericFile (base)

Базы данных

SQL (polling outbox-style)

Корпоративные

LDAP / Active Directory, Mail (SMTP/IMAP/POP3), Elasticsearch 8.x

Планировщик

Quartz, Cron

Встроенные

Direct, SEDA, Timer, Mock, Log

Каждый — отдельный NuGet-пакет (redb.Route.Kafkaredb.Route.RabbitMQ и т.д.). Подключаются явно через AddRedbRoute*(). У большинства — fluent builder сверху URI-строки:

// URI formFrom("kafka://orders?groupId=svc&brokers=broker1:9092,broker2:9092&autoOffsetReset=earliest");// Type-safe builder — то же самоеFrom(Kafka.Topic("orders")    .Brokers("broker1:9092", "broker2:9092")    .GroupId("svc")    .AutoOffsetReset(AutoOffsetReset.Earliest)    .Acks("All"));

Builder лучше тем, что (а) intellisense, (б) опечатки ловятся компилятором, (в) refactor → rename работает.


Обработка ошибок — четыре уровня

Самое больное в hand-rolled-интеграциях. У Route — четыре уровня, которые комбинируются:

1. Per-step Retry — локальный retry вокруг одного шага. Когда внешний сервис флакающий, но в целом живой:

.Retry(maxRetries: 5, initialDelay: TimeSpan.FromSeconds(1)).To("http://flaky-service/submit")

2. DeadLetterChannel — route-level. Любая необработанная ошибка после retry уходит в указанный sink. Exception сохраняется на exchange, можно прочитать в DLC-маршруте:

From("kafka://orders")    .DeadLetterChannel("seda://dlq")    .Process(...)    .To("sql://orders");From("seda://dlq")    .Log("DLQ: ${exception.message}")    .Choice()        .When(e => e.GetException() is TimeoutException).To("seda://retry-later")        .Otherwise().To("sftp://archive/failed/")    .EndChoice();

3. DoTry / DoCatch / DoFinally — scoped, как обычный try/catch внутри маршрута:

.DoTry()    .To("http://external-api/submit")    .Process(async (e, ct) => await PostProcess(e, ct)).DoCatch<HttpRequestException>()    .Log("HTTP failure: ${exception.message}")    .To("seda://retry-queue").DoCatch<TimeoutException>()    .Log("Timeout, archiving")    .To("sftp://archive/timeouts/").DoFinally()    .Log("Attempt complete (success or failure)").End()

4. OnException<T> — на уровне всего контекста. Глобальные обработчики: регистрируются в IRouteContext и применяются ко всем маршрутам контекста, а не только к From(...) из этого класса. Объявил один раз в любом RouteBuilder — ловит исключения по всем From(...) всех RouteBuilder-ов этого модуля:

public class OrderRoutes : RouteBuilder{    protected override void Configure()    {        OnException<HttpRequestException>()            .MaximumRedeliveries(5)            .UseExponentialBackOff()            .BackOffMultiplier(2.0)            .Handled()                  // exchange продолжает идти как ни в чём            .To("seda://http-failures")        .EndOnException();        OnException<DbException>()            .MaximumRedeliveries(2)            .UseOriginalMessage()       // восстановить тело до обработки            .To("seda://db-failures")        .EndOnException();        // Эти обработчики ловят исключения не только тут, а во всех        // RouteBuilder-ах модуля — они живут на уровне контекста:        From("kafka://orders").To("http://payments-svc/charge");        From("kafka://shipments").To("http://logistics-svc/dispatch");    }}

.Handled().Continued().UseExponentialBackOff().UseOriginalMessage() — стандартные крутилки Camel’а, перенесённые один-в-один.


Transactional routes — что под капотом

.Transacted() — это не просто метка «оберни в TransactionScope». Это контракт с транспортом: при успехе всего pipeline сделай commit/ack, при failure — rollback/nack.

From("kafka://orders?groupId=svc&brokers=...")    .Transacted()                          // открывается транзакция    .ProcessWithRedb(async (redb, e, ct) => await SaveOrderAsync(redb, e, ct))    .To("rabbitmq://orders-confirmed");    // подтверждается в RabbitMQ                                           // commit Kafka offset + commit RabbitMQ + commit redb — атомарно

Что реально происходит:

  • Kafka-транспорт включает transactional producer (enable.idempotence=truetransactional.id), консьюмер коммитит offset через producer.SendOffsetsToTransaction(...) в той же транзакции. Это Kafka EOS — exactly-once между partition’ами.

  • RabbitMQ-транспорт включает publisher confirms (MaxOutstandingConfirms), плюс при .Transacted() — transacted channels (tx-select / tx-commit / tx-rollback).

  • IBM MQ, AMQP 1.0 — нативные транзакции через MQGMO_SYNCPOINT/local transactions.

  • SQL-транспорт биндит IDbTransaction к каждому шагу через TransactionScope (распределённую — для SQL Server, локальную — для PostgreSQL и MySQL).

Под капотом — интерфейс ITransactedAction, который каждый транспорт реализует по-своему. С точки зрения вашего кода — одна строчка .Transacted().


Реальный код из прода

Эти куски выдрал из боевого репозитория той самой внутренней TMS. Имена методов и переменных как есть.

InitRoute — точка входа модуля Tsak. Регистрируются компоненты (транспорты), redb-схемы, словари, и 11 RouteBuilder-классов:

public static class InitRoute{    public static IRouteContext main(IRouteContext context)    {        // 1. Транспорты модуля        context.AddComponent(new SqlComponent());        context.AddComponent(new HttpComponent { ServerManager = new SharedHttpServerManager() });        context.AddComponent(new CronComponent());        // 2. Named SQL data source — connection string в DI, не в URI маршрута        DbProviderFactories.RegisterFactory("Microsoft.Data.SqlClient", SqlClientFactory.Instance);        context.AddToRegistry("sap-s4", (ISqlConnectionFactory)new SqlConnectionFactory(            new SqlConnectionOptions            {                ConnectionString = sapConn,                ProviderName = "Microsoft.Data.SqlClient"            }));        // 3. Синхронизация redb-схем — code-first, без миграций        var redb = context.GetRedbService();        redb.InitializeAsync(ensureCreated: true).GetAwaiter().GetResult();        redb.SyncSchemeAsync<Driver>().GetAwaiter().GetResult();        redb.SyncSchemeAsync<Vehicle>().GetAwaiter().GetResult();        redb.SyncSchemeAsync<TransportationRoute>().GetAwaiter().GetResult();        redb.SyncSchemeAsync<TransportationPoint>().GetAwaiter().GetResult();        // ... ещё 8 схем        EnsureTsUmLists(redb);        await RefDataCache.RefreshAsync(redb);        // 4. Маршруты        var rc = (RouteContext)context;        rc.AddRoutes(new TsumExceptionRouteBuilder());     // global OnException        rc.AddRoutes(new TsumRouteBuilder());              // SQL polling от SAP S/4        rc.AddRoutes(new TsumPlacementRouteBuilder());     // расстановка машин        rc.AddRoutes(new TsumTransportStatusRouteBuilder());        rc.AddRoutes(new TsumShippingPointRouteBuilder());        rc.AddRoutes(new TsumBackupRouteBuilder());        // daily cron backup        rc.AddRoutes(new TsumCleanupRouteBuilder());        rc.AddRoutes(new TsumSliceJobRouteBuilder());        // ... ещё 3        return context;    }}

SQL polling из SAP S/4 stored procedure → XML → redb. Один из ключевых ETL-маршрутов:

private void ConfigureSqlConsumer(){    var procs = Context!.GetProperty<IDictionary<string, object?>>("Procedures")!;    var proc = (string)procs["MonitoringReport"]!;   // имя процедуры — из конфига, по Mode (prod/test)    var builder = Sql.Poll($"EXEC {proc} @DateFrom, @DateTo, @OutputFormat")        .DataSource(Constant("sap-s4"))        .CommandTimeout(300)        .Delay(60000)                                // опрос раз в минуту        .OutputType(SqlOutputType.Scalar)            // FOR XML возвращает одно значение        .Param("DateFrom", (object?)null)        .Param("DateTo", (object?)null)        .Param("OutputFormat", "XML");    From(builder)        .RouteId("tsum-sql-consumer")        .Process(e => e.In.ContentType = "application/xml")        .Process(DeserializeXml)                     // XmlSerializer → List<TransportationOrder>        .To("direct://tsum");}private void ConfigureHttpConsumer(){    // Тот же конечный обработчик, но фронт для тестов руками    From("http:0.0.0.0:5089/api/tsum/monitoring?inOut=true")        .RouteId("tsum-http-consumer")        .ConvertBody<string>()        .Process(DeserializeXml)        .To("direct://tsum");}private void ConfigureProcessing(){    // Единая точка обработки — что из SQL, что из HTTP    From("direct://tsum")        .RouteId("tsum-processing")        .ProcessWithRedb(async (redb, exchange, ct) =>        {            var orders = exchange.In.GetBody<List<TransportationOrder>>();            if (orders is null || orders.Count == 0) return;            // Синхронизация справочников водителей/машин/точек            var dicts = await DictionarySyncService.SyncFromOrdersAsync(redb, orders, ct);            if (dicts.AnyChanged) await RefDataCache.RefreshAsync(redb);            // Дедупликация существующих маршрутов            var codes = orders.Select(o => o.Code).ToList();            var existing = await redb.Query<TransportationRoute>()                .WhereRedb(o => codes.Contains(o.ValueString!))                .ToListAsync();            // ... ещё ~200 строк бизнес-логики        });}

Заметная штука: один и тот же обработчик (direct://tsum) обслуживает и SQL-источник, и HTTP-эндпоинт для ручных тестов. Это и есть тот самый Camel’овский трюк — direct: как in-process invariant’ный канал, к которому можно прицепить любой To(...).

HTTP API на порту 5090 с авторизацией через JWT + LDAP. Один из endpoint’ов:

private void ConfigureRoutesEndpoint(){    From("http:0.0.0.0:5090/api/tsum/routes?inOut=true&cors=true&corsOrigins=*")        .RouteId("tsum-api-routes")        .Process(Auth.ProcessAsync)                         // валидация JWT, наполнение headers        .Process(TsumAuthProcessor.RequirePermission(TsumPermission.ViewAll))        .ConvertBody<string>()        .ProcessWithRedb(async (redb, exchange, ct) =>        {            var bodyStr = exchange.In.Body?.ToString() ?? "{}";            var filter = JsonSerializer.Deserialize<RouteFilterRequest>(bodyStr, JsonOptions);            // GET-параметры тоже принимаем — для отладки в браузере            string? QP(string name) =>                exchange.In.Headers.TryGetValue($"redbHttp.QueryParam.{name}", out var v)                    ? v?.ToString() : null;            if (QP("startPlanFrom") is { } qpFrom &&                DateTimeOffset.TryParse(qpFrom, out var from))                filter.StartPlanFrom ??= from;            // ... ещё несколько query-параметров            var query = redb.Query<TransportationRoute>();            // Динамическая фильтрация через redb LINQ            if (!string.IsNullOrEmpty(filter.CodeSearch))                query = query.WhereRedb(o => o.ValueString!.Contains(filter.CodeSearch));            var spItems = await ResolveIds(filter.ShippingPointIds);            if (spItems.Count > 0)                query = query.Where(r => spItems.Contains(r.ShippingPoint));            // ... ещё ~10 фильтров и пагинация            var items = await query.Skip(offset).Take(limit).ToListAsync();            JsonRouteHelper.SetJsonBody(exchange, new { total, offset, limit, items });        });}

HTTP-транспорт здесь — это redb.Route.Http с собственным Kestrel-консьюмером. inOut=true — request/response режим, ответ возвращается в Body exchange’а. Заголовки redbHttp.QueryParam.* парсятся транспортом автоматически.

Cron backup всей БД через redb.Export, с ротацией. Без сторонних шедулеров:

protected override void Configure(){    var backupConfig = Context!.GetProperty<IDictionary<string, object?>>("Backup");    var directory = (string)backupConfig["Directory"]! ?? "backups";    var retentionDays = int.Parse(backupConfig["RetentionDays"]?.ToString() ?? "7");    Context.SetProperty("_backup.directory", directory);    Context.SetProperty("_backup.retentionDays", retentionDays);    From("cron://tsum-backup?schedule=0 0 3 * * ?")    // каждый день в 03:00        .RouteId("tsum-backup-cron")        .ProcessWithRedb(RunBackupAsync);}private async Task RunBackupAsync(IRedbService redb, IExchange exchange, CancellationToken ct){    var pgConn = redb.Configuration.ConnectionString!;    var directory = Context!.GetProperty<string>("_backup.directory")!;    Directory.CreateDirectory(directory);    var fileName = $"tsum_backup_{DateTime.UtcNow:yyyy-MM-dd_HHmmss}.redb";    var filePath = Path.Combine(directory, fileName);    var provider = ProviderFactory.Create("postgres");    await provider.OpenAsync(pgConn, ct);    var exportService = new ExportService(provider, verbose: false, batchSize: 10000);    await exportService.ExportAsync(filePath, schemeIds: null, compress: true, dryRun: false, ct);    RotateBackups(directory, retentionDays);}

Транспорт cron:// — это redb.Route.Quartz с Quartz.NET под капотом. Cron-expression стандартный, Quartz-овский.

Если посмотреть на эти три фрагмента — там нет ни одной строки про инициализацию консьюмеров, про работу с IServiceProvider, про lifecycle, про graceful shutdown. Всё это сидит внутри Route. Бизнес-код — это Process(...) и ProcessWithRedb(...).


Второй прод — RabbitMQ как RPC и pub/sub шина

Те куски выше про TMS — это SQL + HTTP + cron. Чтобы не складывалось впечатление «redb.Route только под polling-задачи», вот фрагменты из второй продовой системы того же заказчика — платформы доставки на ~150k заказов/мес. Модули общаются друг с другом через RabbitMQ: одни маршруты — синхронный RPC поверх очередей с CorrelationId, другие — fire-and-forget pub/sub. Старая версия кода была на snake-case-форке нашего DSL; сейчас всё работает на актуальном 3.0 — привожу к каноническому API.

RPC через RabbitMQ — запрос/ответ с CorrelationId. Модуль ControlPanel пишет в очередь dal.controlpanel.trips.get.tracking.link, ждёт ответ с тем же CorrelationId. На стороне DAL:

public class TripLinkRouteBuilder : RouteBuilder{    protected override void Configure()    {        From(Rabbit.Queue("dal.controlpanel.trips.get.tracking.link")                   .ConnectionFactory("RabbitMQConnectionFactory"))            .RouteId("controlpanel_getTrackingLink")            .SetHeader("routeId", "controlpanel_getTrackingLink")            // Валидация: без CorrelationId роутить нечего            .Filter(Header("CorrelationId").IsNull())                .DoThrow<ArgumentNullException>(                    "CorrelationId is required. RouteId='controlpanel_getTrackingLink'")            .EndFilter()            .Log("GetTrackingLink DAL STARTED: CorrelationId=${header.CorrelationId}")            .InvokeController()                            // → TripController.GetTrackingLink            .DbSaveChanges()                               // commit EF Core changes            .ApiPackResponse(                              // упаковка в стандартный envelope                toData:  Property("data"),                toMeta:  Property("meta"),                code_dal: 200,                packJwt: true)            .WireTap("direct://delivery.info.send")        // асинхронная нотификация — fire-and-forget            .Log("GetTrackingLink DAL COMPLETED: CorrelationId=${header.CorrelationId}")            .Respond();                                    // ответ на reply-to queue с тем же correlationId    }}

Rabbit.Queue(...).ConnectionFactory(...) использует named-фабрику из DI-registry (как named SQL data source в первом проде — секреты не торчат в URI маршрута). .Respond() — это RabbitMQ RPC-pattern: транспорт сам читает reply-to и correlation-id из заголовков входящего сообщения и публикует тело exchange’а обратно. Никакого ручного BasicProperties.ReplyTo / BasicProperties.CorrelationId в коде.

Глобальный OnException на весь RouteBuilder — стандартный envelope ошибки + 500:

protected override void Configure(){    if (!HasExceptionRoute<Exception>())        OnException<Exception>()            .Log("Exception: ${exception.message}", LogLevel.Error)            .SetHeader("code_dal", (int)HttpStatusCode.InternalServerError)            .SetBody(e => new BaseErrorResponse            {                traceId     = e.In.GetHeader<string>("CorrelationId") ?? "no-correlation",                code        = (int)HttpStatusCode.InternalServerError,                message     = e.Exception?.Message ?? "Internal server error.",                description = e.Exception?.ToString()            }.ToJson())            .ExceptionHandled()            .Stop()        .EndOnException();    // ConfigureGetTrackingLinkRoute() ... остальные routes выше}

Pub/sub через RabbitMQ — email-нотификации. Любой модуль может бросить exchange’у lt.dal с routing key send.email — отдельный SMTP-воркер (отдельный процесс, отдельный модуль Tsak) подберёт и отправит. Локальная точка входа — direct://send-trip-email-notification, в неё пишут все RouteBuilder’ы, которым нужно уведомить:

public class EmailNotificationRouteBuilder : RouteBuilder{    protected override void Configure()    {        From("direct://send-trip-email-notification")            .RouteId("dal_sendTripEmailNotification")            .Log(LogLevel.Debug,                "SendTripEmailNotification: tripId=${property.tripId}, type=${property.emailType}")            .Process((e, ct) =>            {                // Изолированный scope под scope'd DbContext (route отрабатывает в WireTap-таске)                using var scope = Context.GetServiceProvider()!.CreateScope();                var db     = scope.ServiceProvider.GetRequiredService<LtContext>();                var logger = Context.GetService<ILogger>();                var cfg    = Context.GetProperty<IDictionary<string, object?>>("EmailNotifications");                if (!IsEnabled(cfg)) { e.In.SetHeader("code_dal", 204); return; }                var tripId = e.GetProperty<long>("tripId");                var trip   = db.Trips.AsNoTracking()                    .Include(t => t.Driver).ThenInclude(d => d!.IdNavigation).ThenInclude(a => a.Branch)                    .Include(t => t.Trippoints).ThenInclude(tp => tp.Location)                    .FirstOrDefault(t => t.Id == tripId);                if (trip?.Driver?.IdNavigation?.Branch is not { } branch)                { e.In.SetHeader("code_dal", 404); return; }                // Адресатов берём из настроек филиала — список email'ов в одной из двух колонок                var emailType = e.GetProperty<string>("emailType");                var recipients = EmailHelper.ParseAndValidateEmailList(                    emailType == "trip-point-change"                        ? branch.Trippointorderchangeemaillist                        : branch.Tripcompletionemaillist);                if (recipients.Count == 0)                { e.In.SetHeader("code_dal", 204); return; }                var (subject, html) = BuildEmailContent(trip, emailType, cfg, e);                e.In.SetHeader("emailTo",          string.Join(",", recipients));                e.In.SetHeader("emailSubject",     subject);                e.In.SetHeader("emailContentType", "text/html");                e.In.SetBody(new { content = html }.ToJson());                e.In.SetHeader("code_dal", 200);            })            // Только если письмо реально готово к отправке — пушим в очередь            .Filter(Header("code_dal").IsEqualTo(200))                .To(Rabbit.Exchange("lt.dal", type: "topic")                          .RoutingKey("send.email")                          .ConnectionFactory("RabbitMQConnectionFactory"))                .Log("Email sent to RabbitMQ exchange lt.dal/send.email")            .EndFilter();    }}

Дальше эту direct-точку дёргают через WireTap из любого маршрута, который меняет поездку:

// Где-то в TripRouteBuilder, после успешного UPDATE.SetProperty("tripId",        Header("trip-id")).SetProperty("emailType",     Constant("trip-point-change")).SetProperty("changeDescription", e => "Order added on point #" + e.In.GetHeader<int>("point-seq")).WireTap("direct://send-trip-email-notification")

WireTap копирует exchange и пушит копию в указанный endpoint асинхронно, не блокируя основной поток. Получатель — direct://... в этом же процессе, дальше — RabbitMQ. Если SMTP-воркер лежит — письма копятся в очереди, основной HTTP API продолжает отвечать.

Kafka — GPS-координаты с мобилок. Самый нагруженный маршрут той же системы. Водители (~500 одновременно) льют батчи координат через мобильное API → топик mobile.trips.location.sync. На каждой ноде кластера сидит один consumer; Kafka Consumer Group сам раздаёт партиции между нодами (3 партиции → 3 ноды, по одной на каждую). Дальше — группировка по tripId, последняя точка каждого tripId асинхронно уходит в Redis (last GPS location для веб-дашборда), полный батч рассылается по RabbitMQ-очередям DAL-модулей (одна — на запись в БД, вторая — на детекцию check-in/check-out, третья — на пересчёт метрик):

public class GpsKafkaBatchAggregatorBuilder : RouteBuilder{    protected override void Configure()    {        var cfg = Context.GetSection(GetType().Name);        // Один consumer на ноду. Партиции (0..N) распределит Consumer Group.        // partition= НЕ указываем — иначе сломаем балансировку.        From(Kafka.Topic(cfg.GetValue("kafka.Topic", "mobile.trips.location.sync"))                  .Brokers(cfg.GetValue("kafka.Brokers", "localhost:9092"))                  .GroupId(cfg.GetValue("kafka.GroupId", "gps-batch-aggregator"))                  .MaxPollRecords(cfg.GetValue("kafka.MaxPollRecords", 100))                  .AutoOffsetReset("earliest"))            .RouteId("GpsKafkaBatchAggregator")            .Transacted()                                       // Kafka EOS: commit offset в одной tx с downstream            .Log("incoming batch: kafka.batch.size=${header.kafka.batch.size}")            .To("direct://gps-batch-grouper");        From("direct://gps-batch-grouper")            .RouteId("GpsBatchGrouper")            .Process(new GpsBatchGrouperProcessor(outputType: GroupingOutputType.Dictionary))            .Log("trips=${header.gps.trips.count}, points=${header.gps.total.points}")            // Извлекаем последнюю координату по каждому tripId → property для Redis            .Process((e, ct) =>            {                var grouped = e.In.GetBody<Dictionary<long, List<LocationHistory>>>();                var last = grouped.ToDictionary(                    kvp => kvp.Key.ToString(),                    kvp => new GpsLocationDto(                        ts:  kvp.Value.Max(l => l.ts).ToString("o"),                        lat: kvp.Value.OrderByDescending(l => l.ts).First().lat,                        lon: kvp.Value.OrderByDescending(l => l.ts).First().lon).ToJson());                e.SetProperty("gpsLastLocationMap", last);            })            // Fire-and-forget в Redis — основной pipeline ждать не должен            .WireTap("direct://save-gps-last-location")            // Раздача батча по трём очередям DAL            .Multicast().ParallelProcessing()                .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.trips.location.sync"))                .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.gps.checkin_checkout"))                .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.metrics.calculate"))            .EndMulticast();        // Запись last-location в Redis (TTL 24 часа). Ключ: gps:last:location:{tripId}        From("direct://save-gps-last-location")            .RouteId("SaveGpsLastLocation")            .Split(Property("gpsLastLocationMap"))                .SetHeader("tripId", e => ((KeyValuePair<string, string>)e.In.GetBody()).Key)                .SetBody(e =>            ((KeyValuePair<string, string>)e.In.GetBody()).Value)                .To("redis:set:gps:last:location:${header.tripId}?ttl=86400")            .EndSplit();    }}

Соседний consumer (mobile.trips.unplanned.stops) делает детекцию незапланированных остановок — там интересен локальный OnException. JSON, который не парсится — это не транзиентная ошибка, retry не поможет. Маршрут отправляет сообщение в RabbitMQ-DLQ с метаданными (partition, offset, preview оригинального body) и пробрасывает исключение наверх — consumer падает, Kafka откатывает offset, поднимается заново, и проблемная партиция уйдёт в DLQ при следующей попытке. Это правильный pattern для poison message:

From(Kafka.Topic("mobile.trips.unplanned.stops").GroupId("unplanned-stops-processor")          .Brokers(brokers).AutoOffsetReset("latest"))    .RouteId("UnplannedStop-Kafka")    .Transacted()    .To("direct://unplanned-stop-grouper");From("direct://unplanned-stop-grouper")    .RouteId("UnplannedStop-Grouper")    .OnException<JsonException>()        .MaximumRedeliveries(0)    // JSON не станет валидным от повтора        .Handled(false)            // консьюмер ДОЛЖЕН упасть, чтоб Kafka не закоммитила offset        .Process(e =>        {            e.In.SetHeader("dlq.errorType",     "JsonException");            e.In.SetHeader("dlq.errorMessage",  e.Exception?.Message);            e.In.SetHeader("dlq.partition",     e.In.GetHeader<int>("kafka.partition"));            e.In.SetHeader("dlq.offset",        e.In.GetHeader<long>("kafka.offset"));            e.In.SetHeader("dlq.rawBodyPreview",                Truncate(e.GetProperty<string>("OriginalKafkaBody") ?? e.In.GetBody<string>(), 500));        })        .To(Rabbit.Exchange("lt.dlq").RoutingKey("dal.dlq.gps.json"))        .Log(LogLevel.Error,            "JSON failed → DLQ: partition=${header.kafka.partition}, offset=${header.kafka.offset}")    .EndOnException()    .Process(new UnplannedStopDetectorProcessor(cacheManager, lookbackMinutes: 60))    .To("direct://persist-unplanned-stops");

Несколько штук, которые видно только на реальном проде:

  • partition= в Kafka URI указывать нельзя. Чуть выше прокомментировано в коде — это типичная грабля. Если указать — Consumer Group ломается, все ноды конкурируют за одну партицию.

  • .Transacted() + .Multicast() — батч уходит во все три очереди одновременно (ParallelProcessing), но Kafka offset коммитится только если все .To(...) отработали. Если RabbitMQ временно недоступен — батч приедет повторно, без потерь.

  • WireTap для Redis — last-location нужен на дашборде «здесь и сейчас», но если Redis лежит, основной pipeline на запись в БД не должен ждать. WireTap копирует exchange в отдельный поток.

  • DLQ с dlq.rawBodyPreview — когда инженер открывает DLQ-очередь, ему нужны не «что-то сломалось», а partition + offset + первые 500 символов исходного body. С этим можно прийти к мобильному разрабу и сказать «вот это вы шлёте».

Что важно увидеть на этом примере:

  • Один и тот же Rabbit.Queue(...) / Rabbit.Exchange(...) builder покрывает и RPC (с .Respond()), и pub/sub (с .To(...)) — разница только в форме хвоста маршрута.

  • InvokeController() / DbSaveChanges() / ApiPackResponse(...) — это не часть базового Route, это шаги-расширения, которые сделали внутри проекта (lt.Core.Route). Любой шаг — это IProcessor, регистрируется в IRouteContext, доступен через extension method. То есть DSL расширяемый: команде нужны свои first-class steps под «вызови контроллер → сохрани EF Core → упакуй ответ» — пишут их один раз, используют во всех маршрутах.

  • WireTap + direct:// — стандартный Camel-овский паттерн для fire-and-forget’а внутри процесса. Никаких Task.Run, никаких IHostedService-очередей под капотом — это всё в Route.


Если хочется потрогать руками — redb.Route.Demo

В репозитории лежит модуль redb.Route.Demo — 39 маршрутов в 9 секциях, 18 транспортов, всё в одном проекте, всё запускается через dotnet run. По сути — единый reference implementation, в котором есть каждая фича фреймворка: RPC через RabbitMQ/AMQP 1.0/gRPC/IBM MQ, WireTap в Kafka и файл, SQL+TransactionScope, Redis Pub/Sub, TCP echo, WebSocket push, MQTT, SEDA, DirectVM cross-context, Timer/Cron, CircuitBreakerRetry с backoff, DeadLetterChannelAggregatorMulticastRecipientListDynamicRouterLoopResequencerEnrichIdempotentConsumerThrottle, JSON Schema validation, Traced+Metered, lifecycle listeners, named IRedbService.

Заходной маршрут — POST /api/demo. Дальше идёт типовой ESB-pipeline:

HTTP → Throttle(10/s) → IdempotentConsumer → JSON Schema validation  → Choice(mode) → Multicast(parallel)      ├─ RabbitMQ RPC  → stamp.rabbit      ├─ AMQP 1.0 RPC  → stamp.amqp      ├─ gRPC RPC      → stamp.grpc      └─ IBM MQ RPC    → stamp.wmq  → BeginTransaction → SQL INSERT + SELECT → CommitTransaction  → WireTap fan-out → Kafka + File + Redis + MQTT + SEDA  → JSON response

Это ровно то, что в проде «склеивает» Camel у джавистов — только здесь это runnable demo на 1500 строк C#, который запускается одной командой против docker compose с RabbitMQ/Postgres/Kafka/Redis/MQTT.

Самая показательная секция для тех, кто думает «а как Route дружит с redb.Core» — NamedRedbRoutes.cs. Из маршрута поднимается named IRedbService (можно держать несколько подключений к разным БД одновременно — pg-testmssql-test), CRUD идёт напрямую через ProcessWithRedb("instance-name", async (redb, ex, ct) => ...):

internal sealed class NamedRedbRoutes : RouteBuilder{    protected override void Configure()    {        From("timer://named-redb-check?period=30000&delay=5000")            .RouteId("demo-named-redb-pgsql")            .Log("[NAMED-REDB] checking pg-test instance...")            // Метаданные подключения — версия БД, имя cache-domain            .ProcessWithRedb("pg-test", (redb, ex) =>            {                ex.In.Headers["pg-db-type"]      = redb.dbType    ?? "n/a";                ex.In.Headers["pg-db-version"]   = redb.dbVersion ?? "n/a";                ex.In.Headers["pg-cache-domain"] = redb.CacheDomain ?? "n/a";            })            .Log("[NAMED-REDB] pg-test: ${header.pg-db-type} ${header.pg-db-version}")            // SAVE — RedbObject<TProps> с автогенерацией схемы по [RedbScheme]            .ProcessWithRedb("pg-test", async (redb, ex, ct) =>            {                var item = new RedbObject<DemoItemProps>                {                    name  = $"Demo-{DateTime.UtcNow:HHmmss}",                    Props = new DemoItemProps                    {                        Title       = "Named Redb Demo",                        Description = $"Created at {DateTime.UtcNow:O}",                        Priority    = Random.Shared.Next(1, 10)                    }                };                var id = await redb.SaveAsync(item);                ex.In.Headers["saved-id"] = id.ToString();            })            // LOAD by id            .ProcessWithRedb("pg-test", async (redb, ex, ct) =>            {                var id = long.Parse(ex.In.Headers["saved-id"]!.ToString()!);                var obj = await redb.LoadAsync<DemoItemProps>(id);                ex.In.Headers["loaded"] = obj is null                    ? "NOT FOUND"                    : $"{obj.name} (priority={obj.Props?.Priority})";            })            // QUERY с LINQ-Where прямо по props            .ProcessWithRedb("pg-test", async (redb, ex, ct) =>            {                var items = await redb.Query<DemoItemProps>()                    .Where(p => p.Priority > 3)                    .Take(5)                    .ToListAsync();                ex.In.Headers["query-count"] = items.Count.ToString();            })            // DELETE            .ProcessWithRedb("pg-test", async (redb, ex, ct) =>            {                var id = long.Parse(ex.In.Headers["saved-id"]!.ToString()!);                await redb.DeleteAsync(id);            })            .Log("[NAMED-REDB] PG CRUD cycle complete");        // Тот же шаблон, но на MSSql-инстансе — provider абстрагирован        From("timer://named-redb-mssql?period=30000&delay=8000")            .RouteId("demo-named-redb-mssql")            .ProcessWithRedb("mssql-test", async (redb, ex, ct) => { /* save / load / query / delete */ });    }}

Что тут видно:

  • ProcessWithRedb("name", ...) — это extension в пакете redb.Route.RedbCore. По имени достаёт нужный IRedbService из реестра (Redb секция в конфиге проекта), скоупит его на время обработки exchange’а и отдаёт в лямбду. Никакого IServiceProvider.GetRequiredService<IRedbService>() в коде маршрута.

  • SaveAsync / LoadAsync / Query<T>().Where(...).ToListAsync() / DeleteAsync — это стандартный IRedbService API, тот же что и в обычном ASP.NET-сервисе. Маршрут не делает ничего особенного — он просто получает уже инициализированный сервис.

  • Несколько именованных инстансов одновременно — pg-test и mssql-test живут параллельно, можно из одного маршрута писать в Postgres и одновременно зеркалить в MSSql.

  • Схема ([RedbScheme]-классы DemoItemProps) синхронизируется один раз при старте модуля (InitRoute.cs дёргает redb.InitializeAsync(ensureCreated: true)), дальше маршрут просто работает с типизированными RedbObject<TProps>.

То есть в проде из первого блока (tsum — SQL polling из SAP → redb.Query<TransportationRoute>() → апдейт) лежит ровно та же связка, что и здесь, в Demo — только обёрнутая в реальную бизнес-логику. Хотите потрогать без чужого прода — git clone, поднимите docker composedotnet run --project redb.Route.Demo, и оно ответит на POST http://localhost:5088/api/demo JSON-ом со всеми штампами от всех брокеров.


«Я люблю контроллеры из ASP.NET» — пожалуйста, redb.Route.Controllers

Отдельная категория комментариев, которая под такими статьями стабильно появляется: «зачем мне ваш fluent-DSL, я хочу [HttpGet("{id}")] и [FromBody] Order order, как привык в ASP.NET, и чтобы IDE подчёркивала несуществующие роуты». Для них есть пакет redb.Route.Controllers — контроллеры с теми же атрибутами, что и в ASP.NET, но transport-agnostic: один и тот же класс работает за HTTP, SignalR, gRPC или любым другим InOut-эндпоинтом.

[Route("orders")]public class OrdersController : RedbController{    [HttpGet("{id}")]    public Task<Order> GetOrder(int id, [FromHeader("X-Tenant")] string tenant)    {        var redb = Context.GetRedbService();              // тот же IRedbService        return redb.Query<OrderProps>()            .Where(o => o.Id == id && o.Tenant == tenant)            .FirstAsync();    }    [HttpPost]    public async Task<long> CreateOrder([FromBody] Order order, CancellationToken ct)    {        var redb = Context.GetRedbService();        return await redb.SaveAsync(new RedbObject<OrderProps> { Props = order.ToProps() });    }    [HttpDelete("{id}")]    public Task DeleteOrder([FromRoute("id")] int id) =>        Context.GetRedbService().DeleteAsync(id);}

Привязка маршрута к транспорту — одна строчка:

// HTTP — читает redbHttp.Method / redbHttp.Path из транспортаFrom("http://0.0.0.0:8080/api")    .RedbHttpController<OrdersController>();// SignalR — тот же класс, dispatch по redbSignalR.MethodFrom("signalr://bridge")    .RedbSignalRController<OrdersController>();// gRPC — dispatch по dispatch-method из metadataFrom("grpc://0.0.0.0:5000")    .RedbGrpcController<OrdersController>();// Или сразу пачкой по сборке — как MapControllers() в ASP.NETvar registry = new ControllerRegistry();registry.RegisterAssembly(typeof(OrdersController).Assembly);From("http://0.0.0.0:8080/api").RedbHttpController(registry);

Что тут важно понимать:

  • [Route][HttpGet/Post/Put/Delete/Patch][FromBody][FromHeader][FromQuery][FromRoute] — те же самые атрибуты, что вы знаете из ASP.NET. Кривая обучения нулевая.

  • Контроллер наследуется от RedbController и получает Context (тот самый IRouteContext) и Exchange напрямую — никакого IServiceProvider-инъекции в конструктор не нужно. Хотите DI — берёте сервис через Context.GetService<T>().

  • Один контроллер — три транспорта. Тот же OrdersController отвечает на GET /orders/42 через HTTP, на Send("GetOrder", 42) через SignalR-хаб и на gRPC-вызов с dispatch-method=GetOrder. Это та самая «adapter on top of transport»-идея, ради которой обычно пишут Mediator-обвязки руками.

  • Это не отдельный pipeline-инструмент, это IProcessor внутри маршрута. До контроллера можно влепить IdempotentConsumerThrottleOnException<T>WireTap в audit — а только потом отдать в controller dispatcher. Никакого «контроллер vs route» — это контроллер ВНУТРИ route:

From("http://0.0.0.0:8080/api")    .Throttle(500).Per(TimeSpan.FromSeconds(1))    .IdempotentConsumer(Header("Idempotency-Key"), repository: "redb")    .WireTap("seda://audit")    .RedbHttpController<OrdersController>();

Под капотом — ControllerRegistry сканирует сборку, собирает route table из атрибутов, на каждый запрос dispatcher читает headers, матчит template, биндит параметры ({id} → [FromRoute], query → [FromQuery], тело → [FromBody], остальное по конвенции имени). Ответ возвращается в exchange.In.Body, статус — в status.code (200 для значения, 204 для Task, 404 для no-match, 500 для исключения). Полная таблица атрибутов и conventions — в redb.Route.Controllers/README.md.

То есть это компромисс ровно для того лагеря, который и в 2026-м не готов отказаться от [HttpGet("{id}")]: внешне — привычный ASP.NET-style controller, внутри — обычный Route-pipeline со всеми EIP-плюшками и без привязки к Kestrel. Если ваш «HTTP API» завтра должен заодно слушать SignalR и gRPC — переписывать ничего не надо, добавляются ещё два From(...).Redb*Controller<OrdersController>().


redb.Tsak — где это хостить в проде

redb.Route описывает что делает маршрут. Когда у вас один проект — он живёт внутри dotnet run. Когда маршрутов 30 и они принадлежат разным командам — нужен runtime-контейнер.

tsak

tsak

redb.Tsak — это runtime container для модулей redb.Route. Каждый модуль (.dll или .tpkg-бандл) загружается в изолированный AssemblyLoadContext, имеет свой IRouteContext, свой DI, свои конфиги. Tsak управляет lifecycle’ом независимо: добавил модуль — он стартанул, убрал — выгрузился, обновил — hot-reload без рестарта процесса.

Что есть из коробки:

  • REST API — 32 endpoint’а: управление контекстами, маршрутами, модулями, кластером, scheduler’ом, логами, пользователями

  • CLI — 30 команд: tsak module uploadtsak context starttsak route stoptsak context list — удобно для CI/CD

  • Blazor Server dashboard — 10 страниц: CPU/RAM/GC, per-route latency, ring-buffer логи, статус watchdog

    endpoints

    endpoints
    monitoring

    monitoring
  • Hot-reload — кладёшь обновлённую .dll в Libs/, Tsak подхватывает, старая версия выгружается с graceful drain

  • Кластер — leader election + автоперераспределение контекстов между нодами, через redb-базу, без Redis / ZooKeeper / Consul

  • Quartz scheduler — RAMJobStore для standalone, AdoJobStore для кластера (схема создаётся автоматически)

  • OpenTelemetry — Activities + Meters на каждый маршрут и шаг, Prometheus scrape

Деплой нового маршрута:

# Вариант 1 — голая DLL. Бросаем сборку в Libs/, Tsak увидит, поднимет# модуль с graceful drain старой версии. Подходит для дев-цикла.cp Orders.dll /tsak/worker/Libs/# Вариант 2 — .tpkg-бандл (ZIP с manifest.json + DLL + <Module>.config.json).# Собирается прямо из .csproj — у redb.Route.Demo есть Target "PackTpkg"# (AfterTargets="Build"), который кладёт .tpkg рядом и копирует в Tsak Libs.# Так едет в прод: один файл, версионируемый, с конфигом и манифестом внутри.cp Orders.tpkg /tsak/worker/Libs/# или через CLI поверх REST APItsak module upload orders --file Orders.tpkgtsak context start orders# Остановить один маршрут без рестарта процессаtsak route stop orders order-pipeline# Что сейчас работаетtsak context listtsak route list orders

Папка Libs/ — это стандартная convention-точка модулей. В исходниках она лежит ровно по таким же путям: redb.Tsak/src/redb.Tsak.Worker/Libs/ для запуска из репозитория, worker/Libs/ в готовых релизных бинарниках. В Docker-образе — /app/worker/Libs/, монтируется как volume; добавили .dll или .tpkg снаружи — внутри контейнера Tsak поднял модуль без рестарта.

Минимальный пример MSBuild-таргета для упаковки .tpkg прямо из проекта модуля — манифест + DLL + конфиг в один ZIP, и сразу копия в Libs/ Tsak’а (как сделано в redb.Route.Demo.csproj):

<PropertyGroup>  <TsakModuleName>Orders</TsakModuleName>  <TsakLibsDir>$(MSBuildThisFileDirectory)..\..\tsak\worker\Libs</TsakLibsDir></PropertyGroup><Target Name="PackTpkg" AfterTargets="Build">  <PropertyGroup>    <_Staging>$(IntermediateOutputPath)tpkg</_Staging>    <_Tpkg>$(MSBuildThisFileDirectory)output\$(TsakModuleName).tpkg</_Tpkg>  </PropertyGroup>  <RemoveDir Directories="$(_Staging)" />  <MakeDir   Directories="$(_Staging)" />  <Copy SourceFiles="$(MSBuildThisFileDirectory)manifest.json"            DestinationFolder="$(_Staging)" />  <Copy SourceFiles="$(TargetPath)"                                        DestinationFolder="$(_Staging)" />  <Copy SourceFiles="$(MSBuildThisFileDirectory)$(TsakModuleName).config.json"        DestinationFolder="$(_Staging)"        Condition="Exists('$(MSBuildThisFileDirectory)$(TsakModuleName).config.json')" />  <ZipDirectory SourceDirectory="$(_Staging)" DestinationFile="$(_Tpkg)" Overwrite="true" />  <Copy SourceFiles="$(_Tpkg)" DestinationFolder="$(TsakLibsDir)" />  <Touch Files="$(TsakLibsDir)\$(TsakModuleName).tpkg" /></Target>

dotnet build → Orders.tpkg в Libs/ → Tsak его поднял. В CI/CD получается одна артефакт-сущность с проставленной версией, конфигом и манифестом — деплой сводится к публикации файла.

Маршруты с cluster=true идут через redb-координатор. В кластерном режиме маршрут можно пометить как кластерный — тогда Tsak сам распределяет его экземпляры между нодами и следит, что только нужное количество копий запущено. Состояние, partitioning, балансировка — через redb objects/values. Quartz при этом использует свои AdoJobStore-таблицы в той же БД, но это его внутренние таблицы.

Добавить ноду в кластер — запустить ещё один экземпляр Tsak с той же строкой подключения к БД. Никакого ZooKeeper / Consul / Redis как внешней зависимости.

Тот же самый RouteBuilder, который вы написали для обычного IHostedService, работает в Tsak без изменений — тот же Configure(), тот же IExchange, те же OnException и .Transacted().

351 проходящий тест, Apache 2.0.

Про Tsak — отдельная большая статья. Здесь это упомянуто крупными мазками, потому что Tsak — это самостоятельный продукт со своей архитектурой: AssemblyLoadContext-изоляция, 5-слойный конфиг-pipeline, leader election на redb-объектах без внешнего координатора, Blazor-дашборд, REST API, CLI, hot-reload c graceful drain. В одну Route-статью это не влезает. Если зайдёт — напишу отдельно.


Где Route живёт в экосистеме redb

redb.Route — это не одиночный пакет, а слой в стопке продуктов, которые мы пишем последние годы и которые собираются друг из друга. По уровням снизу вверх:

  • redb.Core — ядро: типизированная объектная модель (RedbObject<TProps>), code-first схемы ([RedbScheme]), expression-LINQ-провайдер, кэши, сериализация, валидация, security. Всё абстрактно, без привязки к СУБД.

  • redb.Core.Pro — платная надстройка над Core: change tracking, materialization, миграции схем, расширенные query/scheme-провайдеры.

  • redb.Postgres / redb.MSSql — Free-провайдеры: реализация IRedbContext и фабричных методов RedbServiceBase поверх Npgsql / Microsoft.Data.SqlClient.

  • redb.Postgres.Pro / redb.MSSql.Pro — Pro-провайдеры: подтягивают Core.Pro-возможности (materialization, миграции) на конкретную СУБД.

  • redb.Identity — самостоятельный identity-сервер, построенный поверх redb-объектов: пользователи, роли, JWT, refresh, LDAP-федерация. Используется и в Route-маршрутах (тот же Http.Listen().UseJwt(...)), и снаружи.

  • redb.Export — экспорт/бэкап redb-данных в файлы. Используется в TsUM-проде как cron-маршрут (Cron.Every(...).Process(BackupRedbBase)).

  • redb.Route — то, про что эта статья. Берёт IRedbService из провайдера, добавляет EIP-DSL, транспорты, OpenTelemetry. Маршруты — обычные C#-классы, schema-aware: ProcessWithRedb("pg-test", ctx => ctx.SaveAsync(obj)).

  • redb.Tsak — runtime-контейнер для Route-маршрутов: hot-reload, кластер, дашборд. Поверх Core + одного из провайдеров + Route + Identity (для логина в Blazor-UI).

  • redb.Doc.Web — сайт документации, написан на Blazor поверх того же стека: контент-объекты лежат как RedbObject<DocPageProps> в Postgres, навигация — через Tree-провайдер Core, поиск — через query-провайдер. То есть документация про redb работает на самом redb.

Зависимости честно однонаправленные: Route не знает про Tsak, Tsak не знает про Doc.Web, Core ничего не знает про провайдеров. Можно взять только Core + Postgres и писать своё приложение без Route и Tsak; можно взять Core + Postgres + Route и хостить маршруты в обычном IHostedService; можно собрать всю стопку целиком и получить Tsak + Identity + дашборд + сайт.

Эта статья — про один слой (Route + Tsak). Про Core, Identity и Doc.Web будут отдельные.


Что redb.Route НЕ делает

Честно — потому что иначе через две недели придут с issue.

Durable saga с DB-state machine. Это к MassTransit / NServiceBus / Wolverine. У redb.Route есть Saga() с компенсирующими шагами: forward + reverse, если что-то упало посередине — откатываемся в обратном порядке. Но самой state-machine (где «заказ в состоянии PaymentPending, ждём webhook от Stripe три дня, не теряем при рестарте процесса») — нет. Можно собрать руками через IdempotentConsumer + persistent backend в redb, но это не managed решение.

Managed transactional outbox container. В MassTransit/Wolverine есть отдельный фреймворк, который создаёт таблицу outbox_messages, пишет в неё в той же транзакции, что бизнес-данные, а в фоне публикует. У Route это собирается из примитивов: Sql.Poll(...) + IdempotentConsumer + .Transacted(). То есть outbox-паттерн работает, но автомат-инсталлятор таблицы и фоновый daemon — не предоставлены, пишутся как часть маршрута.

300+ компонентов, как у Camel. 22 транспорта закрывают большую часть реальных задач (Kafka, RabbitMQ, IBM MQ, MQTT, HTTP, gRPC, SQL, SFTP, S3, LDAP, Mail, Quartz), но если вам нужен, скажем, Salesforce-bulk-API-консьюмер или ServiceNow-REST-клиент — это всё ещё «напиши свой компонент через IConsumer/IProducer», или использовать generic HTTP/gRPC поверх их API.

XML-DSL и автодеплой как у Camel K. Только C#-fluent. Никаких маршрутов из YAML. На наш взгляд — плюс (intellisense, рефакторинг, типы), но кому-то декларативный YAML удобнее.


Free vs Pro

Никакой Pro-версии redb.Route нет. Apache 2.0 целиком: все 22 транспорта, все EIP-паттерны, expression engine, Transacted(), OpenTelemetry, retry/DLC, Tsak runtime container, cluster, hot-reload, dashboard.

(Pro-версия есть только у redb.Core — там оптимизации хранилища: compiled queries, parallel materialization, change tracking. Route про это вообще не знает — он работает поверх любого redb-провайдера, любой версии.)


Что в 3.0.0 свежего

Параллельно со статьёй «Free/Pro query parity» про redb.Core 3.0.0 у Route тоже вышла мажорка. Главное:

  • Один canonical compiler. Раньше внутри жил параллельный v2-стек (OldRouteCompilerIRouteDefinition2BlockStack). Удалили. Теперь AST маршрута — это дерево IProcessorDefinition нод, каждая компилирует себя через CreateProcessor(IRouteContext). Никаких bridge-классов, никакого «v2 DSL → bridge → legacy compiler» — та же модель, что у Camel внутри.

  • Dynamic endpoints (ToD, dynamic WireTap, dynamic Enrich). URI получателя вычисляется на каждое сообщение через string template, IExpression или Func<IExchange, string>. Это Camel’овский toD(...) один-в-один.

  • String-template DSL для SetBody / SetHeader / SetProperty / Log. ${header.x}${body.OrderId} — компилируются один раз при build, не интерпретируются.

  • OnException parity с Camel. Добавлены LogStackTrace(bool)LogExhausted(bool), чтобы fluent-набор совпадал с Camel’овским onException(...).

  • Reference-тесты на Choice / TryCatch / Filter (~1200 строк), которые пинят семантику и ловят регрессии.

Полный CHANGELOG: redb.Route/CHANGELOG.md.


Сравнение в одной таблице

Apache Camel

MassTransit

NServiceBus

Wolverine

redb.Route

Платформа

JVM

.NET

.NET

.NET

.NET 8/9/10

Лицензия

Apache 2.0

Коммерческая (v9, Massient)

Коммерческая

MIT

Apache 2.0

Фокус

EIP routing

Message bus + Saga

Message bus + Saga

Mediator + Saga

ESB / EIP routing

Транспорты

300+

5

7

4

22 + 5 встроенных

EIP-паттерны

80+

Saga, R/R, Outbox

Saga, R/R

Saga, R/R, Outbox

30+

Expression engine

Simple Language (интерпретируемый)

Compiled (System.Linq.Expressions)

Конфиг

XML или Java DSL

C# fluent

C# fluent

C# fluent

C# fluent only

Durable saga

Да

Да (DB-backed)

Да (DB-backed)

Да (Marten / EF Core)

Только компенсирующие шаги + persistent IdempotentConsumer

Managed outbox

Plugin

Да

Да

Да

Собирается из Sql.Poll(...).Transacted() + IdempotentConsumer

Publisher confirms / Kafka EOS

Да

Да

Да

Да

Да

Runtime container

Camel K / JBoss Fuse

redb.Tsak (hot-reload, cluster)

Кратко: у Camel — больше всего, но он JVM. У трёх .NET-альтернатив — отличная durable saga + managed outbox, но не ESB, и при этом две из трёх уже коммерческие (MassTransit v9 переехал в Massient, Inc., NServiceBus был платным изначально). Свободным остался только Wolverine на MIT — но это mediator+saga, без 20+ транспортов и без полного EIP-каталога. У redb.Route — все 30+ ходовых EIP, 22 транспорта, compiled expressions, transactional pipelines, и Tsak как runtime-контейнер с кластером. Apache 2.0 целиком. Если вам нужно «Apache Camel в .NET, и желательно без счёта на лицензию» — это redb.Route.


Ссылки

GitHub org (все репозитории) Репозиторий redb.Route Репозиторий redb.Tsak Готовые бинарники redb.Tsak (releases) Docker-образ redb-tsak-stack (GHCR) README.md redb.Route с полной документацией redb.Route.Demo — 39 маршрутов, 18 транспортов, runnable reference CHANGELOG 3.0.0 Документация и примеры (EN) NuGet — все 43 пакета redb.* (~20 800 загрузок) — Route, Core, провайдеры, 22 транспорта, Tsak Архитектура redb.Core — на чём всё это построено Предыдущая статья — про redb.Core / Free vs Pro

Если дочитали — спасибо. Комментарии, баги, EXPLAIN-планы — всё в GitHub Discussions.

ссылка на оригинал статьи https://habr.com/ru/articles/1042392/