Началась эта история довольно давно, когда я впервые попытался работать с UI не из UI-потока. И когда я начал ловить различные “глюки”, я понял, что делать это нужно осторожно. Позднее я столкнулся с этим в дотнет мире и именно в тот момент я впервые познакомился с SynchronizationContext. Но тогда, почитав про устройство этого объекта, я посчитал, что этих знаний мне достаточно. Сделать это можно, например, здесь: SynchronizationContext — когда MSDN подводит.
Вспомнил про SynchronizationContext я только с выходом c# 5 и его async/await, т.к. этот механизм взаимодействует как раз с этим самым контекстом синхронизации. Делается это для того, чтобы после асинхронной операции, код мог выполняться в вызывающем асинхронную операцию потоке, что очень удобно при работе с UI. Но запустив этот небольшой код в UI-потоке и любом другом:
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); await Task.Run(() => Debug.WriteLine(Thread.CurrentThread.ManagedThreadId)); Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
Мы увидим, что код возвращается в исходный поток только при запуске в UI-потоке. Все дело в том, что контекст синхронизации задан только в UI-потоке (кроме wcf и т.д.). В голову сразу же приходит мысль, нужно просто задать контекст синхронизации нужному потоку. Но здесь нас ждет проблема, стандартная реализация SynchronizationContext не дает нам нужных возможностей. Она позволяет продолжать исполнять код в текущем потоке или в потоке из пула. После того, как я не нашел реализации, которую можно просто скопировать, запустить и увидеть желаемый результат, я решил попробовать реализовать свою и представить, как бы оно могла выглядеть на деле. Об этом и пойдет речь ниже.
Для выполнения кода в SynchronizationContext предусмотрены два виртуальных метода Send (синхронное выполнение) и Post (асинхронное). Поэтому наследуемся от SynchronizationContext и переопределяем нужные методы.
class CustomSynchronizationContext : SynchronizationContext, IDisposable { private readonly AutoResetEvent _eventReset; private readonly Queue<KeyValuePair<SendOrPostCallback, object>> _workItems; private readonly Thread _thread; public CustomSynchronizationContext() { _eventReset = new AutoResetEvent(false); _workItems = new Queue<KeyValuePair<SendOrPostCallback, object>>(); _thread = new Thread(DoWork); _thread.Start(this); } private void DoWork(object obj) { SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext); while (true) { while (_workItems.Count > 0) { var item = _workItems.Dequeue(); item.Key(item.Value); } _eventReset.Reset(); _eventReset.WaitOne(); } } public override void Post(SendOrPostCallback d, object state) { _workItems.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state)); _eventReset.Set(); } public void Dispose() { _eventReset.Dispose(); _thread.Abort(); } }
Запускаем.
static void Main(string[] args) { var syncContext = new CustomSynchronizationContext(); Console.WriteLine(Thread.CurrentThread.ManagedThreadId); syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null); }
И ожидаемо видим разные потоки. Что здесь происходит? Во-первых, для удобства, создаем и присваиваем поток внутри контекста, а не контекст потоку. Так мы будем уверены, что никто кроме нас не сможет влиять на этот поток. Во-вторых, заводим очередь, в которой будем хранить делегаты для выполнения в созданном потоке. В-третьих, “прикостыливаем” AutoResetEvent, чтобы поток не завершался и не зацикливался без дела. Ну и IDisposable. Обратите внимание, что при удалении контекста, здесь же будет попытка ликвидировать поток. Т.е. такой код:
static void Main(string[] args) { using (var syncContext = new CustomSynchronizationContext()) { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null); } }
скорее всего выведет информацию только об изначальном потоке. Возможно это не то, что хотелось бы, но для демонстрационного примера, думаю, сойдет. К тому же это легко исправить.
Что с обработкой исключений? Проверим.
static void Main(string[] args) { var syncContext = new CustomSynchronizationContext(); try { syncContext.Post(o => { throw new Exception("TestException"); }, null); } catch (Exception ex) { Console.WriteLine(ex.Message); } }
Ожидаемо падает в нашем “особенном” потоке. Самое время вспомнить, что у нас есть еще и метод Send, который отвечает за синхронное выполнение. Это должно позволить дождаться завершения выполнения делегата и получить исключение. Попробуем.
class CustomSynchronizationContext : SynchronizationContext, IDisposable { private readonly AutoResetEvent _workerResetEvent; private readonly ConcurrentQueue<WorkItem> _workItems; private readonly Thread _thread; public CustomSynchronizationContext() { _workerResetEvent = new AutoResetEvent(false); _workItems = new ConcurrentQueue<WorkItem>(); _thread = new Thread(DoWork); _thread.Start(this); } private void DoWork(object obj) { SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext); while (true) { WorkItem workItem; while (_workItems.TryDequeue(out workItem)) workItem.Execute(); _workerResetEvent.Reset(); _workerResetEvent.WaitOne(); } } public override void Send(SendOrPostCallback d, object state) { if (Thread.CurrentThread == _thread) d(state); else { using (var resetEvent = new AutoResetEvent(false)) { var wiExecutionInfo = new WorkItemExecutionInfo(); _workItems.Enqueue(new SynchronousWorkItem(d, state, resetEvent, ref wiExecutionInfo)); _workerResetEvent.Set(); resetEvent.WaitOne(); if (wiExecutionInfo.HasException) throw wiExecutionInfo.Exception; } } } public override void Post(SendOrPostCallback d, object state) { _workItems.Enqueue(new AsynchronousWorkItem(d, state)); _workerResetEvent.Set(); } public void Dispose() { _workerResetEvent.Dispose(); _thread.Abort(); } private class WorkItemExecutionInfo { public bool HasException => Exception != null; public Exception Exception { get; set; } } private abstract class WorkItem { protected readonly SendOrPostCallback SendOrPostCallback; protected readonly object State; protected WorkItem(SendOrPostCallback sendOrPostCallback, object state) { SendOrPostCallback = sendOrPostCallback; State = state; } public abstract void Execute(); } private class SynchronousWorkItem : WorkItem { private readonly AutoResetEvent _syncObject; private readonly WorkItemExecutionInfo _workItemExecutionInfo; public SynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state, AutoResetEvent resetEvent, ref WorkItemExecutionInfo workItemExecutionInfo) : base(sendOrPostCallback, state) { if (workItemExecutionInfo == null) throw new NullReferenceException(nameof(workItemExecutionInfo)); _syncObject = resetEvent; _workItemExecutionInfo = workItemExecutionInfo; } public override void Execute() { try { SendOrPostCallback(State); } catch (Exception ex) { _workItemExecutionInfo.Exception = ex; } _syncObject.Set(); } } private class AsynchronousWorkItem : WorkItem { public AsynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state) : base(sendOrPostCallback, state) { } public override void Execute() { SendOrPostCallback(State); } } }
Здесь для удобства вводим класс WorkItem, который будет выполнять код (делегат) нужным нам способом. От него наследуем еще два SynchronousWorkItem и AsynchronousWorkItem, по названию понятно в чем их различие. В реализациях отличие только в том, что в синхронной версии реализовано ожидание (AutoResetEvent) и поглощение исключения, которое далее будет брошено в изначальном потоке. Теперь KeyValuePair<SendOrPostCallback, object> можно поменять на WorkItem, ну и поменяем простую очередь на конкурентную. Также в методе Send добавляем проверку текущего потока и если он вдруг окажется “нашим”, то просто запустим делегат здесь же.
Снова проверяем.
static void Main(string[] args) { var syncContext = new CustomSynchronizationContext(); try { syncContext.Send(o => { throw new Exception("TestException"); }, null); } catch (Exception ex) { Console.WriteLine(ex.Message); } }
Теперь исключение успешно обработано. Ну и пришло время запустить самый первый пример кода, который упоминался в статье, на потоке с только что созданным контекстом синхронизации.
static void Main(string[] args) { var syncContext = new CustomSynchronizationContext(); syncContext.Post(TestAsyncMethod, null); } async static void TestAsyncMethod(object obj) { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); await Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId)); Console.WriteLine(Thread.CurrentThread.ManagedThreadId); }
Мой вывод:
9
10
9
ссылка на оригинал статьи http://habrahabr.ru/post/269985/
Добавить комментарий