From 953ddd82e48dd206cef5ac94456549aed13b3ad5 Mon Sep 17 00:00:00 2001 From: RaindropsSys Date: Fri, 17 Nov 2023 23:25:29 +0100 Subject: Updated 30 files and deleted 2976 files (automated) --- .../node_modules/matrix-js-sdk/src/sliding-sync.ts | 961 --------------------- 1 file changed, 961 deletions(-) delete mode 100644 includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts (limited to 'includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts') diff --git a/includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts b/includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts deleted file mode 100644 index dde5f1b..0000000 --- a/includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts +++ /dev/null @@ -1,961 +0,0 @@ -/* -Copyright 2022 The Matrix.org Foundation C.I.C. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -import { logger } from "./logger"; -import { MatrixClient } from "./client"; -import { IRoomEvent, IStateEvent } from "./sync-accumulator"; -import { TypedEventEmitter } from "./models/typed-event-emitter"; -import { sleep, IDeferred, defer } from "./utils"; -import { HTTPError } from "./http-api"; - -// /sync requests allow you to set a timeout= but the request may continue -// beyond that and wedge forever, so we need to track how long we are willing -// to keep open the connection. This constant is *ADDED* to the timeout= value -// to determine the max time we're willing to wait. -const BUFFER_PERIOD_MS = 10 * 1000; - -export const MSC3575_WILDCARD = "*"; -export const MSC3575_STATE_KEY_ME = "$ME"; -export const MSC3575_STATE_KEY_LAZY = "$LAZY"; - -/** - * Represents a subscription to a room or set of rooms. Controls which events are returned. - */ -export interface MSC3575RoomSubscription { - required_state?: string[][]; - timeline_limit?: number; - include_old_rooms?: MSC3575RoomSubscription; -} - -/** - * Controls which rooms are returned in a given list. - */ -export interface MSC3575Filter { - is_dm?: boolean; - is_encrypted?: boolean; - is_invite?: boolean; - room_name_like?: string; - room_types?: string[]; - not_room_types?: string[]; - spaces?: string[]; - tags?: string[]; - not_tags?: string[]; -} - -/** - * Represents a list subscription. - */ -export interface MSC3575List extends MSC3575RoomSubscription { - ranges: number[][]; - sort?: string[]; - filters?: MSC3575Filter; - slow_get_all_rooms?: boolean; -} - -/** - * A complete Sliding Sync request. - */ -export interface MSC3575SlidingSyncRequest { - // json body params - lists?: Record; - unsubscribe_rooms?: string[]; - room_subscriptions?: Record; - extensions?: object; - txn_id?: string; - - // query params - pos?: string; - timeout?: number; - clientTimeout?: number; -} - -export interface MSC3575RoomData { - name: string; - required_state: IStateEvent[]; - timeline: (IRoomEvent | IStateEvent)[]; - notification_count?: number; - highlight_count?: number; - joined_count?: number; - invited_count?: number; - invite_state?: IStateEvent[]; - initial?: boolean; - limited?: boolean; - is_dm?: boolean; - prev_batch?: string; - num_live?: number; -} - -interface ListResponse { - count: number; - ops: Operation[]; -} - -interface BaseOperation { - op: string; -} - -interface DeleteOperation extends BaseOperation { - op: "DELETE"; - index: number; -} - -interface InsertOperation extends BaseOperation { - op: "INSERT"; - index: number; - room_id: string; -} - -interface InvalidateOperation extends BaseOperation { - op: "INVALIDATE"; - range: [number, number]; -} - -interface SyncOperation extends BaseOperation { - op: "SYNC"; - range: [number, number]; - room_ids: string[]; -} - -type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncOperation; - -/** - * A complete Sliding Sync response - */ -export interface MSC3575SlidingSyncResponse { - pos: string; - txn_id?: string; - lists: Record; - rooms: Record; - extensions: Record; -} - -export enum SlidingSyncState { - /** - * Fired by SlidingSyncEvent.Lifecycle event immediately before processing the response. - */ - RequestFinished = "FINISHED", - /** - * Fired by SlidingSyncEvent.Lifecycle event immediately after all room data listeners have been - * invoked, but before list listeners. - */ - Complete = "COMPLETE", -} - -/** - * Internal Class. SlidingList represents a single list in sliding sync. The list can have filters, - * multiple sliding windows, and maintains the index-\>room_id mapping. - */ -class SlidingList { - private list!: MSC3575List; - private isModified?: boolean; - - // returned data - public roomIndexToRoomId: Record = {}; - public joinedCount = 0; - - /** - * Construct a new sliding list. - * @param list - The range, sort and filter values to use for this list. - */ - public constructor(list: MSC3575List) { - this.replaceList(list); - } - - /** - * Mark this list as modified or not. Modified lists will return sticky params with calls to getList. - * This is useful for the first time the list is sent, or if the list has changed in some way. - * @param modified - True to mark this list as modified so all sticky parameters will be re-sent. - */ - public setModified(modified: boolean): void { - this.isModified = modified; - } - - /** - * Update the list range for this list. Does not affect modified status as list ranges are non-sticky. - * @param newRanges - The new ranges for the list - */ - public updateListRange(newRanges: number[][]): void { - this.list.ranges = JSON.parse(JSON.stringify(newRanges)); - } - - /** - * Replace list parameters. All fields will be replaced with the new list parameters. - * @param list - The new list parameters - */ - public replaceList(list: MSC3575List): void { - list.filters = list.filters || {}; - list.ranges = list.ranges || []; - this.list = JSON.parse(JSON.stringify(list)); - this.isModified = true; - - // reset values as the join count may be very different (if filters changed) including the rooms - // (e.g. sort orders or sliding window ranges changed) - - // the constantly changing sliding window ranges. Not an array for performance reasons - // E.g. tracking ranges 0-99, 500-599, we don't want to have a 600 element array - this.roomIndexToRoomId = {}; - // the total number of joined rooms according to the server, always >= len(roomIndexToRoomId) - this.joinedCount = 0; - } - - /** - * Return a copy of the list suitable for a request body. - * @param forceIncludeAllParams - True to forcibly include all params even if the list - * hasn't been modified. Callers may want to do this if they are modifying the list prior to calling - * updateList. - */ - public getList(forceIncludeAllParams: boolean): MSC3575List { - let list = { - ranges: JSON.parse(JSON.stringify(this.list.ranges)), - }; - if (this.isModified || forceIncludeAllParams) { - list = JSON.parse(JSON.stringify(this.list)); - } - return list; - } - - /** - * Check if a given index is within the list range. This is required even though the /sync API - * provides explicit updates with index positions because of the following situation: - * 0 1 2 3 4 5 6 7 8 indexes - * a b c d e f COMMANDS: SYNC 0 2 a b c; SYNC 6 8 d e f; - * a b c d _ f COMMAND: DELETE 7; - * e a b c d f COMMAND: INSERT 0 e; - * c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it - * @param i - The index to check - * @returns True if the index is within a sliding window - */ - public isIndexInRange(i: number): boolean { - for (const r of this.list.ranges) { - if (r[0] <= i && i <= r[1]) { - return true; - } - } - return false; - } -} - -/** - * When onResponse extensions should be invoked: before or after processing the main response. - */ -export enum ExtensionState { - // Call onResponse before processing the response body. This is useful when your extension is - // preparing the ground for the response body e.g. processing to-device messages before the - // encrypted event arrives. - PreProcess = "ExtState.PreProcess", - // Call onResponse after processing the response body. This is useful when your extension is - // decorating data from the client, and you rely on MatrixClient.getRoom returning the Room object - // e.g. room account data. - PostProcess = "ExtState.PostProcess", -} - -/** - * An interface that must be satisfied to register extensions - */ -export interface Extension { - /** - * The extension name to go under 'extensions' in the request body. - * @returns The JSON key. - */ - name(): string; - /** - * A function which is called when the request JSON is being formed. - * Returns the data to insert under this key. - * @param isInitial - True when this is part of the initial request (send sticky params) - * @returns The request JSON to send. - */ - onRequest(isInitial: boolean): Req | undefined; - /** - * A function which is called when there is response JSON under this extension. - * @param data - The response JSON under the extension name. - */ - onResponse(data: Res): void; - /** - * Controls when onResponse should be called. - * @returns The state when it should be called. - */ - when(): ExtensionState; -} - -/** - * Events which can be fired by the SlidingSync class. These are designed to provide different levels - * of information when processing sync responses. - * - RoomData: concerns rooms, useful for SlidingSyncSdk to update its knowledge of rooms. - * - Lifecycle: concerns callbacks at various well-defined points in the sync process. - * - List: concerns lists, useful for UI layers to re-render room lists. - * Specifically, the order of event invocation is: - * - Lifecycle (state=RequestFinished) - * - RoomData (N times) - * - Lifecycle (state=Complete) - * - List (at most once per list) - */ -export enum SlidingSyncEvent { - /** - * This event fires when there are updates for a room. Fired as and when rooms are encountered - * in the response. - */ - RoomData = "SlidingSync.RoomData", - /** - * This event fires at various points in the /sync loop lifecycle. - * - SlidingSyncState.RequestFinished: Fires after we receive a valid response but before the - * response has been processed. Perform any pre-process steps here. If there was a problem syncing, - * `err` will be set (e.g network errors). - * - SlidingSyncState.Complete: Fires after all SlidingSyncEvent.RoomData have been fired but before - * SlidingSyncEvent.List. - */ - Lifecycle = "SlidingSync.Lifecycle", - /** - * This event fires whenever there has been a change to this list index. It fires exactly once - * per list, even if there were multiple operations for the list. - * It fires AFTER Lifecycle and RoomData events. - */ - List = "SlidingSync.List", -} - -export type SlidingSyncEventHandlerMap = { - [SlidingSyncEvent.RoomData]: (roomId: string, roomData: MSC3575RoomData) => void; - [SlidingSyncEvent.Lifecycle]: ( - state: SlidingSyncState, - resp: MSC3575SlidingSyncResponse | null, - err?: Error, - ) => void; - [SlidingSyncEvent.List]: (listKey: string, joinedCount: number, roomIndexToRoomId: Record) => void; -}; - -/** - * SlidingSync is a high-level data structure which controls the majority of sliding sync. - * It has no hooks into JS SDK except for needing a MatrixClient to perform the HTTP request. - * This means this class (and everything it uses) can be used in isolation from JS SDK if needed. - * To hook this up with the JS SDK, you need to use SlidingSyncSdk. - */ -export class SlidingSync extends TypedEventEmitter { - private lists: Map; - private listModifiedCount = 0; - private terminated = false; - // flag set when resend() is called because we cannot rely on detecting AbortError in JS SDK :( - private needsResend = false; - // the txn_id to send with the next request. - private txnId: string | null = null; - // a list (in chronological order of when they were sent) of objects containing the txn ID and - // a defer to resolve/reject depending on whether they were successfully sent or not. - private txnIdDefers: (IDeferred & { txnId: string })[] = []; - // map of extension name to req/resp handler - private extensions: Record> = {}; - - private desiredRoomSubscriptions = new Set(); // the *desired* room subscriptions - private confirmedRoomSubscriptions = new Set(); - - // map of custom subscription name to the subscription - private customSubscriptions: Map = new Map(); - // map of room ID to custom subscription name - private roomIdToCustomSubscription: Map = new Map(); - - private pendingReq?: Promise; - private abortController?: AbortController; - - /** - * Create a new sliding sync instance - * @param proxyBaseUrl - The base URL of the sliding sync proxy - * @param lists - The lists to use for sliding sync. - * @param roomSubscriptionInfo - The params to use for room subscriptions. - * @param client - The client to use for /sync calls. - * @param timeoutMS - The number of milliseconds to wait for a response. - */ - public constructor( - private readonly proxyBaseUrl: string, - lists: Map, - private roomSubscriptionInfo: MSC3575RoomSubscription, - private readonly client: MatrixClient, - private readonly timeoutMS: number, - ) { - super(); - this.lists = new Map(); - lists.forEach((list, key) => { - this.lists.set(key, new SlidingList(list)); - }); - } - - /** - * Add a custom room subscription, referred to by an arbitrary name. If a subscription with this - * name already exists, it is replaced. No requests are sent by calling this method. - * @param name - The name of the subscription. Only used to reference this subscription in - * useCustomSubscription. - * @param sub - The subscription information. - */ - public addCustomSubscription(name: string, sub: MSC3575RoomSubscription): void { - if (this.customSubscriptions.has(name)) { - logger.warn(`addCustomSubscription: ${name} already exists as a custom subscription, ignoring.`); - return; - } - this.customSubscriptions.set(name, sub); - } - - /** - * Use a custom subscription previously added via addCustomSubscription. No requests are sent - * by calling this method. Use modifyRoomSubscriptions to resend subscription information. - * @param roomId - The room to use the subscription in. - * @param name - The name of the subscription. If this name is unknown, the default subscription - * will be used. - */ - public useCustomSubscription(roomId: string, name: string): void { - // We already know about this custom subscription, as it is immutable, - // we don't need to unconfirm the subscription. - if (this.roomIdToCustomSubscription.get(roomId) === name) { - return; - } - this.roomIdToCustomSubscription.set(roomId, name); - // unconfirm this subscription so a resend() will send it up afresh. - this.confirmedRoomSubscriptions.delete(roomId); - } - - /** - * Get the room index data for a list. - * @param key - The list key - * @returns The list data which contains the rooms in this list - */ - public getListData(key: string): { joinedCount: number; roomIndexToRoomId: Record } | null { - const data = this.lists.get(key); - if (!data) { - return null; - } - return { - joinedCount: data.joinedCount, - roomIndexToRoomId: Object.assign({}, data.roomIndexToRoomId), - }; - } - - /** - * Get the full request list parameters for a list index. This function is provided for callers to use - * in conjunction with setList to update fields on an existing list. - * @param key - The list key to get the params for. - * @returns A copy of the list params or undefined. - */ - public getListParams(key: string): MSC3575List | null { - const params = this.lists.get(key); - if (!params) { - return null; - } - return params.getList(true); - } - - /** - * Set new ranges for an existing list. Calling this function when _only_ the ranges have changed - * is more efficient than calling setList(index,list) as this function won't resend sticky params, - * whereas setList always will. - * @param key - The list key to modify - * @param ranges - The new ranges to apply. - * @returns A promise which resolves to the transaction ID when it has been received down sync - * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled - * immediately after sending, in which case the action will be applied in the subsequent request) - */ - public setListRanges(key: string, ranges: number[][]): Promise { - const list = this.lists.get(key); - if (!list) { - return Promise.reject(new Error("no list with key " + key)); - } - list.updateListRange(ranges); - return this.resend(); - } - - /** - * Add or replace a list. Calling this function will interrupt the /sync request to resend new - * lists. - * @param key - The key to modify - * @param list - The new list parameters. - * @returns A promise which resolves to the transaction ID when it has been received down sync - * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled - * immediately after sending, in which case the action will be applied in the subsequent request) - */ - public setList(key: string, list: MSC3575List): Promise { - const existingList = this.lists.get(key); - if (existingList) { - existingList.replaceList(list); - this.lists.set(key, existingList); - } else { - this.lists.set(key, new SlidingList(list)); - } - this.listModifiedCount += 1; - return this.resend(); - } - - /** - * Get the room subscriptions for the sync API. - * @returns A copy of the desired room subscriptions. - */ - public getRoomSubscriptions(): Set { - return new Set(Array.from(this.desiredRoomSubscriptions)); - } - - /** - * Modify the room subscriptions for the sync API. Calling this function will interrupt the - * /sync request to resend new subscriptions. If the /sync stream has not started, this will - * prepare the room subscriptions for when start() is called. - * @param s - The new desired room subscriptions. - * @returns A promise which resolves to the transaction ID when it has been received down sync - * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled - * immediately after sending, in which case the action will be applied in the subsequent request) - */ - public modifyRoomSubscriptions(s: Set): Promise { - this.desiredRoomSubscriptions = s; - return this.resend(); - } - - /** - * Modify which events to retrieve for room subscriptions. Invalidates all room subscriptions - * such that they will be sent up afresh. - * @param rs - The new room subscription fields to fetch. - * @returns A promise which resolves to the transaction ID when it has been received down sync - * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled - * immediately after sending, in which case the action will be applied in the subsequent request) - */ - public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): Promise { - this.roomSubscriptionInfo = rs; - this.confirmedRoomSubscriptions = new Set(); - return this.resend(); - } - - /** - * Register an extension to send with the /sync request. - * @param ext - The extension to register. - */ - public registerExtension(ext: Extension): void { - if (this.extensions[ext.name()]) { - throw new Error(`registerExtension: ${ext.name()} already exists as an extension`); - } - this.extensions[ext.name()] = ext; - } - - private getExtensionRequest(isInitial: boolean): Record { - const ext: Record = {}; - Object.keys(this.extensions).forEach((extName) => { - ext[extName] = this.extensions[extName].onRequest(isInitial); - }); - return ext; - } - - private onPreExtensionsResponse(ext: Record): void { - Object.keys(ext).forEach((extName) => { - if (this.extensions[extName].when() == ExtensionState.PreProcess) { - this.extensions[extName].onResponse(ext[extName]); - } - }); - } - - private onPostExtensionsResponse(ext: Record): void { - Object.keys(ext).forEach((extName) => { - if (this.extensions[extName].when() == ExtensionState.PostProcess) { - this.extensions[extName].onResponse(ext[extName]); - } - }); - } - - /** - * Invoke all attached room data listeners. - * @param roomId - The room which received some data. - * @param roomData - The raw sliding sync response JSON. - */ - private invokeRoomDataListeners(roomId: string, roomData: MSC3575RoomData): void { - if (!roomData.required_state) { - roomData.required_state = []; - } - if (!roomData.timeline) { - roomData.timeline = []; - } - this.emit(SlidingSyncEvent.RoomData, roomId, roomData); - } - - /** - * Invoke all attached lifecycle listeners. - * @param state - The Lifecycle state - * @param resp - The raw sync response JSON - * @param err - Any error that occurred when making the request e.g. network errors. - */ - private invokeLifecycleListeners( - state: SlidingSyncState, - resp: MSC3575SlidingSyncResponse | null, - err?: Error, - ): void { - this.emit(SlidingSyncEvent.Lifecycle, state, resp, err); - } - - private shiftRight(listKey: string, hi: number, low: number): void { - const list = this.lists.get(listKey); - if (!list) { - return; - } - // l h - // 0,1,2,3,4 <- before - // 0,1,2,2,3 <- after, hi is deleted and low is duplicated - for (let i = hi; i > low; i--) { - if (list.isIndexInRange(i)) { - list.roomIndexToRoomId[i] = list.roomIndexToRoomId[i - 1]; - } - } - } - - private shiftLeft(listKey: string, hi: number, low: number): void { - const list = this.lists.get(listKey); - if (!list) { - return; - } - // l h - // 0,1,2,3,4 <- before - // 0,1,3,4,4 <- after, low is deleted and hi is duplicated - for (let i = low; i < hi; i++) { - if (list.isIndexInRange(i)) { - list.roomIndexToRoomId[i] = list.roomIndexToRoomId[i + 1]; - } - } - } - - private removeEntry(listKey: string, index: number): void { - const list = this.lists.get(listKey); - if (!list) { - return; - } - // work out the max index - let max = -1; - for (const n in list.roomIndexToRoomId) { - if (Number(n) > max) { - max = Number(n); - } - } - if (max < 0 || index > max) { - return; - } - // Everything higher than the gap needs to be shifted left. - this.shiftLeft(listKey, max, index); - delete list.roomIndexToRoomId[max]; - } - - private addEntry(listKey: string, index: number): void { - const list = this.lists.get(listKey); - if (!list) { - return; - } - // work out the max index - let max = -1; - for (const n in list.roomIndexToRoomId) { - if (Number(n) > max) { - max = Number(n); - } - } - if (max < 0 || index > max) { - return; - } - // Everything higher than the gap needs to be shifted right, +1 so we don't delete the highest element - this.shiftRight(listKey, max + 1, index); - } - - private processListOps(list: ListResponse, listKey: string): void { - let gapIndex = -1; - const listData = this.lists.get(listKey); - if (!listData) { - return; - } - list.ops.forEach((op: Operation) => { - if (!listData) { - return; - } - switch (op.op) { - case "DELETE": { - logger.debug("DELETE", listKey, op.index, ";"); - delete listData.roomIndexToRoomId[op.index]; - if (gapIndex !== -1) { - // we already have a DELETE operation to process, so process it. - this.removeEntry(listKey, gapIndex); - } - gapIndex = op.index; - break; - } - case "INSERT": { - logger.debug("INSERT", listKey, op.index, op.room_id, ";"); - if (listData.roomIndexToRoomId[op.index]) { - // something is in this space, shift items out of the way - if (gapIndex < 0) { - // we haven't been told where to shift from, so make way for a new room entry. - this.addEntry(listKey, op.index); - } else if (gapIndex > op.index) { - // the gap is further down the list, shift every element to the right - // starting at the gap so we can just shift each element in turn: - // [A,B,C,_] gapIndex=3, op.index=0 - // [A,B,C,C] i=3 - // [A,B,B,C] i=2 - // [A,A,B,C] i=1 - // Terminate. We'll assign into op.index next. - this.shiftRight(listKey, gapIndex, op.index); - } else if (gapIndex < op.index) { - // the gap is further up the list, shift every element to the left - // starting at the gap so we can just shift each element in turn - this.shiftLeft(listKey, op.index, gapIndex); - } - } - // forget the gap, we don't need it anymore. This is outside the check for - // a room being present in this index position because INSERTs always universally - // forget the gap, not conditionally based on the presence of a room in the INSERT - // position. Without this, DELETE 0; INSERT 0; would do the wrong thing. - gapIndex = -1; - listData.roomIndexToRoomId[op.index] = op.room_id; - break; - } - case "INVALIDATE": { - const startIndex = op.range[0]; - for (let i = startIndex; i <= op.range[1]; i++) { - delete listData.roomIndexToRoomId[i]; - } - logger.debug("INVALIDATE", listKey, op.range[0], op.range[1], ";"); - break; - } - case "SYNC": { - const startIndex = op.range[0]; - for (let i = startIndex; i <= op.range[1]; i++) { - const roomId = op.room_ids[i - startIndex]; - if (!roomId) { - break; // we are at the end of list - } - listData.roomIndexToRoomId[i] = roomId; - } - logger.debug("SYNC", listKey, op.range[0], op.range[1], (op.room_ids || []).join(" "), ";"); - break; - } - } - }); - if (gapIndex !== -1) { - // we already have a DELETE operation to process, so process it - // Everything higher than the gap needs to be shifted left. - this.removeEntry(listKey, gapIndex); - } - } - - /** - * Resend a Sliding Sync request. Used when something has changed in the request. Resolves with - * the transaction ID of this request on success. Rejects with the transaction ID of this request - * on failure. - */ - public resend(): Promise { - if (this.needsResend && this.txnIdDefers.length > 0) { - // we already have a resend queued, so just return the same promise - return this.txnIdDefers[this.txnIdDefers.length - 1].promise; - } - this.needsResend = true; - this.txnId = this.client.makeTxnId(); - const d = defer(); - this.txnIdDefers.push({ - ...d, - txnId: this.txnId, - }); - this.abortController?.abort(); - this.abortController = new AbortController(); - return d.promise; - } - - private resolveTransactionDefers(txnId?: string): void { - if (!txnId) { - return; - } - // find the matching index - let txnIndex = -1; - for (let i = 0; i < this.txnIdDefers.length; i++) { - if (this.txnIdDefers[i].txnId === txnId) { - txnIndex = i; - break; - } - } - if (txnIndex === -1) { - // this shouldn't happen; we shouldn't be seeing txn_ids for things we don't know about, - // whine about it. - logger.warn(`resolveTransactionDefers: seen ${txnId} but it isn't a pending txn, ignoring.`); - return; - } - // This list is sorted in time, so if the input txnId ACKs in the middle of this array, - // then everything before it that hasn't been ACKed yet never will and we should reject them. - for (let i = 0; i < txnIndex; i++) { - this.txnIdDefers[i].reject(this.txnIdDefers[i].txnId); - } - this.txnIdDefers[txnIndex].resolve(txnId); - // clear out settled promises, including the one we resolved. - this.txnIdDefers = this.txnIdDefers.slice(txnIndex + 1); - } - - /** - * Stop syncing with the server. - */ - public stop(): void { - this.terminated = true; - this.abortController?.abort(); - // remove all listeners so things can be GC'd - this.removeAllListeners(SlidingSyncEvent.Lifecycle); - this.removeAllListeners(SlidingSyncEvent.List); - this.removeAllListeners(SlidingSyncEvent.RoomData); - } - - /** - * Re-setup this connection e.g in the event of an expired session. - */ - private resetup(): void { - logger.warn("SlidingSync: resetting connection info"); - // any pending txn ID defers will be forgotten already by the server, so clear them out - this.txnIdDefers.forEach((d) => { - d.reject(d.txnId); - }); - this.txnIdDefers = []; - // resend sticky params and de-confirm all subscriptions - this.lists.forEach((l) => { - l.setModified(true); - }); - this.confirmedRoomSubscriptions = new Set(); // leave desired ones alone though! - // reset the connection as we might be wedged - this.needsResend = true; - this.abortController?.abort(); - this.abortController = new AbortController(); - } - - /** - * Start syncing with the server. Blocks until stopped. - */ - public async start(): Promise { - this.abortController = new AbortController(); - - let currentPos: string | undefined; - while (!this.terminated) { - this.needsResend = false; - let doNotUpdateList = false; - let resp: MSC3575SlidingSyncResponse | undefined; - try { - const listModifiedCount = this.listModifiedCount; - const reqLists: Record = {}; - this.lists.forEach((l: SlidingList, key: string) => { - reqLists[key] = l.getList(false); - }); - const reqBody: MSC3575SlidingSyncRequest = { - lists: reqLists, - pos: currentPos, - timeout: this.timeoutMS, - clientTimeout: this.timeoutMS + BUFFER_PERIOD_MS, - extensions: this.getExtensionRequest(currentPos === undefined), - }; - // check if we are (un)subscribing to a room and modify request this one time for it - const newSubscriptions = difference(this.desiredRoomSubscriptions, this.confirmedRoomSubscriptions); - const unsubscriptions = difference(this.confirmedRoomSubscriptions, this.desiredRoomSubscriptions); - if (unsubscriptions.size > 0) { - reqBody.unsubscribe_rooms = Array.from(unsubscriptions); - } - if (newSubscriptions.size > 0) { - reqBody.room_subscriptions = {}; - for (const roomId of newSubscriptions) { - const customSubName = this.roomIdToCustomSubscription.get(roomId); - let sub = this.roomSubscriptionInfo; - if (customSubName && this.customSubscriptions.has(customSubName)) { - sub = this.customSubscriptions.get(customSubName)!; - } - reqBody.room_subscriptions[roomId] = sub; - } - } - if (this.txnId) { - reqBody.txn_id = this.txnId; - this.txnId = null; - } - this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl, this.abortController.signal); - resp = await this.pendingReq; - currentPos = resp.pos; - // update what we think we're subscribed to. - for (const roomId of newSubscriptions) { - this.confirmedRoomSubscriptions.add(roomId); - } - for (const roomId of unsubscriptions) { - this.confirmedRoomSubscriptions.delete(roomId); - } - if (listModifiedCount !== this.listModifiedCount) { - // the lists have been modified whilst we were waiting for 'await' to return, but the abort() - // call did nothing. It is NOT SAFE to modify the list array now. We'll process the response but - // not update list pointers. - logger.debug("list modified during await call, not updating list"); - doNotUpdateList = true; - } - // mark all these lists as having been sent as sticky so we don't keep sending sticky params - this.lists.forEach((l) => { - l.setModified(false); - }); - // set default empty values so we don't need to null check - resp.lists = resp.lists || {}; - resp.rooms = resp.rooms || {}; - resp.extensions = resp.extensions || {}; - Object.keys(resp.lists).forEach((key: string) => { - const list = this.lists.get(key); - if (!list || !resp) { - return; - } - list.joinedCount = resp.lists[key].count; - }); - this.invokeLifecycleListeners(SlidingSyncState.RequestFinished, resp); - } catch (err) { - if ((err).httpStatus) { - this.invokeLifecycleListeners(SlidingSyncState.RequestFinished, null, err); - if ((err).httpStatus === 400) { - // session probably expired TODO: assign an errcode - // so drop state and re-request - this.resetup(); - currentPos = undefined; - await sleep(50); // in case the 400 was for something else; don't tightloop - continue; - } // else fallthrough to generic error handling - } else if (this.needsResend || (err).name === "AbortError") { - continue; // don't sleep as we caused this error by abort()ing the request. - } - logger.error(err); - await sleep(5000); - } - if (!resp) { - continue; - } - this.onPreExtensionsResponse(resp.extensions); - - Object.keys(resp.rooms).forEach((roomId) => { - this.invokeRoomDataListeners(roomId, resp!.rooms[roomId]); - }); - - const listKeysWithUpdates: Set = new Set(); - if (!doNotUpdateList) { - for (const [key, list] of Object.entries(resp.lists)) { - list.ops = list.ops || []; - if (list.ops.length > 0) { - listKeysWithUpdates.add(key); - } - this.processListOps(list, key); - } - } - this.invokeLifecycleListeners(SlidingSyncState.Complete, resp); - this.onPostExtensionsResponse(resp.extensions); - listKeysWithUpdates.forEach((listKey: string) => { - const list = this.lists.get(listKey); - if (!list) { - return; - } - this.emit(SlidingSyncEvent.List, listKey, list.joinedCount, Object.assign({}, list.roomIndexToRoomId)); - }); - - this.resolveTransactionDefers(resp.txn_id); - } - } -} - -const difference = (setA: Set, setB: Set): Set => { - const diff = new Set(setA); - for (const elem of setB) { - diff.delete(elem); - } - return diff; -}; -- cgit