summaryrefslogtreecommitdiff
path: root/includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts
diff options
context:
space:
mode:
authorRaindropsSys <contact@minteck.org>2023-04-24 14:03:36 +0200
committerRaindropsSys <contact@minteck.org>2023-04-24 14:03:36 +0200
commit633c92eae865e957121e08de634aeee11a8b3992 (patch)
tree09d881bee1dae0b6eee49db1dfaf0f500240606c /includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts
parentc4657e4509733699c0f26a3c900bab47e915d5a0 (diff)
downloadpluralconnect-633c92eae865e957121e08de634aeee11a8b3992.tar.gz
pluralconnect-633c92eae865e957121e08de634aeee11a8b3992.tar.bz2
pluralconnect-633c92eae865e957121e08de634aeee11a8b3992.zip
Updated 18 files, added 1692 files and deleted includes/system/compare.inc (automated)
Diffstat (limited to 'includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts')
-rw-r--r--includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts148
1 files changed, 148 insertions, 0 deletions
diff --git a/includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts b/includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts
new file mode 100644
index 0000000..59eada4
--- /dev/null
+++ b/includes/external/matrix/node_modules/matrix-js-sdk/src/ToDeviceMessageQueue.ts
@@ -0,0 +1,148 @@
+/*
+Copyright 2022 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+import { ToDeviceMessageId } from "./@types/event";
+import { logger } from "./logger";
+import { MatrixClient, ClientEvent } from "./client";
+import { MatrixError } from "./http-api";
+import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage";
+import { MatrixScheduler } from "./scheduler";
+import { SyncState } from "./sync";
+import { MapWithDefault } from "./utils";
+
+const MAX_BATCH_SIZE = 20;
+
+/**
+ * Maintains a queue of outgoing to-device messages, sending them
+ * as soon as the homeserver is reachable.
+ */
+export class ToDeviceMessageQueue {
+ private sending = false;
+ private running = true;
+ private retryTimeout: ReturnType<typeof setTimeout> | null = null;
+ private retryAttempts = 0;
+
+ public constructor(private client: MatrixClient) {}
+
+ public start(): void {
+ this.running = true;
+ this.sendQueue();
+ this.client.on(ClientEvent.Sync, this.onResumedSync);
+ }
+
+ public stop(): void {
+ this.running = false;
+ if (this.retryTimeout !== null) clearTimeout(this.retryTimeout);
+ this.retryTimeout = null;
+ this.client.removeListener(ClientEvent.Sync, this.onResumedSync);
+ }
+
+ public async queueBatch(batch: ToDeviceBatch): Promise<void> {
+ const batches: ToDeviceBatchWithTxnId[] = [];
+ for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) {
+ const batchWithTxnId = {
+ eventType: batch.eventType,
+ batch: batch.batch.slice(i, i + MAX_BATCH_SIZE),
+ txnId: this.client.makeTxnId(),
+ };
+ batches.push(batchWithTxnId);
+ const msgmap = batchWithTxnId.batch.map(
+ (msg) => `${msg.userId}/${msg.deviceId} (msgid ${msg.payload[ToDeviceMessageId]})`,
+ );
+ logger.info(
+ `Enqueuing batch of to-device messages. type=${batch.eventType} txnid=${batchWithTxnId.txnId}`,
+ msgmap,
+ );
+ }
+
+ await this.client.store.saveToDeviceBatches(batches);
+ this.sendQueue();
+ }
+
+ public sendQueue = async (): Promise<void> => {
+ if (this.retryTimeout !== null) clearTimeout(this.retryTimeout);
+ this.retryTimeout = null;
+
+ if (this.sending || !this.running) return;
+
+ logger.debug("Attempting to send queued to-device messages");
+
+ this.sending = true;
+ let headBatch: IndexedToDeviceBatch | null;
+ try {
+ while (this.running) {
+ headBatch = await this.client.store.getOldestToDeviceBatch();
+ if (headBatch === null) break;
+ await this.sendBatch(headBatch);
+ await this.client.store.removeToDeviceBatch(headBatch.id);
+ this.retryAttempts = 0;
+ }
+
+ // Make sure we're still running after the async tasks: if not, stop.
+ if (!this.running) return;
+
+ logger.debug("All queued to-device messages sent");
+ } catch (e) {
+ ++this.retryAttempts;
+ // eslint-disable-next-line @typescript-eslint/naming-convention
+ // eslint-disable-next-line new-cap
+ const retryDelay = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(null, this.retryAttempts, <MatrixError>e);
+ if (retryDelay === -1) {
+ // the scheduler function doesn't differentiate between fatal errors and just getting
+ // bored and giving up for now
+ if (Math.floor((<MatrixError>e).httpStatus! / 100) === 4) {
+ logger.error("Fatal error when sending to-device message - dropping to-device batch!", e);
+ await this.client.store.removeToDeviceBatch(headBatch!.id);
+ } else {
+ logger.info("Automatic retry limit reached for to-device messages.");
+ }
+ return;
+ }
+
+ logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e);
+ this.retryTimeout = setTimeout(this.sendQueue, retryDelay);
+ } finally {
+ this.sending = false;
+ }
+ };
+
+ /**
+ * Attempts to send a batch of to-device messages.
+ */
+ private async sendBatch(batch: IndexedToDeviceBatch): Promise<void> {
+ const contentMap: MapWithDefault<string, Map<string, ToDevicePayload>> = new MapWithDefault(() => new Map());
+ for (const item of batch.batch) {
+ contentMap.getOrCreate(item.userId).set(item.deviceId, item.payload);
+ }
+
+ logger.info(
+ `Sending batch of ${batch.batch.length} to-device messages with ID ${batch.id} and txnId ${batch.txnId}`,
+ );
+
+ await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId);
+ }
+
+ /**
+ * Listen to sync state changes and automatically resend any pending events
+ * once syncing is resumed
+ */
+ private onResumedSync = (state: SyncState | null, oldState: SyncState | null): void => {
+ if (state === SyncState.Syncing && oldState !== SyncState.Syncing) {
+ logger.info(`Resuming queue after resumed sync`);
+ this.sendQueue();
+ }
+ };
+}