Опыт использования MassTransit 3.0

MassTransit это open source библиотека, разработанная на языке C# для .NET платформы, упрощающая работу с шиной данных, которая используется при построении распределенных приложений и реализации SOA (service oriented architecture).

В качестве message broker могут выступать RabbitMq, Azure Service Bus или In-Memory менеджер (в случае с In-Memory область видимости ограничивается процессом, в котором проинициализирован экземпляр).

Содержание:

Команды и события

В библиотеке заложено 2 основных типа сообщений: команды и события.

Команды

Сигнализируют о необходимости выполнить некое действие. Для наиболее содержательного наименования команды желательно использовать структуру глагол + существительное: EstimateConnection, SendSms, NotifyCustomerOrderProcessed.

Работа с командами осуществляется с помощью метода Send (интерфейса ISendEndpoint) и указания получателя endpoint (очереди):

Отправка команды

private static async Task SendSmsCommand(IBus busControl) {    var command = new SendSmsCommand    {        CommandId = Guid.NewGuid(),        PhoneNumber = "89031112233",        Message = "Thank you for your reply"    };     var endpoint = await busControl.GetSendEndpoint(AppSettings.CommandEndpoint);    await endpoint.Send(command); } 

События

Сигнализируют о случившемся событии, которое может быть интересно некоему набору подписчиков (паттерн Observer), которые на эти события реагируют, например: ConnectionEstimated, CallTerminated, SmsSent, CustomerNotified.

Работа с событиями осуществляется с помощью метода Publish (интерфейса IPublishEndpoint).

В терминологии заложено и основное различие этих типов сообщений — команда доставляется единственному исполнителю (дабы избежать дублирования выполнения):


Изображение из статьи MassTransit Send vs. Publish

В то время как событие ориентировано на оповещение n-подписчиков, каждый из которых реагирует на случившееся событие по-своему.


Изображение из статьи MassTransit Send vs. Publish

Иными словами, при запущенных n-консьюмеров (от англ. consumer — потребитель, обработчик), обрабатывающих команду, после её публикации только один из них получит сообщение о ней, в то время как сообщение о событии получит каждый.

Контракты сообщений

Согласно документации MassTransit, при объявлении контрактов сообщений рекомендуется прибегать к интерфейсам:

Контракт : команда на отправку СМС сообщения

public interface ISendSms { 	Guid CommandId { get; } 	string PhoneNumber { get; } 	string Message { get; } } 

Как упоминалось ранее, отправка команд должна осуществляться исключительно с помощью метода Send (интерфейса IBus) и указания адресата (endpoint).

Контракт: событие об успешной отправке СМС сообщения

public interface ISmsSent { 	Guid EventId { get; } 	DateTime SentAtUtc { get; }	 } 

События отправляются с помощью метода Publish.

Роутинг

Как распределение сообщений по exchange, так и выбор консьюмеров (о них в этой статье будет рассказано чуть позже) для обработки базируются на runtime типах этих сообщений,- в наименовании используются namespace и имя типа, в случае с generic имя родительского типа и перечень аргументов.

Exchange

При конфигурации receive endpoint‘a (подключении ранее зарегистрированных консьюмеров) в случае использования в качестве канала доставки сообщений RabbitMq на основании указанных к обработке консьюмерами типов сообщений формируются наименования требуемых exchange, в которые затем эти самые сообщения и будут помещаться.

Аналогичные действия на этапе конфигурации send endpoint‘a выполняются и для команд, для отправки которых также требуются собственные exchange.

На изображении можно увидеть созданные в рамках моего сценария exchange:

В том случае, если конфигурируя receive endpoint мы указываем наименование очереди:

cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container)); 

то в привязках exchange сообщений можно будет увидеть следующую картину:

Итоговый путь сообщения, тип которого имплементирует ISmsEvent, будет иметь следующий вид:

Если же конфигурация осуществляется без указания очереди:

cfg.ReceiveEndpoint(host, e=> e.LoadFrom(container)); 

То имена для последнего exchange и очереди формируются автоматически, а по завершению работы они будут удалены:

Формат сообщения

Говоря о формате сообщения, хотелось бы подробнее остановиться на наименовании (или messageType). За его формирование (заголовков urn:message:) ответственна функция GetUrnForType(Type type). Для примера я добавил для команды ISendSms наследование от ICommand и generic тип:

Контракт : команда на отправку СМС сообщения ICommand<string>

public interface ICommand<T> { }  public interface ISendSms<T> : ICommand<T> {    Guid CommandId { get; }    string PhoneNumber { get; }    string Message { get; } }  class SendSmsCommand : ISendSms<string> {    public Guid CommandId { get; set; }    public string PhoneNumber { get; set; }    public string Message { get; set; } } 

