// _stream_readable.js (31324B)
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';

/*<replacement>*/

var pna = require('process-nextick-args');
/*</replacement>*/

module.exports = Readable;

/*<replacement>*/
var isArray = require('isarray');
/*</replacement>*/

/*<replacement>*/
var Duplex;
/*</replacement>*/

Readable.ReadableState = ReadableState;

/*<replacement>*/
var EE = require('events').EventEmitter;

// Returns how many listeners `emitter` currently has for event `type`.
var EElistenerCount = function (emitter, type) {
  return emitter.listeners(type).length;
};
/*</replacement>*/

/*<replacement>*/
var Stream = require('./internal/streams/stream');
/*</replacement>*/

/*<replacement>*/

var Buffer = require('safe-buffer').Buffer;
// Look the global object up defensively: a bare `global` reference throws a
// ReferenceError in environments that do not define it. When no Uint8Array
// is available, fall back to a dummy constructor so `instanceof` is false.
var OurUint8Array = (typeof global !== 'undefined' ? global : typeof window !== 'undefined' ? window : typeof self !== 'undefined' ? self : {}).Uint8Array || function () {};

// Converts a Uint8Array-like chunk into a Buffer via Buffer.from().
function _uint8ArrayToBuffer(chunk) {
  return Buffer.from(chunk);
}

// True when `obj` is a Buffer or an instance of the platform Uint8Array.
function _isUint8Array(obj) {
  return Buffer.isBuffer(obj) || obj instanceof OurUint8Array;
}

/*</replacement>*/

/*<replacement>*/
var util = Object.create(require('core-util-is'));
util.inherits = require('inherits');
/*</replacement>*/

/*<replacement>*/
var debugUtil = require('util');
var debug = void 0;
if (debugUtil && debugUtil.debuglog) {
  debug = debugUtil.debuglog('stream');
} else {
  // no debuglog available: logging becomes a no-op
  debug = function () {};
}
/*</replacement>*/

var BufferList = require('./internal/streams/BufferList');
var destroyImpl = require('./internal/streams/destroy');
var StringDecoder;

util.inherits(Readable, Stream);

// Events of a wrapped old-style stream that wrap() re-emits on the Readable.
var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];

// Registers `fn` so that it runs before the listeners already attached to
// `emitter` for `event`.
function prependListener(emitter, event, fn) {
  // Sadly this is not cacheable as some libraries bundle their own
  // event emitter implementation with them.
  if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn);

  // This is a hack to make sure that our error handler is attached before any
  // userland ones.  NEVER DO THIS. This is here only because this code needs
  // to continue to work with older versions of Node.js that do not include
  // the prependListener() method. The goal is to eventually remove this hack.
  if (!emitter._events || !emitter._events[event]) {
    emitter.on(event, fn);
  } else if (isArray(emitter._events[event])) {
    emitter._events[event].unshift(fn);
  } else {
    emitter._events[event] = [fn, emitter._events[event]];
  }
}

