Архитектура MassTransit: как устроена библиотека под капотом

от автора

Привет! В этой статье разберём архитектуру MassTransit — одной из самых зрелых .NET-библиотек для построения распределённых систем, работы с брокерами сообщений и реализации паттернов вроде Saga и Consumer Pipeline.

Прежде чем вы начнёте читать статью, хочу сделать небольшую ремарку: это моя первая публикация, поэтому прошу отнестись с пониманием. Материал получился достаточно сложным, а его разбор потребовал большой работы.

Статья, вероятно, будет постепенно дорабатываться и уточняться по мере появления новых правок. В процессе такого глубокого исследования хочется как можно скорее поделиться полученными выводами, но одновременно бывает непросто сразу изложить всё максимально структурировано. Поэтому в отдельных местах возможна некоторая сумбурность, которую я постараюсь со временем исправить.

Быстрые переходы

Если хочется более глубокого разбора с кодом и навигацией по исходникам, у меня есть и видеоразбор на YouTube. Здесь же — краткая, структурированная версия для тех, кто хочет понять ключевые архитектурные идеи без погружения в 50+ минут видео.

Почему MassTransit бывает сложно понять

MassTransit существует уже более 15 лет. Это означает, что библиотека формировалась ещё во времена классического .NET Framework — задолго до появления CoreCLR и современного .NET.

Пример

Пример

Автор библиотеки — Chris Patterson, Microsoft MVP и разработчик, который проектировал MassTransit как универсальный abstraction layer над транспортами (RabbitMQ, Azure Service Bus, Amazon SQS и др.).

Пример

Пример

При первом знакомстве кодовая база может показаться перегруженной. Но причина не в возрасте проекта, а в архитектурной насыщенности.

Основные причины сложности :

  1. Большое количество паттернов MassTransit активно использует: Pipeline, Observer, Visitor, State, Finite State Machine(FSM) и тд… То есть библиотека — это не просто “обёртка над RabbitMQ”, а полноценный framework с богатой внутренней моделью.

  2. Глубокое использование generics Практически вся архитектура строится вокруг T:

ConsumeContext<T>ConsumerConsumeContext<TConsumer, TMessage>
  1. Транспортно-специфичная логика MassTransit старается быть универсальным, но особенности конкретного транспорта всё равно проникают внутрь архитектуры.

Например, в RabbitMQ:

  • одно TCP-соединение может содержать множество channels;

  • channels нельзя безопасно использовать полностью параллельно без контроля;

  • клиент требует аккуратного управления concurrency, иначе можно столкнуться с нарушением AMQP framing.

Именно поэтому в RabbitMQ Transport есть такие компоненты, как ChannelExecutor, которые не выглядят случайными: они помогают переиспользовать channel и сериализовать операции там, где это необходимо

  1. SagaStateMachine и визуальные графы Когда разработчик впервые открывает SagaStateMachine, графы состояний и переходов могут выглядеть как часть runtime-магии.

На практике это скорее декларативный инструмент:

  • состояния

  • события

  • переходы

Графы нужны в основном для визуализации и анализа, а не как основной runtime-механизм исполнения.

[HttpGet("graph/edges")]    public IActionResult GetGraphEdges()    {        var graph = _machine.GetGraph();        var sb = new StringBuilder();        sb.AppendLine("stateDiagram-v2");        foreach (var edge in graph.Edges)        {            var from = edge.From.Title;            var to = edge.To.Title;            sb.AppendLine($"{from} --> {to}");        }        return Content(sb.ToString(), "text/plain");    }
[HttpGet("graph/vertices")]    public IActionResult GetGraphVertices()    {        var graph = _machine.GetGraph();        var sb = new StringBuilder();        sb.AppendLine("stateDiagram-v2");        foreach (var vertex in graph.Vertices)        {            sb.AppendLine($"state {vertex.Title}");        }        return Content(sb.ToString(), "text/plain");    }

Главная архитектурная идея: Pipeline

MassTransit изначально строился поверх библиотеки GreenPipes, которая реализовывала внутреннюю систему Pipeline. В новых версиях MassTransit GreenPipes больше не используется как отдельная зависимость — её код был напрямую интегрирован в сам MassTransit. Это произошло потому, что GreenPipes практически не получила распространения за пределами экосистемы MassTransit, а существующие стандартные решения не соответствовали требованиям проекта.

На первый взгляд может показаться, что в основе MassTransit лежит классический архитектурный паттерн Pipes & Filters, но это не совсем так.

Пример

Пример

Классический Pipes & Filters хорошо знаком, например, по Bash-конвейерам, где результат работы одной программы передаётся следующей через символ |:

cat file.txt | grep "error" | sort | uniq

Здесь каждая программа выступает как Filter, а Pipe служит лишь каналом передачи данных между этапами обработки.

В MassTransit модель сложнее. Хотя терминология Pipes и Filters используется, архитектура ближе к Middleware Pipeline, чем к классическому Pipes & Filters. Цепочка обработки выглядит как последовательность Pipe -> Filter -> Pipe -> Filter, однако каждый Filter не просто обрабатывает сообщение и передаёт его дальше, а может:

  1. управлять дальнейшим выполнением цепочки

  2. изменять маршрут обработки

  3. добавлять ветвления

  4. завершать выполнение

  5. вызывать дополнительные Pipes

То есть Filter здесь выступает не только обработчиком, но и элементом управления потоком выполнения (Control Flow), что делает архитектуру MassTransit ближе к middleware-подходу, где каждый компонент контролирует, как именно будет выполняться следующий шаг. Именно поэтому Pipeline в MassTransit — это не просто линейная цепочка фильтров, а более гибкая и управляемая система обработки сообщений.

Отсюда это можно резюмировать, как Pipes & Filter с Middleware Control Flow

GreenPipes строится на следующих компонентах

  • Agents — управляют жизненным циклом инфраструктуры В часть их обязанностей входит: старт, завершение, определение готовности, сигнализация об ошибках В пример можно привести RabbitMqBasicConsumer, что является Agent’ом, внутри которого крутится постоянный loop получения сообщений. Здесь переопределен метод HandleBasicDeliver из rabbitmq.client

public class RabbitMqBasicConsumer :        Agent,        IAsyncBasicConsumer,        IBasicConsumer,        RabbitMqDeliveryMetrics    {        readonly RabbitMqReceiveEndpointContext _context;        readonly TaskCompletionSource<bool> _deliveryComplete;        readonly IReceivePipeDispatcher _dispatcher;        readonly SemaphoreSlim _limit;        readonly ModelContext _model;        readonly ConcurrentDictionary<ulong, RabbitMqReceiveContext> _pending;        readonly ReceiveSettings _receiveSettings;        string _consumerTag;        EventHandler<ConsumerEventArgs> _onConsumerCancelled;        /// <summary>        /// The basic consumer receives messages pushed from the broker.        /// </summary>        /// <param name="model">The model context for the consumer</param>        /// <param name="context">The topology</param>        public RabbitMqBasicConsumer(ModelContext model, RabbitMqReceiveEndpointContext context)        {            _model = model;            _context = context;            _receiveSettings = model.GetPayload<ReceiveSettings>();            _pending = new ConcurrentDictionary<ulong, RabbitMqReceiveContext>();            _dispatcher = context.CreateReceivePipeDispatcher();            _dispatcher.ZeroActivity += HandleDeliveryComplete;            _deliveryComplete = TaskUtil.GetTask<bool>();            if (context.ConcurrentMessageLimit.HasValue)                _limit = new SemaphoreSlim(context.ConcurrentMessageLimit.Value);            ConsumerCancelled += OnConsumerCancelled;        }...        Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,            IBasicProperties properties, ReadOnlyMemory<byte> body)        {            HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);            return Task.CompletedTask;        }public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,            IBasicProperties properties, ReadOnlyMemory<byte> body)        {            var bodyBytes = body.ToArray();            Task.Run(async () =>            {                LogContext.Current = _context.LogContext;                var context = new RabbitMqReceiveContext(exchange, routingKey, _consumerTag, deliveryTag, bodyBytes, redelivered, properties,                    _context, _receiveSettings, _model, _model.ConnectionContext);                var added = _pending.TryAdd(deliveryTag, context);                if (!added && deliveryTag != 1) // DIRECT REPLY-TO fixed value                    LogContext.Warning?.Log("Duplicate BasicDeliver: {DeliveryTag}", deliveryTag);                var receiveLock = _receiveSettings.NoAck ? default : new RabbitMqReceiveLockContext(_model, deliveryTag);                if (_limit != null)                    await _limit.WaitAsync(context.CancellationToken).ConfigureAwait(false);                try                {                    await _dispatcher.Dispatch(context, receiveLock).ConfigureAwait(false);                }                catch (Exception exception)                {                    context.LogTransportFaulted(exception);                }                finally                {                    _limit?.Release();                    if (added)                        _pending.TryRemove(deliveryTag, out _);                    context.Dispose();                }            });        }
  • Supervisors — специальный тип Agent’а, который управляет другими Agent’ами. По сути, это “agent of agents” или оркестратор агентов, но это не все, ведь он может управлять другими supervisors. Supervisor хранит список управляемых Agent’ов, стартует их при инициализации и останавливает при завершении.

