解读 Node.JS 中的Readable 和 Writeable

网站前端开发_前端开发者node.js

https://www.rokub.com

node.js 中的 Stream, 解读 Readable 和 Writeable在node中, 只要涉及到文件IO的场景一般都会涉及到一个类- Stream。 Stream是对IO设备的抽象表示, 其在JAVA中也有涉及, 主要体现在四个类- InputStream、 Reader、 OutputStream、 Writer, 其中InputStream和OutputStream类针对字节数据进行读写; Reader和Writer针对字符数据读写。 同时Java中有多种针对这四种类型的扩展类, 如节点流、 缓冲流和转换流等。 比较而言, node中Stream类型也和Java中的类似, 同样提供了支持字节和字符读写的Readable和Writeable类, 也存在转换流Transform类, 本文主要分析node中Readable和Writeable的实现机制, 从底层的角度更好的理解Readable和Writeable实现机制, 解读在读写过程中发生的一些重要事件。 Readable类Readable对应于Java中的InputStream和Reader两个类, 针对Readable设置encode编码可完成内部数据由Buffer到字符的转换。 Readable Stream有两种模式, 即flowing和paused模式。 这两种模式对于用户而言区别在于是否需要手动调用Readable.prototype.read(n), 读取缓冲区的数据。 查询node API文档可知触发flowing模式有三种方式: 侦听data事件readable.resume() readable.pipe() 而触发paused模式同样有几种方式: 而触发paused模式同样有几种方式: 移除data事件readable.pause() readable.unpipe() 可能这样讲解大家仍不明白Readable Stream这两种模式的区别, 那么下文从更深层次分析两种模式的机制。 深入Readable的实现Readable继承EventEmitter, 大家也都知道。 但是相信大家应该不怎么熟悉Readable的实例属性 ** _readableState ** 。该属性是一个ReadableState类型的对象, 保存了Readable实例的重要信息, 如读取模式( 是否为对象模式)、 highWaterMask( 缓冲区存放的最大字节数)、 缓冲区、 flowing模式等。 在Readable的实现中, 处处使用ReadableState对象记录当前读取状态, 并设置缓冲区保证读操作的顺利进行。

