diff options
author | Minteck <nekostarfan@gmail.com> | 2021-08-24 14:41:48 +0200 |
---|---|---|
committer | Minteck <nekostarfan@gmail.com> | 2021-08-24 14:41:48 +0200 |
commit | d25e11bee6ca5ca523884da132d18e1400e077b9 (patch) | |
tree | 8af39fde19f7ed640a60fb397c7edd647dff1c4c /node_modules/p-event/index.js | |
download | kartik-iridium-d25e11bee6ca5ca523884da132d18e1400e077b9.tar.gz kartik-iridium-d25e11bee6ca5ca523884da132d18e1400e077b9.tar.bz2 kartik-iridium-d25e11bee6ca5ca523884da132d18e1400e077b9.zip |
Initial commit
Diffstat (limited to 'node_modules/p-event/index.js')
-rw-r--r-- | node_modules/p-event/index.js | 272 |
1 files changed, 272 insertions, 0 deletions
diff --git a/node_modules/p-event/index.js b/node_modules/p-event/index.js new file mode 100644 index 0000000..23ee421 --- /dev/null +++ b/node_modules/p-event/index.js @@ -0,0 +1,272 @@ +'use strict'; +const pTimeout = require('p-timeout'); + +const symbolAsyncIterator = Symbol.asyncIterator || '@@asyncIterator'; + +const normalizeEmitter = emitter => { + const addListener = emitter.on || emitter.addListener || emitter.addEventListener; + const removeListener = emitter.off || emitter.removeListener || emitter.removeEventListener; + + if (!addListener || !removeListener) { + throw new TypeError('Emitter is not compatible'); + } + + return { + addListener: addListener.bind(emitter), + removeListener: removeListener.bind(emitter) + }; +}; + +const normalizeEvents = event => Array.isArray(event) ? event : [event]; + +const multiple = (emitter, event, options) => { + let cancel; + const ret = new Promise((resolve, reject) => { + options = Object.assign({ + rejectionEvents: ['error'], + multiArgs: false, + resolveImmediately: false + }, options); + + if (!(options.count >= 0 && (options.count === Infinity || Number.isInteger(options.count)))) { + throw new TypeError('The `count` option should be at least 0 or more'); + } + + // Allow multiple events + const events = normalizeEvents(event); + + const items = []; + const {addListener, removeListener} = normalizeEmitter(emitter); + + const onItem = (...args) => { + const value = options.multiArgs ? args : args[0]; + + if (options.filter && !options.filter(value)) { + return; + } + + items.push(value); + + if (options.count === items.length) { + cancel(); + resolve(items); + } + }; + + const rejectHandler = error => { + cancel(); + reject(error); + }; + + cancel = () => { + for (const event of events) { + removeListener(event, onItem); + } + + for (const rejectionEvent of options.rejectionEvents) { + removeListener(rejectionEvent, rejectHandler); + } + }; + + for (const event of events) { + addListener(event, onItem); + } + + for (const rejectionEvent of options.rejectionEvents) { + addListener(rejectionEvent, rejectHandler); + } + + if (options.resolveImmediately) { + resolve(items); + } + }); + + ret.cancel = cancel; + + if (typeof options.timeout === 'number') { + const timeout = pTimeout(ret, options.timeout); + timeout.cancel = cancel; + return timeout; + } + + return ret; +}; + +module.exports = (emitter, event, options) => { + if (typeof options === 'function') { + options = {filter: options}; + } + + options = Object.assign({}, options, { + count: 1, + resolveImmediately: false + }); + + const arrayPromise = multiple(emitter, event, options); + + const promise = arrayPromise.then(array => array[0]); + promise.cancel = arrayPromise.cancel; + + return promise; +}; + +module.exports.multiple = multiple; + +module.exports.iterator = (emitter, event, options) => { + if (typeof options === 'function') { + options = {filter: options}; + } + + // Allow multiple events + const events = normalizeEvents(event); + + options = Object.assign({ + rejectionEvents: ['error'], + resolutionEvents: [], + limit: Infinity, + multiArgs: false + }, options); + + const {limit} = options; + const isValidLimit = limit >= 0 && (limit === Infinity || Number.isInteger(limit)); + if (!isValidLimit) { + throw new TypeError('The `limit` option should be a non-negative integer or Infinity'); + } + + if (limit === 0) { + // Return an empty async iterator to avoid any further cost + return { + [Symbol.asyncIterator]() { + return this; + }, + next() { + return Promise.resolve({done: true, value: undefined}); + } + }; + } + + let isLimitReached = false; + + const {addListener, removeListener} = normalizeEmitter(emitter); + + let done = false; + let error; + let hasPendingError = false; + const nextQueue = []; + const valueQueue = []; + let eventCount = 0; + + const valueHandler = (...args) => { + eventCount++; + isLimitReached = eventCount === limit; + + const value = options.multiArgs ? args : args[0]; + + if (nextQueue.length > 0) { + const {resolve} = nextQueue.shift(); + + resolve({done: false, value}); + + if (isLimitReached) { + cancel(); + } + + return; + } + + valueQueue.push(value); + + if (isLimitReached) { + cancel(); + } + }; + + const cancel = () => { + done = true; + for (const event of events) { + removeListener(event, valueHandler); + } + + for (const rejectionEvent of options.rejectionEvents) { + removeListener(rejectionEvent, rejectHandler); + } + + for (const resolutionEvent of options.resolutionEvents) { + removeListener(resolutionEvent, resolveHandler); + } + + while (nextQueue.length > 0) { + const {resolve} = nextQueue.shift(); + resolve({done: true, value: undefined}); + } + }; + + const rejectHandler = (...args) => { + error = options.multiArgs ? args : args[0]; + + if (nextQueue.length > 0) { + const {reject} = nextQueue.shift(); + reject(error); + } else { + hasPendingError = true; + } + + cancel(); + }; + + const resolveHandler = (...args) => { + const value = options.multiArgs ? args : args[0]; + + if (options.filter && !options.filter(value)) { + return; + } + + if (nextQueue.length > 0) { + const {resolve} = nextQueue.shift(); + resolve({done: true, value}); + } else { + valueQueue.push(value); + } + + cancel(); + }; + + for (const event of events) { + addListener(event, valueHandler); + } + + for (const rejectionEvent of options.rejectionEvents) { + addListener(rejectionEvent, rejectHandler); + } + + for (const resolutionEvent of options.resolutionEvents) { + addListener(resolutionEvent, resolveHandler); + } + + return { + [symbolAsyncIterator]() { + return this; + }, + next() { + if (valueQueue.length > 0) { + const value = valueQueue.shift(); + return Promise.resolve({done: done && valueQueue.length === 0 && !isLimitReached, value}); + } + + if (hasPendingError) { + hasPendingError = false; + return Promise.reject(error); + } + + if (done) { + return Promise.resolve({done: true, value: undefined}); + } + + return new Promise((resolve, reject) => nextQueue.push({resolve, reject})); + }, + return(value) { + cancel(); + return Promise.resolve({done, value}); + } + }; +}; |