diff options
author | RaindropsSys <contact@minteck.org> | 2023-04-24 14:03:36 +0200 |
---|---|---|
committer | RaindropsSys <contact@minteck.org> | 2023-04-24 14:03:36 +0200 |
commit | 633c92eae865e957121e08de634aeee11a8b3992 (patch) | |
tree | 09d881bee1dae0b6eee49db1dfaf0f500240606c /includes/external/matrix/node_modules/matrix-js-sdk/src/sync.ts | |
parent | c4657e4509733699c0f26a3c900bab47e915d5a0 (diff) | |
download | pluralconnect-633c92eae865e957121e08de634aeee11a8b3992.tar.gz pluralconnect-633c92eae865e957121e08de634aeee11a8b3992.tar.bz2 pluralconnect-633c92eae865e957121e08de634aeee11a8b3992.zip |
Updated 18 files, added 1692 files and deleted includes/system/compare.inc (automated)
Diffstat (limited to 'includes/external/matrix/node_modules/matrix-js-sdk/src/sync.ts')
-rw-r--r-- | includes/external/matrix/node_modules/matrix-js-sdk/src/sync.ts | 1898 |
1 files changed, 1898 insertions, 0 deletions
diff --git a/includes/external/matrix/node_modules/matrix-js-sdk/src/sync.ts b/includes/external/matrix/node_modules/matrix-js-sdk/src/sync.ts new file mode 100644 index 0000000..dc5217c --- /dev/null +++ b/includes/external/matrix/node_modules/matrix-js-sdk/src/sync.ts @@ -0,0 +1,1898 @@ +/* +Copyright 2015 - 2023 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* + * TODO: + * This class mainly serves to take all the syncing logic out of client.js and + * into a separate file. It's all very fluid, and this class gut wrenches a lot + * of MatrixClient props (e.g. http). Given we want to support WebSockets as + * an alternative syncing API, we may want to have a proper syncing interface + * for HTTP and WS at some point. + */ + +import { Optional } from "matrix-events-sdk"; + +import type { SyncCryptoCallbacks } from "./common-crypto/CryptoBackend"; +import { User, UserEvent } from "./models/user"; +import { NotificationCountType, Room, RoomEvent } from "./models/room"; +import * as utils from "./utils"; +import { IDeferred, noUnsafeEventProps, unsafeProp } from "./utils"; +import { Filter } from "./filter"; +import { EventTimeline } from "./models/event-timeline"; +import { logger } from "./logger"; +import { InvalidStoreError, InvalidStoreState } from "./errors"; +import { ClientEvent, IStoredClientOpts, MatrixClient, PendingEventOrdering, ResetTimelineCallback } from "./client"; +import { + IEphemeral, + IInvitedRoom, + IInviteState, + IJoinedRoom, + ILeftRoom, + IMinimalEvent, + IRoomEvent, + IStateEvent, + IStrippedState, + ISyncResponse, + ITimeline, + IToDeviceEvent, +} from "./sync-accumulator"; +import { MatrixEvent } from "./models/event"; +import { MatrixError, Method } from "./http-api"; +import { ISavedSync } from "./store"; +import { EventType } from "./@types/event"; +import { IPushRules } from "./@types/PushRules"; +import { RoomStateEvent, IMarkerFoundOptions } from "./models/room-state"; +import { RoomMemberEvent } from "./models/room-member"; +import { BeaconEvent } from "./models/beacon"; +import { IEventsResponse } from "./@types/requests"; +import { UNREAD_THREAD_NOTIFICATIONS } from "./@types/sync"; +import { Feature, ServerSupport } from "./feature"; +import { Crypto } from "./crypto"; + +const DEBUG = true; + +// /sync requests allow you to set a timeout= but the request may continue +// beyond that and wedge forever, so we need to track how long we are willing +// to keep open the connection. This constant is *ADDED* to the timeout= value +// to determine the max time we're willing to wait. +const BUFFER_PERIOD_MS = 80 * 1000; + +// Number of consecutive failed syncs that will lead to a syncState of ERROR as opposed +// to RECONNECTING. This is needed to inform the client of server issues when the +// keepAlive is successful but the server /sync fails. +const FAILED_SYNC_ERROR_THRESHOLD = 3; + +export enum SyncState { + /** Emitted after we try to sync more than `FAILED_SYNC_ERROR_THRESHOLD` + * times and are still failing. Or when we enounter a hard error like the + * token being invalid. */ + Error = "ERROR", + /** Emitted after the first sync events are ready (this could even be sync + * events from the cache) */ + Prepared = "PREPARED", + /** Emitted when the sync loop is no longer running */ + Stopped = "STOPPED", + /** Emitted after each sync request happens */ + Syncing = "SYNCING", + /** Emitted after a connectivity error and we're ready to start syncing again */ + Catchup = "CATCHUP", + /** Emitted for each time we try reconnecting. Will switch to `Error` after + * we reach the `FAILED_SYNC_ERROR_THRESHOLD` + */ + Reconnecting = "RECONNECTING", +} + +// Room versions where "insertion", "batch", and "marker" events are controlled +// by power-levels. MSC2716 is supported in existing room versions but they +// should only have special meaning when the room creator sends them. +const MSC2716_ROOM_VERSIONS = ["org.matrix.msc2716v3"]; + +function getFilterName(userId: string, suffix?: string): string { + // scope this on the user ID because people may login on many accounts + // and they all need to be stored! + return `FILTER_SYNC_${userId}` + (suffix ? "_" + suffix : ""); +} + +/* istanbul ignore next */ +function debuglog(...params: any[]): void { + if (!DEBUG) return; + logger.log(...params); +} + +/** + * Options passed into the constructor of SyncApi by MatrixClient + */ +export interface SyncApiOptions { + /** + * Crypto manager + * + * @deprecated in favour of cryptoCallbacks + */ + crypto?: Crypto; + + /** + * If crypto is enabled on our client, callbacks into the crypto module + */ + cryptoCallbacks?: SyncCryptoCallbacks; + + /** + * A function which is called + * with a room ID and returns a boolean. It should return 'true' if the SDK can + * SAFELY remove events from this room. It may not be safe to remove events if + * there are other references to the timelines for this room. + */ + canResetEntireTimeline?: ResetTimelineCallback; +} + +interface ISyncOptions { + filter?: string; + hasSyncedBefore?: boolean; +} + +export interface ISyncStateData { + /** + * The matrix error if `state=ERROR`. + */ + error?: Error; + /** + * The 'since' token passed to /sync. + * `null` for the first successful sync since this client was + * started. Only present if `state=PREPARED` or + * `state=SYNCING`. + */ + oldSyncToken?: string; + /** + * The 'next_batch' result from /sync, which + * will become the 'since' token for the next call to /sync. Only present if + * `state=PREPARED</code> or <code>state=SYNCING`. + */ + nextSyncToken?: string; + /** + * True if we are working our way through a + * backlog of events after connecting. Only present if `state=SYNCING`. + */ + catchingUp?: boolean; + fromCache?: boolean; +} + +enum SetPresence { + Offline = "offline", + Online = "online", + Unavailable = "unavailable", +} + +interface ISyncParams { + filter?: string; + timeout: number; + since?: string; + // eslint-disable-next-line camelcase + full_state?: boolean; + // eslint-disable-next-line camelcase + set_presence?: SetPresence; + _cacheBuster?: string | number; // not part of the API itself +} + +type WrappedRoom<T> = T & { + room: Room; + isBrandNewRoom: boolean; +}; + +/** add default settings to an IStoredClientOpts */ +export function defaultClientOpts(opts?: IStoredClientOpts): IStoredClientOpts { + return { + initialSyncLimit: 8, + resolveInvitesToProfiles: false, + pollTimeout: 30 * 1000, + pendingEventOrdering: PendingEventOrdering.Chronological, + threadSupport: false, + ...opts, + }; +} + +export function defaultSyncApiOpts(syncOpts?: SyncApiOptions): SyncApiOptions { + return { + canResetEntireTimeline: (_roomId): boolean => false, + ...syncOpts, + }; +} + +export class SyncApi { + private readonly opts: IStoredClientOpts; + private readonly syncOpts: SyncApiOptions; + + private _peekRoom: Optional<Room> = null; + private currentSyncRequest?: Promise<ISyncResponse>; + private abortController?: AbortController; + private syncState: SyncState | null = null; + private syncStateData?: ISyncStateData; // additional data (eg. error object for failed sync) + private catchingUp = false; + private running = false; + private keepAliveTimer?: ReturnType<typeof setTimeout>; + private connectionReturnedDefer?: IDeferred<boolean>; + private notifEvents: MatrixEvent[] = []; // accumulator of sync events in the current sync response + private failedSyncCount = 0; // Number of consecutive failed /sync requests + private storeIsInvalid = false; // flag set if the store needs to be cleared before we can start + + /** + * Construct an entity which is able to sync with a homeserver. + * @param client - The matrix client instance to use. + * @param opts - client config options + * @param syncOpts - sync-specific options passed by the client + * @internal + */ + public constructor(private readonly client: MatrixClient, opts?: IStoredClientOpts, syncOpts?: SyncApiOptions) { + this.opts = defaultClientOpts(opts); + this.syncOpts = defaultSyncApiOpts(syncOpts); + + if (client.getNotifTimelineSet()) { + client.reEmitter.reEmit(client.getNotifTimelineSet()!, [RoomEvent.Timeline, RoomEvent.TimelineReset]); + } + } + + public createRoom(roomId: string): Room { + const room = _createAndReEmitRoom(this.client, roomId, this.opts); + + room.on(RoomStateEvent.Marker, (markerEvent, markerFoundOptions) => { + this.onMarkerStateEvent(room, markerEvent, markerFoundOptions); + }); + + return room; + } + + /** When we see the marker state change in the room, we know there is some + * new historical messages imported by MSC2716 `/batch_send` somewhere in + * the room and we need to throw away the timeline to make sure the + * historical messages are shown when we paginate `/messages` again. + * @param room - The room where the marker event was sent + * @param markerEvent - The new marker event + * @param setStateOptions - When `timelineWasEmpty` is set + * as `true`, the given marker event will be ignored + */ + private onMarkerStateEvent( + room: Room, + markerEvent: MatrixEvent, + { timelineWasEmpty }: IMarkerFoundOptions = {}, + ): void { + // We don't need to refresh the timeline if it was empty before the + // marker arrived. This could be happen in a variety of cases: + // 1. From the initial sync + // 2. If it's from the first state we're seeing after joining the room + // 3. Or whether it's coming from `syncFromCache` + if (timelineWasEmpty) { + logger.debug( + `MarkerState: Ignoring markerEventId=${markerEvent.getId()} in roomId=${room.roomId} ` + + `because the timeline was empty before the marker arrived which means there is nothing to refresh.`, + ); + return; + } + + const isValidMsc2716Event = + // Check whether the room version directly supports MSC2716, in + // which case, "marker" events are already auth'ed by + // power_levels + MSC2716_ROOM_VERSIONS.includes(room.getVersion()) || + // MSC2716 is also supported in all existing room versions but + // special meaning should only be given to "insertion", "batch", + // and "marker" events when they come from the room creator + markerEvent.getSender() === room.getCreator(); + + // It would be nice if we could also specifically tell whether the + // historical messages actually affected the locally cached client + // timeline or not. The problem is we can't see the prev_events of + // the base insertion event that the marker was pointing to because + // prev_events aren't available in the client API's. In most cases, + // the history won't be in people's locally cached timelines in the + // client, so we don't need to bother everyone about refreshing + // their timeline. This works for a v1 though and there are use + // cases like initially bootstrapping your bridged room where people + // are likely to encounter the historical messages affecting their + // current timeline (think someone signing up for Beeper and + // importing their Whatsapp history). + if (isValidMsc2716Event) { + // Saw new marker event, let's let the clients know they should + // refresh the timeline. + logger.debug( + `MarkerState: Timeline needs to be refreshed because ` + + `a new markerEventId=${markerEvent.getId()} was sent in roomId=${room.roomId}`, + ); + room.setTimelineNeedsRefresh(true); + room.emit(RoomEvent.HistoryImportedWithinTimeline, markerEvent, room); + } else { + logger.debug( + `MarkerState: Ignoring markerEventId=${markerEvent.getId()} in roomId=${room.roomId} because ` + + `MSC2716 is not supported in the room version or for any room version, the marker wasn't sent ` + + `by the room creator.`, + ); + } + } + + /** + * Sync rooms the user has left. + * @returns Resolved when they've been added to the store. + */ + public async syncLeftRooms(): Promise<Room[]> { + const client = this.client; + + // grab a filter with limit=1 and include_leave=true + const filter = new Filter(this.client.credentials.userId); + filter.setTimelineLimit(1); + filter.setIncludeLeaveRooms(true); + + const localTimeoutMs = this.opts.pollTimeout! + BUFFER_PERIOD_MS; + + const filterId = await client.getOrCreateFilter( + getFilterName(client.credentials.userId!, "LEFT_ROOMS"), + filter, + ); + + const qps: ISyncParams = { + timeout: 0, // don't want to block since this is a single isolated req + filter: filterId, + }; + + const data = await client.http.authedRequest<ISyncResponse>(Method.Get, "/sync", qps as any, undefined, { + localTimeoutMs, + }); + + let leaveRooms: WrappedRoom<ILeftRoom>[] = []; + if (data.rooms?.leave) { + leaveRooms = this.mapSyncResponseToRoomArray(data.rooms.leave); + } + + const rooms = await Promise.all( + leaveRooms.map(async (leaveObj) => { + const room = leaveObj.room; + if (!leaveObj.isBrandNewRoom) { + // the intention behind syncLeftRooms is to add in rooms which were + // *omitted* from the initial /sync. Rooms the user were joined to + // but then left whilst the app is running will appear in this list + // and we do not want to bother with them since they will have the + // current state already (and may get dupe messages if we add + // yet more timeline events!), so skip them. + // NB: When we persist rooms to localStorage this will be more + // complicated... + return; + } + leaveObj.timeline = leaveObj.timeline || { + prev_batch: null, + events: [], + }; + const events = this.mapSyncEventsFormat(leaveObj.timeline, room); + + const stateEvents = this.mapSyncEventsFormat(leaveObj.state, room); + + // set the back-pagination token. Do this *before* adding any + // events so that clients can start back-paginating. + room.getLiveTimeline().setPaginationToken(leaveObj.timeline.prev_batch, EventTimeline.BACKWARDS); + + await this.injectRoomEvents(room, stateEvents, events); + + room.recalculate(); + client.store.storeRoom(room); + client.emit(ClientEvent.Room, room); + + this.processEventsForNotifs(room, events); + return room; + }), + ); + + return rooms.filter(Boolean) as Room[]; + } + + /** + * Peek into a room. This will result in the room in question being synced so it + * is accessible via getRooms(). Live updates for the room will be provided. + * @param roomId - The room ID to peek into. + * @returns A promise which resolves once the room has been added to the + * store. + */ + public peek(roomId: string): Promise<Room> { + if (this._peekRoom?.roomId === roomId) { + return Promise.resolve(this._peekRoom); + } + + const client = this.client; + this._peekRoom = this.createRoom(roomId); + return this.client.roomInitialSync(roomId, 20).then((response) => { + // make sure things are init'd + response.messages = response.messages || { chunk: [] }; + response.messages.chunk = response.messages.chunk || []; + response.state = response.state || []; + + // FIXME: Mostly duplicated from injectRoomEvents but not entirely + // because "state" in this API is at the BEGINNING of the chunk + const oldStateEvents = utils.deepCopy(response.state).map(client.getEventMapper()); + const stateEvents = response.state.map(client.getEventMapper()); + const messages = response.messages.chunk.map(client.getEventMapper()); + + // XXX: copypasted from /sync until we kill off this minging v1 API stuff) + // handle presence events (User objects) + if (Array.isArray(response.presence)) { + response.presence.map(client.getEventMapper()).forEach(function (presenceEvent) { + let user = client.store.getUser(presenceEvent.getContent().user_id); + if (user) { + user.setPresenceEvent(presenceEvent); + } else { + user = createNewUser(client, presenceEvent.getContent().user_id); + user.setPresenceEvent(presenceEvent); + client.store.storeUser(user); + } + client.emit(ClientEvent.Event, presenceEvent); + }); + } + + // set the pagination token before adding the events in case people + // fire off pagination requests in response to the Room.timeline + // events. + if (response.messages.start) { + this._peekRoom!.oldState.paginationToken = response.messages.start; + } + + // set the state of the room to as it was after the timeline executes + this._peekRoom!.oldState.setStateEvents(oldStateEvents); + this._peekRoom!.currentState.setStateEvents(stateEvents); + + this.resolveInvites(this._peekRoom!); + this._peekRoom!.recalculate(); + + // roll backwards to diverge old state. addEventsToTimeline + // will overwrite the pagination token, so make sure it overwrites + // it with the right thing. + this._peekRoom!.addEventsToTimeline( + messages.reverse(), + true, + this._peekRoom!.getLiveTimeline(), + response.messages.start, + ); + + client.store.storeRoom(this._peekRoom!); + client.emit(ClientEvent.Room, this._peekRoom!); + + this.peekPoll(this._peekRoom!); + return this._peekRoom!; + }); + } + + /** + * Stop polling for updates in the peeked room. NOPs if there is no room being + * peeked. + */ + public stopPeeking(): void { + this._peekRoom = null; + } + + /** + * Do a peek room poll. + * @param token - from= token + */ + private peekPoll(peekRoom: Room, token?: string): void { + if (this._peekRoom !== peekRoom) { + debuglog("Stopped peeking in room %s", peekRoom.roomId); + return; + } + + // FIXME: gut wrenching; hard-coded timeout values + this.client.http + .authedRequest<IEventsResponse>( + Method.Get, + "/events", + { + room_id: peekRoom.roomId, + timeout: String(30 * 1000), + from: token, + }, + undefined, + { + localTimeoutMs: 50 * 1000, + abortSignal: this.abortController?.signal, + }, + ) + .then( + (res) => { + if (this._peekRoom !== peekRoom) { + debuglog("Stopped peeking in room %s", peekRoom.roomId); + return; + } + // We have a problem that we get presence both from /events and /sync + // however, /sync only returns presence for users in rooms + // you're actually joined to. + // in order to be sure to get presence for all of the users in the + // peeked room, we handle presence explicitly here. This may result + // in duplicate presence events firing for some users, which is a + // performance drain, but such is life. + // XXX: copypasted from /sync until we can kill this minging v1 stuff. + + res.chunk + .filter(function (e) { + return e.type === "m.presence"; + }) + .map(this.client.getEventMapper()) + .forEach((presenceEvent) => { + let user = this.client.store.getUser(presenceEvent.getContent().user_id); + if (user) { + user.setPresenceEvent(presenceEvent); + } else { + user = createNewUser(this.client, presenceEvent.getContent().user_id); + user.setPresenceEvent(presenceEvent); + this.client.store.storeUser(user); + } + this.client.emit(ClientEvent.Event, presenceEvent); + }); + + // strip out events which aren't for the given room_id (e.g presence) + // and also ephemeral events (which we're assuming is anything without + // and event ID because the /events API doesn't separate them). + const events = res.chunk + .filter(function (e) { + return e.room_id === peekRoom.roomId && e.event_id; + }) + .map(this.client.getEventMapper()); + + peekRoom.addLiveEvents(events); + this.peekPoll(peekRoom, res.end); + }, + (err) => { + logger.error("[%s] Peek poll failed: %s", peekRoom.roomId, err); + setTimeout(() => { + this.peekPoll(peekRoom, token); + }, 30 * 1000); + }, + ); + } + + /** + * Returns the current state of this sync object + * @see MatrixClient#event:"sync" + */ + public getSyncState(): SyncState | null { + return this.syncState; + } + + /** + * Returns the additional data object associated with + * the current sync state, or null if there is no + * such data. + * Sync errors, if available, are put in the 'error' key of + * this object. + */ + public getSyncStateData(): ISyncStateData | null { + return this.syncStateData ?? null; + } + + public async recoverFromSyncStartupError(savedSyncPromise: Promise<void> | undefined, error: Error): Promise<void> { + // Wait for the saved sync to complete - we send the pushrules and filter requests + // before the saved sync has finished so they can run in parallel, but only process + // the results after the saved sync is done. Equivalently, we wait for it to finish + // before reporting failures from these functions. + await savedSyncPromise; + const keepaliveProm = this.startKeepAlives(); + this.updateSyncState(SyncState.Error, { error }); + await keepaliveProm; + } + + /** + * Is the lazy loading option different than in previous session? + * @param lazyLoadMembers - current options for lazy loading + * @returns whether or not the option has changed compared to the previous session */ + private async wasLazyLoadingToggled(lazyLoadMembers = false): Promise<boolean> { + // assume it was turned off before + // if we don't know any better + let lazyLoadMembersBefore = false; + const isStoreNewlyCreated = await this.client.store.isNewlyCreated(); + if (!isStoreNewlyCreated) { + const prevClientOptions = await this.client.store.getClientOptions(); + if (prevClientOptions) { + lazyLoadMembersBefore = !!prevClientOptions.lazyLoadMembers; + } + return lazyLoadMembersBefore !== lazyLoadMembers; + } + return false; + } + + private shouldAbortSync(error: MatrixError): boolean { + if (error.errcode === "M_UNKNOWN_TOKEN") { + // The logout already happened, we just need to stop. + logger.warn("Token no longer valid - assuming logout"); + this.stop(); + this.updateSyncState(SyncState.Error, { error }); + return true; + } + return false; + } + + private getPushRules = async (): Promise<void> => { + try { + debuglog("Getting push rules..."); + const result = await this.client.getPushRules(); + debuglog("Got push rules"); + + this.client.pushRules = result; + } catch (err) { + logger.error("Getting push rules failed", err); + if (this.shouldAbortSync(<MatrixError>err)) return; + // wait for saved sync to complete before doing anything else, + // otherwise the sync state will end up being incorrect + debuglog("Waiting for saved sync before retrying push rules..."); + await this.recoverFromSyncStartupError(this.savedSyncPromise, <Error>err); + return this.getPushRules(); // try again + } + }; + + private buildDefaultFilter = (): Filter => { + const filter = new Filter(this.client.credentials.userId); + if (this.client.canSupport.get(Feature.ThreadUnreadNotifications) !== ServerSupport.Unsupported) { + filter.setUnreadThreadNotifications(true); + } + return filter; + }; + + private checkLazyLoadStatus = async (): Promise<void> => { + debuglog("Checking lazy load status..."); + if (this.opts.lazyLoadMembers && this.client.isGuest()) { + this.opts.lazyLoadMembers = false; + } + if (this.opts.lazyLoadMembers) { + debuglog("Checking server lazy load support..."); + const supported = await this.client.doesServerSupportLazyLoading(); + if (supported) { + debuglog("Enabling lazy load on sync filter..."); + if (!this.opts.filter) { + this.opts.filter = this.buildDefaultFilter(); + } + this.opts.filter.setLazyLoadMembers(true); + } else { + debuglog("LL: lazy loading requested but not supported " + "by server, so disabling"); + this.opts.lazyLoadMembers = false; + } + } + // need to vape the store when enabling LL and wasn't enabled before + debuglog("Checking whether lazy loading has changed in store..."); + const shouldClear = await this.wasLazyLoadingToggled(this.opts.lazyLoadMembers); + if (shouldClear) { + this.storeIsInvalid = true; + const error = new InvalidStoreError(InvalidStoreState.ToggledLazyLoading, !!this.opts.lazyLoadMembers); + this.updateSyncState(SyncState.Error, { error }); + // bail out of the sync loop now: the app needs to respond to this error. + // we leave the state as 'ERROR' which isn't great since this normally means + // we're retrying. The client must be stopped before clearing the stores anyway + // so the app should stop the client, clear the store and start it again. + logger.warn("InvalidStoreError: store is not usable: stopping sync."); + return; + } + if (this.opts.lazyLoadMembers) { + this.syncOpts.crypto?.enableLazyLoading(); + } + try { + debuglog("Storing client options..."); + await this.client.storeClientOptions(); + debuglog("Stored client options"); + } catch (err) { + logger.error("Storing client options failed", err); + throw err; + } + }; + + private getFilter = async (): Promise<{ + filterId?: string; + filter?: Filter; + }> => { + debuglog("Getting filter..."); + let filter: Filter; + if (this.opts.filter) { + filter = this.opts.filter; + } else { + filter = this.buildDefaultFilter(); + } + + let filterId: string; + try { + filterId = await this.client.getOrCreateFilter(getFilterName(this.client.credentials.userId!), filter); + } catch (err) { + logger.error("Getting filter failed", err); + if (this.shouldAbortSync(<MatrixError>err)) return {}; + // wait for saved sync to complete before doing anything else, + // otherwise the sync state will end up being incorrect + debuglog("Waiting for saved sync before retrying filter..."); + await this.recoverFromSyncStartupError(this.savedSyncPromise, <Error>err); + return this.getFilter(); // try again + } + return { filter, filterId }; + }; + + private savedSyncPromise?: Promise<void>; + + /** + * Main entry point + */ + public async sync(): Promise<void> { + this.running = true; + this.abortController = new AbortController(); + + global.window?.addEventListener?.("online", this.onOnline, false); + + if (this.client.isGuest()) { + // no push rules for guests, no access to POST filter for guests. + return this.doSync({}); + } + + // Pull the saved sync token out first, before the worker starts sending + // all the sync data which could take a while. This will let us send our + // first incremental sync request before we've processed our saved data. + debuglog("Getting saved sync token..."); + const savedSyncTokenPromise = this.client.store.getSavedSyncToken().then((tok) => { + debuglog("Got saved sync token"); + return tok; + }); + + this.savedSyncPromise = this.client.store + .getSavedSync() + .then((savedSync) => { + debuglog(`Got reply from saved sync, exists? ${!!savedSync}`); + if (savedSync) { + return this.syncFromCache(savedSync); + } + }) + .catch((err) => { + logger.error("Getting saved sync failed", err); + }); + + // We need to do one-off checks before we can begin the /sync loop. + // These are: + // 1) We need to get push rules so we can check if events should bing as we get + // them from /sync. + // 2) We need to get/create a filter which we can use for /sync. + // 3) We need to check the lazy loading option matches what was used in the + // stored sync. If it doesn't, we can't use the stored sync. + + // Now start the first incremental sync request: this can also + // take a while so if we set it going now, we can wait for it + // to finish while we process our saved sync data. + await this.getPushRules(); + await this.checkLazyLoadStatus(); + const { filterId, filter } = await this.getFilter(); + if (!filter) return; // bail, getFilter failed + + // reset the notifications timeline to prepare it to paginate from + // the current point in time. + // The right solution would be to tie /sync pagination tokens into + // /notifications API somehow. + this.client.resetNotifTimelineSet(); + + if (!this.currentSyncRequest) { + let firstSyncFilter = filterId; + const savedSyncToken = await savedSyncTokenPromise; + + if (savedSyncToken) { + debuglog("Sending first sync request..."); + } else { + debuglog("Sending initial sync request..."); + const initialFilter = this.buildDefaultFilter(); + initialFilter.setDefinition(filter.getDefinition()); + initialFilter.setTimelineLimit(this.opts.initialSyncLimit!); + // Use an inline filter, no point uploading it for a single usage + firstSyncFilter = JSON.stringify(initialFilter.getDefinition()); + } + + // Send this first sync request here so we can then wait for the saved + // sync data to finish processing before we process the results of this one. + this.currentSyncRequest = this.doSyncRequest({ filter: firstSyncFilter }, savedSyncToken); + } + + // Now wait for the saved sync to finish... + debuglog("Waiting for saved sync before starting sync processing..."); + await this.savedSyncPromise; + // process the first sync request and continue syncing with the normal filterId + return this.doSync({ filter: filterId }); + } + + /** + * Stops the sync object from syncing. + */ + public stop(): void { + debuglog("SyncApi.stop"); + // It is necessary to check for the existance of + // global.window AND global.window.removeEventListener. + // Some platforms (e.g. React Native) register global.window, + // but do not have global.window.removeEventListener. + global.window?.removeEventListener?.("online", this.onOnline, false); + this.running = false; + this.abortController?.abort(); + if (this.keepAliveTimer) { + clearTimeout(this.keepAliveTimer); + this.keepAliveTimer = undefined; + } + } + + /** + * Retry a backed off syncing request immediately. This should only be used when + * the user <b>explicitly</b> attempts to retry their lost connection. + * @returns True if this resulted in a request being retried. + */ + public retryImmediately(): boolean { + if (!this.connectionReturnedDefer) { + return false; + } + this.startKeepAlives(0); + return true; + } + /** + * Process a single set of cached sync data. + * @param savedSync - a saved sync that was persisted by a store. This + * should have been acquired via client.store.getSavedSync(). + */ + private async syncFromCache(savedSync: ISavedSync): Promise<void> { + debuglog("sync(): not doing HTTP hit, instead returning stored /sync data"); + + const nextSyncToken = savedSync.nextBatch; + + // Set sync token for future incremental syncing + this.client.store.setSyncToken(nextSyncToken); + + // No previous sync, set old token to null + const syncEventData: ISyncStateData = { + nextSyncToken, + catchingUp: false, + fromCache: true, + }; + + const data: ISyncResponse = { + next_batch: nextSyncToken, + rooms: savedSync.roomsData, + account_data: { + events: savedSync.accountData, + }, + }; + + try { + await this.processSyncResponse(syncEventData, data); + } catch (e) { + logger.error("Error processing cached sync", e); + } + + // Don't emit a prepared if we've bailed because the store is invalid: + // in this case the client will not be usable until stopped & restarted + // so this would be useless and misleading. + if (!this.storeIsInvalid) { + this.updateSyncState(SyncState.Prepared, syncEventData); + } + } + + /** + * Invoke me to do /sync calls + */ + private async doSync(syncOptions: ISyncOptions): Promise<void> { + while (this.running) { + const syncToken = this.client.store.getSyncToken(); + + let data: ISyncResponse; + try { + if (!this.currentSyncRequest) { + this.currentSyncRequest = this.doSyncRequest(syncOptions, syncToken); + } + data = await this.currentSyncRequest; + } catch (e) { + const abort = await this.onSyncError(<MatrixError>e); + if (abort) return; + continue; + } finally { + this.currentSyncRequest = undefined; + } + + // set the sync token NOW *before* processing the events. We do this so + // if something barfs on an event we can skip it rather than constantly + // polling with the same token. + this.client.store.setSyncToken(data.next_batch); + + // Reset after a successful sync + this.failedSyncCount = 0; + + await this.client.store.setSyncData(data); + + const syncEventData = { + oldSyncToken: syncToken ?? undefined, + nextSyncToken: data.next_batch, + catchingUp: this.catchingUp, + }; + + if (this.syncOpts.crypto) { + // tell the crypto module we're about to process a sync + // response + await this.syncOpts.crypto.onSyncWillProcess(syncEventData); + } + + try { + await this.processSyncResponse(syncEventData, data); + } catch (e) { + // log the exception with stack if we have it, else fall back + // to the plain description + logger.error("Caught /sync error", e); + + // Emit the exception for client handling + this.client.emit(ClientEvent.SyncUnexpectedError, <Error>e); + } + + // update this as it may have changed + syncEventData.catchingUp = this.catchingUp; + + // emit synced events + if (!syncOptions.hasSyncedBefore) { + this.updateSyncState(SyncState.Prepared, syncEventData); + syncOptions.hasSyncedBefore = true; + } + + // tell the crypto module to do its processing. It may block (to do a + // /keys/changes request). + if (this.syncOpts.cryptoCallbacks) { + await this.syncOpts.cryptoCallbacks.onSyncCompleted(syncEventData); + } + + // keep emitting SYNCING -> SYNCING for clients who want to do bulk updates + this.updateSyncState(SyncState.Syncing, syncEventData); + + if (this.client.store.wantsSave()) { + // We always save the device list (if it's dirty) before saving the sync data: + // this means we know the saved device list data is at least as fresh as the + // stored sync data which means we don't have to worry that we may have missed + // device changes. We can also skip the delay since we're not calling this very + // frequently (and we don't really want to delay the sync for it). + if (this.syncOpts.crypto) { + await this.syncOpts.crypto.saveDeviceList(0); + } + + // tell databases that everything is now in a consistent state and can be saved. + this.client.store.save(); + } + } + + if (!this.running) { + debuglog("Sync no longer running: exiting."); + if (this.connectionReturnedDefer) { + this.connectionReturnedDefer.reject(); + this.connectionReturnedDefer = undefined; + } + this.updateSyncState(SyncState.Stopped); + } + } + + private doSyncRequest(syncOptions: ISyncOptions, syncToken: string | null): Promise<ISyncResponse> { + const qps = this.getSyncParams(syncOptions, syncToken); + return this.client.http.authedRequest<ISyncResponse>(Method.Get, "/sync", qps as any, undefined, { + localTimeoutMs: qps.timeout + BUFFER_PERIOD_MS, + abortSignal: this.abortController?.signal, + }); + } + + private getSyncParams(syncOptions: ISyncOptions, syncToken: string | null): ISyncParams { + let timeout = this.opts.pollTimeout!; + + if (this.getSyncState() !== SyncState.Syncing || this.catchingUp) { + // unless we are happily syncing already, we want the server to return + // as quickly as possible, even if there are no events queued. This + // serves two purposes: + // + // * When the connection dies, we want to know asap when it comes back, + // so that we can hide the error from the user. (We don't want to + // have to wait for an event or a timeout). + // + // * We want to know if the server has any to_device messages queued up + // for us. We do that by calling it with a zero timeout until it + // doesn't give us any more to_device messages. + this.catchingUp = true; + timeout = 0; + } + + let filter = syncOptions.filter; + if (this.client.isGuest() && !filter) { + filter = this.getGuestFilter(); + } + + const qps: ISyncParams = { filter, timeout }; + + if (this.opts.disablePresence) { + qps.set_presence = SetPresence.Offline; + } + + if (syncToken) { + qps.since = syncToken; + } else { + // use a cachebuster for initialsyncs, to make sure that + // we don't get a stale sync + // (https://github.com/vector-im/vector-web/issues/1354) + qps._cacheBuster = Date.now(); + } + + if ([SyncState.Reconnecting, SyncState.Error].includes(this.getSyncState()!)) { + // we think the connection is dead. If it comes back up, we won't know + // about it till /sync returns. If the timeout= is high, this could + // be a long time. Set it to 0 when doing retries so we don't have to wait + // for an event or a timeout before emiting the SYNCING event. + qps.timeout = 0; + } + + return qps; + } + + private async onSyncError(err: MatrixError): Promise<boolean> { + if (!this.running) { + debuglog("Sync no longer running: exiting"); + if (this.connectionReturnedDefer) { + this.connectionReturnedDefer.reject(); + this.connectionReturnedDefer = undefined; + } + this.updateSyncState(SyncState.Stopped); + return true; // abort + } + + logger.error("/sync error %s", err); + + if (this.shouldAbortSync(err)) { + return true; // abort + } + + this.failedSyncCount++; + logger.log("Number of consecutive failed sync requests:", this.failedSyncCount); + + debuglog("Starting keep-alive"); + // Note that we do *not* mark the sync connection as + // lost yet: we only do this if a keepalive poke + // fails, since long lived HTTP connections will + // go away sometimes and we shouldn't treat this as + // erroneous. We set the state to 'reconnecting' + // instead, so that clients can observe this state + // if they wish. + const keepAlivePromise = this.startKeepAlives(); + + this.currentSyncRequest = undefined; + // Transition from RECONNECTING to ERROR after a given number of failed syncs + this.updateSyncState( + this.failedSyncCount >= FAILED_SYNC_ERROR_THRESHOLD ? SyncState.Error : SyncState.Reconnecting, + { error: err }, + ); + + const connDidFail = await keepAlivePromise; + + // Only emit CATCHUP if we detected a connectivity error: if we didn't, + // it's quite likely the sync will fail again for the same reason and we + // want to stay in ERROR rather than keep flip-flopping between ERROR + // and CATCHUP. + if (connDidFail && this.getSyncState() === SyncState.Error) { + this.updateSyncState(SyncState.Catchup, { + catchingUp: true, + }); + } + return false; + } + + /** + * Process data returned from a sync response and propagate it + * into the model objects + * + * @param syncEventData - Object containing sync tokens associated with this sync + * @param data - The response from /sync + */ + private async processSyncResponse(syncEventData: ISyncStateData, data: ISyncResponse): Promise<void> { + const client = this.client; + + // data looks like: + // { + // next_batch: $token, + // presence: { events: [] }, + // account_data: { events: [] }, + // device_lists: { changed: ["@user:server", ... ]}, + // to_device: { events: [] }, + // device_one_time_keys_count: { signed_curve25519: 42 }, + // rooms: { + // invite: { + // $roomid: { + // invite_state: { events: [] } + // } + // }, + // join: { + // $roomid: { + // state: { events: [] }, + // timeline: { events: [], prev_batch: $token, limited: true }, + // ephemeral: { events: [] }, + // summary: { + // m.heroes: [ $user_id ], + // m.joined_member_count: $count, + // m.invited_member_count: $count + // }, + // account_data: { events: [] }, + // unread_notifications: { + // highlight_count: 0, + // notification_count: 0, + // } + // } + // }, + // leave: { + // $roomid: { + // state: { events: [] }, + // timeline: { events: [], prev_batch: $token } + // } + // } + // } + // } + + // TODO-arch: + // - Each event we pass through needs to be emitted via 'event', can we + // do this in one place? + // - The isBrandNewRoom boilerplate is boilerplatey. + + // handle presence events (User objects) + if (Array.isArray(data.presence?.events)) { + data.presence!.events.filter(noUnsafeEventProps) + .map(client.getEventMapper()) + .forEach(function (presenceEvent) { + let user = client.store.getUser(presenceEvent.getSender()!); + if (user) { + user.setPresenceEvent(presenceEvent); + } else { + user = createNewUser(client, presenceEvent.getSender()!); + user.setPresenceEvent(presenceEvent); + client.store.storeUser(user); + } + client.emit(ClientEvent.Event, presenceEvent); + }); + } + + // handle non-room account_data + if (Array.isArray(data.account_data?.events)) { + const events = data.account_data.events.filter(noUnsafeEventProps).map(client.getEventMapper()); + const prevEventsMap = events.reduce<Record<string, MatrixEvent | undefined>>((m, c) => { + m[c.getType()!] = client.store.getAccountData(c.getType()); + return m; + }, {}); + client.store.storeAccountDataEvents(events); + events.forEach(function (accountDataEvent) { + // Honour push rules that come down the sync stream but also + // honour push rules that were previously cached. Base rules + // will be updated when we receive push rules via getPushRules + // (see sync) before syncing over the network. + if (accountDataEvent.getType() === EventType.PushRules) { + const rules = accountDataEvent.getContent<IPushRules>(); + client.setPushRules(rules); + } + const prevEvent = prevEventsMap[accountDataEvent.getType()!]; + client.emit(ClientEvent.AccountData, accountDataEvent, prevEvent); + return accountDataEvent; + }); + } + + // handle to-device events + if (data.to_device && Array.isArray(data.to_device.events) && data.to_device.events.length > 0) { + let toDeviceMessages: IToDeviceEvent[] = data.to_device.events.filter(noUnsafeEventProps); + + if (this.syncOpts.cryptoCallbacks) { + toDeviceMessages = await this.syncOpts.cryptoCallbacks.preprocessToDeviceMessages(toDeviceMessages); + } + + const cancelledKeyVerificationTxns: string[] = []; + toDeviceMessages + .map(client.getEventMapper({ toDevice: true })) + .map((toDeviceEvent) => { + // map is a cheap inline forEach + // We want to flag m.key.verification.start events as cancelled + // if there's an accompanying m.key.verification.cancel event, so + // we pull out the transaction IDs from the cancellation events + // so we can flag the verification events as cancelled in the loop + // below. + if (toDeviceEvent.getType() === "m.key.verification.cancel") { + const txnId: string = toDeviceEvent.getContent()["transaction_id"]; + if (txnId) { + cancelledKeyVerificationTxns.push(txnId); + } + } + + // as mentioned above, .map is a cheap inline forEach, so return + // the unmodified event. + return toDeviceEvent; + }) + .forEach(function (toDeviceEvent) { + const content = toDeviceEvent.getContent(); + if (toDeviceEvent.getType() == "m.room.message" && content.msgtype == "m.bad.encrypted") { + // the mapper already logged a warning. + logger.log("Ignoring undecryptable to-device event from " + toDeviceEvent.getSender()); + return; + } + + if ( + toDeviceEvent.getType() === "m.key.verification.start" || + toDeviceEvent.getType() === "m.key.verification.request" + ) { + const txnId = content["transaction_id"]; + if (cancelledKeyVerificationTxns.includes(txnId)) { + toDeviceEvent.flagCancelled(); + } + } + + client.emit(ClientEvent.ToDeviceEvent, toDeviceEvent); + }); + } else { + // no more to-device events: we can stop polling with a short timeout. + this.catchingUp = false; + } + + // the returned json structure is a bit crap, so make it into a + // nicer form (array) after applying sanity to make sure we don't fail + // on missing keys (on the off chance) + let inviteRooms: WrappedRoom<IInvitedRoom>[] = []; + let joinRooms: WrappedRoom<IJoinedRoom>[] = []; + let leaveRooms: WrappedRoom<ILeftRoom>[] = []; + + if (data.rooms) { + if (data.rooms.invite) { + inviteRooms = this.mapSyncResponseToRoomArray(data.rooms.invite); + } + if (data.rooms.join) { + joinRooms = this.mapSyncResponseToRoomArray(data.rooms.join); + } + if (data.rooms.leave) { + leaveRooms = this.mapSyncResponseToRoomArray(data.rooms.leave); + } + } + + this.notifEvents = []; + + // Handle invites + await utils.promiseMapSeries(inviteRooms, async (inviteObj) => { + const room = inviteObj.room; + const stateEvents = this.mapSyncEventsFormat(inviteObj.invite_state, room); + + await this.injectRoomEvents(room, stateEvents); + + const inviter = room.currentState.getStateEvents(EventType.RoomMember, client.getUserId()!)?.getSender(); + + const crypto = client.crypto; + if (crypto) { + const parkedHistory = await crypto.cryptoStore.takeParkedSharedHistory(room.roomId); + for (const parked of parkedHistory) { + if (parked.senderId === inviter) { + await crypto.olmDevice.addInboundGroupSession( + room.roomId, + parked.senderKey, + parked.forwardingCurve25519KeyChain, + parked.sessionId, + parked.sessionKey, + parked.keysClaimed, + true, + { sharedHistory: true, untrusted: true }, + ); + } + } + } + + if (inviteObj.isBrandNewRoom) { + room.recalculate(); + client.store.storeRoom(room); + client.emit(ClientEvent.Room, room); + } else { + // Update room state for invite->reject->invite cycles + room.recalculate(); + } + stateEvents.forEach(function (e) { + client.emit(ClientEvent.Event, e); + }); + }); + + // Handle joins + await utils.promiseMapSeries(joinRooms, async (joinObj) => { + const room = joinObj.room; + const stateEvents = this.mapSyncEventsFormat(joinObj.state, room); + // Prevent events from being decrypted ahead of time + // this helps large account to speed up faster + // room::decryptCriticalEvent is in charge of decrypting all the events + // required for a client to function properly + const events = this.mapSyncEventsFormat(joinObj.timeline, room, false); + const ephemeralEvents = this.mapSyncEventsFormat(joinObj.ephemeral); + const accountDataEvents = this.mapSyncEventsFormat(joinObj.account_data); + + const encrypted = client.isRoomEncrypted(room.roomId); + // We store the server-provided value first so it's correct when any of the events fire. + if (joinObj.unread_notifications) { + /** + * We track unread notifications ourselves in encrypted rooms, so don't + * bother setting it here. We trust our calculations better than the + * server's for this case, and therefore will assume that our non-zero + * count is accurate. + * + * @see import("./client").fixNotificationCountOnDecryption + */ + if (!encrypted || joinObj.unread_notifications.notification_count === 0) { + // In an encrypted room, if the room has notifications enabled then it's typical for + // the server to flag all new messages as notifying. However, some push rules calculate + // events as ignored based on their event contents (e.g. ignoring msgtype=m.notice messages) + // so we want to calculate this figure on the client in all cases. + room.setUnreadNotificationCount( + NotificationCountType.Total, + joinObj.unread_notifications.notification_count ?? 0, + ); + } + + if (!encrypted || room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0) { + // If the locally stored highlight count is zero, use the server provided value. + room.setUnreadNotificationCount( + NotificationCountType.Highlight, + joinObj.unread_notifications.highlight_count ?? 0, + ); + } + } + + const unreadThreadNotifications = + joinObj[UNREAD_THREAD_NOTIFICATIONS.name] ?? joinObj[UNREAD_THREAD_NOTIFICATIONS.altName!]; + if (unreadThreadNotifications) { + // Only partially reset unread notification + // We want to keep the client-generated count. Particularly important + // for encrypted room that refresh their notification count on event + // decryption + room.resetThreadUnreadNotificationCount(Object.keys(unreadThreadNotifications)); + for (const [threadId, unreadNotification] of Object.entries(unreadThreadNotifications)) { + if (!encrypted || unreadNotification.notification_count === 0) { + room.setThreadUnreadNotificationCount( + threadId, + NotificationCountType.Total, + unreadNotification.notification_count ?? 0, + ); + } + + const hasNoNotifications = + room.getThreadUnreadNotificationCount(threadId, NotificationCountType.Highlight) <= 0; + if (!encrypted || (encrypted && hasNoNotifications)) { + room.setThreadUnreadNotificationCount( + threadId, + NotificationCountType.Highlight, + unreadNotification.highlight_count ?? 0, + ); + } + } + } else { + room.resetThreadUnreadNotificationCount(); + } + + joinObj.timeline = joinObj.timeline || ({} as ITimeline); + + if (joinObj.isBrandNewRoom) { + // set the back-pagination token. Do this *before* adding any + // events so that clients can start back-paginating. + if (joinObj.timeline.prev_batch !== null) { + room.getLiveTimeline().setPaginationToken(joinObj.timeline.prev_batch, EventTimeline.BACKWARDS); + } + } else if (joinObj.timeline.limited) { + let limited = true; + + // we've got a limited sync, so we *probably* have a gap in the + // timeline, so should reset. But we might have been peeking or + // paginating and already have some of the events, in which + // case we just want to append any subsequent events to the end + // of the existing timeline. + // + // This is particularly important in the case that we already have + // *all* of the events in the timeline - in that case, if we reset + // the timeline, we'll end up with an entirely empty timeline, + // which we'll try to paginate but not get any new events (which + // will stop us linking the empty timeline into the chain). + // + for (let i = events.length - 1; i >= 0; i--) { + const eventId = events[i].getId()!; + if (room.getTimelineForEvent(eventId)) { + debuglog(`Already have event ${eventId} in limited sync - not resetting`); + limited = false; + + // we might still be missing some of the events before i; + // we don't want to be adding them to the end of the + // timeline because that would put them out of order. + events.splice(0, i); + + // XXX: there's a problem here if the skipped part of the + // timeline modifies the state set in stateEvents, because + // we'll end up using the state from stateEvents rather + // than the later state from timelineEvents. We probably + // need to wind stateEvents forward over the events we're + // skipping. + + break; + } + } + + if (limited) { + room.resetLiveTimeline( + joinObj.timeline.prev_batch, + this.syncOpts.canResetEntireTimeline!(room.roomId) ? null : syncEventData.oldSyncToken ?? null, + ); + + // We have to assume any gap in any timeline is + // reason to stop incrementally tracking notifications and + // reset the timeline. + client.resetNotifTimelineSet(); + } + } + + // process any crypto events *before* emitting the RoomStateEvent events. This + // avoids a race condition if the application tries to send a message after the + // state event is processed, but before crypto is enabled, which then causes the + // crypto layer to complain. + if (this.syncOpts.cryptoCallbacks) { + for (const e of stateEvents.concat(events)) { + if (e.isState() && e.getType() === EventType.RoomEncryption && e.getStateKey() === "") { + await this.syncOpts.cryptoCallbacks.onCryptoEvent(room, e); + } + } + } + + try { + await this.injectRoomEvents(room, stateEvents, events, syncEventData.fromCache); + } catch (e) { + logger.error(`Failed to process events on room ${room.roomId}:`, e); + } + + // set summary after processing events, + // because it will trigger a name calculation + // which needs the room state to be up to date + if (joinObj.summary) { + room.setSummary(joinObj.summary); + } + + // we deliberately don't add ephemeral events to the timeline + room.addEphemeralEvents(ephemeralEvents); + + // we deliberately don't add accountData to the timeline + room.addAccountData(accountDataEvents); + + room.recalculate(); + if (joinObj.isBrandNewRoom) { + client.store.storeRoom(room); + client.emit(ClientEvent.Room, room); + } + + this.processEventsForNotifs(room, events); + + const emitEvent = (e: MatrixEvent): boolean => client.emit(ClientEvent.Event, e); + stateEvents.forEach(emitEvent); + events.forEach(emitEvent); + ephemeralEvents.forEach(emitEvent); + accountDataEvents.forEach(emitEvent); + + // Decrypt only the last message in all rooms to make sure we can generate a preview + // And decrypt all events after the recorded read receipt to ensure an accurate + // notification count + room.decryptCriticalEvents(); + }); + + // Handle leaves (e.g. kicked rooms) + await utils.promiseMapSeries(leaveRooms, async (leaveObj) => { + const room = leaveObj.room; + const stateEvents = this.mapSyncEventsFormat(leaveObj.state, room); + const events = this.mapSyncEventsFormat(leaveObj.timeline, room); + const accountDataEvents = this.mapSyncEventsFormat(leaveObj.account_data); + + await this.injectRoomEvents(room, stateEvents, events); + room.addAccountData(accountDataEvents); + + room.recalculate(); + if (leaveObj.isBrandNewRoom) { + client.store.storeRoom(room); + client.emit(ClientEvent.Room, room); + } + + this.processEventsForNotifs(room, events); + + stateEvents.forEach(function (e) { + client.emit(ClientEvent.Event, e); + }); + events.forEach(function (e) { + client.emit(ClientEvent.Event, e); + }); + accountDataEvents.forEach(function (e) { + client.emit(ClientEvent.Event, e); + }); + }); + + // update the notification timeline, if appropriate. + // we only do this for live events, as otherwise we can't order them sanely + // in the timeline relative to ones paginated in by /notifications. + // XXX: we could fix this by making EventTimeline support chronological + // ordering... but it doesn't, right now. + if (syncEventData.oldSyncToken && this.notifEvents.length) { + this.notifEvents.sort(function (a, b) { + return a.getTs() - b.getTs(); + }); + this.notifEvents.forEach(function (event) { + client.getNotifTimelineSet()?.addLiveEvent(event); + }); + } + + // Handle device list updates + if (data.device_lists) { + if (this.syncOpts.crypto) { + await this.syncOpts.crypto.handleDeviceListChanges(syncEventData, data.device_lists); + } else { + // FIXME if we *don't* have a crypto module, we still need to + // invalidate the device lists. But that would require a + // substantial bit of rework :/. + } + } + + // Handle one_time_keys_count + if (data.device_one_time_keys_count) { + const map = new Map<string, number>(Object.entries(data.device_one_time_keys_count)); + this.syncOpts.cryptoCallbacks?.preprocessOneTimeKeyCounts(map); + } + if (data.device_unused_fallback_key_types || data["org.matrix.msc2732.device_unused_fallback_key_types"]) { + // The presence of device_unused_fallback_key_types indicates that the + // server supports fallback keys. If there's no unused + // signed_curve25519 fallback key we need a new one. + const unusedFallbackKeys = + data.device_unused_fallback_key_types || data["org.matrix.msc2732.device_unused_fallback_key_types"]; + this.syncOpts.cryptoCallbacks?.preprocessUnusedFallbackKeys(new Set<string>(unusedFallbackKeys || null)); + } + } + + /** + * Starts polling the connectivity check endpoint + * @param delay - How long to delay until the first poll. + * defaults to a short, randomised interval (to prevent + * tight-looping if /versions succeeds but /sync etc. fail). + * @returns which resolves once the connection returns + */ + private startKeepAlives(delay?: number): Promise<boolean> { + if (delay === undefined) { + delay = 2000 + Math.floor(Math.random() * 5000); + } + + if (this.keepAliveTimer !== null) { + clearTimeout(this.keepAliveTimer); + } + if (delay > 0) { + this.keepAliveTimer = setTimeout(this.pokeKeepAlive.bind(this), delay); + } else { + this.pokeKeepAlive(); + } + if (!this.connectionReturnedDefer) { + this.connectionReturnedDefer = utils.defer(); + } + return this.connectionReturnedDefer.promise; + } + + /** + * Make a dummy call to /_matrix/client/versions, to see if the HS is + * reachable. + * + * On failure, schedules a call back to itself. On success, resolves + * this.connectionReturnedDefer. + * + * @param connDidFail - True if a connectivity failure has been detected. Optional. + */ + private pokeKeepAlive(connDidFail = false): void { + const success = (): void => { + clearTimeout(this.keepAliveTimer); + if (this.connectionReturnedDefer) { + this.connectionReturnedDefer.resolve(connDidFail); + this.connectionReturnedDefer = undefined; + } + }; + + this.client.http + .request( + Method.Get, + "/_matrix/client/versions", + undefined, // queryParams + undefined, // data + { + prefix: "", + localTimeoutMs: 15 * 1000, + abortSignal: this.abortController?.signal, + }, + ) + .then( + () => { + success(); + }, + (err) => { + if (err.httpStatus == 400 || err.httpStatus == 404) { + // treat this as a success because the server probably just doesn't + // support /versions: point is, we're getting a response. + // We wait a short time though, just in case somehow the server + // is in a mode where it 400s /versions responses and sync etc. + // responses fail, this will mean we don't hammer in a loop. + this.keepAliveTimer = setTimeout(success, 2000); + } else { + connDidFail = true; + this.keepAliveTimer = setTimeout( + this.pokeKeepAlive.bind(this, connDidFail), + 5000 + Math.floor(Math.random() * 5000), + ); + // A keepalive has failed, so we emit the + // error state (whether or not this is the + // first failure). + // Note we do this after setting the timer: + // this lets the unit tests advance the mock + // clock when they get the error. + this.updateSyncState(SyncState.Error, { error: err }); + } + }, + ); + } + + private mapSyncResponseToRoomArray<T extends ILeftRoom | IJoinedRoom | IInvitedRoom>( + obj: Record<string, T>, + ): Array<WrappedRoom<T>> { + // Maps { roomid: {stuff}, roomid: {stuff} } + // to + // [{stuff+Room+isBrandNewRoom}, {stuff+Room+isBrandNewRoom}] + const client = this.client; + return Object.keys(obj) + .filter((k) => !unsafeProp(k)) + .map((roomId) => { + const arrObj = obj[roomId] as T & { room: Room; isBrandNewRoom: boolean }; + let room = client.store.getRoom(roomId); + let isBrandNewRoom = false; + if (!room) { + room = this.createRoom(roomId); + isBrandNewRoom = true; + } + arrObj.room = room; + arrObj.isBrandNewRoom = isBrandNewRoom; + return arrObj; + }); + } + + private mapSyncEventsFormat( + obj: IInviteState | ITimeline | IEphemeral, + room?: Room, + decrypt = true, + ): MatrixEvent[] { + if (!obj || !Array.isArray(obj.events)) { + return []; + } + const mapper = this.client.getEventMapper({ decrypt }); + type TaggedEvent = (IStrippedState | IRoomEvent | IStateEvent | IMinimalEvent) & { room_id?: string }; + return (obj.events as TaggedEvent[]).filter(noUnsafeEventProps).map(function (e) { + if (room) { + e.room_id = room.roomId; + } + return mapper(e); + }); + } + + /** + */ + private resolveInvites(room: Room): void { + if (!room || !this.opts.resolveInvitesToProfiles) { + return; + } + const client = this.client; + // For each invited room member we want to give them a displayname/avatar url + // if they have one (the m.room.member invites don't contain this). + room.getMembersWithMembership("invite").forEach(function (member) { + if (member.requestedProfileInfo) return; + member.requestedProfileInfo = true; + // try to get a cached copy first. + const user = client.getUser(member.userId); + let promise; + if (user) { + promise = Promise.resolve({ + avatar_url: user.avatarUrl, + displayname: user.displayName, + }); + } else { + promise = client.getProfileInfo(member.userId); + } + promise.then( + function (info) { + // slightly naughty by doctoring the invite event but this means all + // the code paths remain the same between invite/join display name stuff + // which is a worthy trade-off for some minor pollution. + const inviteEvent = member.events.member; + if (inviteEvent?.getContent().membership !== "invite") { + // between resolving and now they have since joined, so don't clobber + return; + } + inviteEvent.getContent().avatar_url = info.avatar_url; + inviteEvent.getContent().displayname = info.displayname; + // fire listeners + member.setMembershipEvent(inviteEvent, room.currentState); + }, + function (err) { + // OH WELL. + }, + ); + }); + } + + /** + * Injects events into a room's model. + * @param stateEventList - A list of state events. This is the state + * at the *START* of the timeline list if it is supplied. + * @param timelineEventList - A list of timeline events, including threaded. Lower index + * is earlier in time. Higher index is later. + * @param fromCache - whether the sync response came from cache + */ + public async injectRoomEvents( + room: Room, + stateEventList: MatrixEvent[], + timelineEventList?: MatrixEvent[], + fromCache = false, + ): Promise<void> { + // If there are no events in the timeline yet, initialise it with + // the given state events + const liveTimeline = room.getLiveTimeline(); + const timelineWasEmpty = liveTimeline.getEvents().length == 0; + if (timelineWasEmpty) { + // Passing these events into initialiseState will freeze them, so we need + // to compute and cache the push actions for them now, otherwise sync dies + // with an attempt to assign to read only property. + // XXX: This is pretty horrible and is assuming all sorts of behaviour from + // these functions that it shouldn't be. We should probably either store the + // push actions cache elsewhere so we can freeze MatrixEvents, or otherwise + // find some solution where MatrixEvents are immutable but allow for a cache + // field. + for (const ev of stateEventList) { + this.client.getPushActionsForEvent(ev); + } + liveTimeline.initialiseState(stateEventList, { + timelineWasEmpty, + }); + } + + this.resolveInvites(room); + + // recalculate the room name at this point as adding events to the timeline + // may make notifications appear which should have the right name. + // XXX: This looks suspect: we'll end up recalculating the room once here + // and then again after adding events (processSyncResponse calls it after + // calling us) even if no state events were added. It also means that if + // one of the room events in timelineEventList is something that needs + // a recalculation (like m.room.name) we won't recalculate until we've + // finished adding all the events, which will cause the notification to have + // the old room name rather than the new one. + room.recalculate(); + + // If the timeline wasn't empty, we process the state events here: they're + // defined as updates to the state before the start of the timeline, so this + // starts to roll the state forward. + // XXX: That's what we *should* do, but this can happen if we were previously + // peeking in a room, in which case we obviously do *not* want to add the + // state events here onto the end of the timeline. Historically, the js-sdk + // has just set these new state events on the old and new state. This seems + // very wrong because there could be events in the timeline that diverge the + // state, in which case this is going to leave things out of sync. However, + // for now I think it;s best to behave the same as the code has done previously. + if (!timelineWasEmpty) { + // XXX: As above, don't do this... + //room.addLiveEvents(stateEventList || []); + // Do this instead... + room.oldState.setStateEvents(stateEventList || []); + room.currentState.setStateEvents(stateEventList || []); + } + + // Execute the timeline events. This will continue to diverge the current state + // if the timeline has any state events in it. + // This also needs to be done before running push rules on the events as they need + // to be decorated with sender etc. + room.addLiveEvents(timelineEventList || [], { + fromCache, + timelineWasEmpty, + }); + this.client.processBeaconEvents(room, timelineEventList); + } + + /** + * Takes a list of timelineEvents and adds and adds to notifEvents + * as appropriate. + * This must be called after the room the events belong to has been stored. + * + * @param timelineEventList - A list of timeline events. Lower index + * is earlier in time. Higher index is later. + */ + private processEventsForNotifs(room: Room, timelineEventList: MatrixEvent[]): void { + // gather our notifications into this.notifEvents + if (this.client.getNotifTimelineSet()) { + for (const event of timelineEventList) { + const pushActions = this.client.getPushActionsForEvent(event); + if (pushActions?.notify && pushActions.tweaks?.highlight) { + this.notifEvents.push(event); + } + } + } + } + + private getGuestFilter(): string { + // Dev note: This used to be conditional to return a filter of 20 events maximum, but + // the condition never went to the other branch. This is now hardcoded. + return "{}"; + } + + /** + * Sets the sync state and emits an event to say so + * @param newState - The new state string + * @param data - Object of additional data to emit in the event + */ + private updateSyncState(newState: SyncState, data?: ISyncStateData): void { + const old = this.syncState; + this.syncState = newState; + this.syncStateData = data; + this.client.emit(ClientEvent.Sync, this.syncState, old, data); + } + + /** + * Event handler for the 'online' event + * This event is generally unreliable and precise behaviour + * varies between browsers, so we poll for connectivity too, + * but this might help us reconnect a little faster. + */ + private onOnline = (): void => { + debuglog("Browser thinks we are back online"); + this.startKeepAlives(0); + }; +} + +function createNewUser(client: MatrixClient, userId: string): User { + const user = new User(userId); + client.reEmitter.reEmit(user, [ + UserEvent.AvatarUrl, + UserEvent.DisplayName, + UserEvent.Presence, + UserEvent.CurrentlyActive, + UserEvent.LastPresenceTs, + ]); + return user; +} + +// /!\ This function is not intended for public use! It's only exported from +// here in order to share some common logic with sliding-sync-sdk.ts. +export function _createAndReEmitRoom(client: MatrixClient, roomId: string, opts: Partial<IStoredClientOpts>): Room { + const { timelineSupport } = client; + + const room = new Room(roomId, client, client.getUserId()!, { + lazyLoadMembers: opts.lazyLoadMembers, + pendingEventOrdering: opts.pendingEventOrdering, + timelineSupport, + }); + + client.reEmitter.reEmit(room, [ + RoomEvent.Name, + RoomEvent.Redaction, + RoomEvent.RedactionCancelled, + RoomEvent.Receipt, + RoomEvent.Tags, + RoomEvent.LocalEchoUpdated, + RoomEvent.AccountData, + RoomEvent.MyMembership, + RoomEvent.Timeline, + RoomEvent.TimelineReset, + RoomStateEvent.Events, + RoomStateEvent.Members, + RoomStateEvent.NewMember, + RoomStateEvent.Update, + BeaconEvent.New, + BeaconEvent.Update, + BeaconEvent.Destroy, + BeaconEvent.LivenessChange, + ]); + + // We need to add a listener for RoomState.members in order to hook them + // correctly. + room.on(RoomStateEvent.NewMember, (event, state, member) => { + member.user = client.getUser(member.userId) ?? undefined; + client.reEmitter.reEmit(member, [ + RoomMemberEvent.Name, + RoomMemberEvent.Typing, + RoomMemberEvent.PowerLevel, + RoomMemberEvent.Membership, + ]); + }); + + return room; +} |