main.js (7053B)
1 /** 2 * @license Apache-2.0 3 * 4 * Copyright (c) 2018 The Stdlib Authors. 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 'use strict'; 20 21 // MODULES // 22 23 var Readable = require( 'readable-stream' ).Readable; 24 var isCollection = require( '@stdlib/assert/is-collection' ); 25 var isError = require( '@stdlib/assert/is-error' ); 26 var isBuffer = require( '@stdlib/assert/is-buffer' ); 27 var copy = require( '@stdlib/utils/copy' ); 28 var inherit = require( '@stdlib/utils/inherit' ); 29 var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' ); 30 var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' ); 31 var string2buffer = require( '@stdlib/buffer/from-string' ); 32 var Buffer = require( '@stdlib/buffer/ctor' ); // TODO: replace Buffer.concat usage with stdlib pkg 33 var nextTick = require( '@stdlib/utils/next-tick' ); 34 var DEFAULTS = require( './defaults.json' ); 35 var validate = require( './validate.js' ); 36 var debug = require( './debug.js' ); 37 38 39 // FUNCTIONS // 40 41 /** 42 * Implements the `_read` method. 43 * 44 * @private 45 * @param {number} size - number (of bytes) to read 46 * @returns {void} 47 */ 48 function read() { 49 /* eslint-disable no-invalid-this */ 50 var FLG; 51 var err; 52 var v; 53 54 if ( this._destroyed ) { 55 return; 56 } 57 FLG = true; 58 while ( FLG ) { 59 err = null; 60 61 this._i += 1; 62 if ( this._i > this._src.length ) { 63 debug( 'Finished iteration.' ); 64 return this.push( null ); 65 } 66 v = this._src[ this._idx ]; 67 debug( 'Value: %s. Idx: %d. Iter: %d.', JSON.stringify( v ), this._idx, this._i ); 68 69 this._idx += this._stride; 70 if ( this._objectMode === false ) { 71 v = this._serialize( v ); 72 if ( typeof v === 'string' ) { 73 if ( this._i === 1 ) { 74 v = string2buffer( v ); 75 } else { 76 v = string2buffer( this._sep+v ); 77 } 78 } else if ( isBuffer( v ) ) { 79 if ( this._i > 1 ) { 80 v = Buffer.concat( [ string2buffer( this._sep ), v ] ); 81 } 82 } else { 83 err = new Error( 'invalid operation. Serialization function must return a string or Buffer. Value: `' + v + '`.' ); 84 } 85 } 86 if ( err ) { 87 this.emit( 'error', err ); 88 } else { 89 FLG = this.push( v ); 90 } 91 } 92 93 /* eslint-enable no-invalid-this */ 94 } 95 96 /** 97 * Gracefully destroys a stream, providing backward compatibility. 98 * 99 * @private 100 * @param {(string|Object|Error)} [error] - error 101 * @returns {ArrayStream} Stream instance 102 */ 103 function destroy( error ) { 104 /* eslint-disable no-invalid-this */ 105 var self; 106 if ( this._destroyed ) { 107 debug( 'Attempted to destroy an already destroyed stream.' ); 108 return this; 109 } 110 self = this; 111 this._destroyed = true; 112 113 nextTick( close ); 114 115 return this; 116 117 /** 118 * Closes a stream. 119 * 120 * @private 121 */ 122 function close() { 123 if ( error ) { 124 debug( 'Stream was destroyed due to an error. Error: %s.', ( isError( error ) ) ? error.message : JSON.stringify( error ) ); 125 self.emit( 'error', error ); 126 } 127 debug( 'Closing the stream...' ); 128 self.emit( 'close' ); 129 } 130 131 /* eslint-enable no-invalid-this */ 132 } 133 134 135 // MAIN // 136 137 /** 138 * Stream constructor for generating a readable stream from an array-like object. 139 * 140 * @constructor 141 * @param {Collection} src - source array-like object 142 * @param {Options} [options] - stream options 143 * @param {boolean} [options.objectMode=false] - specifies whether the stream should operate in object mode 144 * @param {(string|null)} [options.encoding=null] - specifies how `Buffer` objects should be decoded to strings 145 * @param {NonNegativeNumber} [options.highWaterMark] - specifies the maximum number of bytes to store in an internal buffer before pausing streaming 146 * @param {string} [options.sep='\n'] - separator used to join streamed data 147 * @param {Function} [options.serialize] - custom serialization function 148 * @param {integer} [options.dir=1] - iteration direction 149 * @throws {TypeError} first argument must be an array-like object 150 * @throws {TypeError} options argument must be an object 151 * @throws {TypeError} must provide valid options 152 * @returns {ArrayStream} Stream instance 153 * 154 * @example 155 * var inspectStream = require( '@stdlib/streams/node/inspect-sink' ); 156 * var Float64Array = require( '@stdlib/array/float64' ); 157 * var randu = require( '@stdlib/random/base/randu' ); 158 * 159 * function log( chunk ) { 160 * console.log( chunk.toString() ); 161 * } 162 * 163 * var arr = new Float64Array( 10 ); 164 * var i; 165 * for ( i = 0; i < arr.length; i++ ) { 166 * arr[ i ] = randu(); 167 * } 168 * 169 * var stream = new ArrayStream( arr ); 170 * 171 * stream.pipe( inspectStream( log ) ); 172 */ 173 function ArrayStream( src, options ) { 174 var opts; 175 var err; 176 if ( !( this instanceof ArrayStream ) ) { 177 if ( arguments.length > 1 ) { 178 return new ArrayStream( src, options ); 179 } 180 return new ArrayStream( src ); 181 } 182 if ( !isCollection( src ) ) { 183 throw new TypeError( 'invalid argument. First argument must be an array-like object. Value: `' + src + '`.' ); 184 } 185 opts = copy( DEFAULTS ); 186 if ( arguments.length > 1 ) { 187 err = validate( opts, options ); 188 if ( err ) { 189 throw err; 190 } 191 } 192 // Make the stream a readable stream: 193 debug( 'Creating a readable stream configured with the following options: %s.', JSON.stringify( opts ) ); 194 Readable.call( this, opts ); 195 196 // Destruction state: 197 setNonEnumerable( this, '_destroyed', false ); 198 199 // Cache whether the stream is operating in object mode: 200 setNonEnumerableReadOnly( this, '_objectMode', opts.objectMode ); 201 202 // Cache the separator: 203 setNonEnumerableReadOnly( this, '_sep', opts.sep ); 204 205 // Define the serialization function: 206 setNonEnumerableReadOnly( this, '_serialize', opts.serialize || JSON.stringify ); 207 208 // Cache the data source: 209 setNonEnumerableReadOnly( this, '_src', src ); 210 211 // Cache the array "stride": 212 setNonEnumerableReadOnly( this, '_stride', opts.dir ); 213 214 // Initialize an iteration counter: 215 setNonEnumerable( this, '_i', 0 ); 216 217 // Initialize the source index (pointer): 218 setNonEnumerable( this, '_idx', ( opts.dir === 1 ) ? 0 : src.length-1 ); 219 220 return this; 221 } 222 223 /* 224 * Inherit from the `Readable` prototype. 225 */ 226 inherit( ArrayStream, Readable ); 227 228 /** 229 * Implements the `_read` method. 230 * 231 * @private 232 * @name _read 233 * @memberof ArrayStream.prototype 234 * @type {Function} 235 * @param {number} size - number (of bytes) to read 236 * @returns {void} 237 */ 238 setNonEnumerableReadOnly( ArrayStream.prototype, '_read', read ); 239 240 /** 241 * Gracefully destroys a stream, providing backward compatibility. 242 * 243 * @name destroy 244 * @memberof ArrayStream.prototype 245 * @type {Function} 246 * @param {(string|Object|Error)} [error] - error 247 * @returns {ArrayStream} Stream instance 248 */ 249 setNonEnumerableReadOnly( ArrayStream.prototype, 'destroy', destroy ); 250 251 252 // EXPORTS // 253 254 module.exports = ArrayStream;