main.js (5986B)
/**
* @license Apache-2.0
*
* Copyright (c) 2018 The Stdlib Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

// MODULES //

var Writable = require( 'readable-stream' ).Writable;
var isFunction = require( '@stdlib/assert/is-function' );
var copy = require( '@stdlib/utils/copy' );
var inherit = require( '@stdlib/utils/inherit' );
var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' );
var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' );
var nextTick = require( '@stdlib/utils/next-tick' );
var DEFAULTS = require( './defaults.json' );
var validate = require( './validate.js' );
var debug = require( './debug.js' );


// FUNCTIONS //

/**
* Implements the `_write` method.
*
* ## Notes
*
* -   Increments the chunk counter, logs the chunk, and invokes the cached inspect callback with the chunk and its zero-based index.
* -   The inspect callback is invoked with a `null` `this` context.
*
* @private
* @param {(Uint8Array|Buffer|string)} chunk - streamed chunk
* @param {string} encoding - Buffer encoding
* @param {Callback} clbk - callback to invoke after processing the streamed chunk
* @returns {void}
*/
function write( chunk, encoding, clbk ) {
	/* eslint-disable no-invalid-this */
	this._idx += 1;

	// Coerce using `String()` rather than invoking `chunk.toString()` directly, as a chunk received in object mode need not be a string or Buffer (e.g., `undefined`), in which case a direct method call would throw before the inspect callback is ever invoked:
	debug( 'Received a new chunk. Chunk: %s. Encoding: %s. Index: %d.', String( chunk ), encoding, this._idx );
	this._inspect.call( null, chunk, this._idx );

	// Once destroyed, ensure we do not block the event-loop when an upstream stream pipeline is behaving synchronously (otherwise, by continuing to process stream data synchronously, we'll continue receiving data and the stream will never be destroyed)...
	if ( this._destroyed ) {
		return nextTick( clbk );
	}
	clbk();

	/* eslint-enable no-invalid-this */
}

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* ## Notes
*
* -   Destroying an already destroyed stream is a no-op.
* -   The `error` (if provided) and `close` events are emitted on a subsequent tick, not synchronously.
*
* @private
* @param {Object} [error] - optional error message
* @returns {InspectSinkStream} stream instance
*/
function destroy( error ) {
	/* eslint-disable no-invalid-this */
	var self;
	if ( this._destroyed ) {
		debug( 'Attempted to destroy an already destroyed stream.' );
		return this;
	}
	self = this;
	this._destroyed = true;

	// Defer event emission in order to allow the current call stack to unwind:
	nextTick( close );

	return this;

	/**
	* Closes a stream.
	*
	* @private
	*/
	function close() {
		if ( error ) {
			// `Error` instances serialize to `{}` via `JSON.stringify` (their `message` is not an own enumerable property), so log the message directly for those and only serialize other values:
			debug( 'Stream was destroyed due to an error. Error: %s.', ( error instanceof Error ) ? error.message : JSON.stringify( error ) );
			self.emit( 'error', error );
		}
		debug( 'Closing the stream...' );
		self.emit( 'close' );
	}

	/* eslint-enable no-invalid-this */
}


// MAIN //

/**
* Inspect stream constructor.
*
* @constructor
* @param {Options} [options] - stream options
* @param {boolean} [options.objectMode=false] - specifies whether the stream should operate in object mode
* @param {NonNegativeNumber} [options.highWaterMark] - specifies the `Buffer` level for when `write()` starts returning `false`
* @param {boolean} [options.decodeStrings=true] - specifies whether to encode strings as `Buffer` objects before writing data to a returned stream
* @param {string} [options.defaultEncoding='utf8'] - default encoding when not explicitly specified when writing data
* @param {Callback} clbk - callback to invoke upon receiving data
* @throws {TypeError} options argument must be an object
* @throws {TypeError} must provide valid options
* @throws {TypeError} must provide a callback function
* @returns {InspectSinkStream} inspect stream
*
* @example
* function log( chunk, idx ) {
*     console.log( 'index: %d', idx );
*     console.log( chunk );
* }
*
* var stream = new InspectSinkStream( log );
*
* stream.write( 'a' );
* stream.write( 'b' );
* stream.write( 'c' );
*
* stream.end();
*
* // prints: index: 0
* // prints: a
* // prints: index: 1
* // prints: b
* // prints: index: 2
* // prints: c
*/
function InspectSinkStream( options, clbk ) {
	var inspect;
	var opts;
	var err;

	// Support invocation without `new`, forwarding exactly as many arguments as were provided so that the argument-count dispatch below behaves the same:
	if ( !(this instanceof InspectSinkStream) ) {
		if ( arguments.length > 1 ) {
			return new InspectSinkStream( options, clbk );
		}
		return new InspectSinkStream( options );
	}
	opts = copy( DEFAULTS );

	// When provided a single argument, that argument is the callback; otherwise, the first argument is an options object and the second argument is the callback:
	if ( arguments.length > 1 ) {
		inspect = clbk;
		err = validate( opts, options );
		if ( err ) {
			throw err;
		}
	} else {
		inspect = options;
	}
	if ( !isFunction( inspect ) ) {
		throw new TypeError( 'invalid argument. Callback argument must be a function. Value: `' + inspect + '`.' );
	}
	// Make the stream a Writable stream:
	debug( 'Creating a writable stream configured with the following options: %s.', JSON.stringify( opts ) );
	Writable.call( this, opts );

	// The destruction state:
	setNonEnumerable( this, '_destroyed', false );

	// Initialize a chunk counter (starts at `-1` so that the first chunk has index `0`):
	setNonEnumerable( this, '_idx', -1 );

	// Cache a reference to the inspect callback:
	setNonEnumerableReadOnly( this, '_inspect', inspect );

	return this;
}

/*
* Inherit from the `Writable` prototype.
*/
inherit( InspectSinkStream, Writable );

/**
* Implements the `_write` method.
*
* @private
* @name _write
* @memberof InspectSinkStream.prototype
* @type {Function}
* @param {(Uint8Array|Buffer|string)} chunk - streamed chunk
* @param {string} encoding - Buffer encoding
* @param {Callback} clbk - callback to invoke after processing the streamed chunk
*/
setNonEnumerableReadOnly( InspectSinkStream.prototype, '_write', write );

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* @name destroy
* @memberof InspectSinkStream.prototype
* @type {Function}
* @param {Object} [error] - optional error message
* @returns {InspectSinkStream} stream instance
*/
setNonEnumerableReadOnly( InspectSinkStream.prototype, 'destroy', destroy );


// EXPORTS //

module.exports = InspectSinkStream;