main.js (7651B)
/**
* @license Apache-2.0
*
* Copyright (c) 2018 The Stdlib Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

// MODULES //

var Readable = require( 'readable-stream' ).Readable;
var isCollection = require( '@stdlib/assert/is-collection' );
var isError = require( '@stdlib/assert/is-error' );
var isBuffer = require( '@stdlib/assert/is-buffer' );
var copy = require( '@stdlib/utils/copy' );
var inherit = require( '@stdlib/utils/inherit' );
var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' );
var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' );
var string2buffer = require( '@stdlib/buffer/from-string' );
var Buffer = require( '@stdlib/buffer/ctor' ); // TODO: replace Buffer.concat usage with stdlib pkg
var nextTick = require( '@stdlib/utils/next-tick' );
var DEFAULTS = require( './defaults.json' );
var validate = require( './validate.js' );
var debug = require( './debug.js' );


// FUNCTIONS //

/**
* Serializes a value for debug logging without ever throwing.
*
* ## Notes
*
* -   `JSON.stringify` throws for BigInt values and for objects containing circular references. As debug arguments are evaluated eagerly (even when debugging is disabled), an unguarded call would crash the stream for such values.
*
* @private
* @param {*} value - value to serialize
* @returns {(string|void)} serialized value (`undefined` when `JSON.stringify` returns `undefined`)
*/
function safeStringify( value ) {
	try {
		return JSON.stringify( value );
	} catch ( err ) { // eslint-disable-line no-unused-vars
		if ( typeof value === 'bigint' ) {
			return value.toString();
		}
		// Fall back to a type tag (e.g., `[object Object]`), which does not invoke user-defined `toString` methods:
		return Object.prototype.toString.call( value );
	}
}

/**
* Implements the `_read` method.
*
* ## Notes
*
* -   The function declares no parameters; any `size` hint supplied by the caller is ignored.
* -   Elements are pushed until either the internal buffer signals back-pressure (`push` returns `false`), the configured number of iterations is exceeded, the source is empty, or the stream is destroyed.
* -   When not in object mode, each element is serialized and, for every element after the first, prefixed with the separator.
*
* @private
* @returns {(boolean|void)} result of pushing the end-of-stream marker when iteration finishes; otherwise, nothing
*/
function read() {
	/* eslint-disable no-invalid-this */
	var FLG;
	var err;
	var v;

	if ( this._destroyed ) {
		return;
	}
	FLG = true;

	// Re-check the destruction state on every pass, as an `error` listener may destroy the stream while we are still looping:
	while ( FLG && !this._destroyed ) {
		err = null;

		this._i += 1;
		if ( this._i > this._iter || this._src.length === 0 ) {
			debug( 'Finished iteration.' );
			return this.push( null );
		}
		// Advance the pointer and wrap around at either end of the source:
		this._idx += this._stride;
		if ( this._stride > 0 && this._idx >= this._src.length ) {
			this._idx %= this._src.length;
		} else if ( this._stride < 0 && this._idx < 0 ) {
			this._idx += this._src.length;
		}
		v = this._src[ this._idx ];
		debug( 'Value: %s. Idx: %d. Iter: %d.', safeStringify( v ), this._idx, this._i );

		if ( this._objectMode === false ) {
			v = this._serialize( v );
			if ( typeof v === 'string' ) {
				// Only values after the first are preceded by a separator:
				if ( this._i === 1 ) {
					v = string2buffer( v );
				} else {
					v = string2buffer( this._sep+v );
				}
			} else if ( isBuffer( v ) ) {
				if ( this._i > 1 ) {
					v = Buffer.concat( [ string2buffer( this._sep ), v ] );
				}
			} else {
				err = new Error( 'invalid operation. Serialization function must return a string or Buffer. Value: `' + v + '`.' );
			}
		}
		if ( err ) {
			// Report the failure and move on to the next element:
			this.emit( 'error', err );
		} else {
			FLG = this.push( v );
		}
	}

	/* eslint-enable no-invalid-this */
}

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* ## Notes
*
* -   The stream is flagged as destroyed immediately; the `error` (if an error was provided) and `close` events are emitted on a subsequent tick.
* -   Destroying an already destroyed stream is a no-op.
*
* @private
* @param {(string|Object|Error)} [error] - error
* @returns {CircularArrayStream} Stream instance
*/
function destroy( error ) {
	/* eslint-disable no-invalid-this */
	var self;
	if ( this._destroyed ) {
		debug( 'Attempted to destroy an already destroyed stream.' );
		return this;
	}
	self = this;
	this._destroyed = true;

	nextTick( close );

	return this;

	/**
	* Closes a stream.
	*
	* @private
	*/
	function close() {
		if ( error ) {
			debug( 'Stream was destroyed due to an error. Error: %s.', ( isError( error ) ) ? error.message : safeStringify( error ) );
			self.emit( 'error', error );
		}
		debug( 'Closing the stream...' );
		self.emit( 'close' );
	}

	/* eslint-enable no-invalid-this */
}


