Примитивы синхронизации в JavaScript: cемафоры и хоккей

от автора

На дворе стоит двадцать второй год и все основные среды исполнения JavaScript уже вовсю поддерживают доступ к потокам. Причем, в отличие от языков с глобальной блокировкой интерпретатора, вроде Python и Ruby, где для параллельного выполнения задач рекомендуется запускать отдельные процессы, в JS это именно потоки с возможностью использования разделяемой памяти, а также всеми достоинствами и недостатками такой свободы. Конечно, есть и серьезные ограничения — например, не получится совместно использовать один JavaScript-объект в нескольких потоках. Но, тем не менее, задачи, связанные с трудоемкими математическими вычислениями или обработкой графики можно смело выносить в отдельные потоки уже сейчас.

Что же с поддержкой?

Посмотрим, что с поддержкой у основных инструментов, задействованных в реализации многопоточного кода: собственно возможности запуска кода в отдельном потоке, низкоуровневых методах синхронизации Atomics, а также буфера для создания области разделяемой памяти SharedArrayBuffer.

C Node.js, где модуль worker_threads стабилен начиная с версии 11.7.0, а Atomics и SharedArrayBuffer и того раньше, все просто. А вот в браузерах, и в особенности в мобильных версиях, поддержка Atomics и SharedArrayBuffer появилась в полной мере лишь в прошлом году. Однако теперь они с нами и готовы разгружать основной поток от тяжелых вычислений.

О структуре статьи

Дабы не зевать, перекладывая ничего не значащие байты из ячейки в ячейку, наблюдая за возникновением коллизий, попробуем разобраться с простейшими примитивами синхронизации на основе физического процесса. Таковым выступит хоккейный матч. На этапе раскатки перед матчем с помощью бинарного семафора разделим игроков на две равных группы, а уже в процессе игры с помощью семафора со счетчиком избавимся от штрафов за нарушение численного состава.

Примеры кода напишем для среды Node.js. Для полноценного восприятия примеров стоит познакомиться с возможностями модуля worker_threads, особенностями использования SharedArrayBuffer и атомарными операциями над ним.

Кипятков Кирилл - Хоккей (2016)
Кипятков Кирилл — Хоккей (2016)

Часть первая: разминка и бинарный семафор

Итак, матч еще впереди, игрокам нужно размяться и тренер решает следующее: первая половина игроков будет тренировать броски, вторая — катание. Очередной, вышедший из раздевалки игрок, получает следующее задание — присоединиться к группе тренирующих катание, или к группе, тренирующих броски так, чтобы количество игроков в группах было равным. Код будем писать в одном файле, воспользовавшись флагом isMainThread для разделения логики основного и вспомогательных потоков. Текущее количество игроков в каждой из групп будем хранить в двух ячейках буфера разделяемой памяти.

const threads = require('worker_threads'); const { Worker, isMainThread } = threads;  if (isMainThread) {   //Главный поток } else {   //Вспомогательный поток - игрок }

Код для логики главного потока будет включать в себя три составляющих: инициализацию буфера, запуск потоков, проверка итогового распределения игроков по группам.

/* Код главного потока */  // Инициализируем буфер c двумя счетчиками  // [*делают броски*, *тренируют катание*] const buffer = new SharedArrayBuffer(2);  // Запускам/выпускаем не площадку 22 потока-игрока for (let i = 0; i < 22; i++) {   new Worker(__filename, { workerData: buffer }); }  // Проверяем итоговое распределение по группам после разминки setTimeout(() => { const array = new Int8Array(buffer, 0); console.log(`Делали броски: ${array[0]}`);   console.log(`Тренировали катание: ${array[1]}`); }, 1000);

Задача вспомогательного потока (потока-игрока) заключается в следующем: выбрать группу, где игроков не больше чем в соседней, тем самым сохранив баланс. Сделаем это через проверку на равенство для более яркой иллюстрации состояния гонки.

/* Код вспомогательного потока */  const { threadId, workerData } = threads; const array = new Int8Array(workerData, 0);  // Получаем количество игроков в каждой из групп const [doKicks, doSkating] = array;  console.log(`На лед выходит ${threadId}.`);  // Отправляем игрока тренировать катание или делать броски if (doKicks === doSkating) { array[1]++; } else { array[0]++; }
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads;  if (isMainThread) {   const buffer = new SharedArrayBuffer(2);    for (let i = 0; i < 22; i++) {     new Worker(__filename, { workerData: buffer });   }    setTimeout(() => {     const array = new Int8Array(buffer, 0);     console.log(`Делали броски: ${array[0]}`);     console.log(`Тренировали катание: ${array[1]}`);   }, 1000); } else {   const { threadId, workerData } = threads;   const array = new Int8Array(workerData, 0);   const [doKicks, doSkating] = array;    console.log(`На лед выходит ${threadId}.`);    if (doKicks === doSkating) {     array[1]++;   } else {     array[0]++;   } }

Таким образом, выходя на поле, игрок присоединится к тренирующим катание, если количество игроков в группах равно, и выберет броски, если количество отличается. Выполняя такую логику в одном потоке, мы будем ожидать, что количество игроков в группах всего будет равным — по 11. Запустив же программу выше, мы получаем совершенно ненадежные результаты (возможно придется запустить ее несколько раз).

... Делали броски: 19 Тренировали катание: 2 ... Делали броски: 12 Тренировали катание: 10 ... Делали броски: 14 Тренировали катание: 8

Это происходит по той причине, что в критическую секцию (код проверки количества игроков в группах и выбора одной из них) одновременно могут войти сразу несколько потоков. Это похоже на то, как игроки выходя на поле и увидев, что группы наполнены одинаково, оба отправляются в одну и ту же группу. Этого можно было бы избежать, если бы один из игроков дождался решения другого.

Бинарный семафор

Бинарный семафор является инструментом для управления доступом к разделяемым данным (в нашем случае счетчикам количества игроков). Он гарантирует, что в каждый момент времени работать с ресурсом может не более одного потока. Используя бинарный семафор поток может захватить блокировку, выполнить код и освободить блокировку, не беспокоясь, что в момент выполнения другой поток зайдет в критическую секцию. Похожим примитивом синхронизации является мьютекс за тем лишь исключением, что в случае мьютекса освободить блокировку может только поток-владелец, первым захвативший мьютекс.

Чтобы вникнуть в суть идеи пройдем путь создания бинарного семафора итеративно. Для начала посмотрим, как может выглядеть вариант его использования.

/* Код вспомогательного потока */  const { threadId, workerData } = threads;  // Инициализируем семафор в каждом потоке const semaphore = new BinarySemaphore(workerData); const array = new Int8Array(workerData, 1);  // Блокируем критическую секцию semaphore.enter(); const [doKicks, doSkating] = array;  console.log(`На лед выходит ${threadId}.`);  if (doKicks === doSkating) { array[1]++; } else { array[0]++; }  // Освобождаем критическую секцию semaphore.leave();

Итак, войдя в критическую секцию, поток блокирует доступ к ней, а выполнив код — освобождает ее. Флаг для хранения состояния семафора поместим в том же буфере разделяемой памяти на нулевой позиции. Посмотрим как покажет себя следующая наивная реализация.

const LOCKED = 1; const UNLOCKED = 0;  class BinarySemaphore { constructor(shared, offset = 0) {   this.lock = new Int8Array(shared, offset, 1); }  enter() {   while (this.lock[0] !== UNLOCKED);   this.lock[0] = LOCKED; }    leave() {   this.lock[0] = UNLOCKED; } }
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads;  const LOCKED = 1; const UNLOCKED = 0;  class BinarySemaphore {   constructor(shared, offset = 0) {     this.lock = new Int8Array(shared, offset, 1);   }    enter() {     while (this.lock[0] !== UNLOCKED);     this.lock[0] = LOCKED;   }    leave() {     this.lock[0] = UNLOCKED;   } }  if (isMainThread) {   const buffer = new SharedArrayBuffer(3);    for (let i = 0; i < 22; i++) {     new Worker(__filename, { workerData: buffer });   }    setTimeout(() => {     const array = new Int8Array(buffer, 1);     console.log(`Делали броски: ${array[0]}`);     console.log(`Тренировали катание: ${array[1]}`);   }, 2000); } else {   const { threadId, workerData } = threads;   const semaphore = new BinarySemaphore(workerData);   const array = new Int8Array(workerData, 1);    semaphore.enter();   const [doKicks, doSkating] = array;    console.log(`На лед выходит ${threadId}.`);    if (doKicks === doSkating) {     array[1]++;   } else {     array[0]++;   }    semaphore.leave(); } 

Запускаем, и… видим, что стало значительно лучше — игроки почти всегда делятся на равные группы. Однако запуская программу снова и снова (или увеличив количество игроков-потоков), мы снова видим, что ошибки случаются. Почему же это происходит? А причина в том, что теперь у нас снова есть критическая секция — это код метода enter семафора. Получив значение флага мы не гарантируем, что оно будет тем же в момент, когда мы устанавливаем его в состояние блокировки. Самое время воспользоваться возможностями Atomics.

В этот момент можно задаться вопросом, зачем в принципе использовать бинарный семафор или мьютекс, если мы уже имеем Atomics с возможностью атомарно обращаться с разделяемой памятью. Причина в требуемой сложности операций. Если нам достаточно логики для проверки и изменения значения одной ячейки памяти — можно и нужно использовать Atomics, его будет достаточно. В противном случае, когда требуется атомарно считать/отредактировать две и больше ячейки памяти — потребуется более сложная логика для синхронизации.

Перепишем семафор на Atomics, воспроизведя по сути ту же логику из примера выше, используя атомарные операции.

const {   compareExchange, wait, notify } = Atomics;  class BinarySemaphore {   constructor(shared, offset = 0) {     this.lock = new Int32Array(shared, offset, 1);   }    enter() {     while (true) {       if (compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED) {         return;       }       wait(this.lock, 0, LOCKED);     }   }    leave() {     if (compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {       // Лучше выкинуть исключение, чтобы не прозевать такой момент       return;     }     notify(this.lock, 0, 1);   } }
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads;  const LOCKED = 1; const UNLOCKED = 0;  class BinarySemaphore {   constructor(shared, offset = 0) {     this.lock = new Int32Array(shared, offset, 1);   }    enter() {     while (true) {       if (Atomics.compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED) {         return;       }       Atomics.wait(this.lock, 0, LOCKED);     }   }    leave() {     if (Atomics.compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {       return;     }     Atomics.notify(this.lock, 0, 1);   } }  if (isMainThread) {   const buffer = new SharedArrayBuffer(6);    for (let i = 0; i < 22; i++) {     new Worker(__filename, { workerData: buffer });   }    setTimeout(() => {     const array = new Int8Array(buffer, 4);     console.log(`Делали броски: ${array[0]}`);     console.log(`Тренировали катание: ${array[1]}`);   }, 1000); } else {   const { threadId, workerData } = threads;   const semaphore = new BinarySemaphore(workerData);   const array = new Int8Array(workerData, 4);    semaphore.enter();   const [doKicks, doSkating] = array;    console.log(`На лед выходит ${threadId}.`);    if (doKicks === doSkating) {     array[1]++;   } else {     array[0]++;   }    semaphore.leave(); } 

Это уже абсолютно надежный вариант. Можно проверить, увеличивая количество потоков и выполняя программу большое число раз — игроки всегда будут делиться на две одинаковые группы. В процесс проверки и изменения флага посредством compareExchange не сможет вклиниться ни один другой поток кроме текущего, обращающегося к семафору.

Делали броски: 11 Тренировали катание: 11

Следует обратить внимание, что в качестве типизированного массива для представления буффера теперь используется экземпляр Int32Array. Основные методы Atomics работают только с экземплярами Int32Array и BigInt64Array. Попробовав применить их к неподдерживаемому типу TypedArray, мы получим ошибку вроде приведенной ниже.

Uncaught TypeError: [object Int8Array] is not an int32 or BigInt64 typed array

Осталось лишь добавить, что на практике гораздо удобнее использовать семафор в callback-формате, реализовав дополнительный метод в коде семафора.

exec(callback) {   this.enter();   try {     return callback();   } finally {     this.leave();   } }

Применить его мы сможем следующим образом.

semaphore.exec(() => {   const [doKicks, doSkating] = array;    console.log(`На лед выходит ${threadId}.`);    if (doKicks === doSkating) {     array[1]++;   } else {     array[0]++;   } });
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads;  const LOCKED = 1; const UNLOCKED = 0;  class BinarySemaphore {   constructor(shared, offset = 0) {     this.lock = new Int32Array(shared, offset, 1);   }    enter() {     while (true) {       if (         Atomics.compareExchange(this.lock, 0, UNLOCKED, LOCKED) === UNLOCKED       ) {         return;       }       Atomics.wait(this.lock, 0, LOCKED);     }   }    leave() {     if (Atomics.compareExchange(this.lock, 0, LOCKED, UNLOCKED) !== LOCKED) {       throw new Error("Cannot leave unlocked BinarySemaphore");     }     Atomics.notify(this.lock, 0, 1);   }    exec(callback) {     this.enter();     try {       return callback();     } finally {       this.leave();     }   } }  if (isMainThread) {   const buffer = new SharedArrayBuffer(6);    for (let i = 0; i < 22; i++) {     new Worker(__filename, { workerData: buffer });   }    setTimeout(() => {     const array = new Int8Array(buffer, 4);     console.log(`Делали броски: ${array[0]}`);     console.log(`Тренировали катание: ${array[1]}`);   }, 1000); } else {   const { threadId, workerData } = threads;   const semaphore = new BinarySemaphore(workerData);   const array = new Int8Array(workerData, 4);    semaphore.exec(() => {     const [doKicks, doSkating] = array;      console.log(`На лед выходит ${threadId}.`);      if (doKicks === doSkating) {       array[1]++;     } else {       array[0]++;     }   }); }

Естественно, существует не один вариант реализации методов enter и leave на основе Atomics, например, возможен рекурсивный вариант enter. В любом случае, итоговое решение может выбираться из требований по памяти и быстродействию. Есть также вероятность, что дополнительная логика кроме атомарных операций Atomics и не потребуется в принципе (что было бы замечательно).

Часть вторая: матч и семафор со счетчиком

Матч в разгаре, игроки меняются. Отдохнувшие постепенно заменяют уставших, следуя правилу: на площадке не должно быть больше пяти игроков (вратаря не меняем). В критическую секцию (площадку для игры) допускаем теперь до пяти потоков одновременно (но не более). Булевого флага, использовавшегося в бинарном семафоре для этого недостаточно — нужен счетчик.

Семафор со счетчиком

Пойдем проторенной тропинкой, посмотрим на код потоков, а затем доработаем семафор.

/* Код главного потока */  const buffer = new SharedArrayBuffer(4);  // Инициализируем семафор, устанавливая счетчик в 5 const semaphore = new CountingSemaphore(buffer, 0, 5);  console.log(`Счетчик семафора: ${semaphore.counter[0]}`);  // Постепенно выпускаем на площадку игроков for (let i = 0; i < 50; i++) {   new Worker(__filename, { workerData: buffer }); }

В игре одновременно задействовано много игроков (их больше 20, так как пятерки сменяются не один раз за игру), а мест на площадке всего 5.

/* Код вспомогательного потока */  const { threadId, workerData } = threads; const semaphore = new CountingSemaphore(workerData);    // Входим в критическую секцию semaphore.enter(); console.log(`На лед выходит ${threadId}.`);  // Вычисляем текущее количество игроков на площадке const players = 5 - semaphore.counter[0]; if (players > 5) { console.log('Нарушение! На поле ' + players + ' игроков'); }  // Спустя 10мс выходим из нее setTimeout(() => semaphore.leave(), 10);

Доработаем первую версию бинарного семафора, заменив флаг на счетчик и попробуем, что из этого получится.

class CountingSemaphore {   constructor(shared, offset = 0, initial) {     this.counter = new Int32Array(shared, offset, 1);     if (typeof initial === "number") {       this.counter[0] = initial;     }   }    enter() {     while (this.counter[0] === 0);     this.counter[0]--;   }    leave() {     this.counter[0]++;   } }
Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads;  class CountingSemaphore {   constructor(shared, offset = 0, initial) {     this.counter = new Int32Array(shared, offset, 1);     if (typeof initial === "number") {       this.counter[0] = initial;     }   }    enter() {     while (this.counter[0] === 0);     this.counter[0]--;   }    leave() {     this.counter[0]++;   } }  if (isMainThread) {   const buffer = new SharedArrayBuffer(4);   const semaphore = new CountingSemaphore(buffer, 0, 5);   console.log(`Счетчик семафора: ${semaphore.counter[0]}`);   for (let i = 0; i < 50; i++) {     new Worker(__filename, { workerData: buffer });   } } else {   const { threadId, workerData } = threads;   const semaphore = new CountingSemaphore(workerData);    semaphore.enter();   console.log(`На лед выходит ${threadId}.`);   const players = 5 - semaphore.counter[0];   if (players > 5) {     console.log(`Нарушение! На поле ${players} игроков`);   }   setTimeout(() => {     semaphore.leave();   }, 10); } 

Результат достаточно ожидаем для тех, кто прочитал первую часть статьи. Запускаем программу видим, что команда получает штраф за нарушение численного состава с завидной регулярностью.

На лед выходит 40 На лед выходит 42 Нарушение! На поле 6 игроков

Причина кроется все в том же — в метод enter семафора вхожи все потоки одновременно, что приводит к состоянию гонки. Решение на поверхности — привет, Atomics!

class CountingSemaphore {   constructor(shared, offset = 0, initial) {     this.counter = new Int32Array(shared, offset, 1);     if (typeof initial === "number") {       Atomics.store(this.counter, 0, initial);     }   }    enter() {     while (true) {       Atomics.wait(this.counter, 0, 0);       const n = Atomics.load(this.counter, 0);       if (n > 0) {         const prev = Atomics.compareExchange(this.counter, 0, n, n - 1);         if (prev === n) return;       }     }   }    leave() {     Atomics.add(this.counter, 0, 1);     Atomics.notify(this.counter, 0, 1);   } }

Здесь пришлось попотеть над методом enter по той причине, что при установке счетчика должны быть выполнены два условия: он не должен опуститься ниже ноля, в также не должен поменяться в промежутке между считыванием и установкой нового значения. Учтя эти два условия получаем надежный семафор, не допускающий внутрь критической секции лишние потоки.

Полный код примера
const threads = require("worker_threads"); const { Worker, isMainThread } = threads;  class CountingSemaphore {   constructor(shared, offset = 0, initial) {     this.counter = new Int32Array(shared, offset, 1);     if (typeof initial === "number") {       Atomics.store(this.counter, 0, initial);     }   }    enter() {     while (true) {       Atomics.wait(this.counter, 0, 0);       const n = Atomics.load(this.counter, 0);       if (n > 0) {         const prev = Atomics.compareExchange(this.counter, 0, n, n - 1);         if (prev === n) return;       }     }   }    leave() {     Atomics.add(this.counter, 0, 1);     Atomics.notify(this.counter, 0, 1);   } }  if (isMainThread) {   const buffer = new SharedArrayBuffer(4);   const semaphore = new CountingSemaphore(buffer, 0, 5);   console.log(`Счетчик семафора: ${semaphore.counter[0]}`);   for (let i = 0; i < 50; i++) {     new Worker(__filename, { workerData: buffer });   } } else {   const { threadId, workerData } = threads;   const semaphore = new CountingSemaphore(workerData);   semaphore.enter();   console.log(`На лед выходит ${threadId}.`);   const players = 5 - semaphore.counter[0];   if (players > 5) {     console.log(`Нарушение! На поле ${players} игроков`);   }    setTimeout(() => {     semaphore.leave();   }, 10); } 

Запускаем программу и видим, что полностью избавились от штрафов. На площадке (критической секции) в любой момент времени не более пяти потоков-игроков.

Итого

Что же имеем в сухом остатке. Семафоры — простое средство для блокировки доступа к разделяемому ресурсу (в нашем случае буферу памяти). Они позволяют выполнять код в критической секции только ограниченному числу потоков (одному или нескольким), не опасаясь вмешательства со стороны других потоков. Комбинируя атомарные операции, можно создавать элементы для многопоточного программирования любой сложности. В текущий момент все необходимое для реализации многопоточных вычислений достаточно широко поддержано во всех средах JavaScript вплоть до мобильных браузеров.


ссылка на оригинал статьи https://habr.com/ru/articles/679140/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *