refactor(event cache): move the thread subscriber and search indexing tasks to the EventCacheDropHandles struct

This means that when some caller is subscribing to the `EventCache`,
even if the event cache drops in the background, the task will remain
active until the end of the subscriber.
This commit is contained in:
Benjamin Bouvier
2026-03-25 17:16:21 +01:00
parent 8f96c32d56
commit c0b26b6f25
+46 -48
View File
@@ -153,6 +153,23 @@ pub struct EventCacheDropHandles {
/// The task used to automatically shrink the linked chunks.
_auto_shrink_linked_chunk_task: BackgroundTaskHandle,
/// A background task listening to room and send queue updates, and
/// automatically subscribing the user to threads when needed, based on
/// the semantics of MSC4306.
///
/// One important constraint is that there is only one such task per
/// [`EventCache`], so it does listen to *all* rooms at the same time.
_thread_subscriber_task: BackgroundTaskHandle,
/// A background task listening to room updates, and
/// automatically handling search index operations add/remove/edit
/// depending on the event type.
///
/// One important constraint is that there is only one such task per
/// [`EventCache`], so it does listen to *all* rooms at the same time.
#[cfg(feature = "experimental-search")]
_search_indexing_task: BackgroundTaskHandle,
/// The task used to automatically redecrypt UTDs.
#[cfg(feature = "e2e-encryption")]
_redecryptor: redecryptor::Redecryptor,
@@ -190,29 +207,6 @@ impl EventCache {
let weak_client = WeakClient::from_inner(client);
let (thread_subscriber_sender, _thread_subscriber_receiver) = channel(128);
let thread_subscriber_task = client
.task_monitor
.spawn_background_task(
"event_cache::thread_subscriber",
tasks::thread_subscriber_task(
weak_client.clone(),
linked_chunk_update_sender.clone(),
thread_subscriber_sender,
),
)
.abort_on_drop();
#[cfg(feature = "experimental-search")]
let search_indexing_task = client
.task_monitor
.spawn_background_task(
"event_cache::search_indexing",
tasks::search_indexing_task(
weak_client.clone(),
linked_chunk_update_sender.clone(),
),
)
.abort_on_drop();
#[cfg(feature = "e2e-encryption")]
let redecryption_channels = redecryptor::RedecryptorChannels::new();
@@ -228,13 +222,9 @@ impl EventCache {
auto_shrink_sender: Default::default(),
generic_update_sender,
linked_chunk_update_sender,
_thread_subscriber_task: thread_subscriber_task,
#[cfg(feature = "experimental-search")]
_search_indexing_task: search_indexing_task,
#[cfg(feature = "e2e-encryption")]
redecryption_channels,
#[cfg(feature = "testing")]
thread_subscriber_receiver: _thread_subscriber_receiver,
thread_subscriber_sender,
}),
}
}
@@ -255,7 +245,7 @@ impl EventCache {
/// For testing purposes only.
#[cfg(feature = "testing")]
pub fn subscribe_thread_subscriber_updates(&self) -> Receiver<()> {
self.inner.thread_subscriber_receiver.resubscribe()
self.inner.thread_subscriber_sender.subscribe()
}
/// Starts subscribing the [`EventCache`] to sync responses, if not done
@@ -304,6 +294,29 @@ impl EventCache {
redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
};
let thread_subscriber_task = client
.task_monitor()
.spawn_background_task(
"event_cache::thread_subscriber",
tasks::thread_subscriber_task(
self.inner.client.clone(),
self.inner.linked_chunk_update_sender.clone(),
self.inner.thread_subscriber_sender.clone(),
),
)
.abort_on_drop();
#[cfg(feature = "experimental-search")]
let search_indexing_task = client
.task_monitor()
.spawn_background_task(
"event_cache::search_indexing",
tasks::search_indexing_task(
self.inner.client.clone(),
self.inner.linked_chunk_update_sender.clone(),
),
)
.abort_on_drop();
Arc::new(EventCacheDropHandles {
_listen_updates_task: listen_updates_task,
@@ -311,6 +324,9 @@ impl EventCache {
_auto_shrink_linked_chunk_task: auto_shrink_linked_chunk_task,
#[cfg(feature = "e2e-encryption")]
_redecryptor: redecryptor,
_thread_subscriber_task: thread_subscriber_task,
#[cfg(feature = "experimental-search")]
_search_indexing_task: search_indexing_task,
})
});
@@ -443,30 +459,12 @@ struct EventCacheInner {
/// See doc comment of [`RoomEventCacheLinkedChunkUpdate`].
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
/// A background task listening to room and send queue updates, and
/// automatically subscribing the user to threads when needed, based on
/// the semantics of MSC4306.
///
/// One important constraint is that there is only one such task per
/// [`EventCache`], so it does listen to *all* rooms at the same time.
_thread_subscriber_task: BackgroundTaskHandle,
/// A background task listening to room updates, and
/// automatically handling search index operations add/remove/edit
/// depending on the event type.
///
/// One important constraint is that there is only one such task per
/// [`EventCache`], so it does listen to *all* rooms at the same time.
#[cfg(feature = "experimental-search")]
_search_indexing_task: BackgroundTaskHandle,
/// A test helper receiver that will be emitted every time the thread
/// subscriber task subscribed to a new thread.
///
/// This is helpful for tests to coordinate that a new thread subscription
/// has been sent or not.
#[cfg(feature = "testing")]
thread_subscriber_receiver: Receiver<()>,
thread_subscriber_sender: Sender<()>,
#[cfg(feature = "e2e-encryption")]
redecryption_channels: redecryptor::RedecryptorChannels,