Сервис очереди, или как подогнать код под все случаи

от автора

Привет! С вами снова писатель-программист из компании Simpl Group (да, без e).
Совсем недавно я выступала на нашем внутреннем Meet Up — уже 6-м, между прочим, — и рассказала своим коллегам занимательную историю, которую поведаю сегодня и вам. Не про ведьм и демонов, конечно, как в моей книге. А про цирк — цифровой цирк, в котором задачи прыгают через обручи, катаются на велосипедах и не падают.
Или, по крайней мере, мы стараемся, чтобы не падали.

(К слову, книгу тоже можете почитать: «Пороки», Ingini)

Представим ситуацию

У вас есть очередь, которая умеет выполнять только один трюк — например, отправлять задачи в расчёт. Всё, как в старом цирке: один артист, один номер.

Но однажды появляется новая задача. И за ней ещё одна. И вот уже зрители хотят видеть не только прыжки через обруч, но и медведя на велосипеде, жонглёров, акробатов и даже кибердракона с машинным обучением.

Значит, мы должны перестроить всё шоу. Так, чтобы:

  • Добавлять новых артистов минимальными усилиями;

  • Не перестраивать манеж каждый раз;

  • Всё работало: надёжно, масштабируемо и эффектно.

Кто в нашем цирке — участники шоу

Представим, что у нас есть две задачи:
Одна — это лошадки, прыгающие через обруч;
Вторая — это мишка на велосипеде, проезжающий то в одну, то в другую сторону.

Что есть общего в этих двух выступлениях?

  • Контроллеры — дрессировщики, подающиеся голосом: «Вперёд!».

  • Репозитории — реквизиторы, вытаскивающие снаряжение из склада (БД).

  • Очереди — манеж, где артисты ждут, пока их объявят.

  • Экзекьютор — тёмный коридор, ведущий от кулис к свету рампы.

  • Менеджеры — двери между закулисьем и сценой.

  • Другие сервисы — собственно сцена, где и происходит номер.

Но есть нюанс:

  • Лошадки прыгают по одной.

  • Медведи катаются по трое.

  • Кто-то выходит через Kafka, кто-то — через HTTP.

По сути мы имеем парочку небольших различий, для которых будет неверно создавать второй сервис или писать почти аналогичный код. Поэтому представим, что кулисы и дрессировщики у мишек и лошадок одни и те же, а двери, актеры и сцены разные.

Как устроен наш манеж — Архитектура

Теперь, когда мы поняли, кто у нас выступает, давайте посмотрим, как работает наш цирк изнутри. Что там в кулисах, и почему никто не спотыкается?

Идея проста:

Сделать единый цирк, куда можно легко впустить любого нового артиста: хоть медведя, хоть жонглёра, ну и да, кибердракона тоже.

В базе

У каждой задачи есть два слоя костюма:

  • Общий: ID, тип, статус, время постановки, ошибки.

  • Специфичный: параметры конкретного артиста.

Получается:

task                       -- общий склад задач task_{taskType}_parameters -- гардероб для костюмов

В коде

Самое главное наше оружие — абстрактный дженерик класс на всё, что скорее всего будет использовано не только для одного типа задач.

Покажу вам, как может выглядеть примерный код.

1. Модельки

public interface ITaskParameters { }public interface ITaskDto { }public record TaskOneParameters(int Value) : ITaskParameters;public record TaskTwoParameters(string Data) : ITaskParameters;public record TaskOneDto(int Value) : ITaskDto;public record TaskTwoDto(string Data) : ITaskDto;public class QueueTask where TParam : ITaskParameters{    public QueueTask(TParam parameters)    {        Parameters = parameters;        TaskInfo = new QueueTaskInfo();    }    public TParam Parameters { get; }    public QueueTaskInfo TaskInfo { get; }}public class QueueTaskInfo{public Guid Id { get; set; }    public DateTime QueueTime { get; set; }    public QueueTaskStatus Status { get; set; }    public QueueTaskType Type { get; set; }}public enum QueueTaskStatus{    ReadyForExecution,    InProgress,    Completed,    Failed}public enum QueueTaskType{    TaskOne,    TaskTwo}

2. Контроллеры

/// <summary>/// Базовый контроллер для постановки задач в очередь/// </summary>[Route("api/[controller]")][ApiController]public abstract class AbstractQueueTasksController : ControllerBase    where TParam : ITaskParameters{    protected AbstractQueueTasksController(IMediator mediator)    {        _mediator = mediator;    }    protected IMediator _mediator { get; }        /// <summary>/// Общий метод для всех типов задач/// </summary>    [HttpGet("GetTasks")]public Task<...> GetAsync(CancellationToken cancellationToken = default){    return _mediator.Send(new AbstractGetQueueTasksRequest(), cancellationToken);}}/// <summary>/// Контроллер для задач типа "TaskOne"/// </summary>public class TaskOneController : AbstractQueueTasksController{    public TaskOneController(IMediator mediator) : base(mediator) { }/// <summary>/// Постановка задачи, которая пришла из другого сервиса, а значит дажнные уже обработаны/// </summary>    [HttpPost("Enqueue")]    public Task EnqueueAsync(TaskOneDto dto, CancellationToken cancellationToken = default)    {        return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken);    }}/// <summary>/// Контроллер для задач типа "TaskTwo"/// </summary>public class TaskTwoController : AbstractQueueTasksController{    public TaskTwoController(IMediator mediator) : base(mediator) { }/// <summary>/// Постановка задачи, которая пришла с фронта/// </summary>    [HttpPost("Enqueue")]    public Task EnqueueAsync(TaskTwoDto dto, CancellationToken cancellationToken = default)    {    ... // тут какая-то обратка и валидация данных        return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken);    }}

3. Команда и обработчик

/// <summary>/// Команда постановки задачи в очередь/// </summary>public class EnqueueTaskCommand : IRequest    where TDto : ITaskDto{    public EnqueueTaskCommand(TDto dto) => TaskDto = dto;    public TDto TaskDto { get; }}/// <summary>/// Базовый обработчик постановки задач/// </summary>public abstract class EnqueueTaskCommandHandler : IRequestHandler&gt;    where TParam : ITaskParameters    where TDto : ITaskDto{    private readonly AbstractDataflowQueue _queue;    protected EnqueueTaskCommandHandler(AbstractDataflowQueue queue)    {        _queue = queue;    }    public async Task Handle(EnqueueTaskCommand request, CancellationToken cancellationToken)    {        if (request is null)            throw new ArgumentNullException(nameof(request));        var param = Map(request.TaskDto);        await _queue.EnqueueAsync(param, cancellationToken);    }/// <summary>/// Просто какая-то работа с данными /// </summary>    protected abstract TParam Map(TDto dto);}

4. Очередь

