From c0b26b6f25cadaac181b6d6b73b0da4e652a0d2e Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 25 Mar 2026 17:16:21 +0100 Subject: [PATCH] refactor(event cache): move the thread subscriber and search indexing tasks to the `EventCacheDropHandles` struct This means that when some caller is subscribing to the `EventCache`, even if the event cache drops in the background, the task will remain active until the end of the subscriber. --- crates/matrix-sdk/src/event_cache/mod.rs | 94 ++++++++++++------------ 1 file changed, 46 insertions(+), 48 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 5463820a7..a46d62c57 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -153,6 +153,23 @@ pub struct EventCacheDropHandles { /// The task used to automatically shrink the linked chunks. _auto_shrink_linked_chunk_task: BackgroundTaskHandle, + /// A background task listening to room and send queue updates, and + /// automatically subscribing the user to threads when needed, based on + /// the semantics of MSC4306. + /// + /// One important constraint is that there is only one such task per + /// [`EventCache`], so it does listen to *all* rooms at the same time. + _thread_subscriber_task: BackgroundTaskHandle, + + /// A background task listening to room updates, and + /// automatically handling search index operations add/remove/edit + /// depending on the event type. + /// + /// One important constraint is that there is only one such task per + /// [`EventCache`], so it does listen to *all* rooms at the same time. + #[cfg(feature = "experimental-search")] + _search_indexing_task: BackgroundTaskHandle, + /// The task used to automatically redecrypt UTDs. #[cfg(feature = "e2e-encryption")] _redecryptor: redecryptor::Redecryptor, @@ -190,29 +207,6 @@ impl EventCache { let weak_client = WeakClient::from_inner(client); let (thread_subscriber_sender, _thread_subscriber_receiver) = channel(128); - let thread_subscriber_task = client - .task_monitor - .spawn_background_task( - "event_cache::thread_subscriber", - tasks::thread_subscriber_task( - weak_client.clone(), - linked_chunk_update_sender.clone(), - thread_subscriber_sender, - ), - ) - .abort_on_drop(); - - #[cfg(feature = "experimental-search")] - let search_indexing_task = client - .task_monitor - .spawn_background_task( - "event_cache::search_indexing", - tasks::search_indexing_task( - weak_client.clone(), - linked_chunk_update_sender.clone(), - ), - ) - .abort_on_drop(); #[cfg(feature = "e2e-encryption")] let redecryption_channels = redecryptor::RedecryptorChannels::new(); @@ -228,13 +222,9 @@ impl EventCache { auto_shrink_sender: Default::default(), generic_update_sender, linked_chunk_update_sender, - _thread_subscriber_task: thread_subscriber_task, - #[cfg(feature = "experimental-search")] - _search_indexing_task: search_indexing_task, #[cfg(feature = "e2e-encryption")] redecryption_channels, - #[cfg(feature = "testing")] - thread_subscriber_receiver: _thread_subscriber_receiver, + thread_subscriber_sender, }), } } @@ -255,7 +245,7 @@ impl EventCache { /// For testing purposes only. #[cfg(feature = "testing")] pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> { - self.inner.thread_subscriber_receiver.resubscribe() + self.inner.thread_subscriber_sender.subscribe() } /// Starts subscribing the [`EventCache`] to sync responses, if not done @@ -304,6 +294,29 @@ impl EventCache { redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender) }; + let thread_subscriber_task = client + .task_monitor() + .spawn_background_task( + "event_cache::thread_subscriber", + tasks::thread_subscriber_task( + self.inner.client.clone(), + self.inner.linked_chunk_update_sender.clone(), + self.inner.thread_subscriber_sender.clone(), + ), + ) + .abort_on_drop(); + + #[cfg(feature = "experimental-search")] + let search_indexing_task = client + .task_monitor() + .spawn_background_task( + "event_cache::search_indexing", + tasks::search_indexing_task( + self.inner.client.clone(), + self.inner.linked_chunk_update_sender.clone(), + ), + ) + .abort_on_drop(); Arc::new(EventCacheDropHandles { _listen_updates_task: listen_updates_task, @@ -311,6 +324,9 @@ impl EventCache { _auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_task, #[cfg(feature = "e2e-encryption")] _redecryptor: redecryptor, + _thread_subscriber_task: thread_subscriber_task, + #[cfg(feature = "experimental-search")] + _search_indexing_task: search_indexing_task, }) }); @@ -443,30 +459,12 @@ struct EventCacheInner { /// See doc comment of [`RoomEventCacheLinkedChunkUpdate`]. linked_chunk_update_sender: Sender, - /// A background task listening to room and send queue updates, and - /// automatically subscribing the user to threads when needed, based on - /// the semantics of MSC4306. - /// - /// One important constraint is that there is only one such task per - /// [`EventCache`], so it does listen to *all* rooms at the same time. - _thread_subscriber_task: BackgroundTaskHandle, - - /// A background task listening to room updates, and - /// automatically handling search index operations add/remove/edit - /// depending on the event type. - /// - /// One important constraint is that there is only one such task per - /// [`EventCache`], so it does listen to *all* rooms at the same time. - #[cfg(feature = "experimental-search")] - _search_indexing_task: BackgroundTaskHandle, - /// A test helper receiver that will be emitted every time the thread /// subscriber task subscribed to a new thread. /// /// This is helpful for tests to coordinate that a new thread subscription /// has been sent or not. - #[cfg(feature = "testing")] - thread_subscriber_receiver: Receiver<()>, + thread_subscriber_sender: Sender<()>, #[cfg(feature = "e2e-encryption")] redecryption_channels: redecryptor::RedecryptorChannels,