feat(sdk): Subscribe to many rooms only via Sliding Sync.

This patch changes the `SlidingSync::subscribe_to_room` method to
`subscribe_to_rooms`. Note the plural form. It's now mandatory to
subscribe to a set of rooms. The idea is to avoid calling this method
repeatedly. Why? Because each time the method is called, it sends a
`SlidingSyncInternalMessage` of kind `SyncLoopSkipOverCurrentIteration`,
i.e. it cancels the in-flight sliding sync request, to start over with
a new one (with the new room subscription). A problem arises when the
async runtime (here, Tokio) is busy: in this case, the internal message
channel can be filled pretty easily because its size is 8. Messages
are not consumed as fast as they are inserted. By changing this API:
subscribing to multiple rooms will result in a single internal message,
instead of one per room.

Consequently, the rest of the patch moves the `subscribe` method of
`room_list_service::Room` to `room_list_service::RoomListService`
because it now concerns multiple rooms instead of a single one.
This commit is contained in:
Ivan Enderlin
2024-08-08 17:19:31 +02:00
committed by Stefan Ceriu
parent 89ce8870a9
commit be404f6666
8 changed files with 77 additions and 60 deletions
+20 -4
View File
@@ -132,6 +132,26 @@ impl RoomListService {
}
})))
}
fn subscribe_to_rooms(
&self,
room_ids: Vec<String>,
settings: Option<RoomSubscription>,
) -> Result<(), RoomListError> {
let room_ids = room_ids
.into_iter()
.map(|room_id| {
RoomId::parse(&room_id).map_err(|_| RoomListError::InvalidRoomId { error: room_id })
})
.collect::<Result<Vec<_>, _>>()?;
self.inner.subscribe_to_rooms(
&room_ids.iter().map(AsRef::as_ref).collect::<Vec<_>>(),
settings.map(Into::into),
);
Ok(())
}
}
#[derive(uniffi::Object)]
@@ -649,10 +669,6 @@ impl RoomListItem {
self.inner.is_encrypted().await.unwrap_or(false)
}
fn subscribe(&self, settings: Option<RoomSubscription>) {
self.inner.subscribe(settings.map(Into::into));
}
async fn latest_event(&self) -> Option<Arc<EventTimelineItem>> {
self.inner.latest_event().await.map(EventTimelineItem).map(Arc::new)
}
@@ -373,8 +373,8 @@ impl NotificationClient {
.build()
.await?;
sync.subscribe_to_room(
room_id.to_owned(),
sync.subscribe_to_rooms(
&[room_id],
Some(assign!(http::request::RoomSubscription::default(), {
required_state,
timeline_limit: Some(uint!(16))
@@ -365,6 +365,30 @@ impl RoomListService {
))
}
/// Subscribe to rooms.
///
/// It means that all events from these rooms will be received every time,
/// no matter how the `RoomList` is configured.
pub fn subscribe_to_rooms(
&self,
room_ids: &[&RoomId],
settings: Option<http::request::RoomSubscription>,
) {
let mut settings = settings.unwrap_or_default();
// Make sure to always include the room creation event in the required state
// events, to know what the room version is.
if !settings
.required_state
.iter()
.any(|(event_type, _state_key)| *event_type == StateEventType::RoomCreate)
{
settings.required_state.push((StateEventType::RoomCreate, "".to_owned()));
}
self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings))
}
#[cfg(test)]
pub fn sliding_sync(&self) -> &SlidingSync {
&self.sliding_sync
@@ -19,8 +19,7 @@ use std::{ops::Deref, sync::Arc};
use async_once_cell::OnceCell as AsyncOnceCell;
use matrix_sdk::SlidingSync;
use matrix_sdk_base::sliding_sync::http;
use ruma::{events::StateEventType, RoomId};
use ruma::RoomId;
use super::Error;
use crate::{
@@ -88,28 +87,6 @@ impl Room {
&self.inner.room
}
/// Subscribe to this room.
///
/// It means that all events from this room will be received every time, no
/// matter how the `RoomList` is configured.
pub fn subscribe(&self, settings: Option<http::request::RoomSubscription>) {
let mut settings = settings.unwrap_or_default();
// Make sure to always include the room creation event in the required state
// events, to know what the room version is.
if !settings
.required_state
.iter()
.any(|(event_type, _state_key)| *event_type == StateEventType::RoomCreate)
{
settings.required_state.push((StateEventType::RoomCreate, "".to_owned()));
}
self.inner
.sliding_sync
.subscribe_to_room(self.inner.room.room_id().to_owned(), Some(settings))
}
/// Get the timeline of the room if one exists.
pub fn timeline(&self) -> Option<Arc<Timeline>> {
self.inner.timeline.get().cloned()
@@ -2128,19 +2128,20 @@ async fn test_room_subscription() -> Result<(), Error> {
},
};
let room1 = room_list.room(room_id_1).unwrap();
// Subscribe.
room1.subscribe(Some(assign!(RoomSubscription::default(), {
required_state: vec![
(StateEventType::RoomName, "".to_owned()),
(StateEventType::RoomTopic, "".to_owned()),
(StateEventType::RoomAvatar, "".to_owned()),
(StateEventType::RoomCanonicalAlias, "".to_owned()),
],
timeline_limit: Some(uint!(30)),
})));
room_list.subscribe_to_rooms(
&[room_id_1],
Some(assign!(RoomSubscription::default(), {
required_state: vec![
(StateEventType::RoomName, "".to_owned()),
(StateEventType::RoomTopic, "".to_owned()),
(StateEventType::RoomAvatar, "".to_owned()),
(StateEventType::RoomCanonicalAlias, "".to_owned()),
],
timeline_limit: Some(uint!(30)),
})),
);
sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
+1 -1
View File
@@ -139,7 +139,7 @@ Notably, this map only knows about the rooms that have come down [Sliding
Sync protocol][MSC] and if the given room isn't in any active list range, it
may be stale. Additionally to selecting the room data via the room lists,
the [Sliding Sync protocol][MSC] allows to subscribe to specific rooms via
the [`subscribe_to_room()`](SlidingSync::subscribe_to_room). Any room subscribed
the [`subscribe_to_rooms()`](SlidingSync::subscribe_to_rooms). Any room subscribed
to will receive updates (with the given settings) regardless of whether they are
visible in any list. The most common case for using this API is when the user
enters a room - as we want to receive the incoming new messages regardless of
+16 -17
View File
@@ -143,26 +143,26 @@ impl SlidingSync {
SlidingSyncBuilder::new(id, client)
}
/// Subscribe to a given room.
/// Subscribe to many rooms.
///
/// If the associated `Room` exists, it will be marked as
/// If the associated `Room`s exist, it will be marked as
/// members are missing, so that it ensures to re-fetch all members.
pub fn subscribe_to_room(
pub fn subscribe_to_rooms(
&self,
room_id: OwnedRoomId,
room_ids: &[&RoomId],
settings: Option<http::request::RoomSubscription>,
) {
if let Some(room) = self.inner.client.get_room(&room_id) {
room.mark_members_missing();
}
let settings = settings.unwrap_or_default();
let mut sticky = self.inner.sticky.write().unwrap();
let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
self.inner
.sticky
.write()
.unwrap()
.data_mut()
.room_subscriptions
.insert(room_id, settings.unwrap_or_default());
for room_id in room_ids {
if let Some(room) = self.inner.client.get_room(room_id) {
room.mark_members_missing();
}
room_subscriptions.insert((*room_id).to_owned(), settings.clone());
}
self.inner.internal_channel_send_if_possible(
SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
@@ -1081,7 +1081,7 @@ mod tests {
}
#[async_test]
async fn test_subscribe_to_room() -> Result<()> {
async fn test_subscribe_to_rooms() -> Result<()> {
let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
.await?;
@@ -1150,8 +1150,7 @@ mod tests {
// Members are now synced! We can start subscribing and see how it goes.
assert!(room0.are_members_synced());
sliding_sync.subscribe_to_room(room_id_0.to_owned(), None);
sliding_sync.subscribe_to_room(room_id_1.to_owned(), None);
sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None);
// OK, we have subscribed to some rooms. Let's check on `room0` if members are
// now marked as not synced.
+1 -1
View File
@@ -398,7 +398,7 @@ impl App {
.get_selected_room_id(Some(selected))
.and_then(|room_id| self.ui_rooms.lock().unwrap().get(&room_id).cloned())
{
room.subscribe(None);
self.sync_service.room_list_service().subscribe_to_rooms(&[room.room_id()], None);
self.current_room_subscription = Some(room);
}
}