Improve startup performance by using promise.all when processing rooms from sync (#5095)

* Use Promise.all when processing rooms in sync response

* Remove import
This commit is contained in:
R Midhun Suresh
2026-01-27 14:26:26 +05:30
committed by GitHub
parent fb12a5a1d6
commit 82b51d0d46
+285 -271
View File
@@ -26,7 +26,7 @@ limitations under the License.
import type { SyncCryptoCallbacks } from "./common-crypto/CryptoBackend.ts";
import { User } from "./models/user.ts";
import { NotificationCountType, Room, RoomEvent } from "./models/room.ts";
import { deepCopy, noUnsafeEventProps, promiseMapSeries, unsafeProp } from "./utils.ts";
import { deepCopy, noUnsafeEventProps, unsafeProp } from "./utils.ts";
import { Filter } from "./filter.ts";
import { EventTimeline } from "./models/event-timeline.ts";
import { type Logger } from "./logger.ts";
@@ -1188,312 +1188,326 @@ export class SyncApi {
this.notifEvents = [];
// Handle invites
await promiseMapSeries(inviteRooms, async (inviteObj) => {
const room = inviteObj.room;
const stateEvents = this.mapSyncEventsFormat(inviteObj.invite_state, room);
await Promise.all(
inviteRooms.map(async (inviteObj) => {
const room = inviteObj.room;
const stateEvents = this.mapSyncEventsFormat(inviteObj.invite_state, room);
await this.injectRoomEvents(room, stateEvents, undefined);
await this.injectRoomEvents(room, stateEvents, undefined);
if (inviteObj.isBrandNewRoom) {
room.recalculate();
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
} else {
// Update room state for invite->reject->invite cycles
room.recalculate();
}
stateEvents.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
});
if (inviteObj.isBrandNewRoom) {
room.recalculate();
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
} else {
// Update room state for invite->reject->invite cycles
room.recalculate();
}
stateEvents.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
}),
);
// Handle joins
await promiseMapSeries(joinRooms, async (joinObj) => {
const room = joinObj.room;
const stateEvents = this.mapSyncEventsFormat(joinObj.state, room);
const stateAfterEvents = this.mapSyncEventsFormat(joinObj["org.matrix.msc4222.state_after"], room);
// Prevent events from being decrypted ahead of time
// this helps large account to speed up faster
// room::decryptCriticalEvent is in charge of decrypting all the events
// required for a client to function properly
const timelineEvents = this.mapSyncEventsFormat(joinObj.timeline, room, false);
const ephemeralEvents = this.mapSyncEventsFormat(joinObj.ephemeral);
const accountDataEvents = this.mapSyncEventsFormat(joinObj.account_data);
const stickyEvents = this.mapSyncEventsFormat(joinObj.msc4354_sticky);
await Promise.all(
joinRooms.map(async (joinObj) => {
const room = joinObj.room;
const stateEvents = this.mapSyncEventsFormat(joinObj.state, room);
const stateAfterEvents = this.mapSyncEventsFormat(joinObj["org.matrix.msc4222.state_after"], room);
// Prevent events from being decrypted ahead of time
// this helps large account to speed up faster
// room::decryptCriticalEvent is in charge of decrypting all the events
// required for a client to function properly
const timelineEvents = this.mapSyncEventsFormat(joinObj.timeline, room, false);
const ephemeralEvents = this.mapSyncEventsFormat(joinObj.ephemeral);
const accountDataEvents = this.mapSyncEventsFormat(joinObj.account_data);
const stickyEvents = this.mapSyncEventsFormat(joinObj.msc4354_sticky);
// If state_after is present, this is the events that form the state at the end of the timeline block and
// regular timeline events do *not* count towards state. If it's not present, then the state is formed by
// the state events plus the timeline events. Note mapSyncEventsFormat returns an empty array if the field
// is absent so we explicitly check the field on the original object.
const eventsFormingFinalState = joinObj["org.matrix.msc4222.state_after"]
? stateAfterEvents
: stateEvents.concat(timelineEvents);
// If state_after is present, this is the events that form the state at the end of the timeline block and
// regular timeline events do *not* count towards state. If it's not present, then the state is formed by
// the state events plus the timeline events. Note mapSyncEventsFormat returns an empty array if the field
// is absent so we explicitly check the field on the original object.
const eventsFormingFinalState = joinObj["org.matrix.msc4222.state_after"]
? stateAfterEvents
: stateEvents.concat(timelineEvents);
const encrypted = this.isRoomEncrypted(room, eventsFormingFinalState);
// We store the server-provided value first so it's correct when any of the events fire.
if (joinObj.unread_notifications) {
/**
* We track unread notifications ourselves in encrypted rooms, so don't
* bother setting it here. We trust our calculations better than the
* server's for this case, and therefore will assume that our non-zero
* count is accurate.
* XXX: this is known faulty as the push rule for `.m.room.encrypted` may be disabled so server
* may issue notification counts of 0 which we wrongly trust.
* https://github.com/matrix-org/matrix-spec-proposals/pull/2654 would fix this
*
* @see import("./client").fixNotificationCountOnDecryption
*/
if (!encrypted || joinObj.unread_notifications.notification_count === 0) {
// In an encrypted room, if the room has notifications enabled then it's typical for
// the server to flag all new messages as notifying. However, some push rules calculate
// events as ignored based on their event contents (e.g. ignoring msgtype=m.notice messages)
// so we want to calculate this figure on the client in all cases.
room.setUnreadNotificationCount(
NotificationCountType.Total,
joinObj.unread_notifications.notification_count ?? 0,
);
}
if (!encrypted || room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0) {
// If the locally stored highlight count is zero, use the server provided value.
room.setUnreadNotificationCount(
NotificationCountType.Highlight,
joinObj.unread_notifications.highlight_count ?? 0,
);
}
}
const unreadThreadNotifications =
joinObj[UNREAD_THREAD_NOTIFICATIONS.name] ?? joinObj[UNREAD_THREAD_NOTIFICATIONS.altName!];
if (unreadThreadNotifications) {
// This mirrors the logic above for rooms: take the *total* notification count from
// the server for unencrypted rooms or is it's zero. Any threads not present in this
// object implicitly have zero notifications, so start by clearing the total counts
// for all such threads.
room.resetThreadUnreadNotificationCountFromSync(Object.keys(unreadThreadNotifications));
for (const [threadId, unreadNotification] of Object.entries(unreadThreadNotifications)) {
if (!encrypted || unreadNotification.notification_count === 0) {
room.setThreadUnreadNotificationCount(
threadId,
const encrypted = this.isRoomEncrypted(room, eventsFormingFinalState);
// We store the server-provided value first so it's correct when any of the events fire.
if (joinObj.unread_notifications) {
/**
* We track unread notifications ourselves in encrypted rooms, so don't
* bother setting it here. We trust our calculations better than the
* server's for this case, and therefore will assume that our non-zero
* count is accurate.
* XXX: this is known faulty as the push rule for `.m.room.encrypted` may be disabled so server
* may issue notification counts of 0 which we wrongly trust.
* https://github.com/matrix-org/matrix-spec-proposals/pull/2654 would fix this
*
* @see import("./client").fixNotificationCountOnDecryption
*/
if (!encrypted || joinObj.unread_notifications.notification_count === 0) {
// In an encrypted room, if the room has notifications enabled then it's typical for
// the server to flag all new messages as notifying. However, some push rules calculate
// events as ignored based on their event contents (e.g. ignoring msgtype=m.notice messages)
// so we want to calculate this figure on the client in all cases.
room.setUnreadNotificationCount(
NotificationCountType.Total,
unreadNotification.notification_count ?? 0,
joinObj.unread_notifications.notification_count ?? 0,
);
}
const hasNoNotifications =
room.getThreadUnreadNotificationCount(threadId, NotificationCountType.Highlight) <= 0;
if (!encrypted || (encrypted && hasNoNotifications)) {
room.setThreadUnreadNotificationCount(
threadId,
if (!encrypted || room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0) {
// If the locally stored highlight count is zero, use the server provided value.
room.setUnreadNotificationCount(
NotificationCountType.Highlight,
unreadNotification.highlight_count ?? 0,
joinObj.unread_notifications.highlight_count ?? 0,
);
}
}
} else {
room.resetThreadUnreadNotificationCountFromSync();
}
joinObj.timeline = joinObj.timeline || ({} as ITimeline);
const unreadThreadNotifications =
joinObj[UNREAD_THREAD_NOTIFICATIONS.name] ?? joinObj[UNREAD_THREAD_NOTIFICATIONS.altName!];
if (unreadThreadNotifications) {
// This mirrors the logic above for rooms: take the *total* notification count from
// the server for unencrypted rooms or is it's zero. Any threads not present in this
// object implicitly have zero notifications, so start by clearing the total counts
// for all such threads.
room.resetThreadUnreadNotificationCountFromSync(Object.keys(unreadThreadNotifications));
for (const [threadId, unreadNotification] of Object.entries(unreadThreadNotifications)) {
if (!encrypted || unreadNotification.notification_count === 0) {
room.setThreadUnreadNotificationCount(
threadId,
NotificationCountType.Total,
unreadNotification.notification_count ?? 0,
);
}
if (joinObj.isBrandNewRoom) {
// set the back-pagination token. Do this *before* adding any
// events so that clients can start back-paginating.
if (joinObj.timeline.prev_batch !== null) {
room.getLiveTimeline().setPaginationToken(joinObj.timeline.prev_batch, EventTimeline.BACKWARDS);
}
} else if (joinObj.timeline.limited) {
let limited = true;
// we've got a limited sync, so we *probably* have a gap in the
// timeline, so should reset. But we might have been peeking or
// paginating and already have some of the events, in which
// case we just want to append any subsequent events to the end
// of the existing timeline.
//
// This is particularly important in the case that we already have
// *all* of the events in the timeline - in that case, if we reset
// the timeline, we'll end up with an entirely empty timeline,
// which we'll try to paginate but not get any new events (which
// will stop us linking the empty timeline into the chain).
//
for (let i = timelineEvents.length - 1; i >= 0; i--) {
const eventId = timelineEvents[i].getId()!;
if (room.getTimelineForEvent(eventId)) {
this.syncOpts.logger.debug(`Already have event ${eventId} in limited sync - not resetting`);
limited = false;
// we might still be missing some of the events before i;
// we don't want to be adding them to the end of the
// timeline because that would put them out of order.
timelineEvents.splice(0, i);
// XXX: there's a problem here if the skipped part of the
// timeline modifies the state set in stateEvents, because
// we'll end up using the state from stateEvents rather
// than the later state from timelineEvents. We probably
// need to wind stateEvents forward over the events we're
// skipping.
break;
const hasNoNotifications =
room.getThreadUnreadNotificationCount(threadId, NotificationCountType.Highlight) <= 0;
if (!encrypted || (encrypted && hasNoNotifications)) {
room.setThreadUnreadNotificationCount(
threadId,
NotificationCountType.Highlight,
unreadNotification.highlight_count ?? 0,
);
}
}
}
if (limited) {
room.resetLiveTimeline(
joinObj.timeline.prev_batch,
this.syncOpts.canResetEntireTimeline!(room.roomId)
? null
: (syncEventData.oldSyncToken ?? null),
);
// We have to assume any gap in any timeline is
// reason to stop incrementally tracking notifications and
// reset the timeline.
client.resetNotifTimelineSet();
}
}
// process any crypto events *before* emitting the RoomStateEvent events. This
// avoids a race condition if the application tries to send a message after the
// state event is processed, but before crypto is enabled, which then causes the
// crypto layer to complain.
if (this.syncOpts.cryptoCallbacks) {
for (const e of eventsFormingFinalState) {
if (e.isState() && e.getType() === EventType.RoomEncryption && e.getStateKey() === "") {
await this.syncOpts.cryptoCallbacks.onCryptoEvent(room, e);
}
}
}
// Proactively decrypt state events: normally we decrypt on demand, but for state
// events we need them immediately, so we handle them here. Specifically, consumers
// (e.g. Element Web) expect state events to be unencrypted upon receipt.
for (const ev of timelineEvents.filter((ev) => ev.isState())) {
await this.client.decryptEventIfNeeded(ev);
}
try {
if ("org.matrix.msc4222.state_after" in joinObj) {
await this.injectRoomEvents(
room,
undefined,
stateAfterEvents,
timelineEvents,
syncEventData.fromCache,
);
} else {
await this.injectRoomEvents(room, stateEvents, undefined, timelineEvents, syncEventData.fromCache);
room.resetThreadUnreadNotificationCountFromSync();
}
} catch (e) {
this.syncOpts.logger.error(`Failed to process events on room ${room.roomId}:`, e);
}
// set summary after processing events,
// because it will trigger a name calculation
// which needs the room state to be up to date
if (joinObj.summary) {
room.setSummary(joinObj.summary);
}
joinObj.timeline = joinObj.timeline || ({} as ITimeline);
// we deliberately don't add ephemeral events to the timeline
room.addEphemeralEvents(ephemeralEvents);
if (joinObj.isBrandNewRoom) {
// set the back-pagination token. Do this *before* adding any
// events so that clients can start back-paginating.
if (joinObj.timeline.prev_batch !== null) {
room.getLiveTimeline().setPaginationToken(joinObj.timeline.prev_batch, EventTimeline.BACKWARDS);
}
} else if (joinObj.timeline.limited) {
let limited = true;
// we deliberately don't add accountData to the timeline
room.addAccountData(accountDataEvents);
// we've got a limited sync, so we *probably* have a gap in the
// timeline, so should reset. But we might have been peeking or
// paginating and already have some of the events, in which
// case we just want to append any subsequent events to the end
// of the existing timeline.
//
// This is particularly important in the case that we already have
// *all* of the events in the timeline - in that case, if we reset
// the timeline, we'll end up with an entirely empty timeline,
// which we'll try to paginate but not get any new events (which
// will stop us linking the empty timeline into the chain).
//
for (let i = timelineEvents.length - 1; i >= 0; i--) {
const eventId = timelineEvents[i].getId()!;
if (room.getTimelineForEvent(eventId)) {
this.syncOpts.logger.debug(`Already have event ${eventId} in limited sync - not resetting`);
limited = false;
// Sticky events primarily come via the `timeline` field, with the
// sticky info field marking them as sticky.
// If the sync is "gappy" (meaning it is skipping events to catch up) then
// sticky events will instead come down the sticky section.
// This ensures we collect sticky events from both places.
const stickyEventsAndStickyEventsFromTheTimeline = stickyEvents.concat(
timelineEvents.filter((e) => e.unstableStickyInfo !== undefined),
);
// Note: We calculate sticky events before emitting `.Room` as it's nice to have
// sticky events calculated and ready to go.
room._unstable_addStickyEvents(stickyEventsAndStickyEventsFromTheTimeline);
// we might still be missing some of the events before i;
// we don't want to be adding them to the end of the
// timeline because that would put them out of order.
timelineEvents.splice(0, i);
room.recalculate();
if (joinObj.isBrandNewRoom) {
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
}
// XXX: there's a problem here if the skipped part of the
// timeline modifies the state set in stateEvents, because
// we'll end up using the state from stateEvents rather
// than the later state from timelineEvents. We probably
// need to wind stateEvents forward over the events we're
// skipping.
this.processEventsForNotifs(room, timelineEvents);
break;
}
}
const emitEvent = (e: MatrixEvent): boolean => client.emit(ClientEvent.Event, e);
// this fires a couple of times for some events. (eg state events are in the timeline and the state)
// should this get a sync section as an additional event emission param (e, syncSection))?
stateEvents.forEach(emitEvent);
timelineEvents.forEach(emitEvent);
ephemeralEvents.forEach(emitEvent);
accountDataEvents.forEach(emitEvent);
stickyEvents
.filter(
(stickyEvent) =>
// This is highly unlikey, but in the case where a sticky event
// has appeared in the timeline AND the sticky section, we only
// want to emit the event once.
!timelineEvents.some((timelineEvent) => timelineEvent.getId() === stickyEvent.getId()),
)
.forEach(emitEvent);
// Decrypt only the last message in all rooms to make sure we can generate a preview
// And decrypt all events after the recorded read receipt to ensure an accurate
// notification count
room.decryptCriticalEvents();
});
if (limited) {
room.resetLiveTimeline(
joinObj.timeline.prev_batch,
this.syncOpts.canResetEntireTimeline!(room.roomId)
? null
: (syncEventData.oldSyncToken ?? null),
);
// We have to assume any gap in any timeline is
// reason to stop incrementally tracking notifications and
// reset the timeline.
client.resetNotifTimelineSet();
}
}
// process any crypto events *before* emitting the RoomStateEvent events. This
// avoids a race condition if the application tries to send a message after the
// state event is processed, but before crypto is enabled, which then causes the
// crypto layer to complain.
if (this.syncOpts.cryptoCallbacks) {
for (const e of eventsFormingFinalState) {
if (e.isState() && e.getType() === EventType.RoomEncryption && e.getStateKey() === "") {
await this.syncOpts.cryptoCallbacks.onCryptoEvent(room, e);
}
}
}
// Proactively decrypt state events: normally we decrypt on demand, but for state
// events we need them immediately, so we handle them here. Specifically, consumers
// (e.g. Element Web) expect state events to be unencrypted upon receipt.
for (const ev of timelineEvents.filter((ev) => ev.isState())) {
await this.client.decryptEventIfNeeded(ev);
}
try {
if ("org.matrix.msc4222.state_after" in joinObj) {
await this.injectRoomEvents(
room,
undefined,
stateAfterEvents,
timelineEvents,
syncEventData.fromCache,
);
} else {
await this.injectRoomEvents(
room,
stateEvents,
undefined,
timelineEvents,
syncEventData.fromCache,
);
}
} catch (e) {
this.syncOpts.logger.error(`Failed to process events on room ${room.roomId}:`, e);
}
// set summary after processing events,
// because it will trigger a name calculation
// which needs the room state to be up to date
if (joinObj.summary) {
room.setSummary(joinObj.summary);
}
// we deliberately don't add ephemeral events to the timeline
room.addEphemeralEvents(ephemeralEvents);
// we deliberately don't add accountData to the timeline
room.addAccountData(accountDataEvents);
// Sticky events primarily come via the `timeline` field, with the
// sticky info field marking them as sticky.
// If the sync is "gappy" (meaning it is skipping events to catch up) then
// sticky events will instead come down the sticky section.
// This ensures we collect sticky events from both places.
const stickyEventsAndStickyEventsFromTheTimeline = stickyEvents.concat(
timelineEvents.filter((e) => e.unstableStickyInfo !== undefined),
);
// Note: We calculate sticky events before emitting `.Room` as it's nice to have
// sticky events calculated and ready to go.
room._unstable_addStickyEvents(stickyEventsAndStickyEventsFromTheTimeline);
room.recalculate();
if (joinObj.isBrandNewRoom) {
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
}
this.processEventsForNotifs(room, timelineEvents);
const emitEvent = (e: MatrixEvent): boolean => client.emit(ClientEvent.Event, e);
// this fires a couple of times for some events. (eg state events are in the timeline and the state)
// should this get a sync section as an additional event emission param (e, syncSection))?
stateEvents.forEach(emitEvent);
timelineEvents.forEach(emitEvent);
ephemeralEvents.forEach(emitEvent);
accountDataEvents.forEach(emitEvent);
stickyEvents
.filter(
(stickyEvent) =>
// This is highly unlikey, but in the case where a sticky event
// has appeared in the timeline AND the sticky section, we only
// want to emit the event once.
!timelineEvents.some((timelineEvent) => timelineEvent.getId() === stickyEvent.getId()),
)
.forEach(emitEvent);
// Decrypt only the last message in all rooms to make sure we can generate a preview
// And decrypt all events after the recorded read receipt to ensure an accurate
// notification count
room.decryptCriticalEvents();
}),
);
// Handle leaves (e.g. kicked rooms)
await promiseMapSeries(leaveRooms, async (leaveObj) => {
const room = leaveObj.room;
const { timelineEvents, stateEvents, stateAfterEvents } = await this.mapAndInjectRoomEvents(leaveObj);
const accountDataEvents = this.mapSyncEventsFormat(leaveObj.account_data);
await Promise.all(
leaveRooms.map(async (leaveObj) => {
const room = leaveObj.room;
const { timelineEvents, stateEvents, stateAfterEvents } = await this.mapAndInjectRoomEvents(leaveObj);
const accountDataEvents = this.mapSyncEventsFormat(leaveObj.account_data);
room.addAccountData(accountDataEvents);
room.addAccountData(accountDataEvents);
room.recalculate();
if (leaveObj.isBrandNewRoom) {
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
}
room.recalculate();
if (leaveObj.isBrandNewRoom) {
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
}
this.processEventsForNotifs(room, timelineEvents);
this.processEventsForNotifs(room, timelineEvents);
stateEvents?.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
stateAfterEvents?.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
timelineEvents.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
accountDataEvents.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
});
stateEvents?.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
stateAfterEvents?.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
timelineEvents.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
accountDataEvents.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
}),
);
// Handle knocks
await promiseMapSeries(knockRooms, async (knockObj) => {
const room = knockObj.room;
const stateEvents = this.mapSyncEventsFormat(knockObj.knock_state, room);
await Promise.all(
knockRooms.map(async (knockObj) => {
const room = knockObj.room;
const stateEvents = this.mapSyncEventsFormat(knockObj.knock_state, room);
await this.injectRoomEvents(room, stateEvents, undefined);
await this.injectRoomEvents(room, stateEvents, undefined);
if (knockObj.isBrandNewRoom) {
room.recalculate();
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
} else {
// Update room state for knock->leave->knock cycles
room.recalculate();
}
stateEvents.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
});
if (knockObj.isBrandNewRoom) {
room.recalculate();
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
} else {
// Update room state for knock->leave->knock cycles
room.recalculate();
}
stateEvents.forEach(function (e) {
client.emit(ClientEvent.Event, e);
});
}),
);
// update the notification timeline, if appropriate.
// we only do this for live events, as otherwise we can't order them sanely