From b85c81941422224ff0533efc7b4c4225df391377 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 3 Jun 2024 09:09:57 +0200 Subject: [PATCH 1/2] =?UTF-8?q?Reapply=20"feat(sdk):=20Add=20`RoomPaginati?= =?UTF-8?q?on::run=5Fbackwards(=E2=80=A6,=20until)`"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 392fd004d9986bfd0d757ba755638f3f5ab52d76. --- .../matrix-sdk-ui/src/timeline/pagination.rs | 77 ++++---- crates/matrix-sdk/src/event_cache/mod.rs | 2 +- .../matrix-sdk/src/event_cache/pagination.rs | 78 +++++++- .../tests/integration/event_cache.rs | 178 ++++++++++++++++-- 4 files changed, 276 insertions(+), 59 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/pagination.rs b/crates/matrix-sdk-ui/src/timeline/pagination.rs index 7d4595b2e88..dd18392f622 100644 --- a/crates/matrix-sdk-ui/src/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/src/timeline/pagination.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::ControlFlow; + use async_rx::StreamExt as _; use async_stream::stream; use futures_core::Stream; @@ -67,49 +69,44 @@ impl super::Timeline { pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result { let pagination = self.event_cache.pagination(); - loop { - let result = pagination.run_backwards(batch_size).await; - - let event_cache_outcome = match result { - Ok(outcome) => outcome, - - Err(EventCacheError::BackpaginationError( - PaginatorError::InvalidPreviousState { - actual: PaginatorState::Paginating, .. - }, - )) => { - warn!("Another pagination request is already happening, returning early"); - return Ok(false); - } - - Err(err) => return Err(err), - }; - - let BackPaginationOutcome { events, reached_start } = event_cache_outcome; - - let num_events = events.len(); - trace!("Back-pagination succeeded with {num_events} events"); - - self.inner - .add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination) - .await; - - if reached_start { - return Ok(true); + let result = pagination + .run_backwards( + batch_size, + |BackPaginationOutcome { events, reached_start }, + _timeline_has_been_reset| async move { + let num_events = events.len(); + trace!("Back-pagination succeeded with {num_events} events"); + + // TODO(hywan): Remove, and let spread events via + // `matrix_sdk::event_cache::RoomEventCacheUpdate` from + // `matrix_sdk::event_cache::RoomPagination::run_backwards`. + self.inner + .add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination) + .await; + + if num_events == 0 && !reached_start { + // As an exceptional contract: if there were no events in the response, + // and we've not hit the start of the timeline, retry until we get + // some events or reach the start of the timeline. + return ControlFlow::Continue(()); + } + + ControlFlow::Break(reached_start) + }, + ) + .await; + + match result { + Err(EventCacheError::BackpaginationError(PaginatorError::InvalidPreviousState { + actual: PaginatorState::Paginating, + .. + })) => { + warn!("Another pagination request is already happening, returning early"); + Ok(false) } - if num_events == 0 { - // As an exceptional contract: if there were no events in the response, - // and we've not hit the start of the timeline, retry until we get - // some events or reach the start of the timeline. - continue; - } - - // Exit the inner loop, and ask for another limit. - break; + result => result, } - - Ok(false) } /// Subscribe to the back-pagination status of a live timeline. diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index b4349737e61..e59d4be3955 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -76,7 +76,7 @@ mod pagination; mod store; pub mod paginator; -pub use pagination::RoomPagination; +pub use pagination::{RoomPagination, TimelineHasBeenResetWhilePaginating}; /// An error observed in the [`EventCache`]. #[derive(thiserror::Error, Debug)] diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 9477dada861..2d178e6555d 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -14,7 +14,7 @@ //! A sub-object for running pagination tasks on a given room. -use std::{sync::Arc, time::Duration}; +use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration}; use eyeball::Subscriber; use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; @@ -59,17 +59,75 @@ impl RoomPagination { /// This automatically takes care of waiting for a pagination token from /// sync, if we haven't done that before. /// + /// The `until` argument is an async closure that returns a [`ControlFlow`] + /// to decide whether a new pagination must be run or not. It's helpful when + /// the server replies with e.g. a certain set of events, but we would like + /// more, or the event we are looking for isn't part of this set: in this + /// case, `until` returns [`Control::Continue`], otherwise it returns + /// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as + /// its sole argument. + /// /// # Errors /// /// It may return an error if the pagination token used during /// back-pagination has disappeared while we started the pagination. In /// that case, it's desirable to call the method again. - #[instrument(skip(self))] - pub async fn run_backwards(&self, batch_size: u16) -> Result { + /// + /// # Example + /// + /// To do a single run: + /// + /// ```rust + /// use std::ops::ControlFlow; + /// + /// use matrix_sdk::event_cache::{ + /// BackPaginationOutcome, + /// RoomPagination, + /// TimelineHasBeenResetWhilePaginating + /// }; + /// + /// # async fn foo(room_pagination: RoomPagination) { + /// let result = room_pagination.run_backwards( + /// 42, + /// |BackPaginationOutcome { events, reached_start }, + /// _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating| async move { + /// // Do something with `events` and `reached_start` maybe? + /// let _ = events; + /// let _ = reached_start; + /// + /// ControlFlow::Break(()) + /// } + /// ).await; + /// # } + #[instrument(skip(self, until))] + pub async fn run_backwards( + &self, + batch_size: u16, + mut until: Until, + ) -> Result + where + Until: FnMut(BackPaginationOutcome, TimelineHasBeenResetWhilePaginating) -> UntilFuture, + UntilFuture: Future>, + { + let mut timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No; + loop { - if let Some(result) = self.run_backwards_impl(batch_size).await? { - return Ok(result); + if let Some(outcome) = self.run_backwards_impl(batch_size).await? { + match until(outcome, timeline_has_been_reset).await { + ControlFlow::Continue(()) => { + trace!("back-pagination continues"); + + timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No; + + continue; + } + + ControlFlow::Break(value) => return Ok(value), + } } + + timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::Yes; + debug!("back-pagination has been internally restarted because of a timeline reset."); } } @@ -259,6 +317,16 @@ impl RoomPagination { } } +/// A type representing whether the timeline has been reset. +#[derive(Debug)] +pub enum TimelineHasBeenResetWhilePaginating { + /// The timeline has been reset. + Yes, + + /// The timeline has not been reset. + No, +} + #[cfg(test)] mod tests { // Those tests require time to work, and it does not on wasm32. diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 5d6124f474a..a3e3891f007 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -1,8 +1,11 @@ -use std::time::Duration; +use std::{future::ready, ops::ControlFlow, time::Duration}; use assert_matches2::{assert_let, assert_matches}; use matrix_sdk::{ - event_cache::{BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate}, + event_cache::{ + BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate, + TimelineHasBeenResetWhilePaginating, + }, test_utils::{assert_event_matches_msg, events::EventFactory, logged_in_client_with_server}, }; use matrix_sdk_test::{ @@ -24,6 +27,13 @@ use wiremock::{ use crate::mock_sync; +async fn once( + outcome: BackPaginationOutcome, + _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating, +) -> ControlFlow { + ControlFlow::Break(outcome) +} + #[async_test] async fn test_must_explicitly_subscribe() { let (client, server) = logged_in_client_with_server().await; @@ -362,7 +372,7 @@ async fn test_backpaginate_once() { assert!(pagination.get_or_wait_for_token().await.is_some()); - pagination.run_backwards(20).await.unwrap() + pagination.run_backwards(20, once).await.unwrap() }; // I'll get all the previous events, in "reverse" order (same as the response). @@ -377,7 +387,7 @@ async fn test_backpaginate_once() { } #[async_test] -async fn test_backpaginate_multiple_iterations() { +async fn test_backpaginate_many_times_with_many_iterations() { let (client, server) = logged_in_client_with_server().await; let event_cache = client.event_cache(); @@ -425,6 +435,7 @@ async fn test_backpaginate_multiple_iterations() { } let mut num_iterations = 0; + let mut num_paginations = 0; let mut global_events = Vec::new(); let mut global_reached_start = false; @@ -449,19 +460,149 @@ async fn test_backpaginate_multiple_iterations() { // Then if I backpaginate in a loop, let pagination = room_event_cache.pagination(); while pagination.get_or_wait_for_token().await.is_some() { - let BackPaginationOutcome { reached_start, events } = - pagination.run_backwards(20).await.unwrap(); + pagination + .run_backwards(20, |outcome, timeline_has_been_reset| { + num_paginations += 1; - if !global_reached_start { - global_reached_start = reached_start; - } - global_events.extend(events); + assert_matches!(timeline_has_been_reset, TimelineHasBeenResetWhilePaginating::No); + + if !global_reached_start { + global_reached_start = outcome.reached_start; + } + + global_events.extend(outcome.events); + + ready(ControlFlow::Break(())) + }) + .await + .unwrap(); num_iterations += 1; } // I'll get all the previous events, - assert_eq!(num_iterations, 2); + assert_eq!(num_iterations, 2); // in two iterations… + assert_eq!(num_paginations, 2); // … we get two paginations. + assert!(global_reached_start); + + assert_event_matches_msg(&global_events[0], "world"); + assert_event_matches_msg(&global_events[1], "hello"); + assert_event_matches_msg(&global_events[2], "oh well"); + assert_eq!(global_events.len(), 3); + + // And next time I'll open the room, I'll get the events in the right order. + let (events, _receiver) = room_event_cache.subscribe().await.unwrap(); + + assert_event_matches_msg(&events[0], "oh well"); + assert_event_matches_msg(&events[1], "hello"); + assert_event_matches_msg(&events[2], "world"); + assert_event_matches_msg(&events[3], "heyo"); + assert_eq!(events.len(), 4); + + assert!(room_stream.is_empty()); +} + +#[async_test] +async fn test_backpaginate_many_times_with_one_iteration() { + let (client, server) = logged_in_client_with_server().await; + + let event_cache = client.event_cache(); + + // Immediately subscribe the event cache to sync updates. + event_cache.subscribe().unwrap(); + + // If I sync and get informed I've joined The Room, and get a previous batch + // token, + let room_id = room_id!("!omelette:fromage.fr"); + + let event_builder = EventBuilder::new(); + let mut sync_builder = SyncResponseBuilder::new(); + + { + sync_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + // Note to self: a timeline must have at least single event to be properly + // serialized. + .add_timeline_event(event_builder.make_sync_message_event( + user_id!("@a:b.c"), + RoomMessageEventContent::text_plain("heyo"), + )) + .set_timeline_prev_batch("prev_batch".to_owned()), + ); + let response_body = sync_builder.build_json_sync_response(); + + mock_sync(&server, response_body, None).await; + client.sync_once(Default::default()).await.unwrap(); + server.reset().await; + } + + let (room_event_cache, _drop_handles) = + client.get_room(room_id).unwrap().event_cache().await.unwrap(); + + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + + // This is racy: either the initial message has been processed by the event + // cache (and no room updates will happen in this case), or it hasn't, and + // the stream will return the next message soon. + if events.is_empty() { + let _ = room_stream.recv().await.expect("read error"); + } else { + assert_eq!(events.len(), 1); + } + + let mut num_iterations = 0; + let mut num_paginations = 0; + let mut global_events = Vec::new(); + let mut global_reached_start = false; + + // The first back-pagination will return these two. + mock_messages( + &server, + "prev_batch", + Some("prev_batch2"), + non_sync_events!(event_builder, [ (room_id, "$2": "world"), (room_id, "$3": "hello") ]), + ) + .await; + + // The second round of back-pagination will return this one. + mock_messages( + &server, + "prev_batch2", + None, + non_sync_events!(event_builder, [ (room_id, "$4": "oh well"), ]), + ) + .await; + + // Then if I backpaginate in a loop, + let pagination = room_event_cache.pagination(); + while pagination.get_or_wait_for_token().await.is_some() { + pagination + .run_backwards(20, |outcome, timeline_has_been_reset| { + num_paginations += 1; + + assert_matches!(timeline_has_been_reset, TimelineHasBeenResetWhilePaginating::No); + + if !global_reached_start { + global_reached_start = outcome.reached_start; + } + + global_events.extend(outcome.events); + + ready(if outcome.reached_start { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + }) + }) + .await + .unwrap(); + + num_iterations += 1; + } + + // I'll get all the previous events, + assert_eq!(num_iterations, 1); // in one iteration… + assert_eq!(num_paginations, 2); // … we get two paginations! assert!(global_reached_start); assert_event_matches_msg(&global_events[0], "world"); @@ -586,7 +727,18 @@ async fn test_reset_while_backpaginating() { let backpagination = spawn({ let pagination = room_event_cache.pagination(); - async move { pagination.run_backwards(20).await } + async move { + pagination + .run_backwards(20, |outcome, timeline_has_been_reset| { + assert_matches!( + timeline_has_been_reset, + TimelineHasBeenResetWhilePaginating::Yes + ); + + ready(ControlFlow::Break(outcome)) + }) + .await + } }); // Receive the sync response (which clears the timeline). @@ -656,7 +808,7 @@ async fn test_backpaginating_without_token() { // If we try to back-paginate with a token, it will hit the end of the timeline // and give us the resulting event. let BackPaginationOutcome { events, reached_start } = - pagination.run_backwards(20).await.unwrap(); + pagination.run_backwards(20, once).await.unwrap(); assert!(reached_start); From 59924ea03e2097eb4c5a7ee500b5488749bc3cb0 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 3 Jun 2024 09:11:47 +0200 Subject: [PATCH 2/2] doc(sdk): Fix `Control` to `ControlFlow`. --- crates/matrix-sdk/src/event_cache/pagination.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 2d178e6555d..def8063c7d7 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -63,7 +63,7 @@ impl RoomPagination { /// to decide whether a new pagination must be run or not. It's helpful when /// the server replies with e.g. a certain set of events, but we would like /// more, or the event we are looking for isn't part of this set: in this - /// case, `until` returns [`Control::Continue`], otherwise it returns + /// case, `until` returns [`ControlFlow::Continue`], otherwise it returns /// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as /// its sole argument. ///