main.js (6858B)
/**
* @license Apache-2.0
*
* Copyright (c) 2018 The Stdlib Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

// MODULES //

var Readable = require( 'readable-stream' ).Readable;
var isError = require( '@stdlib/assert/is-error' );
var isString = require( '@stdlib/assert/is-string' ).isPrimitive;
var isBuffer = require( '@stdlib/assert/is-buffer' );
var isUint8Array = require( '@stdlib/assert/is-uint8array' );
var arraybuffer2buffer = require( '@stdlib/buffer/from-arraybuffer' );
var string2buffer = require( '@stdlib/buffer/from-string' );
var Buffer = require( '@stdlib/buffer/ctor' ); // TODO: replace Buffer.concat usage with stdlib pkg
var copy = require( '@stdlib/utils/copy' );
var inherit = require( '@stdlib/utils/inherit' );
var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' );
var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' );
var nextTick = require( '@stdlib/utils/next-tick' );
var DEFAULTS = require( './defaults.json' );
var validate = require( './validate.js' );
var debug = require( './debug.js' );


// FUNCTIONS //

/**
* Implements the `_read` method.
*
* ## Notes
*
* -   Any size hint supplied by the caller is ignored; the function declares no parameters.
* -   Values are pushed until either `push` returns a falsy value or the configured number of iterations has been exceeded, at which point `null` is pushed.
* -   In binary mode, every value after the first is prefixed with the separator.
*
* @private
* @returns {void}
*/
function read() {
	/* eslint-disable no-invalid-this */
	var FLG;
	if ( this._destroyed ) {
		return;
	}
	FLG = true;
	while ( FLG ) {
		this._i += 1;
		if ( this._i > this._iter ) {
			debug( 'Finished iteration.' );
			return this.push( null );
		}
		debug( 'Value: %s. Iter: %d.', this._value, this._i );
		if ( this._objectMode === false && this._i > 1 ) {
			// Separators go *between* values, so only prepend one after the first value has been pushed:
			FLG = this.push( Buffer.concat( [ this._sep, this._value ] ) );
		} else {
			FLG = this.push( this._value );
		}
	}

	/* eslint-enable no-invalid-this */
}

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* ## Notes
*
* -   Calling this method on an already destroyed stream is a no-op (apart from a debug message).
* -   Event emission is deferred via `nextTick`: an `error` event is emitted first if an `error` argument was provided, followed by a `close` event.
*
* @private
* @param {(string|Object|Error)} [error] - error
* @returns {ConstantStream} Stream instance
*/
function destroy( error ) {
	/* eslint-disable no-invalid-this */
	var self;
	if ( this._destroyed ) {
		debug( 'Attempted to destroy an already destroyed stream.' );
		return this;
	}
	self = this;

	// Flip the flag synchronously so that any subsequent `_read` call returns immediately:
	this._destroyed = true;

	nextTick( close );

	return this;

	/**
	* Closes a stream.
	*
	* @private
	*/
	function close() {
		if ( error ) {
			debug( 'Stream was destroyed due to an error. Error: %s.', ( isError( error ) ) ? error.message : JSON.stringify( error ) );
			self.emit( 'error', error );
		}
		debug( 'Closing the stream...' );
		self.emit( 'close' );
	}

	/* eslint-enable no-invalid-this */
}


// MAIN //

/**
* Stream constructor for generating a stream which always streams the same value.
*
* @constructor
* @param {(string|Buffer|Uint8Array)} value - value to stream
* @param {Options} [options] - stream options
* @param {boolean} [options.objectMode=false] - specifies whether the stream should operate in object mode
* @param {(string|null)} [options.encoding=null] - specifies how `Buffer` objects should be decoded to strings
* @param {NonNegativeNumber} [options.highWaterMark] - specifies the maximum number of bytes to store in an internal buffer before pausing streaming
* @param {string} [options.sep='\n'] - separator used to join streamed data
* @param {NonNegativeInteger} [options.iter] - number of iterations
* @throws {TypeError} in binary mode, value to stream must be a string, `Buffer`, or `Uint8Array`
* @throws {TypeError} options argument must be an object
* @throws {TypeError} must provide valid options
* @returns {ConstantStream} Stream instance
*
* @example
* var inspectStream = require( '@stdlib/streams/node/inspect-sink' );
*
* function log( chunk ) {
*     console.log( chunk.toString() );
* }
*
* var opts = {
*     'iter': 10
* };
*
* var stream = new ConstantStream( 'beep', opts );
*
* stream.pipe( inspectStream( log ) );
*/
function ConstantStream( value, options ) {
	var opts;
	var err;
	if ( !( this instanceof ConstantStream ) ) {
		// Support invocation without `new`, forwarding `options` only when it was actually supplied so that `arguments.length` is preserved:
		if ( arguments.length > 1 ) {
			return new ConstantStream( value, options );
		}
		return new ConstantStream( value );
	}
	opts = copy( DEFAULTS );
	if ( arguments.length > 1 ) {
		err = validate( opts, options );
		if ( err ) {
			throw err;
		}
	}
	if ( opts.objectMode === false ) {
		if ( isString( value ) ) {
			value = string2buffer( value );
		} else if ( isBuffer( value ) ) { // NOTE: order matters here. We want the `isBuffer` check BEFORE the `isUint8Array` check!!
			// Nothing to do, as value is already a buffer...
		} else if ( isUint8Array( value ) ) {
			// Convert to a `Buffer` object to provide backward compatibility with older Node.js versions...
			value = arraybuffer2buffer( value.buffer, value.byteOffset, value.length ); // eslint-disable-line max-len
		} else {
			// Use explicit `String()` conversion, as implicit concatenation throws for `Symbol` values before the intended message can be built:
			throw new TypeError( 'invalid argument. In binary mode, a provided value must be a string, Buffer, or Uint8Array. Value: `' + String( value ) + '`.' );
		}
	}
	// Make the stream a readable stream:
	debug( 'Creating a readable stream configured with the following options: %s.', JSON.stringify( opts ) );
	Readable.call( this, opts );

	// Destruction state:
	setNonEnumerable( this, '_destroyed', false );

	// Cache whether the stream is operating in object mode:
	setNonEnumerableReadOnly( this, '_objectMode', opts.objectMode );

	// Cache the separator:
	setNonEnumerableReadOnly( this, '_sep', string2buffer( opts.sep ) );

	// Cache the total number of iterations:
	setNonEnumerableReadOnly( this, '_iter', opts.iter );

	// Cache the value to stream:
	setNonEnumerableReadOnly( this, '_value', value );

	// Initialize an iteration counter:
	setNonEnumerable( this, '_i', 0 );

	return this;
}

/*
* Inherit from the `Readable` prototype.
*/
inherit( ConstantStream, Readable );

/**
* Implements the `_read` method.
*
* @private
* @name _read
* @memberof ConstantStream.prototype
* @type {Function}
* @param {number} size - number (of bytes) to read
* @returns {void}
*/
setNonEnumerableReadOnly( ConstantStream.prototype, '_read', read );

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* @name destroy
* @memberof ConstantStream.prototype
* @type {Function}
* @param {(string|Object|Error)} [error] - error
* @returns {ConstantStream} Stream instance
*/
setNonEnumerableReadOnly( ConstantStream.prototype, 'destroy', destroy );


// EXPORTS //

module.exports = ConstantStream;