Параллельный цикл на worker. Многопоточность JS

от автора

Мне очень нравится Java script своей легкостью, доступностью и функциональностью. Он перекрывает 90% всех моих потребностей в программировании. Спектр решаемых с помощью него задач огромен, и в том числе, иногда возникают задачи в которых необходимо изменить каждый элемент массива независимо от остальных элементов. Одно из типовых решений этой задачи через цикл или метод map.

Пример программы с простым циклом

//Функция, изменяющая элемент массива function f(a) {   let a0 = 1;   for (let i = 0; i < 100_000; i++)     a0 = 0.5 * (a0 + a / a0)    return a0; }//f(a)           //Создание и инициализация массива  let arr = new Float64Array(10_000); for (let i in arr) {   arr[i] = i; }   let time = new Date();  //Изменение элементов массива  arr = arr.map(f);  time = new Date() - time; console.log(`Время вычисления: ${time} мс`); console.log(arr); 

Функция, изменяющая элемент массива принимает в качестве аргумента число, совершает с ним некоторые действия, и возвращает измененное значение. Зачастую эта функция может быть довольно «тяжелой» для примера использую алгоритм Герона вычисления квадратного корня. Как способ вычисления коней такой способ, да еще и со с 100000 итерациями нерационален, но как пример полезной нагрузки подойдет отлично.

Создание и инициализация массива. В JS есть довольно много вариантов создания массива. Для дальнейших действий и подобных вычислений лучше всего подходит типизированный массив. Массив заполняется порядковыми номерами, для имитации полезных данных.

Изменение элементов массива делаю методом map. В этом месте может также находится цикл for…in. Для оценки производительности замеряю время в миллисекундах.

Приведенный выше код работает в «однопоточном режиме» и не может загрузить 8-и ядерный процессор моего компьютера более чем на 25%. Запустить более одно потока в JS можно или используя методы работы с GPU, или применив Worker. Сегодня речь пойдет о Worker.

Worker. введение

В интернете можно найти очень много примеров для работы с Worker, но почти все они не будут работать в контексте локальных html файлов. В большинстве примеров код Worker-а пишется в отдельном файле и подключается к основной страницы через URL. Но при работе с локальными файлами это невозможно, да и не всегда удобно создавать дополнительный файл. Код показанный ниже можно сохранить в файл с расширением .html и открыть в браузере.

<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><script> //Функция, содержащая код Worker function worker_function() {     self.onmessage = (event) => self.postMessage("Привет из worker_function"); }//worker_function  //Создание Worker worker = new Worker(   URL.createObjectURL(     new Blob(       ["(" + worker_function.toString() + ")()"],       { type: 'text/javascript' }     )   ) );  //Передача сообщения к Worker worker.postMessage("");  //Получение сообщение от Worker worker.onmessage = (event) => {       console.log(event.data);      //Уничтожение worker     worker.terminate(); }  </script></head><body></body></html>

Я намеренно не форматирую и не акцентирую внимание на html, так как цель этого кода показать как работает worker.

Функция, содержащая код Worker. Все, что содержится в этой функции, следует воспринимать как отдельный файл JS и будет исполняться независимо от остального кода в своем лексическом окружении.

Создание Worker. Конструктор Worker принимает в качестве аргумента URL с кодом Worker. Для создания такого URL я использую Blob, содержащий текстовую строку из функции worker_function.

Передача сообщения к Worker, Получение сообщение от Worker и Уничтожение worker. Алгоритм работы с Worker описан неоднократно, но я приведу его еще раз для конкретно моего случая:

  1. В коде основной программы создается Worker с помощью конструктора (строка 8). в этот момент компьютер создает «песочницу» в которой исполняется код из указанного URL. Можно представить, что браузер открывает еще одну вкладку, но только у этой вкладки нет окна.

  2. В коде основной программы вызывается метод postMessage (строка 18) аргументом которого являются данные для Worker. Здесь данными является строка, но возможны и более сложные структуры. Этот метод генерирует в «песочнице» событие message.

  3. Данные переходят обработчику события message (строка 4). Обработчик вызывает метод postMessage, который в основном коде вызывает событие worker.onmessage. postMessage и onmessage основной страницы и Worker работают абсолютно одинокого.

  4. Запускается обработчик worker.onmessage на основной странице (строка 21) . В аргументе event содержится довольно много информации, но для примера можно получить полезные данные из свойства data. Эту информацию я вывожу в консоль.

  5. Когда Worker сделал свою работу и не планируется дальнейшие его использование он должен быть уничтожен (строка 25) . В противном случае он останется в оперативной памяти и будет занимать место и ресурсы.

На первый взгляд алгоритм может показаться сложным и запутанным, но в конечном итоге это можно представить таким образом: Я нанимаю работника и даю ему инструкции (строка 8). Звоню работнику с заданием (строка 18), а он берет трубку (строка 4) и работает. Когда работа сделана он звонит мне и сообщает результаты работы (строка 21). После получения результатов работы я увольняю работника (строка 25).

Создание второго Worker

Один работник это конечно хорошо, но для ускорения работы надо нанимать работников больше. Чуть позже сделаем код который автоматически создает столько Worker-ов, сколько нам нужно, а пока сделаем это вручную:

Скрытый текст
//Функция, содержащая код Worker function worker_function() {   self.onmessage = (event) => {     setTimeout(       () => self.postMessage(event.data + "\nПривет из worker_function"),       Math.random() * 1000     )//setTimeout   }//self.onmessage }//worker_function  //Создание Worker worker1 = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")()"], { type: 'text/javascript' }))); worker2 = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")()"], { type: 'text/javascript' })));  //Передача сообщения к Worker worker1.postMessage("Сообщение для worker1"); worker2.postMessage("Сообщение для worker2");  //Получение сообщения от Worker async function getdata(worker) {   return new Promise((resolve, reject) => {     worker.onmessage = (event) => { console.log(event.data); resolve() };   }   )//Promise }//getdata   (async () => {   await Promise.all([getdata(worker1), getdata(worker2)])   worker1.terminate();   worker2.terminate(); })()//async 

В коде произошли некоторые изменения и давайте разберем их по порядку.

Worker теперь не мгновенно выдает ответ, а через случайное время (строки 2-8). Так как каждый worker работает независимо друг от друга невозможно предугадать кто раньше закончит работу, для усиления этого эффекта я добавил функцию setTimeout.

Теперь я создаю два worker и обоим даю разные данные. Здесь нет каких либо отличий от предыдущего примера.

Получение сообщения от Worker теперь немного сложнее. Обрабатывать данные полученные от Worker в функции-обработчике это хорошо, но еще нужно остановить выполнение основного кода до завершения работы всех Worker-ов. С этим мне помогает Promise и его метод all. Мне эта часть кода не нравиться, но пока не придумал ничего лучше будет написано так. Основой код программы теперь работает асинхронно и его пришлось обернуть в анонимную асинхронную функцию.

Подведем промежуточный итог. Нам удалось одновременно запустить два Worker, а после того как оба из них завершили работу проложить выполнение программы. Это большая победа и прямой путь к написанию собственной функции, работающий как метод map, но использующий многопоточность.

Нужно больше Worker

Теперь мы обладаем почти достаточными навыками чтобы сделать циклы многопоточными. Код решения представлен ниже:

Скрытый текст
async function map_worker(     arr,//Массив данных     f,//Функция обработки данных     thread = 8,//Кол-во потоков ) {     //Функция содержащая код Worker     function worker_function() {         self.onmessage = function (event) {             let buf = new Float64Array(event.data.buf);              for (let i in buf) {                 buf[i] = f(buf[i]);             }             buf = buf.buffer;             self.postMessage({ buf, start: event.data.start }, [buf]);         };      }//worker_function      //Получение сообщения от Worker     async function getdata(worker) {         return new Promise(             function (resolve, reject) {                 worker.onmessage = function (event) {                     arr.set(new Float64Array(event.data.buf), event.data.start);                     resolve();                 }             }         )     }//getdata      let worker = [];     let arr_resolve = [];     let buf_len = Math.floor(arr.length / thread);      for (let i = 0; i < thread; i++) {         //Создание Worker         worker[i] = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")(); let f = " + f.toString()], { type: 'text/javascript' })));          //Разбиение arr         let start = i * buf_len;         let buf = arr.slice(start, i == thread - 1 ? arr.length : start + buf_len).buffer;          //Передача сообщения к Worker         worker[i].postMessage({ start, buf }, [buf],);          //Создание функции получения сообщения от worker         arr_resolve[i] = getdata(worker[i]);     }      //Ожидание завершения работы всех worker     await Promise.all(arr_resolve);      //Удаление worker     for (let i = 0; i < thread; i++)         worker[i].terminate();      return arr; }//map_worker   (async () => {      //Функция, изменяющая элемент массива     function f(a) {         let a0 = 1;         for (let i = 0; i < 100_000; i++)             a0 = 0.5 * (a0 + a / a0)         return a0;     }//f(a)       //Создание и инициализация массива      let arr = new Float64Array(10_000);     for (let i in arr) {         arr[i] = i;     }      let time = new Date();      //Изменение элементов массива      arr = await map_worker(arr, f);      time = new Date() - time;     console.log(`Время вычисления: ${time} мс`);     console.log(arr);  })()//asynca

Функция map_worker работает подобно встроенному методу map, но использует при этом ресурсы компьютера полностью (загрузка процессора 100%). Большая часть кода уже разобрана, однако и здесь есть небольшие хитрости.

  • При создании worker в строку с URL передается функция для обработки данных.

  • В методе postMessage появилось два аргумента. Первый это объект, содержащий фрагмент массива и его положение в исходном массиве. А второй — массив элементом которого является фрагмент массива. Выглядит странно, но при таком синтаксисе передача данных осуществляется мгновенно. И именно поэтому я использую типизированные массивы. Подробнее об этом можно почитать в документации.

  • Предыдущий пункт обязывает передавать в worker ArrayBuffer, поэтому в коде добавляется преобразование типизированного массива в голый поток байтов.

Заключение

В заключении протестируем насколько эффективен данный метод. В таблице каждая ячейка содержит время выполнения кода в мс с использованием того или иного метода. Во второй строке указано во сколько раз быстрее происходит вычисление по сравнению с Array.map.

Размер массива

Array.map

for…in

worker_1

worker_4

worker_8

worker_10

1000

573
1

564
1.02

588
0.97

176
3.26

167
3.43

137
4.18

10000

5729
1

5113
1.12

5169
1.11

1562
3.67

905
6.33

935
6.13

100000

57942
1

50268
1.15

52564
1.1

14186
4.1

7556
7.68

7562
7.66

1000000

572235
1

561455
1.02

507012
1.2

160719
3.5

75393
7.59

85513
6.69

Из таблицы видно, максимальное ускорение получается при использовании 8-ми worker, что соответствует количеству ядер моего процессора, при этом ускорение составляет порядка 7.6 раз. Использование другого числа потоков дает более плохие результаты. Также цикл for…in работает чуть быстрее Array.map.


ссылка на оригинал статьи https://habr.com/ru/articles/900488/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *