aboutsummaryrefslogtreecommitdiff
path: root/node_modules/merge2/index.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/merge2/index.js')
-rw-r--r--node_modules/merge2/index.js144
1 files changed, 144 insertions, 0 deletions
diff --git a/node_modules/merge2/index.js b/node_modules/merge2/index.js
new file mode 100644
index 0000000..78a61ed
--- /dev/null
+++ b/node_modules/merge2/index.js
@@ -0,0 +1,144 @@
+'use strict'
+/*
+ * merge2
+ * https://github.com/teambition/merge2
+ *
+ * Copyright (c) 2014-2020 Teambition
+ * Licensed under the MIT license.
+ */
+const Stream = require('stream')
+const PassThrough = Stream.PassThrough
+const slice = Array.prototype.slice
+
+module.exports = merge2
+
+function merge2 () {
+ const streamsQueue = []
+ const args = slice.call(arguments)
+ let merging = false
+ let options = args[args.length - 1]
+
+ if (options && !Array.isArray(options) && options.pipe == null) {
+ args.pop()
+ } else {
+ options = {}
+ }
+
+ const doEnd = options.end !== false
+ const doPipeError = options.pipeError === true
+ if (options.objectMode == null) {
+ options.objectMode = true
+ }
+ if (options.highWaterMark == null) {
+ options.highWaterMark = 64 * 1024
+ }
+ const mergedStream = PassThrough(options)
+
+ function addStream () {
+ for (let i = 0, len = arguments.length; i < len; i++) {
+ streamsQueue.push(pauseStreams(arguments[i], options))
+ }
+ mergeStream()
+ return this
+ }
+
+ function mergeStream () {
+ if (merging) {
+ return
+ }
+ merging = true
+
+ let streams = streamsQueue.shift()
+ if (!streams) {
+ process.nextTick(endStream)
+ return
+ }
+ if (!Array.isArray(streams)) {
+ streams = [streams]
+ }
+
+ let pipesCount = streams.length + 1
+
+ function next () {
+ if (--pipesCount > 0) {
+ return
+ }
+ merging = false
+ mergeStream()
+ }
+
+ function pipe (stream) {
+ function onend () {
+ stream.removeListener('merge2UnpipeEnd', onend)
+ stream.removeListener('end', onend)
+ if (doPipeError) {
+ stream.removeListener('error', onerror)
+ }
+ next()
+ }
+ function onerror (err) {
+ mergedStream.emit('error', err)
+ }
+ // skip ended stream
+ if (stream._readableState.endEmitted) {
+ return next()
+ }
+
+ stream.on('merge2UnpipeEnd', onend)
+ stream.on('end', onend)
+
+ if (doPipeError) {
+ stream.on('error', onerror)
+ }
+
+ stream.pipe(mergedStream, { end: false })
+ // compatible for old stream
+ stream.resume()
+ }
+
+ for (let i = 0; i < streams.length; i++) {
+ pipe(streams[i])
+ }
+
+ next()
+ }
+
+ function endStream () {
+ merging = false
+ // emit 'queueDrain' when all streams merged.
+ mergedStream.emit('queueDrain')
+ if (doEnd) {
+ mergedStream.end()
+ }
+ }
+
+ mergedStream.setMaxListeners(0)
+ mergedStream.add = addStream
+ mergedStream.on('unpipe', function (stream) {
+ stream.emit('merge2UnpipeEnd')
+ })
+
+ if (args.length) {
+ addStream.apply(null, args)
+ }
+ return mergedStream
+}
+
+// check and pause streams for pipe.
+function pauseStreams (streams, options) {
+ if (!Array.isArray(streams)) {
+ // Backwards-compat with old-style streams
+ if (!streams._readableState && streams.pipe) {
+ streams = streams.pipe(PassThrough(options))
+ }
+ if (!streams._readableState || !streams.pause || !streams.pipe) {
+ throw new Error('Only readable stream can be merged.')
+ }
+ streams.pause()
+ } else {
+ for (let i = 0, len = streams.length; i < len; i++) {
+ streams[i] = pauseStreams(streams[i], options)
+ }
+ }
+ return streams
+}