index.js (885B)
1 'use strict'; 2 3 const { PassThrough } = require('stream'); 4 5 module.exports = function (/*streams...*/) { 6 var sources = [] 7 var output = new PassThrough({objectMode: true}) 8 9 output.setMaxListeners(0) 10 11 output.add = add 12 output.isEmpty = isEmpty 13 14 output.on('unpipe', remove) 15 16 Array.prototype.slice.call(arguments).forEach(add) 17 18 return output 19 20 function add (source) { 21 if (Array.isArray(source)) { 22 source.forEach(add) 23 return this 24 } 25 26 sources.push(source); 27 source.once('end', remove.bind(null, source)) 28 source.once('error', output.emit.bind(output, 'error')) 29 source.pipe(output, {end: false}) 30 return this 31 } 32 33 function isEmpty () { 34 return sources.length == 0; 35 } 36 37 function remove (source) { 38 sources = sources.filter(function (it) { return it !== source }) 39 if (!sources.length && output.readable) { output.end() } 40 } 41 }