-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a helper for pausing a Stream on errors #50
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -218,6 +218,9 @@ pub mod exponential; | |
#[cfg(feature = "futures")] | ||
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))] | ||
pub mod future; | ||
#[cfg(feature = "futures")] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't create a new feature here, since this doesn't bring in any new (runtime) dependencies, but happy to change if prefer |
||
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))] | ||
pub mod stream; | ||
|
||
mod retry; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
use std::{pin::Pin, task::Poll}; | ||
|
||
use futures_core::{Future, Stream, TryStream}; | ||
use pin_project_lite::pin_project; | ||
|
||
use crate::{ | ||
backoff::Backoff, | ||
future::{rt_sleeper, Sleeper}, | ||
}; | ||
|
||
/// Applies a [`Backoff`] policy to a [`Stream`] | ||
/// | ||
/// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The | ||
/// [`Backoff`] is [`reset`](`Backoff::reset`) on any [`Ok`] value. | ||
/// | ||
/// If [`Backoff::next_backoff`] returns [`None`] then the backing stream is given up on, and closed. | ||
#[cfg(any(feature = "tokio", feature = "async-std"))] | ||
pub fn backoff<S: TryStream, B: Backoff>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally this'd be an extension method on |
||
stream: S, | ||
backoff: B, | ||
) -> StreamBackoff<S, B, impl Sleeper> { | ||
StreamBackoff::new(stream, backoff, rt_sleeper()) | ||
} | ||
|
||
pin_project! { | ||
/// See [`backoff`] | ||
pub struct StreamBackoff<S, B, Sl: Sleeper> { | ||
#[pin] | ||
stream: S, | ||
backoff: B, | ||
sleeper: Sl, | ||
#[pin] | ||
state: State<Sl>, | ||
} | ||
} | ||
|
||
// It's expected to have relatively few but long-lived `StreamBackoff`s in a project, so we would rather have | ||
// cheaper sleeps than a smaller `StreamBackoff`. | ||
// #[allow(clippy::large_enum_variant)] | ||
pin_project! { | ||
#[project = StateProj] | ||
enum State<Sl: Sleeper> { | ||
BackingOff{ | ||
#[pin] | ||
backoff_sleep: Sl::Sleep, | ||
}, | ||
GivenUp, | ||
Awake, | ||
} | ||
} | ||
|
||
impl<S: TryStream, B: Backoff, Sl: Sleeper> StreamBackoff<S, B, Sl> { | ||
pub fn new(stream: S, backoff: B, sleeper: Sl) -> Self { | ||
Self { | ||
stream, | ||
backoff, | ||
sleeper, | ||
state: State::Awake, | ||
} | ||
} | ||
} | ||
|
||
impl<S: TryStream, B: Backoff, Sl: Sleeper> Stream for StreamBackoff<S, B, Sl> | ||
where | ||
Sl::Sleep: Future, | ||
{ | ||
type Item = Result<S::Ok, S::Error>; | ||
|
||
fn poll_next( | ||
self: Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> Poll<Option<Self::Item>> { | ||
let mut this = self.project(); | ||
match this.state.as_mut().project() { | ||
StateProj::BackingOff { mut backoff_sleep } => match backoff_sleep.as_mut().poll(cx) { | ||
Poll::Ready(()) => { | ||
// tracing::debug!(deadline = ?backoff_sleep.deadline(), "Backoff complete, waking up"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these trace messages can be valuable for debugging (since backoff is bound to look weird to the user), but I didn't want to bring in new dependencies for this. On the other hand, Tokio already brings in the dependency so I'm not sure it'd be a huge problem to add. |
||
this.state.set(State::Awake) | ||
} | ||
Poll::Pending => { | ||
// let deadline = backoff_sleep.deadline(); | ||
// tracing::trace!( | ||
// ?deadline, | ||
// remaining_duration = ?deadline.saturating_duration_since(Instant::now()), | ||
// "Still waiting for backoff sleep to complete" | ||
// ); | ||
return Poll::Pending; | ||
} | ||
}, | ||
StateProj::GivenUp => { | ||
// tracing::debug!("Backoff has given up, stream is closed"); | ||
return Poll::Ready(None); | ||
} | ||
StateProj::Awake => {} | ||
} | ||
|
||
let next_item = this.stream.try_poll_next(cx); | ||
match &next_item { | ||
Poll::Ready(Some(Err(_))) => { | ||
if let Some(backoff_duration) = this.backoff.next_backoff() { | ||
let backoff_sleep = this.sleeper.sleep(backoff_duration); | ||
// tracing::debug!( | ||
// deadline = ?backoff_sleep.deadline(), | ||
// duration = ?backoff_duration, | ||
// "Error received, backing off" | ||
// ); | ||
this.state.set(State::BackingOff { backoff_sleep }); | ||
} else { | ||
// tracing::debug!("Error received, giving up"); | ||
this.state.set(State::GivenUp); | ||
} | ||
} | ||
Poll::Ready(_) => { | ||
// tracing::trace!("Non-error received, resetting backoff"); | ||
this.backoff.reset(); | ||
} | ||
Poll::Pending => {} | ||
} | ||
next_item | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::StreamBackoff; | ||
use crate::{backoff::Backoff, future::TokioSleeper}; | ||
use futures_channel::mpsc; | ||
use futures_util::{pin_mut, poll, stream, StreamExt}; | ||
use std::{task::Poll, time::Duration}; | ||
use tokio_1 as tokio; | ||
|
||
#[tokio::test] | ||
async fn stream_should_back_off() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tests depend on |
||
tokio::time::pause(); | ||
let tick = Duration::from_secs(1); | ||
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); | ||
let rx = StreamBackoff::new(rx, crate::backoff::Constant::new(tick), TokioSleeper); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to use Tokio specifically for these marble tests, since async-std doesn't support time travel (and, regardless, |
||
pin_mut!(rx); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2)))); | ||
assert_eq!(poll!(rx.next()), Poll::Pending); | ||
tokio::time::advance(tick * 2).await; | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(3)))); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(4)))); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(None)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn backoff_time_should_update() { | ||
tokio::time::pause(); | ||
let (tx, rx) = mpsc::unbounded(); | ||
let rx = StreamBackoff::new(rx, LinearBackoff::new(Duration::from_secs(2)), TokioSleeper); | ||
pin_mut!(rx); | ||
tx.unbounded_send(Ok(0)).unwrap(); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); | ||
tx.unbounded_send(Ok(1)).unwrap(); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))); | ||
tx.unbounded_send(Err(2)).unwrap(); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2)))); | ||
assert_eq!(poll!(rx.next()), Poll::Pending); | ||
tokio::time::advance(Duration::from_secs(3)).await; | ||
assert_eq!(poll!(rx.next()), Poll::Pending); | ||
tx.unbounded_send(Err(3)).unwrap(); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(3)))); | ||
tx.unbounded_send(Ok(4)).unwrap(); | ||
assert_eq!(poll!(rx.next()), Poll::Pending); | ||
tokio::time::advance(Duration::from_secs(3)).await; | ||
assert_eq!(poll!(rx.next()), Poll::Pending); | ||
tokio::time::advance(Duration::from_secs(2)).await; | ||
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(4)))); | ||
assert_eq!(poll!(rx.next()), Poll::Pending); | ||
drop(tx); | ||
assert_eq!(poll!(rx.next()), Poll::Ready(None)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn backoff_should_close_when_requested() { | ||
assert_eq!( | ||
StreamBackoff::new( | ||
stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), | ||
crate::backoff::Stop {}, | ||
TokioSleeper | ||
) | ||
.collect::<Vec<_>>() | ||
.await, | ||
vec![Ok(0), Ok(1), Err(2)] | ||
); | ||
} | ||
|
||
/// Dynamic backoff policy that is still deterministic and testable | ||
struct LinearBackoff { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be worth promoting this to |
||
interval: Duration, | ||
current_duration: Duration, | ||
} | ||
|
||
impl LinearBackoff { | ||
fn new(interval: Duration) -> Self { | ||
Self { | ||
interval, | ||
current_duration: Duration::ZERO, | ||
} | ||
} | ||
} | ||
|
||
impl Backoff for LinearBackoff { | ||
fn next_backoff(&mut self) -> Option<Duration> { | ||
self.current_duration += self.interval; | ||
Some(self.current_duration) | ||
} | ||
|
||
fn reset(&mut self) { | ||
self.current_duration = Duration::ZERO | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to use this in
stream