diff options
author | RaindropsSys <raindrops@equestria.dev> | 2023-11-17 23:25:29 +0100 |
---|---|---|
committer | RaindropsSys <raindrops@equestria.dev> | 2023-11-17 23:25:29 +0100 |
commit | 953ddd82e48dd206cef5ac94456549aed13b3ad5 (patch) | |
tree | 8f003106ee2e7f422e5a22d2ee04d0db302e66c0 /includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts | |
parent | 62a9199846b0c07c03218703b33e8385764f42d9 (diff) | |
download | pluralconnect-953ddd82e48dd206cef5ac94456549aed13b3ad5.tar.gz pluralconnect-953ddd82e48dd206cef5ac94456549aed13b3ad5.tar.bz2 pluralconnect-953ddd82e48dd206cef5ac94456549aed13b3ad5.zip |
Updated 30 files and deleted 2976 files (automated)
Diffstat (limited to 'includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts')
-rw-r--r-- | includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts | 148 |
1 files changed, 0 insertions, 148 deletions
diff --git a/includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts b/includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts deleted file mode 100644 index 59eada4..0000000 --- a/includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts +++ /dev/null @@ -1,148 +0,0 @@ -/* -Copyright 2022 The Matrix.org Foundation C.I.C. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -import { ToDeviceMessageId } from "./@types/event"; -import { logger } from "./logger"; -import { MatrixClient, ClientEvent } from "./client"; -import { MatrixError } from "./http-api"; -import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage"; -import { MatrixScheduler } from "./scheduler"; -import { SyncState } from "./sync"; -import { MapWithDefault } from "./utils"; - -const MAX_BATCH_SIZE = 20; - -/** - * Maintains a queue of outgoing to-device messages, sending them - * as soon as the homeserver is reachable. - */ -export class ToDeviceMessageQueue { - private sending = false; - private running = true; - private retryTimeout: ReturnType<typeof setTimeout> | null = null; - private retryAttempts = 0; - - public constructor(private client: MatrixClient) {} - - public start(): void { - this.running = true; - this.sendQueue(); - this.client.on(ClientEvent.Sync, this.onResumedSync); - } - - public stop(): void { - this.running = false; - if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); - this.retryTimeout = null; - this.client.removeListener(ClientEvent.Sync, this.onResumedSync); - } - - public async queueBatch(batch: ToDeviceBatch): Promise<void> { - const batches: ToDeviceBatchWithTxnId[] = []; - for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) { - const batchWithTxnId = { - eventType: batch.eventType, - batch: batch.batch.slice(i, i + MAX_BATCH_SIZE), - txnId: this.client.makeTxnId(), - }; - batches.push(batchWithTxnId); - const msgmap = batchWithTxnId.batch.map( - (msg) => `${msg.userId}/${msg.deviceId} (msgid ${msg.payload[ToDeviceMessageId]})`, - ); - logger.info( - `Enqueuing batch of to-device messages. type=${batch.eventType} txnid=${batchWithTxnId.txnId}`, - msgmap, - ); - } - - await this.client.store.saveToDeviceBatches(batches); - this.sendQueue(); - } - - public sendQueue = async (): Promise<void> => { - if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); - this.retryTimeout = null; - - if (this.sending || !this.running) return; - - logger.debug("Attempting to send queued to-device messages"); - - this.sending = true; - let headBatch: IndexedToDeviceBatch | null; - try { - while (this.running) { - headBatch = await this.client.store.getOldestToDeviceBatch(); - if (headBatch === null) break; - await this.sendBatch(headBatch); - await this.client.store.removeToDeviceBatch(headBatch.id); - this.retryAttempts = 0; - } - - // Make sure we're still running after the async tasks: if not, stop. - if (!this.running) return; - - logger.debug("All queued to-device messages sent"); - } catch (e) { - ++this.retryAttempts; - // eslint-disable-next-line @typescript-eslint/naming-convention - // eslint-disable-next-line new-cap - const retryDelay = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(null, this.retryAttempts, <MatrixError>e); - if (retryDelay === -1) { - // the scheduler function doesn't differentiate between fatal errors and just getting - // bored and giving up for now - if (Math.floor((<MatrixError>e).httpStatus! / 100) === 4) { - logger.error("Fatal error when sending to-device message - dropping to-device batch!", e); - await this.client.store.removeToDeviceBatch(headBatch!.id); - } else { - logger.info("Automatic retry limit reached for to-device messages."); - } - return; - } - - logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e); - this.retryTimeout = setTimeout(this.sendQueue, retryDelay); - } finally { - this.sending = false; - } - }; - - /** - * Attempts to send a batch of to-device messages. - */ - private async sendBatch(batch: IndexedToDeviceBatch): Promise<void> { - const contentMap: MapWithDefault<string, Map<string, ToDevicePayload>> = new MapWithDefault(() => new Map()); - for (const item of batch.batch) { - contentMap.getOrCreate(item.userId).set(item.deviceId, item.payload); - } - - logger.info( - `Sending batch of ${batch.batch.length} to-device messages with ID ${batch.id} and txnId ${batch.txnId}`, - ); - - await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId); - } - - /** - * Listen to sync state changes and automatically resend any pending events - * once syncing is resumed - */ - private onResumedSync = (state: SyncState | null, oldState: SyncState | null): void => { - if (state === SyncState.Syncing && oldState !== SyncState.Syncing) { - logger.info(`Resuming queue after resumed sync`); - this.sendQueue(); - } - }; -} |