diff options
Diffstat (limited to 'includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts')
-rw-r--r-- | includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts | 961 |
1 files changed, 961 insertions, 0 deletions
diff --git a/includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts b/includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts new file mode 100644 index 0000000..dde5f1b --- /dev/null +++ b/includes/external/matrix/node_modules/matrix-js-sdk/src/sliding-sync.ts @@ -0,0 +1,961 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { logger } from "./logger"; +import { MatrixClient } from "./client"; +import { IRoomEvent, IStateEvent } from "./sync-accumulator"; +import { TypedEventEmitter } from "./models/typed-event-emitter"; +import { sleep, IDeferred, defer } from "./utils"; +import { HTTPError } from "./http-api"; + +// /sync requests allow you to set a timeout= but the request may continue +// beyond that and wedge forever, so we need to track how long we are willing +// to keep open the connection. This constant is *ADDED* to the timeout= value +// to determine the max time we're willing to wait. +const BUFFER_PERIOD_MS = 10 * 1000; + +export const MSC3575_WILDCARD = "*"; +export const MSC3575_STATE_KEY_ME = "$ME"; +export const MSC3575_STATE_KEY_LAZY = "$LAZY"; + +/** + * Represents a subscription to a room or set of rooms. Controls which events are returned. + */ +export interface MSC3575RoomSubscription { + required_state?: string[][]; + timeline_limit?: number; + include_old_rooms?: MSC3575RoomSubscription; +} + +/** + * Controls which rooms are returned in a given list. + */ +export interface MSC3575Filter { + is_dm?: boolean; + is_encrypted?: boolean; + is_invite?: boolean; + room_name_like?: string; + room_types?: string[]; + not_room_types?: string[]; + spaces?: string[]; + tags?: string[]; + not_tags?: string[]; +} + +/** + * Represents a list subscription. + */ +export interface MSC3575List extends MSC3575RoomSubscription { + ranges: number[][]; + sort?: string[]; + filters?: MSC3575Filter; + slow_get_all_rooms?: boolean; +} + +/** + * A complete Sliding Sync request. + */ +export interface MSC3575SlidingSyncRequest { + // json body params + lists?: Record<string, MSC3575List>; + unsubscribe_rooms?: string[]; + room_subscriptions?: Record<string, MSC3575RoomSubscription>; + extensions?: object; + txn_id?: string; + + // query params + pos?: string; + timeout?: number; + clientTimeout?: number; +} + +export interface MSC3575RoomData { + name: string; + required_state: IStateEvent[]; + timeline: (IRoomEvent | IStateEvent)[]; + notification_count?: number; + highlight_count?: number; + joined_count?: number; + invited_count?: number; + invite_state?: IStateEvent[]; + initial?: boolean; + limited?: boolean; + is_dm?: boolean; + prev_batch?: string; + num_live?: number; +} + +interface ListResponse { + count: number; + ops: Operation[]; +} + +interface BaseOperation { + op: string; +} + +interface DeleteOperation extends BaseOperation { + op: "DELETE"; + index: number; +} + +interface InsertOperation extends BaseOperation { + op: "INSERT"; + index: number; + room_id: string; +} + +interface InvalidateOperation extends BaseOperation { + op: "INVALIDATE"; + range: [number, number]; +} + +interface SyncOperation extends BaseOperation { + op: "SYNC"; + range: [number, number]; + room_ids: string[]; +} + +type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncOperation; + +/** + * A complete Sliding Sync response + */ +export interface MSC3575SlidingSyncResponse { + pos: string; + txn_id?: string; + lists: Record<string, ListResponse>; + rooms: Record<string, MSC3575RoomData>; + extensions: Record<string, object>; +} + +export enum SlidingSyncState { + /** + * Fired by SlidingSyncEvent.Lifecycle event immediately before processing the response. + */ + RequestFinished = "FINISHED", + /** + * Fired by SlidingSyncEvent.Lifecycle event immediately after all room data listeners have been + * invoked, but before list listeners. + */ + Complete = "COMPLETE", +} + +/** + * Internal Class. SlidingList represents a single list in sliding sync. The list can have filters, + * multiple sliding windows, and maintains the index-\>room_id mapping. + */ +class SlidingList { + private list!: MSC3575List; + private isModified?: boolean; + + // returned data + public roomIndexToRoomId: Record<number, string> = {}; + public joinedCount = 0; + + /** + * Construct a new sliding list. + * @param list - The range, sort and filter values to use for this list. + */ + public constructor(list: MSC3575List) { + this.replaceList(list); + } + + /** + * Mark this list as modified or not. Modified lists will return sticky params with calls to getList. + * This is useful for the first time the list is sent, or if the list has changed in some way. + * @param modified - True to mark this list as modified so all sticky parameters will be re-sent. + */ + public setModified(modified: boolean): void { + this.isModified = modified; + } + + /** + * Update the list range for this list. Does not affect modified status as list ranges are non-sticky. + * @param newRanges - The new ranges for the list + */ + public updateListRange(newRanges: number[][]): void { + this.list.ranges = JSON.parse(JSON.stringify(newRanges)); + } + + /** + * Replace list parameters. All fields will be replaced with the new list parameters. + * @param list - The new list parameters + */ + public replaceList(list: MSC3575List): void { + list.filters = list.filters || {}; + list.ranges = list.ranges || []; + this.list = JSON.parse(JSON.stringify(list)); + this.isModified = true; + + // reset values as the join count may be very different (if filters changed) including the rooms + // (e.g. sort orders or sliding window ranges changed) + + // the constantly changing sliding window ranges. Not an array for performance reasons + // E.g. tracking ranges 0-99, 500-599, we don't want to have a 600 element array + this.roomIndexToRoomId = {}; + // the total number of joined rooms according to the server, always >= len(roomIndexToRoomId) + this.joinedCount = 0; + } + + /** + * Return a copy of the list suitable for a request body. + * @param forceIncludeAllParams - True to forcibly include all params even if the list + * hasn't been modified. Callers may want to do this if they are modifying the list prior to calling + * updateList. + */ + public getList(forceIncludeAllParams: boolean): MSC3575List { + let list = { + ranges: JSON.parse(JSON.stringify(this.list.ranges)), + }; + if (this.isModified || forceIncludeAllParams) { + list = JSON.parse(JSON.stringify(this.list)); + } + return list; + } + + /** + * Check if a given index is within the list range. This is required even though the /sync API + * provides explicit updates with index positions because of the following situation: + * 0 1 2 3 4 5 6 7 8 indexes + * a b c d e f COMMANDS: SYNC 0 2 a b c; SYNC 6 8 d e f; + * a b c d _ f COMMAND: DELETE 7; + * e a b c d f COMMAND: INSERT 0 e; + * c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it + * @param i - The index to check + * @returns True if the index is within a sliding window + */ + public isIndexInRange(i: number): boolean { + for (const r of this.list.ranges) { + if (r[0] <= i && i <= r[1]) { + return true; + } + } + return false; + } +} + +/** + * When onResponse extensions should be invoked: before or after processing the main response. + */ +export enum ExtensionState { + // Call onResponse before processing the response body. This is useful when your extension is + // preparing the ground for the response body e.g. processing to-device messages before the + // encrypted event arrives. + PreProcess = "ExtState.PreProcess", + // Call onResponse after processing the response body. This is useful when your extension is + // decorating data from the client, and you rely on MatrixClient.getRoom returning the Room object + // e.g. room account data. + PostProcess = "ExtState.PostProcess", +} + +/** + * An interface that must be satisfied to register extensions + */ +export interface Extension<Req extends {}, Res extends {}> { + /** + * The extension name to go under 'extensions' in the request body. + * @returns The JSON key. + */ + name(): string; + /** + * A function which is called when the request JSON is being formed. + * Returns the data to insert under this key. + * @param isInitial - True when this is part of the initial request (send sticky params) + * @returns The request JSON to send. + */ + onRequest(isInitial: boolean): Req | undefined; + /** + * A function which is called when there is response JSON under this extension. + * @param data - The response JSON under the extension name. + */ + onResponse(data: Res): void; + /** + * Controls when onResponse should be called. + * @returns The state when it should be called. + */ + when(): ExtensionState; +} + +/** + * Events which can be fired by the SlidingSync class. These are designed to provide different levels + * of information when processing sync responses. + * - RoomData: concerns rooms, useful for SlidingSyncSdk to update its knowledge of rooms. + * - Lifecycle: concerns callbacks at various well-defined points in the sync process. + * - List: concerns lists, useful for UI layers to re-render room lists. + * Specifically, the order of event invocation is: + * - Lifecycle (state=RequestFinished) + * - RoomData (N times) + * - Lifecycle (state=Complete) + * - List (at most once per list) + */ +export enum SlidingSyncEvent { + /** + * This event fires when there are updates for a room. Fired as and when rooms are encountered + * in the response. + */ + RoomData = "SlidingSync.RoomData", + /** + * This event fires at various points in the /sync loop lifecycle. + * - SlidingSyncState.RequestFinished: Fires after we receive a valid response but before the + * response has been processed. Perform any pre-process steps here. If there was a problem syncing, + * `err` will be set (e.g network errors). + * - SlidingSyncState.Complete: Fires after all SlidingSyncEvent.RoomData have been fired but before + * SlidingSyncEvent.List. + */ + Lifecycle = "SlidingSync.Lifecycle", + /** + * This event fires whenever there has been a change to this list index. It fires exactly once + * per list, even if there were multiple operations for the list. + * It fires AFTER Lifecycle and RoomData events. + */ + List = "SlidingSync.List", +} + +export type SlidingSyncEventHandlerMap = { + [SlidingSyncEvent.RoomData]: (roomId: string, roomData: MSC3575RoomData) => void; + [SlidingSyncEvent.Lifecycle]: ( + state: SlidingSyncState, + resp: MSC3575SlidingSyncResponse | null, + err?: Error, + ) => void; + [SlidingSyncEvent.List]: (listKey: string, joinedCount: number, roomIndexToRoomId: Record<number, string>) => void; +}; + +/** + * SlidingSync is a high-level data structure which controls the majority of sliding sync. + * It has no hooks into JS SDK except for needing a MatrixClient to perform the HTTP request. + * This means this class (and everything it uses) can be used in isolation from JS SDK if needed. + * To hook this up with the JS SDK, you need to use SlidingSyncSdk. + */ +export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSyncEventHandlerMap> { + private lists: Map<string, SlidingList>; + private listModifiedCount = 0; + private terminated = false; + // flag set when resend() is called because we cannot rely on detecting AbortError in JS SDK :( + private needsResend = false; + // the txn_id to send with the next request. + private txnId: string | null = null; + // a list (in chronological order of when they were sent) of objects containing the txn ID and + // a defer to resolve/reject depending on whether they were successfully sent or not. + private txnIdDefers: (IDeferred<string> & { txnId: string })[] = []; + // map of extension name to req/resp handler + private extensions: Record<string, Extension<any, any>> = {}; + + private desiredRoomSubscriptions = new Set<string>(); // the *desired* room subscriptions + private confirmedRoomSubscriptions = new Set<string>(); + + // map of custom subscription name to the subscription + private customSubscriptions: Map<string, MSC3575RoomSubscription> = new Map(); + // map of room ID to custom subscription name + private roomIdToCustomSubscription: Map<string, string> = new Map(); + + private pendingReq?: Promise<MSC3575SlidingSyncResponse>; + private abortController?: AbortController; + + /** + * Create a new sliding sync instance + * @param proxyBaseUrl - The base URL of the sliding sync proxy + * @param lists - The lists to use for sliding sync. + * @param roomSubscriptionInfo - The params to use for room subscriptions. + * @param client - The client to use for /sync calls. + * @param timeoutMS - The number of milliseconds to wait for a response. + */ + public constructor( + private readonly proxyBaseUrl: string, + lists: Map<string, MSC3575List>, + private roomSubscriptionInfo: MSC3575RoomSubscription, + private readonly client: MatrixClient, + private readonly timeoutMS: number, + ) { + super(); + this.lists = new Map<string, SlidingList>(); + lists.forEach((list, key) => { + this.lists.set(key, new SlidingList(list)); + }); + } + + /** + * Add a custom room subscription, referred to by an arbitrary name. If a subscription with this + * name already exists, it is replaced. No requests are sent by calling this method. + * @param name - The name of the subscription. Only used to reference this subscription in + * useCustomSubscription. + * @param sub - The subscription information. + */ + public addCustomSubscription(name: string, sub: MSC3575RoomSubscription): void { + if (this.customSubscriptions.has(name)) { + logger.warn(`addCustomSubscription: ${name} already exists as a custom subscription, ignoring.`); + return; + } + this.customSubscriptions.set(name, sub); + } + + /** + * Use a custom subscription previously added via addCustomSubscription. No requests are sent + * by calling this method. Use modifyRoomSubscriptions to resend subscription information. + * @param roomId - The room to use the subscription in. + * @param name - The name of the subscription. If this name is unknown, the default subscription + * will be used. + */ + public useCustomSubscription(roomId: string, name: string): void { + // We already know about this custom subscription, as it is immutable, + // we don't need to unconfirm the subscription. + if (this.roomIdToCustomSubscription.get(roomId) === name) { + return; + } + this.roomIdToCustomSubscription.set(roomId, name); + // unconfirm this subscription so a resend() will send it up afresh. + this.confirmedRoomSubscriptions.delete(roomId); + } + + /** + * Get the room index data for a list. + * @param key - The list key + * @returns The list data which contains the rooms in this list + */ + public getListData(key: string): { joinedCount: number; roomIndexToRoomId: Record<number, string> } | null { + const data = this.lists.get(key); + if (!data) { + return null; + } + return { + joinedCount: data.joinedCount, + roomIndexToRoomId: Object.assign({}, data.roomIndexToRoomId), + }; + } + + /** + * Get the full request list parameters for a list index. This function is provided for callers to use + * in conjunction with setList to update fields on an existing list. + * @param key - The list key to get the params for. + * @returns A copy of the list params or undefined. + */ + public getListParams(key: string): MSC3575List | null { + const params = this.lists.get(key); + if (!params) { + return null; + } + return params.getList(true); + } + + /** + * Set new ranges for an existing list. Calling this function when _only_ the ranges have changed + * is more efficient than calling setList(index,list) as this function won't resend sticky params, + * whereas setList always will. + * @param key - The list key to modify + * @param ranges - The new ranges to apply. + * @returns A promise which resolves to the transaction ID when it has been received down sync + * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled + * immediately after sending, in which case the action will be applied in the subsequent request) + */ + public setListRanges(key: string, ranges: number[][]): Promise<string> { + const list = this.lists.get(key); + if (!list) { + return Promise.reject(new Error("no list with key " + key)); + } + list.updateListRange(ranges); + return this.resend(); + } + + /** + * Add or replace a list. Calling this function will interrupt the /sync request to resend new + * lists. + * @param key - The key to modify + * @param list - The new list parameters. + * @returns A promise which resolves to the transaction ID when it has been received down sync + * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled + * immediately after sending, in which case the action will be applied in the subsequent request) + */ + public setList(key: string, list: MSC3575List): Promise<string> { + const existingList = this.lists.get(key); + if (existingList) { + existingList.replaceList(list); + this.lists.set(key, existingList); + } else { + this.lists.set(key, new SlidingList(list)); + } + this.listModifiedCount += 1; + return this.resend(); + } + + /** + * Get the room subscriptions for the sync API. + * @returns A copy of the desired room subscriptions. + */ + public getRoomSubscriptions(): Set<string> { + return new Set(Array.from(this.desiredRoomSubscriptions)); + } + + /** + * Modify the room subscriptions for the sync API. Calling this function will interrupt the + * /sync request to resend new subscriptions. If the /sync stream has not started, this will + * prepare the room subscriptions for when start() is called. + * @param s - The new desired room subscriptions. + * @returns A promise which resolves to the transaction ID when it has been received down sync + * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled + * immediately after sending, in which case the action will be applied in the subsequent request) + */ + public modifyRoomSubscriptions(s: Set<string>): Promise<string> { + this.desiredRoomSubscriptions = s; + return this.resend(); + } + + /** + * Modify which events to retrieve for room subscriptions. Invalidates all room subscriptions + * such that they will be sent up afresh. + * @param rs - The new room subscription fields to fetch. + * @returns A promise which resolves to the transaction ID when it has been received down sync + * (or rejects with the transaction ID if the action was not applied e.g the request was cancelled + * immediately after sending, in which case the action will be applied in the subsequent request) + */ + public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): Promise<string> { + this.roomSubscriptionInfo = rs; + this.confirmedRoomSubscriptions = new Set<string>(); + return this.resend(); + } + + /** + * Register an extension to send with the /sync request. + * @param ext - The extension to register. + */ + public registerExtension(ext: Extension<any, any>): void { + if (this.extensions[ext.name()]) { + throw new Error(`registerExtension: ${ext.name()} already exists as an extension`); + } + this.extensions[ext.name()] = ext; + } + + private getExtensionRequest(isInitial: boolean): Record<string, object | undefined> { + const ext: Record<string, object | undefined> = {}; + Object.keys(this.extensions).forEach((extName) => { + ext[extName] = this.extensions[extName].onRequest(isInitial); + }); + return ext; + } + + private onPreExtensionsResponse(ext: Record<string, object>): void { + Object.keys(ext).forEach((extName) => { + if (this.extensions[extName].when() == ExtensionState.PreProcess) { + this.extensions[extName].onResponse(ext[extName]); + } + }); + } + + private onPostExtensionsResponse(ext: Record<string, object>): void { + Object.keys(ext).forEach((extName) => { + if (this.extensions[extName].when() == ExtensionState.PostProcess) { + this.extensions[extName].onResponse(ext[extName]); + } + }); + } + + /** + * Invoke all attached room data listeners. + * @param roomId - The room which received some data. + * @param roomData - The raw sliding sync response JSON. + */ + private invokeRoomDataListeners(roomId: string, roomData: MSC3575RoomData): void { + if (!roomData.required_state) { + roomData.required_state = []; + } + if (!roomData.timeline) { + roomData.timeline = []; + } + this.emit(SlidingSyncEvent.RoomData, roomId, roomData); + } + + /** + * Invoke all attached lifecycle listeners. + * @param state - The Lifecycle state + * @param resp - The raw sync response JSON + * @param err - Any error that occurred when making the request e.g. network errors. + */ + private invokeLifecycleListeners( + state: SlidingSyncState, + resp: MSC3575SlidingSyncResponse | null, + err?: Error, + ): void { + this.emit(SlidingSyncEvent.Lifecycle, state, resp, err); + } + + private shiftRight(listKey: string, hi: number, low: number): void { + const list = this.lists.get(listKey); + if (!list) { + return; + } + // l h + // 0,1,2,3,4 <- before + // 0,1,2,2,3 <- after, hi is deleted and low is duplicated + for (let i = hi; i > low; i--) { + if (list.isIndexInRange(i)) { + list.roomIndexToRoomId[i] = list.roomIndexToRoomId[i - 1]; + } + } + } + + private shiftLeft(listKey: string, hi: number, low: number): void { + const list = this.lists.get(listKey); + if (!list) { + return; + } + // l h + // 0,1,2,3,4 <- before + // 0,1,3,4,4 <- after, low is deleted and hi is duplicated + for (let i = low; i < hi; i++) { + if (list.isIndexInRange(i)) { + list.roomIndexToRoomId[i] = list.roomIndexToRoomId[i + 1]; + } + } + } + + private removeEntry(listKey: string, index: number): void { + const list = this.lists.get(listKey); + if (!list) { + return; + } + // work out the max index + let max = -1; + for (const n in list.roomIndexToRoomId) { + if (Number(n) > max) { + max = Number(n); + } + } + if (max < 0 || index > max) { + return; + } + // Everything higher than the gap needs to be shifted left. + this.shiftLeft(listKey, max, index); + delete list.roomIndexToRoomId[max]; + } + + private addEntry(listKey: string, index: number): void { + const list = this.lists.get(listKey); + if (!list) { + return; + } + // work out the max index + let max = -1; + for (const n in list.roomIndexToRoomId) { + if (Number(n) > max) { + max = Number(n); + } + } + if (max < 0 || index > max) { + return; + } + // Everything higher than the gap needs to be shifted right, +1 so we don't delete the highest element + this.shiftRight(listKey, max + 1, index); + } + + private processListOps(list: ListResponse, listKey: string): void { + let gapIndex = -1; + const listData = this.lists.get(listKey); + if (!listData) { + return; + } + list.ops.forEach((op: Operation) => { + if (!listData) { + return; + } + switch (op.op) { + case "DELETE": { + logger.debug("DELETE", listKey, op.index, ";"); + delete listData.roomIndexToRoomId[op.index]; + if (gapIndex !== -1) { + // we already have a DELETE operation to process, so process it. + this.removeEntry(listKey, gapIndex); + } + gapIndex = op.index; + break; + } + case "INSERT": { + logger.debug("INSERT", listKey, op.index, op.room_id, ";"); + if (listData.roomIndexToRoomId[op.index]) { + // something is in this space, shift items out of the way + if (gapIndex < 0) { + // we haven't been told where to shift from, so make way for a new room entry. + this.addEntry(listKey, op.index); + } else if (gapIndex > op.index) { + // the gap is further down the list, shift every element to the right + // starting at the gap so we can just shift each element in turn: + // [A,B,C,_] gapIndex=3, op.index=0 + // [A,B,C,C] i=3 + // [A,B,B,C] i=2 + // [A,A,B,C] i=1 + // Terminate. We'll assign into op.index next. + this.shiftRight(listKey, gapIndex, op.index); + } else if (gapIndex < op.index) { + // the gap is further up the list, shift every element to the left + // starting at the gap so we can just shift each element in turn + this.shiftLeft(listKey, op.index, gapIndex); + } + } + // forget the gap, we don't need it anymore. This is outside the check for + // a room being present in this index position because INSERTs always universally + // forget the gap, not conditionally based on the presence of a room in the INSERT + // position. Without this, DELETE 0; INSERT 0; would do the wrong thing. + gapIndex = -1; + listData.roomIndexToRoomId[op.index] = op.room_id; + break; + } + case "INVALIDATE": { + const startIndex = op.range[0]; + for (let i = startIndex; i <= op.range[1]; i++) { + delete listData.roomIndexToRoomId[i]; + } + logger.debug("INVALIDATE", listKey, op.range[0], op.range[1], ";"); + break; + } + case "SYNC": { + const startIndex = op.range[0]; + for (let i = startIndex; i <= op.range[1]; i++) { + const roomId = op.room_ids[i - startIndex]; + if (!roomId) { + break; // we are at the end of list + } + listData.roomIndexToRoomId[i] = roomId; + } + logger.debug("SYNC", listKey, op.range[0], op.range[1], (op.room_ids || []).join(" "), ";"); + break; + } + } + }); + if (gapIndex !== -1) { + // we already have a DELETE operation to process, so process it + // Everything higher than the gap needs to be shifted left. + this.removeEntry(listKey, gapIndex); + } + } + + /** + * Resend a Sliding Sync request. Used when something has changed in the request. Resolves with + * the transaction ID of this request on success. Rejects with the transaction ID of this request + * on failure. + */ + public resend(): Promise<string> { + if (this.needsResend && this.txnIdDefers.length > 0) { + // we already have a resend queued, so just return the same promise + return this.txnIdDefers[this.txnIdDefers.length - 1].promise; + } + this.needsResend = true; + this.txnId = this.client.makeTxnId(); + const d = defer<string>(); + this.txnIdDefers.push({ + ...d, + txnId: this.txnId, + }); + this.abortController?.abort(); + this.abortController = new AbortController(); + return d.promise; + } + + private resolveTransactionDefers(txnId?: string): void { + if (!txnId) { + return; + } + // find the matching index + let txnIndex = -1; + for (let i = 0; i < this.txnIdDefers.length; i++) { + if (this.txnIdDefers[i].txnId === txnId) { + txnIndex = i; + break; + } + } + if (txnIndex === -1) { + // this shouldn't happen; we shouldn't be seeing txn_ids for things we don't know about, + // whine about it. + logger.warn(`resolveTransactionDefers: seen ${txnId} but it isn't a pending txn, ignoring.`); + return; + } + // This list is sorted in time, so if the input txnId ACKs in the middle of this array, + // then everything before it that hasn't been ACKed yet never will and we should reject them. + for (let i = 0; i < txnIndex; i++) { + this.txnIdDefers[i].reject(this.txnIdDefers[i].txnId); + } + this.txnIdDefers[txnIndex].resolve(txnId); + // clear out settled promises, including the one we resolved. + this.txnIdDefers = this.txnIdDefers.slice(txnIndex + 1); + } + + /** + * Stop syncing with the server. + */ + public stop(): void { + this.terminated = true; + this.abortController?.abort(); + // remove all listeners so things can be GC'd + this.removeAllListeners(SlidingSyncEvent.Lifecycle); + this.removeAllListeners(SlidingSyncEvent.List); + this.removeAllListeners(SlidingSyncEvent.RoomData); + } + + /** + * Re-setup this connection e.g in the event of an expired session. + */ + private resetup(): void { + logger.warn("SlidingSync: resetting connection info"); + // any pending txn ID defers will be forgotten already by the server, so clear them out + this.txnIdDefers.forEach((d) => { + d.reject(d.txnId); + }); + this.txnIdDefers = []; + // resend sticky params and de-confirm all subscriptions + this.lists.forEach((l) => { + l.setModified(true); + }); + this.confirmedRoomSubscriptions = new Set<string>(); // leave desired ones alone though! + // reset the connection as we might be wedged + this.needsResend = true; + this.abortController?.abort(); + this.abortController = new AbortController(); + } + + /** + * Start syncing with the server. Blocks until stopped. + */ + public async start(): Promise<void> { + this.abortController = new AbortController(); + + let currentPos: string | undefined; + while (!this.terminated) { + this.needsResend = false; + let doNotUpdateList = false; + let resp: MSC3575SlidingSyncResponse | undefined; + try { + const listModifiedCount = this.listModifiedCount; + const reqLists: Record<string, MSC3575List> = {}; + this.lists.forEach((l: SlidingList, key: string) => { + reqLists[key] = l.getList(false); + }); + const reqBody: MSC3575SlidingSyncRequest = { + lists: reqLists, + pos: currentPos, + timeout: this.timeoutMS, + clientTimeout: this.timeoutMS + BUFFER_PERIOD_MS, + extensions: this.getExtensionRequest(currentPos === undefined), + }; + // check if we are (un)subscribing to a room and modify request this one time for it + const newSubscriptions = difference(this.desiredRoomSubscriptions, this.confirmedRoomSubscriptions); + const unsubscriptions = difference(this.confirmedRoomSubscriptions, this.desiredRoomSubscriptions); + if (unsubscriptions.size > 0) { + reqBody.unsubscribe_rooms = Array.from(unsubscriptions); + } + if (newSubscriptions.size > 0) { + reqBody.room_subscriptions = {}; + for (const roomId of newSubscriptions) { + const customSubName = this.roomIdToCustomSubscription.get(roomId); + let sub = this.roomSubscriptionInfo; + if (customSubName && this.customSubscriptions.has(customSubName)) { + sub = this.customSubscriptions.get(customSubName)!; + } + reqBody.room_subscriptions[roomId] = sub; + } + } + if (this.txnId) { + reqBody.txn_id = this.txnId; + this.txnId = null; + } + this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl, this.abortController.signal); + resp = await this.pendingReq; + currentPos = resp.pos; + // update what we think we're subscribed to. + for (const roomId of newSubscriptions) { + this.confirmedRoomSubscriptions.add(roomId); + } + for (const roomId of unsubscriptions) { + this.confirmedRoomSubscriptions.delete(roomId); + } + if (listModifiedCount !== this.listModifiedCount) { + // the lists have been modified whilst we were waiting for 'await' to return, but the abort() + // call did nothing. It is NOT SAFE to modify the list array now. We'll process the response but + // not update list pointers. + logger.debug("list modified during await call, not updating list"); + doNotUpdateList = true; + } + // mark all these lists as having been sent as sticky so we don't keep sending sticky params + this.lists.forEach((l) => { + l.setModified(false); + }); + // set default empty values so we don't need to null check + resp.lists = resp.lists || {}; + resp.rooms = resp.rooms || {}; + resp.extensions = resp.extensions || {}; + Object.keys(resp.lists).forEach((key: string) => { + const list = this.lists.get(key); + if (!list || !resp) { + return; + } + list.joinedCount = resp.lists[key].count; + }); + this.invokeLifecycleListeners(SlidingSyncState.RequestFinished, resp); + } catch (err) { + if ((<HTTPError>err).httpStatus) { + this.invokeLifecycleListeners(SlidingSyncState.RequestFinished, null, <Error>err); + if ((<HTTPError>err).httpStatus === 400) { + // session probably expired TODO: assign an errcode + // so drop state and re-request + this.resetup(); + currentPos = undefined; + await sleep(50); // in case the 400 was for something else; don't tightloop + continue; + } // else fallthrough to generic error handling + } else if (this.needsResend || (<Error>err).name === "AbortError") { + continue; // don't sleep as we caused this error by abort()ing the request. + } + logger.error(err); + await sleep(5000); + } + if (!resp) { + continue; + } + this.onPreExtensionsResponse(resp.extensions); + + Object.keys(resp.rooms).forEach((roomId) => { + this.invokeRoomDataListeners(roomId, resp!.rooms[roomId]); + }); + + const listKeysWithUpdates: Set<string> = new Set(); + if (!doNotUpdateList) { + for (const [key, list] of Object.entries(resp.lists)) { + list.ops = list.ops || []; + if (list.ops.length > 0) { + listKeysWithUpdates.add(key); + } + this.processListOps(list, key); + } + } + this.invokeLifecycleListeners(SlidingSyncState.Complete, resp); + this.onPostExtensionsResponse(resp.extensions); + listKeysWithUpdates.forEach((listKey: string) => { + const list = this.lists.get(listKey); + if (!list) { + return; + } + this.emit(SlidingSyncEvent.List, listKey, list.joinedCount, Object.assign({}, list.roomIndexToRoomId)); + }); + + this.resolveTransactionDefers(resp.txn_id); + } + } +} + +const difference = (setA: Set<string>, setB: Set<string>): Set<string> => { + const diff = new Set(setA); + for (const elem of setB) { + diff.delete(elem); + } + return diff; +}; |