main.js (8524B)
/**
* @license Apache-2.0
*
* Copyright (c) 2018 The Stdlib Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

// MODULES //

var Readable = require( 'readable-stream' ).Readable;
var isCollection = require( '@stdlib/assert/is-collection' );
var isError = require( '@stdlib/assert/is-error' );
var isBuffer = require( '@stdlib/assert/is-buffer' );
var isInteger = require( '@stdlib/assert/is-integer' ).isPrimitive;
var isNonNegativeInteger = require( '@stdlib/assert/is-nonnegative-integer' ).isPrimitive;
var copy = require( '@stdlib/utils/copy' );
var inherit = require( '@stdlib/utils/inherit' );
var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' );
var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' );
var string2buffer = require( '@stdlib/buffer/from-string' );
var Buffer = require( '@stdlib/buffer/ctor' ); // TODO: replace Buffer.concat usage with stdlib pkg
var nextTick = require( '@stdlib/utils/next-tick' );
var DEFAULTS = require( './defaults.json' );
var validate = require( './validate.js' );
var debug = require( './debug.js' );


// FUNCTIONS //

/**
* Serializes a value for debug output without ever throwing.
*
* ## Notes
*
* -   `JSON.stringify` throws for BigInt values and circular structures. Debug logging must not be able to break streaming, so failures fall back to plain string conversion.
*
* @private
* @param {*} value - value to serialize
* @returns {(string|void)} serialized value (`undefined` when `JSON.stringify` produces no output)
*/
function debugString( value ) {
	try {
		return JSON.stringify( value );
	} catch ( error ) { // eslint-disable-line no-unused-vars
		try {
			return String( value );
		} catch ( err ) { // eslint-disable-line no-unused-vars
			// E.g., a circular object lacking both `toString` and `valueOf`:
			return Object.prototype.toString.call( value );
		}
	}
}

/**
* Implements the `_read` method.
*
* ## Notes
*
* -   Any size hint supplied by the caller is ignored; values are pushed one at a time until `push` returns `false`, the stream is destroyed, or `N` values have been visited.
* -   When not in object mode, each value is serialized and every value after the first iteration is prefixed with the separator.
* -   A serialization function returning neither a string nor a `Buffer` results in an `error` event for that value; iteration then continues with the next value.
*
* @private
* @returns {(boolean|void)} result of pushing the end-of-stream marker once iteration finishes; otherwise, nothing
*/
function read() {
	/* eslint-disable no-invalid-this */
	var FLG;
	var err;
	var v;

	if ( this._destroyed ) {
		return;
	}
	FLG = true;

	// Re-check the destruction state on every iteration, as an `error` listener (invoked synchronously below) may destroy the stream mid-loop:
	while ( FLG && !this._destroyed ) {
		err = null;

		this._i += 1;
		if ( this._i > this._N ) {
			debug( 'Finished iteration.' );
			return this.push( null );
		}
		v = this._buffer[ this._idx ];
		debug( 'Value: %s. Idx: %d. Iter: %d.', debugString( v ), this._idx, this._i );

		this._idx += this._stride;
		if ( this._objectMode === false ) {
			v = this._serialize( v );
			if ( typeof v === 'string' ) {
				if ( this._i === 1 ) {
					v = string2buffer( v );
				} else {
					v = string2buffer( this._sep+v );
				}
			} else if ( isBuffer( v ) ) {
				if ( this._i > 1 ) {
					v = Buffer.concat( [ string2buffer( this._sep ), v ] );
				}
			} else {
				err = new Error( 'invalid operation. Serialization function must return a string or Buffer. Value: `' + v + '`.' );
			}
		}
		if ( err ) {
			this.emit( 'error', err );
		} else {
			// A `false` return value signals that we should stop pushing until `_read` is invoked again:
			FLG = this.push( v );
		}
	}

	/* eslint-enable no-invalid-this */
}

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* ## Notes
*
* -   The stream is flagged as destroyed immediately; the (optional) `error` event and the `close` event are emitted on a subsequent tick.
* -   Destroying an already destroyed stream is a no-op.
*
* @private
* @param {(string|Object|Error)} [error] - error
* @returns {StridedArrayStream} Stream instance
*/
function destroy( error ) {
	/* eslint-disable no-invalid-this */
	var self;
	if ( this._destroyed ) {
		debug( 'Attempted to destroy an already destroyed stream.' );
		return this;
	}
	self = this;
	this._destroyed = true;

	nextTick( close );

	return this;

	/**
	* Closes a stream.
	*
	* @private
	*/
	function close() {
		if ( error ) {
			// Non-`Error` values are serialized defensively so that logging cannot throw inside the scheduled callback:
			debug( 'Stream was destroyed due to an error. Error: %s.', ( isError( error ) ) ? error.message : debugString( error ) );
			self.emit( 'error', error );
		}
		debug( 'Closing the stream...' );
		self.emit( 'close' );
	}

	/* eslint-enable no-invalid-this */
}


// MAIN //

