diff options
author | Minteck <freeziv.ytb@gmail.com> | 2021-03-07 18:29:17 +0100 |
---|---|---|
committer | Minteck <freeziv.ytb@gmail.com> | 2021-03-07 18:29:17 +0100 |
commit | 0f79e708bf07721b73ea41e5d341be08e8ea4dce (patch) | |
tree | f3c63cd6a9f4ef0b26f95eec6a031600232e80c8 /node_modules/peek-readable/lib/index.js | |
download | electrode-0f79e708bf07721b73ea41e5d341be08e8ea4dce.tar.gz electrode-0f79e708bf07721b73ea41e5d341be08e8ea4dce.tar.bz2 electrode-0f79e708bf07721b73ea41e5d341be08e8ea4dce.zip |
Initial commit
Diffstat (limited to 'node_modules/peek-readable/lib/index.js')
-rw-r--r-- | node_modules/peek-readable/lib/index.js | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/node_modules/peek-readable/lib/index.js b/node_modules/peek-readable/lib/index.js new file mode 100644 index 0000000..00dced4 --- /dev/null +++ b/node_modules/peek-readable/lib/index.js @@ -0,0 +1,137 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.StreamReader = exports.EndOfStreamError = void 0; +const EndOfFileStream_1 = require("./EndOfFileStream"); +var EndOfFileStream_2 = require("./EndOfFileStream"); +Object.defineProperty(exports, "EndOfStreamError", { enumerable: true, get: function () { return EndOfFileStream_2.EndOfStreamError; } }); +class Deferred { + constructor() { + this.promise = new Promise((resolve, reject) => { + this.reject = reject; + this.resolve = resolve; + }); + } +} +const maxStreamReadSize = 1 * 1024 * 1024; // Maximum request length on read-stream operation +class StreamReader { + constructor(s) { + this.s = s; + this.endOfStream = false; + /** + * Store peeked data + * @type {Array} + */ + this.peekQueue = []; + if (!s.read || !s.once) { + throw new Error('Expected an instance of stream.Readable'); + } + this.s.once('end', () => this.reject(new EndOfFileStream_1.EndOfStreamError())); + this.s.once('error', err => this.reject(err)); + this.s.once('close', () => this.reject(new Error('Stream closed'))); + } + /** + * Read ahead (peek) from stream. Subsequent read or peeks will return the same data + * @param buffer - Buffer to store data read from stream in + * @param offset - Offset buffer + * @param length - Number of bytes to read + * @returns Number of bytes peeked + */ + async peek(buffer, offset, length) { + const bytesRead = await this.read(buffer, offset, length); + this.peekQueue.push(buffer.slice(offset, offset + bytesRead)); // Put read data back to peek buffer + return bytesRead; + } + /** + * Read chunk from stream + * @param buffer - Target buffer to store data read from stream in + * @param offset - Offset of target buffer + * @param length - Number of bytes to read + * @returns Number of bytes read + */ + async read(buffer, offset, length) { + if (length === 0) { + return 0; + } + if (this.peekQueue.length === 0 && this.endOfStream) { + throw new EndOfFileStream_1.EndOfStreamError(); + } + let remaining = length; + let bytesRead = 0; + // consume peeked data first + while (this.peekQueue.length > 0 && remaining > 0) { + const peekData = this.peekQueue.pop(); // Front of queue + const lenCopy = Math.min(peekData.length, remaining); + peekData.copy(buffer, offset + bytesRead, 0, lenCopy); + bytesRead += lenCopy; + remaining -= lenCopy; + if (lenCopy < peekData.length) { + // remainder back to queue + this.peekQueue.push(peekData.slice(lenCopy)); + } + } + // continue reading from stream if required + while (remaining > 0 && !this.endOfStream) { + const reqLen = Math.min(remaining, maxStreamReadSize); + const chunkLen = await this._read(buffer, offset + bytesRead, reqLen); + bytesRead += chunkLen; + if (chunkLen < reqLen) + break; + remaining -= chunkLen; + } + return bytesRead; + } + /** + * Read chunk from stream + * @param buffer Buffer to store data read from stream in + * @param offset Offset buffer + * @param length Number of bytes to read + * @returns Number of bytes read + */ + async _read(buffer, offset, length) { + if (this.request) + throw new Error('Concurrent read operation?'); + const readBuffer = this.s.read(length); + if (readBuffer) { + readBuffer.copy(buffer, offset); + return readBuffer.length; + } + else { + this.request = { + buffer, + offset, + length, + deferred: new Deferred() + }; + this.s.once('readable', () => { + this.tryRead(); + }); + return this.request.deferred.promise.then(n => { + this.request = null; + return n; + }, err => { + this.request = null; + throw err; + }); + } + } + tryRead() { + const readBuffer = this.s.read(this.request.length); + if (readBuffer) { + readBuffer.copy(this.request.buffer, this.request.offset); + this.request.deferred.resolve(readBuffer.length); + } + else { + this.s.once('readable', () => { + this.tryRead(); + }); + } + } + reject(err) { + this.endOfStream = true; + if (this.request) { + this.request.deferred.reject(err); + this.request = null; + } + } +} +exports.StreamReader = StreamReader; |