stream.pipeline(streams, callback)
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- 返回: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- 返回: <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- 返回: <AsyncIterable> | <Promise>
callback
<Function> 当管道完全完成时调用。err
<Error>val
destination
返回的Promise
的解析值。
- 返回: <Stream>
模块方法,用于在流和生成器之间进行管道转发错误并正确清理并在管道完成时提供回调。
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// 使用管道 API 可以轻松地将一系列流传输到一起,
// 并在管道完全完成时收到通知。
// 有效地 gzip 潜在巨大的 tar 文件的管道:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
);
pipeline
API 提供了 promise 版本,它也可以接收一选项参数作为带有 signal
<AbortSignal> 属性的最后一个参数。
当信号中止时,将在底层管道上调用 destroy
,并带有 中止错误
。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
要使用 AbortSignal
,则将其作为最后一个参数传到选项对象中:
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
const ac = new AbortController();
const signal = ac.signal;
setTimeout(() => ac.abort(), 1);
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}
run().catch(console.error); // AbortError
pipeline
API 还支持异步生成器:
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // 使用字符串而不是 `Buffer`。
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
记得处理传入异步生成器的 signal
参数。
特别是在异步生成器是管道的来源(即第一个参数)或管道永远不会完成的情况下。
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
stream.pipeline()
将在所有流上调用 stream.destroy(err)
,除了:
- 已触发
'end'
或'close'
的Readable
流。 - 已触发
'finish'
或'close'
的Writable
流。
在调用 callback
后,stream.pipeline()
在流上留下悬空事件监听器。
在失败后重用流的情况下,这可能会导致事件监听器泄漏和吞下错误。
如果最后一个流是可读的,则悬空的事件监听器将被移除,以便稍后可以使用最后一个流。
stream.pipeline()
在出现错误时关闭所有流。
IncomingRequest
与 pipeline
一起使用可能会导致意外行为,一旦发生它会销毁套接字且不发送预期的响应。
参见以下示例:
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // 没有这样的文件
// 一旦 `pipeline` 已经销毁了套接字,则无法发送此消息
return res.end('error!!!');
}
});
});
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- Returns: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- Returns: <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- Returns: <AsyncIterable> | <Promise>
callback
<Function> Called when the pipeline is fully done.err
<Error>val
Resolved value ofPromise
returned bydestination
.
- Returns: <Stream>
A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
);
The pipeline
API provides a promise version, which can also
receive an options argument as the last parameter with a
signal
<AbortSignal> property. When the signal is aborted,
destroy
will be called on the underlying pipeline, with an
AbortError
.
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
To use an AbortSignal
, pass it inside an options object,
as the last argument:
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
const ac = new AbortController();
const signal = ac.signal;
setTimeout(() => ac.abort(), 1);
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}
run().catch(console.error); // AbortError
The pipeline
API also supports async generators:
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
Remember to handle the signal
argument passed into the async generator.
Especially in the case where the async generator is the source for the
pipeline (i.e. first argument) or the pipeline will never complete.
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
stream.pipeline()
will call stream.destroy(err)
on all streams except:
Readable
streams which have emitted'end'
or'close'
.Writable
streams which have emitted'finish'
or'close'
.
stream.pipeline()
leaves dangling event listeners on the streams
after the callback
has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors. If the last
stream is readable, dangling event listeners will be removed so that the last
stream can be consumed later.
stream.pipeline()
closes all the streams when an error is raised.
The IncomingRequest
usage with pipeline
could lead to an unexpected behavior
once it would destroy the socket without sending the expected response.
See the example below:
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // No such file
// this message can't be sent once `pipeline` already destroyed the socket
return res.end('error!!!');
}
});
});