/**
* Stream constructor for generating a readable stream from an array-like object.
*
* @constructor
* @param {NonNegativeInteger} N - number of values to stream
* @param {Collection} buffer - source value
* @param {integer} stride - stride length
* @param {NonNegativeInteger} offset - starting index
* @param {Options} [options] - stream options
* @param {boolean} [options.objectMode=false] - specifies whether the stream should operate in object mode
* @param {(string|null)} [options.encoding=null] - specifies how `Buffer` objects should be decoded to strings
* @param {NonNegativeNumber} [options.highWaterMark] - specifies the maximum number of bytes to store in an internal buffer before pausing streaming
* @param {string} [options.sep='\n'] - separator used to join streamed data
* @param {Function} [options.serialize] - custom serialization function
* @throws {TypeError} first argument must be a nonnegative integer
* @throws {TypeError} second argument must be an array-like object
* @throws {TypeError} third argument must be an integer
* @throws {TypeError} fourth argument must be a nonnegative integer
* @throws {RangeError} linear index cannot exceed array bounds
* @throws {TypeError} options argument must be an object
* @throws {TypeError} must provide valid options
* @returns {StridedArrayStream} Stream instance
*
* @example
* var inspectStream = require( '@stdlib/streams/node/inspect-sink' );
* var Float64Array = require( '@stdlib/array/float64' );
* var randu = require( '@stdlib/random/base/randu' );
*
* function log( chunk ) {
*     console.log( chunk.toString() );
* }
*
* var arr = new Float64Array( 10 );
* var i;
* for ( i = 0; i < arr.length; i++ ) {
*     arr[ i ] = randu();
* }
*
* var stream = new StridedArrayStream( arr.length, arr, 1, 0 );
*
* stream.pipe( inspectStream( log ) );
*/
function StridedArrayStream( N, buffer, stride, offset, options ) {
	var opts;
	var err;
	var i;
	if ( !( this instanceof StridedArrayStream ) ) {
		// Forward `options` only when explicitly provided, as validation is keyed on the number of arguments:
		if ( arguments.length > 4 ) {
			return new StridedArrayStream( N, buffer, stride, offset, options );
		}
		return new StridedArrayStream( N, buffer, stride, offset );
	}
	if ( !isNonNegativeInteger( N ) ) {
		throw new TypeError( 'invalid argument. First argument must be a nonnegative integer. Value: `' + N + '`.' );
	}
	if ( !isCollection( buffer ) ) {
		throw new TypeError( 'invalid argument. Second argument must be an array-like object. Value: `' + buffer + '`.' );
	}
	if ( !isInteger( stride ) ) {
		throw new TypeError( 'invalid argument. Third argument must be an integer. Value: `' + stride + '`.' );
	}
	if ( !isNonNegativeInteger( offset ) ) {
		throw new TypeError( 'invalid argument. Fourth argument must be a nonnegative integer. Value: `' + offset + '`.' );
	}
	if ( N > 0 ) {
		// Index of the last element which would be accessed:
		i = offset + ((N-1)*stride);
		if ( offset >= buffer.length || i < 0 || i >= buffer.length ) {
			throw new RangeError( 'invalid arguments. Strided array parameters are incompatible with the provided array-like object. Linear index exceeds array bounds.' );
		}
	}
	opts = copy( DEFAULTS );
	if ( arguments.length > 4 ) {
		err = validate( opts, options );
		if ( err ) {
			throw err;
		}
	}
	// Make the stream a readable stream:
	debug( 'Creating a readable stream configured with the following options: %s.', JSON.stringify( opts ) );
	Readable.call( this, opts );

	// Destruction state:
	setNonEnumerable( this, '_destroyed', false );

	// Cache whether the stream is operating in object mode:
	setNonEnumerableReadOnly( this, '_objectMode', opts.objectMode );

	// Cache the separator:
	setNonEnumerableReadOnly( this, '_sep', opts.sep );

	// Define the serialization function:
	setNonEnumerableReadOnly( this, '_serialize', opts.serialize || JSON.stringify );

	// Cache the data source:
	setNonEnumerableReadOnly( this, '_buffer', buffer );

	// Cache the strided array parameters:
	setNonEnumerableReadOnly( this, '_N', N );
	setNonEnumerableReadOnly( this, '_stride', stride );
	setNonEnumerableReadOnly( this, '_offset', offset );
	setNonEnumerable( this, '_idx', offset );

	// Initialize an iteration counter:
	setNonEnumerable( this, '_i', 0 );

	return this;
}

/*
* Inherit from the `Readable` prototype.
*/
inherit( StridedArrayStream, Readable );

/**
* Implements the `_read` method.
*
* @private
* @name _read
* @memberof StridedArrayStream.prototype
* @type {Function}
* @param {number} size - number (of bytes) to read (ignored by this implementation)
* @returns {void}
*/
setNonEnumerableReadOnly( StridedArrayStream.prototype, '_read', read );

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* @name destroy
* @memberof StridedArrayStream.prototype
* @type {Function}
* @param {(string|Object|Error)} [error] - error
* @returns {StridedArrayStream} Stream instance
*/
setNonEnumerableReadOnly( StridedArrayStream.prototype, 'destroy', destroy );


// EXPORTS //

module.exports = StridedArrayStream;