    public class TransportPipeContextSupervisor<T> :        PipeContextSupervisor<T>,        ITransportSupervisor<T>        where T : class, PipeContext    {        readonly ISupervisor _consumeSupervisor;        readonly ISupervisor _sendSupervisor;        protected TransportPipeContextSupervisor(IPipeContextFactory<T> factory)            : base(factory)        {            _consumeSupervisor = new Supervisor();            _sendSupervisor = new Supervisor();        }        public void Probe(ProbeContext context)        {            if (HasContext)                context.Add("connected", true);        }        public void AddSendAgent<TAgent>(TAgent agent)            where TAgent : IAgent        {            _sendSupervisor.Add(agent);        }        public void AddConsumeAgent<TAgent>(TAgent agent)            where TAgent : IAgent        {            _consumeSupervisor.Add(agent);        }        protected override async Task StopSupervisor(StopSupervisorContext context)        {            await _consumeSupervisor.Stop(context).ConfigureAwait(false);            await _sendSupervisor.Stop(context).ConfigureAwait(false);            await base.StopSupervisor(context).ConfigureAwait(false);        }    }}
  • Probe представляет собой механизм диагностики и самоописания pipeline. Где используется:

  1. Диагностика Позволяет увидеть структуру pipeline и его состав.

  2. Monitoring / Observability Даёт информацию о состоянии компонентов (например, circuit breaker, попытки, ошибки).

  3. Отладка Помогает понять, почему фильтр не срабатывает или работает не так, как ожидается.

  4. Интеграции Может использоваться для экспорта данных наружу — например, в метрики или диагностические endpoints.

  5. Тестирование Позволяет писать более точные тесты: [1] проверять, что нужные фильтры подключены [2] валидировать порядок pipeline [3] убеждаться, что конфигурация применена корректно [4] тестировать поведение без запуска полного окружения

public class PipeContextSupervisor<TContext> :        Supervisor,        ISupervisor<TContext>        where TContext : class, PipeContext    {        readonly ISupervisor _activeSupervisor;        readonly IPipeContextFactory<TContext> _contextFactory;        readonly object _contextLock = new object();        PipeContextHandle<TContext> _context;        /// <summary>        /// Create the cache        /// </summary>        /// <param name="contextFactory">Factory used to create the underlying and active contexts</param>        public PipeContextSupervisor(IPipeContextFactory<TContext> contextFactory)        {            _contextFactory = contextFactory;            _activeSupervisor = new Supervisor();        }        protected bool HasContext        {            get            {                lock (_contextLock)                {                    return _context is { IsDisposed: false };                }            }        }        void IProbeSite.Probe(ProbeContext context)        {            var scope = context.CreateScope("source");            scope.Set(new            {                Type = TypeCache<PipeContextSupervisor<TContext>>.ShortName,                HasContext,            });        }...}
 [Test]        public void Should_override_bus_setting_if_specified()        {            var busControl = MassTransit.Bus.Factory.CreateUsingActiveMq(cfg =>            {                cfg.PrefetchCount = 427;                cfg.ReceiveEndpoint("input-queue", e =>                {                    e.PrefetchCount = 351;                });            });            var jsonString = busControl.GetProbeResult().ToJsonString();            var probe = JObject.Parse(jsonString);            Assert.That(GetPrefetchCount(probe, 0), Is.EqualTo(351));            Assert.That(GetPrefetchCount(probe, 1), Is.EqualTo(427));        }
  • Pipes — готовые, полностью собранные и исполняемые цепочки обработки. Это execution layer pipeline, который отвечает за реальное выполнение логики во время runtime’а. Он позволяет собрать цепочку обработки в единую рабочую систему. FilterPipe, EmtyPipe это одни из множества by default pipes

public class FilterPipe<TContext> :        IPipe<TContext>        where TContext : class, PipeContext    {        readonly IFilter<TContext> _filter;        readonly IPipe<TContext> _next;        public FilterPipe(IFilter<TContext> filter, IPipe<TContext> next)        {            _filter = filter;            _next = next;        }        public void Probe(ProbeContext context)        {            _filter.Probe(context);            _next.Probe(context);        }        [DebuggerStepThrough]        public Task Send(TContext context)        {            return _filter.Send(context, _next);        }    }
public class EmptyPipe<TContext> :        IPipe<TContext>        where TContext : class, PipeContext    {        [DebuggerNonUserCode]        Task IPipe<TContext>.Send(TContext context)        {            return Task.CompletedTask;        }        void IProbeSite.Probe(ProbeContext context)        {        }    }
  • Filters является реальным обработчиком pipeline (middleware), который выполняется при вызове Send. Фильтры представляют собой конкретные этапы обработки в цепочке (Pipe), где каждый Filter может модифицировать контекст, добавлять логику или выполнять побочные операции. Структура Filter’а следует паттерну middleware и состоит из трех основных фаз:

  1. Фаза 1: До обработки (DoSomething)

  2. Фаза 2: Передача управления (next.Send)

  3. Фаза 3: После обработки (DoSomethingAfter)

public class TransportReadyFilter<T> :        IFilter<T>        where T : class, PipeContext    {        readonly ReceiveEndpointContext _context;        public TransportReadyFilter(ReceiveEndpointContext context)        {            _context = context;        }        public async Task Send(T context, IPipe<T> next)        {            // Первая фаза            await _context.TransportObservers.NotifyReady(_context.InputAddress).ConfigureAwait(false);            var agent = new Agent();            agent.SetReady();            _context.AddConsumeAgent(agent);            // Вторая фаза            await next.Send(context).ConfigureAwait(false);            // Третья Фаза            await agent.Completed.ConfigureAwait(false);        }
  • Context управляет данными и жизненным циклом выполнения операции внутри Pipeline. Pipeline — это конвейер, по которому движется обработка. Context — это “посылка”, которая едет по конвейеру от одной станции к другой. Filters — это станции обработки. Каждый Filter берет Context, выполняет свою работу, может модифицировать Context и передает его следующему Filter’у.

