aboutsummaryrefslogtreecommitdiff
path: root/node_modules/multistream/index.js
diff options
context:
space:
mode:
authorMinteck <nekostarfan@gmail.com>2021-08-24 14:41:48 +0200
committerMinteck <nekostarfan@gmail.com>2021-08-24 14:41:48 +0200
commitd25e11bee6ca5ca523884da132d18e1400e077b9 (patch)
tree8af39fde19f7ed640a60fb397c7edd647dff1c4c /node_modules/multistream/index.js
downloadkartik-iridium-d25e11bee6ca5ca523884da132d18e1400e077b9.tar.gz
kartik-iridium-d25e11bee6ca5ca523884da132d18e1400e077b9.tar.bz2
kartik-iridium-d25e11bee6ca5ca523884da132d18e1400e077b9.zip
Initial commit
Diffstat (limited to 'node_modules/multistream/index.js')
-rw-r--r--node_modules/multistream/index.js166
1 files changed, 166 insertions, 0 deletions
diff --git a/node_modules/multistream/index.js b/node_modules/multistream/index.js
new file mode 100644
index 0000000..13f03f1
--- /dev/null
+++ b/node_modules/multistream/index.js
@@ -0,0 +1,166 @@
+/*! multistream. MIT License. Feross Aboukhadijeh <https://feross.org/opensource> */
+const stream = require('readable-stream')
+const once = require('once')
+
+function toStreams2Obj (s) {
+ return toStreams2(s, { objectMode: true, highWaterMark: 16 })
+}
+
+function toStreams2Buf (s) {
+ return toStreams2(s)
+}
+
+function toStreams2 (s, opts) {
+ if (!s || typeof s === 'function' || s._readableState) return s
+
+ const wrap = new stream.Readable(opts).wrap(s)
+ if (s.destroy) {
+ wrap.destroy = s.destroy.bind(s)
+ }
+ return wrap
+}
+
+class MultiStream extends stream.Readable {
+ constructor (streams, opts) {
+ super({ ...opts, autoDestroy: true })
+
+ this._drained = false
+ this._forwarding = false
+ this._current = null
+ this._toStreams2 = (opts && opts.objectMode) ? toStreams2Obj : toStreams2Buf
+
+ if (typeof streams === 'function') {
+ this._queue = streams
+ } else {
+ this._queue = streams.map(this._toStreams2)
+ this._queue.forEach(stream => {
+ if (typeof stream !== 'function') this._attachErrorListener(stream)
+ })
+ }
+
+ this._next()
+ }
+
+ _read () {
+ this._drained = true
+ this._forward()
+ }
+
+ _forward () {
+ if (this._forwarding || !this._drained || !this._current) return
+ this._forwarding = true
+
+ let chunk
+ while (this._drained && (chunk = this._current.read()) !== null) {
+ this._drained = this.push(chunk)
+ }
+
+ this._forwarding = false
+ }
+
+ _destroy (err, cb) {
+ let streams = []
+ if (this._current) streams.push(this._current)
+ if (typeof this._queue !== 'function') streams = streams.concat(this._queue)
+
+ if (streams.length === 0) {
+ cb(err)
+ } else {
+ let counter = streams.length
+ let er = err
+ streams.forEach(stream => {
+ destroy(stream, err, err => {
+ er = er || err
+ if (--counter === 0) {
+ cb(er)
+ }
+ })
+ })
+ }
+ }
+
+ _next () {
+ this._current = null
+
+ if (typeof this._queue === 'function') {
+ this._queue((err, stream) => {
+ if (err) return this.destroy(err)
+ stream = this._toStreams2(stream)
+ this._attachErrorListener(stream)
+ this._gotNextStream(stream)
+ })
+ } else {
+ let stream = this._queue.shift()
+ if (typeof stream === 'function') {
+ stream = this._toStreams2(stream())
+ this._attachErrorListener(stream)
+ }
+ this._gotNextStream(stream)
+ }
+ }
+
+ _gotNextStream (stream) {
+ if (!stream) {
+ this.push(null)
+ return
+ }
+
+ this._current = stream
+ this._forward()
+
+ const onReadable = () => {
+ this._forward()
+ }
+
+ const onClose = () => {
+ if (!stream._readableState.ended && !stream.destroyed) {
+ const err = new Error('ERR_STREAM_PREMATURE_CLOSE')
+ err.code = 'ERR_STREAM_PREMATURE_CLOSE'
+ this.destroy(err)
+ }
+ }
+
+ const onEnd = () => {
+ this._current = null
+ stream.removeListener('readable', onReadable)
+ stream.removeListener('end', onEnd)
+ stream.removeListener('close', onClose)
+ stream.destroy()
+ this._next()
+ }
+
+ stream.on('readable', onReadable)
+ stream.once('end', onEnd)
+ stream.once('close', onClose)
+ }
+
+ _attachErrorListener (stream) {
+ if (!stream) return
+
+ const onError = (err) => {
+ stream.removeListener('error', onError)
+ this.destroy(err)
+ }
+
+ stream.once('error', onError)
+ }
+}
+
+MultiStream.obj = streams => (
+ new MultiStream(streams, { objectMode: true, highWaterMark: 16 })
+)
+
+module.exports = MultiStream
+
+// Normalize stream destroy w/ callback.
+function destroy (stream, err, cb) {
+ if (!stream.destroy || stream.destroyed) {
+ cb(err)
+ } else {
+ const callback = once(er => cb(er || err))
+ stream
+ .on('error', callback)
+ .on('close', () => callback())
+ .destroy(err, callback)
+ }
+}