// Per-stream bookkeeping for the readable side.
//   options - user options (may be undefined); reads objectMode,
//             readableObjectMode, highWaterMark, readableHighWaterMark,
//             defaultEncoding and encoding.
//   stream  - the owning stream; only used to detect a Duplex.
function ReadableState(options, stream) {
  // Required lazily; the module-level `Duplex` starts out undefined.
  Duplex = Duplex || require('./_stream_duplex');

  options = options || {};

  // Duplex streams are both readable and writable, but share
  // the same options object.
  // However, some cases require setting options to different
  // values for the readable and the writable sides of the duplex stream.
  // These options can be provided separately as readableXXX and writableXXX.
  var isDuplex = stream instanceof Duplex;

  // object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away
  this.objectMode = !!options.objectMode;

  if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode;

  // the point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  var hwm = options.highWaterMark;
  var readableHwm = options.readableHighWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;

  if (hwm || hwm === 0) {
    this.highWaterMark = hwm;
  } else if (isDuplex && (readableHwm || readableHwm === 0)) {
    this.highWaterMark = readableHwm;
  } else {
    this.highWaterMark = defaultHwm;
  }

  // cast to ints.
  this.highWaterMark = Math.floor(this.highWaterMark);

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift()
  this.buffer = new BufferList();
  this.length = 0;
  this.pipes = null;
  this.pipesCount = 0;
  this.flowing = null;
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;

  // a flag to be able to tell if the event 'readable'/'data' is emitted
  // immediately, or on a later tick.  We set this to true at first, because
  // any actions that shouldn't happen until "later" should generally also
  // not happen before the first read call.
  this.sync = true;

  // whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;

  // has it been destroyed
  this.destroyed = false;

  // Crypto is kind of old and crusty.  Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // the number of writers that are awaiting a drain event in .pipe()s
  this.awaitDrain = 0;

  // if true, a maybeReadMore has been scheduled
  this.readingMore = false;

  this.decoder = null;
  this.encoding = null;
  if (options.encoding) {
    // StringDecoder is loaded only when an encoding is actually requested.
    if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}

// Readable stream constructor; also callable without `new`.
//   options - optional; besides the ReadableState options, `read` and
//             `destroy` functions are installed as `_read` / `_destroy`.
function Readable(options) {
  Duplex = Duplex || require('./_stream_duplex');

  if (!(this instanceof Readable)) return new Readable(options);

  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  if (options) {
    if (typeof options.read === 'function') this._read = options.read;

    if (typeof options.destroy === 'function') this._destroy = options.destroy;
  }

  Stream.call(this);
}

// `destroyed` mirrors _readableState.destroyed.
Object.defineProperty(Readable.prototype, 'destroyed', {
  get: function () {
    if (this._readableState === undefined) {
      return false;
    }
    return this._readableState.destroyed;
  },
  set: function (value) {
    // we ignore the value if the stream
    // has not been initialized yet
    if (!this._readableState) {
      return;
    }

    // backward compatibility, the user is explicitly
    // managing destroyed
    this._readableState.destroyed = value;
  }
});

Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;

// Default _destroy: signals EOF by pushing null, then hands `err` to `cb`.
Readable.prototype._destroy = function (err, cb) {
  this.push(null);
  cb(err);
};

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
// push(chunk, encoding)
//   chunk    - data to append; null signals EOF.
//   encoding - encoding of a string chunk; defaults to state.defaultEncoding.
// Returns the result of needMoreData() for the updated state.
Readable.prototype.push = function (chunk, encoding) {
  var state = this._readableState;
  var skipChunkCheck;

  if (!state.objectMode) {
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      // A string whose encoding differs from the stream's own encoding is
      // turned into a Buffer; the cleared encoding tells readableAddChunk
      // that the chunk still has to go through the decoder (if any).
      if (encoding !== state.encoding) {
        chunk = Buffer.from(chunk, encoding);
        encoding = '';
      }
      skipChunkCheck = true;
    }
  } else {
    skipChunkCheck = true;
  }

  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function (chunk) {
  return readableAddChunk(this, chunk, null, true, false);
};

// Shared implementation of push() and unshift().
//   addToFront     - true for unshift(), false for push().
//   skipChunkCheck - when true the chunk type is not validated.
// Problems are reported by emitting 'error' on `stream`, not by throwing.
// Returns needMoreData(state).
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
  var state = stream._readableState;
  if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state);
  } else {
    var er;
    if (!skipChunkCheck) er = chunkInvalid(state, chunk);
    if (er) {
      stream.emit('error', er);
    } else if (state.objectMode || chunk && chunk.length > 0) {
      // Normalise anything that is not exactly a Buffer into one.
      if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
        chunk = _uint8ArrayToBuffer(chunk);
      }

      if (addToFront) {
        if (state.endEmitted) {
          stream.emit('error', new Error('stream.unshift() after end event'));
        } else {
          addChunk(stream, state, chunk, true);
        }
      } else if (state.ended) {
        stream.emit('error', new Error('stream.push() after EOF'));
      } else {
        state.reading = false;
        if (state.decoder && !encoding) {
          chunk = state.decoder.write(chunk);
          // The decoder may return an empty string; in that case nothing is
          // buffered and another read is scheduled instead.
          if (state.objectMode || chunk.length !== 0) {
            addChunk(stream, state, chunk, false);
          } else {
            maybeReadMore(stream, state);
          }
        } else {
          addChunk(stream, state, chunk, false);
        }
      }
    } else if (!addToFront) {
      // empty chunk pushed: only clear the reading flag
      state.reading = false;
    }
  }

  return needMoreData(state);
}

// Either emits `chunk` directly as 'data' (flowing, empty buffer, not inside
// a synchronous _read call) or stores it in the buffer, then schedules
// maybeReadMore().
function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit('data', chunk);
    stream.read(0);
  } else {
    // update the buffer info.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }

    if (state.needReadable) emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

// Returns a TypeError when `chunk` is not acceptable for a non-object-mode
// stream (not a Buffer/Uint8Array, string or undefined); otherwise undefined.
function chunkInvalid(state, chunk) {
  var er;
  if (!_isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
    er = new TypeError('Invalid non-string/buffer chunk');
  }
  return er;
}

// if it's not yet past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
// more bytes.  This is to work around cases where hwm=0,
// such as the repl.  Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
function needMoreData(state) {
  return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
}

// True only when flowing is exactly `false`; the initial `null` does not count.
Readable.prototype.isPaused = function () {
  return this._readableState.flowing === false;
};

// backwards compatibility.
// Installs a StringDecoder for `enc` on the stream state and records the
// encoding. Returns the stream for chaining.
Readable.prototype.setEncoding = function (enc) {
  if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
  this._readableState.decoder = new StringDecoder(enc);
  this._readableState.encoding = enc;
  return this;
};

// Don't raise the hwm > 8MB
var MAX_HWM = 0x800000;

// Returns the high water mark to use for a read of `n`: MAX_HWM when `n`
// reaches it, otherwise `n` rounded up to a power of two.
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    n = MAX_HWM;
  } else {
    // Get the next highest power of 2 to prevent increasing hwm excessively in
    // tiny amounts
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

// Decides how much a read(n) call may return right now.
// May raise state.highWaterMark and may set state.needReadable.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function howMuchToRead(n, state) {
  if (n <= 0 || state.length === 0 && state.ended) return 0;
  if (state.objectMode) return 1;
  // `n !== n` is true only for NaN, i.e. read() was called without a size.
  if (n !== n) {
    // Only flow one buffer at a time
    if (state.flowing && state.length) {
      return state.buffer.head.data.length;
    }
    return state.length;
  }
  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
  if (n <= state.length) return n;
  // Don't have enough
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

// you can override either this method, or the async _read(n) below.
// Returns the data taken from the buffer, or null when nothing is returned.
// A non-null result is also emitted as a 'data' event.
Readable.prototype.read = function (n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;

  if (n !== 0) state.emittedReadable = false;

  // if we're doing read(0) to trigger a readable event, but we
  // already have a bunch of data in the buffer, then just trigger
  // the 'readable' event and move on.
  if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended) {
      endReadable(this);
    } else {
      emitReadable(this);
    }
    return null;
  }

  n = howMuchToRead(n, state);

  // if we've ended, and we're now clear, then finish it up.
  if (n === 0 && state.ended) {
    if (state.length === 0) endReadable(this);
    return null;
  }

  // All the actual chunk generation logic needs to be
  // *below* the call to _read.  The reason is that in certain
  // synthetic stream cases, such as passthrough streams, _read
  // may be a completely synchronous operation which may change
  // the state of the read buffer, providing enough data when
  // before there was *not* enough.
  //
  // So, the steps are:
  // 1. Figure out what the state of things will be after we do
  // a read from the buffer.
  //
  // 2. If that resulting state will trigger a _read, then call _read.
  // Note that this may be asynchronous, or synchronous.  Yes, it is
  // deeply ugly to write APIs this way, but that still doesn't mean
  // that the Readable class should behave improperly, as streams are
  // designed to be sync/async agnostic.
  // Take note if the _read call is sync or async (ie, if the read call
  // has returned yet), so that we know whether or not it's safe to emit
  // 'readable' etc.
  //
  // 3. Actually pull the requested chunks out of the buffer and return.

  // if we need a readable event, then we need to do some reading.
  var doRead = state.needReadable;
  debug('need readable', doRead);

  // if we currently have less than the highWaterMark, then also read some
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // however, if we've ended, then there's no point, and if we're already
  // reading, then it's unnecessary.
  if (state.ended || state.reading) {
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    // if the length is currently zero, then we *need* a readable event.
    if (state.length === 0) state.needReadable = true;
    // call internal read method
    this._read(state.highWaterMark);
    state.sync = false;
    // If _read pushed data synchronously, then `reading` will be false,
    // and we need to re-evaluate how much data we can return to the user.
    if (!state.reading) n = howMuchToRead(nOrig, state);
  }

  var ret;
  if (n > 0) {
    ret = fromList(n, state);
  } else {
    ret = null;
  }

  if (ret === null) {
    state.needReadable = true;
    n = 0;
  } else {
    state.length -= n;
  }

  if (state.length === 0) {
    // If we have nothing in the buffer, then we want to know
    // as soon as we *do* get something into the buffer.
    if (!state.ended) state.needReadable = true;

    // If we tried to read() past the EOF, then emit end on the next tick.
    if (nOrig !== n && state.ended) endReadable(this);
  }

  if (ret !== null) this.emit('data', ret);

  return ret;
};

