关于 stream.pipe 你需要知道更多

关于 stream 用法的一个经典例子

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

经典例子的致命问题

如果用户中断下载,文件不会关闭,导致文件句柄(fd)泄露,参见相关讨论:

express - Deleting file in node.js not working - Stack Overflow

修复如下

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res).once('close', function () {
        stream.destroy();
    });
});
server.listen(8000);

stream.destroy(同 stream.close) 是一个未文档化的 API,来自 fs.ReadStream ,如此重要的一个函数竟然未文档化(至少现在还是未文档化状态,当前 node.js 版本为 v6.2.0)。

关键 API 文档

stream.pipe 文档

readable.pipe(destination[, options])#

  • destination Writable Stream The destination for writing data
  • options Object Pipe options
    • end Boolean End the writer when the reader ends. Default = true

This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.

fs.createReadStream 文档

fs.createReadStream(path[, options])#

Returns a new ReadStream object (See Readable Stream).

Be aware that, unlike the default value set for highWaterMark on a readable stream (16 kb), the stream returned by this method has a default value of 64 kb for the same parameter.

options is an object or string with the following defaults:

{ flags: 'r', encoding: null, fd: null, mode: 0o666, autoClose: true }

options can include start and end values to read a range of bytes from the file instead of the entire file. Both start and end are inclusive and start at 0. The encoding can be 'utf8', 'ascii', or 'base64'.

If fd is specified, ReadStream will ignore the path argument and will use the specified file descriptor. This means that no open event will be emitted.

If autoClose is false, then the file descriptor won't be closed, even if there's an error. It is your responsibility to close it and make sure there's no file descriptor leak. If autoClose is set to true (default behavior), on error or end the file descriptor will be closed automatically.

mode sets the file mode (permission and sticky bits), but only if the file was created.

An example to read the last 10 bytes of a file which is 100 bytes long:

fs.createReadStream('sample.txt', {start: 90, end: 99});

If options is a string, then it specifies the encoding.

stream.pipe 的工作原理

stream.pipe 将可读流(Readable Stream)连接到可写流(Writable Stream),数据会从可读流传输到可写流,支持自动流量控制。

上面的 stream.pipe 其实是调用的 Readable.pipe,其流程简述如下:

  • 监听可读流的 data 事件:将读取到的数据写入可写流,如果可写流缓冲区满,则暂停可读流。
  • 监听可写流的 drain 事件:实现自动流量控制。
  • 监听可写流的 unpipe 事件:取消所有事件监听。
  • 监听可写流的 close、finish 事件:调用可读流的 unpipe(),触发可写流的 unpipe 事件。
  • 监听可读流的 end 事件:调用可写流的 Writable.end(),触发可写流的 finish 事件。
  • 监听可写流的 error 事件:调用可读流的 unpipe(),触发可写流的 unpipe 事件,如果 error 事件没有其它人监听,则抛出为异常。
  • 在可写流上发出 pipe 事件。

以上流程请自行阅读代码映证:node/_stream_readable.js at v4.4.4 · nodejs/node

stream.pipe 的问题

结合以上描述有以下疑问

  • 可读流出错会怎么样

    可读流发出 error、close 事件,但因为错误没有发出 end 事件。

    可读流可能被关闭,可写流不会被关闭,pipe 状态保持不变,数据流动停顿了。

    在现实情况中,或早或晚,可写流可能因为超时时间到等原因最终被关闭,从而转化为下面的情况。

  • 可写流出错会怎么样

    可写流发出 error、close 事件,没有 finish 事件。

    可读流会与可写流断开 pipe,可读流不会被关闭。

    可读流以 fs.ReadStream 为例,当它被读完时(EOF,发出 end 事件),根据 autoClose 标志(默认为 true),决定是否关闭流(释放文件句柄),没有读完就不会被关闭。

    以上逻辑是合理的,一个可读流可以与多个可写流通过 pipe 连在一起,没有理由因为一个可写流的问题影响到可读流的状态。

    做为开发人员,切莫幻想 node.js 的垃圾收集( GC)会在可读流没有被引用时自动关闭文件句柄。

    当 node.js 将文件句柄以整数(Integer)方式表示时,就不可能实现垃圾收集时自动关闭文件句柄了。

stream.pipe 没有魔法,它提供了一种传输数据的优美方式,但是它并不完美,在错误处理方面留下了很多空白,有待开发人员自行解决。


node