Велосипед: Promise в Node.js

от автора

Добрый день, Хабрахабр.

Предисловие

Была довольно простая задача: получить набор документов из базы, каждый документ преобразовать и отправить пользователю все преобразованные документы, порядок их менять нельзя, для обработки документа используется асинхронная функция. Если на каком-то документе вылезла ошибка — документы мы не отправляем, только ошибку и заканчиваем обработку документов.
Для решения задачи была выбрана библиотека Q, так как сам поход Promise мне симпатичен. Но возникла одна загвоздка, вроде бы элементарная задача, а выполняется больше секунды, а точнее 1300 мс, вместо ожидаемых 50-80 мс. Дабы разобраться, как все устроено и проникнуться асинхронностью было решено написать специализированный «велосипед» под данную задачу.

Как было устроена работа с Q

В первую очередь, хотелось бы рассказать, как это было реализовано изначально.

1. Процедура, которая проходила последовательно по массиву и возвращала нам promise.

forEachSerial

function forEachSerial(array, func) {     var tickFunction = function (el, index, callback) {         if (func.length == 2)             func(el, callback);         else if (func.length == 3)             func(el, index, callback);     }     var functions = [];     array.forEach(function (el, index) {         functions.push(Q.nfcall(tickFunction, el, index));             });     return Q.all(functions); } 

2. Процедура, которая обрабатывала документ и возвращала нам promise.

documentToJSON

function documentToJSON(el, options) {     var obj = el.toObject(options);     var deferred = Q.defer();     var columns = ['...','...','...'];     forEachSerial(columns,function (el, next) {         el.somyAsyncFunction(options, function (err, result) {             if (err)                 next(err);             else result.somyAsyncFunction2(options, function (err, result) {                 if (!err)                     obj[el] = result;                 next(err);             });         });     }).then(function () {             deferred.resolve(obj);         }, function (err) {             deferred.reject(err);         });     return deferred.promise; } 

3. И головная процедура, отправляющая результат пользователю

sendResponse

exports.get = function (req, res, next) {     dbModel.find(search, {}, { skip: from, limit: limit }).sort({column1: -1}).exec(function (err, result) {         if (err)             next(new errorsHandlers.internalError());         else {             var result = [];             forEachSerial(result,function (el, index, next) {                 documentToJSON(el,options).then(function (obj) {                     result[index] = obj;                     next()                 }, function (err) {                     next(new errorsHandlers.internalError())                 });             }).then(function () {                     res.send({responce: result});                 }, function (err) {                     next(new errorsHandlers.internalError())                 });         }     }); }; 

Кто-то может заметить, что всевозможных «обещаний» очень много даже там, где они не сильно нужны, и что такое большое зацикливание привело к такой скорости, но процедура отдает всего 20 простых документов, преобразования примитивны и выполнять их такое количество времени уже точно никуда не годиться.

Пишем свою библиотеку promise

Как вообще «это» работает

В сети полно описаний что и как. Опишу вкратце. Promise — это своего рода обещание. Какая-то функция нам обещает результат, получить его мы можем с помощью then(success, error), в свою очередь, при успешной обработке мы может назначить новый promise и так же обработать его. В частном случае это выглядит так:

Promise.then(step1).then(step2).then(step3).then(function () {     //All OK }, function (err) {     //Error in any step }); 

Результат каждого этапа передается как параметр в следующий и так последовательно. В итоге мы обрабатываем все ошибки в одном блоке и избавляемся от «лапши».
Внутри выглядит примерно так: создаются события, которые вызываются при успешном завершении или при ошибке:

var promise = fs.stat("foo"); promise.addListener("success", function (value) {     // ok }) promise.addListener("error", function (error) {     // error }); 

Все это можно почитать здесь.
Теория закончилась, приступим к практике.

Начнем с простого — Deferred

Задачей этого объекта будет создать нужные нам события и выдать Promise

function deferred() {     this.events = new EventEmitter(); //Объект события     this.promise = new promise(this); // Возвращаемый нам Promise     this.thenDeferred = []; //Последующие обработчики, нужны для того что бы передать ошибку дальше по цепочке     var self = this;      //Вызывается в успешном случае     this.resolve = function () {         self.events.emit('completed', arguments);     }     //Вызывается в случае ошибки     this.reject = function (error) {         self.events.emit('error', error);         //Передаем ошибку дальше по цепочке         self.thenDeferred.forEach(function (el) {             el.reject(error);         });     } } 

Объект — Promise

Задача его будет отслеживать события "completed" и "error" вызывать нужные функции, которые назначены через "then" и отслеживать что там вернула эта функция: если возвратила нам еще один promise то подключаться к нему для того что бы срабатывали следующие then, если просто данные, то выполнять последующие then, таким образом мы сможем строить цепочки из then.

function promise(def) {     this.def = def;     this.completed = false;     this.events = def.events;     var self = this;     var thenDeferred;     self._successListener = null;     self._errorListener = null;      //Результатом выполнения then - будет возвращаться новый promise     this.then = function (success, error) {         if (success)             self._successListener = success;         if (error)             self._errorListener = error;         thenDeferred = new deferred();         self.def.thenDeferred.push(thenDeferred);         return thenDeferred.promise;     }      //Обрабатываем успешное выполнение задачи     this.events.on('completed', function (result) {         // объекты, аргументы, массивы приводим к виду массива для передачи их в дальнейшем как атрибуты в функцию         var args = inputOfFunctionToArray(result);         //Если вдруг задача была уже выполнена, то дальше не проходим         if (self.completed) return;         self.completed = true;          if (self._successListener) {             var result;             try {                 result = self._successListener.apply(self, args);             } catch (e) {                 self.def.reject(e);                 result;             }             //Если результатом функции Promise и есть последующие then, подключаемся к нему             var promise;             if (isPromise(result))                 promise = result;             else if (result instanceof deferred)                 promise = result.promise;             if (promise && thenDeferred) {                 promise.then(function () {                     var args = arguments;                     process.nextTick(function () {                         thenDeferred.resolve.apply(self, args);                     });                 }, function (error) {                     process.nextTick(function () {                         thenDeferred.reject(error);                     });                 });             } else if (thenDeferred)                            process.nextTick(function () {                                    //Для скалярных параметров просто запускаем следующие then                                   thenDeferred.resolve.apply(self, [result]);                           });         } else if (thenDeferred)             process.nextTick(function () {                 thenDeferred.resolve.apply(self, []);             });     });      //Обрабатываем ошибки     this.events.on('error', function (error) {         if (self.completed) return;         self.completed = true;         if (self._errorListener)             process.nextTick(function () {                 self._errorListener.apply(self, [error]);             });     }); } 

Итак, базовая модель готова. Осталось сделать обвязку для функций с callback

PromiseFn

Его задача — сделать обертку для функции с callback с возможностью указания this и аргументов запуска

var promisefn = function (bind, fn) {     var def = new deferred();     //bind является не обязательным параметром     if (typeof bind === 'function' && !fn) {         fn = bind;         bind = def;     }     //Назначаем наш callback для данной функции     var callback = function (err) {         if (err)             def.reject(err);         else {             var args = [];             for (var key in arguments)                 args.push(arguments[key]);             args.splice(0, 1);             def.resolve.apply(bind, args);         }     };     var result = function () {          var args = [];         for (var key in arguments)             args.push(arguments[key]);         args.push(callback);         process.nextTick(function () {             fn.apply(bind, args);         });         return def.promise;     }     return result; } 
И напоследок ALL — последовательное выполнение функций с callback

Тут все просто: нам передают массив функций, мы их обвязываем через promisefn и, когда они все выполнятся — вызываем resolve

var all = function (functions) {     var def = new deferred();     process.nextTick(function () {         var index = -1;         var result = [];         var next = function (err, arguments) {             if (err) {                 def.reject(err);                 return;             }             if (arguments) result.push(inputOfFunctionToArray(arguments));             index++;             if (index >= functions.length) {                 def.resolve(result);             } else process.nextTick(function () {                 promisefn(functions[index])().then(function () {                     var args = arguments;                     process.nextTick(function () {                         next(err, args);                     });                 }, function (err) {                     process.nextTick(function () {                         next(err);                     });                 });             });         }         process.nextTick(next);      });     return def.promise; } 

В заключение

После тестирования старый подход (через библиотеку Q) был переписан заменено пару объявлений и запущен в тех же условиях. Результат положительный — 50-100 мс (вместо прежних 1300 мс).
Все исходники доступны на Githab, там же можно найти и примеры. Изобретение «велосипедов» полезно хотя бы тем, что это улучшает понимание.
Спасибо за внимание!

ссылка на оригинал статьи http://habrahabr.ru/post/168929/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *