diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index ebbfefe9f..b4afd2033 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -117,6 +117,14 @@ pub enum QueuedRequestKind { #[serde(default)] accumulated: Vec, }, + + /// A redaction of another event to send. + Redaction { + /// The ID of the event to redact. + redacts: OwnedEventId, + /// The reason for the event being redacted. + reason: Option, + }, } impl From for QueuedRequestKind { @@ -421,12 +429,27 @@ pub enum SentRequestKey { /// The parent transaction returned an uploaded resource URL. Media(SentMediaInfo), + + /// The parent transaction returned a redaction event when it succeeded. + Redaction { + /// The event ID returned by the server. + event_id: OwnedEventId, + + /// The ID of the redacted event. + redacts: OwnedEventId, + + /// The reason for the event being redacted. + reason: Option, + }, } impl SentRequestKey { /// Converts the current parent key into an event id, if possible. pub fn into_event_id(self) -> Option { - as_variant!(self, Self::Event { event_id, .. } => event_id) + match self { + Self::Event { event_id, .. } | Self::Redaction { event_id, .. } => Some(event_id), + _ => None, + } } /// Converts the current parent key into information about a sent media, if diff --git a/crates/matrix-sdk-ui/CHANGELOG.md b/crates/matrix-sdk-ui/CHANGELOG.md index af2313ec0..f41a4f425 100644 --- a/crates/matrix-sdk-ui/CHANGELOG.md +++ b/crates/matrix-sdk-ui/CHANGELOG.md @@ -30,6 +30,8 @@ All notable changes to this project will be documented in this file. ### Features +- Handle local echoes of redactions in the timeline. + ([#6250](https://github.com/matrix-org/matrix-rust-sdk/pull/6250)) - [**breaking**] Remove support for `native-tls` and remove all feature flags for selecting TLS backend, as `rustls` is the now the only supported TLS backend. diff --git a/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs b/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs index 07ae29953..d11c26552 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs @@ -127,7 +127,12 @@ pub(crate) enum AggregationKind { }, /// An event has been redacted. - Redaction, + Redaction { + /// Whether this aggregation results from the local echo of a redaction. + /// Local echoes of redactions are applied reversibly whereas remote + /// echoes of redactions are applied irreversibly. + is_local: bool, + }, /// An event has been edited. /// @@ -238,11 +243,15 @@ impl Aggregation { } } - AggregationKind::Redaction => { - if event.content().is_redacted() { + AggregationKind::Redaction { is_local } => { + let is_local_redacted = + event.content().is_redacted() && event.unredacted_item.is_some(); + let is_remote_redacted = + event.content().is_redacted() && event.unredacted_item.is_none(); + if *is_local && is_local_redacted || !*is_local && is_remote_redacted { ApplyAggregationResult::LeftItemIntact } else { - let new_item = event.redact(&rules.redaction); + let new_item = event.redact(&rules.redaction, *is_local); *event = Cow::Owned(new_item); ApplyAggregationResult::UpdatedItem } @@ -353,9 +362,20 @@ impl Aggregation { ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd) } - AggregationKind::Redaction => { - // Redactions are not reversible. - ApplyAggregationResult::Error(AggregationError::CantUndoRedaction) + AggregationKind::Redaction { is_local } => { + if *is_local { + if event.unredacted_item.is_some() { + // Unapply local redaction. + *event = Cow::Owned(event.unredact()); + ApplyAggregationResult::UpdatedItem + } else { + // Event isn't locally redacted. Nothing to do. + ApplyAggregationResult::LeftItemIntact + } + } else { + // Remote redactions are not reversible. + ApplyAggregationResult::Error(AggregationError::CantUndoRedaction) + } } AggregationKind::Reaction { key, sender, .. } => { @@ -491,7 +511,7 @@ impl Aggregations { pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) { // If the aggregation is a redaction, it invalidates all the other aggregations; // remove them. - if matches!(aggregation.kind, AggregationKind::Redaction) { + if matches!(aggregation.kind, AggregationKind::Redaction { .. }) { for agg in self.related_events.remove(&related_to).unwrap_or_default() { self.inverted_map.remove(&agg.own_id); } @@ -502,7 +522,7 @@ impl Aggregations { if let Some(previous_aggregations) = self.related_events.get(&related_to) && previous_aggregations .iter() - .any(|agg| matches!(agg.kind, AggregationKind::Redaction)) + .any(|agg| matches!(agg.kind, AggregationKind::Redaction { .. })) { return; } @@ -718,12 +738,19 @@ impl Aggregations { AggregationKind::PollResponse { .. } | AggregationKind::PollEnd { .. } | AggregationKind::Edit(..) - | AggregationKind::Redaction | AggregationKind::BeaconUpdate { .. } | AggregationKind::BeaconStop { .. } => { // Nothing particular to do. } + AggregationKind::Redaction { is_local } => { + // Mark the redaction as being remote and apply it (irreversibly). + *is_local = false; + + let found = found.clone(); + find_item_and_apply_aggregation(self, items, &target, found, rules); + } + AggregationKind::Reaction { reaction_status, .. } => { // Mark the reaction as becoming remote, and signal that update to the // caller. diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index 0e6a0d222..efe0b59d9 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -1149,6 +1149,23 @@ impl TimelineController

{ LocalEchoContent::React { key, send_handle, applies_to } => { self.handle_local_reaction(key, send_handle, applies_to).await; } + + LocalEchoContent::Redaction { redacts, send_error, .. } => { + self.handle_local_redaction(echo.transaction_id.clone(), redacts).await; + + if let Some(send_error) = send_error { + self.update_event_send_state( + &echo.transaction_id, + EventSendState::SendingFailed { + error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new( + send_error, + ))), + is_recoverable: false, + }, + ) + .await; + } + } } } @@ -1189,6 +1206,34 @@ impl TimelineController

{ tr.commit(); } + /// Applies a local echo of a redaction. + pub(super) async fn handle_local_redaction( + &self, + txn_id: OwnedTransactionId, + redacts: OwnedEventId, + ) { + let mut state = self.state.write().await; + let mut tr = state.transaction(); + + let target = TimelineEventItemId::EventId(redacts); + + let aggregation = Aggregation::new( + TimelineEventItemId::TransactionId(txn_id), + AggregationKind::Redaction { is_local: true }, + ); + + tr.meta.aggregations.add(target.clone(), aggregation.clone()); + find_item_and_apply_aggregation( + &tr.meta.aggregations, + &mut tr.items, + &target, + aggregation, + &tr.meta.room_version_rules, + ); + + tr.commit(); + } + /// Handle a single room send queue update. pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) { match update { diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index 03bfb74b7..9a1e5f4e1 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -852,8 +852,12 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { } let target = TimelineEventItemId::EventId(redacted.clone()); - let aggregation = - Aggregation::new(self.ctx.flow.timeline_item_id(), AggregationKind::Redaction); + let aggregation = Aggregation::new( + self.ctx.flow.timeline_item_id(), + AggregationKind::Redaction { + is_local: false, // We can only get here for remote echoes of redactions. + }, + ); self.meta.aggregations.add(target.clone(), aggregation.clone()); find_item_and_apply_aggregation( diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 3415724d3..7294e216f 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -80,8 +80,14 @@ pub struct EventTimelineItem { pub(super) forwarder_profile: Option>, /// The timestamp of the event. pub(super) timestamp: MilliSecondsSinceUnixEpoch, - /// The content of the event. + /// The content of the event. Might be redacted if a redaction for this + /// event is currently being sent or has been received from the server. pub(super) content: TimelineItemContent, + /// If a redaction for this event is currently being sent but the server + /// hasn't yet acknowledged it via its remote echo, the data + /// before redaction. This applies to all sorts of timeline items, including + /// state events. If no redaction is in flight, None. + pub(super) unredacted_item: Option, /// The kind of event timeline item, local or remote. pub(super) kind: EventTimelineItemKind, /// Whether or not the event belongs to an encrypted room. @@ -118,6 +124,20 @@ pub(crate) enum TimelineItemHandle<'a> { Local(&'a SendHandle), } +/// A container for temporarily holding onto data that is going to be erased by +/// a redaction once the server plays it back. +#[derive(Clone, Debug)] +pub(super) struct UnredactedEventTimelineItem { + /// The original content before redaction. + content: TimelineItemContent, + + /// JSON of the original event. + pub(crate) original_json: Option>, + + /// JSON of the latest edit to this item. + pub(crate) latest_edit_json: Option>, +} + impl EventTimelineItem { #[allow(clippy::too_many_arguments)] pub(super) fn new( @@ -137,6 +157,7 @@ impl EventTimelineItem { forwarder_profile, timestamp, content, + unredacted_item: None, kind, is_room_encrypted, } @@ -508,7 +529,12 @@ impl EventTimelineItem { } /// Create a clone of the current item, with content that's been redacted. - pub(super) fn redact(&self, rules: &RedactionRules) -> Self { + pub(super) fn redact(&self, rules: &RedactionRules, is_local: bool) -> Self { + let unredacted_item = is_local.then(|| UnredactedEventTimelineItem { + content: self.content.clone(), + original_json: self.original_json().cloned(), + latest_edit_json: self.latest_edit_json().cloned(), + }); let content = self.content.redact(rules); let kind = match &self.kind { EventTimelineItemKind::Local(l) => EventTimelineItemKind::Local(l.clone()), @@ -521,6 +547,35 @@ impl EventTimelineItem { forwarder_profile: self.forwarder_profile.clone(), timestamp: self.timestamp, content, + unredacted_item, + kind, + is_room_encrypted: self.is_room_encrypted, + } + } + + /// Create a clone of the current item, with data restored from the + /// item's unredacted_item field (if it was previously set by a call to + /// the `redact(...)` method). + pub(super) fn unredact(&self) -> Self { + let Some(unredacted_item) = &self.unredacted_item else { return self.clone() }; + let kind = match &self.kind { + EventTimelineItemKind::Local(l) => EventTimelineItemKind::Local(l.clone()), + EventTimelineItemKind::Remote(r) => { + EventTimelineItemKind::Remote(RemoteEventTimelineItem { + original_json: unredacted_item.original_json.clone(), + latest_edit_json: unredacted_item.latest_edit_json.clone(), + ..r.clone() + }) + } + }; + Self { + sender: self.sender.clone(), + sender_profile: self.sender_profile.clone(), + forwarder: self.forwarder.clone(), + forwarder_profile: self.forwarder_profile.clone(), + timestamp: self.timestamp, + content: unredacted_item.content.clone(), + unredacted_item: None, kind, is_room_encrypted: self.is_room_encrypted, } diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index c29e0024e..840e38de1 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -173,6 +173,12 @@ impl TestTimeline { txn_id } + async fn handle_local_redaction(&self, redacts: OwnedEventId) -> OwnedTransactionId { + let txn_id = TransactionId::new(); + self.controller.handle_local_redaction(txn_id.clone(), redacts).await; + txn_id + } + async fn handle_back_paginated_event(&self, event: Raw) { let timeline_event = TimelineEvent::from_plaintext(event.cast()); self.controller diff --git a/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs b/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs index d065dff02..5d4c5b769 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs @@ -23,13 +23,14 @@ use ruma::{ StateEventContentChange, reaction::RedactedReactionEventContent, room::message::OriginalSyncRoomMessageEvent, }, + owned_event_id, }; use stream_assert::{assert_next_matches, assert_pending}; use super::TestTimeline; use crate::timeline::{ AnyOtherStateEventContentChange, TimelineDetails, TimelineItemContent, - event_item::RemoteEventOrigin, + event_item::{EventTimelineItemKind, RemoteEventOrigin, RemoteEventTimelineItem}, }; #[async_test] @@ -203,3 +204,47 @@ async fn test_reaction_redaction_timeline_filter() { assert_eq!(item.content().reactions().cloned().unwrap_or_default().len(), 0); assert_eq!(timeline.controller.items().await.len(), 2); } + +#[async_test] +async fn test_local_and_remote_echo_of_redaction() { + let timeline = TestTimeline::new(); + let mut stream = timeline.subscribe_events().await; + + let f = &timeline.factory; + + // Send a message. + let event_id = owned_event_id!("$1"); + timeline + .handle_live_event(f.text_msg("Hello, world!").sender(&ALICE).event_id(&event_id)) + .await; + let item = assert_next_matches!(stream, VectorDiff::PushBack { value } => value); + assert!(!item.content().is_redacted()); + assert!(item.unredacted_item.is_none()); + assert_let!( + EventTimelineItemKind::Remote(RemoteEventTimelineItem { original_json, .. }) = item.kind + ); + assert!(original_json.is_some()); + + // Now redact the message. We first emit the local echo of the redaction event. + // The timeline event should be marked as being under redaction. + timeline.handle_local_redaction(event_id.clone()).await; + let item = assert_next_matches!(stream, VectorDiff::Set { index: 0, value } => value); + assert!(item.content().is_redacted()); + assert_let!(Some(unredacted_item) = item.unredacted_item); + assert!(unredacted_item.original_json.is_some()); + assert_let!( + EventTimelineItemKind::Remote(RemoteEventTimelineItem { original_json, .. }) = item.kind + ); + assert!(original_json.is_none()); + + // Then comes the remote echo of the redaction event. The timeline event should + // now be redacted. + timeline.handle_live_event(f.redaction(&event_id).sender(&ALICE)).await; + let item = assert_next_matches!(stream, VectorDiff::Set { index: 0, value } => value); + assert!(item.content().is_redacted()); + assert!(item.unredacted_item.is_none()); + assert_let!( + EventTimelineItemKind::Remote(RemoteEventTimelineItem { original_json, .. }) = item.kind + ); + assert!(original_json.is_none()); +} diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 0fd0ae159..d965379ba 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -8,6 +8,10 @@ All notable changes to this project will be documented in this file. ### Features +- Enable sending redaction events through the send queue via `RoomSendQueue::redact`. + This includes local echoes for redaction events through the new `LocalEchoContent::Redaction` + variant. + ([#6250](https://github.com/matrix-org/matrix-rust-sdk/pull/6250)) - [**breaking**] Remove support for `native-tls` and remove all feature flags for selecting TLS backend, as `rustls` is the now the only supported TLS backend. diff --git a/crates/matrix-sdk/src/event_cache/tasks.rs b/crates/matrix-sdk/src/event_cache/tasks.rs index 0498f0d4f..2a85465ce 100644 --- a/crates/matrix-sdk/src/event_cache/tasks.rs +++ b/crates/matrix-sdk/src/event_cache/tasks.rs @@ -313,6 +313,11 @@ async fn handle_thread_subscriber_send_queue_update( // Nothing to do, reactions don't count as a thread // subscription. } + + LocalEchoContent::Redaction { .. } => { + // Nothing to do, redactions don't count as a thread + // subscription. + } } return true; } diff --git a/crates/matrix-sdk/src/latest_events/latest_event/builder.rs b/crates/matrix-sdk/src/latest_events/latest_event/builder.rs index 81959d29b..ccb592232 100644 --- a/crates/matrix-sdk/src/latest_events/latest_event/builder.rs +++ b/crates/matrix-sdk/src/latest_events/latest_event/builder.rs @@ -210,6 +210,11 @@ impl Builder { } LocalEchoContent::React { .. } => None, + + // TODO: Rework the latest event system to handle local redactions. This will + // require mixing the processing of local and remote events since a local redaction + // will target a previous remote event. + LocalEchoContent::Redaction { .. } => None, }, // A local event has been cancelled before being sent. @@ -1679,7 +1684,7 @@ mod builder_tests { Client, Error, send_queue::{ AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle, - SendReactionHandle, + SendReactionHandle, SendRedactionHandle, }, test_utils::mocks::MatrixMockServer, }; @@ -3641,4 +3646,108 @@ mod builder_tests { ); assert!(buffer.buffer.is_empty()); } + + #[async_test] + async fn test_local_redaction() { + let room_id = room_id!("!r0"); + let user_id = user_id!("@mnt_io:matrix.org"); + let event_factory = EventFactory::new().sender(user_id).room(room_id); + let event_id = event_id!("$ev0"); + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + // Prelude. + { + // Create the room. + client.base_client().get_or_create_room(room_id, RoomState::Joined); + + // Initialise the event cache store. + client + .event_cache_store() + .lock() + .await + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") + .handle_linked_chunk_updates( + LinkedChunkId::Room(room_id), + vec![ + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(0), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(0), 0), + items: vec![event_factory.text_msg("hello").event_id(event_id).into()], + }, + ], + ) + .await + .unwrap(); + } + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap(); + let send_queue = client.send_queue(); + let room = client.get_room(room_id).unwrap(); + let room_send_queue = send_queue.for_room(room); + + let mut buffer = BufferOfValuesForLocalEvents::new(); + + // We get a `Remote` because there is no `Local*` values! + assert_remote_value_matches_room_message_with_body!( + Builder::new_local( + // An update that won't create a new `LatestEventValue`: it maps + // to zero existing local value. + &RoomSendQueueUpdate::SentEvent { + transaction_id: OwnedTransactionId::from("txnid"), + event_id: event_id.to_owned(), + }, + &mut buffer, + &room_event_cache, + None, + user_id, + None, + ) + .await + => with body = "hello" + ); + + // A local redaction of the latest event value is being sent + { + let transaction_id = OwnedTransactionId::from("txnid0"); + let content = LocalEchoContent::Redaction { + redacts: event_id.to_owned(), + reason: Some("whatever".to_owned()), + send_handle: SendRedactionHandle::new( + room_send_queue.clone(), + transaction_id.clone(), + ), + send_error: None, + }; + let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: transaction_id.clone(), + content, + }); + + // Local redactions are currently ignored. + assert_matches!( + Builder::new_local( + &update, + &mut buffer, + &room_event_cache, + Some(event_id.to_owned()), + user_id, + None + ) + .await, + None + ); + }; + assert_eq!(buffer.buffer.len(), 0); + } } diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 06cc04f26..deeb4e346 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -130,6 +130,7 @@ use std::{ collections::{BTreeMap, HashMap}, + ops::Not, str::FromStr as _, sync::{ Arc, RwLock, @@ -161,7 +162,7 @@ use ruma::{ MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, TransactionId, events::{ - AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _, + AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _, TimelineEventType, reaction::ReactionEventContent, relation::Annotation, room::{ @@ -580,6 +581,59 @@ impl RoomSendQueue { .await } + /// Queues a redaction of another event for sending it to this room. + /// + /// This immediately returns, and will push the redaction to be sent into a + /// queue, handled in the background. + /// + /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling + /// the [`Self::subscribe()`] method to get updates about the sending of + /// that redaction. + /// + /// By default, if sending failed on the first attempt, it will be retried a + /// few times. If sending failed after those retries, the entire + /// client's sending queue will be disabled, and it will need to be + /// manually re-enabled by the caller (e.g. after network is back, or when + /// something has been done about the faulty requests). + pub async fn redact( + &self, + redacts: OwnedEventId, + reason: Option<&str>, + ) -> Result { + let Some(room) = self.inner.room.get() else { + return Err(RoomSendQueueError::RoomDisappeared); + }; + if room.state() != RoomState::Joined { + return Err(RoomSendQueueError::RoomNotJoined); + } + + let request = QueuedRequestKind::Redaction { + redacts: redacts.clone(), + reason: reason.map(str::to_owned), + }; + + let created_at = MilliSecondsSinceUnixEpoch::now(); + let transaction_id = self.inner.queue.push(request, created_at).await?; + trace!(%transaction_id, "manager sends a redaction event to the background task"); + + self.inner.notifier.notify_one(); + + let send_handle = + SendRedactionHandle { room: self.clone(), transaction_id: transaction_id.clone() }; + + self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id, + content: LocalEchoContent::Redaction { + redacts, + reason: reason.map(str::to_owned), + send_handle: send_handle.clone(), + send_error: None, + }, + })); + + Ok(send_handle) + } + /// Returns the current local requests as well as a receiver to listen to /// the send queue updates, as defined in [`RoomSendQueueUpdate`]. /// @@ -796,9 +850,8 @@ impl RoomSendQueue { } }; - // In case of an error, just log the error but not stop the Send - // Queue. This feature is not - // crucial. + // In case of an error, just log the error but don't stop the Send + // Queue. This feature is not crucial. if let Some(timeline_event) = timeline_event && let Err(err) = room_event_cache .insert_sent_event_from_send_queue(timeline_event) @@ -839,6 +892,91 @@ impl RoomSendQueue { progress, }); } + + SentRequestKey::Redaction { event_id, redacts, reason } => { + send_update( + &global_update_sender, + &update_sender, + room_id, + RoomSendQueueUpdate::SentEvent { + transaction_id: txn_id, + event_id: event_id.clone(), + }, + ); + + // The redaction event has been sent to the server and the server has + // received it. It's safe to cache the event + // now to avoid any inconsistencies until the server + // sends down the remote echo via the sync. + if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await + { + let content_field_redacts = room.version().is_some_and(|id| { + id.rules() + .is_some_and(|rules| rules.redaction.content_field_redacts) + }); + let redacts = if content_field_redacts.not() { + format!("\"redacts\":\"{redacts}\",") + } else { + "".to_owned() + }; + let reason = reason.map_or_else( + || "".to_owned(), + |r| format!("\"reason\": \"{r}\""), + ); + let content = if content_field_redacts { + format!("\"redacts\":\"{redacts}\",{reason}") + } else { + reason + }; + + let timeline_event = match Raw::from_json_string( + // Create a compact string: remove all useless spaces. + format!( + "{{\ + {redacts}\ + \"event_id\":\"{event_id}\",\ + \"origin_server_ts\":{ts},\ + \"sender\":\"{sender}\",\ + \"type\":\"{type}\",\ + \"content\":{{{content}}}\ + }}", + redacts = redacts, + event_id = event_id, + ts = MilliSecondsSinceUnixEpoch::now().get(), + sender = room.client().user_id().expect("Client must be logged-in"), + type = TimelineEventType::RoomRedaction, + content = content + ), + ) { + Ok(event) => Some(TimelineEvent::from_plaintext(event)), + Err(err) => { + error!( + ?err, + "Failed to build the (sync) redaction event before the saving in the Event Cache" + ); + None + } + }; + + // In case of an error, just log the error but don't stop the Send + // Queue. This feature is not crucial. + if let Some(timeline_event) = timeline_event + && let Err(err) = room_event_cache + .insert_sent_event_from_send_queue(timeline_event) + .await + { + error!( + ?err, + "Failed to save the sent redaction event in the Event Cache" + ); + } + } else { + info!( + "Cannot insert the sent redaction event in the Event Cache because \ + either the room no longer exists, or the Room Event Cache cannot be retrieved" + ); + } + } }, Err(err) => { @@ -1060,6 +1198,19 @@ impl RoomSendQueue { } } } + + QueuedRequestKind::Redaction { redacts, reason } => { + let result = room + .redact(&redacts, reason.as_deref(), Some(request.transaction_id.clone())) + .await?; + + trace!(txn_id = %request.transaction_id, event_id = %result.event_id, "redaction successfully sent"); + + Ok(( + Some(SentRequestKey::Redaction { event_id: result.event_id, redacts, reason }), + None, + )) + } } } @@ -1858,6 +2009,18 @@ impl QueueStorage { // event represented as a dependent request should be sufficient. return None; } + + QueuedRequestKind::Redaction { redacts, reason } => { + LocalEchoContent::Redaction { + redacts, + reason, + send_handle: SendRedactionHandle { + room: room.clone(), + transaction_id: queued.transaction_id, + }, + send_error: queued.error, + } + } }, }) }); @@ -2327,6 +2490,19 @@ pub enum LocalEchoContent { /// The local echo which has been reacted to. applies_to: OwnedTransactionId, }, + + /// A local echo of a redaction event. + Redaction { + /// The ID of the redacted event. + redacts: OwnedEventId, + /// The reason for the event being redacted. + reason: Option, + /// A handle to manipulate the sending of the associated event. + send_handle: SendRedactionHandle, + /// Whether trying to send this local echo failed in the past with an + /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]). + send_error: Option, + }, } /// A local representation for a request that hasn't been sent yet to the user's @@ -2813,6 +2989,49 @@ impl SendReactionHandle { } } +/// A handle to manipulate a redaction event that was scheduled to be sent to a +/// room. +#[derive(Clone, Debug)] +pub struct SendRedactionHandle { + /// Link to the send queue used to send this request. + room: RoomSendQueue, + + /// Transaction id used for the sent request. + transaction_id: OwnedTransactionId, +} + +impl SendRedactionHandle { + /// Creates a new [`SendRedactionHandle`]. + #[cfg(test)] + pub(crate) fn new(room: RoomSendQueue, transaction_id: OwnedTransactionId) -> Self { + Self { room, transaction_id } + } + + /// Abort the sending of the redaction. + /// + /// Will return true if the redaction could be aborted, false if it's been + /// sent (and there's no matching local echo anymore). + pub async fn abort(&self) -> Result { + trace!("received a redaction abort request"); + + let queue = &self.room.inner.queue; + + if queue.cancel_event(&self.transaction_id).await? { + trace!("successful redaction abort"); + + // Propagate a cancelled update too. + self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent { + transaction_id: self.transaction_id.clone(), + }); + + Ok(true) + } else { + debug!("local echo of redaction didn't exist anymore, can't abort"); + Ok(false) + } + } +} + /// From a given source of [`DependentQueuedRequest`], return only the most /// meaningful, i.e. the ones that wouldn't be overridden after applying the /// others. diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index a48ec9ad0..1ec577739 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -28,7 +28,8 @@ use ruma::events::room::message::GalleryItemType; use ruma::{ MxcUri, OwnedEventId, OwnedTransactionId, TransactionId, event_id, events::{ - AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _, + AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent, Mentions, + MessageLikeEventContent as _, poll::unstable_start::{ NewUnstablePollStartEventContent, UnstablePollAnswer, UnstablePollAnswers, UnstablePollStartContentBlock, UnstablePollStartEventContent, @@ -42,7 +43,7 @@ use ruma::{ }, }, }, - mxc_uri, owned_mxc_uri, owned_user_id, room_id, + mxc_uri, owned_event_id, owned_mxc_uri, owned_user_id, room_id, serde::Raw, uint, }; @@ -243,6 +244,27 @@ macro_rules! assert_update { txn }}; + // Check the next stream event is a local echo for a redaction of an event with ID $redacts and reason $reason. + (($global_watch:ident, $watch:ident) => local echo redaction { redacts = $redacts:expr, reason = $reason:expr }) => {{ + assert_let!( + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + content: LocalEchoContent::Redaction { + redacts, + reason, + send_handle: _, + send_error: _, + }, + transaction_id: txn, + }))) = timeout(Duration::from_secs(1), $watch.recv()).await + ); + assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::NewLocalEvent { .. }, .. })); + + assert_eq!(redacts, $redacts); + assert_eq!(reason, $reason); + + txn + }}; + // Check the next stream event is an edit event, and that the // transaction id is the one we expect. (($global_watch:ident, $watch:ident) => edit local echo { txn = $transaction_id:expr }) => {{ @@ -1890,6 +1912,94 @@ async fn test_reactions() { assert!(watch.is_empty()); } +#[async_test] +async fn test_redaction() { + let server = MatrixMockServer::new().await; + + let client = server.client_builder().build().await; + client.event_cache().subscribe().unwrap(); + + let room_id = room_id!("!a:b.c"); + let room = server.sync_joined_room(&client, room_id).await; + + server.mock_room_state_encryption().plain().mount().await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + + let queue = room.send_queue(); + let mut global_watch = client.send_queue().subscribe(); + let (local_echoes, mut watch) = queue.subscribe().await.unwrap(); + + // ---------------------- + // Sanity check: the cache and queue are empty at the start. + assert!(events.is_empty()); + assert!(local_echoes.is_empty()); + assert!(watch.is_empty()); + + // ---------------------- + // Send a message in the room. + let content = RoomMessageEventContent::text_plain("hello world"); + let msg_event_id = owned_event_id!("$1"); + server.mock_room_send().ok(msg_event_id.clone()).mock_once().mount().await; + queue.send(content.into()).await.unwrap(); + + // ---------------------- + // Observe the local echo. + let (txn, _) = assert_update!((global_watch, watch) => local echo { body = "hello world" }); + + // ---------------------- + // The event is sent, at some point. + assert_update!((global_watch, watch) => sent { + txn = txn, + event_id = msg_event_id + }); + + // ---------------------- + // Observe the event getting added to the cache. + assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents(up)) = stream.recv()); + assert_eq!(up.diffs.len(), 1); + assert_let!(VectorDiff::Append { values } = &up.diffs[0]); + assert_eq!(values.len(), 1); + assert_eq!(values[0].event_id().as_deref().unwrap(), msg_event_id); + + // ---------------------- + // Send a redaction for the event. + let redacts = msg_event_id.clone(); + let reason = Some("whatever"); + let redaction_event_id = owned_event_id!("$2"); + server.mock_room_redact().ok(redaction_event_id.clone()).mount().await; + queue.redact(redacts.clone(), reason).await.expect("queuing the redaction works"); + + // ---------------------- + // Observe the local echo. + let txn = assert_update!((global_watch, watch) => local echo redaction { redacts = redacts, reason = reason.map(str::to_owned) }); + + // ---------------------- + // The redaction event is sent, at some point. + assert_update!((global_watch, watch) => sent { + txn = txn, + event_id = redaction_event_id + }); + + // ---------------------- + // Observe the redaction getting applied in the cache. + assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents(up)) = stream.recv()); + assert_eq!(up.diffs.len(), 2); + // The redaction event itself is added. + assert_let!(VectorDiff::Append { values } = &up.diffs[0]); + assert_eq!(values.len(), 1); + assert_eq!(values[0].event_id().as_deref().unwrap(), redaction_event_id); + // The target event is now redacted. + assert_let!(VectorDiff::Set { index: 0, value: redacted_event } = &up.diffs[1]); + let ev = redacted_event.raw().deserialize().unwrap(); + assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ev)) = ev); + assert_matches!(ev.as_original(), None); + + // That's all, folks! + assert!(watch.is_empty()); +} + #[async_test] async fn test_media_uploads() { let mock = MatrixMockServer::new().await;