aboutsummaryrefslogtreecommitdiff
path: root/node_modules/peek-readable/lib/index.js
diff options
context:
space:
mode:
authorMinteck <freeziv.ytb@gmail.com>2021-03-07 18:29:17 +0100
committerMinteck <freeziv.ytb@gmail.com>2021-03-07 18:29:17 +0100
commit0f79e708bf07721b73ea41e5d341be08e8ea4dce (patch)
treef3c63cd6a9f4ef0b26f95eec6a031600232e80c8 /node_modules/peek-readable/lib/index.js
downloadelectrode-0f79e708bf07721b73ea41e5d341be08e8ea4dce.tar.gz
electrode-0f79e708bf07721b73ea41e5d341be08e8ea4dce.tar.bz2
electrode-0f79e708bf07721b73ea41e5d341be08e8ea4dce.zip
Initial commit
Diffstat (limited to 'node_modules/peek-readable/lib/index.js')
-rw-r--r--node_modules/peek-readable/lib/index.js137
1 files changed, 137 insertions, 0 deletions
diff --git a/node_modules/peek-readable/lib/index.js b/node_modules/peek-readable/lib/index.js
new file mode 100644
index 0000000..00dced4
--- /dev/null
+++ b/node_modules/peek-readable/lib/index.js
@@ -0,0 +1,137 @@
+"use strict";
+Object.defineProperty(exports, "__esModule", { value: true });
+exports.StreamReader = exports.EndOfStreamError = void 0;
+const EndOfFileStream_1 = require("./EndOfFileStream");
+var EndOfFileStream_2 = require("./EndOfFileStream");
+Object.defineProperty(exports, "EndOfStreamError", { enumerable: true, get: function () { return EndOfFileStream_2.EndOfStreamError; } });
+class Deferred {
+ constructor() {
+ this.promise = new Promise((resolve, reject) => {
+ this.reject = reject;
+ this.resolve = resolve;
+ });
+ }
+}
+const maxStreamReadSize = 1 * 1024 * 1024; // Maximum request length on read-stream operation
+class StreamReader {
+ constructor(s) {
+ this.s = s;
+ this.endOfStream = false;
+ /**
+ * Store peeked data
+ * @type {Array}
+ */
+ this.peekQueue = [];
+ if (!s.read || !s.once) {
+ throw new Error('Expected an instance of stream.Readable');
+ }
+ this.s.once('end', () => this.reject(new EndOfFileStream_1.EndOfStreamError()));
+ this.s.once('error', err => this.reject(err));
+ this.s.once('close', () => this.reject(new Error('Stream closed')));
+ }
+ /**
+ * Read ahead (peek) from stream. Subsequent read or peeks will return the same data
+ * @param buffer - Buffer to store data read from stream in
+ * @param offset - Offset buffer
+ * @param length - Number of bytes to read
+ * @returns Number of bytes peeked
+ */
+ async peek(buffer, offset, length) {
+ const bytesRead = await this.read(buffer, offset, length);
+ this.peekQueue.push(buffer.slice(offset, offset + bytesRead)); // Put read data back to peek buffer
+ return bytesRead;
+ }
+ /**
+ * Read chunk from stream
+ * @param buffer - Target buffer to store data read from stream in
+ * @param offset - Offset of target buffer
+ * @param length - Number of bytes to read
+ * @returns Number of bytes read
+ */
+ async read(buffer, offset, length) {
+ if (length === 0) {
+ return 0;
+ }
+ if (this.peekQueue.length === 0 && this.endOfStream) {
+ throw new EndOfFileStream_1.EndOfStreamError();
+ }
+ let remaining = length;
+ let bytesRead = 0;
+ // consume peeked data first
+ while (this.peekQueue.length > 0 && remaining > 0) {
+ const peekData = this.peekQueue.pop(); // Front of queue
+ const lenCopy = Math.min(peekData.length, remaining);
+ peekData.copy(buffer, offset + bytesRead, 0, lenCopy);
+ bytesRead += lenCopy;
+ remaining -= lenCopy;
+ if (lenCopy < peekData.length) {
+ // remainder back to queue
+ this.peekQueue.push(peekData.slice(lenCopy));
+ }
+ }
+ // continue reading from stream if required
+ while (remaining > 0 && !this.endOfStream) {
+ const reqLen = Math.min(remaining, maxStreamReadSize);
+ const chunkLen = await this._read(buffer, offset + bytesRead, reqLen);
+ bytesRead += chunkLen;
+ if (chunkLen < reqLen)
+ break;
+ remaining -= chunkLen;
+ }
+ return bytesRead;
+ }
+ /**
+ * Read chunk from stream
+ * @param buffer Buffer to store data read from stream in
+ * @param offset Offset buffer
+ * @param length Number of bytes to read
+ * @returns Number of bytes read
+ */
+ async _read(buffer, offset, length) {
+ if (this.request)
+ throw new Error('Concurrent read operation?');
+ const readBuffer = this.s.read(length);
+ if (readBuffer) {
+ readBuffer.copy(buffer, offset);
+ return readBuffer.length;
+ }
+ else {
+ this.request = {
+ buffer,
+ offset,
+ length,
+ deferred: new Deferred()
+ };
+ this.s.once('readable', () => {
+ this.tryRead();
+ });
+ return this.request.deferred.promise.then(n => {
+ this.request = null;
+ return n;
+ }, err => {
+ this.request = null;
+ throw err;
+ });
+ }
+ }
+ tryRead() {
+ const readBuffer = this.s.read(this.request.length);
+ if (readBuffer) {
+ readBuffer.copy(this.request.buffer, this.request.offset);
+ this.request.deferred.resolve(readBuffer.length);
+ }
+ else {
+ this.s.once('readable', () => {
+ this.tryRead();
+ });
+ }
+ }
+ reject(err) {
+ this.endOfStream = true;
+ if (this.request) {
+ this.request.deferred.reject(err);
+ this.request = null;
+ }
+ }
+}
+exports.StreamReader = StreamReader;