Пишем свой SynchronizationContext

от автора

Началась эта история довольно давно, когда я впервые попытался работать с UI не из UI-потока. И когда я начал ловить различные “глюки”, я понял, что делать это нужно осторожно. Позднее я столкнулся с этим в дотнет мире и именно в тот момент я впервые познакомился с SynchronizationContext. Но тогда, почитав про устройство этого объекта, я посчитал, что этих знаний мне достаточно. Сделать это можно, например, здесь: SynchronizationContext — когда MSDN подводит.

Вспомнил про SynchronizationContext я только с выходом c# 5 и его async/await, т.к. этот механизм взаимодействует как раз с этим самым контекстом синхронизации. Делается это для того, чтобы после асинхронной операции, код мог выполняться в вызывающем асинхронную операцию потоке, что очень удобно при работе с UI. Но запустив этот небольшой код в UI-потоке и любом другом:

Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); await Task.Run(() => Debug.WriteLine(Thread.CurrentThread.ManagedThreadId)); Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); 

Мы увидим, что код возвращается в исходный поток только при запуске в UI-потоке. Все дело в том, что контекст синхронизации задан только в UI-потоке (кроме wcf и т.д.). В голову сразу же приходит мысль, нужно просто задать контекст синхронизации нужному потоку. Но здесь нас ждет проблема, стандартная реализация SynchronizationContext не дает нам нужных возможностей. Она позволяет продолжать исполнять код в текущем потоке или в потоке из пула. После того, как я не нашел реализации, которую можно просто скопировать, запустить и увидеть желаемый результат, я решил попробовать реализовать свою и представить, как бы оно могла выглядеть на деле. Об этом и пойдет речь ниже.

Для выполнения кода в SynchronizationContext предусмотрены два виртуальных метода Send (синхронное выполнение) и Post (асинхронное). Поэтому наследуемся от SynchronizationContext и переопределяем нужные методы.

CustomSynchronizationContext

class CustomSynchronizationContext : SynchronizationContext, IDisposable {     private readonly AutoResetEvent _eventReset;     private readonly Queue<KeyValuePair<SendOrPostCallback, object>> _workItems;     private readonly Thread _thread;      public CustomSynchronizationContext()     {         _eventReset = new AutoResetEvent(false);         _workItems = new Queue<KeyValuePair<SendOrPostCallback, object>>();         _thread = new Thread(DoWork);         _thread.Start(this);     }      private void DoWork(object obj)     {         SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext);          while (true)         {             while (_workItems.Count > 0)             {                 var item = _workItems.Dequeue();                 item.Key(item.Value);             }              _eventReset.Reset();             _eventReset.WaitOne();         }     }      public override void Post(SendOrPostCallback d, object state)     {         _workItems.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));         _eventReset.Set();     }      public void Dispose()     {         _eventReset.Dispose();         _thread.Abort();     } } 

Запускаем.

static void Main(string[] args) {     var syncContext = new CustomSynchronizationContext();     Console.WriteLine(Thread.CurrentThread.ManagedThreadId);     syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null); } 

И ожидаемо видим разные потоки. Что здесь происходит? Во-первых, для удобства, создаем и присваиваем поток внутри контекста, а не контекст потоку. Так мы будем уверены, что никто кроме нас не сможет влиять на этот поток. Во-вторых, заводим очередь, в которой будем хранить делегаты для выполнения в созданном потоке. В-третьих, “прикостыливаем” AutoResetEvent, чтобы поток не завершался и не зацикливался без дела. Ну и IDisposable. Обратите внимание, что при удалении контекста, здесь же будет попытка ликвидировать поток. Т.е. такой код:

static void Main(string[] args) {     using (var syncContext = new CustomSynchronizationContext())     {         Console.WriteLine(Thread.CurrentThread.ManagedThreadId);         syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null);     } } 

скорее всего выведет информацию только об изначальном потоке. Возможно это не то, что хотелось бы, но для демонстрационного примера, думаю, сойдет. К тому же это легко исправить.

Что с обработкой исключений? Проверим.

static void Main(string[] args) {     var syncContext = new CustomSynchronizationContext();     try     {         syncContext.Post(o => { throw new Exception("TestException"); }, null);     }     catch (Exception ex)     {         Console.WriteLine(ex.Message);     } } 

Ожидаемо падает в нашем “особенном” потоке. Самое время вспомнить, что у нас есть еще и метод Send, который отвечает за синхронное выполнение. Это должно позволить дождаться завершения выполнения делегата и получить исключение. Попробуем.

