В этой статье:
Мы откажемся от контроллеров, MediatR‑а и MassTransit‑а, всё выше перечисленное нам заменит Wolverine. Отольём в граните модульный монолит, имплементируем регистрацию событий используя Marten. Пример всего этого безобразия находится тут.
Начнём с Wolverine
Как утверждают создатели, Wolwerine — это новое поколение реализации паттернов «Медиатор» и «Шина Сообщений», библиотека «с батарейками в комплекте».
Wolverine позволяет:
-
Обрабатывать HTTP‑запросы с помощью обработчиков Wolverine, находящихся, в том числе, в библиотеках.
-
Отправлять сообщения (команды и запросы) в обработчики и получать результат выполнения.
-
Отправлять сообщения (команды и запросы) в обработчики находящиеся в других приложениях, в этом случае нам понадобится транспорт. «Из коробки» Wolverine работает со следующими видами транспорта: RabbitMQ, Azure Service Bus, Amazon SQS, TCP, Sql Server, PostgreSQL, MQTT, Kafka. В примере я использую Kafka в качестве транспорта для некоторых сообщений, Wolverine позволяет выбирать какое сообщение каким транспортом будет отправлено.
-
Реализовывать паттерн Saga. Сага от Wolverine поддерживает такой тип сообщений, как «сообщения по таймауту», что позволяет в «автоматическом режиме» завершать «забытые» саги.
-
Отправлять сообщения в режиме «Ping — Pong», в документации это называется «cascading messages».
Wolverine имеет интеграцию с Marten «из коробки», что даёт возможность, например, отправлять все команды и запросы сразу в Кафку.
Marten
Marten ‑.Net библиотека, которая призвана устранить «boilerplate» и позволить Вам сфокусироваться на доставке бизнес ценностей, если верить её создателям. Marten является надстройкой над PostgreSQL, которая позволяет использовать эту БД в качестве аналога MongoDB или EventStoreDB. Также производитель обещает Strong Consistency (но есть нюансы), гибкие стратегии индексирования, улучшенные linq — запросы, встроенную реализацию паттерна Outbox/Inbox «из коробки», поддержку Multi‑tenancy, интеграцию с Asp.Net Core и другие не всем понятные слова на английском:
https://martendb.io/introduction.html.
В примере реализована регистрация событий на Marten, и, если сравнивать связку EventStoreDB плюс MongoDB, (хороший пример находится здесь), то, таки да, в сравнении с ними Marten потребует минимум кода, т.к. read‑часть предоставлена «из коробки».
Mодульный монолит
Модульный монолит — вид архитектуры, подразумевающий разбиение приложения на модули, каждый из которых с увеличением масштаба приложения может быть конвертирован в отдельный микро‑сервис, и, как можно догадаться, модуль является реализацией ограниченного контекста в коде. Модули «знают» лишь контракты друг друга и взаимодействуют так же, как это делали бы микро‑сервисы, плюс взаимодействие с помощью той или иной реализации паттерна Посредник.
Плюсы: мы экономим на написании инфраструктурного кода, необходимого для создания отдельного приложения под каждый модуль, что нам пришлось бы сделать в случае микро‑сервисной архитектуры, в тоже время, есть шанс избежать спагетти‑кода. Мы экономим на написании интеграционного кода, т.к. модули находятся в одном процессе. Сохранять данные из разных модулей в рамках одной транзакции тоже не проблема, т.к. все модули работают с одним и тем же подключением к БД. Создать новый модуль гораздо проще, чем новый микросервис.
Минусы: это монолит, модуль не является единицей развёртывания.
Расово верный подкаст о Modular Monolith для.Net можно найти тут и там же пример.
Ну, или в рамках импортозамещения, беседа и пример от DevBrothers, тоже изрядно.
Предметная область
Она проста и кратка, как тост Булдакова. Есть «Персона», у персоны есть «Счета», на счета приходят «Платежи». Каждое действие: создание персоны, добавление счёта, добавление платежа должно быть подтверждено либо отвергнуто. На выходе мы должны получить список персон и сальдо по каждой из них, с сохранением истории того, как было достигнуто нынешнее состояние.
Добавление новых данных в БД – только по окончанию саги:
В Swagger это выглядит так:
В решении имеется проект API, из которого уделена папка Controllers, она не нужна, так как все HTTP-вызовы должны обрабатываться модулями.
У нас три модуля: Модуль по работе с агрегатом Персона (PersonModule), модуль успешного завершения саг (SagaApprovementModule) и модуль отрицательного завершения саг (SagaRejectionModule).
HTTP EndPoint-ы Wolverine
Начнём с SagaApprovementModule: он содержит библиотеку SagaApprovement.Contracts с контрактами модуля и библиотеку SagaApprovement, в которой находятся эндпоинты Wolverine:
// <summary> /// EndPoint завершения саги добавления счёта, добавления платежа, создания персоны. /// Во всех трёх обработчиках один из параметров - впрыск ссылки на объект шины сообщений. /// </summary> public static class ApproveEndPoints { /// <summary> /// Отправляем в сагу добавления счёта сообщение разешающее добавление счёта. /// </summary> /// <param name="command"></param> /// <param name="bus"></param> /// <returns></returns> [WolverinePost("approve-add-account-saga")] public static ValueTask Handle(ApproveAccountCommand command, IMessageBus bus) => bus.PublishAsync(new AccountApproved(command.SagaId)); /// <summary> /// Отправляем в сагу добавления платежа сообщение разешающее добавление платежа. /// </summary> /// <param name="command"></param> /// <param name="bus"></param> /// <returns></returns> [WolverinePost("approve-add-payment-saga")] public static ValueTask Handle(ApprovePaymentCommand command, IMessageBus bus) => bus.PublishAsync(new PaymentApproved(command.SagaId)); /// <summary> /// Отправляем в сагу создания персоны сообщение разешающее создание персоны. /// </summary> /// <param name="command"></param> /// <param name="bus"></param> /// <returns></returns> [WolverinePost("approve-person-creation-saga")] public static ValueTask Handle(ApprovePersonCreationCommand command, IMessageBus bus) => bus.PublishAsync(new PersonApproved(command.SagaId)); }
В классе находятся три конечные точки с адресами: «approve‑add‑account‑saga», «approve‑add‑payment‑saga», «approve‑person‑creation‑saga». Чтобы всё это работало, необходим класс, имеющий в своём названии «EndPoint» или «EndPoints», и содержащий методы с атрибутами WolverinePost, WolverineGet и т. п. Больше ничего не нужно.
В руководстве Wolverine указано, что предпочтительным является впрыск зависимости в метод. В строке приведённой выше, мы впрыскиваем интерфейс шины сообщений Wolverine. Далее мы можем выбрать один из трёх вариантов:
-
bus.PublishAsync — публикует сообщение в шине. Даже если нет ни одного обработчика — метод завершится успешно, если обработчики есть — метод не будет ожидать результатов их выполнения.
-
bus.SendAsync — публикует сообщение в шине, но ожидает, что есть обработчики этого сообщения, если они есть — не дожидается завершения их выполнения.
-
bus.InvokeAsync — публикует сообщение, ожидает что есть обработчик, возвращает результат выполнения обработчика отправителю.
В примере выше я публикую в шине сообщение AccountApproved. Обработчик этого сообщения находится в саге AddAccountSagа. Чтобы сага принимала сообщение, нужно определить метод Handle, в котором один из параметров является классом сообщения. Обработчик выглядит следующим образом:
/// <summary> /// Успешное завершение саги, добавляем аккаунт. /// </summary> /// <param name="_"></param> /// <param name="addAccountService">сервис добавления счёта.</param> public async void Handle(AccountApproved _, IAddAccountService addAccountService) { // Обращаемся к сервису добавления аккаунта, // отправляя туда данные из состояния саги. await addAccountService.CreateAccount(PersonId, AccountName); // Завершаем сагу. MarkCompleted(); }
Saga на Wolverine
/// <summary> /// Сага добавления аккаунта. /// </summary> public class AddAccountSaga : Saga { /// <summary> /// Идентификатор саги. /// </summary> public string? Id { get; set; } /// <summary> /// Идентификатор персоны. /// </summary> public string PersonId { get; set; } /// <summary> /// Наименование аккаунта. /// </summary> public string AccountName { get; set; } /// <summary> /// Обработчик старта саги. /// Название Start зарезервировано Wolverine. /// Сообщение принимаемое в этом методе в качетсве первого параметра - будет считаться стартовым /// сообщением саги. /// Стартовый обработчик должен вернуть сагу. /// В нашем случае сагу и сообщение завершающее сагу по таймауту. /// </summary> /// <param name="addAccountSagaStarted">Стартовое сообщение саги</param> /// <returns></returns> public static (AddAccountSaga, AddAccountTimeoutExpired) Start(AddAccountSagaStarted addAccountSagaStarted) => (new AddAccountSaga { //заполняем состояние саги данными. Id = addAccountSagaStarted.AddAccountSagaId, PersonId = addAccountSagaStarted.PersonId, AccountName = addAccountSagaStarted.AccountName }, new AddAccountTimeoutExpired(addAccountSagaStarted.AddAccountSagaId)); /// <summary> /// Успешное завершение саги, добавляем аккаунт. /// </summary> /// <param name="_"></param> /// <param name="addAccountService">сервис добавления счёта.</param> public async void Handle(AccountApproved _, IAddAccountService addAccountService) { // Обращаемся к сервису добавления аккаунта, // отправляя туда данные из состояния саги. await addAccountService.CreateAccount(PersonId, AccountName); // Завершаем сагу. MarkCompleted(); } /// <summary> /// Хэндлер отрицательного завершения саги. /// </summary> /// <param name="_"></param> public void Handle(AccountRejected _) => MarkCompleted(); /// <summary> /// Хэндлер завершения саги по таймауту. /// MarkCompleted - закрывает сагу. /// </summary> /// <param name="_"></param> public void Handle(AddAccountTimeoutExpired _) => MarkCompleted(); }
Создавать саги с Wolverine просто:
-
Наследуем класс от класса Saga.
-
Определяем поля, являющиеся состоянием саги. Состояние будет доступно во всех обработчиках сообщений саги. (стр.6–20).
-
Определяем обработчик стартового сообщения саги. Он должен иметь название Start, а первый параметр будет считаться сообщением, с которого начинается сага. (стр.31).
-
Определяем остальные обработчики саги. Первые параметры в обработчиках — это сообщения, которые должна обрабатывать сага.
Метод Start должен вернуть экземпляр саги. В нашем случае он возвращает кортеж из экземпляра саги и сообщения, которое закроет сагу по таймауту, если про неё «забыли». Состояние саги будет сохранено в БД, а затем будет извлекаться из неё при следующих срабатываниях, описанных в саге обработчиков.
Все необходимые сервисы впрыскиваем вторым и следующими параметрами в обработчиках. Как сделано строке № 45.
В настройках, при добавлении Wolverine, можно указать, каким транспортом отправлять сообщения:
//Будем публиковать в кафке ниже приведённые события. opts.PublishMessage<PersonApproved>().ToKafkaTopic("CreatePersonUseCase.PersonApproved"); opts.PublishMessage<PersonRejected>().ToKafkaTopic("CreatePersonUseCase.PersonRejected");
И указать, откуда могут прийти водящие интеграционные сообщения:
//Будем получать из топиков кафки следующие события. opts.ListenToKafkaTopic("CreatePersonUseCase.PersonApproved"); opts.ListenToKafkaTopic("CreatePersonUseCase.PersonRejected");
Собственно, это всё, что нужно для создания саги.
Marten и Event Sourcing
Что такое Event Sourcing можно почитать тут, к статье приложен хороший пример, глядя на который, можно оценить на столько Marten облегчает имплементацию регистрации событий.
«Из коробки» в Marten реализован демон и механизм проекций. Проекции обновляются при сохранении новых событий. К проекциям можно обращаться с помощью Linq‑запросов примерно также, как если бы мы работали с реляционными БД. Да‑да, нам не нужен Mongo или другая БД, не нужно имплементировать демон и подписки, чтобы получать снимки. Всё это уже сделал для нас Marten. Нужно создать проекцию и подключить её в Marten, и это не сложно.
Перейдём к матчасти. В примере найдём файл Repository.cs — это реализация репозитория, сохраняющего события в потоки агрегатов в БД и восстанавливающего состояния агрегатов по событиям из БД.
public sealed class Repository(IDocumentStore store) : IRepository { //Marten document store private readonly IDocumentStore store = store; /// Получаем несохранённые события из агрегата и сохраняем их. public async Task StoreAsync(Aggregate aggregate, CancellationToken ct = default) { // получаем сессию для работы с событиями. await using var session = await store.LightweightSerializableSessionAsync(token: ct); // получаем список несохранённых событий из агрегата var events = aggregate.GetUncommittedEvents().ToArray(); // добавляем события в стрим с идентификатором aggregate.Id session.Events.Append(aggregate.Id, aggregate.Version, events); // сохраняем изменения. await session.SaveChangesAsync(ct); // очищаем список несохранённых событий. aggregate.ClearUncommittedEvents(); } /// Восстанавливаем состояние агрегата по событиям. public async Task<T> LoadAsync<T>( string id, int? version = null, CancellationToken ct = default ) where T : Aggregate { // получаем сессию для работы с событиями. await using var session = await store.LightweightSerializableSessionAsync(token: ct); // восстанавливаем состояние агрегата, читая из бд события стрима агрегата. // при этом Marten вызовет методы Apply для каждого из сохранённых событий. var stream = await session.Events.FetchForWriting<T>(id, ct); return stream.Aggregate; } }
Это весь код, который необходим для репозитория write-части модуля. В read-части можно обойтись вообще без репозитория.
/// <summary> /// Endpoint получения данных о персонах. /// </summary> public static class GetPersonWithSumEndPoint { /// <summary> /// Получаем персону по её идентификатору. /// В метод впрыскиваем сессию для получения read-модели. /// </summary> /// <param name="getPersonsWithSumCommand"></param> /// <param name="session"></param> /// <returns></returns> /// <exception cref="Exception"></exception> [WolverineGet("person/person")] public static async Task<string> Handle(GetPersonWithSumQuery getPersonsWithSumCommand, IQuerySession session) { var person = await session .Query<PersonWithSum>() .FirstOrDefaultAsync(c => c.Id == getPersonsWithSumCommand.PersonId) ?? throw new Exception($"Person not found."); return JsonConvert.SerializeObject(person, Formatting.Indented); } /// <summary> /// Получаем список всех персон (IRL это плохо, но для примера можно кмк.) /// Впрыскиваем в метод сессию для получения списка read-моделей. /// </summary> /// <param name="getPersonsWithSumCommand"></param> /// <param name="session"></param> /// <returns>Список персон с сальдо.</returns> /// <exception cref="Exception"></exception> [WolverineGet("person/persons")] public static async Task<string> Handle(GetPersonsWithSumQuery getPersonsWithSumCommand, IQuerySession session) { var persons = await session .Query<PersonWithSum>().ToListAsync() ?? throw new Exception($"Persons not found."); return JsonConvert.SerializeObject(persons, Formatting.Indented); } }
Класс GetPersonWithSumEndPoint, как не трудно догадаться, является HTTP-EndPoint-ом. Чтобы получить данные из проекций, нам достаточно выполнить linq-запрос.
await session.Query<PersonWithSum>().ToListAsync()
Получаем все данные из проекции, но можно добавить Where, Take, Skip и пр. для ограничения выборки. Никакой разницы с выборкой из реляционных БД.
Проекции
Проекции в Marten могут быть следующих видов:
-
Проекции агрегатов (Aggregate Projection): Live — проецируем на лету, нужна только модель, класс проекции создавать не нужно; Multi‑Stream — проекции с возможностью группировки событий, срезов событий, разбивки по tenantId и пр.; Custom — ещё более широкие возможности, чем в предыдущем Multi‑Stream.
-
Проекции событий (Event Projections): позволяют явно определять операции создания документов из отдельных событий.
-
Пользовательские (custom) проекции: проекции, наследуемые от IProjection, всё делаете сами с нуля. Остальные перечисленные в этом списке типы проекций наследуются от тех или иных классов и уже имеют какой‑то функционал.
-
Inline проекции: события проецируются одной транзакции с сохранением событий.
-
Flat Table Projection: позволяет создать ADO.NET таблицу, добавить в неё столбцы с помощью AddColumn, и проецировать данные прямо в неё.
В примере можно найти проекцию агрегатов на основе SingleStreamProjection.
Модель проекции выглядит таким образом:
/// <summary> /// Модель персоны, используется в проекции PersonWithSumProjection. В модель добавлено поле Saldo. /// </summary> public class PersonWithSum { /// <summary> /// Идентификатор персоны. /// </summary> public string Id { get; set; } /// <summary> /// ФИО /// </summary> public string Name { get; set; } /// <summary> /// ИНН /// </summary> public string Inn { get; set; } /// <summary> /// Сальдо. /// </summary> public decimal Saldo { get; set; } public long Version { get; private set; } /// <summary> /// Счета. /// </summary> public List<Account> Accounts = new List<Account>(); /// <summary> /// Методы Apply будут вызваны Marten при построении проекции. /// </summary> /// <param name="event"></param> public void Apply(PersonCreated @event) { Id = @event.Id; Name = @event.Name; Inn = @event.Inn; Version++; } public void Apply(PersonNameChanged @event) { Name = @event.NewName; Version++; } public void Apply(PersonInnChanged @event) { Inn = @event.NewInn; Version++; } public void Apply(AccountCreated @event) { var account = new Account(@event.AccountId, @event.Name, new List<Payment>()); Accounts.Add(account); Version++; } public void Apply(PaymentCreated @event) { var payment = new Payment(@event.Id, @event.Sum, @event.PaymentType); var account = Accounts.FirstOrDefault(x => x.Id == @event.AccountId) ?? throw new ArgumentNullException($"Счёт не найден с ид {@event.AccountId}"); account.Payments.Add(payment); Saldo = @event.PaymentType == (int)PaymentTypeEnum.Credit ? Saldo + @event.Sum : Saldo - @event.Sum; Version++; } }
Методы Apply будут вызваны в проекции.
Класс проекции выглядит так:
/// <summary> /// Проекция событий агрегата PersonAggreate. Проекция вычисляет сальдо по каждому из агрегатов. /// </summary> public class PersonWithSumProjection : SingleStreamProjection<PersonWithSum> { public PersonWithSumProjection() { // Вызываются методы Apply модели PersonWithSum ProjectEvent<PersonCreated>((item, @event) => item.Apply(@event)); ProjectEvent<PersonInnChanged>((item, @event) => item.Apply(@event)); ProjectEvent<PersonNameChanged>((item, @event) => item.Apply(@event)); ProjectEvent<AccountCreated>((item, @event) => item.Apply(@event)); // В этом Apply вычисляется сальдо. ProjectEvent<PaymentCreated>((item, @event) => item.Apply(@event)); } }
В проекции мы указываем, какие именно события она будет обрабатывать и какими методами модели.
Затем проекцию нужно добавить в Мартен.
options.Projections.Add<PersonWithSumProjection>(ProjectionLifecycle.Async);
LifeTime указан асинхронный, то есть проекция строится асинхронно, после сохранения событий. Можно указать Inline, тогда проекция будет строиться в рамках одной транзакции вместе с сохранением событий.
Агрегаты
Marten не предоставляет базовых классов для написания агрегатов. Поэтому пишем сами: наружу выставляем свойства для чтения. Изменение состояния — только через методы агрегата. Если возникла ошибка — записываем соответствующее событие в список подлежащих сохранению событий, вместо генерации исключения.
Marten ожидает, что в Вашем агрегате реализованы методы Apply для каждого из доменных событий, если это так, то Marten сможет восстановить состояние агрегата, даже если Apply являются приватными методами. Например, в агрегате описаны следующие методы Apply:
protected void Apply(PersonNameChanged @event) { Name = @event.NewName; Version++; } protected void Apply(PersonInnChanged @event) { Inn = @event.NewInn; Version++; } protected void Apply(AccountCreated @event) { var account = Account.Create(@event.AccountId, @event.Name); _accounts.Add(account); Version++; }
Когда мы читаем агрегат из базы с помощью метода LoadAsync репозитория,
/// Восстанавливаем состояние агрегата по событиям. public async Task<T> LoadAsync<T>( string id, int? version = null, CancellationToken ct = default ) where T : Aggregate { // получаем сессию для работы с событиями. await using var session = await store.LightweightSerializableSessionAsync(token: ct); // восстанавливаем состояние агрегата, читая из бд события стрима агрегата. // при этом Marten вызовет методы Apply для каждого из сохранённых событий. var stream = await session.Events.FetchForWriting<T>(id, ct); return stream.Aggregate; }
в строке 15 мы обращаемся к свойству stream.Aggregate, при обращении к нему Marten вызовет выше упомянутые методы Apply, и применит события.
Пример агрегата находится в файле: PersonAggregate.cs.
Подписки
Подписки тоже имеются в Marten.
/// <summary> /// Подписка на события типа PersonApproved. /// </summary> public class PersonApprovedToKafkaSubscription : SubscriptionBase { private readonly IServiceProvider _serviceProvider; public PersonApprovedToKafkaSubscription(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; SubscriptionName = nameof(PersonApprovedToKafkaSubscription); // Подписываемся только на события типа PersonApproved IncludeType<PersonApproved>(); // настраиваем сколько событий демон будет извлекать за раз // и сколько будет держать в памяти. Options.BatchSize = 1000; Options.MaximumHopperSize = 10000; // Позиция с которой читаем события (с текущего события) Options.SubscribeFromPresent(); } /// <summary> /// Обрабатываем события. /// </summary> public override async Task<IChangeListener> ProcessEventsAsync( EventRange page, ISubscriptionController controller, IDocumentOperations operations, CancellationToken cancellationToken) { // с помощью Woverine будем отправлять интеграционные события в кафку. var messageBus = _serviceProvider.GetService<IMessageBus>() ?? throw new ArgumentNullException("Шина событий не зарегистрирована в IoC"); foreach (var @event in page.Events) { await messageBus.PublishAsync( new PersonApprovedIntegrationEvent(@event.Data.GetType().Name, JsonConvert.SerializeObject(@event.Data))); } return NullChangeListener.Instance; } }
Это подписка на один тип событий. После получения доменного события PersonApproved, на его основе формируется интеграционное событие PersonApprovedIntegrationEvent, которое отправляется в Кафку, как пример.
Replay событий
Иногда нужно заново построить проекцию, в Marten тоже можно это делать.
[HttpGet] [Route("replay")] public async Task Replay( [FromServices] IDocumentStore store, CancellationToken cancellation) { using var daemon = await store.BuildProjectionDaemonAsync(); // Fire up everything! await daemon.StartAllAsync(); // or instead, rebuild a single projection //await daemon.RebuildProjectionAsync("a projection name", 5.Minutes(), cancellation); // or a single projection by its type await daemon.RebuildProjectionAsync<PersonWithSumProjection>(cancellation); // Be careful with this. Wait until the async daemon has completely // caught up with the currently known high water mark await daemon.WaitForNonStaleData(1.Minutes()); // Start a single projection shard //await daemon.StartAgentAsync("shard name", cancellation); // Or change your mind and stop the shard you just started //await daemon.StopAgentAsync("shard name"); // No, shut them all down! await daemon.StopAllAsync(); }
Итого
В целом, Wolverine+Marten способны значительно уменьшить объём шаблонного кода за счёт широкой гаммы решений «из коробки». Однако, есть нюанс — если вы стремитесь к Strong Consistency — у решений на базе Marten могут возникать проблемы с производительностью, о чём говорят предупреждения на официальном сайте.
Удачи!
ссылка на оригинал статьи https://habr.com/ru/articles/837376/
Добавить комментарий