В этой статье мы попробуем решить реальную проблему при помощи Node.js Stream и чуточку Reactive Programming. В последнем не уверен – RP, в какой-то мере, "жупел"(как перевести buzzword?) о котором все говорят, но никто не "делает".
Статья рассматривает практический пример и ориентирована на знакомого с платформой читателя, по-этому намеренно не объясняет базовые понятия – если что-то непонятно по Stream API, то стоит обратится в документацию платформы или в какой-нибудь ее пересказ(например, этот).
Начнем с описания проблемы: нам нужно построить “паука” который заберет все данные с “чужого” REST API, как-то их обработает и запишет в “нашу” базу данных. Для удобства воспроизведения и моделирования мы опустим некоторые детали о конкретном API и базе данных(в реальности это было API одного известного стартапа связанного с гостиницами и Postgres база данных).
Представим что у нас есть две функции(код функций как и весь код из статьи можно найти тут):
getAPI(n, count) // функция псевдо-чтения из API. Возвращает нам promise который разрешится списком длинной count элементов начиная с n-го insertDB(entries) // Функция псевдо-записи в базу данных. Возвращает promise который будет разрешен когда запись в базу выполнена //Рассмотрим, пару примеров вызова этих функций: getAPI(0, 2).then(console.log) // [{ id: 0}, {id: 1}] getAPI(LAST_ITEM_ID, 1000).then(console.log) // [{id: LAST_ITEM_ID}] – отсюда вытекает важная особенность мы не можем просто узнать сколько сущностей содержит API. // Максимальное значения для count равно 1000: если мы запросим 1001, то нам все равно вернется максимум 1000 сущностей insertDB([{id: 0}]).then(console.log) // { count: 1 }
Мы намеренно проигнорируем обработку ошибок возможных при работе с API и базой, для простоты. Если возникнет интерес, то рассмотрим их в отдельной статье.
Ну и для того чтобы было не скучно скажем что наш заказчик извращенец и он поставил следующую задачу: мы не хотим видеть у себя в база все сущности id которых содержит число 3. А сущности id которых содержат число 9 хотим дополнить текущим значением timestamp: {id: 9} -> {id: 9, timestamp: 1490571732068}
. Чуть притянуто за уши, но похоже на задачи обработки и фильтрации, которые приходится решать в подобных “пауках”.
Ну что же – начнем. Давайте попробуем решить данную задачу “в лоб”. Скорее всего мы закончим с кодом чем-то похожим на этот:
function grab(offset = 0, rows = 1000) { offset = offset return getAPI(offset, 1000).then((items) => { if(_.isEmpty(items)) { return } else { return insertDB(items).then(() => grab(offset + rows)) } }) } console.time('transition') grab().then(() => { console.timeEnd('transition') })
Что не так с данным кодом?
- Бегло прочитав код сложно понять что он делает. Это можно поправить добавив комменариев, но все же хотелось бы на уровне кода дать "читателю" понять что мы откуда-то читаем и куда-то пишем.
- Он слишком специфичный – представьте что мы добавим код обработи значений. Куда мы его добавим?
- Он рекурсивный – а значит при достаточно большом количестве сущностей в API мы получимм ошибку. Лечится переписыванием на do…while, но вряд-ли это сделает код более читаемым
- Он непроизводительный. Представим что наш источник данных данных работает намного быстрее чем потребитель – в этой ситуации нам бы хотелось аггрегировать данные в некий буфер и, по возможности, записывать их за один раз
Как вы уже догадались, данную задачу легко решить при помощи Streams. Для начала разобъем эту задачу на две подзадачи: чтение и запись.
Начнем с чтения, давайте попробуем выполнить наш ReadableStream:
const {Writable, Readable} = require('stream') const {getAPI, insertDB} = require('./io-simulators') const ROWS = 1000 class APIReadable extends Readable { constructor(options) { super({objectMode: true}) this.offset = 0 } _read(size) { getAPI(this.offset, ROWS).then(data => { if(_.isEmpty(data)) { this.push(null) } else { this.push(data) } }) this.offset = this.offset + ROWS } }
Выглядит чуть более громоздким. Стоит обратить внимание на objectMode: true
– мы хотим оперировать объектами, а значит стоит передать этот флаг конструктору.
Окей, теперь перейдем к записи. Имплементируем наш Writable stream. Что-то вроде этого:
class DBWritable extends Writable { constructor(options) { super({highWaterMark: 5, objectMode: true}); } _write(chunk, encoding, callback) { insertDB(chunk).asCallback(callback) } _writev(chunks, callback) { const entries = _.map(chunks, 'chunk') insertDB(_.flatten(entries)).asCallback(callback) // я использую Bluebird-promises, и вам рекомендую } }
На что стоит обратить внимание:
- objectMode — как и с Readable stream мы хотим оперировать объектами, а не бинарными данными
- highWaterMark – размер нашего буфера. Тут стоит быть аккуратным, мы задаем размер буфера в объектах и это никак не связано с реальной размерностью(битами–байтами). Например: в нашем случае мы оперируем списками.
- _writev – опиcываейт как обрабовать несколько "кусков" данных из буфера за раз
Ну и теперь используем наш код вот так:
const dbWritable = new DBWritable() const apiReadable= new APIReadable() apiReadable.pipe(dbWritable)
Как мне кажется – это очень круто, теперь из кода предельно ясно что мы читаем из одного места и пишем в другое. Кроме того читатель может проверить что наш код работает очень эффективно и использует буфер. Ну и всякие мелкие плюшки вроде того что он не блокирует event-loop.
Хм –, спросит внимательный читатель, – а что же с обрабоктой данных? Для этого мы можем написать еще один Transform stream, но это как-то "плоско и скучно", по-этому мы используем библиотеку Highland.js которая позволит нам применить всеми любимые filter и map над эллементами нашего "потока" сущностей. Вообще, Highland это что-то больше чем этот простой usecase, но это тема отдельной и не маленькой статьи. Как-то так:
H(apiReadable) .flatten() .reject(x => _.includes(String(x.id), 3)) .map(function(x) { if(_.includes(String(x.id), 9)) { return _.extend(x, {timestamp: Date.now()}) } else { return x } }) .batchWithTimeOrCount(100, 1000) .pipe(dbWritable)
Как по мне, очень похоже на операции со списками и читаемо. А .flatten()
и .batchWithTimeOrCount(100, 1000)
нужны нам только потому что наши Streamы оперирует массивами вместо отдельных объектов.
Вот сообственно и все. Надеюсь я достиг своей цели и заинтересовал читателя в изучении Stream и Highland.js
NB: Если вам понравилась статья перейдите, пожалуйста, по ссылке и прогосолуйте за мой доклад на Polyconf. Доклад называется Asynchronous programming 101: Promises and Streams
ссылка на оригинал статьи https://habrahabr.ru/post/325320/
Добавить комментарий