Skip to content

Commit

Permalink
Retry middleware improvements for non-cloneable requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Mindaugas Vinkelis committed Aug 21, 2024
1 parent b2c48b4 commit d61109e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 26 deletions.
23 changes: 7 additions & 16 deletions tower/src/retry/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pin_project! {
P: Policy<Request, S::Response, S::Error>,
S: Service<Request>,
{
request: Option<Request>,
request: P::CloneableRequest,
#[pin]
retry: Retry<P, S>,
#[pin]
Expand Down Expand Up @@ -49,7 +49,7 @@ where
S: Service<Request>,
{
pub(crate) fn new(
request: Option<Request>,
request: P::CloneableRequest,
retry: Retry<P, S>,
future: S::Future,
) -> ResponseFuture<P, S, Request> {
Expand All @@ -75,16 +75,11 @@ where
match this.state.as_mut().project() {
StateProj::Called { future } => {
let mut result = ready!(future.poll(cx));
if let Some(req) = &mut this.request {
match this.retry.policy.retry(req, &mut result) {
Some(waiting) => {
this.state.set(State::Waiting { waiting });
}
None => return Poll::Ready(result),
match this.retry.policy.retry(this.request, &mut result) {
Some(waiting) => {
this.state.set(State::Waiting { waiting });
}
} else {
// request wasn't cloned, so no way to retry it
return Poll::Ready(result);
None => return Poll::Ready(result),
}
}
StateProj::Waiting { waiting } => {
Expand All @@ -105,11 +100,7 @@ where
// in Ready to make it Unpin so that we can get &mut Ready as needed to call
// poll_ready on it.
ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
let req = this
.request
.take()
.expect("retrying requires cloned request");
*this.request = this.retry.policy.clone_request(&req);
let req = this.retry.policy.clone_request(this.request);
this.state.set(State::Called {
future: this.retry.as_mut().project().service.call(req),
});
Expand Down
7 changes: 4 additions & 3 deletions tower/src/retry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ where
}

fn call(&mut self, request: Request) -> Self::Future {
let cloned = self.policy.clone_request(&request);
let future = self.service.call(request);
let cloneable = self.policy.create_cloneable_request(request);
let req = self.policy.clone_request(&cloneable);
let future = self.service.call(req);

ResponseFuture::new(cloned, self.clone(), future)
ResponseFuture::new(cloneable, self.clone(), future)
}
}
28 changes: 21 additions & 7 deletions tower/src/retry/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ pub trait Policy<Req, Res, E> {
/// The [`Future`] type returned by [`Policy::retry`].
type Future: Future<Output = ()>;

/// A type that is able to store request object, that can be
/// cloned back to original request.
type CloneableRequest;

/// Check the policy if a certain request should be retried.
///
/// This method is passed a reference to the original request, and either
Expand Down Expand Up @@ -80,15 +84,25 @@ pub trait Policy<Req, Res, E> {
///
/// [`Service::Response`]: crate::Service::Response
/// [`Service::Error`]: crate::Service::Error
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future>;
fn retry(
&mut self,
req: &mut Self::CloneableRequest,
result: &mut Result<Res, E>,
) -> Option<Self::Future>;

/// Tries to clone a request before being passed to the inner service.
///
/// If the request cannot be cloned, return [`None`]. Moreover, the retry
/// function will not be called if the [`None`] is returned.
fn clone_request(&mut self, req: &Req) -> Option<Req>;
/// Consume initial request and returns `CloneableRequest` which
/// will be used to recreate original request objects for each retry.
/// This is essential in cases where original request cannot be cloned,
/// but can only be consumed.
fn create_cloneable_request(&mut self, req: Req) -> Self::CloneableRequest;

/// Recreates original request object for each retry.
fn clone_request(&mut self, req: &Self::CloneableRequest) -> Req;
}

// Ensure `Policy` is object safe
#[cfg(test)]
fn _obj_safe(_: Box<dyn Policy<(), (), (), Future = futures::future::Ready<()>>>) {}
fn _obj_safe(
_: Box<dyn Policy<(), (), (), Future = futures::future::Ready<()>, CloneableRequest = ()>>,
) {
}

0 comments on commit d61109e

Please sign in to comment.