// MAIN //

/**
* Stream constructor for generating a readable stream from an array-like object which repeatedly iterates over a provided value's elements.
*
* @constructor
* @param {Collection} src - source array-like object
* @param {Options} [options] - stream options
* @param {boolean} [options.objectMode=false] - specifies whether the stream should operate in object mode
* @param {(string|null)} [options.encoding=null] - specifies how `Buffer` objects should be decoded to strings
* @param {NonNegativeNumber} [options.highWaterMark] - specifies the maximum number of bytes to store in an internal buffer before pausing streaming
* @param {string} [options.sep='\n'] - separator used to join streamed data
* @param {Function} [options.serialize] - custom serialization function
* @param {NonNegativeInteger} [options.iter=1e308] - number of iterations
* @param {integer} [options.dir=1] - iteration direction
* @throws {TypeError} first argument must be an array-like object
* @throws {TypeError} options argument must be an object
* @throws {TypeError} must provide valid options
* @returns {CircularArrayStream} Stream instance
*
* @example
* var inspectStream = require( '@stdlib/streams/node/inspect-sink' );
* var Float64Array = require( '@stdlib/array/float64' );
* var randu = require( '@stdlib/random/base/randu' );
*
* function log( chunk ) {
*     console.log( chunk.toString() );
* }
*
* var arr = new Float64Array( 10 );
* var i;
* for ( i = 0; i < arr.length; i++ ) {
*     arr[ i ] = randu();
* }
*
* var opts = {
*     'iter': arr.length * 2
* };
*
* var stream = new CircularArrayStream( arr, opts );
*
* stream.pipe( inspectStream( log ) );
*/
function CircularArrayStream( src, options ) {
	var opts;
	var err;

	// Support invocation without `new`, forwarding only the arguments which were actually provided:
	if ( !( this instanceof CircularArrayStream ) ) {
		if ( arguments.length > 1 ) {
			return new CircularArrayStream( src, options );
		}
		return new CircularArrayStream( src );
	}
	if ( !isCollection( src ) ) {
		throw new TypeError( 'invalid argument. First argument must be an array-like object. Value: `' + src + '`.' );
	}
	opts = copy( DEFAULTS );
	if ( arguments.length > 1 ) {
		err = validate( opts, options );
		if ( err ) {
			throw err;
		}
	}
	// Make the stream a readable stream:
	debug( 'Creating a readable stream configured with the following options: %s.', JSON.stringify( opts ) );
	Readable.call( this, opts );

	// Destruction state:
	setNonEnumerable( this, '_destroyed', false );

	// Cache whether the stream is operating in object mode:
	setNonEnumerableReadOnly( this, '_objectMode', opts.objectMode );

	// Cache the separator:
	setNonEnumerableReadOnly( this, '_sep', opts.sep );

	// Define the serialization function:
	setNonEnumerableReadOnly( this, '_serialize', opts.serialize || JSON.stringify );

	// Cache the data source:
	setNonEnumerableReadOnly( this, '_src', src );

	// Cache the array "stride":
	setNonEnumerableReadOnly( this, '_stride', opts.dir );

	// Cache the number of iterations:
	setNonEnumerableReadOnly( this, '_iter', opts.iter );

	// Initialize an iteration counter:
	setNonEnumerable( this, '_i', 0 );

	// Initialize the source index (pointer) one step "before" the first element to be read, as the pointer is advanced prior to each read:
	setNonEnumerable( this, '_idx', ( opts.dir === 1 ) ? -1 : src.length );

	return this;
}

/*
* Inherit from the `Readable` prototype.
*/
inherit( CircularArrayStream, Readable );

/**
* Implements the `_read` method.
*
* @private
* @name _read
* @memberof CircularArrayStream.prototype
* @type {Function}
* @returns {(boolean|void)} result of pushing the end-of-stream marker when iteration finishes; otherwise, nothing
*/
setNonEnumerableReadOnly( CircularArrayStream.prototype, '_read', read );

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* @name destroy
* @memberof CircularArrayStream.prototype
* @type {Function}
* @param {(string|Object|Error)} [error] - error
* @returns {CircularArrayStream} Stream instance
*/
setNonEnumerableReadOnly( CircularArrayStream.prototype, 'destroy', destroy );


// EXPORTS //

module.exports = CircularArrayStream;