-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a helper for pausing a Stream on errors #50
base: master
Are you sure you want to change the base?
Conversation
Based on kube-rs/kube#720, but with generalizations for runtime-agnosticity, and and pin-project-lite.
pub(crate) fn rt_sleeper() -> impl Sleeper { | ||
AsyncStdSleeper | ||
} | ||
|
||
#[cfg(feature = "tokio")] | ||
fn rt_sleeper() -> impl Sleeper { | ||
pub(crate) fn rt_sleeper() -> impl Sleeper { | ||
TokioSleeper | ||
} | ||
|
||
#[cfg(feature = "tokio")] | ||
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] | ||
|
||
struct TokioSleeper; | ||
pub(crate) struct TokioSleeper; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to use this in stream
@@ -218,6 +218,9 @@ pub mod exponential; | |||
#[cfg(feature = "futures")] | |||
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))] | |||
pub mod future; | |||
#[cfg(feature = "futures")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't create a new feature here, since this doesn't bring in any new (runtime) dependencies, but happy to change if prefer
tokio::time::pause(); | ||
let tick = Duration::from_secs(1); | ||
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); | ||
let rx = StreamBackoff::new(rx, crate::backoff::Constant::new(tick), TokioSleeper); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to use Tokio specifically for these marble tests, since async-std doesn't support time travel (and, regardless, Sleeper
doesn't and probably shouldn't support that).
match this.state.as_mut().project() { | ||
StateProj::BackingOff { mut backoff_sleep } => match backoff_sleep.as_mut().poll(cx) { | ||
Poll::Ready(()) => { | ||
// tracing::debug!(deadline = ?backoff_sleep.deadline(), "Backoff complete, waking up"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these trace messages can be valuable for debugging (since backoff is bound to look weird to the user), but I didn't want to bring in new dependencies for this. On the other hand, Tokio already brings in the dependency so I'm not sure it'd be a huge problem to add.
use tokio_1 as tokio; | ||
|
||
#[tokio::test] | ||
async fn stream_should_back_off() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests depend on TokioSleeper
which requires the tokio
feature, so they'll fail to build with the feature set futures,!tokio
. Do we want to always build TokioSleeper
in tests?
} | ||
|
||
/// Dynamic backoff policy that is still deterministic and testable | ||
struct LinearBackoff { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be worth promoting this to crate::backoff
? Not sure I'd use it in production, but it's pretty nice to have a predictable and deterministic variant of ExponentialBackoff
when writing marble tests.
/// | ||
/// If [`Backoff::next_backoff`] returns [`None`] then the backing stream is given up on, and closed. | ||
#[cfg(any(feature = "tokio", feature = "async-std"))] | ||
pub fn backoff<S: TryStream, B: Backoff>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally this'd be an extension method on TryStream
, but that isn't allowed for impl Trait
return values.
Even when neither tokio nor async-std are enabled.
Based on kube-rs/kube#720, but with generalizations for runtime-agnosticity, and and pin-project-lite.
Not sure about whether this ultimately belongs here or in a utility crate, but figured it was worth raising the question regardless.