 public interface ConsumeContext :        PipeContext,        MessageContext,        IPublishEndpoint,        ISendEndpointProvider    {        /// <summary>        /// The received message context        /// </summary>        ReceiveContext ReceiveContext { get; }        /// <summary>        /// The serializer context from message deserialization        /// </summary>        SerializerContext SerializerContext { get; }        /// <summary>        /// An awaitable task that is completed once the consume context is completed        /// </summary>        Task ConsumeCompleted { get; }        /// <summary>        /// Returns the supported message types from the message        /// </summary>        IEnumerable<string> SupportedMessageTypes { get; }        /// <summary>        /// Returns true if the specified message type is contained in the serialized message        /// </summary>        /// <param name="messageType"></param>        /// <returns></returns>        bool HasMessageType(Type messageType);....}
  • Payloads являются способ прикреплять к Context’у дополнительные данные или объекты (то есть “полезную нагрузку”), которые будут путешествовать вместе с ним по всему Pipeline и будут доступны для всех Filter’ов.

public class TransactionFilter<T> :        IFilter<T>        where T : class, PipeContext    {        readonly TransactionOptions _options;        public TransactionFilter(IsolationLevel isolationLevel = IsolationLevel.ReadCommitted, TimeSpan timeout = default)        {            if (timeout == default)                timeout = TimeSpan.FromSeconds(30);            _options = new TransactionOptions            {                IsolationLevel = isolationLevel,                Timeout = timeout            };        }        void IProbeSite.Probe(ProbeContext context)        {            var step = context.CreateFilterScope("transaction");            step.Add("isolationLevel", _options.IsolationLevel.ToString());            step.Add("timeout", _options.Timeout);        }        [DebuggerNonUserCode]        public async Task Send(T context, IPipe<T> next)        {            SystemTransactionContext systemTransactionContext = null;            context.GetOrAddPayload<TransactionContext>(() =>            {                systemTransactionContext = new SystemTransactionContext(_options);                return systemTransactionContext;            });            try            {                await next.Send(context).ConfigureAwait(false);                if (systemTransactionContext != null)                    await systemTransactionContext.Commit().ConfigureAwait(false);            }            catch (Exception ex)            {                systemTransactionContext?.Rollback(ex);                throw;            }            finally            {                systemTransactionContext?.Dispose();            }        }    }
  • Configurators представляет собой язык конфигурации для описания поведения pipeline. Это удобный API, через который пользователь описывает, как должна работать система обработки сообщений. Когда конфигурация завершена и система стартует, Configurators исчезают со сцены. Их место занимают Agent’ы, Supervisor’ы, Pipe’ы и Filter’ы, которые выполняют реальную работу.

public class RabbitMqBusFactoryConfigurator :        BusFactoryConfigurator,        IRabbitMqBusFactoryConfigurator,        IBusFactory    {        readonly IRabbitMqBusConfiguration _busConfiguration;        readonly IRabbitMqHostConfiguration _hostConfiguration;        readonly RabbitMqReceiveSettings _settings;        public RabbitMqBusFactoryConfigurator(IRabbitMqBusConfiguration busConfiguration)            : base(busConfiguration)        {            _busConfiguration = busConfiguration;            _hostConfiguration = busConfiguration.HostConfiguration;            var queueName = busConfiguration.Topology.Consume.CreateTemporaryQueueName("bus");            var exchangeType = busConfiguration.BusEndpointConfiguration.Topology.Consume.ExchangeTypeSelector.DefaultExchangeType;            _settings = new RabbitMqReceiveSettings(busConfiguration.BusEndpointConfiguration, queueName, exchangeType, false, true);            _settings.AutoDeleteAfter(TimeSpan.FromMinutes(1));        }        public IReceiveEndpointConfiguration CreateBusEndpointConfiguration(Action<IReceiveEndpointConfigurator> configure)        {            return _busConfiguration.HostConfiguration.CreateReceiveEndpointConfiguration(_settings, _busConfiguration.BusEndpointConfiguration, configure);        }        public override IEnumerable<ValidationResult> Validate()        {            foreach (var result in base.Validate())                yield return result;            if (string.IsNullOrWhiteSpace(_settings.QueueName))                yield return this.Failure("Bus", "The bus queue name must not be null or empty");        }... }
  • Specification служит мостом между этапом конфигурации и runtime’ом системы, по сути являясь “единицой поведения pipeline”, которая описывает и реализует конкретное поведение системы. Каждая Specification отвечает за одну задачу: добавление фильтра в pipeline или конфигурацию инфраструктуры брокера

class ConsumerMessageConfigurator :            IConsumerMessageConfigurator<Batch<TMessage>>        {            readonly IBuildPipeConfigurator<ConsumeContext<Batch<TMessage>>> _batchConfigurator;            public ConsumerMessageConfigurator(IBuildPipeConfigurator<ConsumeContext<Batch<TMessage>>> batchConfigurator)            {                _batchConfigurator = batchConfigurator;            }            public void AddPipeSpecification(IPipeSpecification<ConsumeContext<Batch<TMessage>>> specification)            {                _batchConfigurator.AddPipeSpecification(specification);            }        }
public class PipeConfigurator<TContext> :        IBuildPipeConfigurator<TContext>        where TContext : class, PipeContext    {        readonly List<IPipeSpecification<TContext>> _specifications;        public PipeConfigurator()        {            _specifications = new List<IPipeSpecification<TContext>>(4);        }        public IEnumerable<ValidationResult> Validate()        {            return _specifications.Count == 0                ? Array.Empty<ValidationResult>()                : _specifications.SelectMany(x => x.Validate());        }        void IPipeConfigurator<TContext>.AddPipeSpecification(IPipeSpecification<TContext> specification)        {            if (specification == null)                throw new ArgumentNullException(nameof(specification));            _specifications.Add(specification);        }        public IPipe<TContext> Build()        {            if (_specifications.Count == 0)                return Pipe.Empty<TContext>();            var builder = new PipeBuilder<TContext>(_specifications.Count);            var count = _specifications.Count;            for (var index = 0; index < count; index++)                _specifications[index].Apply(builder);            return builder.Build();        }    }
public class ConsumerConsumeContextRescuePipeSpecification<T> :        ExceptionSpecification,        IPipeSpecification<ConsumerConsumeContext<T>>        where T : class    {        readonly IPipe<ExceptionConsumerConsumeContext<T>> _rescuePipe;        public ConsumerConsumeContextRescuePipeSpecification(IPipe<ExceptionConsumerConsumeContext<T>> rescuePipe)        {            _rescuePipe = rescuePipe;        }        public void Apply(IPipeBuilder<ConsumerConsumeContext<T>> builder)        {            builder.AddFilter(new RescueFilter<ConsumerConsumeContext<T>, ExceptionConsumerConsumeContext<T>>(_rescuePipe, Filter,                (context, ex) => new RescueExceptionConsumerConsumeContext<T>(context, ex)));        }        public IEnumerable<ValidationResult> Validate()        {            if (_rescuePipe == null)                yield return this.Failure("RescuePipe", "must not be null");        }    }
  • Validators представляет слой проверки конфигурации pipeline до его сборки в GreenPipes и MassTransit. Проверяет не runtime, а то, можно ли вообще корректно собрать pipeline.

 public class ConsumerFilterSpecification<TConsumer, TMessage> :        IPipeSpecification<ConsumerConsumeContext<TConsumer, TMessage>>        where TConsumer : class        where TMessage : class    {        readonly IFilter<ConsumerConsumeContext<TConsumer, TMessage>> _filter;        public ConsumerFilterSpecification(IFilter<ConsumerConsumeContext<TConsumer>> filter)        {            _filter = new ConsumerSplitFilter<TConsumer, TMessage>(filter);        }        public void Apply(IPipeBuilder<ConsumerConsumeContext<TConsumer, TMessage>> builder)        {            builder.AddFilter(_filter);        }        public IEnumerable<ValidationResult> Validate()        {            if (_filter == null)                yield return this.Failure("Filter", "must not be null");        }    }
  • Observers — система, которая позволяет отслеживать и реагировать на события, происходящие внутри pipeline во время выполнения. Observer “слушает” события и реагирует на них, не влияя на основной поток выполнения Pipeline не только выполняется, но и генерирует события:

  1. фильтр начал работу

  2. фильтр завершился

  3. произошла ошибка

