feat(send_queue): send redactions via the send queue (#6250)

This is a first step towards
https://github.com/matrix-org/matrix-rust-sdk/issues/4162 and adds a way
to send redactions (including their local echoes) via the send queue.

I had to introduce new variants for `SentRequestKey` and
`LocalEchoContent` because in some room versions the redacted event ID
sits at the top-level of the event rather than in `content`.

At the timeline level redactions are handled via a new boolean flag in
`AggregationKind::Redaction`. Local echoes of redactions merely set a
flag on the timeline event whereas remote echoes of redactions lead to
actual redactions as before.

The FFI bindings will be updated in a follow-up PR.

Signed-off-by: Johannes Marbach <n0-0ne+github@mailbox.org>
This commit is contained in:
Johannes Marbach
2026-04-07 18:02:59 +02:00
committed by GitHub
parent b07069170f
commit 1f3dea778b
13 changed files with 677 additions and 23 deletions
+24 -1
View File
@@ -117,6 +117,14 @@ pub enum QueuedRequestKind {
#[serde(default)]
accumulated: Vec<AccumulatedSentMediaInfo>,
},
/// A redaction of another event to send.
Redaction {
/// The ID of the event to redact.
redacts: OwnedEventId,
/// The reason for the event being redacted.
reason: Option<String>,
},
}
impl From<SerializableEventContent> for QueuedRequestKind {
@@ -421,12 +429,27 @@ pub enum SentRequestKey {
/// The parent transaction returned an uploaded resource URL.
Media(SentMediaInfo),
/// The parent transaction returned a redaction event when it succeeded.
Redaction {
/// The event ID returned by the server.
event_id: OwnedEventId,
/// The ID of the redacted event.
redacts: OwnedEventId,
/// The reason for the event being redacted.
reason: Option<String>,
},
}
impl SentRequestKey {
/// Converts the current parent key into an event id, if possible.
pub fn into_event_id(self) -> Option<OwnedEventId> {
as_variant!(self, Self::Event { event_id, .. } => event_id)
match self {
Self::Event { event_id, .. } | Self::Redaction { event_id, .. } => Some(event_id),
_ => None,
}
}
/// Converts the current parent key into information about a sent media, if
+2
View File
@@ -30,6 +30,8 @@ All notable changes to this project will be documented in this file.
### Features
- Handle local echoes of redactions in the timeline.
([#6250](https://github.com/matrix-org/matrix-rust-sdk/pull/6250))
- [**breaking**] Remove support for `native-tls` and remove all feature
flags for selecting TLS backend, as `rustls` is the now the only supported
TLS backend.
@@ -127,7 +127,12 @@ pub(crate) enum AggregationKind {
},
/// An event has been redacted.
Redaction,
Redaction {
/// Whether this aggregation results from the local echo of a redaction.
/// Local echoes of redactions are applied reversibly whereas remote
/// echoes of redactions are applied irreversibly.
is_local: bool,
},
/// An event has been edited.
///
@@ -238,11 +243,15 @@ impl Aggregation {
}
}
AggregationKind::Redaction => {
if event.content().is_redacted() {
AggregationKind::Redaction { is_local } => {
let is_local_redacted =
event.content().is_redacted() && event.unredacted_item.is_some();
let is_remote_redacted =
event.content().is_redacted() && event.unredacted_item.is_none();
if *is_local && is_local_redacted || !*is_local && is_remote_redacted {
ApplyAggregationResult::LeftItemIntact
} else {
let new_item = event.redact(&rules.redaction);
let new_item = event.redact(&rules.redaction, *is_local);
*event = Cow::Owned(new_item);
ApplyAggregationResult::UpdatedItem
}
@@ -353,9 +362,20 @@ impl Aggregation {
ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
}
AggregationKind::Redaction => {
// Redactions are not reversible.
ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
AggregationKind::Redaction { is_local } => {
if *is_local {
if event.unredacted_item.is_some() {
// Unapply local redaction.
*event = Cow::Owned(event.unredact());
ApplyAggregationResult::UpdatedItem
} else {
// Event isn't locally redacted. Nothing to do.
ApplyAggregationResult::LeftItemIntact
}
} else {
// Remote redactions are not reversible.
ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
}
}
AggregationKind::Reaction { key, sender, .. } => {
@@ -491,7 +511,7 @@ impl Aggregations {
pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
// If the aggregation is a redaction, it invalidates all the other aggregations;
// remove them.
if matches!(aggregation.kind, AggregationKind::Redaction) {
if matches!(aggregation.kind, AggregationKind::Redaction { .. }) {
for agg in self.related_events.remove(&related_to).unwrap_or_default() {
self.inverted_map.remove(&agg.own_id);
}
@@ -502,7 +522,7 @@ impl Aggregations {
if let Some(previous_aggregations) = self.related_events.get(&related_to)
&& previous_aggregations
.iter()
.any(|agg| matches!(agg.kind, AggregationKind::Redaction))
.any(|agg| matches!(agg.kind, AggregationKind::Redaction { .. }))
{
return;
}
@@ -718,12 +738,19 @@ impl Aggregations {
AggregationKind::PollResponse { .. }
| AggregationKind::PollEnd { .. }
| AggregationKind::Edit(..)
| AggregationKind::Redaction
| AggregationKind::BeaconUpdate { .. }
| AggregationKind::BeaconStop { .. } => {
// Nothing particular to do.
}
AggregationKind::Redaction { is_local } => {
// Mark the redaction as being remote and apply it (irreversibly).
*is_local = false;
let found = found.clone();
find_item_and_apply_aggregation(self, items, &target, found, rules);
}
AggregationKind::Reaction { reaction_status, .. } => {
// Mark the reaction as becoming remote, and signal that update to the
// caller.
@@ -1149,6 +1149,23 @@ impl<P: RoomDataProvider> TimelineController<P> {
LocalEchoContent::React { key, send_handle, applies_to } => {
self.handle_local_reaction(key, send_handle, applies_to).await;
}
LocalEchoContent::Redaction { redacts, send_error, .. } => {
self.handle_local_redaction(echo.transaction_id.clone(), redacts).await;
if let Some(send_error) = send_error {
self.update_event_send_state(
&echo.transaction_id,
EventSendState::SendingFailed {
error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
send_error,
))),
is_recoverable: false,
},
)
.await;
}
}
}
}
@@ -1189,6 +1206,34 @@ impl<P: RoomDataProvider> TimelineController<P> {
tr.commit();
}
/// Applies a local echo of a redaction.
pub(super) async fn handle_local_redaction(
&self,
txn_id: OwnedTransactionId,
redacts: OwnedEventId,
) {
let mut state = self.state.write().await;
let mut tr = state.transaction();
let target = TimelineEventItemId::EventId(redacts);
let aggregation = Aggregation::new(
TimelineEventItemId::TransactionId(txn_id),
AggregationKind::Redaction { is_local: true },
);
tr.meta.aggregations.add(target.clone(), aggregation.clone());
find_item_and_apply_aggregation(
&tr.meta.aggregations,
&mut tr.items,
&target,
aggregation,
&tr.meta.room_version_rules,
);
tr.commit();
}
/// Handle a single room send queue update.
pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
match update {
@@ -852,8 +852,12 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
}
let target = TimelineEventItemId::EventId(redacted.clone());
let aggregation =
Aggregation::new(self.ctx.flow.timeline_item_id(), AggregationKind::Redaction);
let aggregation = Aggregation::new(
self.ctx.flow.timeline_item_id(),
AggregationKind::Redaction {
is_local: false, // We can only get here for remote echoes of redactions.
},
);
self.meta.aggregations.add(target.clone(), aggregation.clone());
find_item_and_apply_aggregation(
@@ -80,8 +80,14 @@ pub struct EventTimelineItem {
pub(super) forwarder_profile: Option<TimelineDetails<Profile>>,
/// The timestamp of the event.
pub(super) timestamp: MilliSecondsSinceUnixEpoch,
/// The content of the event.
/// The content of the event. Might be redacted if a redaction for this
/// event is currently being sent or has been received from the server.
pub(super) content: TimelineItemContent,
/// If a redaction for this event is currently being sent but the server
/// hasn't yet acknowledged it via its remote echo, the data
/// before redaction. This applies to all sorts of timeline items, including
/// state events. If no redaction is in flight, None.
pub(super) unredacted_item: Option<UnredactedEventTimelineItem>,
/// The kind of event timeline item, local or remote.
pub(super) kind: EventTimelineItemKind,
/// Whether or not the event belongs to an encrypted room.
@@ -118,6 +124,20 @@ pub(crate) enum TimelineItemHandle<'a> {
Local(&'a SendHandle),
}
/// A container for temporarily holding onto data that is going to be erased by
/// a redaction once the server plays it back.
#[derive(Clone, Debug)]
pub(super) struct UnredactedEventTimelineItem {
/// The original content before redaction.
content: TimelineItemContent,
/// JSON of the original event.
pub(crate) original_json: Option<Raw<AnySyncTimelineEvent>>,
/// JSON of the latest edit to this item.
pub(crate) latest_edit_json: Option<Raw<AnySyncTimelineEvent>>,
}
impl EventTimelineItem {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
@@ -137,6 +157,7 @@ impl EventTimelineItem {
forwarder_profile,
timestamp,
content,
unredacted_item: None,
kind,
is_room_encrypted,
}
@@ -508,7 +529,12 @@ impl EventTimelineItem {
}
/// Create a clone of the current item, with content that's been redacted.
pub(super) fn redact(&self, rules: &RedactionRules) -> Self {
pub(super) fn redact(&self, rules: &RedactionRules, is_local: bool) -> Self {
let unredacted_item = is_local.then(|| UnredactedEventTimelineItem {
content: self.content.clone(),
original_json: self.original_json().cloned(),
latest_edit_json: self.latest_edit_json().cloned(),
});
let content = self.content.redact(rules);
let kind = match &self.kind {
EventTimelineItemKind::Local(l) => EventTimelineItemKind::Local(l.clone()),
@@ -521,6 +547,35 @@ impl EventTimelineItem {
forwarder_profile: self.forwarder_profile.clone(),
timestamp: self.timestamp,
content,
unredacted_item,
kind,
is_room_encrypted: self.is_room_encrypted,
}
}
/// Create a clone of the current item, with data restored from the
/// item's unredacted_item field (if it was previously set by a call to
/// the `redact(...)` method).
pub(super) fn unredact(&self) -> Self {
let Some(unredacted_item) = &self.unredacted_item else { return self.clone() };
let kind = match &self.kind {
EventTimelineItemKind::Local(l) => EventTimelineItemKind::Local(l.clone()),
EventTimelineItemKind::Remote(r) => {
EventTimelineItemKind::Remote(RemoteEventTimelineItem {
original_json: unredacted_item.original_json.clone(),
latest_edit_json: unredacted_item.latest_edit_json.clone(),
..r.clone()
})
}
};
Self {
sender: self.sender.clone(),
sender_profile: self.sender_profile.clone(),
forwarder: self.forwarder.clone(),
forwarder_profile: self.forwarder_profile.clone(),
timestamp: self.timestamp,
content: unredacted_item.content.clone(),
unredacted_item: None,
kind,
is_room_encrypted: self.is_room_encrypted,
}
@@ -173,6 +173,12 @@ impl TestTimeline {
txn_id
}
async fn handle_local_redaction(&self, redacts: OwnedEventId) -> OwnedTransactionId {
let txn_id = TransactionId::new();
self.controller.handle_local_redaction(txn_id.clone(), redacts).await;
txn_id
}
async fn handle_back_paginated_event(&self, event: Raw<AnyTimelineEvent>) {
let timeline_event = TimelineEvent::from_plaintext(event.cast());
self.controller
@@ -23,13 +23,14 @@ use ruma::{
StateEventContentChange, reaction::RedactedReactionEventContent,
room::message::OriginalSyncRoomMessageEvent,
},
owned_event_id,
};
use stream_assert::{assert_next_matches, assert_pending};
use super::TestTimeline;
use crate::timeline::{
AnyOtherStateEventContentChange, TimelineDetails, TimelineItemContent,
event_item::RemoteEventOrigin,
event_item::{EventTimelineItemKind, RemoteEventOrigin, RemoteEventTimelineItem},
};
#[async_test]
@@ -203,3 +204,47 @@ async fn test_reaction_redaction_timeline_filter() {
assert_eq!(item.content().reactions().cloned().unwrap_or_default().len(), 0);
assert_eq!(timeline.controller.items().await.len(), 2);
}
#[async_test]
async fn test_local_and_remote_echo_of_redaction() {
let timeline = TestTimeline::new();
let mut stream = timeline.subscribe_events().await;
let f = &timeline.factory;
// Send a message.
let event_id = owned_event_id!("$1");
timeline
.handle_live_event(f.text_msg("Hello, world!").sender(&ALICE).event_id(&event_id))
.await;
let item = assert_next_matches!(stream, VectorDiff::PushBack { value } => value);
assert!(!item.content().is_redacted());
assert!(item.unredacted_item.is_none());
assert_let!(
EventTimelineItemKind::Remote(RemoteEventTimelineItem { original_json, .. }) = item.kind
);
assert!(original_json.is_some());
// Now redact the message. We first emit the local echo of the redaction event.
// The timeline event should be marked as being under redaction.
timeline.handle_local_redaction(event_id.clone()).await;
let item = assert_next_matches!(stream, VectorDiff::Set { index: 0, value } => value);
assert!(item.content().is_redacted());
assert_let!(Some(unredacted_item) = item.unredacted_item);
assert!(unredacted_item.original_json.is_some());
assert_let!(
EventTimelineItemKind::Remote(RemoteEventTimelineItem { original_json, .. }) = item.kind
);
assert!(original_json.is_none());
// Then comes the remote echo of the redaction event. The timeline event should
// now be redacted.
timeline.handle_live_event(f.redaction(&event_id).sender(&ALICE)).await;
let item = assert_next_matches!(stream, VectorDiff::Set { index: 0, value } => value);
assert!(item.content().is_redacted());
assert!(item.unredacted_item.is_none());
assert_let!(
EventTimelineItemKind::Remote(RemoteEventTimelineItem { original_json, .. }) = item.kind
);
assert!(original_json.is_none());
}
+4
View File
@@ -8,6 +8,10 @@ All notable changes to this project will be documented in this file.
### Features
- Enable sending redaction events through the send queue via `RoomSendQueue::redact`.
This includes local echoes for redaction events through the new `LocalEchoContent::Redaction`
variant.
([#6250](https://github.com/matrix-org/matrix-rust-sdk/pull/6250))
- [**breaking**] Remove support for `native-tls` and remove all feature
flags for selecting TLS backend, as `rustls` is the now the only supported
TLS backend.
@@ -313,6 +313,11 @@ async fn handle_thread_subscriber_send_queue_update(
// Nothing to do, reactions don't count as a thread
// subscription.
}
LocalEchoContent::Redaction { .. } => {
// Nothing to do, redactions don't count as a thread
// subscription.
}
}
return true;
}
@@ -210,6 +210,11 @@ impl Builder {
}
LocalEchoContent::React { .. } => None,
// TODO: Rework the latest event system to handle local redactions. This will
// require mixing the processing of local and remote events since a local redaction
// will target a previous remote event.
LocalEchoContent::Redaction { .. } => None,
},
// A local event has been cancelled before being sent.
@@ -1679,7 +1684,7 @@ mod builder_tests {
Client, Error,
send_queue::{
AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle,
SendReactionHandle,
SendReactionHandle, SendRedactionHandle,
},
test_utils::mocks::MatrixMockServer,
};
@@ -3641,4 +3646,108 @@ mod builder_tests {
);
assert!(buffer.buffer.is_empty());
}
#[async_test]
async fn test_local_redaction() {
let room_id = room_id!("!r0");
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(room_id);
let event_id = event_id!("$ev0");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
// Prelude.
{
// Create the room.
client.base_client().get_or_create_room(room_id, RoomState::Joined);
// Initialise the event cache store.
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![event_factory.text_msg("hello").event_id(event_id).into()],
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
let send_queue = client.send_queue();
let room = client.get_room(room_id).unwrap();
let room_send_queue = send_queue.for_room(room);
let mut buffer = BufferOfValuesForLocalEvents::new();
// We get a `Remote` because there is no `Local*` values!
assert_remote_value_matches_room_message_with_body!(
Builder::new_local(
// An update that won't create a new `LatestEventValue`: it maps
// to zero existing local value.
&RoomSendQueueUpdate::SentEvent {
transaction_id: OwnedTransactionId::from("txnid"),
event_id: event_id.to_owned(),
},
&mut buffer,
&room_event_cache,
None,
user_id,
None,
)
.await
=> with body = "hello"
);
// A local redaction of the latest event value is being sent
{
let transaction_id = OwnedTransactionId::from("txnid0");
let content = LocalEchoContent::Redaction {
redacts: event_id.to_owned(),
reason: Some("whatever".to_owned()),
send_handle: SendRedactionHandle::new(
room_send_queue.clone(),
transaction_id.clone(),
),
send_error: None,
};
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
// Local redactions are currently ignored.
assert_matches!(
Builder::new_local(
&update,
&mut buffer,
&room_event_cache,
Some(event_id.to_owned()),
user_id,
None
)
.await,
None
);
};
assert_eq!(buffer.buffer.len(), 0);
}
}
+223 -4
View File
@@ -130,6 +130,7 @@
use std::{
collections::{BTreeMap, HashMap},
ops::Not,
str::FromStr as _,
sync::{
Arc, RwLock,
@@ -161,7 +162,7 @@ use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
TransactionId,
events::{
AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _, TimelineEventType,
reaction::ReactionEventContent,
relation::Annotation,
room::{
@@ -580,6 +581,59 @@ impl RoomSendQueue {
.await
}
/// Queues a redaction of another event for sending it to this room.
///
/// This immediately returns, and will push the redaction to be sent into a
/// queue, handled in the background.
///
/// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
/// the [`Self::subscribe()`] method to get updates about the sending of
/// that redaction.
///
/// By default, if sending failed on the first attempt, it will be retried a
/// few times. If sending failed after those retries, the entire
/// client's sending queue will be disabled, and it will need to be
/// manually re-enabled by the caller (e.g. after network is back, or when
/// something has been done about the faulty requests).
pub async fn redact(
&self,
redacts: OwnedEventId,
reason: Option<&str>,
) -> Result<SendRedactionHandle, RoomSendQueueError> {
let Some(room) = self.inner.room.get() else {
return Err(RoomSendQueueError::RoomDisappeared);
};
if room.state() != RoomState::Joined {
return Err(RoomSendQueueError::RoomNotJoined);
}
let request = QueuedRequestKind::Redaction {
redacts: redacts.clone(),
reason: reason.map(str::to_owned),
};
let created_at = MilliSecondsSinceUnixEpoch::now();
let transaction_id = self.inner.queue.push(request, created_at).await?;
trace!(%transaction_id, "manager sends a redaction event to the background task");
self.inner.notifier.notify_one();
let send_handle =
SendRedactionHandle { room: self.clone(), transaction_id: transaction_id.clone() };
self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id,
content: LocalEchoContent::Redaction {
redacts,
reason: reason.map(str::to_owned),
send_handle: send_handle.clone(),
send_error: None,
},
}));
Ok(send_handle)
}
/// Returns the current local requests as well as a receiver to listen to
/// the send queue updates, as defined in [`RoomSendQueueUpdate`].
///
@@ -796,9 +850,8 @@ impl RoomSendQueue {
}
};
// In case of an error, just log the error but not stop the Send
// Queue. This feature is not
// crucial.
// In case of an error, just log the error but don't stop the Send
// Queue. This feature is not crucial.
if let Some(timeline_event) = timeline_event
&& let Err(err) = room_event_cache
.insert_sent_event_from_send_queue(timeline_event)
@@ -839,6 +892,91 @@ impl RoomSendQueue {
progress,
});
}
SentRequestKey::Redaction { event_id, redacts, reason } => {
send_update(
&global_update_sender,
&update_sender,
room_id,
RoomSendQueueUpdate::SentEvent {
transaction_id: txn_id,
event_id: event_id.clone(),
},
);
// The redaction event has been sent to the server and the server has
// received it. It's safe to cache the event
// now to avoid any inconsistencies until the server
// sends down the remote echo via the sync.
if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
{
let content_field_redacts = room.version().is_some_and(|id| {
id.rules()
.is_some_and(|rules| rules.redaction.content_field_redacts)
});
let redacts = if content_field_redacts.not() {
format!("\"redacts\":\"{redacts}\",")
} else {
"".to_owned()
};
let reason = reason.map_or_else(
|| "".to_owned(),
|r| format!("\"reason\": \"{r}\""),
);
let content = if content_field_redacts {
format!("\"redacts\":\"{redacts}\",{reason}")
} else {
reason
};
let timeline_event = match Raw::from_json_string(
// Create a compact string: remove all useless spaces.
format!(
"{{\
{redacts}\
\"event_id\":\"{event_id}\",\
\"origin_server_ts\":{ts},\
\"sender\":\"{sender}\",\
\"type\":\"{type}\",\
\"content\":{{{content}}}\
}}",
redacts = redacts,
event_id = event_id,
ts = MilliSecondsSinceUnixEpoch::now().get(),
sender = room.client().user_id().expect("Client must be logged-in"),
type = TimelineEventType::RoomRedaction,
content = content
),
) {
Ok(event) => Some(TimelineEvent::from_plaintext(event)),
Err(err) => {
error!(
?err,
"Failed to build the (sync) redaction event before the saving in the Event Cache"
);
None
}
};
// In case of an error, just log the error but don't stop the Send
// Queue. This feature is not crucial.
if let Some(timeline_event) = timeline_event
&& let Err(err) = room_event_cache
.insert_sent_event_from_send_queue(timeline_event)
.await
{
error!(
?err,
"Failed to save the sent redaction event in the Event Cache"
);
}
} else {
info!(
"Cannot insert the sent redaction event in the Event Cache because \
either the room no longer exists, or the Room Event Cache cannot be retrieved"
);
}
}
},
Err(err) => {
@@ -1060,6 +1198,19 @@ impl RoomSendQueue {
}
}
}
QueuedRequestKind::Redaction { redacts, reason } => {
let result = room
.redact(&redacts, reason.as_deref(), Some(request.transaction_id.clone()))
.await?;
trace!(txn_id = %request.transaction_id, event_id = %result.event_id, "redaction successfully sent");
Ok((
Some(SentRequestKey::Redaction { event_id: result.event_id, redacts, reason }),
None,
))
}
}
}
@@ -1858,6 +2009,18 @@ impl QueueStorage {
// event represented as a dependent request should be sufficient.
return None;
}
QueuedRequestKind::Redaction { redacts, reason } => {
LocalEchoContent::Redaction {
redacts,
reason,
send_handle: SendRedactionHandle {
room: room.clone(),
transaction_id: queued.transaction_id,
},
send_error: queued.error,
}
}
},
})
});
@@ -2327,6 +2490,19 @@ pub enum LocalEchoContent {
/// The local echo which has been reacted to.
applies_to: OwnedTransactionId,
},
/// A local echo of a redaction event.
Redaction {
/// The ID of the redacted event.
redacts: OwnedEventId,
/// The reason for the event being redacted.
reason: Option<String>,
/// A handle to manipulate the sending of the associated event.
send_handle: SendRedactionHandle,
/// Whether trying to send this local echo failed in the past with an
/// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
send_error: Option<QueueWedgeError>,
},
}
/// A local representation for a request that hasn't been sent yet to the user's
@@ -2813,6 +2989,49 @@ impl SendReactionHandle {
}
}
/// A handle to manipulate a redaction event that was scheduled to be sent to a
/// room.
#[derive(Clone, Debug)]
pub struct SendRedactionHandle {
/// Link to the send queue used to send this request.
room: RoomSendQueue,
/// Transaction id used for the sent request.
transaction_id: OwnedTransactionId,
}
impl SendRedactionHandle {
/// Creates a new [`SendRedactionHandle`].
#[cfg(test)]
pub(crate) fn new(room: RoomSendQueue, transaction_id: OwnedTransactionId) -> Self {
Self { room, transaction_id }
}
/// Abort the sending of the redaction.
///
/// Will return true if the redaction could be aborted, false if it's been
/// sent (and there's no matching local echo anymore).
pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
trace!("received a redaction abort request");
let queue = &self.room.inner.queue;
if queue.cancel_event(&self.transaction_id).await? {
trace!("successful redaction abort");
// Propagate a cancelled update too.
self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
transaction_id: self.transaction_id.clone(),
});
Ok(true)
} else {
debug!("local echo of redaction didn't exist anymore, can't abort");
Ok(false)
}
}
}
/// From a given source of [`DependentQueuedRequest`], return only the most
/// meaningful, i.e. the ones that wouldn't be overridden after applying the
/// others.
@@ -28,7 +28,8 @@ use ruma::events::room::message::GalleryItemType;
use ruma::{
MxcUri, OwnedEventId, OwnedTransactionId, TransactionId, event_id,
events::{
AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent, Mentions,
MessageLikeEventContent as _,
poll::unstable_start::{
NewUnstablePollStartEventContent, UnstablePollAnswer, UnstablePollAnswers,
UnstablePollStartContentBlock, UnstablePollStartEventContent,
@@ -42,7 +43,7 @@ use ruma::{
},
},
},
mxc_uri, owned_mxc_uri, owned_user_id, room_id,
mxc_uri, owned_event_id, owned_mxc_uri, owned_user_id, room_id,
serde::Raw,
uint,
};
@@ -243,6 +244,27 @@ macro_rules! assert_update {
txn
}};
// Check the next stream event is a local echo for a redaction of an event with ID $redacts and reason $reason.
(($global_watch:ident, $watch:ident) => local echo redaction { redacts = $redacts:expr, reason = $reason:expr }) => {{
assert_let!(
Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
content: LocalEchoContent::Redaction {
redacts,
reason,
send_handle: _,
send_error: _,
},
transaction_id: txn,
}))) = timeout(Duration::from_secs(1), $watch.recv()).await
);
assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::NewLocalEvent { .. }, .. }));
assert_eq!(redacts, $redacts);
assert_eq!(reason, $reason);
txn
}};
// Check the next stream event is an edit event, and that the
// transaction id is the one we expect.
(($global_watch:ident, $watch:ident) => edit local echo { txn = $transaction_id:expr }) => {{
@@ -1890,6 +1912,94 @@ async fn test_reactions() {
assert!(watch.is_empty());
}
#[async_test]
async fn test_redaction() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!a:b.c");
let room = server.sync_joined_room(&client, room_id).await;
server.mock_room_state_encryption().plain().mount().await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
let queue = room.send_queue();
let mut global_watch = client.send_queue().subscribe();
let (local_echoes, mut watch) = queue.subscribe().await.unwrap();
// ----------------------
// Sanity check: the cache and queue are empty at the start.
assert!(events.is_empty());
assert!(local_echoes.is_empty());
assert!(watch.is_empty());
// ----------------------
// Send a message in the room.
let content = RoomMessageEventContent::text_plain("hello world");
let msg_event_id = owned_event_id!("$1");
server.mock_room_send().ok(msg_event_id.clone()).mock_once().mount().await;
queue.send(content.into()).await.unwrap();
// ----------------------
// Observe the local echo.
let (txn, _) = assert_update!((global_watch, watch) => local echo { body = "hello world" });
// ----------------------
// The event is sent, at some point.
assert_update!((global_watch, watch) => sent {
txn = txn,
event_id = msg_event_id
});
// ----------------------
// Observe the event getting added to the cache.
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents(up)) = stream.recv());
assert_eq!(up.diffs.len(), 1);
assert_let!(VectorDiff::Append { values } = &up.diffs[0]);
assert_eq!(values.len(), 1);
assert_eq!(values[0].event_id().as_deref().unwrap(), msg_event_id);
// ----------------------
// Send a redaction for the event.
let redacts = msg_event_id.clone();
let reason = Some("whatever");
let redaction_event_id = owned_event_id!("$2");
server.mock_room_redact().ok(redaction_event_id.clone()).mount().await;
queue.redact(redacts.clone(), reason).await.expect("queuing the redaction works");
// ----------------------
// Observe the local echo.
let txn = assert_update!((global_watch, watch) => local echo redaction { redacts = redacts, reason = reason.map(str::to_owned) });
// ----------------------
// The redaction event is sent, at some point.
assert_update!((global_watch, watch) => sent {
txn = txn,
event_id = redaction_event_id
});
// ----------------------
// Observe the redaction getting applied in the cache.
assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents(up)) = stream.recv());
assert_eq!(up.diffs.len(), 2);
// The redaction event itself is added.
assert_let!(VectorDiff::Append { values } = &up.diffs[0]);
assert_eq!(values.len(), 1);
assert_eq!(values[0].event_id().as_deref().unwrap(), redaction_event_id);
// The target event is now redacted.
assert_let!(VectorDiff::Set { index: 0, value: redacted_event } = &up.diffs[1]);
let ev = redacted_event.raw().deserialize().unwrap();
assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ev)) = ev);
assert_matches!(ev.as_original(), None);
// That's all, folks!
assert!(watch.is_empty());
}
#[async_test]
async fn test_media_uploads() {
let mock = MatrixMockServer::new().await;