aboutsummaryrefslogtreecommitdiff
path: root/node_modules/fastq/queue.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/fastq/queue.js')
-rw-r--r--node_modules/fastq/queue.js269
1 files changed, 269 insertions, 0 deletions
diff --git a/node_modules/fastq/queue.js b/node_modules/fastq/queue.js
new file mode 100644
index 0000000..e20ff87
--- /dev/null
+++ b/node_modules/fastq/queue.js
@@ -0,0 +1,269 @@
+'use strict'
+
+/* eslint-disable no-var */
+
+var reusify = require('reusify')
+
+function fastqueue (context, worker, concurrency) {
+ if (typeof context === 'function') {
+ concurrency = worker
+ worker = context
+ context = null
+ }
+
+ if (concurrency < 1) {
+ throw new Error('fastqueue concurrency must be greater than 1')
+ }
+
+ var cache = reusify(Task)
+ var queueHead = null
+ var queueTail = null
+ var _running = 0
+ var errorHandler = null
+
+ var self = {
+ push: push,
+ drain: noop,
+ saturated: noop,
+ pause: pause,
+ paused: false,
+ concurrency: concurrency,
+ running: running,
+ resume: resume,
+ idle: idle,
+ length: length,
+ getQueue: getQueue,
+ unshift: unshift,
+ empty: noop,
+ kill: kill,
+ killAndDrain: killAndDrain,
+ error: error
+ }
+
+ return self
+
+ function running () {
+ return _running
+ }
+
+ function pause () {
+ self.paused = true
+ }
+
+ function length () {
+ var current = queueHead
+ var counter = 0
+
+ while (current) {
+ current = current.next
+ counter++
+ }
+
+ return counter
+ }
+
+ function getQueue () {
+ var current = queueHead
+ var tasks = []
+
+ while (current) {
+ tasks.push(current.value)
+ current = current.next
+ }
+
+ return tasks
+ }
+
+ function resume () {
+ if (!self.paused) return
+ self.paused = false
+ for (var i = 0; i < self.concurrency; i++) {
+ _running++
+ release()
+ }
+ }
+
+ function idle () {
+ return _running === 0 && self.length() === 0
+ }
+
+ function push (value, done) {
+ var current = cache.get()
+
+ current.context = context
+ current.release = release
+ current.value = value
+ current.callback = done || noop
+ current.errorHandler = errorHandler
+
+ if (_running === self.concurrency || self.paused) {
+ if (queueTail) {
+ queueTail.next = current
+ queueTail = current
+ } else {
+ queueHead = current
+ queueTail = current
+ self.saturated()
+ }
+ } else {
+ _running++
+ worker.call(context, current.value, current.worked)
+ }
+ }
+
+ function unshift (value, done) {
+ var current = cache.get()
+
+ current.context = context
+ current.release = release
+ current.value = value
+ current.callback = done || noop
+
+ if (_running === self.concurrency || self.paused) {
+ if (queueHead) {
+ current.next = queueHead
+ queueHead = current
+ } else {
+ queueHead = current
+ queueTail = current
+ self.saturated()
+ }
+ } else {
+ _running++
+ worker.call(context, current.value, current.worked)
+ }
+ }
+
+ function release (holder) {
+ if (holder) {
+ cache.release(holder)
+ }
+ var next = queueHead
+ if (next) {
+ if (!self.paused) {
+ if (queueTail === queueHead) {
+ queueTail = null
+ }
+ queueHead = next.next
+ next.next = null
+ worker.call(context, next.value, next.worked)
+ if (queueTail === null) {
+ self.empty()
+ }
+ } else {
+ _running--
+ }
+ } else if (--_running === 0) {
+ self.drain()
+ }
+ }
+
+ function kill () {
+ queueHead = null
+ queueTail = null
+ self.drain = noop
+ }
+
+ function killAndDrain () {
+ queueHead = null
+ queueTail = null
+ self.drain()
+ self.drain = noop
+ }
+
+ function error (handler) {
+ errorHandler = handler
+ }
+}
+
+function noop () {}
+
+function Task () {
+ this.value = null
+ this.callback = noop
+ this.next = null
+ this.release = noop
+ this.context = null
+ this.errorHandler = null
+
+ var self = this
+
+ this.worked = function worked (err, result) {
+ var callback = self.callback
+ var errorHandler = self.errorHandler
+ var val = self.value
+ self.value = null
+ self.callback = noop
+ if (self.errorHandler) {
+ errorHandler(err, val)
+ }
+ callback.call(self.context, err, result)
+ self.release(self)
+ }
+}
+
+function queueAsPromised (context, worker, concurrency) {
+ if (typeof context === 'function') {
+ concurrency = worker
+ worker = context
+ context = null
+ }
+
+ function asyncWrapper (arg, cb) {
+ worker.call(this, arg)
+ .then(function (res) {
+ cb(null, res)
+ }, cb)
+ }
+
+ var queue = fastqueue(context, asyncWrapper, concurrency)
+
+ var pushCb = queue.push
+ var unshiftCb = queue.unshift
+
+ queue.push = push
+ queue.unshift = unshift
+
+ return queue
+
+ function push (value) {
+ var p = new Promise(function (resolve, reject) {
+ pushCb(value, function (err, result) {
+ if (err) {
+ reject(err)
+ return
+ }
+ resolve(result)
+ })
+ })
+
+ // Let's fork the promise chain to
+ // make the error bubble up to the user but
+ // not lead to a unhandledRejection
+ p.catch(noop)
+
+ return p
+ }
+
+ function unshift (value) {
+ var p = new Promise(function (resolve, reject) {
+ unshiftCb(value, function (err, result) {
+ if (err) {
+ reject(err)
+ return
+ }
+ resolve(result)
+ })
+ })
+
+ // Let's fork the promise chain to
+ // make the error bubble up to the user but
+ // not lead to a unhandledRejection
+ p.catch(noop)
+
+ return p
+ }
+}
+
+module.exports = fastqueue
+module.exports.promise = queueAsPromised