Проблема
У вас не 5 микросервисов — у вас десятки. Бэкенд, который рос три года: монолит, расколотый на куски, GPS-фид от автопарка, мобильное приложение водителя, веб-кабинет диспетчера, интеграции с SAP / 1С / регуляторами / маркетплейсами, отдельный SMTP-воркер, отдельный PDF-генератор, отдельный шедулер ночных пересчётов. Между ними — Kafka (несколько кластеров, по топику на домен), RabbitMQ (RPC + pub/sub + DLQ), Redis (кэш, last-known-state, pub/sub-каналы), пара HTTP-эндпоинтов наружу, SFTP с поставщиком, SQL-polling outbox-таблицы старого монолита, MQTT с трекеров, IBM MQ для одного древнего банковского контура, SignalR-хабы для real-time-дашбордов. На каждом стыке — свой ретрай, свой DLQ (или нет DLQ), своя сериализация, свои метрики (или нет метрик), своя бойлерплейт-обвязка из консьюмеров и try/catch.
Каждый из этих стыков живёт своей жизнью в Program.cs соответствующего сервиса. Каждый — это hand-rolled цикл:
// Где-то в OrdersService:var consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();consumer.Subscribe("orders");while (!stoppingToken.IsCancellationRequested){ try { var msg = consumer.Consume(stoppingToken); var dto = JsonSerializer.Deserialize<OrderDto>(msg.Value); if (dto.Type != "new") continue; // filter var retries = 0; while (true) // retry { try { await PublishToRabbit(dto); break; } catch (Exception ex) when (retries++ < 3) { await Task.Delay(TimeSpan.FromSeconds(retries * retries)); } } consumer.Commit(msg); // ack } catch (ConsumeException ex) { _logger.LogError(ex, "kafka"); } catch (Exception ex) { _logger.LogError(ex, "send"); }}
И таких 80 строк — на каждое направление. У каждого — свой ретрай, свой DLQ (или нет DLQ), своя сериализация, свои метрики (или нет метрик). На code review это «ну да, чё». Через год на онбординге нового разработчика — «а где тут вообще что лежит».
На JVM эту задачу 20 лет назад решили Apache Camel и более общий Enterprise Integration Patterns (книжка Gregor Hohpe и Bobby Woolf, та самая «жёлтая»). Идея простая: маршрут описывается как pipeline From → Process → To, EIP-паттерны (Splitter, Aggregator, Content-Based Router, WireTap, Dead Letter Channel, Idempotent Consumer, Saga) — first-class элементы DSL. Транспортов — 300+: от Kafka до LDAP, от S3 до IBM MQ.
В .NET этого не было.
MassTransit, NServiceBus, Wolverine — отличные message bus‘ы, но это не одно и то же. У них 4–7 транспортов (Kafka, RabbitMQ, Azure SB, SQS, иногда SQL), фокус — durable saga + handler classes. EIP-каталог они закрывают на 3-4 паттерна (Saga, Request/Response, Outbox). А когда нужно «забери XML из SQL stored proc → распарси → положи в RabbitMQ → продублируй в SFTP-архив с retention 30 дней» — пишешь руками всю эту классическую кашу из консьюмеров, продюсеров и try/catch.
И отдельный поворот, который случился буквально сейчас: MassTransit v9 стал коммерческим. Проект переехал к новой компании Massient, Inc., на главной новой документации masstransit.massient.com кнопка «Get a License», в доках — «Configure the license key», Customer Support и Usage Telemetry. Дословно с их сайта: «Massient is the new company behind the commercial release of MassTransit v9.» То есть из «трёх .NET-альтернатив» две — платные (NServiceBus давно, MassTransit v9 теперь), остаётся Wolverine на MIT — и он, опять-таки, не ESB, а mediator + saga, без двух десятков транспортов и без EIP-каталога. Свободного Camel-уровня инструмента под .NET до сих пор не было.
Хотелось: описать маршрут как pipeline на C#, чтоб библиотека сама знала про Kafka, RabbitMQ, SQL, SFTP, HTTP, MQTT, retry, DLQ, transactional ack, и при этом был полный набор EIP-паттернов из канонической книжки.
Написали. Выложили под Apache 2.0.
Production case
Этот текст — не теория. redb.Route — это ESB-каркас уровня Apache Camel, и мы это утверждаем не на бумаге: 22 внешних транспорта + 5 встроенных компонентов, 30+ EIP-паттернов как first-class элементы DSL, compiled expression engine на System.Linq.Expressions, transactional pipelines с ITransactedAction (Kafka EOS, RabbitMQ publisher confirms + tx channels, IBM MQ syncpoint, AMQP 1.0, SQL via TransactionScope), persistent IdempotentConsumer, Saga с компенсирующими шагами, CircuitBreaker, Throttle, Aggregator, Resequencer, Scatter-Gather, RecipientList, DynamicRouter, Enrich/PollEnrich, Claim Check, OpenTelemetry на каждый шаг, runtime-контейнер с hot-reload и кластером без внешнего координатора. 351 проходящий тест, включая отдельные DSL reference-suites под Camel-семантику Choice/When/Otherwise, TryCatchFinally и Filter (~1500 строк только в спецификациях). Это не «фреймворк-обёртка над IHostedService», это полноценный ESB.
И он работает в проде. Первый — крупная логистическая компания (~150k заказов/мес, ~20k B2B-клиентов, собственный автопарк, 600+ городов). Внутренняя TMS — ~500 водителей + ~50 диспетчеров, 3-нодовый кластер (Xeon, 4 ядра / 8 ГБ / 50 ГБ SSD на ноду), ~3 месяца стабильной работы, 10–15% CPU под полной нагрузкой.
Через redb.Route туда заведены: SAP S/4 (через SQL polling stored procedure → XML → redb), Kafka (фид с GPS-трекеров), RabbitMQ (внутрисервисная шина событий), HTTP API для UI диспетчера, Меркурий / ЕГАИС / Честный ЗНАК / ФГИС Зерно (российские регуляторы), LDAP/AD для авторизации, cron-backup через redb.Export с ротацией. Один InitRoute-файл регистрирует 22 RouteBuilder-класса. Каждый — изолированный pipeline, со своими OnException-обработчиками, своим RouteId, своими метриками OpenTelemetry.
Второй прод — платформа доставки того же заказчика, десятки микросервисов в одном репозитории (~37 .NET-проектов, ~50+ RouteBuilder-классов). Модули общаются через RabbitMQ (RPC + pub/sub), внутренний event-bus, Kafka с Consumer Group для GPS-стрима с мобилок, Redis для last-known-location-кэша, RabbitMQ-DLQ под poison messages, и всё это завязано на единый ESB-каркас. То есть это не «5 микросервисов с REST API» — это десятки независимо деплоящихся модулей, у каждого свой IRouteContext, свой набор endpoint’ов, свой lifecycle. Сравнимая по сложности картинка на JVM была бы — Apache Camel + Karaf. У нас — redb.Route + Tsak.
И всё это — в трёх стендах: dev / test / prod, каждый — 3-нодовый кластер. То есть RouteBuilder-классы, которые вы видите дальше по статье, прямо сейчас крутятся в девяти инстансах Tsak параллельно: три ноды поднимают одинаковые контексты, redb-координатор раздаёт между ними кластерные маршруты (Kafka Consumer Group — балансировкой партиций, RabbitMQ — конкурентные consumer’ы на той же очереди, HTTP-фасады — за L4-балансером, cron — leader-election через redb-объекты, чтобы крон-задача отработала ровно один раз на кластер). Дев-стенд — для разработки и feature-веток, test — приёмочные сценарии и регресс перед релизом, prod — собственно нагрузка. Между стендами разъезжается тот же .tpkg-пакет: меняется только конфиг и строка подключения к БД. Cluster + hot-reload + три параллельных стенда — это и есть «production-ready», а не «у меня на ноуте работает».
Третий прод — аналитическая платформа (~672k объектов, ~8M свойств), там Route играет роль внутренней шины между модулями. Реальный код всех трёх — ниже.
И это не «локальные сборки из репозитория». Вся стопка 3.0.0 опубликована на nuget.org — 43 пакета под префиксом redb.*, суммарно ~20 800 загрузок на момент написания статьи. По убыванию: redb.Route (~1600), redb.Core (~1590), redb.Core.Pro (~1160), redb.Templates, redb.Postgres / redb.MSSql (~1000 каждый), дальше Pro-провайдеры, redb.CLI, redb.Export, и весь зоопарк redb.Route.* транспортов (Kafka, RabbitMQ, IBM MQ, MQTT, AMQP, Redis, gRPC, SignalR, WebSocket, TCP, SFTP, FTP, File, S3, Mail, LDAP, Quartz, Elasticsearch, AzureServiceBus, Firebase, Http, Sql) + redb.Tsak.* (Core, Core.Pro, Client, CLI, Contracts, Templates). То есть всё, про что эта статья — реально лежит, версионируется и подтягивается через dotnet add package.
Как выглядит маршрут
Чтобы не врать игрушечным From → Filter → To, сразу боевой shape. Это упрощённый набросок из лога-трекинговой системы: HTTP-фасад принимает массив GPS-точек, throttle на 200 rps, идемпотентность по MessageId, валидация JSON, ветвление по типу события, Multicast параллельно в три транспорта (Kafka на агрегацию, RabbitMQ на DAL-запись, Redis-кэш last-known-location фоном через WireTap), а наружу — JSON-ответ:
public class GpsHttpFacadeRoutes : RouteBuilder{ protected override void Configure() { OnException<JsonException>() .Handled() .SetHeader("HTTP_SC", Constant(400)) .To("direct://error-handler") .EndOnException(); From("http://0.0.0.0:5090/integration/gps?methods=POST") .RouteId("gps-http-facade") .Throttle(200).Per(TimeSpan.FromSeconds(1)) // не больше 200 rps .IdempotentConsumer(Header("MessageId"), repository: "redb") // дубли отсекаем .Process(ValidateGpsBatch) // JSON → List<GpsPoint> .Choice() .When(Header("eventType").IsEqualTo("location")) .Multicast().ParallelProcessing() .To("kafka:mobile.trips.location.sync?key=${header.tripId}") .To("rabbitmq:?exchange=lt.dal&routingKey=dal.trips.location.sync") .WireTap("redis:set:gps:last:location:${header.tripId}?ttl=86400") .EndMulticast() .When(Header("eventType").IsEqualTo("checkin")) .To("rabbitmq:?exchange=lt.dal&routingKey=dal.gps.checkin_checkout") .Otherwise() .Log(LogLevel.Warning, "unknown eventType: ${header.eventType}") .EndChoice() .SetBody(Constant("""{ "status": "accepted" }""")) .SetHeader("Content-Type", Constant("application/json")); }}
Этот один класс заменяет: HTTP-контроллер, middleware-throttle, ручную идемпотентность через Redis, ручной Kafka-продюсер, ручную RabbitMQ-обвязку, try/catch поверх десериализации и логику дёргания Redis из background-сервиса. Всё, что внутри — first-class элементы DSL: Throttle, IdempotentConsumer, Choice, Multicast, WireTap, OnException<T>. Каждый из них — отдельный EIP-паттерн с собственными метриками OpenTelemetry.
Кстати, именно такой Route-фасад в одном из продов сейчас замещает WSO2 Micro Integrator, который до этого играл роль HTTP→Kafka/RabbitMQ-шлюза. WSO2 даёт ту же машинерию через XML-конфиги и Synapse-DSL — но когда вокруг уже C#-стек, держать отдельную JVM ради
<switch>/<foreach parallel-execution="true">/<endpoint uri="rabbitmq:/?...">— лишнее звено. Route даёт те же EIP, но в одном процессе, на одном языке, с C#-типизацией и общимиIServiceProvider-зависимостями.
Регистрация:
// Program.csbuilder.Services.AddRedbRoute(route => route.AddRouteBuilder<GpsHttpFacadeRoutes>());builder.Services.AddRedbRouteHttp();builder.Services.AddRedbRouteKafka();builder.Services.AddRedbRouteRabbitMQ();builder.Services.AddRedbRouteRedis();
Маршрут стартует вместе с приложением, останавливается на shutdown с graceful drain, метрики OpenTelemetry — по каждому шагу из коробки (route.gps-http-facade.duration, multicast.to[1].failures, idempotent.duplicates, throttle.rejected).
Полный код такого аггрегатора (только с Kafka на входе, без HTTP) — ниже, в разделе «Второй прод — RabbitMQ как RPC и pub/sub шина». А кому нужен
hello worldна 15 строк — он вredb.Route.Demo.
Чего хотел Camel — и что из этого есть
EIP-паттернов из канонической книжки в Camel — 80+. Мы закрыли 30+ — самые ходовые. Все они — first-class элементы DSL:
// Content-Based Router.Choice() .When(Header("priority").isEqualTo("high")).To("direct://fast-lane") .When(Header("priority").isEqualTo("low")).To("seda://batch") .Otherwise().To("direct://standard").EndChoice()// Splitter — каждое вложение обрабатываем отдельно.Split(e => e.In.GetBody<Order>().Items) .Process(async (e, ct) => await ProcessItemAsync(e, ct)).EndSplit()// Aggregator — собираем по correlationId.Aggregate(Header("orderId"), strategy: new CompletionAggregator(), completionSize: 10, completionTimeout: TimeSpan.FromSeconds(30))// WireTap — отправить копию в audit без блокировки основного потока.WireTap("seda://audit")// Recipient List — динамический fan-out.RecipientList(e => ResolveDestinations(e))// Idempotent Consumer — дедупликация по ключу (persistent backend).IdempotentConsumer(Header("messageId"), repository: redbRepository)
И ещё: Multicast, Dynamic Router, Resequencer, Scatter-Gather, Claim Check, Throttle, Delay, Loop, Enrich, PollEnrich, Saga (без durable state — компенсирующие шаги), Circuit Breaker, Validate, Marshal/Unmarshal, Transacted, Process. И стандартные обвязки — Retry, DeadLetterChannel, TryCatch/DoCatch/DoFinally, OnException.
Compiled expression engine
В Camel есть Simple Language — ${header.priority}, ${body.items.size()} и так далее. У него один минус: он интерпретируется в рантайме. На горячем пути это заметно.
У redb.Route есть то же самое, но компилируемое. ${header.priority}, ${body.OrderId}, арифметика, JSONPath, XPath — всё это парсится один раз при Build() маршрута, превращается в Expression<Func<IExchange, T>> через System.Linq.Expressions, компилируется в IL и кэшируется. На горячем пути — обычный делегат.
// Предикаты в Choice.When / Filter — три формы:.When(e => e.In.GetHeader<string>("priority") == "high") // 1. lambda.When(Header("priority").isEqualTo("high")) // 2. typed builder.When("header.priority == 'high'") // 3. string expression// String templates в SetBody / SetHeader / Log:.SetHeader("reply", "${header.orderId}-confirmed").SetBody("Order ${body.Id}: total=${body.Total} for ${header.customerName}").Log("Processed ${header.orderId} in ${header.elapsed}ms")
Можно смешивать. Internally — один и тот же AST с одним и тем же компилятором.
Транспорты
22 внешних + 5 встроенных:
|
Категория |
Транспорты |
|---|---|
|
Очереди / шины |
Kafka, RabbitMQ, IBM MQ, MQTT (5.0), AMQP 1.0, Azure Service Bus, Redis |
|
HTTP / RPC |
HTTP (in/out), WebSocket, SignalR, gRPC, TCP |
|
Файлы / хранилища |
SFTP, FTP, File, S3, Firebase (Firestore + Cloud Storage + FCM), GenericFile (base) |
|
Базы данных |
SQL (polling outbox-style) |
|
Корпоративные |
LDAP / Active Directory, Mail (SMTP/IMAP/POP3), Elasticsearch 8.x |
|
Планировщик |
Quartz, Cron |
|
Встроенные |
Direct, SEDA, Timer, Mock, Log |
Каждый — отдельный NuGet-пакет (redb.Route.Kafka, redb.Route.RabbitMQ и т.д.). Подключаются явно через AddRedbRoute*(). У большинства — fluent builder сверху URI-строки:
// URI formFrom("kafka://orders?groupId=svc&brokers=broker1:9092,broker2:9092&autoOffsetReset=earliest");// Type-safe builder — то же самоеFrom(Kafka.Topic("orders") .Brokers("broker1:9092", "broker2:9092") .GroupId("svc") .AutoOffsetReset(AutoOffsetReset.Earliest) .Acks("All"));
Builder лучше тем, что (а) intellisense, (б) опечатки ловятся компилятором, (в) refactor → rename работает.
Обработка ошибок — четыре уровня
Самое больное в hand-rolled-интеграциях. У Route — четыре уровня, которые комбинируются:
1. Per-step Retry — локальный retry вокруг одного шага. Когда внешний сервис флакающий, но в целом живой:
.Retry(maxRetries: 5, initialDelay: TimeSpan.FromSeconds(1)).To("http://flaky-service/submit")
2. DeadLetterChannel — route-level. Любая необработанная ошибка после retry уходит в указанный sink. Exception сохраняется на exchange, можно прочитать в DLC-маршруте:
From("kafka://orders") .DeadLetterChannel("seda://dlq") .Process(...) .To("sql://orders");From("seda://dlq") .Log("DLQ: ${exception.message}") .Choice() .When(e => e.GetException() is TimeoutException).To("seda://retry-later") .Otherwise().To("sftp://archive/failed/") .EndChoice();
3. DoTry / DoCatch / DoFinally — scoped, как обычный try/catch внутри маршрута:
.DoTry() .To("http://external-api/submit") .Process(async (e, ct) => await PostProcess(e, ct)).DoCatch<HttpRequestException>() .Log("HTTP failure: ${exception.message}") .To("seda://retry-queue").DoCatch<TimeoutException>() .Log("Timeout, archiving") .To("sftp://archive/timeouts/").DoFinally() .Log("Attempt complete (success or failure)").End()
4. OnException<T> — на уровне всего контекста. Глобальные обработчики: регистрируются в IRouteContext и применяются ко всем маршрутам контекста, а не только к From(...) из этого класса. Объявил один раз в любом RouteBuilder — ловит исключения по всем From(...) всех RouteBuilder-ов этого модуля:
public class OrderRoutes : RouteBuilder{ protected override void Configure() { OnException<HttpRequestException>() .MaximumRedeliveries(5) .UseExponentialBackOff() .BackOffMultiplier(2.0) .Handled() // exchange продолжает идти как ни в чём .To("seda://http-failures") .EndOnException(); OnException<DbException>() .MaximumRedeliveries(2) .UseOriginalMessage() // восстановить тело до обработки .To("seda://db-failures") .EndOnException(); // Эти обработчики ловят исключения не только тут, а во всех // RouteBuilder-ах модуля — они живут на уровне контекста: From("kafka://orders").To("http://payments-svc/charge"); From("kafka://shipments").To("http://logistics-svc/dispatch"); }}
.Handled(), .Continued(), .UseExponentialBackOff(), .UseOriginalMessage() — стандартные крутилки Camel’а, перенесённые один-в-один.
Transactional routes — что под капотом
.Transacted() — это не просто метка «оберни в TransactionScope». Это контракт с транспортом: при успехе всего pipeline сделай commit/ack, при failure — rollback/nack.
From("kafka://orders?groupId=svc&brokers=...") .Transacted() // открывается транзакция .ProcessWithRedb(async (redb, e, ct) => await SaveOrderAsync(redb, e, ct)) .To("rabbitmq://orders-confirmed"); // подтверждается в RabbitMQ // commit Kafka offset + commit RabbitMQ + commit redb — атомарно
Что реально происходит:
-
Kafka-транспорт включает transactional producer (
enable.idempotence=true,transactional.id), консьюмер коммитит offset черезproducer.SendOffsetsToTransaction(...)в той же транзакции. Это Kafka EOS — exactly-once между partition’ами. -
RabbitMQ-транспорт включает publisher confirms (
MaxOutstandingConfirms), плюс при.Transacted()— transacted channels (tx-select / tx-commit / tx-rollback). -
IBM MQ, AMQP 1.0 — нативные транзакции через
MQGMO_SYNCPOINT/local transactions. -
SQL-транспорт биндит
IDbTransactionк каждому шагу черезTransactionScope(распределённую — для SQL Server, локальную — для PostgreSQL и MySQL).
Под капотом — интерфейс ITransactedAction, который каждый транспорт реализует по-своему. С точки зрения вашего кода — одна строчка .Transacted().
Реальный код из прода
Эти куски выдрал из боевого репозитория той самой внутренней TMS. Имена методов и переменных как есть.
InitRoute — точка входа модуля Tsak. Регистрируются компоненты (транспорты), redb-схемы, словари, и 11 RouteBuilder-классов:
public static class InitRoute{ public static IRouteContext main(IRouteContext context) { // 1. Транспорты модуля context.AddComponent(new SqlComponent()); context.AddComponent(new HttpComponent { ServerManager = new SharedHttpServerManager() }); context.AddComponent(new CronComponent()); // 2. Named SQL data source — connection string в DI, не в URI маршрута DbProviderFactories.RegisterFactory("Microsoft.Data.SqlClient", SqlClientFactory.Instance); context.AddToRegistry("sap-s4", (ISqlConnectionFactory)new SqlConnectionFactory( new SqlConnectionOptions { ConnectionString = sapConn, ProviderName = "Microsoft.Data.SqlClient" })); // 3. Синхронизация redb-схем — code-first, без миграций var redb = context.GetRedbService(); redb.InitializeAsync(ensureCreated: true).GetAwaiter().GetResult(); redb.SyncSchemeAsync<Driver>().GetAwaiter().GetResult(); redb.SyncSchemeAsync<Vehicle>().GetAwaiter().GetResult(); redb.SyncSchemeAsync<TransportationRoute>().GetAwaiter().GetResult(); redb.SyncSchemeAsync<TransportationPoint>().GetAwaiter().GetResult(); // ... ещё 8 схем EnsureTsUmLists(redb); await RefDataCache.RefreshAsync(redb); // 4. Маршруты var rc = (RouteContext)context; rc.AddRoutes(new TsumExceptionRouteBuilder()); // global OnException rc.AddRoutes(new TsumRouteBuilder()); // SQL polling от SAP S/4 rc.AddRoutes(new TsumPlacementRouteBuilder()); // расстановка машин rc.AddRoutes(new TsumTransportStatusRouteBuilder()); rc.AddRoutes(new TsumShippingPointRouteBuilder()); rc.AddRoutes(new TsumBackupRouteBuilder()); // daily cron backup rc.AddRoutes(new TsumCleanupRouteBuilder()); rc.AddRoutes(new TsumSliceJobRouteBuilder()); // ... ещё 3 return context; }}
SQL polling из SAP S/4 stored procedure → XML → redb. Один из ключевых ETL-маршрутов:
private void ConfigureSqlConsumer(){ var procs = Context!.GetProperty<IDictionary<string, object?>>("Procedures")!; var proc = (string)procs["MonitoringReport"]!; // имя процедуры — из конфига, по Mode (prod/test) var builder = Sql.Poll($"EXEC {proc} @DateFrom, @DateTo, @OutputFormat") .DataSource(Constant("sap-s4")) .CommandTimeout(300) .Delay(60000) // опрос раз в минуту .OutputType(SqlOutputType.Scalar) // FOR XML возвращает одно значение .Param("DateFrom", (object?)null) .Param("DateTo", (object?)null) .Param("OutputFormat", "XML"); From(builder) .RouteId("tsum-sql-consumer") .Process(e => e.In.ContentType = "application/xml") .Process(DeserializeXml) // XmlSerializer → List<TransportationOrder> .To("direct://tsum");}private void ConfigureHttpConsumer(){ // Тот же конечный обработчик, но фронт для тестов руками From("http:0.0.0.0:5089/api/tsum/monitoring?inOut=true") .RouteId("tsum-http-consumer") .ConvertBody<string>() .Process(DeserializeXml) .To("direct://tsum");}private void ConfigureProcessing(){ // Единая точка обработки — что из SQL, что из HTTP From("direct://tsum") .RouteId("tsum-processing") .ProcessWithRedb(async (redb, exchange, ct) => { var orders = exchange.In.GetBody<List<TransportationOrder>>(); if (orders is null || orders.Count == 0) return; // Синхронизация справочников водителей/машин/точек var dicts = await DictionarySyncService.SyncFromOrdersAsync(redb, orders, ct); if (dicts.AnyChanged) await RefDataCache.RefreshAsync(redb); // Дедупликация существующих маршрутов var codes = orders.Select(o => o.Code).ToList(); var existing = await redb.Query<TransportationRoute>() .WhereRedb(o => codes.Contains(o.ValueString!)) .ToListAsync(); // ... ещё ~200 строк бизнес-логики });}
Заметная штука: один и тот же обработчик (direct://tsum) обслуживает и SQL-источник, и HTTP-эндпоинт для ручных тестов. Это и есть тот самый Camel’овский трюк — direct: как in-process invariant’ный канал, к которому можно прицепить любой To(...).
HTTP API на порту 5090 с авторизацией через JWT + LDAP. Один из endpoint’ов:
private void ConfigureRoutesEndpoint(){ From("http:0.0.0.0:5090/api/tsum/routes?inOut=true&cors=true&corsOrigins=*") .RouteId("tsum-api-routes") .Process(Auth.ProcessAsync) // валидация JWT, наполнение headers .Process(TsumAuthProcessor.RequirePermission(TsumPermission.ViewAll)) .ConvertBody<string>() .ProcessWithRedb(async (redb, exchange, ct) => { var bodyStr = exchange.In.Body?.ToString() ?? "{}"; var filter = JsonSerializer.Deserialize<RouteFilterRequest>(bodyStr, JsonOptions); // GET-параметры тоже принимаем — для отладки в браузере string? QP(string name) => exchange.In.Headers.TryGetValue($"redbHttp.QueryParam.{name}", out var v) ? v?.ToString() : null; if (QP("startPlanFrom") is { } qpFrom && DateTimeOffset.TryParse(qpFrom, out var from)) filter.StartPlanFrom ??= from; // ... ещё несколько query-параметров var query = redb.Query<TransportationRoute>(); // Динамическая фильтрация через redb LINQ if (!string.IsNullOrEmpty(filter.CodeSearch)) query = query.WhereRedb(o => o.ValueString!.Contains(filter.CodeSearch)); var spItems = await ResolveIds(filter.ShippingPointIds); if (spItems.Count > 0) query = query.Where(r => spItems.Contains(r.ShippingPoint)); // ... ещё ~10 фильтров и пагинация var items = await query.Skip(offset).Take(limit).ToListAsync(); JsonRouteHelper.SetJsonBody(exchange, new { total, offset, limit, items }); });}
HTTP-транспорт здесь — это redb.Route.Http с собственным Kestrel-консьюмером. inOut=true — request/response режим, ответ возвращается в Body exchange’а. Заголовки redbHttp.QueryParam.* парсятся транспортом автоматически.
Cron backup всей БД через redb.Export, с ротацией. Без сторонних шедулеров:
protected override void Configure(){ var backupConfig = Context!.GetProperty<IDictionary<string, object?>>("Backup"); var directory = (string)backupConfig["Directory"]! ?? "backups"; var retentionDays = int.Parse(backupConfig["RetentionDays"]?.ToString() ?? "7"); Context.SetProperty("_backup.directory", directory); Context.SetProperty("_backup.retentionDays", retentionDays); From("cron://tsum-backup?schedule=0 0 3 * * ?") // каждый день в 03:00 .RouteId("tsum-backup-cron") .ProcessWithRedb(RunBackupAsync);}private async Task RunBackupAsync(IRedbService redb, IExchange exchange, CancellationToken ct){ var pgConn = redb.Configuration.ConnectionString!; var directory = Context!.GetProperty<string>("_backup.directory")!; Directory.CreateDirectory(directory); var fileName = $"tsum_backup_{DateTime.UtcNow:yyyy-MM-dd_HHmmss}.redb"; var filePath = Path.Combine(directory, fileName); var provider = ProviderFactory.Create("postgres"); await provider.OpenAsync(pgConn, ct); var exportService = new ExportService(provider, verbose: false, batchSize: 10000); await exportService.ExportAsync(filePath, schemeIds: null, compress: true, dryRun: false, ct); RotateBackups(directory, retentionDays);}
Транспорт cron:// — это redb.Route.Quartz с Quartz.NET под капотом. Cron-expression стандартный, Quartz-овский.
Если посмотреть на эти три фрагмента — там нет ни одной строки про инициализацию консьюмеров, про работу с IServiceProvider, про lifecycle, про graceful shutdown. Всё это сидит внутри Route. Бизнес-код — это Process(...) и ProcessWithRedb(...).
Второй прод — RabbitMQ как RPC и pub/sub шина
Те куски выше про TMS — это SQL + HTTP + cron. Чтобы не складывалось впечатление «redb.Route только под polling-задачи», вот фрагменты из второй продовой системы того же заказчика — платформы доставки на ~150k заказов/мес. Модули общаются друг с другом через RabbitMQ: одни маршруты — синхронный RPC поверх очередей с CorrelationId, другие — fire-and-forget pub/sub. Старая версия кода была на snake-case-форке нашего DSL; сейчас всё работает на актуальном 3.0 — привожу к каноническому API.
RPC через RabbitMQ — запрос/ответ с CorrelationId. Модуль ControlPanel пишет в очередь dal.controlpanel.trips.get.tracking.link, ждёт ответ с тем же CorrelationId. На стороне DAL:
public class TripLinkRouteBuilder : RouteBuilder{ protected override void Configure() { From(Rabbit.Queue("dal.controlpanel.trips.get.tracking.link") .ConnectionFactory("RabbitMQConnectionFactory")) .RouteId("controlpanel_getTrackingLink") .SetHeader("routeId", "controlpanel_getTrackingLink") // Валидация: без CorrelationId роутить нечего .Filter(Header("CorrelationId").IsNull()) .DoThrow<ArgumentNullException>( "CorrelationId is required. RouteId='controlpanel_getTrackingLink'") .EndFilter() .Log("GetTrackingLink DAL STARTED: CorrelationId=${header.CorrelationId}") .InvokeController() // → TripController.GetTrackingLink .DbSaveChanges() // commit EF Core changes .ApiPackResponse( // упаковка в стандартный envelope toData: Property("data"), toMeta: Property("meta"), code_dal: 200, packJwt: true) .WireTap("direct://delivery.info.send") // асинхронная нотификация — fire-and-forget .Log("GetTrackingLink DAL COMPLETED: CorrelationId=${header.CorrelationId}") .Respond(); // ответ на reply-to queue с тем же correlationId }}
Rabbit.Queue(...).ConnectionFactory(...) использует named-фабрику из DI-registry (как named SQL data source в первом проде — секреты не торчат в URI маршрута). .Respond() — это RabbitMQ RPC-pattern: транспорт сам читает reply-to и correlation-id из заголовков входящего сообщения и публикует тело exchange’а обратно. Никакого ручного BasicProperties.ReplyTo / BasicProperties.CorrelationId в коде.
Глобальный OnException на весь RouteBuilder — стандартный envelope ошибки + 500:
protected override void Configure(){ if (!HasExceptionRoute<Exception>()) OnException<Exception>() .Log("Exception: ${exception.message}", LogLevel.Error) .SetHeader("code_dal", (int)HttpStatusCode.InternalServerError) .SetBody(e => new BaseErrorResponse { traceId = e.In.GetHeader<string>("CorrelationId") ?? "no-correlation", code = (int)HttpStatusCode.InternalServerError, message = e.Exception?.Message ?? "Internal server error.", description = e.Exception?.ToString() }.ToJson()) .ExceptionHandled() .Stop() .EndOnException(); // ConfigureGetTrackingLinkRoute() ... остальные routes выше}
Pub/sub через RabbitMQ — email-нотификации. Любой модуль может бросить exchange’у lt.dal с routing key send.email — отдельный SMTP-воркер (отдельный процесс, отдельный модуль Tsak) подберёт и отправит. Локальная точка входа — direct://send-trip-email-notification, в неё пишут все RouteBuilder’ы, которым нужно уведомить:
public class EmailNotificationRouteBuilder : RouteBuilder{ protected override void Configure() { From("direct://send-trip-email-notification") .RouteId("dal_sendTripEmailNotification") .Log(LogLevel.Debug, "SendTripEmailNotification: tripId=${property.tripId}, type=${property.emailType}") .Process((e, ct) => { // Изолированный scope под scope'd DbContext (route отрабатывает в WireTap-таске) using var scope = Context.GetServiceProvider()!.CreateScope(); var db = scope.ServiceProvider.GetRequiredService<LtContext>(); var logger = Context.GetService<ILogger>(); var cfg = Context.GetProperty<IDictionary<string, object?>>("EmailNotifications"); if (!IsEnabled(cfg)) { e.In.SetHeader("code_dal", 204); return; } var tripId = e.GetProperty<long>("tripId"); var trip = db.Trips.AsNoTracking() .Include(t => t.Driver).ThenInclude(d => d!.IdNavigation).ThenInclude(a => a.Branch) .Include(t => t.Trippoints).ThenInclude(tp => tp.Location) .FirstOrDefault(t => t.Id == tripId); if (trip?.Driver?.IdNavigation?.Branch is not { } branch) { e.In.SetHeader("code_dal", 404); return; } // Адресатов берём из настроек филиала — список email'ов в одной из двух колонок var emailType = e.GetProperty<string>("emailType"); var recipients = EmailHelper.ParseAndValidateEmailList( emailType == "trip-point-change" ? branch.Trippointorderchangeemaillist : branch.Tripcompletionemaillist); if (recipients.Count == 0) { e.In.SetHeader("code_dal", 204); return; } var (subject, html) = BuildEmailContent(trip, emailType, cfg, e); e.In.SetHeader("emailTo", string.Join(",", recipients)); e.In.SetHeader("emailSubject", subject); e.In.SetHeader("emailContentType", "text/html"); e.In.SetBody(new { content = html }.ToJson()); e.In.SetHeader("code_dal", 200); }) // Только если письмо реально готово к отправке — пушим в очередь .Filter(Header("code_dal").IsEqualTo(200)) .To(Rabbit.Exchange("lt.dal", type: "topic") .RoutingKey("send.email") .ConnectionFactory("RabbitMQConnectionFactory")) .Log("Email sent to RabbitMQ exchange lt.dal/send.email") .EndFilter(); }}
Дальше эту direct-точку дёргают через WireTap из любого маршрута, который меняет поездку:
// Где-то в TripRouteBuilder, после успешного UPDATE.SetProperty("tripId", Header("trip-id")).SetProperty("emailType", Constant("trip-point-change")).SetProperty("changeDescription", e => "Order added on point #" + e.In.GetHeader<int>("point-seq")).WireTap("direct://send-trip-email-notification")
WireTap копирует exchange и пушит копию в указанный endpoint асинхронно, не блокируя основной поток. Получатель — direct://... в этом же процессе, дальше — RabbitMQ. Если SMTP-воркер лежит — письма копятся в очереди, основной HTTP API продолжает отвечать.
Kafka — GPS-координаты с мобилок. Самый нагруженный маршрут той же системы. Водители (~500 одновременно) льют батчи координат через мобильное API → топик mobile.trips.location.sync. На каждой ноде кластера сидит один consumer; Kafka Consumer Group сам раздаёт партиции между нодами (3 партиции → 3 ноды, по одной на каждую). Дальше — группировка по tripId, последняя точка каждого tripId асинхронно уходит в Redis (last GPS location для веб-дашборда), полный батч рассылается по RabbitMQ-очередям DAL-модулей (одна — на запись в БД, вторая — на детекцию check-in/check-out, третья — на пересчёт метрик):
public class GpsKafkaBatchAggregatorBuilder : RouteBuilder{ protected override void Configure() { var cfg = Context.GetSection(GetType().Name); // Один consumer на ноду. Партиции (0..N) распределит Consumer Group. // partition= НЕ указываем — иначе сломаем балансировку. From(Kafka.Topic(cfg.GetValue("kafka.Topic", "mobile.trips.location.sync")) .Brokers(cfg.GetValue("kafka.Brokers", "localhost:9092")) .GroupId(cfg.GetValue("kafka.GroupId", "gps-batch-aggregator")) .MaxPollRecords(cfg.GetValue("kafka.MaxPollRecords", 100)) .AutoOffsetReset("earliest")) .RouteId("GpsKafkaBatchAggregator") .Transacted() // Kafka EOS: commit offset в одной tx с downstream .Log("incoming batch: kafka.batch.size=${header.kafka.batch.size}") .To("direct://gps-batch-grouper"); From("direct://gps-batch-grouper") .RouteId("GpsBatchGrouper") .Process(new GpsBatchGrouperProcessor(outputType: GroupingOutputType.Dictionary)) .Log("trips=${header.gps.trips.count}, points=${header.gps.total.points}") // Извлекаем последнюю координату по каждому tripId → property для Redis .Process((e, ct) => { var grouped = e.In.GetBody<Dictionary<long, List<LocationHistory>>>(); var last = grouped.ToDictionary( kvp => kvp.Key.ToString(), kvp => new GpsLocationDto( ts: kvp.Value.Max(l => l.ts).ToString("o"), lat: kvp.Value.OrderByDescending(l => l.ts).First().lat, lon: kvp.Value.OrderByDescending(l => l.ts).First().lon).ToJson()); e.SetProperty("gpsLastLocationMap", last); }) // Fire-and-forget в Redis — основной pipeline ждать не должен .WireTap("direct://save-gps-last-location") // Раздача батча по трём очередям DAL .Multicast().ParallelProcessing() .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.trips.location.sync")) .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.gps.checkin_checkout")) .To(Rabbit.Exchange("lt.dal").RoutingKey("dal.metrics.calculate")) .EndMulticast(); // Запись last-location в Redis (TTL 24 часа). Ключ: gps:last:location:{tripId} From("direct://save-gps-last-location") .RouteId("SaveGpsLastLocation") .Split(Property("gpsLastLocationMap")) .SetHeader("tripId", e => ((KeyValuePair<string, string>)e.In.GetBody()).Key) .SetBody(e => ((KeyValuePair<string, string>)e.In.GetBody()).Value) .To("redis:set:gps:last:location:${header.tripId}?ttl=86400") .EndSplit(); }}
Соседний consumer (mobile.trips.unplanned.stops) делает детекцию незапланированных остановок — там интересен локальный OnException. JSON, который не парсится — это не транзиентная ошибка, retry не поможет. Маршрут отправляет сообщение в RabbitMQ-DLQ с метаданными (partition, offset, preview оригинального body) и пробрасывает исключение наверх — consumer падает, Kafka откатывает offset, поднимается заново, и проблемная партиция уйдёт в DLQ при следующей попытке. Это правильный pattern для poison message:
From(Kafka.Topic("mobile.trips.unplanned.stops").GroupId("unplanned-stops-processor") .Brokers(brokers).AutoOffsetReset("latest")) .RouteId("UnplannedStop-Kafka") .Transacted() .To("direct://unplanned-stop-grouper");From("direct://unplanned-stop-grouper") .RouteId("UnplannedStop-Grouper") .OnException<JsonException>() .MaximumRedeliveries(0) // JSON не станет валидным от повтора .Handled(false) // консьюмер ДОЛЖЕН упасть, чтоб Kafka не закоммитила offset .Process(e => { e.In.SetHeader("dlq.errorType", "JsonException"); e.In.SetHeader("dlq.errorMessage", e.Exception?.Message); e.In.SetHeader("dlq.partition", e.In.GetHeader<int>("kafka.partition")); e.In.SetHeader("dlq.offset", e.In.GetHeader<long>("kafka.offset")); e.In.SetHeader("dlq.rawBodyPreview", Truncate(e.GetProperty<string>("OriginalKafkaBody") ?? e.In.GetBody<string>(), 500)); }) .To(Rabbit.Exchange("lt.dlq").RoutingKey("dal.dlq.gps.json")) .Log(LogLevel.Error, "JSON failed → DLQ: partition=${header.kafka.partition}, offset=${header.kafka.offset}") .EndOnException() .Process(new UnplannedStopDetectorProcessor(cacheManager, lookbackMinutes: 60)) .To("direct://persist-unplanned-stops");
Несколько штук, которые видно только на реальном проде:
-
partition=в Kafka URI указывать нельзя. Чуть выше прокомментировано в коде — это типичная грабля. Если указать — Consumer Group ломается, все ноды конкурируют за одну партицию. -
.Transacted()+.Multicast()— батч уходит во все три очереди одновременно (ParallelProcessing), но Kafka offset коммитится только если все.To(...)отработали. Если RabbitMQ временно недоступен — батч приедет повторно, без потерь. -
WireTapдля Redis — last-location нужен на дашборде «здесь и сейчас», но если Redis лежит, основной pipeline на запись в БД не должен ждать. WireTap копирует exchange в отдельный поток. -
DLQ с
dlq.rawBodyPreview— когда инженер открывает DLQ-очередь, ему нужны не «что-то сломалось», а partition + offset + первые 500 символов исходного body. С этим можно прийти к мобильному разрабу и сказать «вот это вы шлёте».
Что важно увидеть на этом примере:
-
Один и тот же
Rabbit.Queue(...)/Rabbit.Exchange(...)builder покрывает и RPC (с.Respond()), и pub/sub (с.To(...)) — разница только в форме хвоста маршрута. -
InvokeController()/DbSaveChanges()/ApiPackResponse(...)— это не часть базового Route, это шаги-расширения, которые сделали внутри проекта (lt.Core.Route). Любой шаг — этоIProcessor, регистрируется вIRouteContext, доступен через extension method. То есть DSL расширяемый: команде нужны свои first-class steps под «вызови контроллер → сохрани EF Core → упакуй ответ» — пишут их один раз, используют во всех маршрутах. -
WireTap+direct://— стандартный Camel-овский паттерн для fire-and-forget’а внутри процесса. НикакихTask.Run, никакихIHostedService-очередей под капотом — это всё в Route.
Если хочется потрогать руками — redb.Route.Demo
В репозитории лежит модуль redb.Route.Demo — 39 маршрутов в 9 секциях, 18 транспортов, всё в одном проекте, всё запускается через dotnet run. По сути — единый reference implementation, в котором есть каждая фича фреймворка: RPC через RabbitMQ/AMQP 1.0/gRPC/IBM MQ, WireTap в Kafka и файл, SQL+TransactionScope, Redis Pub/Sub, TCP echo, WebSocket push, MQTT, SEDA, DirectVM cross-context, Timer/Cron, CircuitBreaker, Retry с backoff, DeadLetterChannel, Aggregator, Multicast, RecipientList, DynamicRouter, Loop, Resequencer, Enrich, IdempotentConsumer, Throttle, JSON Schema validation, Traced+Metered, lifecycle listeners, named IRedbService.
Заходной маршрут — POST /api/demo. Дальше идёт типовой ESB-pipeline:
HTTP → Throttle(10/s) → IdempotentConsumer → JSON Schema validation → Choice(mode) → Multicast(parallel) ├─ RabbitMQ RPC → stamp.rabbit ├─ AMQP 1.0 RPC → stamp.amqp ├─ gRPC RPC → stamp.grpc └─ IBM MQ RPC → stamp.wmq → BeginTransaction → SQL INSERT + SELECT → CommitTransaction → WireTap fan-out → Kafka + File + Redis + MQTT + SEDA → JSON response
Это ровно то, что в проде «склеивает» Camel у джавистов — только здесь это runnable demo на 1500 строк C#, который запускается одной командой против docker compose с RabbitMQ/Postgres/Kafka/Redis/MQTT.
Самая показательная секция для тех, кто думает «а как Route дружит с redb.Core» — NamedRedbRoutes.cs. Из маршрута поднимается named IRedbService (можно держать несколько подключений к разным БД одновременно — pg-test, mssql-test), CRUD идёт напрямую через ProcessWithRedb("instance-name", async (redb, ex, ct) => ...):
internal sealed class NamedRedbRoutes : RouteBuilder{ protected override void Configure() { From("timer://named-redb-check?period=30000&delay=5000") .RouteId("demo-named-redb-pgsql") .Log("[NAMED-REDB] checking pg-test instance...") // Метаданные подключения — версия БД, имя cache-domain .ProcessWithRedb("pg-test", (redb, ex) => { ex.In.Headers["pg-db-type"] = redb.dbType ?? "n/a"; ex.In.Headers["pg-db-version"] = redb.dbVersion ?? "n/a"; ex.In.Headers["pg-cache-domain"] = redb.CacheDomain ?? "n/a"; }) .Log("[NAMED-REDB] pg-test: ${header.pg-db-type} ${header.pg-db-version}") // SAVE — RedbObject<TProps> с автогенерацией схемы по [RedbScheme] .ProcessWithRedb("pg-test", async (redb, ex, ct) => { var item = new RedbObject<DemoItemProps> { name = $"Demo-{DateTime.UtcNow:HHmmss}", Props = new DemoItemProps { Title = "Named Redb Demo", Description = $"Created at {DateTime.UtcNow:O}", Priority = Random.Shared.Next(1, 10) } }; var id = await redb.SaveAsync(item); ex.In.Headers["saved-id"] = id.ToString(); }) // LOAD by id .ProcessWithRedb("pg-test", async (redb, ex, ct) => { var id = long.Parse(ex.In.Headers["saved-id"]!.ToString()!); var obj = await redb.LoadAsync<DemoItemProps>(id); ex.In.Headers["loaded"] = obj is null ? "NOT FOUND" : $"{obj.name} (priority={obj.Props?.Priority})"; }) // QUERY с LINQ-Where прямо по props .ProcessWithRedb("pg-test", async (redb, ex, ct) => { var items = await redb.Query<DemoItemProps>() .Where(p => p.Priority > 3) .Take(5) .ToListAsync(); ex.In.Headers["query-count"] = items.Count.ToString(); }) // DELETE .ProcessWithRedb("pg-test", async (redb, ex, ct) => { var id = long.Parse(ex.In.Headers["saved-id"]!.ToString()!); await redb.DeleteAsync(id); }) .Log("[NAMED-REDB] PG CRUD cycle complete"); // Тот же шаблон, но на MSSql-инстансе — provider абстрагирован From("timer://named-redb-mssql?period=30000&delay=8000") .RouteId("demo-named-redb-mssql") .ProcessWithRedb("mssql-test", async (redb, ex, ct) => { /* save / load / query / delete */ }); }}
Что тут видно:
-
ProcessWithRedb("name", ...)— это extension в пакетеredb.Route.RedbCore. По имени достаёт нужныйIRedbServiceиз реестра (Redbсекция в конфиге проекта), скоупит его на время обработки exchange’а и отдаёт в лямбду. НикакогоIServiceProvider.GetRequiredService<IRedbService>()в коде маршрута. -
SaveAsync/LoadAsync/Query<T>().Where(...).ToListAsync()/DeleteAsync— это стандартныйIRedbServiceAPI, тот же что и в обычном ASP.NET-сервисе. Маршрут не делает ничего особенного — он просто получает уже инициализированный сервис. -
Несколько именованных инстансов одновременно —
pg-testиmssql-testживут параллельно, можно из одного маршрута писать в Postgres и одновременно зеркалить в MSSql. -
Схема (
[RedbScheme]-классыDemoItemProps) синхронизируется один раз при старте модуля (InitRoute.csдёргаетredb.InitializeAsync(ensureCreated: true)), дальше маршрут просто работает с типизированнымиRedbObject<TProps>.
То есть в проде из первого блока (tsum — SQL polling из SAP → redb.Query<TransportationRoute>() → апдейт) лежит ровно та же связка, что и здесь, в Demo — только обёрнутая в реальную бизнес-логику. Хотите потрогать без чужого прода — git clone, поднимите docker compose, dotnet run --project redb.Route.Demo, и оно ответит на POST http://localhost:5088/api/demo JSON-ом со всеми штампами от всех брокеров.
«Я люблю контроллеры из ASP.NET» — пожалуйста, redb.Route.Controllers
Отдельная категория комментариев, которая под такими статьями стабильно появляется: «зачем мне ваш fluent-DSL, я хочу [HttpGet("{id}")] и [FromBody] Order order, как привык в ASP.NET, и чтобы IDE подчёркивала несуществующие роуты». Для них есть пакет redb.Route.Controllers — контроллеры с теми же атрибутами, что и в ASP.NET, но transport-agnostic: один и тот же класс работает за HTTP, SignalR, gRPC или любым другим InOut-эндпоинтом.
[Route("orders")]public class OrdersController : RedbController{ [HttpGet("{id}")] public Task<Order> GetOrder(int id, [FromHeader("X-Tenant")] string tenant) { var redb = Context.GetRedbService(); // тот же IRedbService return redb.Query<OrderProps>() .Where(o => o.Id == id && o.Tenant == tenant) .FirstAsync(); } [HttpPost] public async Task<long> CreateOrder([FromBody] Order order, CancellationToken ct) { var redb = Context.GetRedbService(); return await redb.SaveAsync(new RedbObject<OrderProps> { Props = order.ToProps() }); } [HttpDelete("{id}")] public Task DeleteOrder([FromRoute("id")] int id) => Context.GetRedbService().DeleteAsync(id);}
Привязка маршрута к транспорту — одна строчка:
// HTTP — читает redbHttp.Method / redbHttp.Path из транспортаFrom("http://0.0.0.0:8080/api") .RedbHttpController<OrdersController>();// SignalR — тот же класс, dispatch по redbSignalR.MethodFrom("signalr://bridge") .RedbSignalRController<OrdersController>();// gRPC — dispatch по dispatch-method из metadataFrom("grpc://0.0.0.0:5000") .RedbGrpcController<OrdersController>();// Или сразу пачкой по сборке — как MapControllers() в ASP.NETvar registry = new ControllerRegistry();registry.RegisterAssembly(typeof(OrdersController).Assembly);From("http://0.0.0.0:8080/api").RedbHttpController(registry);
Что тут важно понимать:
-
[Route],[HttpGet/Post/Put/Delete/Patch],[FromBody],[FromHeader],[FromQuery],[FromRoute]— те же самые атрибуты, что вы знаете из ASP.NET. Кривая обучения нулевая. -
Контроллер наследуется от
RedbControllerи получаетContext(тот самыйIRouteContext) иExchangeнапрямую — никакогоIServiceProvider-инъекции в конструктор не нужно. Хотите DI — берёте сервис черезContext.GetService<T>(). -
Один контроллер — три транспорта. Тот же
OrdersControllerотвечает наGET /orders/42через HTTP, наSend("GetOrder", 42)через SignalR-хаб и на gRPC-вызов сdispatch-method=GetOrder. Это та самая «adapter on top of transport»-идея, ради которой обычно пишут Mediator-обвязки руками. -
Это не отдельный pipeline-инструмент, это
IProcessorвнутри маршрута. До контроллера можно влепитьIdempotentConsumer,Throttle,OnException<T>,WireTapв audit — а только потом отдать в controller dispatcher. Никакого «контроллер vs route» — это контроллер ВНУТРИ route:
From("http://0.0.0.0:8080/api") .Throttle(500).Per(TimeSpan.FromSeconds(1)) .IdempotentConsumer(Header("Idempotency-Key"), repository: "redb") .WireTap("seda://audit") .RedbHttpController<OrdersController>();
Под капотом — ControllerRegistry сканирует сборку, собирает route table из атрибутов, на каждый запрос dispatcher читает headers, матчит template, биндит параметры ({id} → [FromRoute], query → [FromQuery], тело → [FromBody], остальное по конвенции имени). Ответ возвращается в exchange.In.Body, статус — в status.code (200 для значения, 204 для Task, 404 для no-match, 500 для исключения). Полная таблица атрибутов и conventions — в redb.Route.Controllers/README.md.
То есть это компромисс ровно для того лагеря, который и в 2026-м не готов отказаться от [HttpGet("{id}")]: внешне — привычный ASP.NET-style controller, внутри — обычный Route-pipeline со всеми EIP-плюшками и без привязки к Kestrel. Если ваш «HTTP API» завтра должен заодно слушать SignalR и gRPC — переписывать ничего не надо, добавляются ещё два From(...).Redb*Controller<OrdersController>().
redb.Tsak — где это хостить в проде
redb.Route описывает что делает маршрут. Когда у вас один проект — он живёт внутри dotnet run. Когда маршрутов 30 и они принадлежат разным командам — нужен runtime-контейнер.
redb.Tsak — это runtime container для модулей redb.Route. Каждый модуль (.dll или .tpkg-бандл) загружается в изолированный AssemblyLoadContext, имеет свой IRouteContext, свой DI, свои конфиги. Tsak управляет lifecycle’ом независимо: добавил модуль — он стартанул, убрал — выгрузился, обновил — hot-reload без рестарта процесса.
Что есть из коробки:
-
REST API — 32 endpoint’а: управление контекстами, маршрутами, модулями, кластером, scheduler’ом, логами, пользователями
-
CLI — 30 команд:
tsak module upload,tsak context start,tsak route stop,tsak context list— удобно для CI/CD -
Blazor Server dashboard — 10 страниц: CPU/RAM/GC, per-route latency, ring-buffer логи, статус watchdog
endpoints
monitoring -
Hot-reload — кладёшь обновлённую
.dllвLibs/, Tsak подхватывает, старая версия выгружается с graceful drain -
Кластер — leader election + автоперераспределение контекстов между нодами, через redb-базу, без Redis / ZooKeeper / Consul
-
Quartz scheduler —
RAMJobStoreдля standalone,AdoJobStoreдля кластера (схема создаётся автоматически) -
OpenTelemetry — Activities + Meters на каждый маршрут и шаг, Prometheus scrape
Деплой нового маршрута:
# Вариант 1 — голая DLL. Бросаем сборку в Libs/, Tsak увидит, поднимет# модуль с graceful drain старой версии. Подходит для дев-цикла.cp Orders.dll /tsak/worker/Libs/# Вариант 2 — .tpkg-бандл (ZIP с manifest.json + DLL + <Module>.config.json).# Собирается прямо из .csproj — у redb.Route.Demo есть Target "PackTpkg"# (AfterTargets="Build"), который кладёт .tpkg рядом и копирует в Tsak Libs.# Так едет в прод: один файл, версионируемый, с конфигом и манифестом внутри.cp Orders.tpkg /tsak/worker/Libs/# или через CLI поверх REST APItsak module upload orders --file Orders.tpkgtsak context start orders# Остановить один маршрут без рестарта процессаtsak route stop orders order-pipeline# Что сейчас работаетtsak context listtsak route list orders
Папка Libs/ — это стандартная convention-точка модулей. В исходниках она лежит ровно по таким же путям: redb.Tsak/src/redb.Tsak.Worker/Libs/ для запуска из репозитория, worker/Libs/ в готовых релизных бинарниках. В Docker-образе — /app/worker/Libs/, монтируется как volume; добавили .dll или .tpkg снаружи — внутри контейнера Tsak поднял модуль без рестарта.
Минимальный пример MSBuild-таргета для упаковки .tpkg прямо из проекта модуля — манифест + DLL + конфиг в один ZIP, и сразу копия в Libs/ Tsak’а (как сделано в redb.Route.Demo.csproj):
<PropertyGroup> <TsakModuleName>Orders</TsakModuleName> <TsakLibsDir>$(MSBuildThisFileDirectory)..\..\tsak\worker\Libs</TsakLibsDir></PropertyGroup><Target Name="PackTpkg" AfterTargets="Build"> <PropertyGroup> <_Staging>$(IntermediateOutputPath)tpkg</_Staging> <_Tpkg>$(MSBuildThisFileDirectory)output\$(TsakModuleName).tpkg</_Tpkg> </PropertyGroup> <RemoveDir Directories="$(_Staging)" /> <MakeDir Directories="$(_Staging)" /> <Copy SourceFiles="$(MSBuildThisFileDirectory)manifest.json" DestinationFolder="$(_Staging)" /> <Copy SourceFiles="$(TargetPath)" DestinationFolder="$(_Staging)" /> <Copy SourceFiles="$(MSBuildThisFileDirectory)$(TsakModuleName).config.json" DestinationFolder="$(_Staging)" Condition="Exists('$(MSBuildThisFileDirectory)$(TsakModuleName).config.json')" /> <ZipDirectory SourceDirectory="$(_Staging)" DestinationFile="$(_Tpkg)" Overwrite="true" /> <Copy SourceFiles="$(_Tpkg)" DestinationFolder="$(TsakLibsDir)" /> <Touch Files="$(TsakLibsDir)\$(TsakModuleName).tpkg" /></Target>
dotnet build → Orders.tpkg в Libs/ → Tsak его поднял. В CI/CD получается одна артефакт-сущность с проставленной версией, конфигом и манифестом — деплой сводится к публикации файла.
Маршруты с cluster=true идут через redb-координатор. В кластерном режиме маршрут можно пометить как кластерный — тогда Tsak сам распределяет его экземпляры между нодами и следит, что только нужное количество копий запущено. Состояние, partitioning, балансировка — через redb objects/values. Quartz при этом использует свои AdoJobStore-таблицы в той же БД, но это его внутренние таблицы.
Добавить ноду в кластер — запустить ещё один экземпляр Tsak с той же строкой подключения к БД. Никакого ZooKeeper / Consul / Redis как внешней зависимости.
Тот же самый RouteBuilder, который вы написали для обычного IHostedService, работает в Tsak без изменений — тот же Configure(), тот же IExchange, те же OnException и .Transacted().
351 проходящий тест, Apache 2.0.
Про Tsak — отдельная большая статья. Здесь это упомянуто крупными мазками, потому что Tsak — это самостоятельный продукт со своей архитектурой:
AssemblyLoadContext-изоляция, 5-слойный конфиг-pipeline, leader election на redb-объектах без внешнего координатора, Blazor-дашборд, REST API, CLI, hot-reload c graceful drain. В одну Route-статью это не влезает. Если зайдёт — напишу отдельно.
Где Route живёт в экосистеме redb
redb.Route — это не одиночный пакет, а слой в стопке продуктов, которые мы пишем последние годы и которые собираются друг из друга. По уровням снизу вверх:
-
redb.Core— ядро: типизированная объектная модель (RedbObject<TProps>), code-first схемы ([RedbScheme]), expression-LINQ-провайдер, кэши, сериализация, валидация, security. Всё абстрактно, без привязки к СУБД. -
redb.Core.Pro— платная надстройка над Core: change tracking, materialization, миграции схем, расширенные query/scheme-провайдеры. -
redb.Postgres/redb.MSSql— Free-провайдеры: реализацияIRedbContextи фабричных методовRedbServiceBaseповерх Npgsql / Microsoft.Data.SqlClient. -
redb.Postgres.Pro/redb.MSSql.Pro— Pro-провайдеры: подтягивают Core.Pro-возможности (materialization, миграции) на конкретную СУБД. -
redb.Identity— самостоятельный identity-сервер, построенный поверх redb-объектов: пользователи, роли, JWT, refresh, LDAP-федерация. Используется и в Route-маршрутах (тот жеHttp.Listen().UseJwt(...)), и снаружи. -
redb.Export— экспорт/бэкап redb-данных в файлы. Используется в TsUM-проде как cron-маршрут (Cron.Every(...).Process(BackupRedbBase)). -
redb.Route— то, про что эта статья. БерётIRedbServiceиз провайдера, добавляет EIP-DSL, транспорты, OpenTelemetry. Маршруты — обычные C#-классы, schema-aware:ProcessWithRedb("pg-test", ctx => ctx.SaveAsync(obj)). -
redb.Tsak— runtime-контейнер для Route-маршрутов: hot-reload, кластер, дашборд. Поверх Core + одного из провайдеров + Route + Identity (для логина в Blazor-UI). -
redb.Doc.Web— сайт документации, написан на Blazor поверх того же стека: контент-объекты лежат какRedbObject<DocPageProps>в Postgres, навигация — через Tree-провайдер Core, поиск — через query-провайдер. То есть документация про redb работает на самом redb.
Зависимости честно однонаправленные: Route не знает про Tsak, Tsak не знает про Doc.Web, Core ничего не знает про провайдеров. Можно взять только Core + Postgres и писать своё приложение без Route и Tsak; можно взять Core + Postgres + Route и хостить маршруты в обычном IHostedService; можно собрать всю стопку целиком и получить Tsak + Identity + дашборд + сайт.
Эта статья — про один слой (Route + Tsak). Про Core, Identity и Doc.Web будут отдельные.
Что redb.Route НЕ делает
Честно — потому что иначе через две недели придут с issue.
Durable saga с DB-state machine. Это к MassTransit / NServiceBus / Wolverine. У redb.Route есть Saga() с компенсирующими шагами: forward + reverse, если что-то упало посередине — откатываемся в обратном порядке. Но самой state-machine (где «заказ в состоянии PaymentPending, ждём webhook от Stripe три дня, не теряем при рестарте процесса») — нет. Можно собрать руками через IdempotentConsumer + persistent backend в redb, но это не managed решение.
Managed transactional outbox container. В MassTransit/Wolverine есть отдельный фреймворк, который создаёт таблицу outbox_messages, пишет в неё в той же транзакции, что бизнес-данные, а в фоне публикует. У Route это собирается из примитивов: Sql.Poll(...) + IdempotentConsumer + .Transacted(). То есть outbox-паттерн работает, но автомат-инсталлятор таблицы и фоновый daemon — не предоставлены, пишутся как часть маршрута.
300+ компонентов, как у Camel. 22 транспорта закрывают большую часть реальных задач (Kafka, RabbitMQ, IBM MQ, MQTT, HTTP, gRPC, SQL, SFTP, S3, LDAP, Mail, Quartz), но если вам нужен, скажем, Salesforce-bulk-API-консьюмер или ServiceNow-REST-клиент — это всё ещё «напиши свой компонент через IConsumer/IProducer», или использовать generic HTTP/gRPC поверх их API.
XML-DSL и автодеплой как у Camel K. Только C#-fluent. Никаких маршрутов из YAML. На наш взгляд — плюс (intellisense, рефакторинг, типы), но кому-то декларативный YAML удобнее.
Free vs Pro
Никакой Pro-версии redb.Route нет. Apache 2.0 целиком: все 22 транспорта, все EIP-паттерны, expression engine, Transacted(), OpenTelemetry, retry/DLC, Tsak runtime container, cluster, hot-reload, dashboard.
(Pro-версия есть только у redb.Core — там оптимизации хранилища: compiled queries, parallel materialization, change tracking. Route про это вообще не знает — он работает поверх любого redb-провайдера, любой версии.)
Что в 3.0.0 свежего
Параллельно со статьёй «Free/Pro query parity» про redb.Core 3.0.0 у Route тоже вышла мажорка. Главное:
-
Один canonical compiler. Раньше внутри жил параллельный v2-стек (
OldRouteCompiler,IRouteDefinition2,BlockStack). Удалили. Теперь AST маршрута — это деревоIProcessorDefinitionнод, каждая компилирует себя черезCreateProcessor(IRouteContext). Никаких bridge-классов, никакого «v2 DSL → bridge → legacy compiler» — та же модель, что у Camel внутри. -
Dynamic endpoints (
ToD, dynamicWireTap, dynamicEnrich). URI получателя вычисляется на каждое сообщение через string template,IExpressionилиFunc<IExchange, string>. Это Camel’овскийtoD(...)один-в-один. -
String-template DSL для
SetBody/SetHeader/SetProperty/Log.${header.x},${body.OrderId}— компилируются один раз при build, не интерпретируются. -
OnExceptionparity с Camel. ДобавленыLogStackTrace(bool),LogExhausted(bool), чтобы fluent-набор совпадал с Camel’овскимonException(...). -
Reference-тесты на Choice / TryCatch / Filter (~1200 строк), которые пинят семантику и ловят регрессии.
Полный CHANGELOG: redb.Route/CHANGELOG.md.
Сравнение в одной таблице
|
|
Apache Camel |
MassTransit |
NServiceBus |
Wolverine |
redb.Route |
|---|---|---|---|---|---|
|
Платформа |
JVM |
.NET |
.NET |
.NET |
.NET 8/9/10 |
|
Лицензия |
Apache 2.0 |
Коммерческая (v9, Massient) |
Коммерческая |
MIT |
Apache 2.0 |
|
Фокус |
EIP routing |
Message bus + Saga |
Message bus + Saga |
Mediator + Saga |
ESB / EIP routing |
|
Транспорты |
300+ |
5 |
7 |
4 |
22 + 5 встроенных |
|
EIP-паттерны |
80+ |
Saga, R/R, Outbox |
Saga, R/R |
Saga, R/R, Outbox |
30+ |
|
Expression engine |
Simple Language (интерпретируемый) |
— |
— |
— |
Compiled ( |
|
Конфиг |
XML или Java DSL |
C# fluent |
C# fluent |
C# fluent |
C# fluent only |
|
Durable saga |
Да |
Да (DB-backed) |
Да (DB-backed) |
Да (Marten / EF Core) |
Только компенсирующие шаги + persistent |
|
Managed outbox |
Plugin |
Да |
Да |
Да |
Собирается из |
|
Publisher confirms / Kafka EOS |
Да |
Да |
Да |
Да |
Да |
|
Runtime container |
Camel K / JBoss Fuse |
— |
— |
— |
redb.Tsak (hot-reload, cluster) |
Кратко: у Camel — больше всего, но он JVM. У трёх .NET-альтернатив — отличная durable saga + managed outbox, но не ESB, и при этом две из трёх уже коммерческие (MassTransit v9 переехал в Massient, Inc., NServiceBus был платным изначально). Свободным остался только Wolverine на MIT — но это mediator+saga, без 20+ транспортов и без полного EIP-каталога. У redb.Route — все 30+ ходовых EIP, 22 транспорта, compiled expressions, transactional pipelines, и Tsak как runtime-контейнер с кластером. Apache 2.0 целиком. Если вам нужно «Apache Camel в .NET, и желательно без счёта на лицензию» — это redb.Route.
Ссылки
GitHub org (все репозитории) Репозиторий redb.Route Репозиторий redb.Tsak Готовые бинарники redb.Tsak (releases) Docker-образ redb-tsak-stack (GHCR) README.md redb.Route с полной документацией redb.Route.Demo — 39 маршрутов, 18 транспортов, runnable reference CHANGELOG 3.0.0 Документация и примеры (EN) NuGet — все 43 пакета redb.* (~20 800 загрузок) — Route, Core, провайдеры, 22 транспорта, Tsak Архитектура redb.Core — на чём всё это построено Предыдущая статья — про redb.Core / Free vs Pro
Если дочитали — спасибо. Комментарии, баги, EXPLAIN-планы — всё в GitHub Discussions.
ссылка на оригинал статьи https://habr.com/ru/articles/1042392/