CustomSynchronizationContext (финальная демонстрационная версия)

class CustomSynchronizationContext : SynchronizationContext, IDisposable {     private readonly AutoResetEvent _workerResetEvent;     private readonly ConcurrentQueue<WorkItem> _workItems;     private readonly Thread _thread;      public CustomSynchronizationContext()     {         _workerResetEvent = new AutoResetEvent(false);         _workItems = new ConcurrentQueue<WorkItem>();         _thread = new Thread(DoWork);         _thread.Start(this);     }      private void DoWork(object obj)     {         SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext);          while (true)         {             WorkItem workItem;             while (_workItems.TryDequeue(out workItem))                 workItem.Execute();              _workerResetEvent.Reset();             _workerResetEvent.WaitOne();         }     }      public override void Send(SendOrPostCallback d, object state)     {         if (Thread.CurrentThread == _thread)             d(state);         else         {             using (var resetEvent = new AutoResetEvent(false))             {                 var wiExecutionInfo = new WorkItemExecutionInfo();                 _workItems.Enqueue(new SynchronousWorkItem(d, state, resetEvent, ref wiExecutionInfo));                 _workerResetEvent.Set();                  resetEvent.WaitOne();                 if (wiExecutionInfo.HasException)                     throw wiExecutionInfo.Exception;             }         }     }      public override void Post(SendOrPostCallback d, object state)     {         _workItems.Enqueue(new AsynchronousWorkItem(d, state));         _workerResetEvent.Set();     }      public void Dispose()     {         _workerResetEvent.Dispose();         _thread.Abort();     }      private class WorkItemExecutionInfo     {         public bool HasException => Exception != null;         public Exception Exception { get; set; }     }      private abstract class WorkItem     {         protected readonly SendOrPostCallback SendOrPostCallback;         protected readonly object State;          protected WorkItem(SendOrPostCallback sendOrPostCallback, object state)         {             SendOrPostCallback = sendOrPostCallback;             State = state;         }          public abstract void Execute();     }      private class SynchronousWorkItem : WorkItem     {         private readonly AutoResetEvent _syncObject;         private readonly WorkItemExecutionInfo _workItemExecutionInfo;          public SynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state, AutoResetEvent resetEvent,             ref WorkItemExecutionInfo workItemExecutionInfo) : base(sendOrPostCallback, state)         {             if (workItemExecutionInfo == null)                 throw new NullReferenceException(nameof(workItemExecutionInfo));              _syncObject = resetEvent;             _workItemExecutionInfo = workItemExecutionInfo;         }          public override void Execute()         {             try             {                 SendOrPostCallback(State);             }             catch (Exception ex)             {                 _workItemExecutionInfo.Exception = ex;             }             _syncObject.Set();         }     }      private class AsynchronousWorkItem : WorkItem     {         public AsynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state)             : base(sendOrPostCallback, state)         {         }          public override void Execute()         {             SendOrPostCallback(State);         }     } } 

Здесь для удобства вводим класс WorkItem, который будет выполнять код (делегат) нужным нам способом. От него наследуем еще два SynchronousWorkItem и AsynchronousWorkItem, по названию понятно в чем их различие. В реализациях отличие только в том, что в синхронной версии реализовано ожидание (AutoResetEvent) и поглощение исключения, которое далее будет брошено в изначальном потоке. Теперь KeyValuePair<SendOrPostCallback, object> можно поменять на WorkItem, ну и поменяем простую очередь на конкурентную. Также в методе Send добавляем проверку текущего потока и если он вдруг окажется “нашим”, то просто запустим делегат здесь же.

Снова проверяем.

static void Main(string[] args) {     var syncContext = new CustomSynchronizationContext();     try     {         syncContext.Send(o => { throw new Exception("TestException"); }, null);     }     catch (Exception ex)     {         Console.WriteLine(ex.Message);     } } 

Теперь исключение успешно обработано. Ну и пришло время запустить самый первый пример кода, который упоминался в статье, на потоке с только что созданным контекстом синхронизации.

static void Main(string[] args) {     var syncContext = new CustomSynchronizationContext();     syncContext.Post(TestAsyncMethod, null); }  async static void TestAsyncMethod(object obj) {     Console.WriteLine(Thread.CurrentThread.ManagedThreadId);     await Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));     Console.WriteLine(Thread.CurrentThread.ManagedThreadId); } 

Мой вывод:

9
10
9

ссылка на оригинал статьи http://habrahabr.ru/post/269985/