  4. retry начался / завершился

Connectable — это интерфейс, который обеспечивает подписку Observer’ов на события компонентов. Компонент, реализующий IConnectable, позволяет регистрировать Observer’ов и уведомлять их о событиях.

По сути, Connectable — это механизм, который связывает Observer’ов с компонентами pipeline’а, обеспечивая отправку событий всем заинтересованным наблюдателям.

public class Connectable<T>        where T : class    {        readonly Dictionary<long, T> _connections;        T[] _connected;        long _nextId;        public Connectable()        {            _connections = new Dictionary<long, T>();            _connected = Array.Empty<T>();        }        /// <summary>        /// The number of connections        /// </summary>        public int Count => _connected.Length;        /// <summary>        /// Connect a connectable type        /// </summary>        /// <param name="connection">The connection to add</param>        /// <returns>The connection handle</returns>        public ConnectHandle Connect(T connection)        {            if (connection == null)                throw new ArgumentNullException(nameof(connection));            var id = Interlocked.Increment(ref _nextId);            lock (_connections)            {                _connections.Add(id, connection);                _connected = _connections.Values.ToArray();            }            return new Handle(id, this);        }        /// <summary>        /// Enumerate the connections invoking the callback for each connection        /// </summary>        /// <param name="callback">The callback</param>        /// <returns>An awaitable Task for the operation</returns>        public Task ForEachAsync(Func<T, Task> callback)        {            if (callback == null)                throw new ArgumentNullException(nameof(callback));            T[] connected;            lock (_connections)                connected = _connected;            if (connected.Length == 0)                return Task.CompletedTask;            if (connected.Length == 1)                return callback(connected[0]);            var outputTasks = new Task[connected.Length];            int i;            for (i = 0; i < connected.Length; i++)                outputTasks[i] = callback(connected[i]);            for (i = 0; i < outputTasks.Length; i++)            {                if (outputTasks[i].Status != TaskStatus.RanToCompletion)                    break;            }            if (i == outputTasks.Length)                return Task.CompletedTask;            return Task.WhenAll(outputTasks);        }        public void ForEach(Action<T> callback)        {            T[] connected;            lock (_connections)                connected = _connected;            switch (connected.Length)            {                case 0:                    break;                case 1:                    callback(connected[0]);                    break;                default:                    {                        for (var i = 0; i < connected.Length; i++)                            callback(connected[i]);                        break;                    }            }        }        public bool All(Func<T, bool> callback)        {            T[] connected;            lock (_connections)                connected = _connected;            if (connected.Length == 0)                return true;            if (connected.Length == 1)                return callback(connected[0]);            for (var i = 0; i < connected.Length; i++)            {                if (callback(connected[i]) == false)                    return false;            }            return true;        }        void Disconnect(long id)        {            lock (_connections)            {                _connections.Remove(id);                _connected = _connections.Values.ToArray();            }        }        class Handle :            ConnectHandle        {            readonly Connectable<T> _connectable;            readonly long _id;            public Handle(long id, Connectable<T> connectable)            {                _id = id;                _connectable = connectable;            }            public void Disconnect()            {                _connectable.Disconnect(_id);            }            public void Dispose()            {                Disconnect();            }        }    }
public class ReceiveObservable :        Connectable<IReceiveObserver>,        IReceiveObserver    {        public Task PreReceive(ReceiveContext context)        {            return ForEachAsync(x => x.PreReceive(context));        }        public Task PostReceive(ReceiveContext context)        {            return ForEachAsync(x => x.PostReceive(context));        }        public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType)            where T : class        {            return ForEachAsync(x => x.PostConsume(context, duration, consumerType));        }        public Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception)            where T : class        {            return ForEachAsync(x => x.ConsumeFault(context, duration, consumerType, exception));        }        public Task ReceiveFault(ReceiveContext context, Exception exception)        {            return ForEachAsync(x => x.ReceiveFault(context, exception));        }    }
ublic class PerformanceCounterReceiveObserver :        IReceiveObserver    {        readonly ICounterFactory _factory;        public PerformanceCounterReceiveObserver(ICounterFactory factory)        {            _factory = factory;        }        Task IReceiveObserver.PreReceive(ReceiveContext context)        {            return Task.CompletedTask;        }        Task IReceiveObserver.PostReceive(ReceiveContext context)        {            return Task.CompletedTask;        }        Task IReceiveObserver.PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType)        {            ConsumerPerformanceCounterCache.GetCounter(_factory, consumerType).Consumed(duration);            MessagePerformanceCounterCache<T>.Counter(_factory).Consumed(duration);            return Task.CompletedTask;        }        Task IReceiveObserver.ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception)        {            ConsumerPerformanceCounterCache.GetCounter(_factory, consumerType).Faulted();            MessagePerformanceCounterCache<T>.Counter(_factory).ConsumeFaulted(duration);            return Task.CompletedTask;        }        Task IReceiveObserver.ReceiveFault(ReceiveContext context, Exception exception)        {            return Task.CompletedTask;        }    }

Переход к внутренней архитектуре MassTransit

Мы рассмотрели базовые принципы, на которых строится pipeline на примере GreenPipes. Теперь разберём, как эти механизмы используются внутри MassTransit и формируют его поведение на уровне обработки сообщений.

Разбор будет начинаться с момента регистрации компонентов и будет доходить до их фактического выполнения. В качестве основы используется версия библиотеки 8.0.0

Архитектура DI-регистрации в MassTransit

Ключевую роль в регистрации зависимостей играет ServiceCollectionBusConfigurator. От него строится вся дальнейшая конфигурация, включая RegistrationConfigurator и DependencyInjectionContainerRegistrar. Дополнительно используются extension-методы, которые расширяют и структурируют процесс регистрации.

Эти компоненты формируют основу DI-интеграции в MassTransit, и именно на них стоит сосредоточить внимание.

  • DependencyInjectionContainerRegistrar Не управляет DI по контексту — он лишь регистрирует фабрики, которые при создании RequestClient проверяют наличие ConsumeContext и при необходимости привязывают клиента к нему. RequestClient автоматически использует текущий consume context, что обеспечивает передачу correlationId, заголовков и tracing между сообщениями

public class DependencyInjectionContainerRegistrar : IContainerRegistrar    {         protected readonly IServiceCollection Collection;        public DependencyInjectionContainerRegistrar(IServiceCollection collection)        {            Collection = collection;        }          public void RegisterRequestClient<T>(RequestTimeout timeout)            where T : class        {            Collection.AddScoped(provider =>            {                var clientFactory = GetClientFactory(provider);                var consumeContext = provider.GetRequiredService<ScopedConsumeContextProvider>().GetContext();                if (consumeContext != null)                    return clientFactory.CreateRequestClient<T>(consumeContext, timeout);                return new ClientFactory(                        new ScopedClientFactoryContext<IServiceProvider>(clientFactory, provider))                    .CreateRequestClient<T>(timeout);            });        }        public void RegisterRequestClient<T>(Uri destinationAddress, RequestTimeout timeout)            where T : class        {            Collection.AddScoped(provider =>            {                var clientFactory = GetClientFactory(provider);                var consumeContext = provider.GetRequiredService<ScopedConsumeContextProvider>().GetContext();                if (consumeContext != null)                    return clientFactory.CreateRequestClient<T>(consumeContext, destinationAddress, timeout);                return new ClientFactory(                        new ScopedClientFactoryContext<IServiceProvider>(clientFactory, provider))                    .CreateRequestClient<T>(destinationAddress, timeout);            });        }...}
  • RegistrationConfigurator как фасад DI-конфигурации

RegistrationConfigurator — это основной класс для регистрации компонентов в MassTransit. Через него добавляются consumers, sagas, saga state machines, activities, endpoints и другие элементы системы, формируя единый декларативный слой конфигурации поверх Microsoft.Extensions.DependencyInjection.

public class RegistrationConfigurator :        IRegistrationConfigurator    {        readonly IServiceCollection _collection;        bool _configured;        ISagaRepositoryRegistrationProvider _sagaRepositoryRegistrationProvider;        protected RegistrationConfigurator(IServiceCollection collection, IContainerRegistrar registrar)        {            _collection = collection ?? throw new ArgumentNullException(nameof(collection));            Registrar = registrar ?? new DependencyInjectionContainerRegistrar(collection);            _sagaRepositoryRegistrationProvider = new SagaRepositoryRegistrationProvider();        }        public IContainerRegistrar Registrar { get; }        protected Func<IServiceProvider, IBus, IClientFactory> ClientFactoryProvider { get; } = BusClientFactoryProvider;           public IConsumerRegistrationConfigurator<T> AddConsumer<T>(Action<IConsumerConfigurator<T>> configure)            where T : class, IConsumer        {            return AddConsumer(null, configure);        }        public IConsumerRegistrationConfigurator<T> AddConsumer<T>(Type consumerDefinitionType, Action<IConsumerConfigurator<T>> configure = null)            where T : class, IConsumer        {            var registration = _collection.RegisterConsumer<T>(Registrar, consumerDefinitionType);            registration.AddConfigureAction(configure);            return new ConsumerRegistrationConfigurator<T>(this);        }        public ISagaRegistrationConfigurator<T> AddSaga<T>(Action<ISagaConfigurator<T>> configure)            where T : class, ISaga        {            return AddSaga(null, configure);        }...}
  • ServiceCollectionBusConfigurator — Composition Root Class MassTransit (Bus + DI интеграция) Является центральным классом, отвечающим за настройку и регистрацию Bus в MassTransit. Он строит весь runtime-слой.

