Многопоточность JavaScript с SharedArrayBuffer и Atomics: основы

от автора

Привет, Хабр!

JavaScript по традиции известен как однопоточный язык. Т.е код выполняется последовательно, и одновременное выполнение нескольких задач может быть проблематичным. Если код сталкивается с тяжелыми вычислительными задачами, это может привести к задержкам и замедлению интерфейса юзера. Поэтому один поток не для каких-либо интенсивных вычислений или обработки больших объемов данных.

Чтобы обойти эти ограничения, были введены Web Workers — они позволяют выполнять JS-код в фоновом потоке, параллельно с основным. Однако, все сложилось так, что простой обмен данными между основным потоком и воркерами через postMessage имеет свои ограничения и может быть недостаточно хорошим для некоторых задач.

Здесь помогают SharedArrayBuffer и Atomics.

SharedArrayBuffer

SharedArrayBuffer — это особый тип буфера, который позволяет нескольким потокам разделять один и тот же блок памяти. В отличие от того же дефолтного ArrayBuffer, который доступен только одному потоку, SharedArrayBuffer дает общую область памяти, к которой могут обращаться несколько рабочих потоков.

Внутренне SharedArrayBuffer представляет собой блок памяти фикс. размера, который можно использовать для хранения бинарных данных. Он используется в связке с типизированными массивами, такими как Uint8Array или Int32Array.

Но перед тем как работать с SharedArrayBuffer очень важно использовать определенные требования по настройке среды — настройку заголовков HTTP.

COOP заголовок позволяет защитить сайт от атак типа cross-origin. Он должен быть установлен на значение same-origin:

Cross-Origin-Opener-Policy: same-origin

COEP заголовок предотвращает встраивание сайта в другие документы. Он должен быть установлен на значение require-corp или credentialless:

Cross-Origin-Embedder-Policy: require-corp

Теперь рассмотрим как создать и передать SharedArrayBuffer между потоками.

Создание SharedArrayBuffer

Конструктор SharedArrayBuffer используется для создания буфера определенного размера в байтах. Он принимает два параметра:

  • length: размер буфера в байтах.

  • options (необязательный): объект с параметрами, например как maxByteLength, который определяет максимальный размер буфера.

const sab = new SharedArrayBuffer(1024); const growableSab = new SharedArrayBuffer(8, { maxByteLength: 16 });

Методы SharedArrayBuffer

grow позволяет увеличивать размер SharedArrayBuffer, если он был создан с параметром maxByteLength. Новый размер должен быть меньше или равен maxByteLength:

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });  if (buffer.growable) {   buffer.grow(12); }

slice возвращает новый SharedArrayBuffer, содержащий копию байтов из оригинального буфера от индекса start (включительно) до end (исключительно).

const sab = new SharedArrayBuffer(1024); const slicedSab = sab.slice(2, 100);

Так можно создавать буфеы, основанные на подмножествах данных из исходного буфера, без изменения оригинального.

Свойства SharedArrayBuffer

byteLength возвращает длину SharedArrayBuffer в байтах. Это значение устанавливается при создании буфера и не может быть изменено.

const sab = new SharedArrayBuffer(1024); console.log(sab.byteLength); // 1024

Свойство growable указывает, может ли SharedArrayBuffer изменять свой размер. Если буфер был создан с параметром maxByteLength, это свойство будет равно true.

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 }); console.log(buffer.growable); // true

maxByteLength возвращает максимальный размер буфера в байтах, который был установлен при создании буфера.

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 }); console.log(buffer.maxByteLength); // 16

Atomics

Atomics — это встроенный объект в JS, который предоставляет набор статических методов для выполнения атомарных операций. Атомарные операции выполняются как единое, неделимое действие, именно так atomics позволяет избегать гонок данных.

Основные методы Atomics:

Atomics.add(typedArray, index, value)
Атомарно добавляет значение к элементу массива по заданному индексу и возвращает предыдущее значение:

const sab = new SharedArrayBuffer(1024); const int32 = new Int32Array(sab); Atomics.add(int32, 0, 5); // добавляет 5 к int32[0]

Atomics.sub(typedArray, index, value)
Атомарно вычитает значение из элемента массива по заданному индексу и возвращает предыдущее значение:

Atomics.sub(int32, 0, 2); // вычитает 2 из int32[0]

Atomics.load(typedArray, index)
Атомарно считывает значение элемента массива по заданному индексу:

let value = Atomics.load(int32, 0); // считывает значение int32[0]

Atomics.store(typedArray, index, value)
Атомарно записывает значение в элемент массива по заданному индексу:

Atomics.store(int32, 0, 10); // Записывает 10 в int32[0]

Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)
Атомарно сравнивает текущее значение элемента массива по заданному индексу с ожидаемым значением. Если они равны, то записывает новое значение и возвращает старое:

Atomics.compareExchange(int32, 0, 10, 20); // если int32[0] равно 10, то записывает 20 и возвращает старое значение