Сформированное сообщение в таком случае будет содержать следующее значение в поле messageType (на основании которого после получения сообщения затем и выбирается ответственный за обработку консьюмер):

"messageType": [     "urn:message:PlayWithMassTransit30.Extension:SendSmsCommand",     "urn:message:PlayWithMassTransit30.Contract.Command:ISendSms[[System:String]]",     "urn:message:PlayWithMassTransit30.Contract.Command:ICommand[[System:String]]" ] 

Кроме messageType сообщение содержит информацию о host, которым оно было отправлено:

"host": {     "machineName": "DESKTOP-SI9OHUR",     "processName": "PlayWithMassTransit30.vshost",     "processId": 1092,     "assembly": "PlayWithMassTransit30",     "assemblyVersion": "1.0.0.0",     "frameworkVersion": "4.0.30319.42000",     "massTransitVersion": "3.4.1.808",     "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0" } 

Значимую часть payload:

"message": {     "commandId": "7388f663-82dc-403a-8bf9-8952f2ff262e",     "phoneNumber": "89031112233",     "message": "Thank you for your reply" } 

и иные служебные поля и заголовки.

Консьюмеры (Consumer)

Консьюмер — это класс, который обрабатывает один или несколько типов сообщений, указываемых при объявлении в наследовании интерфейса IConsumer, где T это тип обрабатываемого данным консьюмером сообщения.

Пример консьюмера, обрабатывающего команду ISendSms и публикующего событие ISmsSent:

SendSmsConsumer: обработчик команды на отправку сообщения

public class SendSmsConsumer : IConsumer<ISendSms<string>> {    public SendSmsConsumer(IBusControl busControl)    {        _busControl = busControl;    }     public async Task Consume(ConsumeContext<ISendSms<string>> context)    {        var message = context.Message;         Console.WriteLine($"[IConsumer<ISendSms>] Send sms command consumed");        Console.WriteLine($"[IConsumer<ISendSms>] CommandId: {message.CommandId}");        Console.WriteLine($"[IConsumer<ISendSms>] Phone number: {message.PhoneNumber}");        Console.WriteLine($"[IConsumer<ISendSms>] Message: {message.Message}");         Console.Write(Environment.NewLine);        Console.WriteLine("Публикация события: Смс сообщение отправлено");        await _busControl.SmsSent(DateTime.UtcNow);    }     private readonly IBus _busControl; } 

После того, как мы получили команду на отправку смс сообщения и выполнили требуемые действия, мы формируем и отправляем событие о том, что смс доставлено.

Код отправки сообщений я вынес в отдельный Extension класс над IBusControl, там же находится и имплементация самих сообщений:

Методы расширения над IBus для инкапсуляции логики межсистемного взаимодействия

public static class BusExtensions {    /// <summary>    /// Отправка смс сообщения    /// </summary>    /// <param name="bus"></param>    /// <param name="host"></param>    /// <param name="phoneNumber"></param>    /// <param name="message"></param>    /// <returns></returns>    public static async Task SendSms(        this IBus bus, Uri host, string phoneNumber, string message    )    {        var command = new SendSmsCommand        {            CommandId = Guid.NewGuid(),            PhoneNumber = phoneNumber,            Message = message        };         await bus.SendCommand(host, command);    }     /// <summary>    /// Публикация события об отправке смс сообщения    /// </summary>    /// <param name="bus"></param>    /// <param name="sentAtUtc"></param>    /// <returns></returns>    public static async Task SmsSent(        this IBus bus, DateTime sentAtUtc    )    {        var @event = new SmsSentEvent        {            EventId = Guid.NewGuid(),            SentAtUtc = sentAtUtc        };         await bus.PublishEvent(@event);    }     /// <summary>    /// Отправка команды    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="bus"></param>    /// <param name="address"></param>    /// <param name="command"></param>    /// <returns></returns>    private static async Task SendCommand<T>(this IBus bus, Uri address, T command) where T : class    {        var endpoint = await bus.GetSendEndpoint(address);        await endpoint.Send(command);    }     /// <summary>    /// Публикация события    /// </summary>    /// <typeparam name="T"></typeparam>    /// <param name="bus"></param>    /// <param name="message"></param>    /// <returns></returns>    private static async Task PublishEvent<T>(this IBus bus, T message) where T : class    {        await bus.Publish(message);    } }  class SendSmsCommand : ISendSms<string> {    public Guid CommandId { get; set; }    public string PhoneNumber { get; set; }    public string Message { get; set; } }  class SmsSentEvent : ISmsSent {    public Guid EventId { get; set; }    public DateTime SentAtUtc { get; set; } } 

На мой взгляд, данное решение вполне удачно позволяет отделить код бизнес-логики от деталей реализации межсистемного (компонентного) взаимодействия и инкапсулировать их в одном месте.

Конфигурация контейнера DI

На данный момент MassTransit предоставляет возможность использовать следующие популярные контейнеры:

