main.js (7867B)
1 /** 2 * @license Apache-2.0 3 * 4 * Copyright (c) 2018 The Stdlib Authors. 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 'use strict'; 20 21 // MODULES // 22 23 var Transform = require( 'readable-stream' ).Transform; 24 var copy = require( '@stdlib/utils/copy' ); 25 var inherit = require( '@stdlib/utils/inherit' ); 26 var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' ); 27 var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' ); 28 var Buffer = require( '@stdlib/buffer/ctor' ); 29 var nextTick = require( '@stdlib/utils/next-tick' ); 30 var DEFAULTS = require( './defaults.json' ); 31 var validate = require( './validate.js' ); 32 var debug = require( './debug.js' ); 33 var decode = require( './decode.js' ); 34 35 36 // VARIABLES // 37 38 var RE = /\r?\n/; 39 40 41 // FUNCTIONS // 42 43 /** 44 * Implements the `_transform` method. 45 * 46 * @private 47 * @param {(Uint8Array|Buffer|string)} chunk - streamed chunk 48 * @param {string} encoding - Buffer encoding 49 * @param {Callback} clbk - callback to invoke after transforming the streamed chunk 50 */ 51 function transform( chunk, encoding, clbk ) { 52 /* eslint-disable no-invalid-this */ 53 var splits; 54 var split; 55 var len; 56 var i; 57 58 this._idx += 1; 59 debug( 'Received a new chunk. Chunk: %s. Encoding: %s. Index: %d.', chunk.toString(), encoding, this._idx ); 60 if ( encoding === 'buffer' ) { 61 // Default Buffer decoding is 'utf8': 62 chunk = chunk.toString(); 63 } 64 else if ( encoding !== 'utf8' ) { 65 // Decode the chunk as a 'utf8' string... 66 chunk = new Buffer( chunk, encoding ); // eslint-disable-line no-buffer-constructor 67 chunk = chunk.toString( 'utf8' ); 68 } 69 // Split the chunk: 70 splits = chunk.split( this._sep ); 71 debug( 'Splits: %s. Index: %d.', JSON.stringify( splits ), this._idx ); 72 73 // How many splits do we have? We do not count the last "split", as it may be incomplete... 74 len = splits.length - 1; 75 debug( '%s splits. Index: %d.', len, this._idx ); 76 77 // If we do not have any splits, add the chunk to the buffer and wait for more data... 78 if ( len === 0 ) { 79 debug( 'No splits. Index: %d.', this._idx ); 80 this._buffer += splits[ len ]; 81 } 82 // If we have split data, concat any previous partial split, re-decode (if need be) each split according to its original encoding, push each split to the destination, and set anything leftover as the new split buffer... 83 else { 84 debug( 'Processing splits. Index: %d.', this._index ); 85 split = this._buffer + splits[ 0 ]; 86 split = decode( split, encoding ); 87 88 debug( 'Split %d: %s. Index: %d.', 0, split.toString(), this._idx ); 89 this.push( split, encoding ); 90 for ( i = 1; i < len; i++ ) { 91 split = decode( splits[ i ], encoding ); 92 debug( 'Split %d: %s. Index: %d.', i, split.toString(), this._idx ); 93 this.push( split, encoding ); 94 } 95 debug( 'Remaining split: %s.', splits[len].toString() ); 96 this._buffer = splits[ len ]; 97 } 98 clbk(); 99 100 /* eslint-enable no-invalid-this */ 101 } 102 103 /** 104 * Implements the `_flush` method. 105 * 106 * @private 107 * @param {Callback} clbk - callback to invoke after any final processing 108 */ 109 function flush( clbk ) { 110 /* eslint-disable no-invalid-this */ 111 var split = this._buffer; 112 if ( split ) { 113 debug( 'Processing final split...' ); 114 split = decode( split, this._encoding ); 115 this.push( split, this._encoding ); 116 } 117 debug( 'Flushing the stream...' ); 118 clbk(); 119 120 /* eslint-enable no-invalid-this */ 121 } 122 123 /** 124 * Gracefully destroys a stream, providing backward compatibility. 125 * 126 * @private 127 * @param {Object} [error] - optional error message 128 * @returns {SplitStream} Stream instance 129 */ 130 function destroy( error ) { 131 /* eslint-disable no-invalid-this */ 132 var self; 133 if ( this._destroyed ) { 134 debug( 'Attempted to destroy an already destroyed stream.' ); 135 return this; 136 } 137 self = this; 138 this._destroyed = true; 139 140 nextTick( close ); 141 142 return this; 143 144 /** 145 * Closes a stream. 146 * 147 * @private 148 */ 149 function close() { 150 if ( error ) { 151 debug( 'Stream was destroyed due to an error. Error: %s.', JSON.stringify( error ) ); 152 self.emit( 'error', error ); 153 } 154 debug( 'Closing the stream...' ); 155 self.emit( 'close' ); 156 } 157 158 /* eslint-enable no-invalid-this */ 159 } 160 161 162 // MAIN // 163 164 /** 165 * Split stream constructor. 166 * 167 * @constructor 168 * @param {Options} [options] - stream options 169 * @param {(string|RegExp)} [options.sep=/\r?\n/] - separator used to split streamed data 170 * @param {boolean} [options.objectMode=false] - specifies whether a stream should operate in object mode 171 * @param {(string|null)} [options.encoding=null] - specifies how `Buffer` objects should be decoded to `strings` 172 * @param {NonNegativeNumber} [options.highWaterMark] - specifies the `Buffer` level for when `write()` starts returning `false` 173 * @param {boolean} [options.allowHalfOpen=false] - specifies whether the stream should remain open even if one side ends 174 * @param {boolean} [options.writableObjectMode=false] - specifies whether the writable side should be in object mode 175 * @returns {SplitStream} split stream 176 * 177 * @example 178 * var stream = new SplitStream(); 179 * 180 * stream.write( '1\n2\n3' ); 181 * stream.end(); 182 */ 183 function SplitStream( options ) { 184 var opts; 185 var err; 186 if ( !( this instanceof SplitStream ) ) { 187 if ( arguments.length ) { 188 return new SplitStream( options ); 189 } 190 return new SplitStream(); 191 } 192 opts = copy( DEFAULTS ); 193 if ( arguments.length ) { 194 err = validate( opts, options ); 195 if ( err ) { 196 throw err; 197 } 198 } 199 // The stream's readable state should always be in object mode to prevent split data from being buffered (concatenated) and no longer being separated... 200 opts.readableObjectMode = true; 201 202 // The stream converts each chunk into a string so no need to encode strings written to the split stream as Buffer objects: 203 opts.decodeStrings = false; 204 205 // Make the stream a Transform stream: 206 debug( 'Creating a transform stream configured with the following options: %s.', JSON.stringify( opts ) ); 207 Transform.call( this, opts ); 208 209 // Cache the separator: 210 setNonEnumerableReadOnly( this, '_sep', ( opts.sep === null ) ? RE : opts.sep ); 211 212 // The destruction state: 213 setNonEnumerable( this, '_destroyed', false ); 214 215 // Cache the encoding: 216 setNonEnumerableReadOnly( this, '_encoding', opts.encoding ); 217 218 // Buffer for storing partial splits: 219 setNonEnumerable( this, '_buffer', '' ); 220 221 // Chunk counter: 222 setNonEnumerable( this, '_idx', -1 ); 223 224 return this; 225 } 226 227 /* 228 * Inherit from the `Transform` prototype. 229 */ 230 inherit( SplitStream, Transform ); 231 232 /** 233 * Implements the `_transform` method. 234 * 235 * @private 236 * @name _transform 237 * @memberof SplitStream.prototype 238 * @type {Function} 239 * @param {(Buffer|string)} chunk - streamed chunk 240 * @param {string} encoding - Buffer encoding 241 * @param {Callback} clbk - callback to invoke after transforming the streamed chunk 242 */ 243 setNonEnumerableReadOnly( SplitStream.prototype, '_transform', transform ); 244 245 /** 246 * Implements the `_flush` method. 247 * 248 * @private 249 * @name _flush 250 * @memberof SplitStream.prototype 251 * @type {Function} 252 * @param {Callback} clbk - callback to invoke after any final processing 253 */ 254 setNonEnumerableReadOnly( SplitStream.prototype, '_flush', flush ); 255 256 /** 257 * Gracefully destroys a stream, providing backward compatibility. 258 * 259 * @name destroy 260 * @memberof SplitStream.prototype 261 * @type {Function} 262 * @param {Object} [error] - optional error message 263 * @returns {SplitStream} Stream instance 264 */ 265 setNonEnumerableReadOnly( SplitStream.prototype, 'destroy', destroy ); 266 267 268 // EXPORTS // 269 270 module.exports = SplitStream;