Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a helper for pausing a Stream on errors #50

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

nightkr
Copy link
Contributor

@nightkr nightkr commented Nov 22, 2021

Based on kube-rs/kube#720, but with generalizations for runtime-agnosticity, and and pin-project-lite.

Not sure about whether this ultimately belongs here or in a utility crate, but figured it was worth raising the question regardless.

Based on kube-rs/kube#720, but with generalizations
for runtime-agnosticity, and and pin-project-lite.
Comment on lines +206 to +218
pub(crate) fn rt_sleeper() -> impl Sleeper {
AsyncStdSleeper
}

#[cfg(feature = "tokio")]
fn rt_sleeper() -> impl Sleeper {
pub(crate) fn rt_sleeper() -> impl Sleeper {
TokioSleeper
}

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]

struct TokioSleeper;
pub(crate) struct TokioSleeper;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to use this in stream

@@ -218,6 +218,9 @@ pub mod exponential;
#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
pub mod future;
#[cfg(feature = "futures")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't create a new feature here, since this doesn't bring in any new (runtime) dependencies, but happy to change if prefer

tokio::time::pause();
let tick = Duration::from_secs(1);
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
let rx = StreamBackoff::new(rx, crate::backoff::Constant::new(tick), TokioSleeper);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to use Tokio specifically for these marble tests, since async-std doesn't support time travel (and, regardless, Sleeper doesn't and probably shouldn't support that).

match this.state.as_mut().project() {
StateProj::BackingOff { mut backoff_sleep } => match backoff_sleep.as_mut().poll(cx) {
Poll::Ready(()) => {
// tracing::debug!(deadline = ?backoff_sleep.deadline(), "Backoff complete, waking up");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these trace messages can be valuable for debugging (since backoff is bound to look weird to the user), but I didn't want to bring in new dependencies for this. On the other hand, Tokio already brings in the dependency so I'm not sure it'd be a huge problem to add.

use tokio_1 as tokio;

#[tokio::test]
async fn stream_should_back_off() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests depend on TokioSleeper which requires the tokio feature, so they'll fail to build with the feature set futures,!tokio. Do we want to always build TokioSleeper in tests?

}

/// Dynamic backoff policy that is still deterministic and testable
struct LinearBackoff {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth promoting this to crate::backoff? Not sure I'd use it in production, but it's pretty nice to have a predictable and deterministic variant of ExponentialBackoff when writing marble tests.

///
/// If [`Backoff::next_backoff`] returns [`None`] then the backing stream is given up on, and closed.
#[cfg(any(feature = "tokio", feature = "async-std"))]
pub fn backoff<S: TryStream, B: Backoff>(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this'd be an extension method on TryStream, but that isn't allowed for impl Trait return values.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant