time-to-botec

Benchmark sampling in different programming languages
Log | Files | Refs | README

_stream_readable.js (31324B)


      1 // Copyright Joyent, Inc. and other Node contributors.
      2 //
      3 // Permission is hereby granted, free of charge, to any person obtaining a
      4 // copy of this software and associated documentation files (the
      5 // "Software"), to deal in the Software without restriction, including
      6 // without limitation the rights to use, copy, modify, merge, publish,
      7 // distribute, sublicense, and/or sell copies of the Software, and to permit
      8 // persons to whom the Software is furnished to do so, subject to the
      9 // following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included
     12 // in all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
     15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
     17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
     18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
     19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
     20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
     21 
     22 'use strict';
     23 
     24 /*<replacement>*/
     25 
     26 var pna = require('process-nextick-args');
     27 /*</replacement>*/
     28 
     29 module.exports = Readable;
     30 
     31 /*<replacement>*/
     32 var isArray = require('isarray');
     33 /*</replacement>*/
     34 
     35 /*<replacement>*/
     36 var Duplex;
     37 /*</replacement>*/
     38 
     39 Readable.ReadableState = ReadableState;
     40 
     41 /*<replacement>*/
     42 var EE = require('events').EventEmitter;
     43 
     44 var EElistenerCount = function (emitter, type) {
     45   return emitter.listeners(type).length;
     46 };
     47 /*</replacement>*/
     48 
     49 /*<replacement>*/
     50 var Stream = require('./internal/streams/stream');
     51 /*</replacement>*/
     52 
     53 /*<replacement>*/
     54 
     55 var Buffer = require('safe-buffer').Buffer;
     56 var OurUint8Array = global.Uint8Array || function () {};
     57 function _uint8ArrayToBuffer(chunk) {
     58   return Buffer.from(chunk);
     59 }
     60 function _isUint8Array(obj) {
     61   return Buffer.isBuffer(obj) || obj instanceof OurUint8Array;
     62 }
     63 
     64 /*</replacement>*/
     65 
     66 /*<replacement>*/
     67 var util = Object.create(require('core-util-is'));
     68 util.inherits = require('inherits');
     69 /*</replacement>*/
     70 
     71 /*<replacement>*/
     72 var debugUtil = require('util');
     73 var debug = void 0;
     74 if (debugUtil && debugUtil.debuglog) {
     75   debug = debugUtil.debuglog('stream');
     76 } else {
     77   debug = function () {};
     78 }
     79 /*</replacement>*/
     80 
     81 var BufferList = require('./internal/streams/BufferList');
     82 var destroyImpl = require('./internal/streams/destroy');
     83 var StringDecoder;
     84 
     85 util.inherits(Readable, Stream);
     86 
     87 var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
     88 
     89 function prependListener(emitter, event, fn) {
     90   // Sadly this is not cacheable as some libraries bundle their own
     91   // event emitter implementation with them.
     92   if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn);
     93 
     94   // This is a hack to make sure that our error handler is attached before any
     95   // userland ones.  NEVER DO THIS. This is here only because this code needs
     96   // to continue to work with older versions of Node.js that do not include
     97   // the prependListener() method. The goal is to eventually remove this hack.
     98   if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]];
     99 }
    100 
    101 function ReadableState(options, stream) {
    102   Duplex = Duplex || require('./_stream_duplex');
    103 
    104   options = options || {};
    105 
    106   // Duplex streams are both readable and writable, but share
    107   // the same options object.
    108   // However, some cases require setting options to different
    109   // values for the readable and the writable sides of the duplex stream.
    110   // These options can be provided separately as readableXXX and writableXXX.
    111   var isDuplex = stream instanceof Duplex;
    112 
    113   // object stream flag. Used to make read(n) ignore n and to
    114   // make all the buffer merging and length checks go away
    115   this.objectMode = !!options.objectMode;
    116 
    117   if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode;
    118 
    119   // the point at which it stops calling _read() to fill the buffer
    120   // Note: 0 is a valid value, means "don't call _read preemptively ever"
    121   var hwm = options.highWaterMark;
    122   var readableHwm = options.readableHighWaterMark;
    123   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
    124 
    125   if (hwm || hwm === 0) this.highWaterMark = hwm;else if (isDuplex && (readableHwm || readableHwm === 0)) this.highWaterMark = readableHwm;else this.highWaterMark = defaultHwm;
    126 
    127   // cast to ints.
    128   this.highWaterMark = Math.floor(this.highWaterMark);
    129 
    130   // A linked list is used to store data chunks instead of an array because the
    131   // linked list can remove elements from the beginning faster than
    132   // array.shift()
    133   this.buffer = new BufferList();
    134   this.length = 0;
    135   this.pipes = null;
    136   this.pipesCount = 0;
    137   this.flowing = null;
    138   this.ended = false;
    139   this.endEmitted = false;
    140   this.reading = false;
    141 
    142   // a flag to be able to tell if the event 'readable'/'data' is emitted
    143   // immediately, or on a later tick.  We set this to true at first, because
    144   // any actions that shouldn't happen until "later" should generally also
    145   // not happen before the first read call.
    146   this.sync = true;
    147 
    148   // whenever we return null, then we set a flag to say
    149   // that we're awaiting a 'readable' event emission.
    150   this.needReadable = false;
    151   this.emittedReadable = false;
    152   this.readableListening = false;
    153   this.resumeScheduled = false;
    154 
    155   // has it been destroyed
    156   this.destroyed = false;
    157 
    158   // Crypto is kind of old and crusty.  Historically, its default string
    159   // encoding is 'binary' so we have to make this configurable.
    160   // Everything else in the universe uses 'utf8', though.
    161   this.defaultEncoding = options.defaultEncoding || 'utf8';
    162 
    163   // the number of writers that are awaiting a drain event in .pipe()s
    164   this.awaitDrain = 0;
    165 
    166   // if true, a maybeReadMore has been scheduled
    167   this.readingMore = false;
    168 
    169   this.decoder = null;
    170   this.encoding = null;
    171   if (options.encoding) {
    172     if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
    173     this.decoder = new StringDecoder(options.encoding);
    174     this.encoding = options.encoding;
    175   }
    176 }
    177 
    178 function Readable(options) {
    179   Duplex = Duplex || require('./_stream_duplex');
    180 
    181   if (!(this instanceof Readable)) return new Readable(options);
    182 
    183   this._readableState = new ReadableState(options, this);
    184 
    185   // legacy
    186   this.readable = true;
    187 
    188   if (options) {
    189     if (typeof options.read === 'function') this._read = options.read;
    190 
    191     if (typeof options.destroy === 'function') this._destroy = options.destroy;
    192   }
    193 
    194   Stream.call(this);
    195 }
    196 
    197 Object.defineProperty(Readable.prototype, 'destroyed', {
    198   get: function () {
    199     if (this._readableState === undefined) {
    200       return false;
    201     }
    202     return this._readableState.destroyed;
    203   },
    204   set: function (value) {
    205     // we ignore the value if the stream
    206     // has not been initialized yet
    207     if (!this._readableState) {
    208       return;
    209     }
    210 
    211     // backward compatibility, the user is explicitly
    212     // managing destroyed
    213     this._readableState.destroyed = value;
    214   }
    215 });
    216 
    217 Readable.prototype.destroy = destroyImpl.destroy;
    218 Readable.prototype._undestroy = destroyImpl.undestroy;
    219 Readable.prototype._destroy = function (err, cb) {
    220   this.push(null);
    221   cb(err);
    222 };
    223 
    224 // Manually shove something into the read() buffer.
    225 // This returns true if the highWaterMark has not been hit yet,
    226 // similar to how Writable.write() returns true if you should
    227 // write() some more.
    228 Readable.prototype.push = function (chunk, encoding) {
    229   var state = this._readableState;
    230   var skipChunkCheck;
    231 
    232   if (!state.objectMode) {
    233     if (typeof chunk === 'string') {
    234       encoding = encoding || state.defaultEncoding;
    235       if (encoding !== state.encoding) {
    236         chunk = Buffer.from(chunk, encoding);
    237         encoding = '';
    238       }
    239       skipChunkCheck = true;
    240     }
    241   } else {
    242     skipChunkCheck = true;
    243   }
    244 
    245   return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
    246 };
    247 
    248 // Unshift should *always* be something directly out of read()
    249 Readable.prototype.unshift = function (chunk) {
    250   return readableAddChunk(this, chunk, null, true, false);
    251 };
    252 
    253 function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
    254   var state = stream._readableState;
    255   if (chunk === null) {
    256     state.reading = false;
    257     onEofChunk(stream, state);
    258   } else {
    259     var er;
    260     if (!skipChunkCheck) er = chunkInvalid(state, chunk);
    261     if (er) {
    262       stream.emit('error', er);
    263     } else if (state.objectMode || chunk && chunk.length > 0) {
    264       if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
    265         chunk = _uint8ArrayToBuffer(chunk);
    266       }
    267 
    268       if (addToFront) {
    269         if (state.endEmitted) stream.emit('error', new Error('stream.unshift() after end event'));else addChunk(stream, state, chunk, true);
    270       } else if (state.ended) {
    271         stream.emit('error', new Error('stream.push() after EOF'));
    272       } else {
    273         state.reading = false;
    274         if (state.decoder && !encoding) {
    275           chunk = state.decoder.write(chunk);
    276           if (state.objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false);else maybeReadMore(stream, state);
    277         } else {
    278           addChunk(stream, state, chunk, false);
    279         }
    280       }
    281     } else if (!addToFront) {
    282       state.reading = false;
    283     }
    284   }
    285 
    286   return needMoreData(state);
    287 }
    288 
    289 function addChunk(stream, state, chunk, addToFront) {
    290   if (state.flowing && state.length === 0 && !state.sync) {
    291     stream.emit('data', chunk);
    292     stream.read(0);
    293   } else {
    294     // update the buffer info.
    295     state.length += state.objectMode ? 1 : chunk.length;
    296     if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk);
    297 
    298     if (state.needReadable) emitReadable(stream);
    299   }
    300   maybeReadMore(stream, state);
    301 }
    302 
    303 function chunkInvalid(state, chunk) {
    304   var er;
    305   if (!_isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
    306     er = new TypeError('Invalid non-string/buffer chunk');
    307   }
    308   return er;
    309 }
    310 
    311 // if it's past the high water mark, we can push in some more.
    312 // Also, if we have no data yet, we can stand some
    313 // more bytes.  This is to work around cases where hwm=0,
    314 // such as the repl.  Also, if the push() triggered a
    315 // readable event, and the user called read(largeNumber) such that
    316 // needReadable was set, then we ought to push more, so that another
    317 // 'readable' event will be triggered.
    318 function needMoreData(state) {
    319   return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
    320 }
    321 
    322 Readable.prototype.isPaused = function () {
    323   return this._readableState.flowing === false;
    324 };
    325 
    326 // backwards compatibility.
    327 Readable.prototype.setEncoding = function (enc) {
    328   if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
    329   this._readableState.decoder = new StringDecoder(enc);
    330   this._readableState.encoding = enc;
    331   return this;
    332 };
    333 
    334 // Don't raise the hwm > 8MB
    335 var MAX_HWM = 0x800000;
    336 function computeNewHighWaterMark(n) {
    337   if (n >= MAX_HWM) {
    338     n = MAX_HWM;
    339   } else {
    340     // Get the next highest power of 2 to prevent increasing hwm excessively in
    341     // tiny amounts
    342     n--;
    343     n |= n >>> 1;
    344     n |= n >>> 2;
    345     n |= n >>> 4;
    346     n |= n >>> 8;
    347     n |= n >>> 16;
    348     n++;
    349   }
    350   return n;
    351 }
    352 
    353 // This function is designed to be inlinable, so please take care when making
    354 // changes to the function body.
    355 function howMuchToRead(n, state) {
    356   if (n <= 0 || state.length === 0 && state.ended) return 0;
    357   if (state.objectMode) return 1;
    358   if (n !== n) {
    359     // Only flow one buffer at a time
    360     if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length;
    361   }
    362   // If we're asking for more than the current hwm, then raise the hwm.
    363   if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
    364   if (n <= state.length) return n;
    365   // Don't have enough
    366   if (!state.ended) {
    367     state.needReadable = true;
    368     return 0;
    369   }
    370   return state.length;
    371 }
    372 
    373 // you can override either this method, or the async _read(n) below.
        //
        // read(n): take data out of the internal buffer.
        //   n is passed through parseInt(n, 10), so omitting it yields NaN;
        //   howMuchToRead() then decides how much can be returned right now.
        //   read(0) never returns data (n must be > 0 to reach fromList); it is
        //   used to trigger a _read() and/or a 'readable' emission.
        // Returns the value produced by fromList(n, state), or null when nothing
        // can be returned. A non-null result is also emitted as a 'data' event.
    374 Readable.prototype.read = function (n) {
    375   debug('read', n);
    376   n = parseInt(n, 10);
    377   var state = this._readableState;
          // keep the parsed request: it is re-evaluated below if _read() pushed
          // data synchronously, and compared against n to detect a short read.
    378   var nOrig = n;
    379
          // any call other than read(0) allows a new 'readable' emission
          // (emitReadable only schedules one while emittedReadable is false).
    380   if (n !== 0) state.emittedReadable = false;
    381
    382   // if we're doing read(0) to trigger a readable event, but we
    383   // already have a bunch of data in the buffer, then just trigger
    384   // the 'readable' event and move on.
    385   if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
    386     debug('read: emitReadable', state.length, state.ended);
    387     if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this);
    388     return null;
    389   }
    390
    391   n = howMuchToRead(n, state);
    392
    393   // if we've ended, and we're now clear, then finish it up.
    394   if (n === 0 && state.ended) {
    395     if (state.length === 0) endReadable(this);
    396     return null;
    397   }
    398
    399   // All the actual chunk generation logic needs to be
    400   // *below* the call to _read.  The reason is that in certain
    401   // synthetic stream cases, such as passthrough streams, _read
    402   // may be a completely synchronous operation which may change
    403   // the state of the read buffer, providing enough data when
    404   // before there was *not* enough.
    405   //
    406   // So, the steps are:
    407   // 1. Figure out what the state of things will be after we do
    408   // a read from the buffer.
    409   //
    410   // 2. If that resulting state will trigger a _read, then call _read.
    411   // Note that this may be asynchronous, or synchronous.  Yes, it is
    412   // deeply ugly to write APIs this way, but that still doesn't mean
    413   // that the Readable class should behave improperly, as streams are
    414   // designed to be sync/async agnostic.
    415   // Take note if the _read call is sync or async (ie, if the read call
    416   // has returned yet), so that we know whether or not it's safe to emit
    417   // 'readable' etc.
    418   //
    419   // 3. Actually pull the requested chunks out of the buffer and return.
    420
    421   // if we need a readable event, then we need to do some reading.
    422   var doRead = state.needReadable;
    423   debug('need readable', doRead);
    424
    425   // if we currently have less than the highWaterMark, then also read some
          // (measured as the buffer length that would remain after taking n)
    426   if (state.length === 0 || state.length - n < state.highWaterMark) {
    427     doRead = true;
    428     debug('length less than watermark', doRead);
    429   }
    430
    431   // however, if we've ended, then there's no point, and if we're already
    432   // reading, then it's unnecessary.
    433   if (state.ended || state.reading) {
    434     doRead = false;
    435     debug('reading or ended', doRead);
    436   } else if (doRead) {
    437     debug('do read');
    438     state.reading = true;
            // sync stays true only for the duration of the _read() call below
    439     state.sync = true;
    440     // if the length is currently zero, then we *need* a readable event.
    441     if (state.length === 0) state.needReadable = true;
    442     // call internal read method
    443     this._read(state.highWaterMark);
    444     state.sync = false;
    445     // If _read pushed data synchronously, then `reading` will be false,
    446     // and we need to re-evaluate how much data we can return to the user.
    447     if (!state.reading) n = howMuchToRead(nOrig, state);
    448   }
    449
    450   var ret;
    451   if (n > 0) ret = fromList(n, state);else ret = null;
    452
    453   if (ret === null) {
            // nothing returned: request a 'readable' notification
    454     state.needReadable = true;
    455     n = 0;
    456   } else {
    457     state.length -= n;
    458   }
    459
    460   if (state.length === 0) {
    461     // If we have nothing in the buffer, then we want to know
    462     // as soon as we *do* get something into the buffer.
    463     if (!state.ended) state.needReadable = true;
    464
    465     // If we tried to read() past the EOF, then emit end on the next tick.
    466     if (nOrig !== n && state.ended) endReadable(this);
    467   }
    468
    469   if (ret !== null) this.emit('data', ret);
    470
    471   return ret;
    472 };
    473 
    474 function onEofChunk(stream, state) {
    475   if (state.ended) return;
    476   if (state.decoder) {
    477     var chunk = state.decoder.end();
    478     if (chunk && chunk.length) {
    479       state.buffer.push(chunk);
    480       state.length += state.objectMode ? 1 : chunk.length;
    481     }
    482   }
    483   state.ended = true;
    484 
    485   // emit 'readable' now to make sure it gets picked up.
    486   emitReadable(stream);
    487 }
    488 
    489 // Don't emit readable right away in sync mode, because this can trigger
    490 // another read() call => stack overflow.  This way, it might trigger
    491 // a nextTick recursion warning, but that's not so bad.
    492 function emitReadable(stream) {
    493   var state = stream._readableState;
    494   state.needReadable = false;
    495   if (!state.emittedReadable) {
    496     debug('emitReadable', state.flowing);
    497     state.emittedReadable = true;
    498     if (state.sync) pna.nextTick(emitReadable_, stream);else emitReadable_(stream);
    499   }
    500 }
    501 
    502 function emitReadable_(stream) {
    503   debug('emit readable');
    504   stream.emit('readable');
    505   flow(stream);
    506 }
    507 
    508 // at this point, the user has presumably seen the 'readable' event,
    509 // and called read() to consume some data.  that may have triggered
    510 // in turn another _read(n) call, in which case reading = true if
    511 // it's in progress.
    512 // However, if we're not ended, or reading, and the length < hwm,
    513 // then go ahead and try to read some more preemptively.
    514 function maybeReadMore(stream, state) {
    515   if (!state.readingMore) {
    516     state.readingMore = true;
    517     pna.nextTick(maybeReadMore_, stream, state);
    518   }
    519 }
    520 
    521 function maybeReadMore_(stream, state) {
    522   var len = state.length;
    523   while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
    524     debug('maybeReadMore read 0');
    525     stream.read(0);
    526     if (len === state.length)
    527       // didn't get any data, stop spinning.
    528       break;else len = state.length;
    529   }
    530   state.readingMore = false;
    531 }
    532 
    533 // abstract method.  to be overridden in specific implementation classes.
    534 // call cb(er, data) where data is <= n in length.
    535 // for virtual (non-string, non-buffer) streams, "length" is somewhat
    536 // arbitrary, and perhaps not very meaningful.
    537 Readable.prototype._read = function (n) {
    538   this.emit('error', new Error('_read() is not implemented'));
    539 };
    540 
    541 Readable.prototype.pipe = function (dest, pipeOpts) {
          // Connect this readable to the destination `dest`.
          //   dest:     object with write()/end() and the emitter methods used
          //             below (on, once, emit, removeListener).
          //   pipeOpts: optional; { end: false } keeps dest open when this
          //             stream ends.
          // Every 'data' chunk is written to dest. A write() that returns false
          // pauses this stream until dest emits 'drain'. 'error', 'close' and
          // 'finish' on dest all unpipe it. Returns dest.
    542   var src = this;
    543   var state = this._readableState;
    544
          // state.pipes holds the bare destination while there is one, and an
          // array as soon as there are several.
    545   switch (state.pipesCount) {
    546     case 0:
    547       state.pipes = dest;
    548       break;
    549     case 1:
    550       state.pipes = [state.pipes, dest];
    551       break;
    552     default:
    553       state.pipes.push(dest);
    554       break;
    555   }
    556   state.pipesCount += 1;
    557   debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
    558
          // dest is ended along with src unless the caller opted out or dest is
          // process.stdout / process.stderr.
    559   var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;
    560
    561   var endFn = doEnd ? onend : unpipe;
          // if 'end' was already emitted, run the handler on the next tick
          // instead of waiting for an event that will not come again.
    562   if (state.endEmitted) pna.nextTick(endFn);else src.once('end', endFn);
    563
    564   dest.on('unpipe', onunpipe);
          // runs cleanup() once per unpipe() call: hasUnpiped is flipped on the
          // info object that unpipe() passes along with the event.
    565   function onunpipe(readable, unpipeInfo) {
    566     debug('onunpipe');
    567     if (readable === src) {
    568       if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
    569         unpipeInfo.hasUnpiped = true;
    570         cleanup();
    571       }
    572     }
    573   }
    574
    575   function onend() {
    576     debug('onend');
    577     dest.end();
    578   }
    579
    580   // when the dest drains, it reduces the awaitDrain counter
    581   // on the source.  This would be more elegant with a .once()
    582   // handler in flow(), but adding and removing repeatedly is
    583   // too slow.
    584   var ondrain = pipeOnDrain(src);
    585   dest.on('drain', ondrain);
    586
    587   var cleanedUp = false;
    588   function cleanup() {
    589     debug('cleanup');
    590     // cleanup event handlers once the pipe is broken
    591     dest.removeListener('close', onclose);
    592     dest.removeListener('finish', onfinish);
    593     dest.removeListener('drain', ondrain);
    594     dest.removeListener('error', onerror);
    595     dest.removeListener('unpipe', onunpipe);
    596     src.removeListener('end', onend);
    597     src.removeListener('end', unpipe);
    598     src.removeListener('data', ondata);
    599
    600     cleanedUp = true;
    601
    602     // if the reader is waiting for a drain event from this
    603     // specific writer, then it would cause it to never start
    604     // flowing again.
    605     // So, if this is awaiting a drain, then we just call it now.
    606     // If we don't know, then assume that we are waiting for one.
    607     if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
    608   }
    609
    610   // If the user pushes more data while we're writing to dest then we'll end up
    611   // in ondata again. However, we only want to increase awaitDrain once because
    612   // dest will only emit one 'drain' event for the multiple writes.
    613   // => Introduce a guard on increasing awaitDrain.
    614   var increasedAwaitDrain = false;
    615   src.on('data', ondata);
    616   function ondata(chunk) {
    617     debug('ondata');
    618     increasedAwaitDrain = false;
    619     var ret = dest.write(chunk);
    620     if (false === ret && !increasedAwaitDrain) {
    621       // If the user unpiped during `dest.write()`, it is possible
    622       // to get stuck in a permanently paused state if that write
    623       // also returned false.
    624       // => Check whether `dest` is still a piping destination.
    625       if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) {
    626         debug('false write response, pause', src._readableState.awaitDrain);
    627         src._readableState.awaitDrain++;
    628         increasedAwaitDrain = true;
    629       }
    630       src.pause();
    631     }
    632   }
    633
    634   // if the dest has an error, then stop piping into it.
    635   // however, don't suppress the throwing behavior for this.
    636   function onerror(er) {
    637     debug('onerror', er);
    638     unpipe();
    639     dest.removeListener('error', onerror);
            // with no other 'error' listener left, re-emit the error on dest
    640     if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
    641   }
    642
    643   // Make sure our error handler is attached before userland ones.
    644   prependListener(dest, 'error', onerror);
    645
    646   // Both close and finish should trigger unpipe, but only once.
    647   function onclose() {
    648     dest.removeListener('finish', onfinish);
    649     unpipe();
    650   }
    651   dest.once('close', onclose);
    652   function onfinish() {
    653     debug('onfinish');
    654     dest.removeListener('close', onclose);
    655     unpipe();
    656   }
    657   dest.once('finish', onfinish);
    658
    659   function unpipe() {
    660     debug('unpipe');
    661     src.unpipe(dest);
    662   }
    663
    664   // tell the dest that it's being piped to
    665   dest.emit('pipe', src);
    666
    667   // start the flow if it hasn't been started already.
    668   if (!state.flowing) {
    669     debug('pipe resume');
    670     src.resume();
    671   }
    672
    673   return dest;
    674 };
    675 
    676 function pipeOnDrain(src) {
    677   return function () {
    678     var state = src._readableState;
    679     debug('pipeOnDrain', state.awaitDrain);
    680     if (state.awaitDrain) state.awaitDrain--;
    681     if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) {
    682       state.flowing = true;
    683       flow(src);
    684     }
    685   };
    686 }
    687 
    688 Readable.prototype.unpipe = function (dest) {
    689   var state = this._readableState;
    690   var unpipeInfo = { hasUnpiped: false };
    691 
    692   // if we're not piping anywhere, then do nothing.
    693   if (state.pipesCount === 0) return this;
    694 
    695   // just one destination.  most common case.
    696   if (state.pipesCount === 1) {
    697     // passed in one, but it's not the right one.
    698     if (dest && dest !== state.pipes) return this;
    699 
    700     if (!dest) dest = state.pipes;
    701 
    702     // got a match.
    703     state.pipes = null;
    704     state.pipesCount = 0;
    705     state.flowing = false;
    706     if (dest) dest.emit('unpipe', this, unpipeInfo);
    707     return this;
    708   }
    709 
    710   // slow case. multiple pipe destinations.
    711 
    712   if (!dest) {
    713     // remove all.
    714     var dests = state.pipes;
    715     var len = state.pipesCount;
    716     state.pipes = null;
    717     state.pipesCount = 0;
    718     state.flowing = false;
    719 
    720     for (var i = 0; i < len; i++) {
    721       dests[i].emit('unpipe', this, unpipeInfo);
    722     }return this;
    723   }
    724 
    725   // try to find the right one.
    726   var index = indexOf(state.pipes, dest);
    727   if (index === -1) return this;
    728 
    729   state.pipes.splice(index, 1);
    730   state.pipesCount -= 1;
    731   if (state.pipesCount === 1) state.pipes = state.pipes[0];
    732 
    733   dest.emit('unpipe', this, unpipeInfo);
    734 
    735   return this;
    736 };
    737 
    738 // set up data events if they are asked for
    739 // Ensure readable listeners eventually get something
    740 Readable.prototype.on = function (ev, fn) {
    741   var res = Stream.prototype.on.call(this, ev, fn);
    742 
    743   if (ev === 'data') {
    744     // Start flowing on next tick if stream isn't explicitly paused
    745     if (this._readableState.flowing !== false) this.resume();
    746   } else if (ev === 'readable') {
    747     var state = this._readableState;
    748     if (!state.endEmitted && !state.readableListening) {
    749       state.readableListening = state.needReadable = true;
    750       state.emittedReadable = false;
    751       if (!state.reading) {
    752         pna.nextTick(nReadingNextTick, this);
    753       } else if (state.length) {
    754         emitReadable(this);
    755       }
    756     }
    757   }
    758 
    759   return res;
    760 };
    761 Readable.prototype.addListener = Readable.prototype.on;
    762 
    763 function nReadingNextTick(self) {
    764   debug('readable nexttick read 0');
    765   self.read(0);
    766 }
    767 
    768 // pause() and resume() are remnants of the legacy readable stream API
    769 // If the user uses them, then switch into old mode.
    770 Readable.prototype.resume = function () {
    771   var state = this._readableState;
    772   if (!state.flowing) {
    773     debug('resume');
    774     state.flowing = true;
    775     resume(this, state);
    776   }
    777   return this;
    778 };
    779 
    780 function resume(stream, state) {
    781   if (!state.resumeScheduled) {
    782     state.resumeScheduled = true;
    783     pna.nextTick(resume_, stream, state);
    784   }
    785 }
    786 
    787 function resume_(stream, state) {
    788   if (!state.reading) {
    789     debug('resume read 0');
    790     stream.read(0);
    791   }
    792 
    793   state.resumeScheduled = false;
    794   state.awaitDrain = 0;
    795   stream.emit('resume');
    796   flow(stream);
    797   if (state.flowing && !state.reading) stream.read(0);
    798 }
    799 
    800 Readable.prototype.pause = function () {
    801   debug('call pause flowing=%j', this._readableState.flowing);
    802   if (false !== this._readableState.flowing) {
    803     debug('pause');
    804     this._readableState.flowing = false;
    805     this.emit('pause');
    806   }
    807   return this;
    808 };
    809 
    810 function flow(stream) {
    811   var state = stream._readableState;
    812   debug('flow', state.flowing);
    813   while (state.flowing && stream.read() !== null) {}
    814 }
    815 
    816 // wrap an old-style stream as the async data source.
    817 // This is *not* part of the readable stream interface.
    818 // It is an ugly unfortunate mess of history.
    819 Readable.prototype.wrap = function (stream) {
    820   var _this = this;
    821 
    822   var state = this._readableState;
    823   var paused = false;
    824 
    825   stream.on('end', function () {
    826     debug('wrapped end');
    827     if (state.decoder && !state.ended) {
    828       var chunk = state.decoder.end();
    829       if (chunk && chunk.length) _this.push(chunk);
    830     }
    831 
    832     _this.push(null);
    833   });
    834 
    835   stream.on('data', function (chunk) {
    836     debug('wrapped data');
    837     if (state.decoder) chunk = state.decoder.write(chunk);
    838 
    839     // don't skip over falsy values in objectMode
    840     if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return;
    841 
    842     var ret = _this.push(chunk);
    843     if (!ret) {
    844       paused = true;
    845       stream.pause();
    846     }
    847   });
    848 
    849   // proxy all the other methods.
    850   // important when wrapping filters and duplexes.
    851   for (var i in stream) {
    852     if (this[i] === undefined && typeof stream[i] === 'function') {
    853       this[i] = function (method) {
    854         return function () {
    855           return stream[method].apply(stream, arguments);
    856         };
    857       }(i);
    858     }
    859   }
    860 
    861   // proxy certain important events.
    862   for (var n = 0; n < kProxyEvents.length; n++) {
    863     stream.on(kProxyEvents[n], this.emit.bind(this, kProxyEvents[n]));
    864   }
    865 
    866   // when we try to consume some more bytes, simply unpause the
    867   // underlying stream.
    868   this._read = function (n) {
    869     debug('wrapped _read', n);
    870     if (paused) {
    871       paused = false;
    872       stream.resume();
    873     }
    874   };
    875 
    876   return this;
    877 };
    878 
    879 Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
    880   // making it explicit this property is not enumerable
    881   // because otherwise some prototype manipulation in
    882   // userland will fail
    883   enumerable: false,
    884   get: function () {
    885     return this._readableState.highWaterMark;
    886   }
    887 });
    888 
    889 // exposed for testing purposes only.
    890 Readable._fromList = fromList;
    891 
    892 // Pluck off n bytes from an array of buffers.
    893 // Length is the combined lengths of all the buffers in the list.
    894 // This function is designed to be inlinable, so please take care when making
    895 // changes to the function body.
    896 function fromList(n, state) {
    897   // nothing buffered
    898   if (state.length === 0) return null;
    899 
    900   var ret;
    901   if (state.objectMode) ret = state.buffer.shift();else if (!n || n >= state.length) {
    902     // read it all, truncate the list
    903     if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.head.data;else ret = state.buffer.concat(state.length);
    904     state.buffer.clear();
    905   } else {
    906     // read part of list
    907     ret = fromListPartial(n, state.buffer, state.decoder);
    908   }
    909 
    910   return ret;
    911 }
    912 
    913 // Extracts only enough buffered data to satisfy the amount requested.
    914 // This function is designed to be inlinable, so please take care when making
    915 // changes to the function body.
    916 function fromListPartial(n, list, hasStrings) {
    917   var ret;
    918   if (n < list.head.data.length) {
    919     // slice is the same for buffers and strings
    920     ret = list.head.data.slice(0, n);
    921     list.head.data = list.head.data.slice(n);
    922   } else if (n === list.head.data.length) {
    923     // first chunk is a perfect match
    924     ret = list.shift();
    925   } else {
    926     // result spans more than one buffer
    927     ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list);
    928   }
    929   return ret;
    930 }
    931 
    932 // Copies a specified amount of characters from the list of buffered data
    933 // chunks.
    934 // This function is designed to be inlinable, so please take care when making
    935 // changes to the function body.
    936 function copyFromBufferString(n, list) {
    937   var p = list.head;
    938   var c = 1;
    939   var ret = p.data;
    940   n -= ret.length;
    941   while (p = p.next) {
    942     var str = p.data;
    943     var nb = n > str.length ? str.length : n;
    944     if (nb === str.length) ret += str;else ret += str.slice(0, n);
    945     n -= nb;
    946     if (n === 0) {
    947       if (nb === str.length) {
    948         ++c;
    949         if (p.next) list.head = p.next;else list.head = list.tail = null;
    950       } else {
    951         list.head = p;
    952         p.data = str.slice(nb);
    953       }
    954       break;
    955     }
    956     ++c;
    957   }
    958   list.length -= c;
    959   return ret;
    960 }
    961 
    962 // Copies a specified amount of bytes from the list of buffered data chunks.
    963 // This function is designed to be inlinable, so please take care when making
    964 // changes to the function body.
    965 function copyFromBuffer(n, list) {
    966   var ret = Buffer.allocUnsafe(n);
    967   var p = list.head;
    968   var c = 1;
    969   p.data.copy(ret);
    970   n -= p.data.length;
    971   while (p = p.next) {
    972     var buf = p.data;
    973     var nb = n > buf.length ? buf.length : n;
    974     buf.copy(ret, ret.length - n, 0, nb);
    975     n -= nb;
    976     if (n === 0) {
    977       if (nb === buf.length) {
    978         ++c;
    979         if (p.next) list.head = p.next;else list.head = list.tail = null;
    980       } else {
    981         list.head = p;
    982         p.data = buf.slice(nb);
    983       }
    984       break;
    985     }
    986     ++c;
    987   }
    988   list.length -= c;
    989   return ret;
    990 }
    991 
    992 function endReadable(stream) {
    993   var state = stream._readableState;
    994 
    995   // If we get here before consuming all the bytes, then that is a
    996   // bug in node.  Should never happen.
    997   if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream');
    998 
    999   if (!state.endEmitted) {
   1000     state.ended = true;
   1001     pna.nextTick(endReadableNT, state, stream);
   1002   }
   1003 }
   1004 
   1005 function endReadableNT(state, stream) {
   1006   // Check that we didn't get one last unshift.
   1007   if (!state.endEmitted && state.length === 0) {
   1008     state.endEmitted = true;
   1009     stream.readable = false;
   1010     stream.emit('end');
   1011   }
   1012 }
   1013 
   1014 function indexOf(xs, x) {
   1015   for (var i = 0, l = xs.length; i < l; i++) {
   1016     if (xs[i] === x) return i;
   1017   }
   1018   return -1;
   1019 }