Привет, Хабр! Сегодня поговорим о том, как работают задачи в .NET, зачем может понадобиться собственный TaskScheduler
и как его реализовать.
Зачем нужен Task?
Начнём с основы. В .NET Task
— это удобная абстракция над потоками. Если потоки Thread
дают полный контроль над выполнением, то Task
управляет ресурсами через пул потоков ThreadPool
и помогает строить асинхронные операции.
Простой пример:
using System; using System.Threading; using System.Threading.Tasks; class Program { static async Task Main() { Console.WriteLine($"Основной поток: {Thread.CurrentThread.ManagedThreadId}"); await Task.Run(async () => { Console.WriteLine($"Task выполняется на потоке: {Thread.CurrentThread.ManagedThreadId}"); await Task.Delay(1000); // имитируем работу }); Console.WriteLine("Task завершён."); } }
Основная задача выполняется на одном потоке, а Task.Run
запускает дополнительный поток из пула потоков.
Почему это удобно: не нужно вручную управлять потоками, тратить ресурсы на их создание/уничтожение или заботиться о масштабировании.
Но вот загвоздка: иногда нужно больше контроля над задачами. Например:
-
Ограничить число одновременно выполняемых задач.
-
Установить приоритет выполнения.
-
Выполнить задачи в специфическом контексте (например, в UI-потоке).
Для этого .NET позволяет создавать свои TaskScheduler
.
Что делает TaskScheduler?
TaskScheduler
управляет распределением задач между потоками. По умолчанию используется TaskScheduler.Default
, который отправляет задачи в ThreadPool
. Однако:
-
Если нужно ограничить количество одновременных задач, стандартный планировщик не подойдёт.
-
Если требуется приоритет задач, придётся создавать кастомный
TaskScheduler
.
Создаем свой TaskScheduler
Ограничение параллелизма
Планировщик с ограничением одновременных задач:
using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler, IDisposable { private readonly int _maxDegreeOfParallelism; private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); private int _runningTasks = 0; private readonly object _lock = new object(); private bool _disposed = false; public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) { if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism)); _maxDegreeOfParallelism = maxDegreeOfParallelism; } protected override IEnumerable<Task> GetScheduledTasks() { lock (_lock) { return _tasks.ToArray(); } } protected override void QueueTask(Task task) { if (_disposed) throw new ObjectDisposedException(nameof(LimitedConcurrencyLevelTaskScheduler)); lock (_lock) { _tasks.AddLast(task); TryExecuteTask(); } } private void TryExecuteTask() { if (_runningTasks >= _maxDegreeOfParallelism || _tasks.Count == 0) return; var task = _tasks.First.Value; _tasks.RemoveFirst(); _runningTasks++; ThreadPool.UnsafeQueueUserWorkItem(async _ => { try { await Task.Yield(); base.TryExecuteTask(task); } catch (Exception ex) { Console.WriteLine($"Ошибка при выполнении задачи: {ex}"); } finally { lock (_lock) { _runningTasks--; TryExecuteTask(); } } }, null); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism; public void Dispose() { if (!_disposed) { lock (_lock) { _tasks.Clear(); _disposed = true; } } } }
В коде:
Очередь задач (_tasks
): задачи добавляются в очередь, если уже выполняется максимально разрешённое количество задач.
Управление состоянием (_runningTasks
): счетчик текущих задач увеличивается при начале выполнения и уменьшается при завершении.
Асинхронность: используется Task.Yield()
, чтобы задачи не блокировали поток.
Приоритетизация задач
Добавим возможность управлять приоритетами. Высокоприоритетные задачи должны выполняться первыми.
using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class PriorityTaskScheduler : TaskScheduler { private readonly int _maxDegreeOfParallelism; private readonly LinkedList<Task> _highPriorityTasks = new LinkedList<Task>(); private readonly LinkedList<Task> _lowPriorityTasks = new LinkedList<Task>(); private int _runningTasks = 0; private readonly object _lock = new object(); public PriorityTaskScheduler(int maxDegreeOfParallelism) { _maxDegreeOfParallelism = maxDegreeOfParallelism > 0 ? maxDegreeOfParallelism : throw new ArgumentOutOfRangeException(); } public void Enqueue(Task task, bool highPriority) { lock (_lock) { if (highPriority) _highPriorityTasks.AddLast(task); else _lowPriorityTasks.AddLast(task); TryExecuteTask(); } } private void TryExecuteTask() { if (_runningTasks >= _maxDegreeOfParallelism) return; Task task = null; lock (_lock) { if (_highPriorityTasks.Count > 0) { task = _highPriorityTasks.First.Value; _highPriorityTasks.RemoveFirst(); } else if (_lowPriorityTasks.Count > 0) { task = _lowPriorityTasks.First.Value; _lowPriorityTasks.RemoveFirst(); } if (task == null) return; _runningTasks++; } ThreadPool.UnsafeQueueUserWorkItem(async _ => { try { await Task.Yield(); base.TryExecuteTask(task); } finally { lock (_lock) { _runningTasks--; TryExecuteTask(); } } }, null); } protected override IEnumerable<Task> GetScheduledTasks() => throw new NotSupportedException(); protected override void QueueTask(Task task) => Enqueue(task, highPriority: false); protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; }
Теперь задачи с высоким приоритетом помещаются в отдельную очередь. Если высокоприоритетных задач нет, выполняются задачи из низкоприоритетной очереди.
Использование:
var scheduler = new PriorityTaskScheduler(2); scheduler.Enqueue(Task.Factory.StartNew(() => Console.WriteLine("Low priority"), scheduler), highPriority: false); scheduler.Enqueue(Task.Factory.StartNew(() => Console.WriteLine("High priority"), scheduler), highPriority: true);
Тестирование TaskScheduler
Тестирование кастомного TaskScheduler
— это этап, который поможет убедиться в корректности работы. Для этого может быть достаточно простых юнит-тестов. Например, протестируем, что LimitedConcurrencyLevelTaskScheduler
правильно ограничивает число одновременно выполняемых задач.
[Fact] public async Task SchedulerLimitsConcurrency() { // Создаём планировщик, ограничивающий выполнение до 2 задач одновременно var scheduler = new LimitedConcurrencyLevelTaskScheduler(2); var taskFactory = new TaskFactory(scheduler); int runningTasks = 0; var tasks = Enumerable.Range(0, 10).Select(async _ => { Interlocked.Increment(ref runningTasks); Assert.True(runningTasks <= 2, "Превышен лимит параллелизма"); await Task.Delay(100); // Симуляция работы Interlocked.Decrement(ref runningTasks); }); await Task.WhenAll(tasks); }
Счетчик runningTasks
увеличивается при запуске задачи и уменьшается при завершении. Через Assert
проверяем, что одновременно выполняется не более 2 задач.
Итоги
Создание собственного TaskScheduler
необходимо в случаях:
-
Если стандартный
TaskScheduler
не справляется с ограничением задач. -
Требуется управление приоритетами.
-
Нужен специфический контекст выполнения (UI, ограниченный пул потоков).
В завершение скажу пару слов об открытых уроках по ASP.NET, которые в ближайшее время пройдут в OTUS:
ссылка на оригинал статьи https://habr.com/ru/articles/861074/
Добавить комментарий