Task изнутри: управление потоками в .NET и создание своих планировщиков

от автора

Привет, Хабр! Сегодня поговорим о том, как работают задачи в .NET, зачем может понадобиться собственный TaskScheduler и как его реализовать.

Зачем нужен Task?

Начнём с основы. В .NET Task — это удобная абстракция над потоками. Если потоки Thread дают полный контроль над выполнением, то Task управляет ресурсами через пул потоков ThreadPool и помогает строить асинхронные операции.

Простой пример:

using System; using System.Threading; using System.Threading.Tasks;  class Program {     static async Task Main()     {         Console.WriteLine($"Основной поток: {Thread.CurrentThread.ManagedThreadId}");          await Task.Run(async () =>         {             Console.WriteLine($"Task выполняется на потоке: {Thread.CurrentThread.ManagedThreadId}");             await Task.Delay(1000); // имитируем работу         });          Console.WriteLine("Task завершён.");     } }

Основная задача выполняется на одном потоке, а Task.Run запускает дополнительный поток из пула потоков.

Почему это удобно: не нужно вручную управлять потоками, тратить ресурсы на их создание/уничтожение или заботиться о масштабировании.

Но вот загвоздка: иногда нужно больше контроля над задачами. Например:

  • Ограничить число одновременно выполняемых задач.

  • Установить приоритет выполнения.

  • Выполнить задачи в специфическом контексте (например, в UI-потоке).

Для этого .NET позволяет создавать свои TaskScheduler.

Что делает TaskScheduler?

TaskScheduler управляет распределением задач между потоками. По умолчанию используется TaskScheduler.Default, который отправляет задачи в ThreadPool. Однако:

  • Если нужно ограничить количество одновременных задач, стандартный планировщик не подойдёт.

  • Если требуется приоритет задач, придётся создавать кастомный TaskScheduler.

Создаем свой TaskScheduler

Ограничение параллелизма

Планировщик с ограничением одновременных задач:

using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks;  public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler, IDisposable {     private readonly int _maxDegreeOfParallelism;     private readonly LinkedList<Task> _tasks = new LinkedList<Task>();     private int _runningTasks = 0;     private readonly object _lock = new object();     private bool _disposed = false;      public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)     {         if (maxDegreeOfParallelism < 1)             throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));         _maxDegreeOfParallelism = maxDegreeOfParallelism;     }      protected override IEnumerable<Task> GetScheduledTasks()     {         lock (_lock)         {             return _tasks.ToArray();         }     }      protected override void QueueTask(Task task)     {         if (_disposed)             throw new ObjectDisposedException(nameof(LimitedConcurrencyLevelTaskScheduler));          lock (_lock)         {             _tasks.AddLast(task);             TryExecuteTask();         }     }      private void TryExecuteTask()     {         if (_runningTasks >= _maxDegreeOfParallelism || _tasks.Count == 0)             return;          var task = _tasks.First.Value;         _tasks.RemoveFirst();         _runningTasks++;          ThreadPool.UnsafeQueueUserWorkItem(async _ =>         {             try             {                 await Task.Yield();                 base.TryExecuteTask(task);             }             catch (Exception ex)             {                 Console.WriteLine($"Ошибка при выполнении задачи: {ex}");             }             finally             {                 lock (_lock)                 {                     _runningTasks--;                     TryExecuteTask();                 }             }         }, null);     }      protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;      public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;      public void Dispose()     {         if (!_disposed)         {             lock (_lock)             {                 _tasks.Clear();                 _disposed = true;             }         }     } }

В коде:

Очередь задач (_tasks): задачи добавляются в очередь, если уже выполняется максимально разрешённое количество задач.

Управление состоянием (_runningTasks): счетчик текущих задач увеличивается при начале выполнения и уменьшается при завершении.

Асинхронность: используется Task.Yield(), чтобы задачи не блокировали поток.

Приоритетизация задач

Добавим возможность управлять приоритетами. Высокоприоритетные задачи должны выполняться первыми.

using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks;  public class PriorityTaskScheduler : TaskScheduler {     private readonly int _maxDegreeOfParallelism;     private readonly LinkedList<Task> _highPriorityTasks = new LinkedList<Task>();     private readonly LinkedList<Task> _lowPriorityTasks = new LinkedList<Task>();     private int _runningTasks = 0;     private readonly object _lock = new object();      public PriorityTaskScheduler(int maxDegreeOfParallelism)     {         _maxDegreeOfParallelism = maxDegreeOfParallelism > 0 ? maxDegreeOfParallelism : throw new ArgumentOutOfRangeException();     }      public void Enqueue(Task task, bool highPriority)     {         lock (_lock)         {             if (highPriority)                 _highPriorityTasks.AddLast(task);             else                 _lowPriorityTasks.AddLast(task);              TryExecuteTask();         }     }      private void TryExecuteTask()     {         if (_runningTasks >= _maxDegreeOfParallelism)             return;          Task task = null;         lock (_lock)         {             if (_highPriorityTasks.Count > 0)             {                 task = _highPriorityTasks.First.Value;                 _highPriorityTasks.RemoveFirst();             }             else if (_lowPriorityTasks.Count > 0)             {                 task = _lowPriorityTasks.First.Value;                 _lowPriorityTasks.RemoveFirst();             }              if (task == null) return;             _runningTasks++;         }          ThreadPool.UnsafeQueueUserWorkItem(async _ =>         {             try             {                 await Task.Yield();                 base.TryExecuteTask(task);             }             finally             {                 lock (_lock)                 {                     _runningTasks--;                     TryExecuteTask();                 }             }         }, null);     }      protected override IEnumerable<Task> GetScheduledTasks() => throw new NotSupportedException();      protected override void QueueTask(Task task) => Enqueue(task, highPriority: false);      protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; }

Теперь задачи с высоким приоритетом помещаются в отдельную очередь. Если высокоприоритетных задач нет, выполняются задачи из низкоприоритетной очереди.

Использование:

var scheduler = new PriorityTaskScheduler(2);  scheduler.Enqueue(Task.Factory.StartNew(() => Console.WriteLine("Low priority"), scheduler), highPriority: false); scheduler.Enqueue(Task.Factory.StartNew(() => Console.WriteLine("High priority"), scheduler), highPriority: true);

Тестирование TaskScheduler

Тестирование кастомного TaskScheduler — это этап, который поможет убедиться в корректности работы. Для этого может быть достаточно простых юнит-тестов. Например, протестируем, что LimitedConcurrencyLevelTaskScheduler правильно ограничивает число одновременно выполняемых задач.

[Fact] public async Task SchedulerLimitsConcurrency() {     // Создаём планировщик, ограничивающий выполнение до 2 задач одновременно     var scheduler = new LimitedConcurrencyLevelTaskScheduler(2);     var taskFactory = new TaskFactory(scheduler);     int runningTasks = 0;      var tasks = Enumerable.Range(0, 10).Select(async _ =>     {         Interlocked.Increment(ref runningTasks);         Assert.True(runningTasks <= 2, "Превышен лимит параллелизма");         await Task.Delay(100); // Симуляция работы         Interlocked.Decrement(ref runningTasks);     });      await Task.WhenAll(tasks); }

Счетчик runningTasks увеличивается при запуске задачи и уменьшается при завершении. Через Assert проверяем, что одновременно выполняется не более 2 задач.

Итоги

Создание собственного TaskScheduler необходимо в случаях:

  • Если стандартный TaskScheduler не справляется с ограничением задач.

  • Требуется управление приоритетами.

  • Нужен специфический контекст выполнения (UI, ограниченный пул потоков).


В завершение скажу пару слов об открытых уроках по ASP.NET, которые в ближайшее время пройдут в OTUS:

  • 28 ноября: «SignalR в ASP.NET Core приложениях». Подробнее

  • 11 декабря: «Мониторинг работоспособности ASP.NET Core приложений». Подробнее


ссылка на оригинал статьи https://habr.com/ru/articles/861074/