    public class ServiceCollectionBusConfigurator :        RegistrationConfigurator,        IBusRegistrationConfigurator    {        public ServiceCollectionBusConfigurator(IServiceCollection collection)            : this(collection, new DependencyInjectionContainerRegistrar(collection))        {            IBusRegistrationContext CreateRegistrationContext(IServiceProvider provider)            {                return new BusRegistrationContext(provider, Registrar);            }            collection.AddSingleton(provider => ClientFactoryProvider(provider, provider.GetRequiredService<IBus>()));            collection.AddSingleton(provider => Bind<IBus>.Create(CreateRegistrationContext(provider)));            collection.AddSingleton(provider => provider.GetRequiredService<Bind<IBus, IBusRegistrationContext>>().Value);            collection.TryAdd(ServiceDescriptor.Singleton(typeof(IReceiveEndpointDispatcher<>), typeof(ReceiveEndpointDispatcher<>)));            collection.AddSingleton<IReceiveEndpointDispatcherFactory>(provider =>            {                var registrationContext = provider.GetRequiredService<Bind<IBus, IBusRegistrationContext>>().Value;                var busInstance = provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value;                return new ReceiveEndpointDispatcherFactory(registrationContext, busInstance);            });        }         protected ServiceCollectionBusConfigurator(IServiceCollection collection, IContainerRegistrar registrar)            : base(collection, registrar)        {            AddMassTransitComponents(collection);        }        public virtual void AddBus(Func<IBusRegistrationContext, IBusControl> busFactory)        {            SetBusFactory(new RegistrationBusFactory(busFactory));        }        public virtual void SetBusFactory<T>(T busFactory)            where T : IRegistrationBusFactory        {            if (busFactory == null)                throw new ArgumentNullException(nameof(busFactory));            ThrowIfAlreadyConfigured(nameof(SetBusFactory));            this.AddSingleton(provider => Bind<IBus>.Create(CreateBus(busFactory, provider)));            this.AddSingleton(provider => provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value);            this.AddSingleton<IReceiveEndpointConnector>(provider => provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value);            this.AddSingleton(provider => provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value.BusControl);            this.AddSingleton(provider => provider.GetRequiredService<Bind<IBus, IBusInstance>>().Value.Bus);            Registrar.RegisterScopedClientFactory();        }

Регистрация MassTransit: автоматическая vs ручная конфигурация

Разбирать всю систему регистрации в MassTransit глубоко не имеет большого смысла в рамках этого разбора, так как она включает множество уровней абстракции и внутренних механизмов, а детальное погружение существенно бы затянуло материал.

Например, BusRegistrationContext используется в основном в сценарии ConfigureEndpoints, где MassTransit автоматически генерирует топологию: создаёт очереди, группирует consumers и настраивает их подключение к endpoint’ам.

В то же время при использовании ReceiveEndpoint конфигурация становится полностью ручной — разработчик сам определяет очередь и явно подключает consumers. В этом случае автоматическая логика ConfigureEndpoints не применяется, так как endpoint уже управляется напрямую.

Из-за наличия двух параллельных моделей конфигурации (автоматической и ручной), а также большого количества внутренних правил и исключений, углубляться во все детали в рамках текущего материала нецелесообразно, чтобы не перегружать и не затягивать разбор.

Configuration и Configurators

В MassTransit архитектура разделяет процесс настройки системы на два уровня: Configurator и Configuration.

Configurator относится к уровню построения pipeline обработки сообщений и основан на модели GreenPipes. Он отвечает за формирование цепочки middleware, добавление фильтров и определение порядка выполнения логики обработки. На этом уровне создаётся исполнительная модель в виде IPipe, которая определяет поведение обработки сообщений.

Configuration относится к инфраструктурному уровню и отвечает за создание и настройку транспортной части системы. Он определяет receive endpoint, конфигурацию очередей, exchange, topology и связывает транспорт с обработкой сообщений. На этом уровне формируется сама точка входа для сообщений и её интеграция с брокером.

Таким образом, Configuration отвечает за то, что создаётся в системе (endpoint и транспорт), а Configurator — за то, как выполняется обработка сообщений внутри этого endpoint.

Configurators и configuration-объекты — это build-time сущности, которые используются для построения execution pipeline и state machine. Они, как правило, не участвуют в runtime execution, но могут оставаться в памяти, если удерживаются через GC roots (DI container, замыкания или lifecycle host’a).

Пример

Пример
 public class RabbitMqHostConfiguration :        BaseHostConfiguration<IRabbitMqReceiveEndpointConfiguration, IRabbitMqReceiveEndpointConfigurator>,        IRabbitMqHostConfiguration    {        readonly IRabbitMqBusConfiguration _busConfiguration;        readonly Recycle<IConnectionContextSupervisor> _connectionContext;        readonly IRabbitMqBusTopology _topology;        RabbitMqHostSettings _hostSettings;        public RabbitMqHostConfiguration(IRabbitMqBusConfiguration busConfiguration, IRabbitMqTopologyConfiguration topologyConfiguration)            : base(busConfiguration)        {            _busConfiguration = busConfiguration;            _hostSettings = new ConfigurationHostSettings            {                Host = "localhost",                VirtualHost = "/",                Port = 5672,                Username = "guest",                Password = "guest"            };            var messageNameFormatter = new RabbitMqMessageNameFormatter();            _topology = new RabbitMqBusTopology(this, messageNameFormatter, _hostSettings.HostAddress, topologyConfiguration);            ReceiveTransportRetryPolicy = Retry.CreatePolicy(x =>            {                x.Handle<ConnectionException>();                x.Handle<MessageNotConfirmedException>(exception =>                    exception.Message.Contains("CONNECTION_FORCED")                    || exception.Message.Contains("End of stream")                    || exception.Message.Contains("Unexpected Exception"));                x.Ignore<AuthenticationFailureException>();                x.Exponential(1000, TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(3));            });            _connectionContext = new Recycle<IConnectionContextSupervisor>(() => new ConnectionContextSupervisor(this, topologyConfiguration));        }        public IConnectionContextSupervisor ConnectionContextSupervisor => _connectionContext.Supervisor;        public override Uri HostAddress => _hostSettings.HostAddress;        public bool PublisherConfirmation => _hostSettings.PublisherConfirmation;        public BatchSettings BatchSettings => _hostSettings.BatchSettings;        IRabbitMqBusTopology IRabbitMqHostConfiguration.Topology => _topology;...}

Runtime-инфраструктура MassTransit

При запуске приложения MassTransit создает долгоживущую инфраструктуру, которая работает весь lifetime сервиса. Устанавливаются и поддерживаются соединение с RabbitMQ (IConnection) и канал (IModel) для коммуникации с брокером.

Если в приложении зарегистрированы consumer’ы, MassTransit создаёт один или несколько receive endpoints:

  1. Системный для задач шины (например, “DESKTOPDTE2RH7_TestingOnWebAPIWithDI_bus_ib3oyydsm7ymbtaibdxj7iiqyb”) — управляет инфраструктурой MassTransit

  2. Пользовательский (например, “something”) — обрабатывает бизнес-сообщения через привязанный Consumer

Пример

Пример

В рамках transport pipeline каждого endpoint выполняется RabbitMqConsumerFilter, который является частью транспортного уровня. Он создаёт RabbitMqBasicConsumer и регистрирует его через BasicConsume в RabbitMQ.Client, подписываясь на очередь. Таким образом формируется связь между RabbitMQ и MassTransit.

public class RabbitMqConsumerFilter :        IFilter<ModelContext>    {        readonly RabbitMqReceiveEndpointContext _context;        string _consumerTag;        public RabbitMqConsumerFilter(RabbitMqReceiveEndpointContext context)        {            _context = context;            _consumerTag = "";        }        void IProbeSite.Probe(ProbeContext context)        {        }        async Task IFilter<ModelContext>.Send(ModelContext context, IPipe<ModelContext> next)        {            var receiveSettings = context.GetPayload<ReceiveSettings>();           ** var consumer = new RabbitMqBasicConsumer(context, _context);**            _consumerTag = await context.BasicConsume(                receiveSettings.QueueName,                receiveSettings.NoAck,                _context.ExclusiveConsumer,                receiveSettings.ConsumeArguments,                consumer,                _consumerTag            ).ConfigureAwait(false);            await consumer.Ready.ConfigureAwait(false);            _context.AddConsumeAgent(consumer);            await _context.TransportObservers.NotifyReady(_context.InputAddress).ConfigureAwait(false);            try            {                await consumer.Completed.ConfigureAwait(false);            }            finally            {                RabbitMqDeliveryMetrics metrics = consumer;                await _context.TransportObservers.NotifyCompleted(_context.InputAddress, metrics).ConfigureAwait(false);                LogContext.Debug?.Log(                    "Consumer completed {ConsumerTag}: {DeliveryCount} received, {ConcurrentDeliveryCount} concurrent",                    metrics.ConsumerTag,                    metrics.DeliveryCount,                    metrics.ConcurrentDeliveryCount                );            }            await next.Send(context).ConfigureAwait(false);        }    }
 public class RabbitMqModelContext :        ScopePipeContext,        ModelContext,        IAsyncDisposable    {        readonly CancellationToken _cancellationToken;        readonly PendingConfirmationCollection _confirmations;        readonly ConnectionContext _connectionContext;        readonly ChannelExecutor _executor;        readonly IModel _model;        readonly IPublisher _publisher;...Task<string> ModelContext.BasicConsume(string queue, bool noAck, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer,            string consumerTag)        {            return _executor.Run(() => _model.BasicConsume(consumer, queue, noAck, consumerTag, false, exclusive, arguments), CancellationToken);        }...}

Каждый endpoint связывает очередь RabbitMQ, транспортный слой и pipeline обработки.

Распределение и обработка сообщений по pipeline’ам происходит уже внутри MassTransit transport layer через ReceivePipeDispatcher и ReceivePipe, которые запускают middleware pipeline и consumer pipeline соответствующего receive endpoint’а.

Ключевые концепции потребления сообщения

Стоит помнить, что обработка сообщения в MassTransit на первый взгляд выглядит единообразной, но фактически делится на два уровня: Transport Layer и Consumer Layer

Transport Layer работает с ReceiveContext, который содержит сырое сообщение и транспортные заголовки (например, routing key). На этом этапе бизнес-модель ещё не сформирована.

Далее сообщение проходит через ReceivePipeLine. В DeserializerFilter происходит ключевой момент: из ReceiveContext формируется ConsumeContext, и обработка переходит в Consumer Layer.

public class DeserializeFilter :        IFilter<ReceiveContext>    {        readonly IPipe<ConsumeContext> _output;        readonly ISerialization _serializers;        public DeserializeFilter(ISerialization serializers, IPipe<ConsumeContext> output)        {            _serializers = serializers;            _output = output;        }        public void Probe(ProbeContext context)        {            var scope = context.CreateFilterScope("deserialize");            _serializers.Probe(scope);            _output.Probe(scope);        }        [DebuggerNonUserCode]        public async Task Send(ReceiveContext context, IPipe<ReceiveContext> next)        {            if (!context.TryGetPayload(out ConsumeContext consumeContext))                consumeContext = _serializers.GetMessageDeserializer(context.ContentType).Deserialize(context);            Activity.Current?.AddConsumeContextTags(consumeContext);            await _output.Send(consumeContext).ConfigureAwait(false);            await next.Send(context).ConfigureAwait(false);            await consumeContext.ConsumeCompleted.ConfigureAwait(false);        }    }

С этого момента уже работает бизнес-логика — consumers, sagas и ConsumePipeLine.

Pipeline обработки сообщения в MassTransit

Сообщение поступает из RabbitMQ через RabbitMqBasicConsumer в ReceivePipeDispatcher, где создаётся ReceiveContext.

Далее оно проходит transport pipeline (ReceivePipe) с базовыми проверками и обработкой ошибок: Rescue и DeadLetter активируются только при сбоях или невозможности обработки.

Затем DeserializeFilter преобразует сообщение в типизированный ConsumeContext.

Пример

Пример

ReceivePipeDispatcher — оркестратор входящих сообщений

Сообщение пришло из RabbitMQ. Но просто так его обработать нельзя — нужна строгая последовательность действий. Вот здесь и появляется ReceivePipeDispatcher.

Сначала он получает ReceiveContext — это сообщение в его “сыром” виде.

Затем он уведомляет наблюдателей о событии PreReceive — это точка, где можно подцепить любую глобальную логику мониторинга или аудита, которую предусмотрел Крисс Паттерсон.

Сообщение отправляется в ReceivePipe — дальше в конвейер.

После того как ReceivePipe завершит работу, диспетчер ждёт события ReceiveCompleted, чтобы убедиться, что вся асинхронная обработка закончилась.

Если всё прошло успешно, диспетчер явно говорит транспорту: “Сообщение обработано, можно удалить из очереди”.

И наконец, уведомляет наблюдателей о событии PostReceive — обработка завершена успешно.

public class ReceivePipeDispatcher :        IReceivePipeDispatcher    {        readonly string _activityName;        readonly IHostConfiguration _hostConfiguration;        readonly ReceiveObservable _observers;        readonly IReceivePipe _receivePipe;        int _activeDispatchCount;        long _dispatchCount;        int _maxConcurrentDispatchCount;...public async Task Dispatch(ReceiveContext context, ReceiveLockContext receiveLock = default)        {            LogContext.SetCurrentIfNull(_hostConfiguration.ReceiveLogContext);            var active = StartDispatch();            StartedActivity? activity = LogContext.IfEnabled(_activityName)?.StartReceiveActivity(context);            try            {                if (_observers.Count > 0)                    await _observers.PreReceive(context).ConfigureAwait(false);                if (receiveLock != null)                    await receiveLock.ValidateLockStatus().ConfigureAwait(false);                await _receivePipe.Send(context).ConfigureAwait(false);                await context.ReceiveCompleted.ConfigureAwait(false);                if (receiveLock != null)                    await receiveLock.Complete().ConfigureAwait(false);                if (_observers.Count > 0)                    await _observers.PostReceive(context).ConfigureAwait(false);            }            catch (Exception ex)            {                if (_observers.Count > 0)                    await _observers.ReceiveFault(context, ex).ConfigureAwait(false);                if (receiveLock != null)                {                    try                    {                        await receiveLock.Faulted(ex).ConfigureAwait(false);                    }                    catch (Exception releaseLockException)                    {                        throw new AggregateException("ReceiveLock.Faulted threw an exception", releaseLockException, ex);                    }                }                throw;            }            finally            {                activity?.Stop();                await active.Complete().ConfigureAwait(false);            }        }...}

ChannelExecutor

При работе с транспортным уровнем MassTransit и RabbitMQ становится видно, что ChannelExecutor добавлен не случайно.

В основе модели лежит понятие shared protocol state. В случае RabbitMQ.Client таким состоянием является TCP-соединение, внутри которого создаются каналы (IModel / IChannel).

Все каналы используют один общий поток записи (frame writer), поэтому IChannel нельзя безопасно использовать из нескольких потоков одновременно — параллельная запись приводит к нарушению AMQP framing и неконсистентности протокольного состояния. Отсюда соединение может быть закрыто, потеряны незавершенные операции.

Пример

Пример

https://www.rabbitmq.com/client-libraries/dotnet-api-guide#concurrency

Pipeline отправки сообщения в MassTransit

Процесс публикации начинается с вызова метода Publish — точки входа в pipeline отправки сообщений. Он делегирует выполнение в PublishInternal, где формируется PublishContext и подключаются middleware.

На уровне логики маршрутизации сообщение проходит через SendEndpointProxy и CachedSendEndpoint, которые кэшируют и переиспользуют endpoint’ы для оптимизации производительности. Затем управление попадает в SendEndpoint, где запускается основной send pipeline.

Транспортный уровень представлен RabbitMqSendTransport — он интегрирует MassTransit с RabbitMQ и подготавливает сообщение к отправке. На этом уровне используется ConnectionContextSupervisor для управления жизненным циклом соединений и оборачивания отправки в механизм retry (HostConfigurationRetryExtensions), обеспечивая устойчивость к временным сбоям.

Для отправки сообщения используется в MassTransit тот же ChannelExecutor чтоб не сломать AMQP Framing.

Через PipeContextSupervisor создаётся и передаётся ModelContext, представляющий RabbitMQ канал (IModel). На этапе SendPipe сообщение окончательно преобразуется: сериализуется в байты, формируются IBasicProperties, заголовки и routing key.

Вызов проходит через несколько обёрток ModelContext (SharedModelContext, ScopeModelContext, RabbitMqModelContext), которые управляют областью жизни канала и ресурсами.

Финальный шаг — ImmediatePublisher.Publish, где вызывается BasicPublish через ChannelExecutor, гарантирующий потокобезопасную работу с каналом. Здесь сообщение физически отправляется в RabbitMQ exchange. При необходимости используется механизм publisher confirms для подтверждения доставки от брокера.

Пример

Пример

Sagas

Главный паттерн StateMachine, по сути говоря любая “машина состояний” — это правила и они держатся на 4ех основных компонентах, которые работают как единое целое:

  1. States — Объекты State

  2. Events — Типизированные сообщения

  3. Transitions — явные правила(During/When)

  4. Activities — явные действия

Яркое сравнение можно получить с AsyncStateMachine, где математическая модель одна и та же — конечный автомат.

Пример

Пример

На месте States у нас числа состояний т.е 0,1,2… На месте Events там завершение через await На месте Transitions у нас порядок кода где будет меняться состояние На месте Activities тело метода

SagaStateMachine и FSM

SagaStateMachine и любая StateMachine представляет собой FSM т.е Finite State Machine.

У любой state machine всегда есть и будут основные понятия, как State, Event/Input, Transition, Activity/Action

Пример

Пример

https://en.wikipedia.org/wiki/Finite-state_machine

И это по сути математическая модель, описывающая систему, которая находится ровно в одном состоянии из конечного множества в любой момент времени. Система меняет состояние (переход) под воздействием событий или входных сигналов. FSM используется для управления логикой программ, ИИ в играх и проектирования схем

Представлять её можно ввиде Графа состояний т.е

  1. Вершины — состояния(Pending/Completed)

  2. Ребра — переходы(OrderPaid)

Допустим Pending – (OrderPaid) – Completed

Ключевая идея: Текущее состояние + Событие → Переход → Новое состояние + Действие

Когда система получает событие или входной сигнал, она может:

  • остаться в текущем состоянии

  • выполнить действие

  • перейти в другое состояние

Так же AsyncStateMachine являются по сути DFA т.е Deterministic Finite Automaton. В каждой ситуации путь только один.

Пример

Пример

https://en.wikipedia.org/wiki/Finite-state_machine

Sagas внутри MassTransit

Касательно паттерна Сага в экосистеме MassTransit важно понимать архитектурное разделение на два уровня. Если мы посмотрим на исходный код или проект, то увидим папки Sagas и SagaStateMachine.

Смысл этого разделения фундаментален: Первый слой: Sagas — это «обвязка». Он отвечает за подключение паттерна к основному движку MassTransit (GreenPipes), работу с репозиториями и получение сообщений из очереди. Второй слой: SagaStateMachine — это уже сама логика оркестрации, которая реализует собственный внутренний Pipeline внутри оболочки саги.

Пример

Пример

SagaStateMachine

Если углубиться в устройство StateMachine, то мы увидим знакомые элементы архитектуры GreenPipes, но в специфических обёртках:

  • Accessors отвечают за доступ к данным конкретного экземпляра саги(может получить состояние саги, изменить).

  • Activities наши бизнес-шаги, аналогичные фильтрам (Filters) в стандартном в стандартном конвейере.

  • Behaviours управляют порядком выполнения Activities, работая как контейнеры (Pipes). Вместо привычных нам PipeBuilders, сборку всего процесса здесь выполняет компонент Binders.

  • Binders — это механизм, который превращает декларативный DSL (When/Then/TransitionTo) в исполняемый pipeline (Behavior).

  • Correlation правило сопоставления сообщения с экземпляром саги, которое определяет, как по данным сообщения найти или создать соответствующую сагу в хранилище.

Внутри StateMachine в MassTransit лежит граф состояний — декларативная модель, описывающая жизненный цикл процесса.

Пример

Пример

По сути говоря наша машина состояний здесь и основная фишка MassTransit при работе с Sagas вот в этом замечательном и простом DSL

Пример

Пример

States в SagaStateMachine

Если заглянуть внутрь State, то можно заметить следующие Property, а это Enter/Leave/BeforeEnter/AfterLeave

Принцип работы следующий: BeforeEnter → SetState → AfterLeave → Enter

По сути говоря Крисс Паттерсон хотел выполнять логику не только на событии, но и на самом переходе состояния и для этого он сделал lifecycle хуки. И они используются для Transitions т.е переходов состояния.

В распределенных системах критично понимать, когда состояние ещё старое, когда оно уже изменено и в какой момент безопасно сделать side-effects(например, отправку сообщения)

Чуть позже мы увидим переходы этих состояний в TransitionActivity.cs файле

public interface State :        IVisitable,        IComparable<State>    {        string Name { get; }               Event Enter { get; }                Event Leave { get; }               Event<State> BeforeEnter { get; }                Event<State> AfterLeave { get; }    }

SagaStateMachine и Graphs

Если вы начнёте изучать исходники MassTransit, почти неизбежно вы придёте к теме саг. И довольно неожиданно можете обнаружить там графы.

На первый взгляд это выглядит странно: зачем машине состояний вообще нужен граф? Но дело в том, что он там не случайно.

Автор проекта, Крис Патерсон, добавил это осознанно. Хотя сама SagaStateMachine отлично работает без какой-либо визуальной модели, в реальных сценариях она может становиться достаточно сложной: количество состояний, событий и переходов растёт, и удерживать всю логику в голове становится затруднительно.

В таких случаях граф выступает как инструмент “второго взгляда” на систему. Он не участвует в выполнении логики, но позволяет визуализировать её — будь то текстовое представление или диаграмма в Mermaid.

По сути, это вспомогательный слой, который делает сложную конечную автоматную логику более прозрачной и удобной для понимания, отладки и коммуникации внутри команды.

Пример

Пример

Обработка Sagas

Как мы уже знаем, что вначале у нас идет ReceivePipe т.е обработка через транспорт и в данном случае нам он мало интересен.

Ключевой момент тут в том, что у Sagas имеется CorrelationIdMessageFilter внутри ConsumePipe, который создает ProxyContext и прокидывает его дальше по пайплайну, который приводит к CorrelatedSagaFilter.

Пример

Пример
Пример

Пример

Здесь можно сказать начало обработки нашей Saga. И начинается обработка с SagaRepository, где если провалится чуть глубже т.е через итерацию — мы попадаем EntityFrameworkSagaRepositoryContextFactory, где у нас стартует транзакция с isolationLevel = ReadCommited.

Пример

Пример
Пример

Пример

Поскольку это все PipeLine, то мы вконце-концов сюда после обработки вернемся и будет произведен Commit транзакции либо Rollback

После созданной транзакции, у нас начинается обработка отдельного pipeline с DbContextSagaRepositoryContext с SendSagaPipe

В SendSagaPipe мы сталкиваемся с такой вещью как policy. Если вкратце то это сценарий обработки сообщения в контексте Sagas. В первую очередь проверяется имеется ли запись с данной SagaInstance в БД Saga через DbContextSagaRepository, поскольку включен OptimisticLocking(в нашем примере), то делается запрос с SingleOrDefaultAsync от Ef Core, но ничего не находит т.к мы только инициализируем нашу Sagas…

Пример

Пример

Поскольку мы регистрируем новое сообщение, которое ранее не фигурировала в хранилище у БД, то применяется MissingPolicy т.е MissingSagaPipe.

Пример

Пример

Мы попадаем в фильтр машины состояний, где у нас меняются названия компонентов(вместо Pipes теперь Behaviours), но это все тот же привычный нам Pipeline в другой обертке. Вызываем ивент с behaviourContext и попадаем в ядро машины состояний Saga т.е MassTransitStateMachine.

Пример

Пример

Вконце концов нам надо получить Accessor т.е это тот слой, который говорит где хранится state и как его читать/менять. Мы попадаем InitialIfNullStateAccessor(могут быть и другие, но не о них).

Наше состояние равно null, отсюда надо инициализировать state machine instance до нормального состояния runtime

Здесь мы уже встречаемся с концепции State Machine, а это Activity(в данном случае TransitionActivity). Один из самых важных слоев state machine т.к именно здесь живет исполняемая логика. Activities отвечают за шаги выполнения внутри state machine (behaviour pipeline).

Пример

Пример

Каждая Activity делает Execute вызывая next, либо Faulted обрабатывая ошибки (или прокидывая).

Метод Transition работает с состоянием(ранее обсуждали что у State есть переходы внутренние)

  1. проверка текущего состояния

  2. Leave т.е выход из состояния

  3. BeforeEnter т.е логика перед переходом в новое состояние

  4. Установка нового состояния

