Предисловие
Была довольно простая задача: получить набор документов из базы, каждый документ преобразовать и отправить пользователю все преобразованные документы, порядок их менять нельзя, для обработки документа используется асинхронная функция. Если на каком-то документе вылезла ошибка — документы мы не отправляем, только ошибку и заканчиваем обработку документов.
Для решения задачи была выбрана библиотека Q, так как сам поход Promise мне симпатичен. Но возникла одна загвоздка, вроде бы элементарная задача, а выполняется больше секунды, а точнее 1300 мс, вместо ожидаемых 50-80 мс. Дабы разобраться, как все устроено и проникнуться асинхронностью было решено написать специализированный «велосипед» под данную задачу.
Как было устроена работа с Q
В первую очередь, хотелось бы рассказать, как это было реализовано изначально.
1. Процедура, которая проходила последовательно по массиву и возвращала нам promise.
function forEachSerial(array, func) { var tickFunction = function (el, index, callback) { if (func.length == 2) func(el, callback); else if (func.length == 3) func(el, index, callback); } var functions = []; array.forEach(function (el, index) { functions.push(Q.nfcall(tickFunction, el, index)); }); return Q.all(functions); }
2. Процедура, которая обрабатывала документ и возвращала нам promise.
function documentToJSON(el, options) { var obj = el.toObject(options); var deferred = Q.defer(); var columns = ['...','...','...']; forEachSerial(columns,function (el, next) { el.somyAsyncFunction(options, function (err, result) { if (err) next(err); else result.somyAsyncFunction2(options, function (err, result) { if (!err) obj[el] = result; next(err); }); }); }).then(function () { deferred.resolve(obj); }, function (err) { deferred.reject(err); }); return deferred.promise; }
3. И головная процедура, отправляющая результат пользователю
exports.get = function (req, res, next) { dbModel.find(search, {}, { skip: from, limit: limit }).sort({column1: -1}).exec(function (err, result) { if (err) next(new errorsHandlers.internalError()); else { var result = []; forEachSerial(result,function (el, index, next) { documentToJSON(el,options).then(function (obj) { result[index] = obj; next() }, function (err) { next(new errorsHandlers.internalError()) }); }).then(function () { res.send({responce: result}); }, function (err) { next(new errorsHandlers.internalError()) }); } }); };
Кто-то может заметить, что всевозможных «обещаний» очень много даже там, где они не сильно нужны, и что такое большое зацикливание привело к такой скорости, но процедура отдает всего 20 простых документов, преобразования примитивны и выполнять их такое количество времени уже точно никуда не годиться.
Пишем свою библиотеку promise
Как вообще «это» работает
В сети полно описаний что и как. Опишу вкратце. Promise — это своего рода обещание. Какая-то функция нам обещает результат, получить его мы можем с помощью then(success, error), в свою очередь, при успешной обработке мы может назначить новый promise и так же обработать его. В частном случае это выглядит так:
Promise.then(step1).then(step2).then(step3).then(function () { //All OK }, function (err) { //Error in any step });
Результат каждого этапа передается как параметр в следующий и так последовательно. В итоге мы обрабатываем все ошибки в одном блоке и избавляемся от «лапши».
Внутри выглядит примерно так: создаются события, которые вызываются при успешном завершении или при ошибке:
var promise = fs.stat("foo"); promise.addListener("success", function (value) { // ok }) promise.addListener("error", function (error) { // error });
Все это можно почитать здесь.
Теория закончилась, приступим к практике.
Начнем с простого — Deferred
Задачей этого объекта будет создать нужные нам события и выдать Promise
function deferred() { this.events = new EventEmitter(); //Объект события this.promise = new promise(this); // Возвращаемый нам Promise this.thenDeferred = []; //Последующие обработчики, нужны для того что бы передать ошибку дальше по цепочке var self = this; //Вызывается в успешном случае this.resolve = function () { self.events.emit('completed', arguments); } //Вызывается в случае ошибки this.reject = function (error) { self.events.emit('error', error); //Передаем ошибку дальше по цепочке self.thenDeferred.forEach(function (el) { el.reject(error); }); } }
Объект — Promise
Задача его будет отслеживать события "completed" и "error" вызывать нужные функции, которые назначены через "then" и отслеживать что там вернула эта функция: если возвратила нам еще один promise то подключаться к нему для того что бы срабатывали следующие then, если просто данные, то выполнять последующие then, таким образом мы сможем строить цепочки из then.
function promise(def) { this.def = def; this.completed = false; this.events = def.events; var self = this; var thenDeferred; self._successListener = null; self._errorListener = null; //Результатом выполнения then - будет возвращаться новый promise this.then = function (success, error) { if (success) self._successListener = success; if (error) self._errorListener = error; thenDeferred = new deferred(); self.def.thenDeferred.push(thenDeferred); return thenDeferred.promise; } //Обрабатываем успешное выполнение задачи this.events.on('completed', function (result) { // объекты, аргументы, массивы приводим к виду массива для передачи их в дальнейшем как атрибуты в функцию var args = inputOfFunctionToArray(result); //Если вдруг задача была уже выполнена, то дальше не проходим if (self.completed) return; self.completed = true; if (self._successListener) { var result; try { result = self._successListener.apply(self, args); } catch (e) { self.def.reject(e); result; } //Если результатом функции Promise и есть последующие then, подключаемся к нему var promise; if (isPromise(result)) promise = result; else if (result instanceof deferred) promise = result.promise; if (promise && thenDeferred) { promise.then(function () { var args = arguments; process.nextTick(function () { thenDeferred.resolve.apply(self, args); }); }, function (error) { process.nextTick(function () { thenDeferred.reject(error); }); }); } else if (thenDeferred) process.nextTick(function () { //Для скалярных параметров просто запускаем следующие then thenDeferred.resolve.apply(self, [result]); }); } else if (thenDeferred) process.nextTick(function () { thenDeferred.resolve.apply(self, []); }); }); //Обрабатываем ошибки this.events.on('error', function (error) { if (self.completed) return; self.completed = true; if (self._errorListener) process.nextTick(function () { self._errorListener.apply(self, [error]); }); }); }
Итак, базовая модель готова. Осталось сделать обвязку для функций с callback
PromiseFn
Его задача — сделать обертку для функции с callback с возможностью указания this и аргументов запуска
var promisefn = function (bind, fn) { var def = new deferred(); //bind является не обязательным параметром if (typeof bind === 'function' && !fn) { fn = bind; bind = def; } //Назначаем наш callback для данной функции var callback = function (err) { if (err) def.reject(err); else { var args = []; for (var key in arguments) args.push(arguments[key]); args.splice(0, 1); def.resolve.apply(bind, args); } }; var result = function () { var args = []; for (var key in arguments) args.push(arguments[key]); args.push(callback); process.nextTick(function () { fn.apply(bind, args); }); return def.promise; } return result; }
И напоследок ALL — последовательное выполнение функций с callback
Тут все просто: нам передают массив функций, мы их обвязываем через promisefn и, когда они все выполнятся — вызываем resolve
var all = function (functions) { var def = new deferred(); process.nextTick(function () { var index = -1; var result = []; var next = function (err, arguments) { if (err) { def.reject(err); return; } if (arguments) result.push(inputOfFunctionToArray(arguments)); index++; if (index >= functions.length) { def.resolve(result); } else process.nextTick(function () { promisefn(functions[index])().then(function () { var args = arguments; process.nextTick(function () { next(err, args); }); }, function (err) { process.nextTick(function () { next(err); }); }); }); } process.nextTick(next); }); return def.promise; }
В заключение
После тестирования старый подход (через библиотеку Q) был переписан заменено пару объявлений и запущен в тех же условиях. Результат положительный — 50-100 мс (вместо прежних 1300 мс).
Все исходники доступны на Githab, там же можно найти и примеры. Изобретение «велосипедов» полезно хотя бы тем, что это улучшает понимание.
Спасибо за внимание!
ссылка на оригинал статьи http://habrahabr.ru/post/168929/
Добавить комментарий