  1. Autofac;
  2. Ninject;
  3. StructureMap;
  4. Unity;
  5. Castle Windsor.

В случае с UnityContainer потребуется установить nuget package MassTransit.Unity, после чего станет доступен метод расширения LoadFrom:

public static class UnityExtensions {     public static void LoadFrom(this IReceiveEndpointConfigurator configurator, IUnityContainer container); } 

Пример использования выглядит следующим образом:

Конфигурация IBusControl с помощью UnityContainer

public static IBusControl GetConfiguredFactory(IUnityContainer container) {    if (container == null)    {        throw new ArgumentNullException(nameof(container));    }     var control = Bus.Factory.CreateUsingRabbitMq(cfg => {        var host = cfg.Host(AppSettings.Host, h => { });          // cfg.ReceiveEndpoint(host, e => e.LoadFrom(container));        cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));    });     control.ConnectConsumeObserver(new ConsumeObserver());    control.ConnectReceiveObserver(new ReceiveObserver());    control.ConnectConsumeMessageObserver(new ConsumeObserverSendSmsCommand());    control.ConnectSendObserver(new SendObserver());    control.ConnectPublishObserver(new PublishObserver());     return control; } 

В качестве срока жизни консьюмеров в контейнере документация предлагает использовать ContainerControlledLifetimeManager().

Наблюдатели (Observer)

Для мониторинга процесса обработки сообщений доступно подключение наблюдателей (Observer). Для этого MassTransit предоставляет следующий набор интерфейсов для обработчиков:

  1. IReceiveObserver- срабатывает сразу же после получения сообщения и создания RecieveContext;
  2. IConsumeObserver — срабатывает после создания ConsumeContext;
  3. IConsumeMessageObserver<T> — для наблюдения за сообщениями типа T, в методах которого будет доступно строго-типизированное содержимое сообщения;
  4. ISendObserver — для наблюдения за отправляемыми командами;
  5. IPublishObserver — для наблюдения за отправляемыми событиями.

Для каждого из них интерфейс IBusControl предоставляет собственный метод подключения, выполнение которого должно быть осуществлено непосредственно перед IBusControl.Start().

В качестве примера далее представлена реализация ConsumeObserver:

Реализация IConsumeObsever

public class ConsumeObserver : IConsumeObserver {    public Task PreConsume<T>(ConsumeContext<T> context) where T : class    {        Console.WriteLine($"[ConsumeObserver] PreConsume {context.MessageId}");        return Task.CompletedTask;    }     public Task PostConsume<T>(ConsumeContext<T> context) where T : class    {        Console.WriteLine($"[ConsumeObserver] PostConsume {context.MessageId}");        return Task.CompletedTask;    }     public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class    {        Console.WriteLine($"[ConsumeObserver] ConsumeFault {context.MessageId}");        return Task.CompletedTask;    } } 

Я не буду приводить код каждого из консьюмеров, т.к. по принципу работы и структуре они схожи. Имплементацию каждого из них можно посмотреть в документации или в исходниках на Github.

Итоговый pipeline получения команды на отправку смс сообщения, её обработки и публикации события о её успешном выполнении выглядит следующим образом:

Новое в MassTransit 3.0

С изменениями, которые коснулись новой версии библиотеки, вы можете ознакомиться в 2-х обзорных статьях автора библиотеки Chris Patterson’а на страницах его блога: MassTransit 3 API Changes и MassTransit v3 Update.

Заключение

Здесь должно было быть сравнение наиболее популярных библиотек для работы с очередями сообщений, однако, я решил оставить это для отдельной статьи.

Надеюсь, мне удалось провести для вас поверхностное знакомство с библиотекой MassTransit, за гранью которого ещё остаются такие интересные вещи, как транзакционность, персистентность (интеграция с NHibernate, MondoDb, EntityFramework), планировщик отправки сообщений (интеграция с Quartz), state machine (Automatonymous и Saga), логирование (Log4Net, NLog), многопоточность и многое другое.

Исходные коды примеров доступны на Github.

Используемые материалы:

  1. Документация MassTransit.


ссылка на оригинал статьи https://habrahabr.ru/post/314080/

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *