_stream_writable.js (20317B)
1 // Copyright Joyent, Inc. and other Node contributors. 2 // 3 // Permission is hereby granted, free of charge, to any person obtaining a 4 // copy of this software and associated documentation files (the 5 // "Software"), to deal in the Software without restriction, including 6 // without limitation the rights to use, copy, modify, merge, publish, 7 // distribute, sublicense, and/or sell copies of the Software, and to permit 8 // persons to whom the Software is furnished to do so, subject to the 9 // following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included 12 // in all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20 // USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22 // A bit simpler than readable streams. 23 // Implement an async ._write(chunk, encoding, cb), and it'll handle all 24 // the drain event emission and buffering. 25 26 'use strict'; 27 28 /*<replacement>*/ 29 30 var pna = require('process-nextick-args'); 31 /*</replacement>*/ 32 33 module.exports = Writable; 34 35 /* <replacement> */ 36 function WriteReq(chunk, encoding, cb) { 37 this.chunk = chunk; 38 this.encoding = encoding; 39 this.callback = cb; 40 this.next = null; 41 } 42 43 // It seems a linked list but it is not 44 // there will be only 2 of these for each stream 45 function CorkedRequest(state) { 46 var _this = this; 47 48 this.next = null; 49 this.entry = null; 50 this.finish = function () { 51 onCorkedFinish(_this, state); 52 }; 53 } 54 /* </replacement> */ 55 56 /*<replacement>*/ 57 var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : pna.nextTick; 58 /*</replacement>*/ 59 60 /*<replacement>*/ 61 var Duplex; 62 /*</replacement>*/ 63 64 Writable.WritableState = WritableState; 65 66 /*<replacement>*/ 67 var util = Object.create(require('core-util-is')); 68 util.inherits = require('inherits'); 69 /*</replacement>*/ 70 71 /*<replacement>*/ 72 var internalUtil = { 73 deprecate: require('util-deprecate') 74 }; 75 /*</replacement>*/ 76 77 /*<replacement>*/ 78 var Stream = require('./internal/streams/stream'); 79 /*</replacement>*/ 80 81 /*<replacement>*/ 82 83 var Buffer = require('safe-buffer').Buffer; 84 var OurUint8Array = global.Uint8Array || function () {}; 85 function _uint8ArrayToBuffer(chunk) { 86 return Buffer.from(chunk); 87 } 88 function _isUint8Array(obj) { 89 return Buffer.isBuffer(obj) || obj instanceof OurUint8Array; 90 } 91 92 /*</replacement>*/ 93 94 var destroyImpl = require('./internal/streams/destroy'); 95 96 util.inherits(Writable, Stream); 97 98 function nop() {} 99 100 function WritableState(options, stream) { 101 Duplex = Duplex || require('./_stream_duplex'); 102 103 options = options || {}; 104 105 // Duplex streams are both readable and writable, but share 106 // the same options object. 107 // However, some cases require setting options to different 108 // values for the readable and the writable sides of the duplex stream. 109 // These options can be provided separately as readableXXX and writableXXX. 110 var isDuplex = stream instanceof Duplex; 111 112 // object stream flag to indicate whether or not this stream 113 // contains buffers or objects. 114 this.objectMode = !!options.objectMode; 115 116 if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode; 117 118 // the point at which write() starts returning false 119 // Note: 0 is a valid value, means that we always return false if 120 // the entire buffer is not flushed immediately on write() 121 var hwm = options.highWaterMark; 122 var writableHwm = options.writableHighWaterMark; 123 var defaultHwm = this.objectMode ? 16 : 16 * 1024; 124 125 if (hwm || hwm === 0) this.highWaterMark = hwm;else if (isDuplex && (writableHwm || writableHwm === 0)) this.highWaterMark = writableHwm;else this.highWaterMark = defaultHwm; 126 127 // cast to ints. 128 this.highWaterMark = Math.floor(this.highWaterMark); 129 130 // if _final has been called 131 this.finalCalled = false; 132 133 // drain event flag. 134 this.needDrain = false; 135 // at the start of calling end() 136 this.ending = false; 137 // when end() has been called, and returned 138 this.ended = false; 139 // when 'finish' is emitted 140 this.finished = false; 141 142 // has it been destroyed 143 this.destroyed = false; 144 145 // should we decode strings into buffers before passing to _write? 146 // this is here so that some node-core streams can optimize string 147 // handling at a lower level. 148 var noDecode = options.decodeStrings === false; 149 this.decodeStrings = !noDecode; 150 151 // Crypto is kind of old and crusty. Historically, its default string 152 // encoding is 'binary' so we have to make this configurable. 153 // Everything else in the universe uses 'utf8', though. 154 this.defaultEncoding = options.defaultEncoding || 'utf8'; 155 156 // not an actual buffer we keep track of, but a measurement 157 // of how much we're waiting to get pushed to some underlying 158 // socket or file. 159 this.length = 0; 160 161 // a flag to see when we're in the middle of a write. 162 this.writing = false; 163 164 // when true all writes will be buffered until .uncork() call 165 this.corked = 0; 166 167 // a flag to be able to tell if the onwrite cb is called immediately, 168 // or on a later tick. We set this to true at first, because any 169 // actions that shouldn't happen until "later" should generally also 170 // not happen before the first write call. 171 this.sync = true; 172 173 // a flag to know if we're processing previously buffered items, which 174 // may call the _write() callback in the same tick, so that we don't 175 // end up in an overlapped onwrite situation. 176 this.bufferProcessing = false; 177 178 // the callback that's passed to _write(chunk,cb) 179 this.onwrite = function (er) { 180 onwrite(stream, er); 181 }; 182 183 // the callback that the user supplies to write(chunk,encoding,cb) 184 this.writecb = null; 185 186 // the amount that is being written when _write is called. 187 this.writelen = 0; 188 189 this.bufferedRequest = null; 190 this.lastBufferedRequest = null; 191 192 // number of pending user-supplied write callbacks 193 // this must be 0 before 'finish' can be emitted 194 this.pendingcb = 0; 195 196 // emit prefinish if the only thing we're waiting for is _write cbs 197 // This is relevant for synchronous Transform streams 198 this.prefinished = false; 199 200 // True if the error was already emitted and should not be thrown again 201 this.errorEmitted = false; 202 203 // count buffered requests 204 this.bufferedRequestCount = 0; 205 206 // allocate the first CorkedRequest, there is always 207 // one allocated and free to use, and we maintain at most two 208 this.corkedRequestsFree = new CorkedRequest(this); 209 } 210 211 WritableState.prototype.getBuffer = function getBuffer() { 212 var current = this.bufferedRequest; 213 var out = []; 214 while (current) { 215 out.push(current); 216 current = current.next; 217 } 218 return out; 219 }; 220 221 (function () { 222 try { 223 Object.defineProperty(WritableState.prototype, 'buffer', { 224 get: internalUtil.deprecate(function () { 225 return this.getBuffer(); 226 }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.', 'DEP0003') 227 }); 228 } catch (_) {} 229 })(); 230 231 // Test _writableState for inheritance to account for Duplex streams, 232 // whose prototype chain only points to Readable. 233 var realHasInstance; 234 if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') { 235 realHasInstance = Function.prototype[Symbol.hasInstance]; 236 Object.defineProperty(Writable, Symbol.hasInstance, { 237 value: function (object) { 238 if (realHasInstance.call(this, object)) return true; 239 if (this !== Writable) return false; 240 241 return object && object._writableState instanceof WritableState; 242 } 243 }); 244 } else { 245 realHasInstance = function (object) { 246 return object instanceof this; 247 }; 248 } 249 250 function Writable(options) { 251 Duplex = Duplex || require('./_stream_duplex'); 252 253 // Writable ctor is applied to Duplexes, too. 254 // `realHasInstance` is necessary because using plain `instanceof` 255 // would return false, as no `_writableState` property is attached. 256 257 // Trying to use the custom `instanceof` for Writable here will also break the 258 // Node.js LazyTransform implementation, which has a non-trivial getter for 259 // `_writableState` that would lead to infinite recursion. 260 if (!realHasInstance.call(Writable, this) && !(this instanceof Duplex)) { 261 return new Writable(options); 262 } 263 264 this._writableState = new WritableState(options, this); 265 266 // legacy. 267 this.writable = true; 268 269 if (options) { 270 if (typeof options.write === 'function') this._write = options.write; 271 272 if (typeof options.writev === 'function') this._writev = options.writev; 273 274 if (typeof options.destroy === 'function') this._destroy = options.destroy; 275 276 if (typeof options.final === 'function') this._final = options.final; 277 } 278 279 Stream.call(this); 280 } 281 282 // Otherwise people can pipe Writable streams, which is just wrong. 283 Writable.prototype.pipe = function () { 284 this.emit('error', new Error('Cannot pipe, not readable')); 285 }; 286 287 function writeAfterEnd(stream, cb) { 288 var er = new Error('write after end'); 289 // TODO: defer error events consistently everywhere, not just the cb 290 stream.emit('error', er); 291 pna.nextTick(cb, er); 292 } 293 294 // Checks that a user-supplied chunk is valid, especially for the particular 295 // mode the stream is in. Currently this means that `null` is never accepted 296 // and undefined/non-string values are only allowed in object mode. 297 function validChunk(stream, state, chunk, cb) { 298 var valid = true; 299 var er = false; 300 301 if (chunk === null) { 302 er = new TypeError('May not write null values to stream'); 303 } else if (typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) { 304 er = new TypeError('Invalid non-string/buffer chunk'); 305 } 306 if (er) { 307 stream.emit('error', er); 308 pna.nextTick(cb, er); 309 valid = false; 310 } 311 return valid; 312 } 313 314 Writable.prototype.write = function (chunk, encoding, cb) { 315 var state = this._writableState; 316 var ret = false; 317 var isBuf = !state.objectMode && _isUint8Array(chunk); 318 319 if (isBuf && !Buffer.isBuffer(chunk)) { 320 chunk = _uint8ArrayToBuffer(chunk); 321 } 322 323 if (typeof encoding === 'function') { 324 cb = encoding; 325 encoding = null; 326 } 327 328 if (isBuf) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding; 329 330 if (typeof cb !== 'function') cb = nop; 331 332 if (state.ended) writeAfterEnd(this, cb);else if (isBuf || validChunk(this, state, chunk, cb)) { 333 state.pendingcb++; 334 ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb); 335 } 336 337 return ret; 338 }; 339 340 Writable.prototype.cork = function () { 341 var state = this._writableState; 342 343 state.corked++; 344 }; 345 346 Writable.prototype.uncork = function () { 347 var state = this._writableState; 348 349 if (state.corked) { 350 state.corked--; 351 352 if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state); 353 } 354 }; 355 356 Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { 357 // node::ParseEncoding() requires lower case. 358 if (typeof encoding === 'string') encoding = encoding.toLowerCase(); 359 if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding); 360 this._writableState.defaultEncoding = encoding; 361 return this; 362 }; 363 364 function decodeChunk(state, chunk, encoding) { 365 if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') { 366 chunk = Buffer.from(chunk, encoding); 367 } 368 return chunk; 369 } 370 371 Object.defineProperty(Writable.prototype, 'writableHighWaterMark', { 372 // making it explicit this property is not enumerable 373 // because otherwise some prototype manipulation in 374 // userland will fail 375 enumerable: false, 376 get: function () { 377 return this._writableState.highWaterMark; 378 } 379 }); 380 381 // if we're already writing something, then just put this 382 // in the queue, and wait our turn. Otherwise, call _write 383 // If we return false, then we need a drain event, so set that flag. 384 function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { 385 if (!isBuf) { 386 var newChunk = decodeChunk(state, chunk, encoding); 387 if (chunk !== newChunk) { 388 isBuf = true; 389 encoding = 'buffer'; 390 chunk = newChunk; 391 } 392 } 393 var len = state.objectMode ? 1 : chunk.length; 394 395 state.length += len; 396 397 var ret = state.length < state.highWaterMark; 398 // we must ensure that previous needDrain will not be reset to false. 399 if (!ret) state.needDrain = true; 400 401 if (state.writing || state.corked) { 402 var last = state.lastBufferedRequest; 403 state.lastBufferedRequest = { 404 chunk: chunk, 405 encoding: encoding, 406 isBuf: isBuf, 407 callback: cb, 408 next: null 409 }; 410 if (last) { 411 last.next = state.lastBufferedRequest; 412 } else { 413 state.bufferedRequest = state.lastBufferedRequest; 414 } 415 state.bufferedRequestCount += 1; 416 } else { 417 doWrite(stream, state, false, len, chunk, encoding, cb); 418 } 419 420 return ret; 421 } 422 423 function doWrite(stream, state, writev, len, chunk, encoding, cb) { 424 state.writelen = len; 425 state.writecb = cb; 426 state.writing = true; 427 state.sync = true; 428 if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite); 429 state.sync = false; 430 } 431 432 function onwriteError(stream, state, sync, er, cb) { 433 --state.pendingcb; 434 435 if (sync) { 436 // defer the callback if we are being called synchronously 437 // to avoid piling up things on the stack 438 pna.nextTick(cb, er); 439 // this can emit finish, and it will always happen 440 // after error 441 pna.nextTick(finishMaybe, stream, state); 442 stream._writableState.errorEmitted = true; 443 stream.emit('error', er); 444 } else { 445 // the caller expect this to happen before if 446 // it is async 447 cb(er); 448 stream._writableState.errorEmitted = true; 449 stream.emit('error', er); 450 // this can emit finish, but finish must 451 // always follow error 452 finishMaybe(stream, state); 453 } 454 } 455 456 function onwriteStateUpdate(state) { 457 state.writing = false; 458 state.writecb = null; 459 state.length -= state.writelen; 460 state.writelen = 0; 461 } 462 463 function onwrite(stream, er) { 464 var state = stream._writableState; 465 var sync = state.sync; 466 var cb = state.writecb; 467 468 onwriteStateUpdate(state); 469 470 if (er) onwriteError(stream, state, sync, er, cb);else { 471 // Check if we're actually ready to finish, but don't emit yet 472 var finished = needFinish(state); 473 474 if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { 475 clearBuffer(stream, state); 476 } 477 478 if (sync) { 479 /*<replacement>*/ 480 asyncWrite(afterWrite, stream, state, finished, cb); 481 /*</replacement>*/ 482 } else { 483 afterWrite(stream, state, finished, cb); 484 } 485 } 486 } 487 488 function afterWrite(stream, state, finished, cb) { 489 if (!finished) onwriteDrain(stream, state); 490 state.pendingcb--; 491 cb(); 492 finishMaybe(stream, state); 493 } 494 495 // Must force callback to be called on nextTick, so that we don't 496 // emit 'drain' before the write() consumer gets the 'false' return 497 // value, and has a chance to attach a 'drain' listener. 498 function onwriteDrain(stream, state) { 499 if (state.length === 0 && state.needDrain) { 500 state.needDrain = false; 501 stream.emit('drain'); 502 } 503 } 504 505 // if there's something in the buffer waiting, then process it 506 function clearBuffer(stream, state) { 507 state.bufferProcessing = true; 508 var entry = state.bufferedRequest; 509 510 if (stream._writev && entry && entry.next) { 511 // Fast case, write everything using _writev() 512 var l = state.bufferedRequestCount; 513 var buffer = new Array(l); 514 var holder = state.corkedRequestsFree; 515 holder.entry = entry; 516 517 var count = 0; 518 var allBuffers = true; 519 while (entry) { 520 buffer[count] = entry; 521 if (!entry.isBuf) allBuffers = false; 522 entry = entry.next; 523 count += 1; 524 } 525 buffer.allBuffers = allBuffers; 526 527 doWrite(stream, state, true, state.length, buffer, '', holder.finish); 528 529 // doWrite is almost always async, defer these to save a bit of time 530 // as the hot path ends with doWrite 531 state.pendingcb++; 532 state.lastBufferedRequest = null; 533 if (holder.next) { 534 state.corkedRequestsFree = holder.next; 535 holder.next = null; 536 } else { 537 state.corkedRequestsFree = new CorkedRequest(state); 538 } 539 state.bufferedRequestCount = 0; 540 } else { 541 // Slow case, write chunks one-by-one 542 while (entry) { 543 var chunk = entry.chunk; 544 var encoding = entry.encoding; 545 var cb = entry.callback; 546 var len = state.objectMode ? 1 : chunk.length; 547 548 doWrite(stream, state, false, len, chunk, encoding, cb); 549 entry = entry.next; 550 state.bufferedRequestCount--; 551 // if we didn't call the onwrite immediately, then 552 // it means that we need to wait until it does. 553 // also, that means that the chunk and cb are currently 554 // being processed, so move the buffer counter past them. 555 if (state.writing) { 556 break; 557 } 558 } 559 560 if (entry === null) state.lastBufferedRequest = null; 561 } 562 563 state.bufferedRequest = entry; 564 state.bufferProcessing = false; 565 } 566 567 Writable.prototype._write = function (chunk, encoding, cb) { 568 cb(new Error('_write() is not implemented')); 569 }; 570 571 Writable.prototype._writev = null; 572 573 Writable.prototype.end = function (chunk, encoding, cb) { 574 var state = this._writableState; 575 576 if (typeof chunk === 'function') { 577 cb = chunk; 578 chunk = null; 579 encoding = null; 580 } else if (typeof encoding === 'function') { 581 cb = encoding; 582 encoding = null; 583 } 584 585 if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); 586 587 // .end() fully uncorks 588 if (state.corked) { 589 state.corked = 1; 590 this.uncork(); 591 } 592 593 // ignore unnecessary end() calls. 594 if (!state.ending && !state.finished) endWritable(this, state, cb); 595 }; 596 597 function needFinish(state) { 598 return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing; 599 } 600 function callFinal(stream, state) { 601 stream._final(function (err) { 602 state.pendingcb--; 603 if (err) { 604 stream.emit('error', err); 605 } 606 state.prefinished = true; 607 stream.emit('prefinish'); 608 finishMaybe(stream, state); 609 }); 610 } 611 function prefinish(stream, state) { 612 if (!state.prefinished && !state.finalCalled) { 613 if (typeof stream._final === 'function') { 614 state.pendingcb++; 615 state.finalCalled = true; 616 pna.nextTick(callFinal, stream, state); 617 } else { 618 state.prefinished = true; 619 stream.emit('prefinish'); 620 } 621 } 622 } 623 624 function finishMaybe(stream, state) { 625 var need = needFinish(state); 626 if (need) { 627 prefinish(stream, state); 628 if (state.pendingcb === 0) { 629 state.finished = true; 630 stream.emit('finish'); 631 } 632 } 633 return need; 634 } 635 636 function endWritable(stream, state, cb) { 637 state.ending = true; 638 finishMaybe(stream, state); 639 if (cb) { 640 if (state.finished) pna.nextTick(cb);else stream.once('finish', cb); 641 } 642 state.ended = true; 643 stream.writable = false; 644 } 645 646 function onCorkedFinish(corkReq, state, err) { 647 var entry = corkReq.entry; 648 corkReq.entry = null; 649 while (entry) { 650 var cb = entry.callback; 651 state.pendingcb--; 652 cb(err); 653 entry = entry.next; 654 } 655 if (state.corkedRequestsFree) { 656 state.corkedRequestsFree.next = corkReq; 657 } else { 658 state.corkedRequestsFree = corkReq; 659 } 660 } 661 662 Object.defineProperty(Writable.prototype, 'destroyed', { 663 get: function () { 664 if (this._writableState === undefined) { 665 return false; 666 } 667 return this._writableState.destroyed; 668 }, 669 set: function (value) { 670 // we ignore the value if the stream 671 // has not been initialized yet 672 if (!this._writableState) { 673 return; 674 } 675 676 // backward compatibility, the user is explicitly 677 // managing destroyed 678 this._writableState.destroyed = value; 679 } 680 }); 681 682 Writable.prototype.destroy = destroyImpl.destroy; 683 Writable.prototype._undestroy = destroyImpl.undestroy; 684 Writable.prototype._destroy = function (err, cb) { 685 this.end(); 686 cb(err); 687 };