  5. After Leave логика после перехода из предыдущего состояния.

  6. Enter — для всей иерархии нового состояния

Каждый TransitionTo не выполняется сразу, а превращается в TransitionActivity.

Пример

Пример

Он не просто меняет состояние, а делает целый Lifesycle переход т.е

  1. Создание Saga (Initial transition) null → Initially Когда приходит первое сообщение, то создается Saga, устанавливается начальное состояние Initially(создалась Saga), затем переход в Started

  2. Первый переход состояния Initially → Started Происходит выполнение Then(…), запуск TransitionActivity,смена состояния на Started

  3. Дальнейшие переходы Started → Updated → Processing → Processed Каждый переход выполняется через TransitionActivity проходит полный lifecycle (leave — beforeEnter — enter — afterleave) и вконце концов фиксация состояния в Saga Storage

Все крутится в цикле до тех пор пока не будет завершена логика переходов предусмотренная настройкой MassTransitStateMachine.

И закомичена транзакция.

Основные методы обработки

Когда ты пишешь During(Started, When(SomethingReceived)…, ты фактически добавляешь запись в словарь внутри состояния: для состояния Started и события SomethingReceived создаётся behavior (цепочка из Then, TransitionTo и т.д.). Именно этот behavior потом находится через behaviors.TryGetValue(…) и выполняется через Execute

Initially — это то же самое, только для начального состояния (Initial).

DuringAny — это глобальные обработчики, они не лежат внутри конкретного состояния.

BeforeEnter, Enter, AfterLeave — это обычные события, которые MassTransit сам генерирует при переходах между состояниями.

Когда происходит TransitionTo(Started), внутри вызываются события вроде Started.BeforeEnter, и если ты где-то описал When(Started.BeforeEnter), для него тоже будет создан behavior.

В итоге Execute вызывается только если для текущего события найден behavior. Он может быть либо прямо в текущем состоянии (через During/Initially), либо найден через fallback (например, из DuringAny).

Пример

Пример
Пример

Пример

Разница тут в вызовах заключается в наличии payload

Пример

Пример
Пример

Пример

Особенность Saga как паттерна

Поскольку состояние саги хранится в базе данных, а сообщения поступают через брокер, при обработке каждого сообщения MassTransit загружает сагу по CorrelationId и выполняет соответствующий behavior для текущего состояния.

Если сервис падает до подтверждения обработки (ack), сообщение возвращается в очередь и будет доставлено повторно после перезапуска. При этом MassTransit не восстанавливает промежуточное выполнение внутри behavior, а всегда начинает обработку заново, исходя из последнего зафиксированного состояния саги в базе данных.

При последующих сообщениях с тем же CorrelationId обработка всегда начинается с текущего сохранённого состояния. Если дальнейших переходов не определено (например, достигнут финальный state), сообщение будет проигнорировано или обработано как не имеющее действия.

Таким образом, поведение саги полностью определяется значением CurrentState в базе данных, а не частично выполненными шагами внутри обработки сообщения.

Пример

Пример

Лучший простой пример Sagas Orchestration

Мы увидели подробно реализацию Sagas в MassTransit, она ни капли не простая в реализации, а сама идея простая, отсюда для иллюстрации будет уместным взглянуть как это делается в других фреймворках.

В более легкий показательный пример хочется привести пример Sagas от команды devmentors. Проще говоря это небольшая IT-компания и одновременно образовательный проект для разработчиков из Польши, города Краков.

Пример

Пример

Их библиотека Sagas фигурировала в одном из роликов на ютубе в проекте DNC-DShop, который всем доступен в интернете. Внутри, для демонстрации работы Sagas была написана отдельная библиотека под названием “Chronicle”.

Пример

Пример
Пример

Пример

И чтоб лучше понять как работают саги они сделали библиотеку именно по способу manual control flow engine т.е ты сам контролируешь:

  1. Когда создается Saga

  2. Как ищется Saga

  3. как вызываются handlers

  4. как делаются compensations

Пусть вас не сбивает столку что Chronicle принадлежит некоему snatch.dev github аккаунту, ведь это по сути своей дополнительный аккаунт для либ DevMentors, а так же у них одноименный Github Account, где присутствует та же либа Trill.Saga, где и внутри неё Chronicle так же.

Как работать с их библиотекой я не вижу смысла показывать. Все довольно просто — сообщения в брокер, их десериализация, вызов нужного Event’а Saga. Это та же знакомая оркестрация, только выглядит чутка иначе. Она выполняет функцию Sagas, но меньше обвязок и возможностей из коробки. Трудностей возникнуть не должно.

Почему так же их команда заслуживает внимания? Среди них есть ex-Microsoft MVP — Piotor Gankiewicz. Талантливый разработчик и архитектор.

Пример

Пример

Пример на NServiceBus

Ранее мы рассмотрели простой пример реализации Sagas на библиотеке DevMentors с использованием Chronicle, где отсутствуют дополнительные инфраструктурные обвязки, и основной акцент сделан на демонстрации принципов работы. Теперь перейдём к решению enterprise-уровня — NServiceBus, который можно сравнить по значимости и зрелости с MassTransit. При рассмотрении этой библиотеки становится заметно, что она менее “гибкая” и декларативная по сравнению с MassTransit и его SagaStateMachine. В NServiceBus модель Sagas более приземлённая и строгая: разработчик работает ближе к уровню сообщений и явного управления состоянием, без высокоуровневой абстракции state machine, характерной для MassTransit.

Пример

Пример
Пример

Пример
Пример

Пример
Пример

Пример
Пример

Пример

Какой вывод касаемо Sagas?

Saga Orchestration из практики = Message broker + state + correlation + workflow orchestration + compensation + eventual consistency.

  1. Saga работает поверх message broker’a, который является основой распределённого взаимодействия между сервисами.

  2. Используется event-driven или command-driven модель, где events — это события, фиксирующие факт того, что что-то произошло, а commands — это команды, инициирующие выполнение действия т.е CQRS и Sagas связаны

  3. Saga является stateful workflow и хранит состояние бизнес-процесса между шагами выполнения.

  4. Корреляция обеспечивает связь каждого сообщения с конкретной сагой, без чего невозможно определить, какой процесс необходимо продолжить.

  5. Оркестрация процесса заключается в том, что Saga выступает координатором и управляет последовательностью выполнения шагов.

  6. В системе отсутствуют распределённые транзакции, и вместо этого каждый шаг выполняется как отдельная локальная транзакция с моделью eventual consistency.

  7. Компенсация применяется в случае ошибки шага, при этом запускаются заранее определённые компенсирующие действия, реализуемые вручную.

  8. Idempotency означает, что обработчики должны корректно обрабатывать повторные сообщения, для чего используются механизмы optimistic concurrency control, pessimistic locking или concurrency tokens.

  9. Retry и fault handling обеспечивают повторные попытки выполнения при временных ошибках, отправку сообщений в DLQ и переход в compensation при фатальных ошибках.

  10. Persisted workflow state означает, что состояние Saga хранится во внешнем хранилище, обычно в базе данных, для возможности восстановления и продолжения процесса.

    Пример

    Пример

Подведем итоги

Как мы увидели, MassTransit представляет собой полноценный фреймворк для построения распределённых систем. Он гибкий, расширяемый и предоставляет все необходимые инструменты для enterprise-разработки. Отдельного внимания заслуживает реализация Sagas. Она выполнена на высоком уровне и выгодно отличается от многих альтернатив благодаря встроенному DSL, который значительно упрощает описание бизнес-процессов. Также в MassTransit присутствует поддержка различных механизмов инфраструктурного уровня: pipeline-фильтров, различных брокеров сообщений и разнообразных persistence-хранилищ. Фактически фреймворк берёт на себя большую часть технической сложности, позволяя разработчику сосредоточиться на бизнес-логике и минимальной конфигурации. Несмотря на то, что начиная с версии 8.0.0 часть функциональности стала платной, MassTransit по-прежнему остаётся одним из наиболее сильных решений для построения распределённых и сложных систем. При этом важно понимать, что ключевые концепции, лежащие в его основе, включая Sagas, опираются на достаточно простые принципы, однако доведены до уровня зрелого и инженерно выверенного решения благодаря работе Криса Патерсона и сообщества.

На этом все, спасибо за внимание 🙂

Пример

Пример

ссылка на оригинал статьи https://habr.com/ru/articles/1033712/