time-to-botec

Benchmark sampling in different programming languages
Log | Files | Refs | README

main.js (7867B)


      1 /**
      2 * @license Apache-2.0
      3 *
      4 * Copyright (c) 2018 The Stdlib Authors.
      5 *
      6 * Licensed under the Apache License, Version 2.0 (the "License");
      7 * you may not use this file except in compliance with the License.
      8 * You may obtain a copy of the License at
      9 *
     10 *    http://www.apache.org/licenses/LICENSE-2.0
     11 *
     12 * Unless required by applicable law or agreed to in writing, software
     13 * distributed under the License is distributed on an "AS IS" BASIS,
     14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15 * See the License for the specific language governing permissions and
     16 * limitations under the License.
     17 */
     18 
     19 'use strict';
     20 
     21 // MODULES //
     22 
     23 var Transform = require( 'readable-stream' ).Transform;
     24 var copy = require( '@stdlib/utils/copy' );
     25 var inherit = require( '@stdlib/utils/inherit' );
     26 var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' );
     27 var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' );
     28 var Buffer = require( '@stdlib/buffer/ctor' );
     29 var nextTick = require( '@stdlib/utils/next-tick' );
     30 var DEFAULTS = require( './defaults.json' );
     31 var validate = require( './validate.js' );
     32 var debug = require( './debug.js' );
     33 var decode = require( './decode.js' );
     34 
     35 
     36 // VARIABLES //
     37 
     38 var RE = /\r?\n/;
     39 
     40 
     41 // FUNCTIONS //
     42 
     43 /**
     44 * Implements the `_transform` method.
     45 *
     46 * @private
     47 * @param {(Uint8Array|Buffer|string)} chunk - streamed chunk
     48 * @param {string} encoding - Buffer encoding
     49 * @param {Callback} clbk - callback to invoke after transforming the streamed chunk
     50 */
     51 function transform( chunk, encoding, clbk ) {
     52 	/* eslint-disable no-invalid-this */
     53 	var splits;
     54 	var split;
     55 	var len;
     56 	var i;
     57 
     58 	this._idx += 1;
     59 	debug( 'Received a new chunk. Chunk: %s. Encoding: %s. Index: %d.', chunk.toString(), encoding, this._idx );
     60 	if ( encoding === 'buffer' ) {
     61 		// Default Buffer decoding is 'utf8':
     62 		chunk = chunk.toString();
     63 	}
     64 	else if ( encoding !== 'utf8' ) {
     65 		// Decode the chunk as a 'utf8' string...
     66 		chunk = new Buffer( chunk, encoding ); // eslint-disable-line no-buffer-constructor
     67 		chunk = chunk.toString( 'utf8' );
     68 	}
     69 	// Split the chunk:
     70 	splits = chunk.split( this._sep );
     71 	debug( 'Splits: %s. Index: %d.', JSON.stringify( splits ), this._idx );
     72 
     73 	// How many splits do we have? We do not count the last "split", as it may be incomplete...
     74 	len = splits.length - 1;
     75 	debug( '%s splits. Index: %d.', len, this._idx );
     76 
     77 	// If we do not have any splits, add the chunk to the buffer and wait for more data...
     78 	if ( len === 0 ) {
     79 		debug( 'No splits. Index: %d.', this._idx );
     80 		this._buffer += splits[ len ];
     81 	}
     82 	// If we have split data, concat any previous partial split, re-decode (if need be) each split according to its original encoding, push each split to the destination, and set anything leftover as the new split buffer...
     83 	else {
     84 		debug( 'Processing splits. Index: %d.', this._index );
     85 		split = this._buffer + splits[ 0 ];
     86 		split = decode( split, encoding );
     87 
     88 		debug( 'Split %d: %s. Index: %d.', 0, split.toString(), this._idx );
     89 		this.push( split, encoding );
     90 		for ( i = 1; i < len; i++ ) {
     91 			split = decode( splits[ i ], encoding );
     92 			debug( 'Split %d: %s. Index: %d.', i, split.toString(), this._idx );
     93 			this.push( split, encoding );
     94 		}
     95 		debug( 'Remaining split: %s.', splits[len].toString() );
     96 		this._buffer = splits[ len ];
     97 	}
     98 	clbk();
     99 
    100 	/* eslint-enable no-invalid-this */
    101 }
    102 
    103 /**
    104 * Implements the `_flush` method.
    105 *
    106 * @private
    107 * @param {Callback} clbk - callback to invoke after any final processing
    108 */
    109 function flush( clbk ) {
    110 	/* eslint-disable no-invalid-this */
    111 	var split = this._buffer;
    112 	if ( split ) {
    113 		debug( 'Processing final split...' );
    114 		split = decode( split, this._encoding );
    115 		this.push( split, this._encoding );
    116 	}
    117 	debug( 'Flushing the stream...' );
    118 	clbk();
    119 
    120 	/* eslint-enable no-invalid-this */
    121 }
    122 
    123 /**
    124 * Gracefully destroys a stream, providing backward compatibility.
    125 *
    126 * @private
    127 * @param {Object} [error] - optional error message
    128 * @returns {SplitStream} Stream instance
    129 */
    130 function destroy( error ) {
    131 	/* eslint-disable no-invalid-this */
    132 	var self;
    133 	if ( this._destroyed ) {
    134 		debug( 'Attempted to destroy an already destroyed stream.' );
    135 		return this;
    136 	}
    137 	self = this;
    138 	this._destroyed = true;
    139 
    140 	nextTick( close );
    141 
    142 	return this;
    143 
    144 	/**
    145 	* Closes a stream.
    146 	*
    147 	* @private
    148 	*/
    149 	function close() {
    150 		if ( error ) {
    151 			debug( 'Stream was destroyed due to an error. Error: %s.', JSON.stringify( error ) );
    152 			self.emit( 'error', error );
    153 		}
    154 		debug( 'Closing the stream...' );
    155 		self.emit( 'close' );
    156 	}
    157 
    158 	/* eslint-enable no-invalid-this */
    159 }
    160 
    161 
    162 // MAIN //
    163 
    164 /**
    165 * Split stream constructor.
    166 *
    167 * @constructor
    168 * @param {Options} [options] - stream options
    169 * @param {(string|RegExp)} [options.sep=/\r?\n/] - separator used to split streamed data
    170 * @param {boolean} [options.objectMode=false] - specifies whether a stream should operate in object mode
    171 * @param {(string|null)} [options.encoding=null] - specifies how `Buffer` objects should be decoded to `strings`
    172 * @param {NonNegativeNumber} [options.highWaterMark] - specifies the `Buffer` level for when `write()` starts returning `false`
    173 * @param {boolean} [options.allowHalfOpen=false] - specifies whether the stream should remain open even if one side ends
    174 * @param {boolean} [options.writableObjectMode=false] - specifies whether the writable side should be in object mode
    175 * @returns {SplitStream} split stream
    176 *
    177 * @example
    178 * var stream = new SplitStream();
    179 *
    180 * stream.write( '1\n2\n3' );
    181 * stream.end();
    182 */
    183 function SplitStream( options ) {
    184 	var opts;
    185 	var err;
    186 	if ( !( this instanceof SplitStream ) ) {
    187 		if ( arguments.length ) {
    188 			return new SplitStream( options );
    189 		}
    190 		return new SplitStream();
    191 	}
    192 	opts = copy( DEFAULTS );
    193 	if ( arguments.length ) {
    194 		err = validate( opts, options );
    195 		if ( err ) {
    196 			throw err;
    197 		}
    198 	}
    199 	// The stream's readable state should always be in object mode to prevent split data from being buffered (concatenated) and no longer being separated...
    200 	opts.readableObjectMode = true;
    201 
    202 	// The stream converts each chunk into a string so no need to encode strings written to the split stream as Buffer objects:
    203 	opts.decodeStrings = false;
    204 
    205 	// Make the stream a Transform stream:
    206 	debug( 'Creating a transform stream configured with the following options: %s.', JSON.stringify( opts ) );
    207 	Transform.call( this, opts );
    208 
    209 	// Cache the separator:
    210 	setNonEnumerableReadOnly( this, '_sep', ( opts.sep === null ) ? RE : opts.sep );
    211 
    212 	// The destruction state:
    213 	setNonEnumerable( this, '_destroyed', false );
    214 
    215 	// Cache the encoding:
    216 	setNonEnumerableReadOnly( this, '_encoding', opts.encoding );
    217 
    218 	// Buffer for storing partial splits:
    219 	setNonEnumerable( this, '_buffer', '' );
    220 
    221 	// Chunk counter:
    222 	setNonEnumerable( this, '_idx', -1 );
    223 
    224 	return this;
    225 }
    226 
    227 /*
    228 * Inherit from the `Transform` prototype.
    229 */
    230 inherit( SplitStream, Transform );
    231 
    232 /**
    233 * Implements the `_transform` method.
    234 *
    235 * @private
    236 * @name _transform
    237 * @memberof SplitStream.prototype
    238 * @type {Function}
    239 * @param {(Buffer|string)} chunk - streamed chunk
    240 * @param {string} encoding - Buffer encoding
    241 * @param {Callback} clbk - callback to invoke after transforming the streamed chunk
    242 */
    243 setNonEnumerableReadOnly( SplitStream.prototype, '_transform', transform );
    244 
    245 /**
    246 * Implements the `_flush` method.
    247 *
    248 * @private
    249 * @name _flush
    250 * @memberof SplitStream.prototype
    251 * @type {Function}
    252 * @param {Callback} clbk - callback to invoke after any final processing
    253 */
    254 setNonEnumerableReadOnly( SplitStream.prototype, '_flush', flush );
    255 
    256 /**
    257 * Gracefully destroys a stream, providing backward compatibility.
    258 *
    259 * @name destroy
    260 * @memberof SplitStream.prototype
    261 * @type {Function}
    262 * @param {Object} [error] - optional error message
    263 * @returns {SplitStream} Stream instance
    264 */
    265 setNonEnumerableReadOnly( SplitStream.prototype, 'destroy', destroy );
    266 
    267 
    268 // EXPORTS //
    269 
    270 module.exports = SplitStream;