chore: rename background request to automatic pagination request

And associated vocabulary and fields.
This commit is contained in:
Benjamin Bouvier
2026-03-30 11:27:28 +02:00
parent 552639763e
commit 752695c6ff
6 changed files with 63 additions and 54 deletions
@@ -21,23 +21,24 @@ use tracing::{info, instrument, trace, warn};
use crate::event_cache::EventCacheInner;
#[derive(Clone, Debug)]
pub(crate) enum BackgroundRequest {
pub(crate) enum AutomaticPaginationRequest {
PaginateRoomBackwards { room_id: OwnedRoomId },
}
/// Listen to background requests, and execute them in real-time.
/// Listen to background automatic pagination requests, and execute them in
/// real-time.
#[instrument(skip_all)]
pub(super) async fn background_requests_task(
pub(super) async fn automatic_paginations_task(
inner: Arc<EventCacheInner>,
mut receiver: mpsc::UnboundedReceiver<BackgroundRequest>,
mut receiver: mpsc::UnboundedReceiver<AutomaticPaginationRequest>,
) {
trace!("Spawning the background request task");
trace!("Spawning the automatic pagination task");
let mut room_pagination_credits = HashMap::new();
while let Some(request) = receiver.recv().await {
match request {
BackgroundRequest::PaginateRoomBackwards { room_id } => {
AutomaticPaginationRequest::PaginateRoomBackwards { room_id } => {
let config = *inner.config.read().unwrap();
let credits = room_pagination_credits
@@ -61,12 +62,13 @@ pub(super) async fn background_requests_task(
match pagination.run_backwards_once(config.room_pagination_batch_size).await {
Ok(outcome) => {
// Background requests must be idempotent, so we only decrement credits if
// Pagination requests must be idempotent, so we only decrement credits if
// we actually paginated something new.
if !outcome.reached_start || !outcome.events.is_empty() {
*credits -= 1;
}
}
Err(err) => {
warn!(for_room = %room_id, "Failed to run background pagination: {err}");
// Don't decrement credits in this case, to allow a
@@ -78,7 +80,7 @@ pub(super) async fn background_requests_task(
}
// The sender has shut down, exit.
info!("Closing the background request task because receiver closed");
info!("Closing the automatic pagination task because receiver closed");
}
// MatrixMockServer et al. aren't available on wasm.
@@ -96,20 +98,20 @@ mod tests {
assert_let_timeout,
event_cache::{
EventsOrigin, RoomEventCacheUpdate,
automatic_pagination::BackgroundRequest::PaginateRoomBackwards,
automatic_pagination::AutomaticPaginationRequest::PaginateRoomBackwards,
},
test_utils::mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
};
impl super::super::EventCache {
fn background_requests_sender(
fn pagination_requests_sender(
&self,
) -> Option<mpsc::UnboundedSender<super::BackgroundRequest>> {
self.inner.background_requests_sender.get().cloned()
) -> Option<mpsc::UnboundedSender<super::AutomaticPaginationRequest>> {
self.inner.automatic_pagination_requests_sender.get().cloned()
}
}
/// Test that we can send background requests and trigger room paginations.
/// Test that we can send automatic pagination requests.
#[async_test]
async fn test_background_room_paginations() {
let server = MatrixMockServer::new().await;
@@ -150,7 +152,7 @@ mod tests {
.await;
// Send a request for a background pagination,
let sender = event_cache.background_requests_sender().unwrap();
let sender = event_cache.pagination_requests_sender().unwrap();
sender.send(PaginateRoomBackwards { room_id: room_id.to_owned() }).unwrap();
// The room pagination happens in the background.
@@ -227,7 +229,7 @@ mod tests {
.await;
// Send a request for a background pagination,
let sender = event_cache.background_requests_sender().unwrap();
let sender = event_cache.pagination_requests_sender().unwrap();
sender.send(PaginateRoomBackwards { room_id: room_id.to_owned() }).unwrap();
// The room pagination happens in the background.
@@ -25,7 +25,8 @@ use tokio::sync::{broadcast::Sender, mpsc};
use super::{EventCacheError, EventsOrigin, Result};
use crate::{
client::WeakClient, event_cache::automatic_pagination::BackgroundRequest, room::WeakRoom,
client::WeakClient, event_cache::automatic_pagination::AutomaticPaginationRequest,
room::WeakRoom,
};
pub mod event_focused;
@@ -52,7 +53,9 @@ impl Caches {
linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
store: EventCacheStoreLock,
background_request_sender: Option<mpsc::UnboundedSender<BackgroundRequest>>,
automatic_pagination_requests_sender: Option<
mpsc::UnboundedSender<AutomaticPaginationRequest>,
>,
) -> Result<Self> {
let Some(client) = weak_client.get() else {
return Err(EventCacheError::ClientDropped);
@@ -87,7 +90,7 @@ impl Caches {
linked_chunk_update_sender,
store,
pagination_status.clone(),
background_request_sender,
automatic_pagination_requests_sender,
)
.await?;
@@ -112,7 +112,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, instrument, trace, warn};
use crate::event_cache::{
automatic_pagination::BackgroundRequest, caches::event_linked_chunk::EventLinkedChunk,
automatic_pagination::AutomaticPaginationRequest, caches::event_linked_chunk::EventLinkedChunk,
};
trait RoomReadReceiptsExt {
@@ -366,7 +366,7 @@ pub(crate) fn compute_unread_counts(
linked_chunk: &EventLinkedChunk,
read_receipts: &mut RoomReadReceipts,
with_threading_support: bool,
background_request_sender: Option<&UnboundedSender<BackgroundRequest>>,
automatic_pagination_request_sender: Option<&UnboundedSender<AutomaticPaginationRequest>>,
) {
debug!(?read_receipts, "Starting");
@@ -381,11 +381,11 @@ pub(crate) fn compute_unread_counts(
if select_best_receipt_result.request_pagination {
trace!("Requesting pagination to find a better receipt");
// Note: we use `try_send` here to keep the method sync, as computing the
// perfect receipt is best effort.
if let Some(sender) = background_request_sender
if let Some(sender) = automatic_pagination_request_sender
&& sender
.send(BackgroundRequest::PaginateRoomBackwards { room_id: room_id.to_owned() })
.send(AutomaticPaginationRequest::PaginateRoomBackwards {
room_id: room_id.to_owned(),
})
.is_err()
{
warn!("Failed to request pagination to find a better receipt");
@@ -77,7 +77,8 @@ use super::{
use crate::{
Room,
event_cache::{
automatic_pagination::BackgroundRequest, caches::pagination::SharedPaginationStatus,
automatic_pagination::AutomaticPaginationRequest,
caches::pagination::SharedPaginationStatus,
},
room::WeakRoom,
};
@@ -151,9 +152,9 @@ pub struct RoomEventCacheState {
/// [`super::RoomEventCache`].
subscriber_count: Arc<AtomicUsize>,
/// A notifier to trigger backpagination under certain predefined
/// conditions.
background_request_sender: Option<mpsc::UnboundedSender<BackgroundRequest>>,
/// A sender to trigger automatic pagination requests under certain
/// predefined conditions.
automatic_pagination_request_sender: Option<mpsc::UnboundedSender<AutomaticPaginationRequest>>,
}
impl RoomEventCacheState {
@@ -314,7 +315,9 @@ impl LockedRoomEventCacheState {
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
store: EventCacheStoreLock,
pagination_status: SharedObservable<SharedPaginationStatus>,
background_request_sender: Option<mpsc::UnboundedSender<BackgroundRequest>>,
automatic_pagination_request_sender: Option<
mpsc::UnboundedSender<AutomaticPaginationRequest>,
>,
) -> Result<Self, EventCacheError> {
let store_guard = match store.lock().await? {
// Lock is clean: all good!
@@ -397,7 +400,7 @@ impl LockedRoomEventCacheState {
waited_for_initial_prev_token: false,
subscriber_count: Default::default(),
pinned_event_cache: OnceLock::new(),
background_request_sender,
automatic_pagination_request_sender,
}))
}
}
@@ -1061,7 +1064,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
&self.state.room_linked_chunk,
&mut read_receipts,
self.state.enabled_thread_support,
self.state.background_request_sender.as_ref(),
self.state.automatic_pagination_request_sender.as_ref(),
);
if prev_read_receipts != read_receipts {
+23 -22
View File
@@ -53,7 +53,7 @@ use tracing::{error, instrument, trace};
use crate::{
Client,
client::{ClientInner, WeakClient},
event_cache::automatic_pagination::{BackgroundRequest, background_requests_task},
event_cache::automatic_pagination::{AutomaticPaginationRequest, automatic_paginations_task},
paginators::PaginatorError,
};
@@ -154,9 +154,8 @@ pub struct EventCacheDropHandles {
/// The task used to automatically shrink the linked chunks.
auto_shrink_linked_chunk_task: BackgroundTaskHandle,
/// The task used to automatically handle background requests (like
/// paginations).
background_requests_task: Option<BackgroundTaskHandle>,
/// The task used to handle automatic pagination requests.
automatic_paginations_task: Option<BackgroundTaskHandle>,
/// The task used to automatically redecrypt UTDs.
#[cfg(feature = "e2e-encryption")]
@@ -174,7 +173,7 @@ impl Drop for EventCacheDropHandles {
self.listen_updates_task.abort();
self.ignore_user_list_update_task.abort();
self.auto_shrink_linked_chunk_task.abort();
if let Some(task) = self.background_requests_task.take() {
if let Some(task) = self.automatic_paginations_task.take() {
task.abort();
}
}
@@ -242,7 +241,7 @@ impl EventCache {
by_room: Default::default(),
drop_handles: Default::default(),
auto_shrink_sender: Default::default(),
background_requests_sender: Default::default(),
automatic_pagination_requests_sender: Default::default(),
generic_update_sender,
linked_chunk_update_sender,
_thread_subscriber_task: thread_subscriber_task,
@@ -321,19 +320,19 @@ impl EventCache {
redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
};
let background_requests_task = if self.config().experimental_auto_backpagination {
let automatic_paginations_task = if self.config().experimental_auto_backpagination {
let (sender, receiver) = mpsc::unbounded_channel();
// Run the deferred initialization of the background request sender, that is shared
// with every room.
self.inner.background_requests_sender.get_or_init(|| sender);
// Run the deferred initialization of the automatic pagination request sender, that
// is shared with every room.
self.inner.automatic_pagination_requests_sender.get_or_init(|| sender);
trace!("spawning the backgrounds requests task");
Some(task_monitor.spawn_background_task("event_cache::background_requests_task", background_requests_task(
trace!("spawning the automatic paginations task");
Some(task_monitor.spawn_background_task("event_cache::automatic_paginations_task", automatic_paginations_task(
self.inner.clone(), receiver
)))
} else {
trace!("backgrounds requests task is disabled");
trace!("automatic paginations task is disabled");
None
};
@@ -343,7 +342,7 @@ impl EventCache {
auto_shrink_linked_chunk_task,
#[cfg(feature = "e2e-encryption")]
_redecryptor: redecryptor,
background_requests_task
automatic_paginations_task
})
});
@@ -411,7 +410,7 @@ pub struct EventCacheConfig {
pub experimental_auto_backpagination: bool,
/// The maximum number of allowed room paginations, for a given room, that
/// can be executed in the background request task.
/// can be executed in the automatic paginations task.
///
/// After that number of paginations, the task will stop executing
/// paginations for that room *in the background* (user-requested
@@ -420,8 +419,8 @@ pub struct EventCacheConfig {
/// Defaults to [`EventCacheConfig::DEFAULT_ROOM_PAGINATION_CREDITS`].
pub room_pagination_per_room_credit: usize,
/// The number of messages to paginate in a single batch, when executing a
/// background pagination request.
/// The number of messages to paginate in a single batch, when executing an
/// automatic pagination request.
///
/// Defaults to [`EventCacheConfig::DEFAULT_ROOM_PAGINATION_BATCH_SIZE`].
pub room_pagination_batch_size: u16,
@@ -435,13 +434,13 @@ impl EventCacheConfig {
/// loading the pinned events.
pub const DEFAULT_MAX_CONCURRENT_REQUESTS: usize = 8;
/// The default number of credits to give to a room for background
/// The default number of credits to give to a room for automatic
/// paginations (see also
/// [`EventCacheConfig::room_pagination_per_room_credit`]).
pub const DEFAULT_ROOM_PAGINATION_CREDITS: usize = 20;
/// The default number of messages to paginate in a single batch, when
/// executing a background pagination request (see also
/// executing an automatic pagination request (see also
/// [`EventCacheConfig::room_pagination_batch_size`]).
pub const DEFAULT_ROOM_PAGINATION_BATCH_SIZE: u16 = 30;
}
@@ -498,11 +497,13 @@ struct EventCacheInner {
/// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
/// A sender for background requests, that is shared with every room.
/// A sender for automatic pagination requests, that is shared with every
/// room.
///
/// It's a `OnceLock` because its initialization is deferred to
/// [`EventCache::subscribe`].
background_requests_sender: OnceLock<mpsc::UnboundedSender<BackgroundRequest>>,
automatic_pagination_requests_sender:
OnceLock<mpsc::UnboundedSender<AutomaticPaginationRequest>>,
/// A sender for room generic update.
///
@@ -705,7 +706,7 @@ impl EventCacheInner {
"we must have called `EventCache::subscribe()` before calling here.",
),
self.store.clone(),
self.background_requests_sender.get().cloned(),
self.automatic_pagination_requests_sender.get().cloned(),
)
.await?;
@@ -602,7 +602,7 @@ async fn test_reset_while_backpaginating() {
wait_for_initial_events(events, &mut room_stream).await;
// We're going to cause a small race:
// - a background request to sync will be sent,
// - a pagination request to sync will be sent,
// - a backpagination will be sent concurrently.
//
// So events have to happen in this order: