_stream_transform.js (7742B)
1 // Copyright Joyent, Inc. and other Node contributors. 2 // 3 // Permission is hereby granted, free of charge, to any person obtaining a 4 // copy of this software and associated documentation files (the 5 // "Software"), to deal in the Software without restriction, including 6 // without limitation the rights to use, copy, modify, merge, publish, 7 // distribute, sublicense, and/or sell copies of the Software, and to permit 8 // persons to whom the Software is furnished to do so, subject to the 9 // following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included 12 // in all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20 // USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22 // a transform stream is a readable/writable stream where you do 23 // something with the data. Sometimes it's called a "filter", 24 // but that's not a great name for it, since that implies a thing where 25 // some bits pass through, and others are simply ignored. (That would 26 // be a valid example of a transform, of course.) 27 // 28 // While the output is causally related to the input, it's not a 29 // necessarily symmetric or synchronous transformation. For example, 30 // a zlib stream might take multiple plain-text writes(), and then 31 // emit a single compressed chunk some time in the future. 32 // 33 // Here's how this works: 34 // 35 // The Transform stream has all the aspects of the readable and writable 36 // stream classes. 
When you write(chunk), that calls _write(chunk,cb) 37 // internally, and returns false if there's a lot of pending writes 38 // buffered up. When you call read(), that calls _read(n) until 39 // there's enough pending readable data buffered up. 40 // 41 // In a transform stream, the written data is placed in a buffer. When 42 // _read(n) is called, it transforms the queued up data, calling the 43 // buffered _write cb's as it consumes chunks. If consuming a single 44 // written chunk would result in multiple output chunks, then the first 45 // outputted bit calls the readcb, and subsequent chunks just go into 46 // the read buffer, and will cause it to emit 'readable' if necessary. 47 // 48 // This way, back-pressure is actually determined by the reading side, 49 // since _read has to be called to start processing a new chunk. However, 50 // a pathological inflate type of transform can cause excessive buffering 51 // here. For example, imagine a stream where every byte of input is 52 // interpreted as an integer from 0-255, and then results in that many 53 // bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in 54 // 1kb of data being output. In this case, you could write a very small 55 // amount of input, and end up with a very large amount of output. In 56 // such a pathological inflating mechanism, there'd be no way to tell 57 // the system to stop doing the transform. A single 4MB write could 58 // cause the system to run out of memory. 59 // 60 // However, even in such a pathological case, only a single written chunk 61 // would be consumed, and then the rest would wait (un-transformed) until 62 // the results of the previous transformed chunk were consumed. 
'use strict';

module.exports = Transform;

var Duplex = require('./_stream_duplex');

/*<replacement>*/
var util = Object.create(require('core-util-is'));
util.inherits = require('inherits');
/*</replacement>*/

util.inherits(Transform, Duplex);

/**
 * Completion callback passed to `_transform()`. It is bound to the stream in
 * the constructor, so `this` is the Transform instance.
 *
 * Clears the pending write, pushes `data` (if any) to the readable side,
 * invokes the stored write callback with `er`, and then asks for another
 * `_read()` when the readable side still wants data.
 *
 * @param {?Error} er - error reported by `_transform()`, forwarded to the write callback.
 * @param {*} [data] - optional output chunk; skipped when `null` or `undefined`.
 * @returns {*} the result of `emit('error', ...)` when no write callback is
 *   pending (i.e. the callback was called more than once), otherwise `undefined`.
 */
function afterTransform(er, data) {
  var ts = this._transformState;
  ts.transforming = false;

  var cb = ts.writecb;

  // No stored callback means this completion callback already ran for the chunk.
  if (!cb) {
    return this.emit('error', new Error('write callback called multiple times'));
  }

  ts.writechunk = null;
  ts.writecb = null;

  if (data != null) // loose `!=` rejects both `null` and `undefined`
    this.push(data);

  cb(er);

  var rs = this._readableState;
  rs.reading = false;
  if (rs.needReadable || rs.length < rs.highWaterMark) {
    this._read(rs.highWaterMark);
  }
}

/**
 * Transform stream constructor. May be called with or without `new`.
 *
 * @param {Object} [options] - forwarded to `Duplex`. If `options.transform`
 *   and/or `options.flush` are functions they are installed as `_transform`
 *   and `_flush` on the instance.
 */
function Transform(options) {
  if (!(this instanceof Transform)) return new Transform(options);

  Duplex.call(this, options);

  // Per-instance bookkeeping for the single chunk that may be in flight.
  this._transformState = {
    afterTransform: afterTransform.bind(this),
    needTransform: false,
    transforming: false,
    writecb: null,
    writechunk: null,
    writeencoding: null
  };

  // start out asking for a readable event once data is transformed.
  this._readableState.needReadable = true;

  // we have implemented the _read method, and done the other things
  // that Readable wants before the first _read call, so unset the
  // sync guard flag.
  this._readableState.sync = false;

  if (options) {
    if (typeof options.transform === 'function') this._transform = options.transform;

    if (typeof options.flush === 'function') this._flush = options.flush;
  }

  // When the writable side finishes, then flush out anything remaining.
  this.on('prefinish', prefinish);
}

/**
 * 'prefinish' listener: runs the optional `_flush()` hook and then finishes
 * the readable side via `done()`.
 *
 * `_flush()` is skipped when the readable state is flagged as destroyed, so
 * user flush logic never runs (or pushes data) on a torn-down stream. If the
 * state object has no `destroyed` flag the guard has no effect.
 */
function prefinish() {
  var _this = this;

  if (typeof this._flush === 'function' && !this._readableState.destroyed) {
    this._flush(function (er, data) {
      done(_this, er, data);
    });
  } else {
    done(this, null, null);
  }
}

/**
 * Pushes a chunk to the readable side. Resets `needTransform` first, then
 * delegates to `Duplex.prototype.push`.
 *
 * @param {*} chunk - chunk to push.
 * @param {string} [encoding] - forwarded unchanged to `Duplex.prototype.push`.
 * @returns {*} whatever `Duplex.prototype.push` returns.
 */
Transform.prototype.push = function (chunk, encoding) {
  this._transformState.needTransform = false;
  return Duplex.prototype.push.call(this, chunk, encoding);
};

// This is the part where you do stuff!
// override this function in implementation classes.
// 'chunk' is an input chunk.
//
// Call `push(newChunk)` to pass along transformed output
// to the readable side.  You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk.  If you pass
// an error, then that'll put the hurt on the whole operation.  If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function (chunk, encoding, cb) {
  throw new Error('_transform() is not implemented');
};

/**
 * Writable-side hook: stores the chunk, its encoding and the write callback
 * in the transform state. If no transform is in flight and the readable side
 * wants data, kicks off `_read()` so the stored chunk is processed.
 *
 * @param {*} chunk - the written chunk.
 * @param {string} encoding - encoding that accompanied the chunk.
 * @param {Function} cb - write callback, invoked later by `afterTransform`.
 */
Transform.prototype._write = function (chunk, encoding, cb) {
  var ts = this._transformState;
  ts.writecb = cb;
  ts.writechunk = chunk;
  ts.writeencoding = encoding;
  if (!ts.transforming) {
    var rs = this._readableState;
    if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) this._read(rs.highWaterMark);
  }
};

// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function (n) {
  var ts = this._transformState;

  if (ts.writechunk !== null && ts.writecb && !ts.transforming) {
    ts.transforming = true;
    this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
  } else {
    // mark that we need a transform, so that any data that comes in
    // will get processed, now that we've asked for it.
    ts.needTransform = true;
  }
};

/**
 * Destroy hook: delegates to `Duplex.prototype._destroy`, then invokes `cb`
 * with the error that delegate reports and emits 'close'.
 *
 * @param {?Error} err - error passed to the destroy call.
 * @param {Function} cb - callback invoked with the error from the Duplex destroy.
 */
Transform.prototype._destroy = function (err, cb) {
  var _this2 = this;

  Duplex.prototype._destroy.call(this, err, function (err2) {
    cb(err2);
    _this2.emit('close');
  });
};

/**
 * Finishes the readable side once flushing is complete.
 *
 * @param {Transform} stream - the stream being finished.
 * @param {?Error} er - flush error; when set it is emitted as 'error' and nothing else happens.
 * @param {*} [data] - optional final chunk; skipped when `null` or `undefined`.
 * @returns {*} the result of `emit('error', er)` on error, otherwise the
 *   result of `stream.push(null)`.
 * @throws {Error} if the writable state still has buffered length, or a
 *   transform is still in flight.
 */
function done(stream, er, data) {
  if (er) return stream.emit('error', er);

  if (data != null) // loose `!=` rejects both `null` and `undefined`
    stream.push(data);

  // if there's nothing in the write buffer, then that means
  // that nothing more will ever be provided
  if (stream._writableState.length) throw new Error('Calling transform done when ws.length != 0');

  if (stream._transformState.transforming) throw new Error('Calling transform done when still transforming');

  return stream.push(null);
}