index.js (1489B)
'use strict';
const {constants: BufferConstants} = require('buffer');
const stream = require('stream');
const {promisify} = require('util');
const bufferStream = require('./buffer-stream');

const streamPipelinePromisified = promisify(stream.pipeline);

/**
 * Error used to reject `getStream()` once the buffered length grows past
 * the `maxBuffer` option.
 */
class MaxBufferError extends Error {
	constructor() {
		super('maxBuffer exceeded');
		this.name = 'MaxBufferError';
	}
}

/**
 * Pipe `inputStream` into a buffering stream created by `./buffer-stream`
 * and resolve with that stream's buffered value once the pipeline finishes.
 *
 * @param {object} inputStream - Source stream handed to `stream.pipeline`.
 * @param {object} [options] - Forwarded as-is to `bufferStream()`, with
 *   `maxBuffer` defaulting to `Infinity`.
 * @param {number} [options.maxBuffer=Infinity] - Buffered length above which
 *   the returned promise rejects with `MaxBufferError`.
 * @returns {Promise<*>} Whatever `getBufferedValue()` returns for the
 *   buffering stream (its exact type is decided by `./buffer-stream`).
 *
 * Rejects with `Error('Expected a stream')` when `inputStream` is falsy, with
 * `MaxBufferError` when `maxBuffer` is exceeded, or with the pipeline's own
 * error. A truthy rejection error gets a `bufferedData` property holding the
 * data buffered so far, unless the buffered length is above
 * `BufferConstants.MAX_LENGTH`.
 */
async function getStream(inputStream, options) {
	if (!inputStream) {
		throw new Error('Expected a stream');
	}

	options = {
		maxBuffer: Infinity,
		...options
	};

	const {maxBuffer} = options;
	// Named so it does not shadow the module-level `stream` import.
	const bufferedStream = bufferStream(options);

	await new Promise((resolve, reject) => {
		const rejectPromise = error => {
			// Don't retrieve an oversized buffer.
			if (error && bufferedStream.getBufferedLength() <= BufferConstants.MAX_LENGTH) {
				error.bufferedData = bufferedStream.getBufferedValue();
			}

			reject(error);
		};

		// Start the pipeline first, then watch the buffered length.
		streamPipelinePromisified(inputStream, bufferedStream).then(resolve, rejectPromise);

		// The promise can only reject once, so report the overflow once.
		// Without this guard every later chunk would build a new error and
		// re-read the whole buffered value for nothing.
		let isMaxBufferExceeded = false;

		bufferedStream.on('data', () => {
			if (!isMaxBufferExceeded && bufferedStream.getBufferedLength() > maxBuffer) {
				isMaxBufferExceeded = true;
				rejectPromise(new MaxBufferError());
			}
		});
	});

	return bufferedStream.getBufferedValue();
}

module.exports = getStream;
// Same as `getStream()` but forces the `encoding: 'buffer'` option.
module.exports.buffer = (stream, options) => getStream(stream, {...options, encoding: 'buffer'});
// Same as `getStream()` but forces the `array: true` option.
module.exports.array = (stream, options) => getStream(stream, {...options, array: true});
module.exports.MaxBufferError = MaxBufferError;