- PVSM.RU - https://www.pvsm.ru -
Поток — это концепция, которая была сначала реализована в UNIX системах для передачи данных из одной программы в другую в операциях ввода/вывода. Это позволяет каждой программе быть очень специализированной в том, что она делает — быть независимым модулем. Сочетание таких простых программ помогает в создании более сложных систем путем «объединения» их в цепочку вызовов.
Потоки позволяют обмениваться данными небольшими частями, что в свою очередь дает возможность в своей работе не расходовать много памяти. Конечно, это зависит от того, как вы реализуется внутренний функционал потока.
Распространенная задача — парсинг файла большого объема. Например, в текстовом файле с данными логов нужно найти строку, содержащую определенный текст. Вместо того, чтобы файл полностью загрузить в память, и потом начать разбирать в нем строки в поисках нужной, мы можем его считывать небольшими порциями. Тем самым не занимаем память сверх необходимого, а лишь столько памяти, сколько нужно для буферизации считанных данных. Как только найдем требуемую запись, сразу прекратим дальнейшую работу. Или можем передать найденную запись в другой поток по цепочке, например, для преобразование в другой формат, или сохранения в другой файл.
Модуль stream предоставляет базовый API по работе с потоками в Node.JS. Документации Node.JS вполне достаточно, чтобы разобраться в данном вопросе, но мы попытаемся составить что-то вроде шпаргалки с пояснениями некоторых моментов.
Есть четыре вида потоков:
Все потоки являются экземплярами EventEmitter, то есть можно генерировать события StreamClass.emit('eventName', data), и обрабатывать их StreamClass.on('eventName', (data)=>{});
Чтобы передать данные из одного потока в другой, самый простой способ вызвать над потоками метод pipe:
Readable.pipe(Writable);//например, по "схеме" DataBase -> File
Readable.pipe(Transform).pipe(Writable);//DataBase -> преобразовать в JSON формат -> сохранить JSON в File
Duplex.pipe(Transform).pipe(Duplex);//прочитать из DataBase -> обработать -> записать обратно в DataBase результат
Последняя цепочка вызовов показывает, что реализовывать свои классы потоков лучше таким образом, чтобы каждый их них решал свою задачу.
Как видно — метод pipe возвращает экземпляр потока, который был передан в него, что и позволяет потоки объединять между собой.
Метод pipe, реализован таким образом, что он решает задачу контроля «скорости» передачи данных из одного потока в другой (превышение объема внутреннего буфера потока). Например, Writable поток работает на запись медленнее, чем их передает источник данных Readable. В этом случае передача данных «приостанавливается» до тех пор, пока Writable «не сообщит» (внутренний буфер очистится), что он готов принимать следующую порцию данных.
Потоки хранят данные в своем внутреннем буфере. Размер буфера можно указать через параметр highWaterMark, который можно задать в конструкторе класса.
Физический смысл значение highWaterMark зависит от другой опции — objectMode.
new StreamObject({objectMode: false, highWaterMark: кол_во_байт}); //по умолчанию 16384 (16kb)
new StreamObject({objectMode: true, highWaterMark: кол_во_объектов});//по умолчанию 16
В Readable потоке данные буферизируются, когда над ним вызвается метод push(data), и остаются в буфере до тех пор, пока их не прочитают, вызвав метод read(). Как только общий размер внутреннего буфера Readable потока достигнет порогового значения, указанного в highWaterMark, поток временно прекратит чтение данных.
Для Writable буферизация происходит во время вызова над ним метода write(data). Метод вернет true, пока размер буфера не достиг значения highWaterMark, и false, когда буфер переполнен.
При использовании метода pipe(), как раз в этот момент он «останавливает» чтение данных, ожидает событие «drain», после чего передача данных возобновляется.
По умолчанию, потоки работают с данными в виде буфера, но так же могут работать как со строками, так и с другими объектами JavaScript (например, {«user»:{«name»:«Ivan», «last_name»:«Petrov»}}), за исключением null-объекта, который играет отдельную роль при передаче данных (если поток получает null, это является сигналом, что данных для обработки больше нет, и чтение или запись данных завершена). Как установить тот или иной режим потока при его инициализации покажем в примерах ниже.
Состояние flowing === true — автоматически если:
Из состояния flowing в paused можно переключиться (flowing === false):
На момент инициализации класса Readable flowing === null, то есть еще не реализован механизм чтения данных, и данные не генерируются.
Readable потоки работают в одном из двух состояний: flowing и paused. В состоянии paused для считывания данных необходимо явно вызывать метод read(). Когда вы передаете данные из одного потока в другой (R.pipe(W)), метод read() вызывается автоматически.
Весь текущий буфер данных можно получить с помощью свойства Readable._readableState.buffer.
'use strict';
const { Readable } = require('stream');
/**
* чтобы реализовать свой класс Readable потока, необходимо имплементировать метод _read().
* именно с нижним подчеркиванием перед именем
* сравните состояние потока (_readableState) во время инициализации, и по окончании чтения данных (on('end', ()=>{}))
*/
class Source extends Readable
{
constructor(array_of_data = [], opt = {})
{
super(opt);
this._array_of_data = array_of_data;
console.log('objectMode ', this._readableState.objectMode);//false по умолчанию, если не задано явно другое
console.log('highWaterMark ', this._readableState.highWaterMark);//16384
console.log('buffer ', this._readableState.buffer);//[] - пустой массив
console.log('length ', this._readableState.length);//0 - кол-во буфер объектов
console.log('flowing ', this._readableState.flowing);//null
//для краткости примеров, добавим обработчики событий в конструкторе
this.on('data', (chunk)=>
{
//при обработке события 'data' - данные считываются из буфера и удаляются из него
console.log('n---');
console.log('Readable on data ');
//здесь chunk данные в виде буфера
console.log(`chunk = ${chunk} chunk isBuffer ${Buffer.isBuffer(chunk)} and chunk.length is ${chunk.length}`);
//кол-во данных в текущем буфере (кол-во буфер объектов)
console.log('buffer.length ', this._readableState.buffer.length);
console.log('данные: ', chunk.toString(), ' buffer of chunk ', this._readableState.buffer, ' buffer of chunk как строка ', this._readableState.buffer.toString());
})
.on('error',(err)=>
{
console.log('Readable on error ', err);
})
.on('end',()=>
{
console.log('Readable on end ');
console.log('objectMode ', this._readableState.objectMode);//false
console.log('highWaterMark ', this._readableState.highWaterMark);//16384
console.log('buffer ', this._readableState.buffer);//[] - пустой массив
console.log('buffer.length ', this._readableState.buffer.length);//0
console.log('flowing ', this._readableState.flowing);//true !!!так как у нас есть обработчик события 'data'
})
.on('close',()=>
{
console.log('Readable on close не все реализации генерируют это событие');
});
}
_read()
{
let data = this._array_of_data.shift()
if (!data) {
//сообщаем, что данные закончились
this.push(null);
} else {
this.push(data);
}
}
}
/*значение именно как строки, т.к. по умолчанию потоки работают либо со строками, либо с буфером. иначе будет выброшено сообщение об ошибке по время this.push(data) Readable on error TypeError: Invalid non-string/buffer chunk */
let array_of_data = ['1', '2', '3', '4', '5'];
let opts = {/* значения свойств по умолчанию */};
const R = new Source(array_of_data, opts);
array_of_data = ['1', '2', '3', '4', '5'];
opts = {
objectMode: false,
highWaterMark: 1//1 байт лимит для буферизации данных _readableState.buffer.length будет === 1
};
const R2 = new Source(array_of_data, opts);
array_of_data = ['1', '2', '3', '4', '5'];
opts = {
objectMode: false
, encoding: 'utf8'//если задать кодировку (поддерживаемую NodeJS), то поток будет работать с данными как со строками, а не как с буфером
};
const R3 = new Source(array_of_data, opts);//кодировку так же можно задать с помощью метода .setEncoding('utf8')
array_of_data = [1, 2, 3, 4, 5];
/*при таких "настройках" потока будет ошибка. если objectMode: true то не надо указывать кодировку - ни в параметрах, ни через метод Readable.setEncoding('utf8')*/
opts = {
objectMode: true
, encoding: 'utf8'
};
const R4 = new Source(array_of_data, opts);
//при objectMode: true можно передать как строки, или как числа (Number)
array_of_data = [1, 2, 3, 4, 5];
opts = {
objectMode: true
};
const R5 = new Source(array_of_data, opts); //highWaterMark 16 - значение по умолчанию для объектов
/*имитируем задержку при чтении данных (подобное может происходить при Writable.write(someData) === false). пример ниже взят из документации Node.JS.
выполните код, и увидите как данные прекращаются считываться, они накапливаются в буфере, а потом продолжают считываться*/
array_of_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
opts = {
objectMode: true
};
const R6 = new Source(array_of_data, opts);
R6.on('data', (chunk) => {
//приостанавливаем передачу данных на 1 секунду
R6.pause();
setTimeout(() => {
R6.resume();//возобновим работу потока
}, 1000);
});
Весь текущий буфер данных можно получить с помощью метода writable._writableState.getBuffer().
'use strict';
const Source = require('./readable.js');
const { Writable } = require('stream');
class Writer extends Writable
{
constructor(opt = {})
{
super(opt);
console.log('objectMode ', this._writableState.objectMode);//false по умолчанию, если не задано явно true
console.log('highWaterMark ', this._writableState.highWaterMark);//16384
console.log('decodeStrings ', this._writableState.decodeStrings);//true по умолчанию; пеобразовывать ли в Buffer данные, до их передачи в метод _write()
console.log('buffer ', this._writableState.getBuffer());//[] - пустой массив
this.on('drain', ()=>
{
console.log('n------ writable on drain');
})
.on('error', (err)=>
{
console.log('n------ writable on error', err);
})
.on('finish', ()=>
{
console.log('n------ writable on finish');
console.log('_writableState.getBuffer()', this._writableState.getBuffer());
});
}
/**
* @param chunk - строка|буфер|объект
* @param encoding - кодировка поступающих данных. если objectMode === true, значение encoding будет игнорироваться
* @param done - callback ф-ция. ее удобнее именовать именно так, потому что вы ее вызываете, когда по логике
* вашего метода _write, нужно сообщить, что завершили запись текущей части данных chunk, и готовы принять на запись
* следующую часть: done(err) - можно передать объект ошибки new Error(...)
* @private
*/
_write(chunk, encoding, done)
{
console.log('_writableState.getBuffer()', this._writableState.getBuffer());
console.log(typeof chunk );
//для пример с потоком Transform см ниже
if (typeof chunk === 'object') {
console.log('chunk = ', chunk.get(), chunk.get() +' in pow '+ chunk.get() +' = '+ chunk.inPow(chunk.get()));
} else {
console.log(`chunk = ${chunk}; isBuffer ${Buffer.isBuffer(chunk)}; chunk.length is ${chunk.length}; encoding = ${encoding}`);
}
/* Пример с ошибкой оставим закомментированным.
Добавим, что:
1) всегда добавляйте обработчик ошибок on('error', (err)=>{...})
2) если выбрасывается ошибка, то поток данных Readable не прекращает свою работу.
в этом слувае вам надо обрабатывать эту ситуацию - например, вызывать Readable.emit('error', err);
и прекращать читать данные Readable.puse(), после обработки ошибки продолжить работу Readable.remuse().
Это в общем случае, и все зависит от ваших задач при работе с потоками
//if (chunk > 3) return done(new Error('chunk > 3'));*/
done();
}
}
let array_of_data = ['1', '2', '3', '4', '5'];
let r_opts = {/* значения по умолчанию */};
const R = new Source(array_of_data, r_opts);
let w_opts = {/* значения по умолчанию */};
const W = new Writer(w_opts);
R.pipe(W);
array_of_data = ['1', '2', '3', '4', '5'];
r_opts = {encoding: 'utf8'};
const R1 = new Source(array_of_data, r_opts);
w_opts = {
decodeStrings: false//данные в _write будут строками в кодировке 'utf8', так как данные из источника - строки ( см r_opts),
};
const W1 = new Writer(w_opts);
R1.pipe(W1);
array_of_data = [1, 2, 3, 4, 5];
r_opts = {objectMode: true};
const R2 = new Source(array_of_data, r_opts);
w_opts = {
objectMode: true//если false, то при записи данных как объектов (см r_opts), будет ошибка "TypeError: Invalid non-string/buffer chunk"
};
const W2 = new Writer(w_opts);
R2.pipe(W2);
array_of_data = [1, 2, 3, 4, 5];
r_opts = {objectMode: true};
const R3 = new Source(array_of_data, r_opts);
w_opts = {
objectMode: true//если false, то при записи данных как объектов (см r_opts), будет ошибка "TypeError: Invalid non-string/buffer chunk"
, highWaterMark: 1 //ограничем буфер; при таком маленьком значении каждый раз будет вызываться событие 'drain'
};
const W3 = new Writer(w_opts);
R3.pipe(W3);
//Вариант без pipe()
const R3_1 = new Source(array_of_data, r_opts);
const W3_1 = new Writer(w_opts);
R3_1.on('data', (chunk)=> {
//R3_1._readableState.flowing === true
console.log('R3_1 in flowing mode', R3_1._readableState.flowing, 'R3_1 _readableState.buffer', R3_1._readableState.buffer);
toWriteOrNotToWriteThatIsTheQuestion(chunk, onDrain);
});
function onDrain() {
//R3_1._readableState.flowing === false, так как был вызван метод R3_1.pause() см toWriteOrNotToWriteThatIsTheQuestion
console.log('R3_1 in flowing mode', R3_1._readableState.flowing);
R3_1.resume();
}
/**
* если на данный момент не можем больше писать в поток Writable, нужно оставноить и получение данных из Readable (R3_1.pause())
* как только буфер очистится (событие 'drain'), мы продолжаем читать данные из источника Readable (см cb R3_1.resume(); ), и записывать в Writable
* @param data
* @param cb
*/
function toWriteOrNotToWriteThatIsTheQuestion(data, cb)
{
//во "внешнем коде" записывать данные через метод write(...), а не через _write(...)
if (!W3_1.write(data)) {
R3_1.pause();
W3_1.once('drain', cb);
} else {
process.nextTick(cb);
}
}
Transform — разновидность Duplex потоков. Решили сначала показать пример с ним.
'use strict';
const Readable = require('./readable.js');
const Writable = require('./writable.js');
const {Transform} = require('stream');
/*для примера того, что можем передавать не только строки, буфер, простые JS объекты,
но и экземпляры классов*/
class Chunk
{
constructor(chunk)
{
this.set(chunk);
}
set(chunk)
{
this._chunk = chunk;
}
get()
{
return this._chunk;
}
inPow(pow = 2)
{
return Math.pow(this.get(), pow);
}
}
class Transformer extends Transform
{
constructor(opt = {})
{
super(opt);
console.log('n -------- Transform in constructor');
console.log('objectMode ', this._writableState.objectMode);//false по умолчанию, если не задано явно true
console.log('highWaterMark ', this._writableState.highWaterMark);//16384
console.log('decodeStrings ', this._writableState.decodeStrings);//true по умолчанию; пеобразовывать ли в Buffer данные, до их передачи в метод _write()
console.log('buffer ', this._writableState.getBuffer());//[] - пустой массив
this.on('close', ()=>
{
console.log('n------ Transform on close');
})
.on('drain', ()=>
{
console.log('n------ Transform on drain');
})
.on('error', (err)=>
{
console.log('n------ Transform on error', err);
})
.on('finish', ()=>
{
console.log('n------ Transform on finish');
})
.on('end', ()=>
{
console.log('n------ Transform on end');
})
.on('pipe', ()=>
{
console.log('n------ Transform on pipe');
});
}
/**
* метод, реализующий в себе запись данных (chunk поступают в поток Transform),
* и чтение данных - когда другой поток читает из Transform
* @param chunk
* @param encoding
* @param done - в общем случае done(err, chunk)
* @private
*/
_transform(chunk, encoding, done)
{
/*завершить обработку текущих данных chunk, и передать дальше на чтение можно двумя вариантами
done(null, chunk);
done(err, chunk); - в этом случае будет вызвано событие error
или так, что то же самое:
this.push(chunk);
done();
this.push(chunk);
done(err);*/
//преобразовали выходные данные в экземпляр класса Chunk (см. пример writable.js)
this.push(new Chunk(chunk));
done();
}
/**
* Кастомные transform потоки могут реализовать метод _flush.
Он будет вызван, когда нет больше данных на запись, но перед событием 'end' потока Readable (имеется ввиду Transform, так как это поток и на запись, и на чтение данных).
* @param done - done(err) можно передать объект ошибки Error
* @private
*/
_flush(done)
{
//TODO ... что-нибудь сделали дополнительно перед завершением работы потока
done();
}
}
let array_of_data = ['1', '2', '3', '4', '5'];
let r_opts = {
encoding: 'utf8'
};
const R = new Readable(array_of_data, r_opts);
let t_opts = {
readableObjectMode: true //читать из потока Transform будут объекты
, writableObjectMode: false//записывать в поток Transform можно либо строки или буфер
, decodeStrings: false
};
const T = new Transformer(t_opts);
let w_opts = {
objectMode: true//если false, будет выброшена ошибка
};
const W = new Writable(w_opts);
R.pipe(T).pipe(W);
Duplex реализуют в себе как Readable, таки Writable потоки. При этом их «работа» происходит независимо друг от друга.
Если вы заинтересовались темой потоков, предлагаем поэкспериментировать над реализацией своих Duplex потоков самостоятельно.
Когда в каком-то из звеньев было вызвано событие 'error', и если нужно об этом уведомить «предыдущие» потоки в цепочке, для них так же нужно вызвать событие 'error': StreamClass.emit(‘error’, err), и обработать ситуацию. Или воспользоваться модулем pump (https://github.com/mafintosh/pump), с помощью которого можно решить данный вопрос.
С помощью потоков можно решать практически любую задачу:
Как говорится – на любой вкус.
Автор: RA_ZeroTech
Источник [1]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/javascript/265505
Ссылки в тексте:
[1] Источник: https://habrahabr.ru/post/339900/
Нажмите здесь для печати.