// Handles the EOF marker (a pushed null): flushes whatever the decoder still
// holds into the buffer, marks the state as ended and requests a 'readable'.
function onEofChunk(stream, state) {
  if (state.ended) return;
  if (state.decoder) {
    var chunk = state.decoder.end();
    if (chunk && chunk.length) {
      state.buffer.push(chunk);
      state.length += state.objectMode ? 1 : chunk.length;
    }
  }
  state.ended = true;

  // emit 'readable' now to make sure it gets picked up.
  emitReadable(stream);
}

// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow.  This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    if (state.sync) {
      pna.nextTick(emitReadable_, stream);
    } else {
      emitReadable_(stream);
    }
  }
}

// Emits 'readable' and then lets flow() drain the stream if it is flowing.
function emitReadable_(stream) {
  debug('emit readable');
  stream.emit('readable');
  flow(stream);
}

// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data.  that may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    pna.nextTick(maybeReadMore_, stream, state);
  }
}

// Calls read(0) repeatedly while the buffer is below the high water mark and
// the stream is neither reading, flowing nor ended; stops as soon as a call
// leaves the buffered length unchanged.
function maybeReadMore_(stream, state) {
  var len = state.length;
  while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length) {
      // didn't get any data, stop spinning.
      break;
    } else {
      len = state.length;
    }
  }
  state.readingMore = false;
}

// abstract method.  to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
// Default _read: reports that the subclass did not provide one by emitting
// an 'error' event.
Readable.prototype._read = function (n) {
  this.emit('error', new Error('_read() is not implemented'));
};

// Pipes this stream into `dest`.
//   dest     - destination; must provide write/end and the emitter methods.
//   pipeOpts - optional; `{ end: false }` keeps `dest` open when this stream
//              ends. process.stdout / process.stderr are never ended.
// Returns `dest`.
Readable.prototype.pipe = function (dest, pipeOpts) {
  var src = this;
  var state = this._readableState;

  // state.pipes is null, a single destination, or an array of destinations.
  switch (state.pipesCount) {
    case 0:
      state.pipes = dest;
      break;
    case 1:
      state.pipes = [state.pipes, dest];
      break;
    default:
      state.pipes.push(dest);
      break;
  }
  state.pipesCount += 1;
  debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);

  var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;

  var endFn = doEnd ? onend : unpipe;
  if (state.endEmitted) {
    pna.nextTick(endFn);
  } else {
    src.once('end', endFn);
  }

  dest.on('unpipe', onunpipe);
  function onunpipe(readable, unpipeInfo) {
    debug('onunpipe');
    if (readable === src) {
      // hasUnpiped ensures cleanup() runs once per unpipe() call.
      if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
        unpipeInfo.hasUnpiped = true;
        cleanup();
      }
    }
  }

  function onend() {
    debug('onend');
    dest.end();
  }

  // when the dest drains, it reduces the awaitDrain counter
  // on the source.  This would be more elegant with a .once()
  // handler in flow(), but adding and removing repeatedly is
  // too slow.
  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);

  var cleanedUp = false;
  function cleanup() {
    debug('cleanup');
    // cleanup event handlers once the pipe is broken
    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    dest.removeListener('drain', ondrain);
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', unpipe);
    src.removeListener('data', ondata);

    cleanedUp = true;

    // if the reader is waiting for a drain event from this
    // specific writer, then it would cause it to never start
    // flowing again.
    // So, if this is awaiting a drain, then we just call it now.
    // If we don't know, then assume that we are waiting for one.
    if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
  }

  // If the user pushes more data while we're writing to dest then we'll end up
  // in ondata again. However, we only want to increase awaitDrain once because
  // dest will only emit one 'drain' event for the multiple writes.
  // => Introduce a guard on increasing awaitDrain.
  var increasedAwaitDrain = false;
  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    increasedAwaitDrain = false;
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
      // If the user unpiped during `dest.write()`, it is possible
      // to get stuck in a permanently paused state if that write
      // also returned false.
      // => Check whether `dest` is still a piping destination.
      if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) {
        debug('false write response, pause', src._readableState.awaitDrain);
        src._readableState.awaitDrain++;
        increasedAwaitDrain = true;
      }
      src.pause();
    }
  }

  // if the dest has an error, then stop piping into it.
  // however, don't suppress the throwing behavior for this.
  function onerror(er) {
    debug('onerror', er);
    unpipe();
    dest.removeListener('error', onerror);
    // With no listener left, re-emitting makes the error throw as usual.
    if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
  }

  // Make sure our error handler is attached before userland ones.
  prependListener(dest, 'error', onerror);

  // Both close and finish should trigger unpipe, but only once.
  function onclose() {
    dest.removeListener('finish', onfinish);
    unpipe();
  }
  dest.once('close', onclose);
  function onfinish() {
    debug('onfinish');
    dest.removeListener('close', onclose);
    unpipe();
  }
  dest.once('finish', onfinish);

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

  // tell the dest that it's being piped to
  dest.emit('pipe', src);

  // start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};

// Builds the 'drain' handler for one destination: it decrements awaitDrain
// and, once that reaches zero while 'data' listeners exist, restarts the flow.
function pipeOnDrain(src) {
  return function () {
    var state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain) state.awaitDrain--;
    if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

// Detaches `dest`, or every destination when `dest` is omitted. Each detached
// destination receives an 'unpipe' event. Returns the stream.
Readable.prototype.unpipe = function (dest) {
  var state = this._readableState;
  var unpipeInfo = { hasUnpiped: false };

  // if we're not piping anywhere, then do nothing.
  if (state.pipesCount === 0) return this;

  // just one destination.  most common case.
  if (state.pipesCount === 1) {
    // passed in one, but it's not the right one.
    if (dest && dest !== state.pipes) return this;

    if (!dest) dest = state.pipes;

    // got a match.
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;
    if (dest) dest.emit('unpipe', this, unpipeInfo);
    return this;
  }

  // slow case. multiple pipe destinations.

  if (!dest) {
    // remove all.
    var dests = state.pipes;
    var len = state.pipesCount;
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;

    for (var i = 0; i < len; i++) {
      dests[i].emit('unpipe', this, unpipeInfo);
    }
    return this;
  }

  // try to find the right one.
  var index = indexOf(state.pipes, dest);
  if (index === -1) return this;

  state.pipes.splice(index, 1);
  state.pipesCount -= 1;
  // back to the single-destination representation
  if (state.pipesCount === 1) state.pipes = state.pipes[0];

  dest.emit('unpipe', this, unpipeInfo);

  return this;
};

// set up data events if they are asked for
// Ensure readable listeners eventually get something
Readable.prototype.on = function (ev, fn) {
  var res = Stream.prototype.on.call(this, ev, fn);

  if (ev === 'data') {
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false) this.resume();
  } else if (ev === 'readable') {
    var state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        pna.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

  return res;
};
Readable.prototype.addListener = Readable.prototype.on;

// Deferred read(0) scheduled when a 'readable' listener is added.
function nReadingNextTick(self) {
  debug('readable nexttick read 0');
  self.read(0);
}

// pause() and resume() are remnants
// of the legacy readable stream API
// If the user uses them, then switch into old mode.
// resume() marks the stream as flowing and schedules the actual resume work;
// it does nothing when the stream is already flowing. Returns the stream.
Readable.prototype.resume = function () {
  var state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    state.flowing = true;
    resume(this, state);
  }
  return this;
};

// Schedules resume_() on the next tick, at most once at a time.
function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    pna.nextTick(resume_, stream, state);
  }
}

// Deferred part of resume(): triggers a read(0) if no read is in progress,
// resets awaitDrain, emits 'resume' and drains the buffer through flow().
function resume_(stream, state) {
  if (!state.reading) {
    debug('resume read 0');
    stream.read(0);
  }

  state.resumeScheduled = false;
  state.awaitDrain = 0;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading) stream.read(0);
}

// Leaves flowing mode and emits 'pause', unless flowing is already `false`.
// Returns the stream.
Readable.prototype.pause = function () {
  debug('call pause flowing=%j', this._readableState.flowing);
  if (false !== this._readableState.flowing) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};

// Keeps calling read() while the stream is flowing and read() returns data.
function flow(stream) {
  var state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null) {}
}

// wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
// wrap(stream): uses the old-style `stream` as the data source of this
// Readable. 'data' and 'end' of `stream` are forwarded through push(), and
// `stream` is paused when push() returns a falsy value. Returns this stream.
Readable.prototype.wrap = function (stream) {
  var _this = this;

  var state = this._readableState;
  var paused = false;

  stream.on('end', function () {
    debug('wrapped end');
    // flush whatever the decoder is still holding before signalling EOF
    if (state.decoder && !state.ended) {
      var chunk = state.decoder.end();
      if (chunk && chunk.length) _this.push(chunk);
    }

    _this.push(null);
  });

  stream.on('data', function (chunk) {
    debug('wrapped data');
    if (state.decoder) chunk = state.decoder.write(chunk);

    // don't skip over falsy values in objectMode
    if (state.objectMode && (chunk === null || chunk === undefined)) {
      return;
    } else if (!state.objectMode && (!chunk || !chunk.length)) {
      return;
    }

    var ret = _this.push(chunk);
    if (!ret) {
      paused = true;
      stream.pause();
    }
  });

  // proxy all the other methods.
  // important when wrapping filters and duplexes.
  for (var i in stream) {
    if (this[i] === undefined && typeof stream[i] === 'function') {
      // the IIFE captures the current key for the forwarding function
      this[i] = function (method) {
        return function () {
          return stream[method].apply(stream, arguments);
        };
      }(i);
    }
  }

  // proxy certain important events.
  for (var n = 0; n < kProxyEvents.length; n++) {
    stream.on(kProxyEvents[n], this.emit.bind(this, kProxyEvents[n]));
  }

  // when we try to consume some more bytes, simply unpause the
  // underlying stream.
  this._read = function (n) {
    debug('wrapped _read', n);
    if (paused) {
      paused = false;
      stream.resume();
    }
  };

  return this;
};

// Read-only view of the state's current highWaterMark.
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
  // making it explicit this property is not enumerable
  // because otherwise some prototype manipulation in
  // userland will fail
  enumerable: false,
  get: function () {
    return this._readableState.highWaterMark;
  }
});

// exposed for testing purposes only.
Readable._fromList = fromList;

// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// Returns null when nothing is buffered. The caller is responsible for
// adjusting state.length afterwards.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromList(n, state) {
  // nothing buffered
  if (state.length === 0) return null;

  var ret;
  if (state.objectMode) {
    ret = state.buffer.shift();
  } else if (!n || n >= state.length) {
    // read it all, truncate the list
    if (state.decoder) {
      ret = state.buffer.join('');
    } else if (state.buffer.length === 1) {
      ret = state.buffer.head.data;
    } else {
      ret = state.buffer.concat(state.length);
    }
    state.buffer.clear();
  } else {
    // read part of list
    ret = fromListPartial(n, state.buffer, state.decoder);
  }

  return ret;
}

// Extracts only enough buffered data to satisfy the amount requested.
//   hasStrings - truthy when the list holds strings rather than Buffers.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromListPartial(n, list, hasStrings) {
  var ret;
  if (n < list.head.data.length) {
    // slice is the same for buffers and strings
    ret = list.head.data.slice(0, n);
    list.head.data = list.head.data.slice(n);
  } else if (n === list.head.data.length) {
    // first chunk is a perfect match
    ret = list.shift();
  } else {
    // result spans more than one buffer
    ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list);
  }
  return ret;
}

// Copies a specified amount of characters from the list of buffered data
// chunks.
// Only called when `n` exceeds the length of the first chunk. `c` counts the
// nodes that were consumed completely and are dropped from the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function copyFromBufferString(n, list) {
  var p = list.head;
  var c = 1;
  var ret = p.data;
  n -= ret.length;
  while (p = p.next) {
    var str = p.data;
    var nb = n > str.length ? str.length : n;
    if (nb === str.length) {
      ret += str;
    } else {
      ret += str.slice(0, n);
    }
    n -= nb;
    if (n === 0) {
      if (nb === str.length) {
        // this node was used up entirely: drop it as well
        ++c;
        if (p.next) {
          list.head = p.next;
        } else {
          list.head = list.tail = null;
        }
      } else {
        // keep the unread remainder as the new head
        list.head = p;
        p.data = str.slice(nb);
      }
      break;
    }
    ++c;
  }
  list.length -= c;
  return ret;
}

// Copies a specified amount of bytes from the list of buffered data chunks.
// Only called when `n` exceeds the length of the first chunk. `c` counts the
// nodes that were consumed completely and are dropped from the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function copyFromBuffer(n, list) {
  var ret = Buffer.allocUnsafe(n);
  var p = list.head;
  var c = 1;
  p.data.copy(ret);
  n -= p.data.length;
  while (p = p.next) {
    var buf = p.data;
    var nb = n > buf.length ? buf.length : n;
    // `ret.length - n` is the number of bytes already written
    buf.copy(ret, ret.length - n, 0, nb);
    n -= nb;
    if (n === 0) {
      if (nb === buf.length) {
        // this node was used up entirely: drop it as well
        ++c;
        if (p.next) {
          list.head = p.next;
        } else {
          list.head = list.tail = null;
        }
      } else {
        // keep the unread remainder as the new head
        list.head = p;
        p.data = buf.slice(nb);
      }
      break;
    }
    ++c;
  }
  list.length -= c;
  return ret;
}

// Marks the stream as ended and schedules the 'end' event for the next tick.
// Throws when data is still buffered.
function endReadable(stream) {
  var state = stream._readableState;

  // If we get here before consuming all the bytes, then that is a
  // bug in node.  Should never happen.
  if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream');

  if (!state.endEmitted) {
    state.ended = true;
    pna.nextTick(endReadableNT, state, stream);
  }
}

// Deferred part of endReadable(): emits 'end' at most once.
function endReadableNT(state, stream) {
  // Check that we didn't get one last unshift.
  if (!state.endEmitted && state.length === 0) {
    state.endEmitted = true;
    stream.readable = false;
    stream.emit('end');
  }
}

// Index of the first element of `xs` that is strictly equal to `x`, or -1.
function indexOf(xs, x) {
  for (var i = 0, l = xs.length; i < l; i++) {
    if (xs[i] === x) return i;
  }
  return -1;
}