首先需要针对Readable.prototype.read方法进行特别解读:
if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
debug(‘read: emitReadable’,state.length,state.ended);
if (state.length ===0&&state.ended) endReadable(this);
elseemitReadable(this);
returnnull;
}
1 2 3 4 5 6 7 8 9 10
if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
debug(‘read: emitReadable’,state.length,state.ended);
if (state.length ===0&&state.ended) endReadable(this);
elseemitReadable(this);
returnnull;
}
当读入的数据为0时, 执行emitReadable操作。 这意味着, 针对Readable Stream执行read(0) 方法会触发readable事件, 但是不会读当前缓冲区。 因此使用read(0) 可以完成一些比较巧妙的事情, 如在readable处理函数中可以使用read(0) 触发下一次readable事件, 可选的操作读缓冲区。 继续分析代码, 如果读入的数据并不是0, 则计算读取缓冲区的具体字节数, n = howMuchToRead(n, state);
function howMuchToRead(n, state) {
if (state.length ===0&&state.ended) return0;
if (state.objectMode) returnn===0?0:1;
if (n===null||isNaN(n)) {
onlyflowonebufferatatime
if (state.flowing&&state.buffer.length) returnstate.buffer[0].length;
若是paused状态, 则读全部的缓冲区
elsereturnstate.length;
}
if (n<=0) return0;
if (n>state.highWaterMark) state.highWaterMark=computeNewHighWaterMark(n);
don’t have that much. return null, unless we’
veended.if(n>state.length) {
if (!state.ended) {
state.needReadable=true;
return0;
}else{
returnstate.length;
}
}
returnn;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 n = howMuchToRead(n, state);
function howMuchToRead(n, state) {
if (state.length ===0&&state.ended) return0;
if (state.objectMode) returnn===0?0:1;
if (n===null||isNaN(n)) {
onlyflowonebufferatatime
if (state.flowing&&state.buffer.length) returnstate.buffer[0].length;
若是paused状态, 则读全部的缓冲区
elsereturnstate.length;
}
if (n<=0) return0;
if (n>state.highWaterMark) state.highWaterMark=computeNewHighWaterMark(n);
don’t have that much. return null, unless we’
veended.if(n>state.length) {
if (!state.ended) {
state.needReadable=true;
return0;
}else{
returnstate.length;
}
}
returnn;
}
针对对象模式的读取, 每次只读一个; 对于处在flowing模式下的读取, 每次只读缓冲区中第一个buffer的长度; 在paused模式下则读取全部缓冲区的长度; 若读取的字节数大于设置的缓冲区最大值, 则适当扩大缓冲区的大小( 默认为16k, 最大为8m); 若读取的长度大于当前缓冲区的大小, 设置needReadable属性并准备数据等待下一次读取。 接下来, 判断是否需要准备数据。 在这里, 依赖于needReadable的值,
var doRead = state.needReadable;
debug(‘need readable’, doRead);
if (state.length === 0 || state.length – n < state.highWaterMark) {
doRead=true;
debug(‘length less than watermark’,doRead);
}
reading, then it ‘s unnecessary. if (state.ended || state.reading) { doRead = false; debug(‘
reading or ended ‘, doRead); } 1 2 3 4 5 6 7 8 9 10 11 12 13 var doRead = state . needReadable ; debug ( ‘
need readable ‘ , doRead ) ; if ( state . length === 0 || state . length – n < state . highWaterMark ) { doRead = true ; debug ( ‘
length less than watermark ‘ , doRead ) ; } reading, then it’
s unnecessary.if(state.ended || state.reading) {
doRead=false;
debug(‘reading or ended’,doRead);
}
如果当前缓冲区为空, 或者缓冲区并未超出我们设定的最大值, 那么就可以继续准备数据; 如果此时正在准备数据或者已经结束读取, 那么就放弃准备数据。 一旦doRead为true, 那么进入准备数据阶段,
if (doRead) {
debug(‘do read’);
state.reading=true;
state.sync=true;
ifthelengthiscurrentlyzero,thenwe*need*areadableevent.if(state.length ===0) state.needReadable=true;
callinternalreadmethod默认Readable未实现_read, 抛出Error针对自定义的Readable子类, _read可修改state.buffer的数量, 进行预处理, 然后由下面的fromList读出去缓存中的相关数据this._read(state.highWaterMark);
state.sync=false;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14
if (doRead) {
debug(‘do read’);
state.reading=true;
state.sync=true;
ifthelengthiscurrentlyzero,thenwe*need*areadableevent.if(state.length ===0) state.needReadable=true;
callinternalreadmethod默认Readable未实现_read, 抛出Error针对自定义的Readable子类, _read可修改state.buffer的数量, 进行预处理, 然后由下面的fromList读出去缓存中的相关数据this._read(state.highWaterMark);
state.sync=false;
}
接下来设置相关的标志位, 进行_read处理。 针对这个私有方法_read, 文档上有特殊说明, 自定义的Readable实现类需要实现这个方法, 在该方法中手动添加数据到Readable对象的读缓冲区, 然后进行Readable的读取。 可以理解为_read函数为读取数据前的准备工作( 准备数据), 针对的是流的实现者而言。
if (doRead && !state.reading) n = howMuchToRead(nOrig, state);
var ret;
if (n > 0) ret = fromList(n, state);
else ret = null;
if (ret === null) {
state.needReadable=true;
n=0;
}
state.length -= n;
if (state.length === 0 && !state.ended) state.needReadable = true;
if (nOrig !== n && state.ended && state.length === 0) endReadable(this);
flowing模式下的数据读取依赖于 read函数 data事件触发的次数, 依赖于howMuchToRead计算的次数
if (ret !== null) this.emit(‘data’, ret);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
if (doRead && !state.reading) n = howMuchToRead(nOrig, state);
var ret;
if (n > 0) ret = fromList(n, state);
else ret = null;
if (ret === null) {
state.needReadable=true;
n=0;
}
state.length -= n;
if (state.length === 0 && !state.ended) state.needReadable = true;
if (nOrig !== n && state.ended && state.length === 0) endReadable(this);
flowing模式下的数据读取依赖于 read函数 data事件触发的次数, 依赖于howMuchToRead计算的次数
if (ret !== null) this.emit(‘data’, ret);
一旦在_read中更新了缓冲区, 那么我们需要重新计算( 消费者, 即可写流) 读取的字节数。 fromList方法完成了读缓冲区的slice, 如果是objectMode下的读, 则只读缓冲区的第一个对象; 针对未传参数的read方法而言, 默认读取全部缓冲区等等。 从读缓冲区读取完数据之后设置相关flag, 如needReadable, 最终, 触发data事件, 结束! 上节提到, 设置data事件的执行函数会进入flowing模式的读, 而上文看到正是read方法触发了data事件, 而默认条件下Readable处于paused状态, 因此在paused状态读取数据需要手动执行read函数, 每次read读取完毕触发一次data事件。 从这点看出, flowing和paused状态区别在于是否需要手动执行read() 来获取数据。 flowing状态下, 我们无需执行read, 仅需要设置data事件处理函数或者设定导流目标pipe; 而在paused状态下, 不仅仅是简单的执行read方法, 因为读缓冲区的内容时刻在改变, 一旦读缓冲区又有新数据, 简单执行read() 就没法满足需求(因为我们无法知道是否又有新数据到来), 因此需要侦听读缓冲区的相关事件, 即readable事件, 在该事件处理函数中进行read相关数据。 那么, 什么情况下会触发readable事件呢? 在实现_read私有方法中, 我们使用stream.push(chunk) 或stream.unshift(chunk) 方法注入数据到读缓冲区, 那么push和unshift方法都实现了下面的逻辑,
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit(‘data’,chunk);
stream.read(0);
} else {
updatethebufferinfo.state.length +=state.objectMode?1:chunk.length;
if (addToFront) state.buffer.unshift(chunk);
elsestate.buffer.push(chunk);
if (state.needReadable) emitReadable(stream);
}
function emitReadable(stream) {
varstate=stream._readableState;
state.needReadable=false;
if (!state.emittedReadable) {
debug(’emitReadable’,state.flowing);
state.emittedReadable=true;
if (state.sync) process.nextTick(emitReadable_,stream);
elseemitReadable_(stream);
}
}
function emitReadable_(stream) {
debug(’emit readable’);
stream.emit(‘readable’);
flow(stream);
}
在flowing状态下, 自动读取流( 替代paused状态下手动read)
function flow(stream) {
varstate=stream._readableState;
debug(‘flow’,state.flowing);
if (state.flowing) {
do{
varchunk=stream.read();
}while (null!==chunk&&state.flowing);
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit(‘data’,chunk);
stream.read(0);
} else {
updatethebufferinfo.state.length +=state.objectMode?1:chunk.length;
if (addToFront) state.buffer.unshift(chunk);
elsestate.buffer.push(chunk);
if (state.needReadable) emitReadable(stream);
}
function emitReadable(stream) {
varstate=stream._readableState;
state.needReadable=false;
if (!state.emittedReadable) {
debug(’emitReadable’,state.flowing);
state.emittedReadable=true;
if (state.sync) process.nextTick(emitReadable_,stream);
elseemitReadable_(stream);
}
}
function emitReadable_(stream) {
debug(’emit readable’);
stream.emit(‘readable’);
flow(stream);
}
在flowing状态下, 自动读取流( 替代paused状态下手动read)
function flow(stream) {
varstate=stream._readableState;
debug(‘flow’,state.flowing);
if (state.flowing) {
do{
varchunk=stream.read();
}while (null!==chunk&&state.flowing);
}
}
一旦处于flowing模式并且当前缓冲区没有数据, 那么就立即将预处理的push( unshift) 数据传递给data事件处理函数, 并执行stream.read(0)。 前文已经交代过, read(0) 仅仅用来触发readable事件, 并不读取缓冲区, 这就是触发readable的第一种情况。 第二种则是第一种情况之外的所有情景, 即根据操作( push、 unshift) 的不同将数据插入读缓冲区的不同位置。 最后执行emitReadable函数, 触发readable事件。 针对emitReadable函数, 它的作用就是异步触发readable事件, 并执行flow函数。 flow函数则针对flowing状态的Readable做自适应读取, 免去了手动执行read函数和何时执行read函数的苦恼。 这样, 对于Readable的实现者, 一旦在_read函数插入有效数据到读缓冲区, 都会触发readable事件, 在paused状态下, 设置readable事件处理函数并手动执行read函数, 便可完成数据的读取; 而在flowing状态下, 通过设置data事件处理函数或者定义pipe目标流同样可以实现读取。 既然pipe同样可以触发Readable进入flowing状态, 那么pipe方法具体做了什么呢? 其实pipe针对Readable和Writeable做了限流, 首先针对Readable的data事件进行侦听, 并执行Writeable的write函数, 当Writeable的写缓冲区大于一个临界值( highWaterMark), 导致write函数返回false( 此时意味着Writeable无法匹配Readable的速度, Writeable的写缓冲区已经满了), 此时, pipe修改了Readable模式, 执行pause方法, 进入paused模式, 停止读取读缓冲区。 而同时Writeable开始刷新写缓冲区, 刷新完毕后异步触发drain事件, 在该事件处理函数中, 设置Readable为flowing状态, 并继续执行flow函数不停的刷新读缓冲区, 这样就完成了pipe限流。 需要注意的是, Readable和Writeable各自维护了一个缓冲区, 在实现的上有区别: Readable的缓冲区是一个数组, 存放Buffer、 String和Object类型; 而Writeable则是一个有向链表, 依次存放需要写入的数据。 Writeable解读Writeable对应Java的OutputStream和Writer类, 实现字节和字符数据的写。 与Readable类似, Writeable的实例对象同样维护了一个状态对象- WriteableState, 记录了当前输出流的状态信息, 如写缓冲区的最大值( hightWaterMark)、 缓冲区( 有向链表) 和缓冲区长度等信息。 在本节中, 主要分析输出流的关键方法write和事件drain, 并解析输出流的实现者需要实现的方法 ** _write和write ** 的关系。
function write– — — — — — — — — — — — — —
if (state.ended) writeAfterEnd(this, cb);
else if (validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret=writeOrBuffer(this,state,chunk,encoding,cb);
}
return ret;
1 2 3 4 5 6 7 8 9 10
function write– — — — — — — — — — — — — —
if (state.ended) writeAfterEnd(this, cb);
else if (validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret=writeOrBuffer(this,state,chunk,encoding,cb);
}
return ret;
在write方法中, 判断写入数据的格式并执行writeOrBuffer函数, 并返回执行结果, 该返回值标示当前写缓冲区是否已满。 真正执行写入逻辑的是writeOrBuffer函数, 该函数的作用在于刷新或者更新写缓冲区, 下面看看主要做了什么,
function writeOrBuffer(stream, state, chunk, encoding, cb) {
chunk=decodeChunk(state,chunk,encoding);
if (chunkinstanceofBuffer) encoding=’buffer’;
varlen=state.objectMode?1:chunk.length;
state.length +=len;
如果缓存的长度大于highWaterMark, 触发drain事件
varret=state.length <state.highWaterMark;
wemustensurethatpreviousneedDrainwillnotberesettofalse.if(!ret) state.needDrain=true;
缓存未处理的写请求, 在clearBuffer中执行缓存由此看出, Readable和Writeable都有缓存, Readable中缓存的方式是数组( 项为Buffer,字符串或对象), Writeable的缓存则是对象链表
if (state.writing||state.corked) {
varlast=state.lastBufferedRequest;
state.lastBufferedRequest=newWriteReq(chunk,encoding,cb);
if (last) {
last.next =state.lastBufferedRequest;
}else{
state.bufferedRequest=state.lastBufferedRequest;
}
state.bufferedRequestCount+=1;
}else{
doWrite(stream,state,false,len,chunk,encoding,cb);
}
returnret;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
function writeOrBuffer(stream, state, chunk, encoding, cb) {
chunk=decodeChunk(state,chunk,encoding);
if (chunkinstanceofBuffer) encoding=’buffer’;
varlen=state.objectMode?1:chunk.length;
state.length +=len;
如果缓存的长度大于highWaterMark, 触发drain事件
varret=state.length <state.highWaterMark;
wemustensurethatpreviousneedDrainwillnotberesettofalse.if(!ret) state.needDrain=true;
缓存未处理的写请求, 在clearBuffer中执行缓存由此看出, Readable和Writeable都有缓存, Readable中缓存的方式是数组( 项为Buffer,字符串或对象), Writeable的缓存则是对象链表
if (state.writing||state.corked) {
varlast=state.lastBufferedRequest;
state.lastBufferedRequest=newWriteReq(chunk,encoding,cb);
if (last) {
last.next =state.lastBufferedRequest;
}else{
state.bufferedRequest=state.lastBufferedRequest;
}
state.bufferedRequestCount+=1;
}else{
doWrite(stream,state,false,len,chunk,encoding,cb);
}
returnret;
}
writeOrBuffer首先针对数据进行编码, 字符串转换成Buffer类型, 如果设置了Writeable的ObjectMode模式则仍为Object类型; 接下来更新写缓冲区的长度, 并判断写缓冲区长度是否超过设定的Writeable的最大值( 默认16k), 如果超过超过则ret= false并更新WriteableState的属性needDrain= true。 ret的结果其实就是write方法返回值, 因此一旦write返回值为false, 意味着当前写缓冲区已满, 需要停止继续写入数据。 在Readable的pipe方法中, 涉及到了Writeable的drain事件。 该事件的触发意味着写缓冲区已可以继续缓存数据, 可见drain事件与写缓冲区严格相关。 继续分析writeOrBuffer函数, 若当前输出流正在写数据, 那么则当前数据缓存至写缓冲区( 创建WriteReq对象); 否则执行doWrite函数, 刷新缓冲区。
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen=len;
state.writecb=cb;
state.writing=true;
state.sync=true;
if (writev) stream._writev(chunk,state.onwrite);
elsestream._write(chunk,encoding,state.onwrite);
state.sync=false;
}
1 2 3 4 5 6 7 8 9 10 11
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen=len;
state.writecb=cb;
state.writing=true;
state.sync=true;
if (writev) stream._writev(chunk,state.onwrite);
elsestream._write(chunk,encoding,state.onwrite);
state.sync=false;
}
doWrite函数设置了需要写入数据的长度、 写入状态等信息, 并执行输出流实现者需要实现的_write函数。 在_write函数中, 针对数据流向做最后的处理, 这里分析_write函数的具体实现。 _write函数有三个参数, 分别为chunk, encoding和state.onwrite回调函数, 对该回调函数稍后分析, 先着重讲解_write函数的实现。 在node的fs模块中, 可以通过fs.createWriteStream创建Writeable实例, 通过执行var writeStream = fs.createWriteStream(‘./output’, {
decodeStrings: false
});
console.log(writeStream._write.toString());
— — — — — — — — – 输出– — — — — — — — – function (data, encoding, cb) {
if (!(datainstanceofBuffer)) returnthis.emit(‘error’,newError(‘Invalid data’));
if (typeofthis.fd!==’number’) returnthis.once(‘open’,function(){
this._write(data,encoding,cb);
});
varself=this;
fs.write(this.fd,data,0,data.length,this.pos,function(er,bytes){
if (er) {
self.destroy();
returncb(er);
}
self.bytesWritten+=bytes;
cb();
});
if (this.pos!==undefined) this.pos+=data.length;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
var writeStream = fs.createWriteStream(‘./output’, {
decodeStrings: false
});
console.log(writeStream._write.toString());
— — — — — — — — – 输出– — — — — — — — – function (data, encoding, cb) {
if (!(datainstanceofBuffer)) returnthis.emit(‘error’,newError(‘Invalid data’));
if (typeofthis.fd!==’number’) returnthis.once(‘open’,function(){
this._write(data,encoding,cb);
});
varself=this;
fs.write(this.fd,data,0,data.length,this.pos,function(er,bytes){
if (er) {
self.destroy();
returncb(er);
}
self.bytesWritten+=bytes;
cb();
});
if (this.pos!==undefined) this.pos+=data.length;
}
看出, 在_write实现中, 只接受Buffer类型的数据, 接着执行fs.write操作, 写入到对应文件描述符fd对应的文件中, 写入成功或失败后执行回调函数, 即state.onwrite函数。
function onwrite(stream, er) {
varstate=stream._writableState;
varsync=state.sync;
varcb=state.writecb;
onwriteStateUpdate(state);
默认未重写_write方法, 会收到er值
if (er) onwriteError(stream,state,sync,er,cb);
else{
Check
ifwe’re actually ready to finish, but don’
temityet
varfinished=needFinish(state);
写缓存的数据
if (!finished&&!state.corked&&!state.bufferProcessing&&state.bufferedRequest) {
clearBuffer(stream,state);
}
异步触发drain事件
if (sync) {
process.nextTick(afterWrite,stream,state,finished,cb);
}else{
afterWrite(stream,state,finished,cb);
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
function onwrite(stream, er) {
varstate=stream._writableState;
varsync=state.sync;
varcb=state.writecb;
onwriteStateUpdate(state);
默认未重写_write方法, 会收到er值
if (er) onwriteError(stream,state,sync,er,cb);
else{
Check
ifwe’re actually ready to finish, but don’
temityet
varfinished=needFinish(state);
写缓存的数据
if (!finished&&!state.corked&&!state.bufferProcessing&&state.bufferedRequest) {
clearBuffer(stream,state);
}
异步触发drain事件
if (sync) {
process.nextTick(afterWrite,stream,state,finished,cb);
}else{
afterWrite(stream,state,finished,cb);
}
}
}
在state.onwrite函数中主要工作有两个: 写缓冲区的数据写完缓冲区的数据后, 异步触发drain事件第一步, 在clearBuffer函数中, 就是取出写缓冲区( 有向链表) 的第一个WriteReq对象, 执行doWrite函数, 写入缓冲区的第一个数据; 这样循环往复最终清空写缓冲区, 重置一些标志位。 第二步, 异步执行afterWrite函数, 触发drain事件, 并判断是否写操作完毕触发“ finish” 事件。 这里之所以强调异步触发drain事件, 是因为为了保证先获得write() 返回值为false, 给用户绑定drain处理函数的时隙, 然后再触发drain事件。 至此, Writeable的重要流程已全部走通。 可以看出来, 在核心的write() 中, 判断写缓冲区是否已满并返回改值, 在适当条件下缓存数据或调用_write() 写数据, 在Writeable实现者需要实现的 ** _write() 中, 主要任务是数据写入方向控制, 完成最基本的任务 。

网站前端开发_前端开发者丨Node.js

https://www.rokub.com

» 本文来自:前端开发者 » 《解读 Node.JS 中的Readable 和 Writeable》
» 本文链接地址:https://www.rokub.com/2405.html
» 您也可以订阅本站:https://www.rokub.com
赞(0)
64K

评论 抢沙发

评论前必须登录!