Асинхронность node.js часто приводят как одно из достоинств платформы. Действительно, организация программы в виде небольших задач и последовательное их выполнение в рамках одного потока дает некоторое преимущество в распределении ресурсов и большей отзывчивостью приложения. Но это также приводит к необходимости организовывать код в нетривиальной манере. Какой именно я покажу на примере библиотеки для чтения строк из файла большого объема (500Mb и более).
Поскольку статья получается довольно большой, публикую в двух частях. В первой будет рассматриваться асинхронность, во второй — файловые операции.
Часть 1: От последовательного кода к асинхронному
Задача чтения строки из текстового файла в кодировке UTF-8 не очень простая. Основная сложность — в правильной обработке очередного блока бинарных данных, граница которого может попасть в середину UTF-8 символа или разделителя строк.
Давайте пока предположим, что эта задача успешно решена. Тогда последовательный алгоритм будет простым:
function processLineSync(line) { // ... } var line, reader = createTextReader(file); while (line = reader.readLineSync()) processLineSync(line); reader.close();
Для тестирования сделаем простую реализацию createTextReader():
function createTextReader() { var counter = 0; return { readLineSync : () => { counter += 1; return counter < 10000000 ? counter.toString() : null; }, readLine : done => { counter += 1; done(null, counter < 10000000 ? counter.toString() : null); }, close : () => { } }; }
Нужно заметить, что цикл, код чтения очередной строки и её обработка — последовательные. Соответственно, их выполнение блокирует процесс node.js.
Для того, что бы написать правильный асинхронный код надо понять, чем асинхронный отличается от последовательного. Для этого предположим что у метода processLineSync() есть асинхронный аналог — processLine(line, err => { }). Если мы попробуем выполнить асинхронную операцию в цикле, то получим то или иное переполнение. Давайте посмотрим на примере.
function processLine(line, done) { setImmediate(() => { /* ... */ done(); }); } var line, reader = createTextReader(file); while (line = reader.readLineSync()) processLine(line, () => { }); reader.close();
Через пару минут этот код падает с ошибкой «out of memory». При этом не выведя ни одного «ok».
А происходит вот что: while (line = reader.readLineSync())…; вычитывает данные синхронно и создает (а не вызывает) обработчики строк. Естественно, создание обработчика приводит к выделению памяти для него и через конечное число итераций память заканчивается. Обработчики создаются так как асинхронная операция в node.js является некоторым объектом, содержащим функцию, который будет вызван после завершения текущего блока. А текущий блок все никак не завершается, что и приводит к переполнению.
Если цикл не годится, что же тогда делать? Получается, что тело цикла само по себе должно быть функцией. И вызывать саму себя для продолжения итераций. Этот код уже намного сложнее простого цикла и в самом простом случае представляет собой следующую конструкцию:
function asyncWhile(criteria, iteration, done) { if (criteria()) { iteration(() => asyncWhile(criteria, iteration, done)); } else { done(); } }
Ну и пример её использования:
var line, reader = createTextReader(file); asyncWhile( () => { line = reader.readLineSync(); return line != null; }, done => processLine(line, () => done()), () => reader.close());
Работает уже лучше, но пока выглядит немного неуклюже, так как используются глобальные переменные. Так же отсутствует обработка ошибок и будет переполнение стека (но об этом чуть позже). Также было бы неплохо читать данные из файла асинхронно.
Что бы уменьшить зависимость от глобальной переменной в теле цикла перебросим её в виде параметра. А контекст цикла поместим в контекст функций. Получим следующее:
function asyncWhile(context, criteria, iteration, done) { var value = criteria.call(context); if (value) { iteration.call(context, value, () => asyncWhile(context, criteria, iteration, done)); } else { done.call(context); } } asyncWhile(createTextReader(file), () => this.readLineSync(), (line, done) => processLine(line, () => done()), () => this.close());
Можно заметить, что параметры третьего аргумента совпадают с параметрами processLine. Аналогично для второго параметра processLine. Они оставлены что бы продемонстрировать подход, но, конечно, их можно сократить до передачи функции непосредственно:
var reader = createTextReader(file); asyncWhile(reader, reader.readLineSync, processLine, reader.close);
Теперь надо добавить обработку ошибок. Для этого предположим, что ошибка возвращается первым параметром callback функции processLine. Такой метод возвращения ошибки является довольно стандартным для node.js.
Добавляем обработку ошибок и получаем:
function asyncWhile(context, criteria, iteration, done) { var value = criteria.call(context); if (value) { iteration(value, err => { if (err) { done.call(context, err); } else { asyncWhile(context, criteria, iteration, done); } }); } else { done.call(context); } }
Использование почти не изменилось:
asyncWhile(createTextReader(file), () => this.readLineSync(), (line, done) => processLine(line, (err) => { if (err) console.log(err); done(); }), () => this.close());
Теперь давайте подсчитаем количество строк в файле:
asyncWhile({ reader : createTextReader(file), counter : 0 }, () => this.reader.readLineSync(), (line, done) => { this.counter += 1; done(); }, () => { console.log(this.counter); this.reader.close(); });
И почти сразу получаем «Maximum call stack size exceeded». Анализ стека показывает многократно повторяющийся шаблон asyncWhile() -> Object. -> asyncWhile() -> … Это довольно стандартная проблема, которая обходится обновлением стека через вызов функции из основного цикла node.js в момент рекурсии:
function asyncWhile(context, criteria, iteration, done) { var value = criteria.call(context); if (value) { iteration.call(context, value, err => { if (err) { done.call(context, err); } else { setImmediate(() => asyncWhile(context, criteria, iteration, done)); } }); } else { done.call(context, null); } }
setImmediate() помещает обработчик в очередь и продолжает выполнение. Этот обработчик будет вызван по завершению текущего блока.
Предпоследним штрихом заменим блокирующее чтение из файла на асинхронное (не забыв про обработку ошибок):
function asyncWhile(context, criteria, iteration, done) { criteria.call(context, (err, value) => { if (err) { done.call(context, err); } else { if (value) { iteration.call(context, value, err => { if (err) { done.call(context, err); } else { setImmediate(() => asyncWhile(context, criteria, iteration, done)); } }); } else { done.call(context, null); } } }); }
И последним сделаем createTextReader тоже асинхронным:
function createTextReader(file, done) { var counter = 0; done(null, { readLine : (done) => { counter += 1; done(null, counter < 10000000 ? counter.toString() : null); }, close : () => { } }); }
Используем:
createTextReader(file, (err, reader) => { asyncWhile(reader, done => this.readLine((err, line) => done(err, line)), (line, done) => processLine(line, (err) => { if (err) console.log(err); done(); }), () => this.close()); });
Как видно, асинхронный код намного сложнее синхронного. И намного медленнее — примерно в 40 раз. С другой стороны он не блокирует поток, что позволяет запускать несколько таких обработчиков параллельно (в смысле node.js). Ну и чем более сложный обработчик для каждой записи, тем больше время обработки превышает расходы на поддержку асинхронности.
Об авторе: Александр Неткачев — старший разработчик на С# и F#. Поддерживает сайт alexatnet.com, проводит вебинары (Code&Coffe), помогает с кодом начинающим разработчикам (CodeReview4U).
ссылка на оригинал статьи http://habrahabr.ru/post/257365/
Добавить комментарий