Atomics.exchange(typedArray, index, value)
Атомарно записывает значение в элемент массива по заданному индексу и возвращает старое значение:

Atomics.exchange(int32, 0, 15); // записывает 15 в int32[0] и возвращает старое значение

Atomics.wait(typedArray, index, value, timeout)
Проверяет значение элемента массива и ожидает его изменения до заданного значения или истечения времени тайм-аута. Возвращает «ok», «not-equal» или «timed-out»:

let result = Atomics.wait(int32, 0, 10, 1000); // ожидает изменения int32[0] до 10 или 1000 мс

Atomics.notify(typedArray, index, count)
Уведомляет потоки, ожидающие изменения элемента массива по заданному индексу. Возвращает количество уведомленных потоков:

let notified = Atomics.notify(int32, 0, 1); // уведомляет один поток, ожидающий изменения int32[0]

Atomics можно юзать для синхронных операций. Например, для синхронизации работы потоков:

const sab = new SharedArrayBuffer(1024); const int32 = new Int32Array(sab);  // в главном потоке const worker = new Worker('worker.js'); worker.postMessage(sab);  Atomics.store(int32, 0, 0); Atomics.wait(int32, 0, 0); // ожидание изменения int32[0]  // В рабочем потоке (worker.js) self.onmessage = function(event) {     const int32 = new Int32Array(event.data);     Atomics.store(int32, 0, 1);     Atomics.notify(int32, 0, 1); // уведомление главного потока };

Объединяем Atomics и SharedArrayBuffer

Счетчик запросов в реал тайме

Допустим, есть веб-сервис, который обрабатывает большое количество запросов, и нам нужно следить за кол-вом активных запросов в реальном времени:

// главный поток const buffer = new SharedArrayBuffer(4); // Создаем буфер на 4 байта const counter = new Int32Array(buffer);  const worker = new Worker('worker.js'); worker.postMessage(buffer);  // обрабатываем новый запрос function handleRequest() {     Atomics.add(counter, 0, 1); // увеличиваем счетчик     console.log(`Активные запросы: ${Atomics.load(counter, 0)}`);          // симулируем завершение запроса через 2 секунды     setTimeout(() => {         Atomics.sub(counter, 0, 1); // уменьшаем счетчик         console.log(`Активные запросы: ${Atomics.load(counter, 0)}`);     }, 2000); }  // рабочий поток (worker.js) self.onmessage = function(event) {     const counter = new Int32Array(event.data);     setInterval(() => {         console.log(`[Worker] Активные запросы: ${Atomics.load(counter, 0)}`);     }, 1000); };

Используем SharedArrayBuffer для хранения счетчика активных запросов, а Atomics дает безопасное увеличение и уменьшение счетчика из разных потоков.

Параллельная обработка данных

Рассмотрим задачу параллельной обработки большого массива данных, например, применение функции к каждому элементу массива:

/// главный поток const buffer = new SharedArrayBuffer(1024 * 4); // буфер для 1024 чисел const data = new Int32Array(buffer);  // инициализируем массив случайными числами for (let i = 0; i < data.length; i++) {     data[i] = Math.floor(Math.random() * 100); }  const worker = new Worker('worker.js'); worker.postMessage(buffer);  function processData() {     for (let i = 0; i < data.length; i++) {         data[i] *= 2; // применяем функцию к каждому элементу     }     console.log('Обработка данных завершена'); }  processData();  // рабочий поток (worker.js) self.onmessage = function(event) {     const data = new Int32Array(event.data);     for (let i = 0; i < data.length; i++) {         Atomics.store(data, i, Atomics.load(data, i) * 2); // атомарное умножение на 2     }     console.log('Рабочий поток: обработка данных завершена'); };

Синхронизация состояния между потоками

Предположим, есть некая игрушка, где несколько потоков должны синхронизировать свое состояние:

// главный поток const buffer = new SharedArrayBuffer(4); // буфер на 4 байта для хранения состояния const state = new Int32Array(buffer);  const worker = new Worker('worker.js'); worker.postMessage(buffer);  function updateState(newState) {     Atomics.store(state, 0, newState); // обовляем состояние     Atomics.notify(state, 0); // уведомляем рабочий поток об изменении состояния     console.log(`Состояние обновлено: ${newState}`); }  updateState(1);  // рабочий поток (worker.js) self.onmessage = function(event) {     const state = new Int32Array(event.data);      function waitForStateChange() {         Atomics.wait(state, 0, Atomics.load(state, 0)); // ожидаем изменения состояния         console.log(`Рабочий поток: состояние изменено на ${Atomics.load(state, 0)}`);         waitForStateChange(); // рекурсивно продолжаем ожидание изменений     }      waitForStateChange(); };

Про востребованные языки программирования и практические инструменты мои коллеги из OTUS рассказывают в рамках онлайн-курсов. По этой ссылке вы можете ознакомиться с полным каталогом курсов, а также записаться на открытые уроки.


ссылка на оригинал статьи https://habr.com/ru/articles/821805/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *