Асинхронное чтение строк из большого текстового файла на Node.js — Часть 1

от автора

Асинхронность node.js часто приводят как одно из достоинств платформы. Действительно, организация программы в виде небольших задач и последовательное их выполнение в рамках одного потока дает некоторое преимущество в распределении ресурсов и большей отзывчивостью приложения. Но это также приводит к необходимости организовывать код в нетривиальной манере. Какой именно я покажу на примере библиотеки для чтения строк из файла большого объема (500Mb и более).

Поскольку статья получается довольно большой, публикую в двух частях. В первой будет рассматриваться асинхронность, во второй — файловые операции.

Часть 1: От последовательного кода к асинхронному

Задача чтения строки из текстового файла в кодировке UTF-8 не очень простая. Основная сложность — в правильной обработке очередного блока бинарных данных, граница которого может попасть в середину UTF-8 символа или разделителя строк.

Давайте пока предположим, что эта задача успешно решена. Тогда последовательный алгоритм будет простым:

function processLineSync(line) {   // ... }  var line, reader = createTextReader(file); while (line = reader.readLineSync())   processLineSync(line); reader.close(); 

Для тестирования сделаем простую реализацию createTextReader():

function createTextReader() {   var counter = 0;   return {     readLineSync : () => {       counter += 1;       return counter < 10000000 ? counter.toString() : null;     },     readLine : done => {       counter += 1;       done(null, counter < 10000000 ? counter.toString() : null);     },     close : () => { }   }; } 

Нужно заметить, что цикл, код чтения очередной строки и её обработка — последовательные. Соответственно, их выполнение блокирует процесс node.js.

Для того, что бы написать правильный асинхронный код надо понять, чем асинхронный отличается от последовательного. Для этого предположим что у метода processLineSync() есть асинхронный аналог — processLine(line, err => { }). Если мы попробуем выполнить асинхронную операцию в цикле, то получим то или иное переполнение. Давайте посмотрим на примере.

function processLine(line, done) {   setImmediate(() => { /* ... */ done(); }); }  var line, reader = createTextReader(file); while (line = reader.readLineSync())   processLine(line, () => { }); reader.close(); 

Через пару минут этот код падает с ошибкой «out of memory». При этом не выведя ни одного «ok».

А происходит вот что: while (line = reader.readLineSync())…; вычитывает данные синхронно и создает (а не вызывает) обработчики строк. Естественно, создание обработчика приводит к выделению памяти для него и через конечное число итераций память заканчивается. Обработчики создаются так как асинхронная операция в node.js является некоторым объектом, содержащим функцию, который будет вызван после завершения текущего блока. А текущий блок все никак не завершается, что и приводит к переполнению.

Если цикл не годится, что же тогда делать? Получается, что тело цикла само по себе должно быть функцией. И вызывать саму себя для продолжения итераций. Этот код уже намного сложнее простого цикла и в самом простом случае представляет собой следующую конструкцию:

function asyncWhile(criteria, iteration, done) {   if (criteria()) {     iteration(() => asyncWhile(criteria, iteration, done));   } else {     done();   } } 

Ну и пример её использования:

var line, reader = createTextReader(file); asyncWhile(   () => {     line = reader.readLineSync();     return line != null;   },   done => processLine(line, () => done()),   () => reader.close()); 

Работает уже лучше, но пока выглядит немного неуклюже, так как используются глобальные переменные. Так же отсутствует обработка ошибок и будет переполнение стека (но об этом чуть позже). Также было бы неплохо читать данные из файла асинхронно.

Что бы уменьшить зависимость от глобальной переменной в теле цикла перебросим её в виде параметра. А контекст цикла поместим в контекст функций. Получим следующее:

function asyncWhile(context, criteria, iteration, done) {   var value = criteria.call(context);   if (value) {     iteration.call(context, value, () => asyncWhile(context, criteria, iteration, done));   } else {     done.call(context);   } }  asyncWhile(createTextReader(file),   () => this.readLineSync(),   (line, done) => processLine(line, () => done()),   () => this.close()); 

Можно заметить, что параметры третьего аргумента совпадают с параметрами processLine. Аналогично для второго параметра processLine. Они оставлены что бы продемонстрировать подход, но, конечно, их можно сократить до передачи функции непосредственно:

var reader = createTextReader(file); asyncWhile(reader, reader.readLineSync, processLine, reader.close); 

Теперь надо добавить обработку ошибок. Для этого предположим, что ошибка возвращается первым параметром callback функции processLine. Такой метод возвращения ошибки является довольно стандартным для node.js.

Добавляем обработку ошибок и получаем:

function asyncWhile(context, criteria, iteration, done) {   var value = criteria.call(context);   if (value) {     iteration(value, err => {       if (err) {         done.call(context, err);       } else {         asyncWhile(context, criteria, iteration, done);       }     });   } else {     done.call(context);   } } 

Использование почти не изменилось:

asyncWhile(createTextReader(file),   () => this.readLineSync(),   (line, done) => processLine(line, (err) => {     if (err) console.log(err);     done();   }),   () => this.close()); 

Теперь давайте подсчитаем количество строк в файле:

asyncWhile({ reader : createTextReader(file), counter : 0 },   () => this.reader.readLineSync(),   (line, done) => {     this.counter += 1;     done();   },   () => {     console.log(this.counter);     this.reader.close();   }); 

И почти сразу получаем «Maximum call stack size exceeded». Анализ стека показывает многократно повторяющийся шаблон asyncWhile() -> Object. -> asyncWhile() -> … Это довольно стандартная проблема, которая обходится обновлением стека через вызов функции из основного цикла node.js в момент рекурсии:

function asyncWhile(context, criteria, iteration, done) {   var value = criteria.call(context);   if (value) {     iteration.call(context, value, err => {       if (err) {         done.call(context, err);       } else {         setImmediate(() => asyncWhile(context, criteria, iteration, done));       }     });   } else {     done.call(context, null);   } } 

setImmediate() помещает обработчик в очередь и продолжает выполнение. Этот обработчик будет вызван по завершению текущего блока.

Предпоследним штрихом заменим блокирующее чтение из файла на асинхронное (не забыв про обработку ошибок):

function asyncWhile(context, criteria, iteration, done) {   criteria.call(context, (err, value) => {     if (err) {       done.call(context, err);     } else {       if (value) {         iteration.call(context, value, err => {           if (err) {             done.call(context, err);           } else {             setImmediate(() => asyncWhile(context, criteria, iteration, done));           }         });       } else {         done.call(context, null);       }     }   }); } 

И последним сделаем createTextReader тоже асинхронным:

  function createTextReader(file, done) {     var counter = 0;     done(null, {       readLine : (done) => {         counter += 1;         done(null, counter < 10000000 ? counter.toString() : null);       },       close : () => { }     });   } 

Используем:

createTextReader(file, (err, reader) => {   asyncWhile(reader,     done => this.readLine((err, line) => done(err, line)),     (line, done) => processLine(line, (err) => {       if (err) console.log(err);       done();     }),     () => this.close()); }); 

Как видно, асинхронный код намного сложнее синхронного. И намного медленнее — примерно в 40 раз. С другой стороны он не блокирует поток, что позволяет запускать несколько таких обработчиков параллельно (в смысле node.js). Ну и чем более сложный обработчик для каждой записи, тем больше время обработки превышает расходы на поддержку асинхронности.


Об авторе: Александр Неткачев — старший разработчик на С# и F#. Поддерживает сайт alexatnet.com, проводит вебинары (Code&Coffe), помогает с кодом начинающим разработчикам (CodeReview4U).

ссылка на оригинал статьи http://habrahabr.ru/post/257365/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *