Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Add RoomPagination::run_backwards(…, until), take 2 #3491

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 37 additions & 40 deletions crates/matrix-sdk-ui/src/timeline/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::ControlFlow;

use async_rx::StreamExt as _;
use async_stream::stream;
use futures_core::Stream;
Expand Down Expand Up @@ -67,49 +69,44 @@ impl super::Timeline {
pub async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
let pagination = self.event_cache.pagination();

loop {
let result = pagination.run_backwards(batch_size).await;

let event_cache_outcome = match result {
Ok(outcome) => outcome,

Err(EventCacheError::BackpaginationError(
PaginatorError::InvalidPreviousState {
actual: PaginatorState::Paginating, ..
},
)) => {
warn!("Another pagination request is already happening, returning early");
return Ok(false);
}

Err(err) => return Err(err),
};

let BackPaginationOutcome { events, reached_start } = event_cache_outcome;

let num_events = events.len();
trace!("Back-pagination succeeded with {num_events} events");

self.inner
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
.await;

if reached_start {
return Ok(true);
let result = pagination
.run_backwards(
batch_size,
|BackPaginationOutcome { events, reached_start },
_timeline_has_been_reset| async move {
let num_events = events.len();
trace!("Back-pagination succeeded with {num_events} events");

// TODO(hywan): Remove, and let spread events via
// `matrix_sdk::event_cache::RoomEventCacheUpdate` from
// `matrix_sdk::event_cache::RoomPagination::run_backwards`.
self.inner
.add_events_at(events, TimelineEnd::Front, RemoteEventOrigin::Pagination)
.await;

if num_events == 0 && !reached_start {
// As an exceptional contract: if there were no events in the response,
// and we've not hit the start of the timeline, retry until we get
// some events or reach the start of the timeline.
return ControlFlow::Continue(());
}

ControlFlow::Break(reached_start)
},
)
.await;

match result {
Err(EventCacheError::BackpaginationError(PaginatorError::InvalidPreviousState {
actual: PaginatorState::Paginating,
..
})) => {
warn!("Another pagination request is already happening, returning early");
Ok(false)
}

if num_events == 0 {
// As an exceptional contract: if there were no events in the response,
// and we've not hit the start of the timeline, retry until we get
// some events or reach the start of the timeline.
continue;
}

// Exit the inner loop, and ask for another limit.
break;
result => result,
}

Ok(false)
}

/// Subscribe to the back-pagination status of a live timeline.
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ mod pagination;
mod store;

pub mod paginator;
pub use pagination::RoomPagination;
pub use pagination::{RoomPagination, TimelineHasBeenResetWhilePaginating};

/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
Expand Down
78 changes: 73 additions & 5 deletions crates/matrix-sdk/src/event_cache/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! A sub-object for running pagination tasks on a given room.

use std::{sync::Arc, time::Duration};
use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration};

use eyeball::Subscriber;
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
Expand Down Expand Up @@ -59,17 +59,75 @@ impl RoomPagination {
/// This automatically takes care of waiting for a pagination token from
/// sync, if we haven't done that before.
///
/// The `until` argument is an async closure that returns a [`ControlFlow`]
/// to decide whether a new pagination must be run or not. It's helpful when
/// the server replies with e.g. a certain set of events, but we would like
/// more, or the event we are looking for isn't part of this set: in this
/// case, `until` returns [`ControlFlow::Continue`], otherwise it returns
/// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as
/// its sole argument.
///
/// # Errors
///
/// It may return an error if the pagination token used during
/// back-pagination has disappeared while we started the pagination. In
/// that case, it's desirable to call the method again.
#[instrument(skip(self))]
pub async fn run_backwards(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
///
/// # Example
///
/// To do a single run:
///
/// ```rust
/// use std::ops::ControlFlow;
///
/// use matrix_sdk::event_cache::{
/// BackPaginationOutcome,
/// RoomPagination,
/// TimelineHasBeenResetWhilePaginating
/// };
///
/// # async fn foo(room_pagination: RoomPagination) {
/// let result = room_pagination.run_backwards(
/// 42,
/// |BackPaginationOutcome { events, reached_start },
/// _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating| async move {
/// // Do something with `events` and `reached_start` maybe?
/// let _ = events;
/// let _ = reached_start;
///
/// ControlFlow::Break(())
/// }
/// ).await;
/// # }
#[instrument(skip(self, until))]
pub async fn run_backwards<Until, Break, UntilFuture>(
&self,
batch_size: u16,
mut until: Until,
) -> Result<Break>
where
Until: FnMut(BackPaginationOutcome, TimelineHasBeenResetWhilePaginating) -> UntilFuture,
UntilFuture: Future<Output = ControlFlow<Break, ()>>,
{
let mut timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;

loop {
if let Some(result) = self.run_backwards_impl(batch_size).await? {
return Ok(result);
if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
match until(outcome, timeline_has_been_reset).await {
ControlFlow::Continue(()) => {
trace!("back-pagination continues");

timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;

continue;
}

ControlFlow::Break(value) => return Ok(value),
}
}

timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::Yes;

debug!("back-pagination has been internally restarted because of a timeline reset.");
}
}
Expand Down Expand Up @@ -259,6 +317,16 @@ impl RoomPagination {
}
}

/// A type representing whether the timeline has been reset.
#[derive(Debug)]
pub enum TimelineHasBeenResetWhilePaginating {
/// The timeline has been reset.
Yes,

/// The timeline has not been reset.
No,
}

#[cfg(test)]
mod tests {
// Those tests require time to work, and it does not on wasm32.
Expand Down
Loading
Loading