main.js (7505B)
1 /** 2 * @license Apache-2.0 3 * 4 * Copyright (c) 2018 The Stdlib Authors. 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 'use strict'; 20 21 // MODULES // 22 23 var Readable = require( 'readable-stream' ).Readable; 24 var hasOwnProp = require( '@stdlib/assert/has-own-property' ); 25 var isIteratorLike = require( '@stdlib/assert/is-iterator-like' ); 26 var isError = require( '@stdlib/assert/is-error' ); 27 var isBuffer = require( '@stdlib/assert/is-buffer' ); 28 var copy = require( '@stdlib/utils/copy' ); 29 var inherit = require( '@stdlib/utils/inherit' ); 30 var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' ); 31 var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' ); 32 var string2buffer = require( '@stdlib/buffer/from-string' ); 33 var Buffer = require( '@stdlib/buffer/ctor' ); // TODO: replace Buffer.concat usage with stdlib pkg 34 var nextTick = require( '@stdlib/utils/next-tick' ); 35 var DEFAULTS = require( './defaults.json' ); 36 var validate = require( './validate.js' ); 37 var debug = require( './debug.js' ); 38 39 40 // FUNCTIONS // 41 42 /** 43 * Implements the `_read` method. 44 * 45 * @private 46 * @param {number} size - number (of bytes) to read 47 * @returns {void} 48 */ 49 function read() { 50 /* eslint-disable no-invalid-this */ 51 var FLG; 52 var err; 53 var v; 54 55 if ( this._destroyed ) { 56 return; 57 } 58 FLG = true; 59 while ( FLG ) { 60 v = this._iterator.next(); 61 this._i += 1; 62 if ( v.done ) { 63 if ( hasOwnProp( v, 'value' ) ) { 64 v = v.value; 65 debug( 'Value: %s. Iter: %d.', JSON.stringify( v ), this._i ); 66 if ( this._objectMode === false ) { 67 v = this._serialize( v ); 68 if ( typeof v === 'string' ) { 69 if ( this._i === 1 ) { // eslint-disable-line max-depth 70 v = string2buffer( v ); 71 } else { 72 v = string2buffer( this._sep+v ); 73 } 74 this.push( v ); 75 } else if ( isBuffer( v ) ) { 76 if ( this._i > 1 ) { // eslint-disable-line max-depth 77 v = Buffer.concat( [ string2buffer( this._sep ), v ] ); // eslint-disable-line max-len 78 } 79 this.push( v ); 80 } else { 81 err = new Error( 'invalid operation. Serialization function must return a string or Buffer. Value: `' + v + '`.' ); 82 this.emit( 'error', err ); 83 } 84 } 85 } 86 debug( 'Finished iteration.' ); 87 return this.push( null ); 88 } 89 v = v.value; 90 debug( 'Value: %s. Iter: %d.', JSON.stringify( v ), this._i ); 91 92 if ( this._objectMode === false ) { 93 v = this._serialize( v ); 94 if ( typeof v === 'string' ) { 95 if ( this._i === 1 ) { 96 v = string2buffer( v ); 97 } else { 98 v = string2buffer( this._sep+v ); 99 } 100 } else if ( isBuffer( v ) ) { 101 if ( this._i > 1 ) { 102 v = Buffer.concat( [ string2buffer( this._sep ), v ] ); 103 } 104 } else { 105 err = new Error( 'invalid operation. Serialization function must return a string or Buffer. Value: `' + v + '`.' ); 106 this.emit( 'error', err ); 107 continue; 108 } 109 } 110 FLG = this.push( v ); 111 } 112 113 /* eslint-enable no-invalid-this */ 114 } 115 116 /** 117 * Gracefully destroys a stream, providing backward compatibility. 118 * 119 * @private 120 * @param {(string|Object|Error)} [error] - error 121 * @returns {IteratorStream} Stream instance 122 */ 123 function destroy( error ) { 124 /* eslint-disable no-invalid-this */ 125 var self; 126 if ( this._destroyed ) { 127 debug( 'Attempted to destroy an already destroyed stream.' ); 128 return this; 129 } 130 self = this; 131 this._destroyed = true; 132 133 nextTick( close ); 134 135 return this; 136 137 /** 138 * Closes a stream. 139 * 140 * @private 141 */ 142 function close() { 143 if ( error ) { 144 debug( 'Stream was destroyed due to an error. Error: %s.', ( isError( error ) ) ? error.message : JSON.stringify( error ) ); 145 self.emit( 'error', error ); 146 } 147 debug( 'Closing the stream...' ); 148 self.emit( 'close' ); 149 } 150 151 /* eslint-enable no-invalid-this */ 152 } 153 154 155 // MAIN // 156 157 /** 158 * Stream constructor for generating a readable stream from an iterator. 159 * 160 * @constructor 161 * @param {Iterator} iterator - source iterator 162 * @param {Options} [options] - stream options 163 * @param {boolean} [options.objectMode=false] - specifies whether the stream should operate in object mode 164 * @param {(string|null)} [options.encoding=null] - specifies how `Buffer` objects should be decoded to strings 165 * @param {NonNegativeNumber} [options.highWaterMark] - specifies the maximum number of bytes to store in an internal buffer before pausing iteration 166 * @param {string} [options.sep='\n'] - separator used to join streamed data 167 * @param {Function} [options.serialize] - custom serialization function 168 * @throws {TypeError} first argument must be an iterator 169 * @throws {TypeError} options argument must be an object 170 * @throws {TypeError} must provide valid options 171 * @returns {IteratorStream} Stream instance 172 * 173 * @example 174 * var inspectStream = require( '@stdlib/streams/node/inspect-sink' ); 175 * var randu = require( '@stdlib/random/iter/randu' ); 176 * 177 * function log( chunk ) { 178 * console.log( chunk.toString() ); 179 * } 180 * 181 * var opts = { 182 * 'iter': 10 183 * }; 184 * 185 * var stream = new IteratorStream( randu( opts ) ); 186 * 187 * stream.pipe( inspectStream( log ) ); 188 */ 189 function IteratorStream( iterator, options ) { 190 var opts; 191 var err; 192 if ( !( this instanceof IteratorStream ) ) { 193 if ( arguments.length > 1 ) { 194 return new IteratorStream( iterator, options ); 195 } 196 return new IteratorStream( iterator ); 197 } 198 if ( !isIteratorLike( iterator ) ) { 199 throw new TypeError( 'invalid argument. First argument must be an iterator. Value: `' + iterator + '`.' ); 200 } 201 opts = copy( DEFAULTS ); 202 if ( arguments.length > 1 ) { 203 err = validate( opts, options ); 204 if ( err ) { 205 throw err; 206 } 207 } 208 // Make the stream a readable stream: 209 debug( 'Creating a readable stream configured with the following options: %s.', JSON.stringify( opts ) ); 210 Readable.call( this, opts ); 211 212 // Destruction state: 213 setNonEnumerable( this, '_destroyed', false ); 214 215 // Cache whether the stream is operating in object mode: 216 setNonEnumerableReadOnly( this, '_objectMode', opts.objectMode ); 217 218 // Cache the separator: 219 setNonEnumerableReadOnly( this, '_sep', opts.sep ); 220 221 // Define the serialization function: 222 setNonEnumerableReadOnly( this, '_serialize', opts.serialize || JSON.stringify ); 223 224 // Cache the iterator: 225 setNonEnumerableReadOnly( this, '_iterator', iterator ); 226 227 // Initialize an iteration counter: 228 setNonEnumerable( this, '_i', 0 ); 229 230 return this; 231 } 232 233 /* 234 * Inherit from the `Readable` prototype. 235 */ 236 inherit( IteratorStream, Readable ); 237 238 /** 239 * Implements the `_read` method. 240 * 241 * @private 242 * @name _read 243 * @memberof IteratorStream.prototype 244 * @type {Function} 245 * @param {number} size - number (of bytes) to read 246 * @returns {void} 247 */ 248 setNonEnumerableReadOnly( IteratorStream.prototype, '_read', read ); 249 250 /** 251 * Gracefully destroys a stream, providing backward compatibility. 252 * 253 * @name destroy 254 * @memberof IteratorStream.prototype 255 * @type {Function} 256 * @param {(string|Object|Error)} [error] - error 257 * @returns {IteratorStream} Stream instance 258 */ 259 setNonEnumerableReadOnly( IteratorStream.prototype, 'destroy', destroy ); 260 261 262 // EXPORTS // 263 264 module.exports = IteratorStream;