main.js (7023B)
/**
* @license Apache-2.0
*
* Copyright (c) 2018 The Stdlib Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

// MODULES //

var Transform = require( 'readable-stream' ).Transform;
var string2buffer = require( '@stdlib/buffer/from-string' );
var copy = require( '@stdlib/utils/copy' );
var inherit = require( '@stdlib/utils/inherit' );
var Buffer = require( '@stdlib/buffer/ctor' );
var setNonEnumerable = require( '@stdlib/utils/define-nonenumerable-property' );
var setNonEnumerableReadOnly = require( '@stdlib/utils/define-nonenumerable-read-only-property' );
var nextTick = require( '@stdlib/utils/next-tick' );
var DEFAULTS = require( './defaults.json' );
var validate = require( './validate.js' );
var debug = require( './debug.js' );


// FUNCTIONS //

/**
* Returns a string describing a value provided as a destroy "error" for use in debug output.
*
* ## Notes
*
* -   `Error` instances are described by their `message`, as `JSON.stringify` only serializes own enumerable properties and thus typically returns `'{}'` for an `Error`.
* -   Serialization failures (e.g., due to circular references) are caught so that logging can never prevent a stream from emitting its `error` and `close` events.
*
* @private
* @param {*} error - error value
* @returns {string} string describing the error value
*/
function formatError( error ) {
	if ( error instanceof Error ) {
		return error.message;
	}
	try {
		return JSON.stringify( error );
	} catch ( err ) { // eslint-disable-line no-unused-vars
		// Fall back to a representation which does not depend on the value having a `toString` method:
		return Object.prototype.toString.call( error );
	}
}

/**
* Implements the `_transform` method.
*
* ## Notes
*
* -   The first chunk is pushed as received. Every subsequent chunk is prefixed with the separator before being pushed.
*
* @private
* @param {(Uint8Array|Buffer|string)} chunk - streamed chunk
* @param {string} encoding - Buffer encoding
* @param {Callback} clbk - callback to invoke after transforming the streamed chunk
*/
function transform( chunk, encoding, clbk ) {
	/* eslint-disable no-invalid-this */
	var len;

	this._idx += 1;
	debug( 'Received a new chunk. Chunk: %s. Encoding: %s. Index: %d.', chunk.toString(), encoding, this._idx );
	if ( this._encoding === 'utf8' ) {
		if ( this._init ) {
			// The separator is a string, so concatenation coerces the chunk to a string:
			chunk = this._sep + chunk;
		} else {
			this._init = true;
		}
	} else if ( this._init ) {
		// The separator is a Buffer, so join at the byte level and then serialize using the stream's encoding:
		chunk = new Buffer( chunk, encoding ); // eslint-disable-line no-buffer-constructor
		len = this._sep.length + chunk.length;
		chunk = Buffer.concat( [ this._sep, chunk ], len ); // TODO: replace with stdlib pkg
		chunk = chunk.toString( this._encoding );
	} else {
		// NOTE(review): the first chunk is pushed as received, whereas subsequent chunks are re-encoded above; confirm this asymmetry is intended.
		this._init = true;
	}
	this.push( chunk, this._encoding );
	clbk();

	/* eslint-enable no-invalid-this */
}

/**
* Implements the `_flush` method.
*
* @private
* @param {Callback} clbk - callback to invoke after performing flush tasks
*/
function flush( clbk ) {
	debug( 'Flushing the stream...' );
	clbk(); // TODO: consider supporting an option to append a final separator
}

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* ## Notes
*
* -   Destroying an already destroyed stream is a no-op.
* -   The `error` (if provided) and `close` events are emitted on a subsequent tick.
*
* @private
* @param {(Error|Object)} [error] - optional error
* @returns {JoinStream} Stream instance
*/
function destroy( error ) {
	/* eslint-disable no-invalid-this */
	var self;
	if ( this._destroyed ) {
		debug( 'Attempted to destroy an already destroyed stream.' );
		return this;
	}
	self = this;
	this._destroyed = true;

	nextTick( close );

	return this;

	/**
	* Closes a stream.
	*
	* @private
	*/
	function close() {
		if ( error ) {
			// Format defensively so that logging cannot throw before the events below are emitted:
			debug( 'Stream was destroyed due to an error. Error: %s.', formatError( error ) );
			self.emit( 'error', error );
		}
		debug( 'Closing the stream...' );
		self.emit( 'close' );
	}

	/* eslint-enable no-invalid-this */
}


// MAIN //

/**
* Join stream constructor.
*
* @constructor
* @param {Options} [options] - stream options
* @param {string} [options.sep='\n'] - separator used to join streamed data
* @param {boolean} [options.objectMode=false] - specifies whether the stream should operate in object mode
* @param {(string|null)} [options.encoding=null] - specifies how `Buffer` objects should be decoded to `strings`
* @param {NonNegativeNumber} [options.highWaterMark] - specifies the `Buffer` level for when `write()` starts returning `false`
* @param {boolean} [options.allowHalfOpen=false] - specifies whether the stream should remain open even if one side ends
* @param {boolean} [options.readableObjectMode=false] - specifies whether the readable side should be in object mode
* @throws {TypeError} options argument must be an object
* @throws {TypeError} must provide valid options
* @returns {JoinStream} join stream
*
* @example
* var inspectStream = require( '@stdlib/streams/node/inspect-sink' );
*
* function log( chunk ) {
*     console.log( chunk );
* }
*
* var stream = new JoinStream();
*
* stream.pipe( inspectStream( log ) );
*
* stream.write( '1' );
* stream.write( '2' );
* stream.write( '3' );
*
* stream.end();
*
* // prints: '1\n2\n3'
*/
function JoinStream( options ) {
	var opts;
	var err;
	if ( !( this instanceof JoinStream ) ) {
		// Support invocation without `new` while preserving the number of provided arguments:
		if ( arguments.length ) {
			return new JoinStream( options );
		}
		return new JoinStream();
	}
	opts = copy( DEFAULTS );
	if ( arguments.length ) {
		err = validate( opts, options );
		if ( err ) {
			throw err;
		}
	}
	// The stream's writable state should always be in object mode to prevent incoming data from being buffered (concatenated) and thus lose separation...
	opts.writableObjectMode = true;

	// The stream converts each chunk into a string so no need to encode strings written to the join stream as Buffer objects:
	opts.decodeStrings = false;

	// Make the stream a Transform stream:
	debug( 'Creating a transform stream configured with the following options: %s.', JSON.stringify( opts ) );
	Transform.call( this, opts );

	// Destruction state:
	setNonEnumerable( this, '_destroyed', false );

	// Cache the encoding:
	setNonEnumerableReadOnly( this, '_encoding', ( opts.encoding === null ) ? 'utf8' : opts.encoding );

	// Cache the separator (as a Buffer when the encoding is not `utf8`; otherwise, as provided):
	if ( this._encoding !== 'utf8' ) {
		opts.sep = string2buffer( opts.sep, this._encoding );
	}
	setNonEnumerableReadOnly( this, '_sep', opts.sep );

	// Flag indicating if the stream has received streamed data:
	setNonEnumerable( this, '_init', false );

	// Chunk counter:
	setNonEnumerable( this, '_idx', -1 );

	return this;
}

/*
* Inherit from the `Transform` prototype.
*/
inherit( JoinStream, Transform );

/**
* Implements the `_transform` method.
*
* @private
* @name _transform
* @memberof JoinStream.prototype
* @type {Function}
* @param {(Buffer|string)} chunk - streamed chunk
* @param {string} encoding - Buffer encoding
* @param {Callback} clbk - callback to invoke after transforming the streamed chunk
*/
setNonEnumerableReadOnly( JoinStream.prototype, '_transform', transform );

/**
* Implements the `_flush` method.
*
* @private
* @name _flush
* @memberof JoinStream.prototype
* @type {Function}
* @param {Callback} clbk - callback to invoke after performing flush tasks
*/
setNonEnumerableReadOnly( JoinStream.prototype, '_flush', flush );

/**
* Gracefully destroys a stream, providing backward compatibility.
*
* @name destroy
* @memberof JoinStream.prototype
* @type {Function}
* @param {(Error|Object)} [error] - optional error
* @returns {JoinStream} Stream instance
*/
setNonEnumerableReadOnly( JoinStream.prototype, 'destroy', destroy );


// EXPORTS //

module.exports = JoinStream;