public abstract class AbstractDataflowQueue     where TParam : ITaskParameters{    private readonly SemaphoreSlim _locker = new(1, 1); // Нужен для защиты от одновременной постановки нескольких задач    protected AbstractQueueTaskRepository _repository { get; }    protected AbstractBackgroundExecutingTask _executor { get; }    protected AbstractDataflowQueue(        AbstractBackgroundExecutingTask executor,        AbstractQueueTaskRepository repository)    {        _executor = executor;        _repository = repository;    }    public async Task EnqueueAsync(QueueTask item, CancellationToken cancellationToken = default)    {        if (item is null)         throw new ArgumentNullException(nameof(item));        await _locker.WaitAsync(cancellationToken);        try        {            item.TaskInfo.QueueTime = DateTime.Now;            item.TaskInfo.Status = QueueTaskStatus.ReadyForExecution;            await _repository.SaveAsync(item, cancellationToken);            await _executor.TrySendQueueTask(item.Id);        }        finally        {            _locker.Release();        }    }}

5. Экзекьютер

/// <summary>/// Базовый экзекьютер: достаёт задачу из очереди и отправляет её в менеджер/// </summary>public abstract class AbstractBackgroundExecutingTask    where TParam : ITaskParameters{    protected AbstractBackgroundExecutingTask(        IManager manager,        AbstractQueueTaskRepository repository,        int defaultMaxParallelism = 1)    {        _manager = manager;        _repository = repository;        var options = new ExecutionDataflowBlockOptions        {            MaxDegreeOfParallelism = defaultMaxParallelism,            BoundedCapacity = DataflowBlockOptions.Unbounded        };        _block = new ActionBlock(HandleAsync, options);    }    protected IManager _manager { get; }    protected AbstractQueueTaskRepository _repository { get; }    protected ActionBlock _block { get; }    public bool TrySendQueueTask(Guid taskId)    {        return _block.Post(taskId);    }    private async Task HandleAsync(Guid taskId)    {        var task = await _repository.GetTask(taskId);        if (task == null) return;        task.TaskInfo.Status = QueueTaskStatus.InProgress;        await _manager.TransferTask(task);                task.TaskInfo.Status = QueueTaskStatus.Completed;        await _repository.UpdateAsync(task);    }}/// <summary>/// Лошадки прыгают по одной/// </summary>public class TaskOneExecutor : AbstractBackgroundExecutingTask{    public TaskOneExecutor(        IManager manager,        AbstractQueueTaskRepository repository)        : base(manager, repository, defaultMaxParallelism: 1) { }}/// <summary>/// Медведи катаются втроём/// </summary>public class TaskTwoExecutor : AbstractBackgroundExecutingTask{    public TaskTwoExecutor(        IManager manager,        AbstractQueueTaskRepository repository)        : base(manager, repository, defaultMaxParallelism: 3) { }}

6. Репозиторий

public abstract class AbstractQueueTaskRepository    where TParam : ITaskParameters{    // Простое хранилище в памяти    protected readonly Dictionary<Guid, QueueTask<TParam>> _storage = new();    public virtual Task SaveAsync(QueueTask task, CancellationToken cancellationToken = default)    {        _storage[task.TaskInfo.Id] = task;        return Task.CompletedTask;    }    public virtual Task UpdateAsync(QueueTask task, CancellationToken cancellationToken = default)    {        if (_storage.ContainsKey(task.TaskInfo.Id))        {            _storage[task.TaskInfo.Id] = task;        }        return Task.CompletedTask;    }    public virtual QueueTask? GetTask(Guid taskId)    {        _storage.TryGetValue(taskId, out var task);        return task;    }        ...}

+реализации, сохранение в бд и другая логика

7. Менеджеры

public interface IManager    where TParam : ITaskParameters{    Task TransferTask(QueueTask task);}/// <summary>/// Тут у нас кафка/// </summary>public class TaskOneManager : IManager{    private readonly ITaskOneProducer _producer;    private readonly ITaskOneConsumer _consumer;    public TaskOneManager(        ITaskOneProducer producer,        ITaskOneConsumer consumer)    {        _producer = producer;        _consumer = consumer;    }    public async Task TransferTask(QueueTask queueTask)    {// Отправка задачи через продюсераawait _producer.PublishAsync(queueTask);// Ожидаем результат через консюмераawait _consumer.GetResult(queueTask.TaskInfo.Id);    }}/// <summary>/// Тут у нас Refit клиент/// </summary>public class TaskTwoManager : IManager{    private readonly ITaskTwoClient _client;    public TaskTwoManager(ITaskTwoClient client)    {        _client = client;    }    public async Task TransferTask(QueueTask task)    {        await _client.SendTaskTwoAsync(task);    }}

Разумеется, код самый примитивный, который просто показывает, как можно сделать.

И не забудьте зарегистрировать реализации как синглтон объекты (иначе вся ваша очередь потеряется). Только Менеджеры можно сделать Transient.

Итоговая архитектура:

Как бы мы приручили разношёрстных артистов — расширяемость

Теперь представим, что завтра к нам заходят:

  • Слоны, которые будут делать запросы по SOAP.

  • Пингвины, которые будут танцевать параллельно в 10 потоков. Наша архитектура говорит: «Да не вопрос». Вот как мы добавляем нового зверя в наш цирк:

В базу:

  • Новая таблица параметров.

В код:

  • Реализация абстрактного контроллера, необходимых команд и запросов, модельки

  • Реализация репозитория, очереди, экзекьютора и менеджера.

  • DI-регистрация в Program.cs. (то есть подписываем, что наши животные могут пользоваться любыми нашими рельсами)

И всё. Весь путь — по накатанной. Никто не мешает мишкам, лошадям и слонам выступать одновременно.

Слова автора

Спасибо большое, что прочитали статью мини-мидла. Надеюсь, что вам понравились метафоры) Моим коллегам на Meet Up очень понравились! А там, между прочим, не только разработчики были, но и аналитики, тестеры и даже медийщики!

Если вам интересны такие мероприятия, то заглядывайте к нам. Возможно мы даже скоро выйдем на более глобальный уровень с нашим митапом)

Ну и, конечно же, если вам есть что сказать, то милости прошу в комментарии. Я всегда рада конструктивной критике!

ссылка на оригинал статьи https://habr.com/ru/articles/947556/