Привет, Хабр!
В многопоточном программирование постоянно нужно решать задачи, связанные с доступом к данным из нескольких потоков одновременно. И тут очень кстати ConcurrentBag — коллекция, которая была добавлена в .NET Framework 4.0 специально для таких случаев. Она подходит для ситуаций, где порядок элементов не важен. Если нужно просто кидать данные в мешок, пока куча потоков их туда же добавляет.
Поэтому ConcurrentBag создана для сценариев, где один и тот же поток часто и добавляет, и извлекает данные. Поэтому эта коллекция идеальна для паттернов типа Producer-Consumer, где один поток наполняет коллекцию, а другой извлекает данные для дальнейшей обработки. Но самое классное, что работает она по принципу work-stealing.
Как работает ConcurrentaBag
Thread-local storage: локальные очереди для каждого потока
В основе ConcurrentBag лежит интересный механизм — локальные очереди для каждого потока. То есть каждый поток, который добавляет данные в коллекцию, имеет свою собственную локальную очередь. Эти локальные очереди не изолированы полностью, их можно использовать для передачи данных между потоками, но основная идея заключается в том, что каждый поток работает со своей локальной копией данных.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>(); Task.Factory.StartNew(() => { for (int i = 0; i < 5; i++) { bag.Add(i); Console.WriteLine($"Thread {Task.CurrentId} added {i}"); } }); Task.Factory.StartNew(() => { foreach (var item in bag) { Console.WriteLine($"Thread {Task.CurrentId} accessed {item}"); } }); Task.WaitAll(); // Ожидаем завершения задач
Здесь каждый поток добавляет элементы в локальную очередь и может получить доступ к данным. Однако, когда элементы добавляются одним потоком, а извлекаются другим, ConcurrentBag использует алгоритм work-stealing.
work-stealing
Алгоритм work-stealing делает эту коллекцию уникальной в плане многопоточной производительности. Суть в том, что каждый поток добавляет элементы в свою очередь, но если один поток заканчивает выполнение и его локальная очередь пустеет, он может украсть данные из очереди другого потока.
Представим, естьь пул потоков, и один поток только что освободился, а в других потоках еще есть незавершенные задачи. Вместо того чтобы простаивать, свободный поток просто крадет данные из другого потока, забирая элементы из конца его локальной очереди. Весь процесс происходит в фоновом режиме, и не нужно ничего дополнительно синхронизировать.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>(); Task.Factory.StartNew(() => { for (int i = 0; i < 10; i++) { bag.Add(i); } }); Task.Factory.StartNew(() => { int result; while (bag.TryTake(out result)) { Console.WriteLine($"Thread {Task.CurrentId} took {result}"); } }); Task.WaitAll();
Так второй поток ворует данные из коллекции, добавленные первым потоком. Здесь видим работу метода TryTake
, который забирает элементы из коллекции, давая приоритет локальной очереди текущего потока. Если в ней нет элементов, поток переходит к поиску элементов в других потоках, выполняя «воровство» данных.
Когда возникают проблемы
Но не все так радужно, как может показаться на первый взгляд. Поскольку ConcurrentBag основан на локальных очередях и stealing-алгоритме, могут возникать ситуации гонок и конфликты. Например, когда один поток пытается украсть элементы у другого, могут возникать моменты, когда оба потока пытаются получить доступ к одному и тому же элементу. Для этого ConcurrentBag использует различные механизмы синхронизации, чтобы минимизировать конфликты.
Когда поток работает с тремя или более элементами в своей локальной очереди, никакая блокировка не требуется. Однако если в очереди меньше трех элементов, включается механизм блокировок для предотвращения гонок при доступе к одним и тем же данным другими потоками.
Когда нужно выполнить глобальные операции, например, ToArray
или Count
, ConcurrentBag вводит глобальную блокировку на уровне коллекции, замораживая структуру и предотвращая её изменения во время выполнения этих операций.
Когда поток пытается украсть данные, он использует SpinWait — небольшой цикл ожидания, пока другой поток завершит свои операции над локальной очередью, если есть риск конфликта.
Операции над данными в ConcurrentBag
Операция Add
Операция Add — это основа всего.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 10, (i) => { bag.Add(i); Console.WriteLine($"Thread {Task.CurrentId} added {i}"); });
Здесь каждый поток просто добавляет элементы в свой мешок, и никакой блокировки нет. Это и есть фича ConcurrentBag — локальные очереди уменьшают количество синхронизаций.
TryTake
Операция TryTake — это попытка забрать элемент из коллекции. Если поток пытается забрать элемент, он сначала пытается сделать это из своей локальной очереди. Если там ничего нет, включается тот самый механизм work-stealing, о котором мы уже говорили. Поток начинает красть элементы из других потоков, забирая данные из их локальных очередей.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 10, (i) => bag.Add(i)); int result; if (bag.TryTake(out result)) { Console.WriteLine($"Thread {Task.CurrentId} took {result}"); }
Итак, если у текущего потока есть элементы в его локальной очереди, они будут взяты именно оттуда. Если же нет, поток начнёт проверять, можно ли украсть элементы у других потоков.
TryPeek
TryPeek
похож на TryTake
, но с отличием: он не удаляет элемент из коллекции. Этот метод хорош, когда нужно просто заглянуть в коллекцию, не вмешиваясь в её структуру.
Пример:
int peekedItem; if (bag.TryPeek(out peekedItem)) { Console.WriteLine($"Thread {Task.CurrentId} peeked and saw {peekedItem}"); }
Стоит отметить, что в порядке Peek тоже нет гарантии, что элементы будут возвращаться в том порядке, в котором они были добавлены.
Методы ToArray и Count: зачем их избегать
Методы ToArray и Count в ConcurrentBag запускают глобальные блокировки на уровне коллекции. Их в основном юзают для того, чтобы сделать некую консистентность данных, но такие операции могут значительно замедлить работу программы.
Когда вы вызываете ToArray
, коллекция должна быть заморожена, чтобы гарантировать, что данные не изменятся, пока они копируются в массив. То же самое касается Count
— чтобы точно подсчитать количество элементов, нужно заблокировать все потоки, чтобы не допустить изменения коллекции во время вычисления.
Пример:
ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 10, (i) => bag.Add(i)); int[] items = bag.ToArray(); Console.WriteLine($"Items in bag: {string.Join(", ", items)}");
Метод ToArray
может быть полезен для финального сбора данных, но его лучше избегать.
Примеры применения
Многопоточное кэширование объектов
Допустим, есть высоконагруженное веб-приложение, которое постоянно создает и уничтожает объекты — будь то соединения с базой данных, файловые дескрипторы или просто объекты в памяти. Создание объектов с нуля — это всегда дорого по производительности, поэтому создаём пул объектов, которые могут быть повторно использованы.
class ConnectionPool { private ConcurrentBag<DatabaseConnection> _connections = new ConcurrentBag<DatabaseConnection>(); public DatabaseConnection GetConnection() { if (_connections.TryTake(out var connection)) { Console.WriteLine($"Reusing connection {connection.Id}"); return connection; } // Если нет доступных соединений, создаем новое var newConnection = new DatabaseConnection(); Console.WriteLine($"Creating new connection {newConnection.Id}"); return newConnection; } public void ReturnConnection(DatabaseConnection connection) { _connections.Add(connection); Console.WriteLine($"Returned connection {connection.Id} back to the pool"); } }
В этом коде ConcurrentBag
используется как пул соединений. Когда поток запрашивает соединение, он пытается забрать его из мешка. Если соединений нет, создаётся новое. Когда работа завершена, соединение возвращается в пул для повторного использования другими потоками.
Асинхронное логирование в многопоточной среде
Можно использовать ConcurrentBag для хранения логов и асинхронного их сбора. Допустим, есть приложение, которое пишет логи с разных потоков, и потом они собираются и записываются в файл или базу данных.
Пример:
class Logger { private ConcurrentBag<string> _logMessages = new ConcurrentBag<string>(); public void LogMessage(string message) { _logMessages.Add($"{DateTime.Now}: {message}"); } public void WriteLogsToFile() { foreach (var message in _logMessages) { Console.WriteLine($"Writing log: {message}"); // Пример записи в файл или базу данных } } }
Здесь ConcurrentBag используется для хранения сообщений логов, которые добавляются асинхронно с разных потоков. Когда наступает время сбора логов (например, через каждые 10 минут), данные забираются из ConcurrentBag и записываются в файл или базу данных.
Паттерн Producer-Consumer
И вот мы подошли к, пожалуй, самому важному — Producer-Consumer. Это классический паттерн многопоточных приложений, где один или несколько потоков создают данные (Producers), а другие потоки их обрабатывают (Consumers). В этом случае ConcurrentBag — хороший кандидат для работы с такими задачами.
Пример:
class TaskProcessor { private ConcurrentBag<Action> _tasks = new ConcurrentBag<Action>(); public void ProduceTasks() { for (int i = 0; i < 10; i++) { var task = new Action(() => Console.WriteLine($"Processing task {i}")); _tasks.Add(task); Console.WriteLine($"Task {i} added"); } } public void ConsumeTasks() { while (_tasks.TryTake(out var task)) { task(); Console.WriteLine("Task executed"); } } }
Здесь два метода: ProduceTasks
, который добавляет задачи в ConcurrentBag, и ConsumeTasks
, который забирает задачи и выполняет их.
Подробнее с ConcurrentBag можно ознакомиться здесь.
Всех, кому интересна тема работы с данными в C#, рекомендую посетить открытый урок «Linq на практике», который пройдет 22 октября.
После вебинара вы сможете писать свои linq‑запросы, опираясь на синтаксис linq, и сделать свою работу эффективней благодаря применению компараторов. Записаться на урок можно на странице курса «C# Developer».
ссылка на оригинал статьи https://habr.com/ru/articles/849142/
Добавить комментарий