diff options
Diffstat (limited to 'node_modules/fastq/queue.js')
-rw-r--r-- | node_modules/fastq/queue.js | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/node_modules/fastq/queue.js b/node_modules/fastq/queue.js new file mode 100644 index 0000000..e20ff87 --- /dev/null +++ b/node_modules/fastq/queue.js @@ -0,0 +1,269 @@ +'use strict' + +/* eslint-disable no-var */ + +var reusify = require('reusify') + +function fastqueue (context, worker, concurrency) { + if (typeof context === 'function') { + concurrency = worker + worker = context + context = null + } + + if (concurrency < 1) { + throw new Error('fastqueue concurrency must be greater than 1') + } + + var cache = reusify(Task) + var queueHead = null + var queueTail = null + var _running = 0 + var errorHandler = null + + var self = { + push: push, + drain: noop, + saturated: noop, + pause: pause, + paused: false, + concurrency: concurrency, + running: running, + resume: resume, + idle: idle, + length: length, + getQueue: getQueue, + unshift: unshift, + empty: noop, + kill: kill, + killAndDrain: killAndDrain, + error: error + } + + return self + + function running () { + return _running + } + + function pause () { + self.paused = true + } + + function length () { + var current = queueHead + var counter = 0 + + while (current) { + current = current.next + counter++ + } + + return counter + } + + function getQueue () { + var current = queueHead + var tasks = [] + + while (current) { + tasks.push(current.value) + current = current.next + } + + return tasks + } + + function resume () { + if (!self.paused) return + self.paused = false + for (var i = 0; i < self.concurrency; i++) { + _running++ + release() + } + } + + function idle () { + return _running === 0 && self.length() === 0 + } + + function push (value, done) { + var current = cache.get() + + current.context = context + current.release = release + current.value = value + current.callback = done || noop + current.errorHandler = errorHandler + + if (_running === self.concurrency || self.paused) { + if (queueTail) { + queueTail.next = current + queueTail = current + } else { + queueHead = current + queueTail = current + self.saturated() + } + } else { + _running++ + worker.call(context, current.value, current.worked) + } + } + + function unshift (value, done) { + var current = cache.get() + + current.context = context + current.release = release + current.value = value + current.callback = done || noop + + if (_running === self.concurrency || self.paused) { + if (queueHead) { + current.next = queueHead + queueHead = current + } else { + queueHead = current + queueTail = current + self.saturated() + } + } else { + _running++ + worker.call(context, current.value, current.worked) + } + } + + function release (holder) { + if (holder) { + cache.release(holder) + } + var next = queueHead + if (next) { + if (!self.paused) { + if (queueTail === queueHead) { + queueTail = null + } + queueHead = next.next + next.next = null + worker.call(context, next.value, next.worked) + if (queueTail === null) { + self.empty() + } + } else { + _running-- + } + } else if (--_running === 0) { + self.drain() + } + } + + function kill () { + queueHead = null + queueTail = null + self.drain = noop + } + + function killAndDrain () { + queueHead = null + queueTail = null + self.drain() + self.drain = noop + } + + function error (handler) { + errorHandler = handler + } +} + +function noop () {} + +function Task () { + this.value = null + this.callback = noop + this.next = null + this.release = noop + this.context = null + this.errorHandler = null + + var self = this + + this.worked = function worked (err, result) { + var callback = self.callback + var errorHandler = self.errorHandler + var val = self.value + self.value = null + self.callback = noop + if (self.errorHandler) { + errorHandler(err, val) + } + callback.call(self.context, err, result) + self.release(self) + } +} + +function queueAsPromised (context, worker, concurrency) { + if (typeof context === 'function') { + concurrency = worker + worker = context + context = null + } + + function asyncWrapper (arg, cb) { + worker.call(this, arg) + .then(function (res) { + cb(null, res) + }, cb) + } + + var queue = fastqueue(context, asyncWrapper, concurrency) + + var pushCb = queue.push + var unshiftCb = queue.unshift + + queue.push = push + queue.unshift = unshift + + return queue + + function push (value) { + var p = new Promise(function (resolve, reject) { + pushCb(value, function (err, result) { + if (err) { + reject(err) + return + } + resolve(result) + }) + }) + + // Let's fork the promise chain to + // make the error bubble up to the user but + // not lead to a unhandledRejection + p.catch(noop) + + return p + } + + function unshift (value) { + var p = new Promise(function (resolve, reject) { + unshiftCb(value, function (err, result) { + if (err) { + reject(err) + return + } + resolve(result) + }) + }) + + // Let's fork the promise chain to + // make the error bubble up to the user but + // not lead to a unhandledRejection + p.catch(noop) + + return p + } +} + +module.exports = fastqueue +module.exports.promise = queueAsPromised |