chore: address review comments

This commit is contained in:
Benjamin Bouvier
2025-08-18 13:11:24 +02:00
parent c019009d00
commit b43aac129b
5 changed files with 53 additions and 29 deletions
Generated
+1
View File
@@ -3084,6 +3084,7 @@ dependencies = [
"imbl",
"indexmap",
"insta",
"itertools 0.14.0",
"js_int",
"language-tags",
"matrix-sdk-base",
+1
View File
@@ -95,6 +95,7 @@ futures-util.workspace = true
http.workspace = true
imbl = { workspace = true, features = ["serde"] }
indexmap.workspace = true
itertools.workspace = true
js_int = "0.2.2"
language-tags = { version = "0.3.2" }
matrix-sdk-base.workspace = true
+45 -27
View File
@@ -433,6 +433,12 @@ impl EventCache {
self.inner.generic_update_sender.subscribe()
}
/// React to a given linked chunk update by subscribing the user to a
/// thread, if needs be (when the user got mentioned in a thread reply, for
/// a thread they were not subscribed to).
///
/// Returns a boolean indicating whether the task should keep on running or
/// not.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_linked_chunk_update(
client: &WeakClient,
@@ -445,7 +451,7 @@ impl EventCache {
return false;
};
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk else {
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
trace!("received an update for a non-thread linked chunk, ignoring");
return true;
};
@@ -457,8 +463,9 @@ impl EventCache {
let thread_root = thread_root.clone();
let new_events = up.events();
if new_events.is_empty() {
let mut new_events = up.events().peekable();
if new_events.peek().is_none() {
// No new events, nothing to do.
return true;
}
@@ -489,8 +496,9 @@ impl EventCache {
let mut subscribe_up_to = None;
// Find if there's an event that would trigger a mention for the current
// user, iterating from the end of the new events towards the oldest,
for ev in new_events.into_iter().rev() {
// user, iterating from the end of the new events towards the oldest, so we can
// find the most recent event to subscribe to.
for ev in new_events.rev() {
if push_context
.for_event(ev.raw())
.await
@@ -520,6 +528,12 @@ impl EventCache {
true
}
/// React to a given send queue update by subscribing the user to a
/// thread, if needs be (when the user sent an event in a thread they were
/// not subscribed to).
///
/// Returns a boolean indicating whether the task should keep on running or
/// not.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_send_queue_update(
client: &WeakClient,
@@ -710,7 +724,8 @@ struct EventCacheInner {
/// A sender for a persisted linked chunk update.
///
/// This is used to notify that some linked chunk has persisted some updates
/// to a store, and can be used by observers to look for new events.
/// to a store, during sync or a back-pagination of *any* linked chunk.
/// This can be used by observers to look for new events.
///
/// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
@@ -964,34 +979,37 @@ pub struct RoomEventCacheGenericUpdate {
#[derive(Clone, Debug)]
struct RoomEventCacheLinkedChunkUpdate {
/// The linked chunk affected by the update.
linked_chunk: OwnedLinkedChunkId,
linked_chunk_id: OwnedLinkedChunkId,
/// A vector of all the updates that happened during this update.
/// A vector of all the linked chunk updates that happened during this event
/// cache update.
updates: Vec<linked_chunk::Update<TimelineEvent, Gap>>,
}
impl RoomEventCacheLinkedChunkUpdate {
/// Return all the new events propagated by this update, in topological
/// order.
pub fn events(self) -> Vec<TimelineEvent> {
self.updates
.into_iter()
.flat_map(|update| match update {
linked_chunk::Update::PushItems { items, .. } => items,
linked_chunk::Update::ReplaceItem { item, .. } => vec![item],
linked_chunk::Update::RemoveItem { .. }
| linked_chunk::Update::DetachLastItems { .. }
| linked_chunk::Update::StartReattachItems
| linked_chunk::Update::EndReattachItems
| linked_chunk::Update::NewItemsChunk { .. }
| linked_chunk::Update::NewGapChunk { .. }
| linked_chunk::Update::RemoveChunk(..)
| linked_chunk::Update::Clear => {
// All these updates don't contain any new event.
vec![]
}
})
.collect()
pub fn events(self) -> impl DoubleEndedIterator<Item = TimelineEvent> {
use itertools::Either;
self.updates.into_iter().flat_map(|update| match update {
linked_chunk::Update::PushItems { items, .. } => {
Either::Left(Either::Left(items.into_iter()))
}
linked_chunk::Update::ReplaceItem { item, .. } => {
Either::Left(Either::Right(std::iter::once(item)))
}
linked_chunk::Update::RemoveItem { .. }
| linked_chunk::Update::DetachLastItems { .. }
| linked_chunk::Update::StartReattachItems
| linked_chunk::Update::EndReattachItems
| linked_chunk::Update::NewItemsChunk { .. }
| linked_chunk::Update::NewGapChunk { .. }
| linked_chunk::Update::RemoveChunk(..)
| linked_chunk::Update::Clear => {
// All these updates don't contain any new event.
Either::Right(std::iter::empty())
}
})
}
}
@@ -1280,7 +1280,7 @@ mod private {
// Forward that the store got updated to observers.
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
linked_chunk: OwnedLinkedChunkId::Room(self.room.clone()),
linked_chunk_id: OwnedLinkedChunkId::Room(self.room.clone()),
updates,
});
@@ -55,6 +55,10 @@ pub(crate) struct ThreadEventCache {
/// A sender for live events updates in this thread.
sender: Sender<ThreadEventCacheUpdate>,
/// A sender for the globally observable linked chunk updates that happened
/// during a sync or a back-pagination.
///
/// See also [`super::super::EventCacheInner::linked_chunk_update_sender`].
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}
@@ -102,7 +106,7 @@ impl ThreadEventCache {
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
updates,
linked_chunk: OwnedLinkedChunkId::Thread(
linked_chunk_id: OwnedLinkedChunkId::Thread(
self.room_id.clone(),
self.thread_root.clone(),
),