stream.js (2958B)
1 import {createReadStream, readFileSync} from 'node:fs'; 2 import {isStream} from 'is-stream'; 3 import getStream from 'get-stream'; 4 import mergeStream from 'merge-stream'; 5 6 const validateInputOptions = input => { 7 if (input !== undefined) { 8 throw new TypeError('The `input` and `inputFile` options cannot be both set.'); 9 } 10 }; 11 12 const getInputSync = ({input, inputFile}) => { 13 if (typeof inputFile !== 'string') { 14 return input; 15 } 16 17 validateInputOptions(input); 18 return readFileSync(inputFile); 19 }; 20 21 // `input` and `inputFile` option in sync mode 22 export const handleInputSync = options => { 23 const input = getInputSync(options); 24 25 if (isStream(input)) { 26 throw new TypeError('The `input` option cannot be a stream in sync mode'); 27 } 28 29 return input; 30 }; 31 32 const getInput = ({input, inputFile}) => { 33 if (typeof inputFile !== 'string') { 34 return input; 35 } 36 37 validateInputOptions(input); 38 return createReadStream(inputFile); 39 }; 40 41 // `input` and `inputFile` option in async mode 42 export const handleInput = (spawned, options) => { 43 const input = getInput(options); 44 45 if (input === undefined) { 46 return; 47 } 48 49 if (isStream(input)) { 50 input.pipe(spawned.stdin); 51 } else { 52 spawned.stdin.end(input); 53 } 54 }; 55 56 // `all` interleaves `stdout` and `stderr` 57 export const makeAllStream = (spawned, {all}) => { 58 if (!all || (!spawned.stdout && !spawned.stderr)) { 59 return; 60 } 61 62 const mixed = mergeStream(); 63 64 if (spawned.stdout) { 65 mixed.add(spawned.stdout); 66 } 67 68 if (spawned.stderr) { 69 mixed.add(spawned.stderr); 70 } 71 72 return mixed; 73 }; 74 75 // On failure, `result.stdout|stderr|all` should contain the currently buffered stream 76 const getBufferedData = async (stream, streamPromise) => { 77 // When `buffer` is `false`, `streamPromise` is `undefined` and there is no buffered data to retrieve 78 if (!stream || streamPromise === undefined) { 79 return; 80 } 81 82 stream.destroy(); 83 84 try { 85 return await streamPromise; 86 } catch (error) { 87 return error.bufferedData; 88 } 89 }; 90 91 const getStreamPromise = (stream, {encoding, buffer, maxBuffer}) => { 92 if (!stream || !buffer) { 93 return; 94 } 95 96 if (encoding) { 97 return getStream(stream, {encoding, maxBuffer}); 98 } 99 100 return getStream.buffer(stream, {maxBuffer}); 101 }; 102 103 // Retrieve result of child process: exit code, signal, error, streams (stdout/stderr/all) 104 export const getSpawnedResult = async ({stdout, stderr, all}, {encoding, buffer, maxBuffer}, processDone) => { 105 const stdoutPromise = getStreamPromise(stdout, {encoding, buffer, maxBuffer}); 106 const stderrPromise = getStreamPromise(stderr, {encoding, buffer, maxBuffer}); 107 const allPromise = getStreamPromise(all, {encoding, buffer, maxBuffer: maxBuffer * 2}); 108 109 try { 110 return await Promise.all([processDone, stdoutPromise, stderrPromise, allPromise]); 111 } catch (error) { 112 return Promise.all([ 113 {error, signal: error.signal, timedOut: error.timedOut}, 114 getBufferedData(stdout, stdoutPromise), 115 getBufferedData(stderr, stderrPromise), 116 getBufferedData(all, allPromise), 117 ]); 118 } 119 };