Skip to content

Commit

Permalink
loop over poll_next until Read() or pending
Browse files Browse the repository at this point in the history
  • Loading branch information
cameronvoell committed Feb 7, 2025
1 parent cb8ee07 commit 0dbd514
Showing 1 changed file with 44 additions and 34 deletions.
78 changes: 44 additions & 34 deletions xmtp_mls/src/subscriptions/stream_conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,45 +200,55 @@ where
use std::task::Poll::*;
use ProcessProject::*;

let this = self.as_mut().project();
let state = this.state.project();

match state {
Waiting => {
match this.inner.poll_next(cx) {
Ready(Some(item)) => {
let mut this = self.as_mut().project();
let future = ProcessWelcomeFuture::new(
this.known_welcome_ids.clone(),
this.client.clone(),
item?,
*this.conversation_type,
)?;

this.state.set(ProcessState::Processing {
future: FutureWrapper::new(future.process()),
});
// try to process the future immediately
// this will return immediately if we have already processed the welcome
// and it exists in the db
let Processing { future } = this.state.project() else {
unreachable!()
};
let poll = future.poll(cx);
self.as_mut().try_process(poll, cx)
loop {
let this = self.as_mut().project();
let state = this.state.project();

match state {
Waiting => {
match this.inner.poll_next(cx) {
Ready(Some(item)) => {
let mut this = self.as_mut().project();
let future = ProcessWelcomeFuture::new(
this.known_welcome_ids.clone(),
this.client.clone(),
item?,
*this.conversation_type,
)?;

this.state.set(ProcessState::Processing {
future: FutureWrapper::new(future.process()),
});

let Processing { future } = this.state.project() else {
unreachable!()
};
let poll = future.poll(cx);

// Instead of immediately returning, we handle the result:
match self.as_mut().try_process(poll, cx) {
// If we got Ready(None) or Ready(Some(_)), return it
Ready(result) => return Ready(result),
// If Pending, continue the loop to try the next item
Pending => continue,
}
}
Ready(None) => return Ready(None),
Pending => return Pending,
}
}
Processing { future } => {
let poll = future.poll(cx);
match self.as_mut().try_process(poll, cx) {
// If we got Ready(None) or Ready(Some(_)), return it
Ready(result) => return Ready(result),
// If Pending, continue processing
Pending => return Pending,
}
// stream ended
Ready(None) => Ready(None),
Pending => Pending,
}
}
Processing { future } => {
let poll = future.poll(cx);
self.as_mut().try_process(poll, cx)
}
}
}
}

impl<'a, C, Subscription> StreamConversations<'a, C, Subscription>
where
Expand Down

0 comments on commit 0dbd514

Please sign in to comment.