From f2838978172acb645a905d51e4f8511dda329290 Mon Sep 17 00:00:00 2001 From: Jack Rickard Date: Sat, 29 Jan 2022 11:50:43 +0000 Subject: [PATCH] Try out tokens --- guides/building-a-middleware-from-scratch.md | 19 +-- tower-layer/src/layer_fn.rs | 4 +- tower-layer/src/lib.rs | 4 +- tower-service/src/lib.rs | 38 +++--- tower-test/src/mock/mod.rs | 45 ++++--- tower-test/src/mock/spawn.rs | 6 +- tower-test/tests/mock.rs | 9 +- tower/src/balance/p2c/make.rs | 7 +- tower/src/balance/p2c/service.rs | 7 +- tower/src/balance/p2c/test.rs | 4 +- tower/src/balance/pool/mod.rs | 22 ++-- tower/src/balance/pool/test.rs | 34 +++--- tower/src/buffer/service.rs | 31 ++--- tower/src/buffer/worker.rs | 4 +- tower/src/builder/mod.rs | 8 +- tower/src/filter/future.rs | 18 +-- tower/src/filter/mod.rs | 14 ++- tower/src/hedge/delay.rs | 5 +- tower/src/hedge/latency.rs | 7 +- tower/src/hedge/mod.rs | 54 ++++----- tower/src/hedge/select.rs | 54 ++++++--- tower/src/limit/concurrency/service.rs | 50 +++----- tower/src/limit/rate/service.rs | 7 +- tower/src/load/constant.rs | 7 +- tower/src/load/peak_ewma.rs | 14 ++- tower/src/load/pending_requests.rs | 18 +-- tower/src/load_shed/mod.rs | 30 +++-- tower/src/make/make_connection.rs | 13 ++- tower/src/make/make_service.rs | 27 +++-- tower/src/make/make_service/shared.rs | 25 ++-- tower/src/ready_cache/cache.rs | 24 ++-- tower/src/reconnect/mod.rs | 35 +++--- tower/src/retry/future.rs | 4 +- tower/src/retry/mod.rs | 7 +- tower/src/spawn_ready/make.rs | 7 +- tower/src/spawn_ready/service.rs | 7 +- tower/src/steer/mod.rs | 72 +++++++----- tower/src/timeout/mod.rs | 7 +- tower/src/util/and_then.rs | 7 +- tower/src/util/boxed/layer.rs | 3 +- tower/src/util/boxed/sync.rs | 24 +++- tower/src/util/boxed/unsync.rs | 39 +++++-- tower/src/util/boxed_clone.rs | 50 +++++--- tower/src/util/call_all/common.rs | 4 +- tower/src/util/call_all/ordered.rs | 4 +- tower/src/util/either.rs | 20 ++-- tower/src/util/future_service.rs | 12 +- tower/src/util/map_err.rs | 7 +- tower/src/util/map_future.rs | 7 +- tower/src/util/map_request.rs | 7 +- tower/src/util/map_response.rs | 7 +- tower/src/util/map_result.rs | 7 +- tower/src/util/map_token.rs | 117 +++++++++++++++++++ tower/src/util/mod.rs | 60 ++++++---- tower/src/util/oneshot.rs | 4 +- tower/src/util/optional/mod.rs | 16 ++- tower/src/util/service_fn.rs | 3 +- tower/src/util/then.rs | 7 +- tower/tests/balance/main.rs | 6 +- tower/tests/steer/main.rs | 2 +- tower/tests/support.rs | 2 +- tower/tests/util/call_all.rs | 4 +- 62 files changed, 710 insertions(+), 457 deletions(-) create mode 100644 tower/src/util/map_token.rs diff --git a/guides/building-a-middleware-from-scratch.md b/guides/building-a-middleware-from-scratch.md index 28191e62a..07890c538 100644 --- a/guides/building-a-middleware-from-scratch.md +++ b/guides/building-a-middleware-from-scratch.md @@ -71,15 +71,16 @@ where { type Response = S::Response; type Error = S::Error; + type Token = S::Token; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Our middleware doesn't care about backpressure so its ready as long // as the inner service is ready. self.inner.poll_ready(cx) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { self.inner.call(request) } } @@ -102,7 +103,7 @@ Creating both futures is done like this: ```rust use tokio::time::sleep; -fn call(&mut self, request: Request) -> Self::Future { +fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { let response_future = self.inner.call(request); // This variable has type `tokio::time::Sleep`. @@ -151,11 +152,11 @@ where // Use our new `ResponseFuture` type. type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { let response_future = self.inner.call(request); let sleep = tokio::time::sleep(self.timeout); @@ -502,12 +503,12 @@ where type Error = BoxError; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Have to map the error type here as well. self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { let response_future = self.inner.call(request); let sleep = tokio::time::sleep(self.timeout); @@ -559,11 +560,11 @@ where type Error = BoxError; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { let response_future = self.inner.call(request); let sleep = tokio::time::sleep(self.timeout); diff --git a/tower-layer/src/layer_fn.rs b/tower-layer/src/layer_fn.rs index 06f6e0e35..37be7dcfe 100644 --- a/tower-layer/src/layer_fn.rs +++ b/tower-layer/src/layer_fn.rs @@ -33,11 +33,11 @@ use std::fmt; /// type Error = S::Error; /// type Future = S::Future; /// -/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { +/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// self.service.poll_ready(cx) /// } /// -/// fn call(&mut self, request: Request) -> Self::Future { +/// fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { /// // Log the request /// println!("request = {:?}, target = {:?}", request, self.target); /// diff --git a/tower-layer/src/lib.rs b/tower-layer/src/lib.rs index 218737550..e2fa47c20 100644 --- a/tower-layer/src/lib.rs +++ b/tower-layer/src/lib.rs @@ -74,11 +74,11 @@ pub use self::{ /// type Error = S::Error; /// type Future = S::Future; /// -/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { +/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// self.service.poll_ready(cx) /// } /// -/// fn call(&mut self, request: Request) -> Self::Future { +/// fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { /// // Insert log statement here or other functionality /// println!("request = {:?}, target = {:?}", request, self.target); /// self.service.call(request) diff --git a/tower-service/src/lib.rs b/tower-service/src/lib.rs index c2f531e1a..83d82cbdb 100644 --- a/tower-service/src/lib.rs +++ b/tower-service/src/lib.rs @@ -13,6 +13,7 @@ //! request / response clients and servers. It is simple but powerful and is //! used as the foundation for the rest of Tower. +use std::any::Any; use std::future::Future; use std::task::{Context, Poll}; @@ -56,11 +57,11 @@ use std::task::{Context, Poll}; /// type Error = http::Error; /// type Future = Pin>>>; /// -/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { +/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// Poll::Ready(Ok(())) /// } /// -/// fn call(&mut self, req: Request>) -> Self::Future { +/// fn call(&mut self, token: Self::Token, req: Request>) -> Self::Future { /// // create the body /// let body: Vec = "hello, world!\n" /// .as_bytes() @@ -166,13 +167,13 @@ use std::task::{Context, Poll}; /// type Error = Box; /// type Future = Pin>>>; /// -/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { +/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// // Our timeout service is ready if the inner service is ready. /// // This is how backpressure can be propagated through a tree of nested services. /// self.inner.poll_ready(cx).map_err(Into::into) /// } /// -/// fn call(&mut self, req: Request) -> Self::Future { +/// fn call(&mut self, token: Self::Token, req: Request) -> Self::Future { /// // Create a future that completes after `self.timeout` /// let timeout = tokio::time::sleep(self.timeout); /// @@ -259,11 +260,11 @@ use std::task::{Context, Poll}; /// type Error = S::Error; /// type Future = Pin>>>; /// -/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { +/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// Poll::Ready(Ok(())) /// } /// -/// fn call(&mut self, req: R) -> Self::Future { +/// fn call(&mut self, token: Self::Token, req: R) -> Self::Future { /// let mut inner = self.inner.clone(); /// Box::pin(async move { /// // `inner` might not be ready since its a clone @@ -294,11 +295,11 @@ use std::task::{Context, Poll}; /// type Error = S::Error; /// type Future = Pin>>>; /// -/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { +/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// Poll::Ready(Ok(())) /// } /// -/// fn call(&mut self, req: R) -> Self::Future { +/// fn call(&mut self, token: Self::Token, req: R) -> Self::Future { /// let clone = self.inner.clone(); /// // take the service that was ready /// let mut inner = std::mem::replace(&mut self.inner, clone); @@ -315,6 +316,9 @@ pub trait Service { /// Errors produced by the service. type Error; + /// A token that allows you to `call` once. + type Token; + /// The future response value. type Future: Future>; @@ -331,7 +335,7 @@ pub trait Service { /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a request may be dispatched to the /// service using `call`. Until a request is dispatched, repeated calls to /// `poll_ready` must return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`. - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; /// Process the request and return the response asynchronously. /// @@ -345,7 +349,7 @@ pub trait Service { /// /// Implementations are permitted to panic if `call` is invoked without /// obtaining `Poll::Ready(Ok(()))` from `poll_ready`. - fn call(&mut self, req: Request) -> Self::Future; + fn call(&mut self, token: Self::Token, req: Request) -> Self::Future; } impl<'a, S, Request> Service for &'a mut S @@ -354,14 +358,15 @@ where { type Response = S::Response; type Error = S::Error; + type Token = S::Token; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { (**self).poll_ready(cx) } - fn call(&mut self, request: Request) -> S::Future { - (**self).call(request) + fn call(&mut self, token: Self::Token, request: Request) -> S::Future { + (**self).call(token, request) } } @@ -371,13 +376,14 @@ where { type Response = S::Response; type Error = S::Error; + type Token = S::Token; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { (**self).poll_ready(cx) } - fn call(&mut self, request: Request) -> S::Future { - (**self).call(request) + fn call(&mut self, token: Self::Token, request: Request) -> S::Future { + (**self).call(token, request) } } diff --git a/tower-test/src/mock/mod.rs b/tower-test/src/mock/mod.rs index 6720c994e..2c7a81451 100644 --- a/tower-test/src/mock/mod.rs +++ b/tower-test/src/mock/mod.rs @@ -57,7 +57,6 @@ pub struct Mock { id: u64, tx: Mutex>, state: Arc>, - can_send: bool, } /// Handle to the `Mock`. @@ -107,7 +106,6 @@ pub fn pair() -> (Mock, Handle) { id: 0, tx, state: state.clone(), - can_send: false, }; let handle = Handle { rx, state }; @@ -115,12 +113,25 @@ pub fn pair() -> (Mock, Handle) { (mock, handle) } +#[derive(Debug)] +pub struct Token(Arc>); + +impl Drop for Token { + fn drop(&mut self) { + // TODO: Should probably avoid aborting if we're already panicking + let mut state = self.0.lock().unwrap(); + // Give back a call, as this was dropped without calling call. + state.rem += 1; + } +} + impl Service for Mock { type Response = U; type Error = Error; + type Token = Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let mut state = self.state.lock().unwrap(); if state.is_closed { @@ -131,17 +142,10 @@ impl Service for Mock { return Poll::Ready(Err(e)); } - if self.can_send { - return Poll::Ready(Ok(())); - } - if state.rem > 0 { assert!(!state.tasks.contains_key(&self.id)); - - // Returning `Ready` means the next call to `call` must succeed. - self.can_send = true; - - Poll::Ready(Ok(())) + state.rem -= 1; + Poll::Ready(Ok(Token(self.state.clone()))) } else { // Bit weird... but whatevz *state @@ -153,24 +157,16 @@ impl Service for Mock { } } - fn call(&mut self, request: T) -> Self::Future { + fn call(&mut self, token: Token, request: T) -> Self::Future { // Make sure that the service has capacity - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); if state.is_closed { return ResponseFuture::closed(); } - if !self.can_send { - panic!("service not ready; poll_ready must be called first"); - } - - self.can_send = false; - - // Decrement the number of remaining requests that can be sent - if state.rem > 0 { - state.rem -= 1; - } + // "Use up" the token so that rem stays decremented. + std::mem::forget(token); let (tx, rx) = oneshot::channel(); let send_response = SendResponse { tx }; @@ -204,7 +200,6 @@ impl Clone for Mock { id, tx, state: self.state.clone(), - can_send: false, } } } diff --git a/tower-test/src/mock/spawn.rs b/tower-test/src/mock/spawn.rs index 32d23132c..c431faf59 100644 --- a/tower-test/src/mock/spawn.rs +++ b/tower-test/src/mock/spawn.rs @@ -31,7 +31,7 @@ impl Spawn { } /// Poll this service ready. - pub fn poll_ready(&mut self) -> Poll> + pub fn poll_ready(&mut self) -> Poll> where T: Service, { @@ -42,11 +42,11 @@ impl Spawn { } /// Call the inner Service. - pub fn call(&mut self, req: Request) -> T::Future + pub fn call(&mut self, token: T::Token, req: Request) -> T::Future where T: Service, { - self.inner.call(req) + self.inner.call(token, req) } /// Get the inner service. diff --git a/tower-test/tests/mock.rs b/tower-test/tests/mock.rs index 0c6e73554..ddfeffa56 100644 --- a/tower-test/tests/mock.rs +++ b/tower-test/tests/mock.rs @@ -7,9 +7,9 @@ async fn single_request_ready() { assert_pending!(handle.poll_request()); - assert_ready!(service.poll_ready()).unwrap(); + let token = assert_ready!(service.poll_ready()).unwrap(); - let response = service.call("hello"); + let response = service.call(token, "hello"); assert_request_eq!(handle, "hello").send_response("world"); @@ -17,13 +17,10 @@ async fn single_request_ready() { } #[tokio::test(flavor = "current_thread")] -#[should_panic] async fn backpressure() { - let (mut service, mut handle) = mock::spawn::<_, ()>(); + let (mut service, mut handle) = mock::spawn::<&'static str, ()>(); handle.allow(0); assert_pending!(service.poll_ready()); - - service.call("hello").await.unwrap(); } diff --git a/tower/src/balance/p2c/make.rs b/tower/src/balance/p2c/make.rs index 538d70360..ff17004e8 100644 --- a/tower/src/balance/p2c/make.rs +++ b/tower/src/balance/p2c/make.rs @@ -72,15 +72,16 @@ where { type Response = Balance; type Error = S::Error; + type Token = S::Token; type Future = MakeFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, target: Target) -> Self::Future { + fn call(&mut self, token: Self::Token, target: Target) -> Self::Future { MakeFuture { - inner: self.inner.call(target), + inner: self.inner.call(token, target), _marker: PhantomData, } } diff --git a/tower/src/balance/p2c/service.rs b/tower/src/balance/p2c/service.rs index f85d2419c..cb3f46323 100644 --- a/tower/src/balance/p2c/service.rs +++ b/tower/src/balance/p2c/service.rs @@ -33,6 +33,7 @@ pub struct Balance where D: Discover, D::Key: Hash, + D::Service: Service, { discover: D, @@ -48,7 +49,7 @@ impl fmt::Debug for Balance where D: fmt::Debug, D::Key: Hash + fmt::Debug, - D::Service: fmt::Debug, + D::Service: Service + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Balance") @@ -235,7 +236,7 @@ where fn(>::Error) -> crate::BoxError, >; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // `ready_index` may have already been set by a prior invocation. These // updates cannot disturb the order of existing ready services. let _ = self.update_pending_from_discover(cx)?; @@ -279,7 +280,7 @@ where } } - fn call(&mut self, request: Req) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Req) -> Self::Future { let index = self.ready_index.take().expect("called before ready"); self.services .call_ready_index(index, request) diff --git a/tower/src/balance/p2c/test.rs b/tower/src/balance/p2c/test.rs index 404e94ff8..9cf5246b9 100644 --- a/tower/src/balance/p2c/test.rs +++ b/tower/src/balance/p2c/test.rs @@ -32,9 +32,9 @@ async fn single_endpoint() { ); handle.allow(1); - assert_ready_ok!(svc.poll_ready()); + let token = assert_ready_ok!(svc.poll_ready()); - let mut fut = task::spawn(svc.call(())); + let mut fut = task::spawn(svc.call(token, ())); assert_request_eq!(handle, ()).send_response(1); diff --git a/tower/src/balance/pool/mod.rs b/tower/src/balance/pool/mod.rs index ab43936fe..beeb9cc82 100644 --- a/tower/src/balance/pool/mod.rs +++ b/tower/src/balance/pool/mod.rs @@ -100,10 +100,10 @@ where } if this.services.is_empty() && this.making.is_none() { - let _ = ready!(this.maker.poll_ready(cx))?; + let token = ready!(this.maker.poll_ready(cx))?; tracing::trace!("construct initial pool connection"); this.making - .set(Some(this.maker.make_service(this.target.clone()))); + .set(Some(this.maker.make_service(token, this.target.clone()))); } if let Level::High = this.load { @@ -120,11 +120,11 @@ where pool.services = this.services.len(), message = "decided to add service to loaded pool" ); - ready!(this.maker.poll_ready(cx))?; + let token = ready!(this.maker.poll_ready(cx))?; tracing::trace!("making new service"); // TODO: it'd be great if we could avoid the clone here and use, say, &Target this.making - .set(Some(this.maker.make_service(this.target.clone()))); + .set(Some(this.maker.make_service(token, this.target.clone()))); } } @@ -361,9 +361,10 @@ where { type Response = , Req> as Service>::Response; type Error = , Req> as Service>::Error; + type Token = , Req> as Service>::Token; type Future = , Req> as Service>::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if let Poll::Ready(()) = self.balance.poll_ready(cx)? { // services was ready -- there are enough services // update ewma with a 0 sample @@ -418,8 +419,8 @@ where Poll::Pending } - fn call(&mut self, req: Req) -> Self::Future { - self.balance.call(req) + fn call(&mut self, token: Self::Token, req: Req) -> Self::Future { + self.balance.call(token, req) } } @@ -447,13 +448,14 @@ impl Load for DropNotifyService { impl> Service for DropNotifyService { type Response = Svc::Response; type Future = Svc::Future; + type Token = Svc::Token; type Error = Svc::Error; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.svc.poll_ready(cx) } - fn call(&mut self, req: Request) -> Self::Future { - self.svc.call(req) + fn call(&mut self, token: Self::Token, req: Request) -> Self::Future { + self.svc.call(token, req) } } diff --git a/tower/src/balance/pool/test.rs b/tower/src/balance/pool/test.rs index 6861b25a9..74df5d293 100644 --- a/tower/src/balance/pool/test.rs +++ b/tower/src/balance/pool/test.rs @@ -19,10 +19,10 @@ async fn basic() { pin_mut!(svc1); assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); + let token = assert_ready_ok!(pool.poll_ready()); // send a request to the one backing service - let mut fut = task::spawn(pool.call(())); + let mut fut = task::spawn(pool.call(token, ())); assert_pending!(fut.poll()); assert_request_eq!(svc1, ()).send_response("foobar"); @@ -49,10 +49,10 @@ async fn high_load() { svc1.allow(1); assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); + let token = assert_ready_ok!(pool.poll_ready()); // make the one backing service not ready - let mut fut1 = task::spawn(pool.call(())); + let mut fut1 = task::spawn(pool.call(token, ())); // if we poll_ready again, pool should notice that load is increasing // since urgency == 1.0, it should immediately enter high load @@ -65,8 +65,8 @@ async fn high_load() { assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); // the pool should now be ready again for one more request - assert_ready_ok!(pool.poll_ready()); - let mut fut2 = task::spawn(pool.call(())); + let token = assert_ready_ok!(pool.poll_ready()); + let mut fut2 = task::spawn(pool.call(token, ())); assert_pending!(pool.poll_ready()); @@ -101,10 +101,10 @@ async fn low_load() { svc1.allow(1); assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); + let token = assert_ready_ok!(pool.poll_ready()); // cycling a request should now work - let mut fut = task::spawn(pool.call(())); + let mut fut = task::spawn(pool.call(token, ())); assert_request_eq!(svc1, ()).send_response("foo"); assert_eq!(assert_ready_ok!(fut.poll()), "foo"); @@ -120,20 +120,20 @@ async fn low_load() { // pool is now ready // which (because of urgency == 1.0) should immediately cause it to drop a service // it'll drop svc1, so it'll still be ready - assert_ready_ok!(pool.poll_ready()); + let _ = assert_ready_ok!(pool.poll_ready()); // and even with another ready, it won't drop svc2 since its now the only service - assert_ready_ok!(pool.poll_ready()); + let token = assert_ready_ok!(pool.poll_ready()); // cycling a request should now work on svc2 - let mut fut = task::spawn(pool.call(())); + let mut fut = task::spawn(pool.call(token, ())); assert_request_eq!(svc2, ()).send_response("foo"); assert_eq!(assert_ready_ok!(fut.poll()), "foo"); // and again (still svc2) svc2.allow(1); - assert_ready_ok!(pool.poll_ready()); - let mut fut = task::spawn(pool.call(())); + let token = assert_ready_ok!(pool.poll_ready()); + let mut fut = task::spawn(pool.call(token, ())); assert_request_eq!(svc2, ()).send_response("foo"); assert_eq!(assert_ready_ok!(fut.poll()), "foo"); @@ -160,10 +160,10 @@ async fn failing_service() { svc1.allow(1); assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); + let token = assert_ready_ok!(pool.poll_ready()); // one request-response cycle - let mut fut = task::spawn(pool.call(())); + let mut fut = task::spawn(pool.call(token, ())); assert_request_eq!(svc1, ()).send_response("foo"); assert_eq!(assert_ready_ok!(fut.poll()), "foo"); @@ -181,9 +181,9 @@ async fn failing_service() { assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); // the pool should now be ready again - assert_ready_ok!(pool.poll_ready()); + let token = assert_ready_ok!(pool.poll_ready()); // and a cycle should work (and go through svc2) - let mut fut = task::spawn(pool.call(())); + let mut fut = task::spawn(pool.call(token, ())); assert_request_eq!(svc2, ()).send_response("bar"); assert_eq!(assert_ready_ok!(fut.poll()), "bar"); diff --git a/tower/src/buffer/service.rs b/tower/src/buffer/service.rs index 9b690f0f6..a595f5bb7 100644 --- a/tower/src/buffer/service.rs +++ b/tower/src/buffer/service.rs @@ -32,10 +32,6 @@ where // own bounded MPSC on top of the unbounded channel, using a semaphore to // limit how many items are in the channel. semaphore: PollSemaphore, - // The current semaphore permit, if one has been acquired. - // - // This is acquired in `poll_ready` and taken in `call`. - permit: Option, handle: Handle, } @@ -95,7 +91,6 @@ where tx, handle, semaphore: PollSemaphore::new(semaphore), - permit: None, }; (buffer, worker) } @@ -105,6 +100,9 @@ where } } +#[derive(Debug)] +pub struct Token(OwnedSemaphorePermit); + impl Service for Buffer where T: Service, @@ -112,38 +110,28 @@ where { type Response = T::Response; type Error = crate::BoxError; + type Token = Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // First, check if the worker is still alive. if self.tx.is_closed() { // If the inner service has errored, then we error here. return Poll::Ready(Err(self.get_worker_error())); } - // Then, check if we've already acquired a permit. - if self.permit.is_some() { - // We've already reserved capacity to send a request. We're ready! - return Poll::Ready(Ok(())); - } - // Finally, if we haven't already acquired a permit, poll the semaphore // to acquire one. If we acquire a permit, then there's enough buffer // capacity to send a new request. Otherwise, we need to wait for // capacity. let permit = ready!(self.semaphore.poll_acquire(cx)).ok_or_else(|| self.get_worker_error())?; - self.permit = Some(permit); - Poll::Ready(Ok(())) + Poll::Ready(Ok(Token(permit))) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { tracing::trace!("sending request to buffer worker"); - let _permit = self - .permit - .take() - .expect("buffer full; poll_ready must be called first"); // get the current Span so that we can explicitly propagate it to the worker // if we didn't do this, events on the worker related to this span wouldn't be counted @@ -158,7 +146,7 @@ where request, span, tx, - _permit, + _permit: token.0, }) { Err(_) => ResponseFuture::failed(self.get_worker_error()), Ok(_) => ResponseFuture::new(rx), @@ -175,9 +163,6 @@ where tx: self.tx.clone(), handle: self.handle.clone(), semaphore: self.semaphore.clone(), - // The new clone hasn't acquired a permit yet. It will when it's - // next polled ready. - permit: None, } } } diff --git a/tower/src/buffer/worker.rs b/tower/src/buffer/worker.rs index fe7ea555c..6c50d30de 100644 --- a/tower/src/buffer/worker.rs +++ b/tower/src/buffer/worker.rs @@ -193,9 +193,9 @@ where message = "worker received request; waiting for service readiness" ); match self.service.poll_ready(cx) { - Poll::Ready(Ok(())) => { + Poll::Ready(Ok(token)) => { tracing::debug!(service.ready = true, message = "processing request"); - let response = self.service.call(msg.request); + let response = self.service.call(token, msg.request); // Send the response future back to the sender. // diff --git a/tower/src/builder/mod.rs b/tower/src/builder/mod.rs index 33684137d..f204f97c6 100644 --- a/tower/src/builder/mod.rs +++ b/tower/src/builder/mod.rs @@ -3,7 +3,7 @@ use tower_layer::{Identity, Layer, Stack}; use tower_service::Service; -use std::fmt; +use std::{any::Any, fmt}; /// Declaratively construct [`Service`] values. /// @@ -654,12 +654,12 @@ impl ServiceBuilder { /// type Error = Error; /// type Future = futures_util::future::Ready>; /// - /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// // ... /// # todo!() /// } /// - /// fn call(&mut self, request: Request) -> Self::Future { + /// fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { /// // ... /// # todo!() /// } @@ -740,6 +740,7 @@ impl ServiceBuilder { where L: Layer, L::Service: Service + Send + 'static, + >::Token: Any + Send + 'static, >::Future: Send + 'static, { self.layer(crate::util::BoxService::layer()) @@ -796,6 +797,7 @@ impl ServiceBuilder { R, >::Response, >::Error, + >::Token, >, >, L, diff --git a/tower/src/filter/future.rs b/tower/src/filter/future.rs index 67772bbec..0ebcd15b6 100644 --- a/tower/src/filter/future.rs +++ b/tower/src/filter/future.rs @@ -22,7 +22,7 @@ pin_project! { S: Service, { #[pin] - state: State, + state: State, // Inner service service: S, @@ -43,11 +43,12 @@ opaque_future! { pin_project! { #[project = StateProj] #[derive(Debug)] - enum State { + enum State { /// Waiting for the predicate future Check { #[pin] - check: F + check: F, + token: Option, }, /// Waiting for the response future WaitResponse { @@ -63,9 +64,12 @@ where S: Service, S::Error: Into, { - pub(crate) fn new(check: P::Future, service: S) -> Self { + pub(crate) fn new(check: P::Future, service: S, token: S::Token) -> Self { Self { - state: State::Check { check }, + state: State::Check { + check, + token: Some(token), + }, service, } } @@ -84,9 +88,9 @@ where loop { match this.state.as_mut().project() { - StateProj::Check { mut check } => { + StateProj::Check { mut check, token } => { let request = ready!(check.as_mut().poll(cx))?; - let response = this.service.call(request); + let response = this.service.call(token.take().unwrap(), request); this.state.set(State::WaitResponse { response }); } StateProj::WaitResponse { response } => { diff --git a/tower/src/filter/mod.rs b/tower/src/filter/mod.rs index 9ed7f63d7..17423d5e6 100644 --- a/tower/src/filter/mod.rs +++ b/tower/src/filter/mod.rs @@ -105,15 +105,16 @@ where { type Response = T::Response; type Error = BoxError; + type Token = T::Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { ResponseFuture::new(match self.predicate.check(request) { - Ok(request) => Either::Right(self.inner.call(request).err_into()), + Ok(request) => Either::Right(self.inner.call(token, request).err_into()), Err(e) => Either::Left(futures_util::future::ready(Err(e.into()))), }) } @@ -167,13 +168,14 @@ where { type Response = T::Response; type Error = BoxError; + type Token = T::Token; type Future = AsyncResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { use std::mem; let inner = self.inner.clone(); @@ -186,6 +188,6 @@ where // Check the request let check = self.predicate.check(request); - AsyncResponseFuture::new(check, inner) + AsyncResponseFuture::new(check, inner, token) } } diff --git a/tower/src/hedge/delay.rs b/tower/src/hedge/delay.rs index 3d634bfaa..65b110eac 100644 --- a/tower/src/hedge/delay.rs +++ b/tower/src/hedge/delay.rs @@ -80,16 +80,17 @@ where { type Response = S::Response; type Error = crate::BoxError; + type Token = (); type Future = ResponseFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { // Calling self.service.poll_ready would reserve a slot for the delayed request, // potentially well in advance of actually making it. Instead, signal readiness here and // treat the service as a Oneshot in the future. Poll::Ready(Ok(())) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { let delay = self.policy.delay(&request); ResponseFuture { service: Some(self.service.clone()), diff --git a/tower/src/hedge/latency.rs b/tower/src/hedge/latency.rs index 5f99642ba..fa9c5842e 100644 --- a/tower/src/hedge/latency.rs +++ b/tower/src/hedge/latency.rs @@ -55,17 +55,18 @@ where { type Response = S::Response; type Error = crate::BoxError; + type Token = S::Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { ResponseFuture { start: Instant::now(), rec: self.rec.clone(), - inner: self.service.call(request), + inner: self.service.call(token, request), } } } diff --git a/tower/src/hedge/mod.rs b/tower/src/hedge/mod.rs index 3cd152e7e..ee92a827c 100644 --- a/tower/src/hedge/mod.rs +++ b/tower/src/hedge/mod.rs @@ -25,7 +25,8 @@ use rotating_histogram::RotatingHistogram; use select::Select; type Histo = Arc>; -type Service = select::Select< +type Service = select::Select< + R, SelectPolicy

, Latency, Delay, PolicyPredicate

>>, @@ -35,7 +36,11 @@ type Service = select::Select< /// for longer than a given latency percentile. If either of the original /// future or the retry future completes, that value is used. #[derive(Debug)] -pub struct Hedge(Service); +pub struct Hedge(Service) +where + S: tower_service::Service + Clone, + S::Error: Into, + P: Policy + Clone; pin_project! { /// The [`Future`] returned by the [`Hedge`] service. @@ -82,39 +87,34 @@ pub struct SelectPolicy

{ min_data_points: u64, } -impl Hedge { +impl Hedge +where + S: tower_service::Service + Clone, + S::Error: Into, + P: Policy + Clone, +{ /// Create a new hedge middleware. - pub fn new( + pub fn new( service: S, policy: P, min_data_points: u64, latency_percentile: f32, period: Duration, - ) -> Hedge - where - S: tower_service::Service + Clone, - S::Error: Into, - P: Policy + Clone, - { + ) -> Self { let histo = Arc::new(Mutex::new(RotatingHistogram::new(period))); Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo) } /// A hedge middleware with a prepopulated latency histogram. This is usedful /// for integration tests. - pub fn new_with_mock_latencies( + pub fn new_with_mock_latencies( service: S, policy: P, min_data_points: u64, latency_percentile: f32, period: Duration, latencies_ms: &[u64], - ) -> Hedge - where - S: tower_service::Service + Clone, - S::Error: Into, - P: Policy + Clone, - { + ) -> Self { let histo = Arc::new(Mutex::new(RotatingHistogram::new(period))); { let mut locked = histo.lock().unwrap(); @@ -125,18 +125,13 @@ impl Hedge { Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo) } - fn new_with_histo( + fn new_with_histo( service: S, policy: P, min_data_points: u64, latency_percentile: f32, histo: Histo, - ) -> Hedge - where - S: tower_service::Service + Clone, - S::Error: Into, - P: Policy + Clone, - { + ) -> Self { // Clone the underlying service and wrap both copies in a middleware that // records the latencies in a rotating histogram. let recorded_a = Latency::new(histo.clone(), service.clone()); @@ -164,7 +159,7 @@ impl Hedge { } } -impl tower_service::Service for Hedge +impl tower_service::Service for Hedge where S: tower_service::Service + Clone, S::Error: Into, @@ -172,15 +167,16 @@ where { type Response = S::Response; type Error = crate::BoxError; - type Future = Future, Request>; + type Token = S::Token; + type Future = Future, Request>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { Future { - inner: self.0.call(request), + inner: self.0.call(token, request), } } } diff --git a/tower/src/hedge/select.rs b/tower/src/hedge/select.rs index 5d1573c08..187a0ab60 100644 --- a/tower/src/hedge/select.rs +++ b/tower/src/hedge/select.rs @@ -17,10 +17,12 @@ pub trait Policy { /// the cloned request to the B service. Both resulting futures will be polled /// and whichever future completes first will be used as the result. #[derive(Debug)] -pub struct Select { +pub struct Select, B: Service> { policy: P, a: A, b: B, + a_token: Option, + b_token: Option, } pin_project! { @@ -33,8 +35,12 @@ pin_project! { } } -impl Select { - pub fn new(policy: P, a: A, b: B) -> Self +impl Select +where + A: Service, + B: Service, +{ + pub fn new(policy: P, a: A, b: B) -> Self where P: Policy, A: Service, @@ -42,11 +48,23 @@ impl Select { B: Service, B::Error: Into, { - Select { policy, a, b } + Select { + policy, + a, + b, + a_token: None, + b_token: None, + } } } -impl Service for Select +#[derive(Debug)] +pub struct Token { + a: A, + b: B, +} + +impl Service for Select where P: Policy, A: Service, @@ -56,25 +74,35 @@ where { type Response = A::Response; type Error = crate::BoxError; + type Token = Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match (self.a.poll_ready(cx), self.b.poll_ready(cx)) { - (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())), - (Poll::Ready(Err(e)), _) => Poll::Ready(Err(e.into())), - (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e.into())), + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.a_token.is_none() { + if let Poll::Ready(token) = self.a.poll_ready(cx).map_err(Into::into)? { + self.a_token = Some(token); + } + } + if self.b_token.is_none() { + if let Poll::Ready(token) = self.b.poll_ready(cx).map_err(Into::into)? { + self.b_token = Some(token); + } + } + + match (self.a_token, self.b_token) { + (Some(a), Some(b)) => Poll::Ready(Ok(Token { a, b })), _ => Poll::Pending, } } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { let b_fut = if let Some(cloned_req) = self.policy.clone_request(&request) { - Some(self.b.call(cloned_req)) + Some(self.b.call(token.b, cloned_req)) } else { None }; ResponseFuture { - a_fut: self.a.call(request), + a_fut: self.a.call(token.a, request), b_fut, } } diff --git a/tower/src/limit/concurrency/service.rs b/tower/src/limit/concurrency/service.rs index 02d85345f..69efeb829 100644 --- a/tower/src/limit/concurrency/service.rs +++ b/tower/src/limit/concurrency/service.rs @@ -15,12 +15,6 @@ use std::{ pub struct ConcurrencyLimit { inner: T, semaphore: PollSemaphore, - /// The currently acquired semaphore permit, if there is sufficient - /// concurrency to send a new request. - /// - /// The permit is acquired in `poll_ready`, and taken in `call` when sending - /// a new request. - permit: Option, } impl ConcurrencyLimit { @@ -34,7 +28,6 @@ impl ConcurrencyLimit { ConcurrencyLimit { inner, semaphore: PollSemaphore::new(semaphore), - permit: None, } } @@ -54,42 +47,38 @@ impl ConcurrencyLimit { } } +#[derive(Debug)] +pub struct Token { + inner: T, + permit: OwnedSemaphorePermit, +} + impl Service for ConcurrencyLimit where S: Service, { type Response = S::Response; type Error = S::Error; + type Token = Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // If we haven't already acquired a permit from the semaphore, try to - // acquire one first. - if self.permit.is_none() { - self.permit = ready!(self.semaphore.poll_acquire(cx)); - debug_assert!( - self.permit.is_some(), - "ConcurrencyLimit semaphore is never closed, so `poll_acquire` \ - should never fail", - ); - } + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let permit = ready!(self.semaphore.poll_acquire(cx)).expect( + "ConcurrencyLimit semaphore is never closed, so `poll_acquire` \ + should never fail", + ); - // Once we've acquired a permit (or if we already had one), poll the - // inner service. - self.inner.poll_ready(cx) + // Once we've acquired a permit, poll the inner service. + self.inner + .poll_ready(cx) + .map_ok(move |inner| Token { inner, permit }) } - fn call(&mut self, request: Request) -> Self::Future { - // Take the permit - let permit = self - .permit - .take() - .expect("max requests in-flight; poll_ready must be called first"); - + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { // Call the inner service - let future = self.inner.call(request); + let future = self.inner.call(token.inner, request); - ResponseFuture::new(future, permit) + ResponseFuture::new(future, token.permit) } } @@ -101,7 +90,6 @@ impl Clone for ConcurrencyLimit { Self { inner: self.inner.clone(), semaphore: self.semaphore.clone(), - permit: None, } } } diff --git a/tower/src/limit/rate/service.rs b/tower/src/limit/rate/service.rs index 550c92d8a..3d7ee60cb 100644 --- a/tower/src/limit/rate/service.rs +++ b/tower/src/limit/rate/service.rs @@ -67,9 +67,10 @@ where { type Response = S::Response; type Error = S::Error; + type Token = S::Token; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self.state { State::Ready { .. } => return Poll::Ready(ready!(self.inner.poll_ready(cx))), State::Limited => { @@ -88,7 +89,7 @@ where Poll::Ready(ready!(self.inner.poll_ready(cx))) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { match self.state { State::Ready { mut until, mut rem } => { let now = Instant::now(); @@ -111,7 +112,7 @@ where } // Call the inner future - self.inner.call(request) + self.inner.call(token, request) } State::Limited => panic!("service not ready; poll_ready must be called first"), } diff --git a/tower/src/load/constant.rs b/tower/src/load/constant.rs index a7c874e2b..f2a965b67 100644 --- a/tower/src/load/constant.rs +++ b/tower/src/load/constant.rs @@ -47,14 +47,15 @@ where { type Response = S::Response; type Error = S::Error; + type Token = S::Token; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, req: Request) -> Self::Future { - self.inner.call(req) + fn call(&mut self, token: Self::Token, req: Request) -> Self::Future { + self.inner.call(token, req) } } diff --git a/tower/src/load/peak_ewma.rs b/tower/src/load/peak_ewma.rs index e48c55a20..545f593d5 100644 --- a/tower/src/load/peak_ewma.rs +++ b/tower/src/load/peak_ewma.rs @@ -115,17 +115,18 @@ where { type Response = C::Output; type Error = S::Error; + type Token = S::Token; type Future = TrackCompletionFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, req: Request) -> Self::Future { TrackCompletionFuture::new( self.completion.clone(), self.handle(), - self.service.call(req), + self.service.call(token, req), ) } } @@ -320,13 +321,14 @@ mod tests { impl Service<()> for Svc { type Response = (); type Error = (); + type Token = (); type Future = future::Ready>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn call(&mut self, (): ()) -> Self::Future { + fn call(&mut self, (): (), (): ()) -> Self::Future { future::ok(()) } } @@ -370,11 +372,11 @@ mod tests { assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI)); time::advance(Duration::from_millis(100)).await; - let mut rsp0 = task::spawn(svc.call(())); + let mut rsp0 = task::spawn(svc.call((), ())); assert!(svc.load() > Cost(20.0 * NANOS_PER_MILLI)); time::advance(Duration::from_millis(100)).await; - let mut rsp1 = task::spawn(svc.call(())); + let mut rsp1 = task::spawn(svc.call((), ())); assert!(svc.load() > Cost(40.0 * NANOS_PER_MILLI)); time::advance(Duration::from_millis(100)).await; diff --git a/tower/src/load/pending_requests.rs b/tower/src/load/pending_requests.rs index 3d8689bbe..000a77f2f 100644 --- a/tower/src/load/pending_requests.rs +++ b/tower/src/load/pending_requests.rs @@ -80,17 +80,18 @@ where { type Response = C::Output; type Error = S::Error; + type Token = S::Token; type Future = TrackCompletionFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, req: Request) -> Self::Future { TrackCompletionFuture::new( self.completion.clone(), self.handle(), - self.service.call(req), + self.service.call(token, req), ) } } @@ -154,13 +155,14 @@ mod tests { impl Service<()> for Svc { type Response = (); type Error = (); + type Token = (); type Future = future::Ready>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn call(&mut self, (): ()) -> Self::Future { + fn call(&mut self, (): (), (): ()) -> Self::Future { future::ok(()) } } @@ -170,10 +172,10 @@ mod tests { let mut svc = PendingRequests::new(Svc, CompleteOnResponse); assert_eq!(svc.load(), Count(0)); - let rsp0 = svc.call(()); + let rsp0 = svc.call((), ()); assert_eq!(svc.load(), Count(1)); - let rsp1 = svc.call(()); + let rsp1 = svc.call((), ()); assert_eq!(svc.load(), Count(2)); let () = tokio_test::block_on(rsp0).unwrap(); @@ -197,12 +199,12 @@ mod tests { let mut svc = PendingRequests::new(Svc, IntoHandle); assert_eq!(svc.load(), Count(0)); - let rsp = svc.call(()); + let rsp = svc.call((), ()); assert_eq!(svc.load(), Count(1)); let i0 = tokio_test::block_on(rsp).unwrap(); assert_eq!(svc.load(), Count(1)); - let rsp = svc.call(()); + let rsp = svc.call((), ()); assert_eq!(svc.load(), Count(2)); let i1 = tokio_test::block_on(rsp).unwrap(); assert_eq!(svc.load(), Count(2)); diff --git a/tower/src/load_shed/mod.rs b/tower/src/load_shed/mod.rs index deadf0fcb..580f23337 100644 --- a/tower/src/load_shed/mod.rs +++ b/tower/src/load_shed/mod.rs @@ -16,7 +16,6 @@ pub use self::layer::LoadShedLayer; #[derive(Debug)] pub struct LoadShed { inner: S, - is_ready: bool, } // ===== impl LoadShed ===== @@ -24,13 +23,13 @@ pub struct LoadShed { impl LoadShed { /// Wraps a service in [`LoadShed`] middleware. pub fn new(inner: S) -> Self { - LoadShed { - inner, - is_ready: false, - } + LoadShed { inner } } } +#[derive(Debug)] +pub struct Token(Option); + impl Service for LoadShed where S: Service, @@ -38,26 +37,26 @@ where { type Response = S::Response; type Error = crate::BoxError; + type Token = Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // We check for readiness here, so that we can know in `call` if // the inner service is overloaded or not. - self.is_ready = match self.inner.poll_ready(cx) { + let token = match self.inner.poll_ready(cx) { Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), - r => r.is_ready(), + Poll::Ready(Ok(token)) => Some(token), + Poll::Pending => None, }; // But we always report Ready, so that layers above don't wait until // the inner service is ready (the entire point of this layer!) - Poll::Ready(Ok(())) + Poll::Ready(Ok(Token(token))) } - fn call(&mut self, req: Req) -> Self::Future { - if self.is_ready { - // readiness only counts once, you need to check again! - self.is_ready = false; - ResponseFuture::called(self.inner.call(req)) + fn call(&mut self, token: Self::Token, req: Req) -> Self::Future { + if let Some(token) = token.0 { + ResponseFuture::called(self.inner.call(token, req)) } else { ResponseFuture::overloaded() } @@ -68,9 +67,6 @@ impl Clone for LoadShed { fn clone(&self) -> Self { LoadShed { inner: self.inner.clone(), - // new clones shouldn't carry the readiness state, as a cloneable - // inner service likely tracks readiness per clone. - is_ready: false, } } } diff --git a/tower/src/make/make_connection.rs b/tower/src/make/make_connection.rs index 9566cc68c..87eaf2514 100644 --- a/tower/src/make/make_connection.rs +++ b/tower/src/make/make_connection.rs @@ -16,14 +16,16 @@ pub trait MakeConnection: Sealed<(Target,)> { /// Errors produced by the connecting service type Error; + type Token; + /// The future that eventually produces the transport type Future: Future>; /// Returns `Poll::Ready(Ok(()))` when it is able to make more connections. - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; /// Connect and return a transport asynchronously - fn make_connection(&mut self, target: Target) -> Self::Future; + fn make_connection(&mut self, token: Self::Token, target: Target) -> Self::Future; } impl Sealed<(Target,)> for S where S: Service {} @@ -35,13 +37,14 @@ where { type Connection = C::Response; type Error = C::Error; + type Token = C::Token; type Future = C::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { Service::poll_ready(self, cx) } - fn make_connection(&mut self, target: Target) -> Self::Future { - Service::call(self, target) + fn make_connection(&mut self, token: Self::Token, target: Target) -> Self::Future { + Service::call(self, token, target) } } diff --git a/tower/src/make/make_service.rs b/tower/src/make/make_service.rs index aa519d682..4ffda4a97 100644 --- a/tower/src/make/make_service.rs +++ b/tower/src/make/make_service.rs @@ -31,6 +31,8 @@ pub trait MakeService: Sealed<(Target, Request)> { /// Errors produced while building a service. type MakeError; + type Token; + /// The future of the [`Service`] instance. type Future: Future>; @@ -42,10 +44,10 @@ pub trait MakeService: Sealed<(Target, Request)> { /// /// [`Poll::Ready`]: std::task::Poll::Ready /// [`Poll::Pending`]: std::task::Poll::Pending - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; /// Create and return a new service value asynchronously. - fn make_service(&mut self, target: Target) -> Self::Future; + fn make_service(&mut self, token: Self::Token, target: Target) -> Self::Future; /// Consume this [`MakeService`] and convert it into a [`Service`]. /// @@ -145,14 +147,15 @@ where type Error = S::Error; type Service = S; type MakeError = M::Error; + type Token = M::Token; type Future = M::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { Service::poll_ready(self, cx) } - fn make_service(&mut self, target: Target) -> Self::Future { - Service::call(self, target) + fn make_service(&mut self, token: Self::Token, target: Target) -> Self::Future { + Service::call(self, token, target) } } @@ -196,16 +199,17 @@ where { type Response = M::Response; type Error = M::Error; + type Token = M::Token; type Future = M::Future; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.make.poll_ready(cx) } #[inline] - fn call(&mut self, target: Target) -> Self::Future { - self.make.make_service(target) + fn call(&mut self, token: Self::Token, target: Target) -> Self::Future { + self.make.make_service(token, target) } } @@ -237,15 +241,16 @@ where { type Response = M::Response; type Error = M::Error; + type Token = M::Token; type Future = M::Future; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.make.poll_ready(cx) } #[inline] - fn call(&mut self, target: Target) -> Self::Future { - self.make.make_service(target) + fn call(&mut self, token: Self::Token, target: Target) -> Self::Future { + self.make.make_service(token, target) } } diff --git a/tower/src/make/make_service/shared.rs b/tower/src/make/make_service/shared.rs index fd308a02a..4ad964ddb 100644 --- a/tower/src/make/make_service/shared.rs +++ b/tower/src/make/make_service/shared.rs @@ -34,11 +34,11 @@ use tower_service::Service; /// type Error = Infallible; /// type Future = Ready>; /// -/// fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { +/// fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { /// Poll::Ready(Ok(())) /// } /// -/// fn call(&mut self, req: Request) -> Self::Future { +/// fn call(&mut self, token: Self::Token, req: Request) -> Self::Future { /// ready(Ok(Response {})) /// } /// } @@ -86,13 +86,14 @@ where { type Response = S; type Error = Infallible; + type Token = (); type Future = SharedFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn call(&mut self, _target: T) -> Self::Future { + fn call(&mut self, _token: (), _target: T) -> Self::Future { SharedFuture::new(futures_util::future::ready(Ok(self.service.clone()))) } } @@ -117,13 +118,13 @@ mod tests { async fn as_make_service() { let mut shared = Shared::new(service_fn(echo::<&'static str>)); - poll_fn(|cx| MakeService::<(), _>::poll_ready(&mut shared, cx)) + let token = poll_fn(|cx| MakeService::<(), _>::poll_ready(&mut shared, cx)) .await .unwrap(); - let mut svc = shared.make_service(()).await.unwrap(); + let mut svc = shared.make_service(token, ()).await.unwrap(); - poll_fn(|cx| svc.poll_ready(cx)).await.unwrap(); - let res = svc.call("foo").await.unwrap(); + let token = poll_fn(|cx| svc.poll_ready(cx)).await.unwrap(); + let res = svc.call(token, "foo").await.unwrap(); assert_eq!(res, "foo"); } @@ -133,13 +134,13 @@ mod tests { let shared = Shared::new(service_fn(echo::<&'static str>)); let mut shared = MakeService::<(), _>::into_service(shared); - poll_fn(|cx| Service::<()>::poll_ready(&mut shared, cx)) + let token = poll_fn(|cx| Service::<()>::poll_ready(&mut shared, cx)) .await .unwrap(); - let mut svc = shared.call(()).await.unwrap(); + let mut svc = shared.call(token, ()).await.unwrap(); - poll_fn(|cx| svc.poll_ready(cx)).await.unwrap(); - let res = svc.call("foo").await.unwrap(); + let token = poll_fn(|cx| svc.poll_ready(cx)).await.unwrap(); + let res = svc.call(token, "foo").await.unwrap(); assert_eq!(res, "foo"); } diff --git a/tower/src/ready_cache/cache.rs b/tower/src/ready_cache/cache.rs index f0fc1d053..fc035cb01 100644 --- a/tower/src/ready_cache/cache.rs +++ b/tower/src/ready_cache/cache.rs @@ -57,6 +57,7 @@ use tracing::{debug, trace}; pub struct ReadyCache where K: Eq + Hash, + S: Service, { /// A stream of services that are not yet ready. pending: FuturesUnordered>, @@ -69,11 +70,11 @@ where /// The cancelation oneshot is preserved (though unused) while the service is /// ready so that it need not be reallocated each time a request is /// dispatched. - ready: IndexMap, + ready: IndexMap, } // Safety: This is safe because we do not use `Pin::new_unchecked`. -impl Unpin for ReadyCache {} +impl, K: Eq + Hash, Req> Unpin for ReadyCache {} type CancelRx = oneshot::Receiver<()>; type CancelTx = oneshot::Sender<()>; @@ -114,7 +115,7 @@ where impl fmt::Debug for ReadyCache where K: fmt::Debug + Eq + Hash, - S: fmt::Debug, + S: fmt::Debug + Service, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let Self { @@ -133,6 +134,7 @@ where impl ReadyCache where K: Eq + Hash, + S: Service, { /// Returns the total number of services in the cache. pub fn len(&self) -> usize { @@ -381,12 +383,12 @@ where /// /// If the specified index is out of range. pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future { - let (key, (mut svc, cancel)) = self + let (key, (mut svc, token, cancel)) = self .ready .swap_remove_index(index) .expect("check_ready_index was not called"); - let fut = svc.call(req); + let fut = svc.call(token, req); // If a new version of this service has been added to the // unready set, don't overwrite it. @@ -407,7 +409,7 @@ impl Future for Pending where S: Service, { - type Output = Result<(K, S, CancelRx), PendingError>; + type Output = Result<(K, S, S::Token, CancelRx), PendingError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut fut = self.cancel.as_mut().expect("polled after complete"); @@ -424,10 +426,16 @@ where .poll_ready(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(Ok(())) => { + Poll::Ready(Ok(token)) => { let key = self.key.take().expect("polled after complete"); let cancel = self.cancel.take().expect("polled after complete"); - Ok((key, self.ready.take().expect("polled after ready"), cancel)).into() + Ok(( + key, + self.ready.take().expect("polled after ready"), + token, + cancel, + )) + .into() } Poll::Ready(Err(e)) => { let key = self.key.take().expect("polled after compete"); diff --git a/tower/src/reconnect/mod.rs b/tower/src/reconnect/mod.rs index ff354821c..704c4d00c 100644 --- a/tower/src/reconnect/mod.rs +++ b/tower/src/reconnect/mod.rs @@ -34,7 +34,6 @@ where mk_service: M, state: State, target: Target, - error: Option, } #[derive(Debug)] @@ -54,7 +53,6 @@ where mk_service, state: State::Idle, target, - error: None, } } @@ -64,11 +62,13 @@ where mk_service, state: State::Connected(init_conn), target, - error: None, } } } +#[derive(Debug)] +pub struct ReconnectToken(Result); + impl Service for Reconnect where M: Service, @@ -79,22 +79,23 @@ where { type Response = S::Response; type Error = crate::BoxError; + type Token = ReconnectToken; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match &mut self.state { State::Idle => { trace!("poll_ready; idle"); - match self.mk_service.poll_ready(cx) { + let token = match self.mk_service.poll_ready(cx) { Poll::Ready(r) => r?, Poll::Pending => { trace!("poll_ready; MakeService not ready"); return Poll::Pending; } - } + }; - let fut = self.mk_service.make_service(self.target.clone()); + let fut = self.mk_service.make_service(token, self.target.clone()); self.state = State::Connecting(fut); continue; } @@ -111,17 +112,16 @@ where Poll::Ready(Err(e)) => { trace!("poll_ready; error"); self.state = State::Idle; - self.error = Some(e); - break; + return Poll::Ready(Ok(ReconnectToken(Err(e)))); } } } State::Connected(ref mut inner) => { trace!("poll_ready; connected"); match inner.poll_ready(cx) { - Poll::Ready(Ok(())) => { + Poll::Ready(Ok(token)) => { trace!("poll_ready; ready"); - return Poll::Ready(Ok(())); + return Poll::Ready(Ok(ReconnectToken(Ok(token)))); } Poll::Pending => { trace!("poll_ready; not ready"); @@ -135,21 +135,20 @@ where } } } - - Poll::Ready(Ok(())) } - fn call(&mut self, request: Request) -> Self::Future { - if let Some(error) = self.error.take() { - return ResponseFuture::error(error); - } + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + let token = match token.0 { + Ok(token) => token, + Err(error) => return ResponseFuture::error(error), + }; let service = match self.state { State::Connected(ref mut service) => service, _ => panic!("service not ready; poll_ready must be called first"), }; - let fut = service.call(request); + let fut = service.call(token, request); ResponseFuture::new(fut) } } diff --git a/tower/src/retry/future.rs b/tower/src/retry/future.rs index d18a5abb7..a0aebc90f 100644 --- a/tower/src/retry/future.rs +++ b/tower/src/retry/future.rs @@ -107,14 +107,14 @@ where // we need to make that assumption to avoid adding an Unpin bound to the Policy // in Ready to make it Unpin so that we can get &mut Ready as needed to call // poll_ready on it. - ready!(this.retry.as_mut().project().service.poll_ready(cx))?; + let token = ready!(this.retry.as_mut().project().service.poll_ready(cx))?; let req = this .request .take() .expect("retrying requires cloned request"); *this.request = this.retry.policy.clone_request(&req); this.state.set(State::Called { - future: this.retry.as_mut().project().service.call(req), + future: this.retry.as_mut().project().service.call(token, req), }); } } diff --git a/tower/src/retry/mod.rs b/tower/src/retry/mod.rs index a9e2738a1..e5d8108fe 100644 --- a/tower/src/retry/mod.rs +++ b/tower/src/retry/mod.rs @@ -56,18 +56,19 @@ where { type Response = S::Response; type Error = S::Error; + type Token = S::Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // NOTE: the Future::poll impl for ResponseFuture assumes that Retry::poll_ready is // equivalent to Ready.service.poll_ready. If this ever changes, that code must be updated // as well. self.service.poll_ready(cx) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { let cloned = self.policy.clone_request(&request); - let future = self.service.call(request); + let future = self.service.call(token, request); ResponseFuture::new(cloned, self.clone(), future) } diff --git a/tower/src/spawn_ready/make.rs b/tower/src/spawn_ready/make.rs index a8adc73e9..b7d27570d 100644 --- a/tower/src/spawn_ready/make.rs +++ b/tower/src/spawn_ready/make.rs @@ -36,15 +36,16 @@ where { type Response = SpawnReady; type Error = S::Error; + type Token = S::Token; type Future = MakeFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, target: Target) -> Self::Future { + fn call(&mut self, token: Self::Token, target: Target) -> Self::Future { MakeFuture { - inner: self.inner.call(target), + inner: self.inner.call(token, target), } } } diff --git a/tower/src/spawn_ready/service.rs b/tower/src/spawn_ready/service.rs index 74618432a..1005c5d68 100644 --- a/tower/src/spawn_ready/service.rs +++ b/tower/src/spawn_ready/service.rs @@ -49,9 +49,10 @@ where { type Response = S::Response; type Error = BoxError; + type Token = S::Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { self.inner = match self.inner { Inner::Service(ref mut svc) => { @@ -72,10 +73,10 @@ where } } - fn call(&mut self, request: Req) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Req) -> Self::Future { match self.inner { Inner::Service(Some(ref mut svc)) => { - ResponseFuture::new(svc.call(request).map_err(Into::into)) + ResponseFuture::new(svc.call(token, request).map_err(Into::into)) } _ => unreachable!("poll_ready must be called"), } diff --git a/tower/src/steer/mod.rs b/tower/src/steer/mod.rs index f9d2565c0..dca573c70 100644 --- a/tower/src/steer/mod.rs +++ b/tower/src/steer/mod.rs @@ -104,29 +104,36 @@ where /// requests) will prevent head-of-line blocking in [`Steer`]. /// /// [`Buffer`]: crate::buffer::Buffer -pub struct Steer { +pub struct Steer, F, Req> { router: F, services: Vec, - not_ready: VecDeque, + tokens: Vec>, _phantom: PhantomData, } -impl Steer { +impl Steer +where + S: Service, +{ /// Make a new [`Steer`] with a list of [`Service`]'s and a [`Picker`]. /// /// Note: the order of the [`Service`]'s is significant for [`Picker::pick`]'s return value. pub fn new(services: impl IntoIterator, router: F) -> Self { let services: Vec<_> = services.into_iter().collect(); let not_ready: VecDeque<_> = services.iter().enumerate().map(|(i, _)| i).collect(); + let tokens: Vec<_> = services.iter().map(|_| None).collect(); Self { router, services, - not_ready, + tokens, _phantom: PhantomData, } } } +#[derive(Debug)] +pub struct Token(Vec); + impl Service for Steer where S: Service, @@ -134,47 +141,54 @@ where { type Response = S::Response; type Error = S::Error; + type Token = Token; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - // must wait for *all* services to be ready. - // this will cause head-of-line blocking unless the underlying services are always ready. - if self.not_ready.is_empty() { - return Poll::Ready(Ok(())); - } else { - if let Poll::Pending = self.services[self.not_ready[0]].poll_ready(cx)? { - return Poll::Pending; - } - - self.not_ready.pop_front(); + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // must wait for *all* services to be ready. + // this will cause head-of-line blocking unless the underlying services are always ready. + let mut all = true; + for (token, service) in self.tokens.iter_mut().zip(&mut self.services) { + if token.is_some() { + continue; + } + match service.poll_ready(cx)? { + Poll::Ready(t) => *token = Some(t), + Poll::Pending => all = false, } } + if all { + Poll::Ready(Ok(Token( + self.tokens + .iter_mut() + .map(|token| token.take().unwrap()) + .collect(), + ))) + } else { + Poll::Pending + } } - fn call(&mut self, req: Req) -> Self::Future { - assert!( - self.not_ready.is_empty(), - "Steer must wait for all services to be ready. Did you forget to call poll_ready()?" - ); - + fn call(&mut self, token: Self::Token, req: Req) -> Self::Future { let idx = self.router.pick(&req, &self.services[..]); let cl = &mut self.services[idx]; - self.not_ready.push_back(idx); - cl.call(req) + let token = token.0[idx]; + cl.call(token, req) } } impl Clone for Steer where - S: Clone, + S: Service + Clone, F: Clone, { fn clone(&self) -> Self { + let mut tokens = Vec::new(); + tokens.resize_with(self.tokens.len(), Default::default); Self { router: self.router.clone(), services: self.services.clone(), - not_ready: self.not_ready.clone(), + tokens, _phantom: PhantomData, } } @@ -182,20 +196,20 @@ where impl fmt::Debug for Steer where - S: fmt::Debug, + S: Service + fmt::Debug, F: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let Self { router, services, - not_ready, + tokens, _phantom, } = self; f.debug_struct("Steer") .field("router", router) .field("services", services) - .field("not_ready", not_ready) + .field("tokens", tokens.iter()) .finish() } } diff --git a/tower/src/timeout/mod.rs b/tower/src/timeout/mod.rs index 0e65a3f65..068a28815 100644 --- a/tower/src/timeout/mod.rs +++ b/tower/src/timeout/mod.rs @@ -52,17 +52,18 @@ where { type Response = S::Response; type Error = crate::BoxError; + type Token = S::Token; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self.inner.poll_ready(cx) { Poll::Pending => Poll::Pending, Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)), } } - fn call(&mut self, request: Request) -> Self::Future { - let response = self.inner.call(request); + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + let response = self.inner.call(token, request); let sleep = tokio::time::sleep(self.timeout); ResponseFuture::new(response, sleep) diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs index 819ca273c..4530952ea 100644 --- a/tower/src/util/and_then.rs +++ b/tower/src/util/and_then.rs @@ -97,14 +97,15 @@ where { type Response = Fut::Ok; type Error = Fut::Error; + type Token = S::Token; type Future = AndThenFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, request: Request) -> Self::Future { - AndThenFuture::new(self.inner.call(request).err_into().and_then(self.f.clone())) + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + AndThenFuture::new(self.inner.call(token, request).err_into().and_then(self.f.clone())) } } diff --git a/tower/src/util/boxed/layer.rs b/tower/src/util/boxed/layer.rs index 34e65fa43..d0e256e04 100644 --- a/tower/src/util/boxed/layer.rs +++ b/tower/src/util/boxed/layer.rs @@ -1,5 +1,5 @@ use crate::util::BoxService; -use std::{fmt, sync::Arc}; +use std::{any::Any, fmt, sync::Arc}; use tower_layer::{layer_fn, Layer}; use tower_service::Service; @@ -61,6 +61,7 @@ impl BoxLayer { where L: Layer + Send + Sync + 'static, L::Service: Service + Send + 'static, + >::Token: Any + Send + 'static, >::Future: Send + 'static, { let layer = layer_fn(move |inner: In| { diff --git a/tower/src/util/boxed/sync.rs b/tower/src/util/boxed/sync.rs index 8c02fb4e2..23d03cd07 100644 --- a/tower/src/util/boxed/sync.rs +++ b/tower/src/util/boxed/sync.rs @@ -2,6 +2,7 @@ use crate::ServiceExt; use tower_layer::{layer_fn, LayerFn}; use tower_service::Service; +use std::any::Any; use std::fmt; use std::{ future::Future, @@ -20,7 +21,9 @@ use std::{ /// /// See module level documentation for more details. pub struct BoxService { - inner: Box> + Send>, + inner: Box< + dyn Service> + Send, + >, } /// A boxed `Future + Send` trait object. @@ -28,15 +31,24 @@ pub struct BoxService { /// This type alias represents a boxed future that is [`Send`] and can be moved /// across threads. type BoxFuture = Pin> + Send>>; +type BoxToken = Box; impl BoxService { #[allow(missing_docs)] pub fn new(inner: S) -> Self where S: Service + Send + 'static, + S::Token: Any + Send + 'static, S::Future: Send + 'static, { - let inner = Box::new(inner.map_future(|f: S::Future| Box::pin(f) as _)); + let inner = Box::new( + inner + .map_token( + |t| Box::new(t) as Box, + |t| *t.downcast().expect("Invalid token passed to BoxService"), + ) + .map_future(|f: S::Future| Box::pin(f) as _), + ); BoxService { inner } } @@ -47,6 +59,7 @@ impl BoxService { pub fn layer() -> LayerFn Self> where S: Service + Send + 'static, + S::Token: Any + Send + 'static, S::Future: Send + 'static, { layer_fn(Self::new) @@ -56,14 +69,15 @@ impl BoxService { impl Service for BoxService { type Response = U; type Error = E; + type Token = BoxToken; type Future = BoxFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, request: T) -> BoxFuture { - self.inner.call(request) + fn call(&mut self, token: Self::Token, request: T) -> BoxFuture { + self.inner.call(token, request) } } diff --git a/tower/src/util/boxed/unsync.rs b/tower/src/util/boxed/unsync.rs index f645f1699..7c93c66c7 100644 --- a/tower/src/util/boxed/unsync.rs +++ b/tower/src/util/boxed/unsync.rs @@ -1,6 +1,7 @@ use tower_layer::{layer_fn, LayerFn}; use tower_service::Service; +use std::any::Any; use std::fmt; use std::{ future::Future, @@ -10,7 +11,15 @@ use std::{ /// A boxed [`Service`] trait object. pub struct UnsyncBoxService { - inner: Box>>, + inner: Box< + dyn Service< + T, + Response = U, + Error = E, + Token = Box, + Future = UnsyncBoxFuture, + >, + >, } /// A boxed [`Future`] trait object. @@ -29,6 +38,7 @@ impl UnsyncBoxService { pub fn new(inner: S) -> Self where S: Service + 'static, + S::Token: Any + 'static, S::Future: 'static, { let inner = Box::new(UnsyncBoxed { inner }); @@ -41,6 +51,7 @@ impl UnsyncBoxService { pub fn layer() -> LayerFn Self> where S: Service + 'static, + S::Token: Any + 'static, S::Future: 'static, { layer_fn(Self::new) @@ -50,14 +61,15 @@ impl UnsyncBoxService { impl Service for UnsyncBoxService { type Response = U; type Error = E; + type Token = Box; type Future = UnsyncBoxFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, request: T) -> UnsyncBoxFuture { - self.inner.call(request) + fn call(&mut self, token: Self::Token, request: T) -> UnsyncBoxFuture { + self.inner.call(token, request) } } @@ -70,17 +82,28 @@ impl fmt::Debug for UnsyncBoxService { impl Service for UnsyncBoxed where S: Service + 'static, + S::Token: Any + 'static, S::Future: 'static, { type Response = S::Response; type Error = S::Error; + type Token = Box; type Future = Pin>>>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner + .poll_ready(cx) + .map_ok(|t| Box::new(t) as Box) } - fn call(&mut self, request: Request) -> Self::Future { - Box::pin(self.inner.call(request)) + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + Box::pin( + self.inner.call( + *token + .downcast() + .expect("Invalid token passed to UnsyncBoxService."), + request, + ), + ) } } diff --git a/tower/src/util/boxed_clone.rs b/tower/src/util/boxed_clone.rs index 1209fd2ef..d8a2ea75e 100644 --- a/tower/src/util/boxed_clone.rs +++ b/tower/src/util/boxed_clone.rs @@ -55,18 +55,23 @@ use tower_service::Service; /// # fn assert_service(svc: S) -> S /// # where S: Service { svc } /// ``` -pub struct BoxCloneService( +pub struct BoxCloneService( Box< - dyn CloneService>> - + Send, + dyn CloneService< + T, + Response = U, + Error = E, + Token = R, + Future = BoxFuture<'static, Result>, + > + Send, >, ); -impl BoxCloneService { +impl BoxCloneService { /// Create a new `BoxCloneService`. pub fn new(inner: S) -> Self where - S: Service + Clone + Send + 'static, + S: Service + Clone + Send + 'static, S::Future: Send + 'static, { let inner = inner.map_future(|f| Box::pin(f) as _); @@ -79,30 +84,31 @@ impl BoxCloneService { /// [`Layer`]: crate::Layer pub fn layer() -> LayerFn Self> where - S: Service + Clone + Send + 'static, + S: Service + Clone + Send + 'static, S::Future: Send + 'static, { layer_fn(Self::new) } } -impl Service for BoxCloneService { +impl Service for BoxCloneService { type Response = U; type Error = E; + type Token = R; type Future = BoxFuture<'static, Result>; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } #[inline] - fn call(&mut self, request: T) -> Self::Future { - self.0.call(request) + fn call(&mut self, token: Self::Token, request: T) -> Self::Future { + self.0.call(token, request) } } -impl Clone for BoxCloneService { +impl Clone for BoxCloneService { fn clone(&self) -> Self { Self(self.0.clone_box()) } @@ -112,8 +118,13 @@ trait CloneService: Service { fn clone_box( &self, ) -> Box< - dyn CloneService - + Send, + dyn CloneService< + R, + Response = Self::Response, + Error = Self::Error, + Token = Self::Token, + Future = Self::Future, + > + Send, >; } @@ -123,13 +134,20 @@ where { fn clone_box( &self, - ) -> Box + Send> - { + ) -> Box< + dyn CloneService< + R, + Response = T::Response, + Error = T::Error, + Token = T::Token, + Future = T::Future, + > + Send, + > { Box::new(self.clone()) } } -impl fmt::Debug for BoxCloneService { +impl fmt::Debug for BoxCloneService { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("BoxCloneService").finish() } diff --git a/tower/src/util/call_all/common.rs b/tower/src/util/call_all/common.rs index 09c458a9b..ff95cd1a9 100644 --- a/tower/src/util/call_all/common.rs +++ b/tower/src/util/call_all/common.rs @@ -97,13 +97,13 @@ where .service .as_mut() .expect("Using CallAll after extracing inner Service"); - ready!(svc.poll_ready(cx)).map_err(Into::into)?; + let token = ready!(svc.poll_ready(cx)).map_err(Into::into)?; // If it is, gather the next request (if there is one) match this.stream.as_mut().poll_next(cx) { Poll::Ready(r) => match r { Some(req) => { - this.queue.push(svc.call(req)); + this.queue.push(svc.call(token, req)); } None => { // We're all done once any outstanding requests have completed diff --git a/tower/src/util/call_all/ordered.rs b/tower/src/util/call_all/ordered.rs index a0c9ae4bb..0952b35b3 100644 --- a/tower/src/util/call_all/ordered.rs +++ b/tower/src/util/call_all/ordered.rs @@ -38,11 +38,11 @@ pin_project! { /// type Error = Box; /// type Future = Ready>; /// - /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// Poll::Ready(Ok(())) /// } /// - /// fn call(&mut self, req: &'static str) -> Self::Future { + /// fn call(&mut self, token: Self::Token, req: &'static str) -> Self::Future { /// ready(Ok(&req[..1])) /// } /// } diff --git a/tower/src/util/either.rs b/tower/src/util/either.rs index 5933755ef..db306688c 100644 --- a/tower/src/util/either.rs +++ b/tower/src/util/either.rs @@ -35,23 +35,29 @@ where { type Response = A::Response; type Error = crate::BoxError; + type Token = Either; type Future = Either; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { use self::Either::*; match self { - A(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx)).map_err(Into::into)?)), - B(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx)).map_err(Into::into)?)), + A(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx)) + .map(Either::A) + .map_err(Into::into)?)), + B(service) => Poll::Ready(Ok(ready!(service.poll_ready(cx)) + .map(Either::B) + .map_err(Into::into)?)), } } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { use self::Either::*; - match self { - A(service) => A(service.call(request)), - B(service) => B(service.call(request)), + match (self, token) { + (A(service), A(token)) => A(service.call(token, request)), + (B(service), B(token)) => B(service.call(token, request)), + _ => panic!("Invalid token used for Either service."), } } } diff --git a/tower/src/util/future_service.rs b/tower/src/util/future_service.rs index ef3a74914..e572c5a0a 100644 --- a/tower/src/util/future_service.rs +++ b/tower/src/util/future_service.rs @@ -147,9 +147,10 @@ where { type Response = S::Response; type Error = E; + type Token = S::Token; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { self.state = match &mut self.state { State::Future(fut) => { @@ -162,9 +163,9 @@ where } } - fn call(&mut self, req: R) -> Self::Future { + fn call(&mut self, token: Self::Token, req: R) -> Self::Future { if let State::Service(svc) = &mut self.state { - svc.call(req) + svc.call(token, req) } else { panic!("FutureService::call was called before FutureService::poll_ready") } @@ -202,13 +203,14 @@ mod tests { impl Service<()> for DebugService { type Response = (); type Error = Infallible; + type Token = (); type Future = Ready>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Ok(()).into() } - fn call(&mut self, _req: ()) -> Self::Future { + fn call(&mut self, _token: (), _req: ()) -> Self::Future { ready(Ok(())) } } diff --git a/tower/src/util/map_err.rs b/tower/src/util/map_err.rs index b79c5fee2..a02bc6c00 100644 --- a/tower/src/util/map_err.rs +++ b/tower/src/util/map_err.rs @@ -63,16 +63,17 @@ where { type Response = S::Response; type Error = Error; + type Token = S::Token; type Future = MapErrFuture; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(self.f.clone()) } #[inline] - fn call(&mut self, request: Request) -> Self::Future { - MapErrFuture::new(self.inner.call(request).map_err(self.f.clone())) + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + MapErrFuture::new(self.inner.call(token, request).map_err(self.f.clone())) } } diff --git a/tower/src/util/map_future.rs b/tower/src/util/map_future.rs index 9f5670add..abbaff8ff 100644 --- a/tower/src/util/map_future.rs +++ b/tower/src/util/map_future.rs @@ -55,14 +55,15 @@ where { type Response = T; type Error = E; + type Token = S::Token; type Future = Fut; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(From::from) } - fn call(&mut self, req: R) -> Self::Future { - (self.f)(self.inner.call(req)) + fn call(&mut self, token: Self::Token, req: R) -> Self::Future { + (self.f)(self.inner.call(token, req)) } } diff --git a/tower/src/util/map_request.rs b/tower/src/util/map_request.rs index e86e0680b..7fa6aa031 100644 --- a/tower/src/util/map_request.rs +++ b/tower/src/util/map_request.rs @@ -47,16 +47,17 @@ where { type Response = S::Response; type Error = S::Error; + type Token = S::Token; type Future = S::Future; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } #[inline] - fn call(&mut self, request: R1) -> S::Future { - self.inner.call((self.f)(request)) + fn call(&mut self, token: Self::Token, request: R1) -> S::Future { + self.inner.call(token, (self.f)(request)) } } diff --git a/tower/src/util/map_response.rs b/tower/src/util/map_response.rs index 249be3a4d..96ae503e8 100644 --- a/tower/src/util/map_response.rs +++ b/tower/src/util/map_response.rs @@ -63,16 +63,17 @@ where { type Response = Response; type Error = S::Error; + type Token = S::Token; type Future = MapResponseFuture; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } #[inline] - fn call(&mut self, request: Request) -> Self::Future { - MapResponseFuture::new(self.inner.call(request).map_ok(self.f.clone())) + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + MapResponseFuture::new(self.inner.call(token, request).map_ok(self.f.clone())) } } diff --git a/tower/src/util/map_result.rs b/tower/src/util/map_result.rs index bfe16b5b4..de1f29ef9 100644 --- a/tower/src/util/map_result.rs +++ b/tower/src/util/map_result.rs @@ -64,16 +64,17 @@ where { type Response = Response; type Error = Error; + type Token = S::Token; type Future = MapResultFuture; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } #[inline] - fn call(&mut self, request: Request) -> Self::Future { - MapResultFuture::new(self.inner.call(request).map(self.f.clone())) + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + MapResultFuture::new(self.inner.call(token, request).map(self.f.clone())) } } diff --git a/tower/src/util/map_token.rs b/tower/src/util/map_token.rs new file mode 100644 index 000000000..06bec5068 --- /dev/null +++ b/tower/src/util/map_token.rs @@ -0,0 +1,117 @@ +use std::{ + fmt, + task::{Context, Poll}, +}; +use tower_layer::Layer; +use tower_service::Service; + +/// [`Service`] returned by the [`map_token`] combinator. +/// +/// [`map_future`]: crate::util::ServiceExt::map_token +#[derive(Clone)] +pub struct MapToken { + inner: S, + f: F, + g: G, +} + +impl MapToken { + /// Creates a new [`MapToken`] service. + pub fn new(inner: S, f: F, g: G) -> Self { + Self { inner, f, g } + } + + /// Returns a new [`Layer`] that produces [`MapToken`] services. + /// + /// This is a convenience function that simply calls [`MapTokenLayer::new`]. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(f: F, g: G) -> MapTokenLayer { + MapTokenLayer::new(f, g) + } + + /// Get a reference to the inner service + pub fn get_ref(&self) -> &S { + &self.inner + } + + /// Get a mutable reference to the inner service + pub fn get_mut(&mut self) -> &mut S { + &mut self.inner + } + + /// Consume `self`, returning the inner service + pub fn into_inner(self) -> S { + self.inner + } +} + +impl Service for MapToken +where + S: Service, + F: FnMut(S::Token) -> Token, + G: FnMut(Token) -> S::Token, +{ + type Response = S::Response; + type Error = S::Error; + type Token = Token; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_ok(self.f).map_err(From::from) + } + + fn call(&mut self, token: Self::Token, req: R) -> Self::Future { + self.inner.call((self.g)(token), req) + } +} + +impl fmt::Debug for MapToken +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapToken") + .field("inner", &self.inner) + .field("f", &format_args!("{}", std::any::type_name::())) + .field("g", &format_args!("{}", std::any::type_name::())) + .finish() + } +} + +/// A [`Layer`] that produces a [`MapToken`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Clone)] +pub struct MapTokenLayer { + f: F, + g: G, +} + +impl MapTokenLayer { + /// Creates a new [`MapTokenLayer`] layer. + pub fn new(f: F, g: G) -> Self { + Self { f, g } + } +} + +impl Layer for MapTokenLayer +where + F: Clone, + G: Clone, +{ + type Service = MapToken; + + fn layer(&self, inner: S) -> Self::Service { + MapToken::new(inner, self.f.clone(), self.g.clone()) + } +} + +impl fmt::Debug for MapTokenLayer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapTokenLayer") + .field("f", &format_args!("{}", std::any::type_name::())) + .field("g", &format_args!("{}", std::any::type_name::())) + .finish() + } +} diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index 129c9d7ab..3e1377da0 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -11,6 +11,7 @@ mod map_err; mod map_request; mod map_response; mod map_result; +mod map_token; mod map_future; mod oneshot; @@ -31,6 +32,7 @@ pub use self::{ map_request::{MapRequest, MapRequestLayer}, map_response::{MapResponse, MapResponseLayer}, map_result::{MapResult, MapResultLayer}, + map_token::{MapToken, MapTokenLayer}, oneshot::Oneshot, optional::Optional, ready::{Ready, ReadyAnd, ReadyOneshot}, @@ -39,7 +41,7 @@ pub use self::{ }; pub use self::call_all::{CallAll, CallAllUnordered}; -use std::future::Future; +use std::{any::Any, future::Future}; use crate::layer::util::Identity; @@ -149,11 +151,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = u8; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) /// # } /// # } @@ -218,11 +220,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = u8; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) /// # } /// # } @@ -285,11 +287,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = Error; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(String::new())) /// # } /// # } @@ -380,11 +382,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = DbError; /// # type Future = futures_util::future::Ready, DbError>>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }])) /// # } /// # } @@ -440,11 +442,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = DbError; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) /// # } /// # } @@ -504,11 +506,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = u8; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(String::new())) /// # } /// # } @@ -575,11 +577,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = u8; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: String) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: String) -> Self::Future { /// # futures_util::future::ready(Ok(String::new())) /// # } /// # } @@ -643,11 +645,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = DbError; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(String::new())) /// # } /// # } @@ -717,11 +719,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = DbError; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(String::new())) /// # } /// # } @@ -820,11 +822,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = DbError; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(())) /// # } /// # } @@ -906,11 +908,11 @@ pub trait ServiceExt: tower_service::Service { /// # type Error = DbError; /// # type Future = futures_util::future::Ready>; /// # - /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { /// # Poll::Ready(Ok(())) /// # } /// # - /// # fn call(&mut self, request: u32) -> Self::Future { + /// # fn call(&mut self, token: Self::Token, request: u32) -> Self::Future { /// # futures_util::future::ready(Ok(())) /// # } /// # } @@ -997,6 +999,7 @@ pub trait ServiceExt: tower_service::Service { fn boxed(self) -> BoxService where Self: Sized + Send + 'static, + Self::Token: Any + Send + 'static, Self::Future: Send + 'static, { BoxService::new(self) @@ -1043,13 +1046,22 @@ pub trait ServiceExt: tower_service::Service { /// /// [`Service`]: crate::Service /// [`boxed`]: Self::boxed - fn boxed_clone(self) -> BoxCloneService + fn boxed_clone(self) -> BoxCloneService where Self: Clone + Sized + Send + 'static, Self::Future: Send + 'static, { BoxCloneService::new(self) } + + fn map_token(self, f: F, g: G) -> MapToken + where + Self: Sized, + F: FnMut(Self::Token) -> NewToken, + G: FnMut(NewToken) -> Self::Token, + { + MapToken::new(self, f, g) + } } impl ServiceExt for T where T: tower_service::Service {} diff --git a/tower/src/util/oneshot.rs b/tower/src/util/oneshot.rs index 93b5070be..cacf3c32e 100644 --- a/tower/src/util/oneshot.rs +++ b/tower/src/util/oneshot.rs @@ -89,8 +89,8 @@ where loop { match this.state.as_mut().project() { StateProj::NotReady { svc, req } => { - let _ = ready!(svc.poll_ready(cx))?; - let f = svc.call(req.take().expect("already called")); + let token = ready!(svc.poll_ready(cx))?; + let f = svc.call(token, req.take().expect("already called")); this.state.set(State::called(f)); } StateProj::Called { fut } => { diff --git a/tower/src/util/optional/mod.rs b/tower/src/util/optional/mod.rs index 8ba3ab95d..6e180e984 100644 --- a/tower/src/util/optional/mod.rs +++ b/tower/src/util/optional/mod.rs @@ -39,21 +39,27 @@ where { type Response = T::Response; type Error = crate::BoxError; + type Token = Option; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self.inner { Some(ref mut inner) => match inner.poll_ready(cx) { - Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)), + Poll::Ready(r) => Poll::Ready(r.map(Some).map_err(Into::into)), Poll::Pending => Poll::Pending, }, // None services are always ready - None => Poll::Ready(Ok(())), + None => Poll::Ready(Ok(None)), } } - fn call(&mut self, request: Request) -> Self::Future { - let inner = self.inner.as_mut().map(|i| i.call(request)); + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + let inner = self.inner.as_mut().map(|i| { + i.call( + token.expect("Invalid token used for Optional service."), + request, + ) + }); ResponseFuture::new(inner) } } diff --git a/tower/src/util/service_fn.rs b/tower/src/util/service_fn.rs index d6e6be878..3c5c6ad90 100644 --- a/tower/src/util/service_fn.rs +++ b/tower/src/util/service_fn.rs @@ -70,13 +70,14 @@ where { type Response = R; type Error = E; + type Token = (); type Future = F; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { Ok(()).into() } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, _token: (), req: Request) -> Self::Future { (self.f)(req) } } diff --git a/tower/src/util/then.rs b/tower/src/util/then.rs index 1ec3c1492..e745b7c44 100644 --- a/tower/src/util/then.rs +++ b/tower/src/util/then.rs @@ -68,16 +68,17 @@ where { type Response = Response; type Error = Error; + type Token = S::Token; type Future = ThenFuture; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } #[inline] - fn call(&mut self, request: Request) -> Self::Future { - ThenFuture::new(self.inner.call(request).then(self.f.clone())) + fn call(&mut self, token: Self::Token, request: Request) -> Self::Future { + ThenFuture::new(self.inner.call(token, request).then(self.f.clone())) } } diff --git a/tower/tests/balance/main.rs b/tower/tests/balance/main.rs index aed51203a..7f1187207 100644 --- a/tower/tests/balance/main.rs +++ b/tower/tests/balance/main.rs @@ -17,11 +17,11 @@ impl Service for Mock { type Response = as Service>::Response; type Error = as Service>::Error; type Future = as Service>::Future; - fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { self.0.poll_ready(cx) } - fn call(&mut self, req: Req) -> Self::Future { - self.0.call(req) + fn call(&mut self, token: Self::Token, req: Req) -> Self::Future { + self.0.call(token, req) } } diff --git a/tower/tests/steer/main.rs b/tower/tests/steer/main.rs index 1ff08d32c..5a5baf057 100644 --- a/tower/tests/steer/main.rs +++ b/tower/tests/steer/main.rs @@ -16,7 +16,7 @@ impl Service for MyService { type Error = StdError; type Future = Ready>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { if !self.1 { Poll::Pending } else { diff --git a/tower/tests/support.rs b/tower/tests/support.rs index b54708229..4a21bd1ad 100644 --- a/tower/tests/support.rs +++ b/tower/tests/support.rs @@ -96,7 +96,7 @@ impl Service<()> for AssertSpanSvc { type Error = AssertSpanError; type Future = future::Ready>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.polled { return Poll::Ready(self.check("poll_ready")); } diff --git a/tower/tests/util/call_all.rs b/tower/tests/util/call_all.rs index 6bc092918..09573d5e9 100644 --- a/tower/tests/util/call_all.rs +++ b/tower/tests/util/call_all.rs @@ -23,7 +23,7 @@ impl Service<&'static str> for Srv { type Error = Error; type Future = Ready>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { if !self.admit.get() { return Poll::Pending; } @@ -32,7 +32,7 @@ impl Service<&'static str> for Srv { Poll::Ready(Ok(())) } - fn call(&mut self, req: &'static str) -> Self::Future { + fn call(&mut self, token: Self::Token, req: &'static str) -> Self::Future { self.count.set(self.count.get() + 1); ready(Ok(req)) }