На дворе стоит двадцать второй год и все основные среды исполнения JavaScript уже вовсю поддерживают доступ к потокам. Причем, в отличие от языков с глобальной блокировкой интерпретатора, вроде Python и Ruby, где для параллельного выполнения задач рекомендуется запускать отдельные процессы, в JS это именно потоки с возможностью использования разделяемой памяти, а также всеми достоинствами и недостатками такой свободы. Конечно, есть и серьезные ограничения — например, не получится совместно использовать один JavaScript-объект в нескольких потоках. Но, тем не менее, задачи, связанные с трудоемкими математическими вычислениями или обработкой графики можно смело выносить в отдельные потоки уже сейчас.
Что же с поддержкой?
Посмотрим, что с поддержкой у основных инструментов, задействованных в реализации многопоточного кода: собственно возможности запуска кода в отдельном потоке, низкоуровневых методах синхронизации Atomics, а также буфера для создания области разделяемой памяти SharedArrayBuffer.
C Node.js, где модуль worker_threads стабилен начиная с версии 11.7.0, а Atomics и SharedArrayBuffer и того раньше, все просто. А вот в браузерах, и в особенности в мобильных версиях, поддержка Atomics и SharedArrayBuffer появилась в полной мере лишь в прошлом году. Однако теперь они с нами и готовы разгружать основной поток от тяжелых вычислений.
О структуре статьи
Дабы не зевать, перекладывая ничего не значащие байты из ячейки в ячейку, наблюдая за возникновением коллизий, попробуем разобраться с простейшими примитивами синхронизации на основе физического процесса. Таковым выступит хоккейный матч. На этапе раскатки перед матчем с помощью бинарного семафора разделим игроков на две равных группы, а уже в процессе игры с помощью семафора со счетчиком избавимся от штрафов за нарушение численного состава.
Примеры кода напишем для среды Node.js. Для полноценного восприятия примеров стоит познакомиться с возможностями модуля worker_threads, особенностями использования SharedArrayBuffer и атомарными операциями над ним.
![Кипятков Кирилл - Хоккей (2016) Кипятков Кирилл - Хоккей (2016)](https://habrastorage.org/getpro/habr/upload_files/ca1/422/df7/ca1422df756d4344700cf85832328eae.jpg)
Часть первая: разминка и бинарный семафор
Итак, матч еще впереди, игрокам нужно размяться и тренер решает следующее: первая половина игроков будет тренировать броски, вторая — катание. Очередной, вышедший из раздевалки игрок, получает следующее задание — присоединиться к группе тренирующих катание, или к группе, тренирующих броски так, чтобы количество игроков в группах было равным. Код будем писать в одном файле, воспользовавшись флагом isMainThread для разделения логики основного и вспомогательных потоков. Текущее количество игроков в каждой из групп будем хранить в двух ячейках буфера разделяемой памяти.
const threads = require('worker_threads'); const { Worker, isMainThread } = threads; if (isMainThread) { //Главный поток } else { //Вспомогательный поток - игрок }
Код для логики главного потока будет включать в себя три составляющих: инициализацию буфера, запуск потоков, проверка итогового распределения игроков по группам.
/* Код главного потока */ // Инициализируем буфер c двумя счетчиками // [*делают броски*, *тренируют катание*] const buffer = new SharedArrayBuffer(2); // Запускам/выпускаем не площадку 22 потока-игрока for (let i = 0; i < 22; i++) { new Worker(__filename, { workerData: buffer }); } // Проверяем итоговое распределение по группам после разминки setTimeout(() => { const array = new Int8Array(buffer, 0); console.log(`Делали броски: ${array[0]}`); console.log(`Тренировали катание: ${array[1]}`); }, 1000);
Задача вспомогательного потока (потока-игрока) заключается в следующем: выбрать группу, где игроков не больше чем в соседней, тем самым сохранив баланс. Сделаем это через проверку на равенство для более яркой иллюстрации состояния гонки.
/* Код вспомогательного потока */ const { threadId, workerData } = threads; const array = new Int8Array(workerData, 0); // Получаем количество игроков в каждой из групп const [doKicks, doSkating] = array; console.log(`На лед выходит ${threadId}.`); // Отправляем игрока тренировать катание или делать броски if (doKicks === doSkating) { array[1]++; } else { array[0]++; }
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads; if (isMainThread) { const buffer = new SharedArrayBuffer(2); for (let i = 0; i < 22; i++) { new Worker(__filename, { workerData: buffer }); } setTimeout(() => { const array = new Int8Array(buffer, 0); console.log(`Делали броски: ${array[0]}`); console.log(`Тренировали катание: ${array[1]}`); }, 1000); } else { const { threadId, workerData } = threads; const array = new Int8Array(workerData, 0); const [doKicks, doSkating] = array; console.log(`На лед выходит ${threadId}.`); if (doKicks === doSkating) { array[1]++; } else { array[0]++; } }
Таким образом, выходя на поле, игрок присоединится к тренирующим катание, если количество игроков в группах равно, и выберет броски, если количество отличается. Выполняя такую логику в одном потоке, мы будем ожидать, что количество игроков в группах всего будет равным — по 11. Запустив же программу выше, мы получаем совершенно ненадежные результаты (возможно придется запустить ее несколько раз).
... Делали броски: 19 Тренировали катание: 2 ... Делали броски: 12 Тренировали катание: 10 ... Делали броски: 14 Тренировали катание: 8
Это происходит по той причине, что в критическую секцию (код проверки количества игроков в группах и выбора одной из них) одновременно могут войти сразу несколько потоков. Это похоже на то, как игроки выходя на поле и увидев, что группы наполнены одинаково, оба отправляются в одну и ту же группу. Этого можно было бы избежать, если бы один из игроков дождался решения другого.
Бинарный семафор
Бинарный семафор является инструментом для управления доступом к разделяемым данным (в нашем случае счетчикам количества игроков). Он гарантирует, что в каждый момент времени работать с ресурсом может не более одного потока. Используя бинарный семафор поток может захватить блокировку, выполнить код и освободить блокировку, не беспокоясь, что в момент выполнения другой поток зайдет в критическую секцию. Похожим примитивом синхронизации является мьютекс за тем лишь исключением, что в случае мьютекса освободить блокировку может только поток-владелец, первым захвативший мьютекс.
Чтобы вникнуть в суть идеи пройдем путь создания бинарного семафора итеративно. Для начала посмотрим, как может выглядеть вариант его использования.
/* Код вспомогательного потока */ const { threadId, workerData } = threads; // Инициализируем семафор в каждом потоке const semaphore = new BinarySemaphore(workerData); const array = new Int8Array(workerData, 1); // Блокируем критическую секцию semaphore.enter(); const [doKicks, doSkating] = array; console.log(`На лед выходит ${threadId}.`); if (doKicks === doSkating) { array[1]++; } else { array[0]++; } // Освобождаем критическую секцию semaphore.leave();
Итак, войдя в критическую секцию, поток блокирует доступ к ней, а выполнив код — освобождает ее. Флаг для хранения состояния семафора поместим в том же буфере разделяемой памяти на нулевой позиции. Посмотрим как покажет себя следующая наивная реализация.
const LOCKED = 1; const UNLOCKED = 0; class BinarySemaphore { constructor(shared, offset = 0) { this.lock = new Int8Array(shared, offset, 1); } enter() { while (this.lock[0] !== UNLOCKED); this.lock[0] = LOCKED; } leave() { this.lock[0] = UNLOCKED; } }
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads; const LOCKED = 1; const UNLOCKED = 0; class BinarySemaphore { constructor(shared, offset = 0) { this.lock = new Int8Array(shared, offset, 1); } enter() { while (this.lock[0] !== UNLOCKED); this.lock[0] = LOCKED; } leave() { this.lock[0] = UNLOCKED; } } if (isMainThread) { const buffer = new SharedArrayBuffer(3); for (let i = 0; i < 22; i++) { new Worker(__filename, { workerData: buffer }); } setTimeout(() => { const array = new Int8Array(buffer, 1); console.log(`Делали броски: ${array[0]}`); console.log(`Тренировали катание: ${array[1]}`); }, 2000); } else { const { threadId, workerData } = threads; const semaphore = new BinarySemaphore(workerData); const array = new Int8Array(workerData, 1); semaphore.enter(); const [doKicks, doSkating] = array; console.log(`На лед выходит ${threadId}.`); if (doKicks === doSkating) { array[1]++; } else { array[0]++; } semaphore.leave(); }
Запускаем, и… видим, что стало значительно лучше — игроки почти всегда делятся на равные группы. Однако запуская программу снова и снова (или увеличив количество игроков-потоков), мы снова видим, что ошибки случаются. Почему же это происходит? А причина в том, что теперь у нас снова есть критическая секция — это код метода enter семафора. Получив значение флага мы не гарантируем, что оно будет тем же в момент, когда мы устанавливаем его в состояние блокировки. Самое время воспользоваться возможностями Atomics.
В этот момент можно задаться вопросом, зачем в принципе использовать бинарный семафор или мьютекс, если мы уже имеем Atomics с возможностью атомарно обращаться с разделяемой памятью. Причина в требуемой сложности операций. Если нам достаточно логики для проверки и изменения значения одной ячейки памяти — можно и нужно использовать Atomics, его будет достаточно. В противном случае, когда требуется атомарно считать/отредактировать две и больше ячейки памяти — потребуется более сложная логика для синхронизации.
Перепишем семафор на Atomics, воспроизведя по сути ту же логику из примера выше, используя атомарные операции.
const { compareExchange, wait, notify } = Atomics; class BinarySemaphore { constructor(shared, offset = 0) { this.lock = new Int32Array(shared, offset, 1); } enter() { while (true) { if (compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED) { return; } wait(this.lock, 0, LOCKED); } } leave() { if (compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) { // Лучше выкинуть исключение, чтобы не прозевать такой момент return; } notify(this.lock, 0, 1); } }
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads; const LOCKED = 1; const UNLOCKED = 0; class BinarySemaphore { constructor(shared, offset = 0) { this.lock = new Int32Array(shared, offset, 1); } enter() { while (true) { if (Atomics.compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED) { return; } Atomics.wait(this.lock, 0, LOCKED); } } leave() { if (Atomics.compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) { return; } Atomics.notify(this.lock, 0, 1); } } if (isMainThread) { const buffer = new SharedArrayBuffer(6); for (let i = 0; i < 22; i++) { new Worker(__filename, { workerData: buffer }); } setTimeout(() => { const array = new Int8Array(buffer, 4); console.log(`Делали броски: ${array[0]}`); console.log(`Тренировали катание: ${array[1]}`); }, 1000); } else { const { threadId, workerData } = threads; const semaphore = new BinarySemaphore(workerData); const array = new Int8Array(workerData, 4); semaphore.enter(); const [doKicks, doSkating] = array; console.log(`На лед выходит ${threadId}.`); if (doKicks === doSkating) { array[1]++; } else { array[0]++; } semaphore.leave(); }
Это уже абсолютно надежный вариант. Можно проверить, увеличивая количество потоков и выполняя программу большое число раз — игроки всегда будут делиться на две одинаковые группы. В процесс проверки и изменения флага посредством compareExchange не сможет вклиниться ни один другой поток кроме текущего, обращающегося к семафору.
Делали броски: 11 Тренировали катание: 11
Следует обратить внимание, что в качестве типизированного массива для представления буффера теперь используется экземпляр Int32Array. Основные методы Atomics работают только с экземплярами Int32Array и BigInt64Array. Попробовав применить их к неподдерживаемому типу TypedArray, мы получим ошибку вроде приведенной ниже.
Uncaught TypeError: [object Int8Array] is not an int32 or BigInt64 typed array
Осталось лишь добавить, что на практике гораздо удобнее использовать семафор в callback-формате, реализовав дополнительный метод в коде семафора.
exec(callback) { this.enter(); try { return callback(); } finally { this.leave(); } }
Применить его мы сможем следующим образом.
semaphore.exec(() => { const [doKicks, doSkating] = array; console.log(`На лед выходит ${threadId}.`); if (doKicks === doSkating) { array[1]++; } else { array[0]++; } });
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads; const LOCKED = 1; const UNLOCKED = 0; class BinarySemaphore { constructor(shared, offset = 0) { this.lock = new Int32Array(shared, offset, 1); } enter() { while (true) { if ( Atomics.compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED ) { return; } Atomics.wait(this.lock, 0, LOCKED); } } leave() { if (Atomics.compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) { throw new Error("Cannot leave unlocked BinarySemaphore"); } Atomics.notify(this.lock, 0, 1); } exec(callback) { this.enter(); try { return callback(); } finally { this.leave(); } } } if (isMainThread) { const buffer = new SharedArrayBuffer(6); for (let i = 0; i < 22; i++) { new Worker(__filename, { workerData: buffer }); } setTimeout(() => { const array = new Int8Array(buffer, 4); console.log(`Делали броски: ${array[0]}`); console.log(`Тренировали катание: ${array[1]}`); }, 1000); } else { const { threadId, workerData } = threads; const semaphore = new BinarySemaphore(workerData); const array = new Int8Array(workerData, 4); semaphore.exec(() => { const [doKicks, doSkating] = array; console.log(`На лед выходит ${threadId}.`); if (doKicks === doSkating) { array[1]++; } else { array[0]++; } }); }
Естественно, существует не один вариант реализации методов enter и leave на основе Atomics, например, возможен рекурсивный вариант enter. В любом случае, итоговое решение может выбираться из требований по памяти и быстродействию. Есть также вероятность, что дополнительная логика кроме атомарных операций Atomics и не потребуется в принципе (что было бы замечательно).
Часть вторая: матч и семафор со счетчиком
Матч в разгаре, игроки меняются. Отдохнувшие постепенно заменяют уставших, следуя правилу: на площадке не должно быть больше пяти игроков (вратаря не меняем). В критическую секцию (площадку для игры) допускаем теперь до пяти потоков одновременно (но не более). Булевого флага, использовавшегося в бинарном семафоре для этого недостаточно — нужен счетчик.
Семафор со счетчиком
Пойдем проторенной тропинкой, посмотрим на код потоков, а затем доработаем семафор.
/* Код главного потока */ const buffer = new SharedArrayBuffer(4); // Инициализируем семафор, устанавливая счетчик в 5 const semaphore = new CountingSemaphore(buffer, 0, 5); console.log(`Счетчик семафора: ${semaphore.counter[0]}`); // Постепенно выпускаем на площадку игроков for (let i = 0; i < 50; i++) { new Worker(__filename, { workerData: buffer }); }
В игре одновременно задействовано много игроков (их больше 20, так как пятерки сменяются не один раз за игру), а мест на площадке всего 5.
/* Код вспомогательного потока */ const { threadId, workerData } = threads; const semaphore = new CountingSemaphore(workerData); // Входим в критическую секцию semaphore.enter(); console.log(`На лед выходит ${threadId}.`); // Вычисляем текущее количество игроков на площадке const players = 5 - semaphore.counter[0]; if (players > 5) { console.log('Нарушение! На поле ' + players + ' игроков'); } // Спустя 10мс выходим из нее setTimeout(() => semaphore.leave(), 10);
Доработаем первую версию бинарного семафора, заменив флаг на счетчик и попробуем, что из этого получится.
class CountingSemaphore { constructor(shared, offset = 0, initial) { this.counter = new Int32Array(shared, offset, 1); if (typeof initial === "number") { this.counter[0] = initial; } } enter() { while (this.counter[0] === 0); this.counter[0]--; } leave() { this.counter[0]++; } }
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads; class CountingSemaphore { constructor(shared, offset = 0, initial) { this.counter = new Int32Array(shared, offset, 1); if (typeof initial === "number") { this.counter[0] = initial; } } enter() { while (this.counter[0] === 0); this.counter[0]--; } leave() { this.counter[0]++; } } if (isMainThread) { const buffer = new SharedArrayBuffer(4); const semaphore = new CountingSemaphore(buffer, 0, 5); console.log(`Счетчик семафора: ${semaphore.counter[0]}`); for (let i = 0; i < 50; i++) { new Worker(__filename, { workerData: buffer }); } } else { const { threadId, workerData } = threads; const semaphore = new CountingSemaphore(workerData); semaphore.enter(); console.log(`На лед выходит ${threadId}.`); const players = 5 - semaphore.counter[0]; if (players > 5) { console.log(`Нарушение! На поле ${players} игроков`); } setTimeout(() => { semaphore.leave(); }, 10); }
Результат достаточно ожидаем для тех, кто прочитал первую часть статьи. Запускаем программу видим, что команда получает штраф за нарушение численного состава с завидной регулярностью.
На лед выходит 40 На лед выходит 42 Нарушение! На поле 6 игроков
Причина кроется все в том же — в метод enter семафора вхожи все потоки одновременно, что приводит к состоянию гонки. Решение на поверхности — привет, Atomics!
class CountingSemaphore { constructor(shared, offset = 0, initial) { this.counter = new Int32Array(shared, offset, 1); if (typeof initial === "number") { Atomics.store(this.counter, 0, initial); } } enter() { while (true) { Atomics.wait(this.counter, 0, 0); const n = Atomics.load(this.counter, 0); if (n > 0) { const prev = Atomics.compareExchange(this.counter, 0, n, n - 1); if (prev === n) return; } } } leave() { Atomics.add(this.counter, 0, 1); Atomics.notify(this.counter, 0, 1); } }
Здесь пришлось попотеть над методом enter по той причине, что при установке счетчика должны быть выполнены два условия: он не должен опуститься ниже ноля, в также не должен поменяться в промежутке между считыванием и установкой нового значения. Учтя эти два условия получаем надежный семафор, не допускающий внутрь критической секции лишние потоки.
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads; class CountingSemaphore { constructor(shared, offset = 0, initial) { this.counter = new Int32Array(shared, offset, 1); if (typeof initial === "number") { Atomics.store(this.counter, 0, initial); } } enter() { while (true) { Atomics.wait(this.counter, 0, 0); const n = Atomics.load(this.counter, 0); if (n > 0) { const prev = Atomics.compareExchange(this.counter, 0, n, n - 1); if (prev === n) return; } } } leave() { Atomics.add(this.counter, 0, 1); Atomics.notify(this.counter, 0, 1); } } if (isMainThread) { const buffer = new SharedArrayBuffer(4); const semaphore = new CountingSemaphore(buffer, 0, 5); console.log(`Счетчик семафора: ${semaphore.counter[0]}`); for (let i = 0; i < 50; i++) { new Worker(__filename, { workerData: buffer }); } } else { const { threadId, workerData } = threads; const semaphore = new CountingSemaphore(workerData); semaphore.enter(); console.log(`На лед выходит ${threadId}.`); const players = 5 - semaphore.counter[0]; if (players > 5) { console.log(`Нарушение! На поле ${players} игроков`); } setTimeout(() => { semaphore.leave(); }, 10); }
Запускаем программу и видим, что полностью избавились от штрафов. На площадке (критической секции) в любой момент времени не более пяти потоков-игроков.
Итого
Что же имеем в сухом остатке. Семафоры — простое средство для блокировки доступа к разделяемому ресурсу (в нашем случае буферу памяти). Они позволяют выполнять код в критической секции только ограниченному числу потоков (одному или нескольким), не опасаясь вмешательства со стороны других потоков. Комбинируя атомарные операции, можно создавать элементы для многопоточного программирования любой сложности. В текущий момент все необходимое для реализации многопоточных вычислений достаточно широко поддержано во всех средах JavaScript вплоть до мобильных браузеров.
ссылка на оригинал статьи https://habr.com/ru/articles/679140/
Добавить комментарий