Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Improve RoomEventCacheUpdate #3471

Merged
36 changes: 28 additions & 8 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
use std::{collections::BTreeSet, sync::Arc};

use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{event_cache::RoomEventCacheUpdate, executor::spawn, Room};
use matrix_sdk::{
event_cache::{EventsOrigin, RoomEventCacheUpdate},
executor::spawn,
Room,
};
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
use tokio::sync::{broadcast, mpsc};
use tracing::{info, info_span, trace, warn, Instrument, Span};
Expand All @@ -27,7 +31,10 @@ use super::{
queue::send_queued_messages,
Error, Timeline, TimelineDropHandle, TimelineFocus,
};
use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
use crate::{
timeline::{event_item::RemoteEventOrigin, inner::TimelineEnd},
unable_to_decrypt_hook::UtdHookManager,
};

/// Builder that allows creating and configuring various parts of a
/// [`Timeline`].
Expand Down Expand Up @@ -203,7 +210,7 @@ impl TimelineBuilder {
};

match update {
RoomEventCacheUpdate::UpdateReadMarker { event_id } => {
RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
trace!(target = %event_id, "Handling fully read marker.");
inner.handle_fully_read_marker(event_id).await;
}
Expand All @@ -220,13 +227,26 @@ impl TimelineBuilder {
inner.clear().await;
}

RoomEventCacheUpdate::Append { events, ephemeral, ambiguity_changes } => {
trace!("Received new events from sync.");
RoomEventCacheUpdate::AddTimelineEvents { events, origin } => {
trace!("Received new timeline events.");

// TODO: (bnjbvr) ephemeral should be handled by the event cache, and
// we should replace this with a simple `add_events_at`.
inner.handle_sync_events(events, ephemeral).await;
inner.add_events_at(
events,
TimelineEnd::Back,
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
}
).await;
}

RoomEventCacheUpdate::AddEphemeralEvents { events } => {
trace!("Received new ephemeral events from sync.");

// TODO: (bnjbvr) ephemeral should be handled by the event cache.
inner.handle_ephemeral_events(events).await;
}

RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
if !ambiguity_changes.is_empty() {
let member_ambiguity_changes = ambiguity_changes
.values()
Expand Down
7 changes: 3 additions & 4 deletions crates/matrix-sdk-ui/src/timeline/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,13 +606,12 @@ impl<P: RoomDataProvider> TimelineInner<P> {
self.state.write().await.handle_fully_read_marker(fully_read_event_id);
}

pub(super) async fn handle_sync_events(
pub(super) async fn handle_ephemeral_events(
&self,
events: Vec<SyncTimelineEvent>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
) {
let mut state = self.state.write().await;
state.handle_sync_events(events, ephemeral, &self.room_data_provider, &self.settings).await;
state.handle_ephemeral_events(events, &self.room_data_provider).await;
}

#[cfg(test)]
Expand Down
43 changes: 17 additions & 26 deletions crates/matrix-sdk-ui/src/timeline/inner/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,37 +127,28 @@ impl TimelineInnerState {
}

#[instrument(skip_all)]
pub(super) async fn handle_sync_events<P: RoomDataProvider>(
pub(super) async fn handle_ephemeral_events<P: RoomDataProvider>(
&mut self,
events: Vec<SyncTimelineEvent>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
room_data_provider: &P,
settings: &TimelineInnerSettings,
) {
if events.is_empty() {
return;
}

let mut txn = self.transaction();

txn.add_events_at(
events,
TimelineEnd::Back,
RemoteEventOrigin::Sync,
room_data_provider,
settings,
)
.await;

if !ephemeral.is_empty() {
trace!("Handling ephemeral room events");
let own_user_id = room_data_provider.own_user_id();
for raw_event in ephemeral {
match raw_event.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(ev)) => {
txn.handle_explicit_read_receipts(ev.content, own_user_id);
}
Ok(_) => {}
Err(e) => {
let event_type = raw_event.get_field::<String>("type").ok().flatten();
warn!(event_type, "Failed to deserialize ephemeral event: {e}");
}
trace!("Handling ephemeral room events");
let own_user_id = room_data_provider.own_user_id();
for raw_event in events {
match raw_event.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(ev)) => {
txn.handle_explicit_read_receipts(ev.content, own_user_id);
}
Ok(_) => {}
Err(e) => {
let event_type = raw_event.get_field::<String>("type").ok().flatten();
warn!(event_type, "Failed to deserialize ephemeral event: {e}");
}
}
}
Expand Down
91 changes: 63 additions & 28 deletions crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ impl RoomEventCacheInner {
handled_read_marker = true;

// Propagate to observers. (We ignore the error if there aren't any.)
let _ = self.sender.send(RoomEventCacheUpdate::UpdateReadMarker {
let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
event_id: ev.content.event_id,
});
}
Expand Down Expand Up @@ -549,7 +549,7 @@ impl RoomEventCacheInner {
async fn handle_timeline(
&self,
timeline: Timeline,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
if timeline.limited {
Expand All @@ -561,7 +561,7 @@ impl RoomEventCacheInner {
self.replace_all_events_by(
timeline.events,
timeline.prev_batch,
ephemeral,
ephemeral_events,
ambiguity_changes,
)
.await?;
Expand All @@ -572,7 +572,7 @@ impl RoomEventCacheInner {
self.append_new_events(
timeline.events,
timeline.prev_batch,
ephemeral,
ephemeral_events,
ambiguity_changes,
)
.await?;
Expand All @@ -590,9 +590,9 @@ impl RoomEventCacheInner {
/// storage, notifying observers.
async fn replace_all_events_by(
&self,
events: Vec<SyncTimelineEvent>,
sync_timeline_events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
// Acquire the lock.
Expand All @@ -607,9 +607,9 @@ impl RoomEventCacheInner {
// Push the new events.
self.append_events_locked_impl(
room_events,
events,
sync_timeline_events,
prev_batch,
ephemeral,
ephemeral_events,
ambiguity_changes,
)
.await
Expand All @@ -619,16 +619,16 @@ impl RoomEventCacheInner {
/// observers.
async fn append_new_events(
&self,
events: Vec<SyncTimelineEvent>,
sync_timeline_events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
self.append_events_locked_impl(
self.events.write().await,
events,
sync_timeline_events,
prev_batch,
ephemeral,
ephemeral_events,
ambiguity_changes,
)
.await
Expand All @@ -642,14 +642,14 @@ impl RoomEventCacheInner {
async fn append_events_locked_impl(
&self,
mut room_events: RwLockWriteGuard<'_, RoomEvents>,
events: Vec<SyncTimelineEvent>,
sync_timeline_events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
if events.is_empty()
if sync_timeline_events.is_empty()
&& prev_batch.is_none()
&& ephemeral.is_empty()
&& ephemeral_events.is_empty()
&& ambiguity_changes.is_empty()
{
return Ok(());
Expand All @@ -662,7 +662,7 @@ impl RoomEventCacheInner {
room_events.push_gap(Gap { prev_token: prev_token.clone() });
}

room_events.push_events(events.clone().into_iter());
room_events.push_events(sync_timeline_events.clone().into_iter());
}

// Now that all events have been added, we can trigger the
Expand All @@ -671,8 +671,25 @@ impl RoomEventCacheInner {
self.pagination.token_notifier.notify_one();
}

let _ =
self.sender.send(RoomEventCacheUpdate::Append { events, ephemeral, ambiguity_changes });
// The order of `RoomEventCacheUpdate`s is **really** important here.
{
if !sync_timeline_events.is_empty() {
let _ = self.sender.send(RoomEventCacheUpdate::AddTimelineEvents {
events: sync_timeline_events,
origin: EventsOrigin::Sync,
});
}

if !ephemeral_events.is_empty() {
let _ = self
.sender
.send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
}

if !ambiguity_changes.is_empty() {
let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
}
}

Ok(())
}
Expand Down Expand Up @@ -702,24 +719,42 @@ pub enum RoomEventCacheUpdate {
Clear,

/// The fully read marker has moved to a different event.
UpdateReadMarker {
MoveReadMarkerTo {
/// Event at which the read marker is now pointing.
event_id: OwnedEventId,
},

/// The room has new events.
Append {
/// All the new events that have been added to the room's timeline.
events: Vec<SyncTimelineEvent>,
/// XXX: this is temporary, until read receipts are handled in the event
/// cache
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
/// The members have changed.
UpdateMembers {
/// Collection of ambiguity changes that room member events trigger.
///
/// This is a map of event ID of the `m.room.member` event to the
/// details of the ambiguity change.
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
},

/// The room has received new timeline events.
AddTimelineEvents {
/// All the new events that have been added to the room's timeline.
events: Vec<SyncTimelineEvent>,

/// Where the events are coming from.
origin: EventsOrigin,
},

/// The room has received new ephemeral events.
AddEphemeralEvents {
/// XXX: this is temporary, until read receipts are handled in the event
/// cache
events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
},
}

/// Indicate where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
/// Events are coming from a sync.
Sync,
}

#[cfg(test)]
Expand Down Expand Up @@ -790,7 +825,7 @@ mod tests {
// … there's only one read marker update.
assert_matches!(
stream.recv().await.unwrap(),
RoomEventCacheUpdate::UpdateReadMarker { .. }
RoomEventCacheUpdate::MoveReadMarkerTo { .. }
);

assert!(stream.recv().now_or_never().is_none());
Expand Down
6 changes: 3 additions & 3 deletions crates/matrix-sdk/tests/integration/event_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn test_add_initial_events() {
.expect("should've received a room event cache update");

// Which contains the event that was sent beforehand.
assert_let!(RoomEventCacheUpdate::Append { events, .. } = update);
assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "bonjour monde");

Expand All @@ -130,7 +130,7 @@ async fn test_add_initial_events() {
.await
.expect("timeout after receiving a sync update")
.expect("should've received a room event cache update");
assert_let!(RoomEventCacheUpdate::Append { events, .. } = update);
assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "new choice!");

Expand Down Expand Up @@ -233,7 +233,7 @@ async fn test_ignored_unignored() {
.expect("timeout after receiving a sync update")
.expect("should've received a room event cache update");

assert_let!(RoomEventCacheUpdate::Append { events, .. } = update);
assert_let!(RoomEventCacheUpdate::AddTimelineEvents { events, .. } = update);
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "i don't like this dexter");

Expand Down
Loading