_stream_duplex.js (4015B)
1 // Copyright Joyent, Inc. and other Node contributors. 2 // 3 // Permission is hereby granted, free of charge, to any person obtaining a 4 // copy of this software and associated documentation files (the 5 // "Software"), to deal in the Software without restriction, including 6 // without limitation the rights to use, copy, modify, merge, publish, 7 // distribute, sublicense, and/or sell copies of the Software, and to permit 8 // persons to whom the Software is furnished to do so, subject to the 9 // following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included 12 // in all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20 // USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22 // a duplex stream is just a stream that is both readable and writable. 23 // Since JS doesn't have multiple prototypal inheritance, this class 24 // prototypally inherits from Readable, and then parasitically from 25 // Writable. 26 27 'use strict'; 28 29 /*<replacement>*/ 30 31 var pna = require('process-nextick-args'); 32 /*</replacement>*/ 33 34 /*<replacement>*/ 35 var objectKeys = Object.keys || function (obj) { 36 var keys = []; 37 for (var key in obj) { 38 keys.push(key); 39 }return keys; 40 }; 41 /*</replacement>*/ 42 43 module.exports = Duplex; 44 45 /*<replacement>*/ 46 var util = Object.create(require('core-util-is')); 47 util.inherits = require('inherits'); 48 /*</replacement>*/ 49 50 var Readable = require('./_stream_readable'); 51 var Writable = require('./_stream_writable'); 52 53 util.inherits(Duplex, Readable); 54 55 { 56 // avoid scope creep, the keys array can then be collected 57 var keys = objectKeys(Writable.prototype); 58 for (var v = 0; v < keys.length; v++) { 59 var method = keys[v]; 60 if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method]; 61 } 62 } 63 64 function Duplex(options) { 65 if (!(this instanceof Duplex)) return new Duplex(options); 66 67 Readable.call(this, options); 68 Writable.call(this, options); 69 70 if (options && options.readable === false) this.readable = false; 71 72 if (options && options.writable === false) this.writable = false; 73 74 this.allowHalfOpen = true; 75 if (options && options.allowHalfOpen === false) this.allowHalfOpen = false; 76 77 this.once('end', onend); 78 } 79 80 Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', { 81 // making it explicit this property is not enumerable 82 // because otherwise some prototype manipulation in 83 // userland will fail 84 enumerable: false, 85 get: function () { 86 return this._writableState.highWaterMark; 87 } 88 }); 89 90 // the no-half-open enforcer 91 function onend() { 92 // if we allow half-open state, or if the writable side ended, 93 // then we're ok. 94 if (this.allowHalfOpen || this._writableState.ended) return; 95 96 // no more data can be written. 97 // But allow more writes to happen in this tick. 98 pna.nextTick(onEndNT, this); 99 } 100 101 function onEndNT(self) { 102 self.end(); 103 } 104 105 Object.defineProperty(Duplex.prototype, 'destroyed', { 106 get: function () { 107 if (this._readableState === undefined || this._writableState === undefined) { 108 return false; 109 } 110 return this._readableState.destroyed && this._writableState.destroyed; 111 }, 112 set: function (value) { 113 // we ignore the value if the stream 114 // has not been initialized yet 115 if (this._readableState === undefined || this._writableState === undefined) { 116 return; 117 } 118 119 // backward compatibility, the user is explicitly 120 // managing destroyed 121 this._readableState.destroyed = value; 122 this._writableState.destroyed = value; 123 } 124 }); 125 126 Duplex.prototype._destroy = function (err, cb) { 127 this.push(null); 128 this.end(); 129 130 pna.nextTick(cb, err); 131 };