From 97896efe70a5cba9786387c923abc945fe2bfee8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 8 Mar 2025 11:22:33 +0000 Subject: [PATCH 1/4] Move public HttpClient into separate module --- object_store/src/client/builder.rs | 2 +- object_store/src/client/{ => http}/body.rs | 2 +- .../src/client/{ => http}/connection.rs | 3 +-- object_store/src/client/http/mod.rs | 27 +++++++++++++++++++ object_store/src/client/mod.rs | 14 +++------- object_store/src/client/retry.rs | 3 +-- 6 files changed, 34 insertions(+), 17 deletions(-) rename object_store/src/client/{ => http}/body.rs (99%) rename object_store/src/client/{ => http}/connection.rs (98%) create mode 100644 object_store/src/client/http/mod.rs diff --git a/object_store/src/client/builder.rs b/object_store/src/client/builder.rs index fcbc6e8baee..4de29ca7102 100644 --- a/object_store/src/client/builder.rs +++ b/object_store/src/client/builder.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::connection::HttpErrorKind; +use crate::client::HttpErrorKind; use crate::client::{HttpClient, HttpError, HttpRequest, HttpRequestBody}; use http::header::{InvalidHeaderName, InvalidHeaderValue}; use http::uri::InvalidUri; diff --git a/object_store/src/client/body.rs b/object_store/src/client/http/body.rs similarity index 99% rename from object_store/src/client/body.rs rename to object_store/src/client/http/body.rs index 8f62afa4ff2..652c5d52a6e 100644 --- a/object_store/src/client/body.rs +++ b/object_store/src/client/http/body.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::connection::{HttpError, HttpErrorKind}; +use crate::client::{HttpError, HttpErrorKind}; use crate::{collect_bytes, PutPayload}; use bytes::Bytes; use futures::stream::BoxStream; diff --git a/object_store/src/client/connection.rs b/object_store/src/client/http/connection.rs similarity index 98% rename from object_store/src/client/connection.rs rename to object_store/src/client/http/connection.rs index 7e2daf4cdb8..4ad965a221c 100644 --- a/object_store/src/client/connection.rs +++ b/object_store/src/client/http/connection.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::client::body::{HttpRequest, HttpResponse}; use crate::client::builder::{HttpRequestBuilder, RequestBuilderError}; -use crate::client::HttpResponseBody; +use crate::client::{HttpRequest, HttpResponse, HttpResponseBody}; use crate::ClientOptions; use async_trait::async_trait; use http::{Method, Uri}; diff --git a/object_store/src/client/http/mod.rs b/object_store/src/client/http/mod.rs new file mode 100644 index 00000000000..512550aa354 --- /dev/null +++ b/object_store/src/client/http/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Generic HTTP client abstraction + +mod body; +pub use body::{HttpRequest, HttpRequestBody, HttpResponse, HttpResponseBody}; + +mod connection; +pub(crate) use connection::http_connector; +#[cfg(not(target_arch = "wasm32"))] +pub use connection::ReqwestConnector; +pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService}; diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index bd0347b4311..551837a0a37 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -43,19 +43,11 @@ pub(crate) mod header; #[cfg(any(feature = "aws", feature = "gcp"))] pub(crate) mod s3; -mod body; -pub use body::{HttpRequest, HttpRequestBody, HttpResponse, HttpResponseBody}; - pub(crate) mod builder; - -mod connection; -pub(crate) use connection::http_connector; -#[cfg(not(target_arch = "wasm32"))] -pub use connection::ReqwestConnector; -pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService}; - +mod http; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] pub(crate) mod parts; +pub use http::*; use async_trait::async_trait; use reqwest::header::{HeaderMap, HeaderValue}; @@ -859,9 +851,9 @@ mod cloud { } } -use crate::client::builder::HttpRequestBuilder; #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))] pub(crate) use cloud::*; +use crate::client::builder::HttpRequestBuilder; #[cfg(test)] mod tests { diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index 96244aac9b0..dff263797ac 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -19,8 +19,7 @@ use crate::client::backoff::{Backoff, BackoffConfig}; use crate::client::builder::HttpRequestBuilder; -use crate::client::connection::HttpErrorKind; -use crate::client::{HttpClient, HttpError, HttpRequest, HttpResponse}; +use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpResponse}; use crate::PutPayload; use futures::future::BoxFuture; use http::{Method, Uri}; From 8df4b5e8e87366c519960ad9467d0d03af636243 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 8 Mar 2025 12:22:16 +0000 Subject: [PATCH 2/4] Add SpawnService --- object_store/src/client/http/body.rs | 12 +++ object_store/src/client/http/mod.rs | 10 +- object_store/src/client/http/spawn.rs | 141 ++++++++++++++++++++++++++ object_store/src/client/mod.rs | 2 +- 4 files changed, 159 insertions(+), 6 deletions(-) create mode 100644 object_store/src/client/http/spawn.rs diff --git a/object_store/src/client/http/body.rs b/object_store/src/client/http/body.rs index 652c5d52a6e..66d0575e347 100644 --- a/object_store/src/client/http/body.rs +++ b/object_store/src/client/http/body.rs @@ -195,6 +195,18 @@ impl HttpResponseBody { } } +impl Body for HttpResponseBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.0).poll_frame(cx) + } +} + impl From for HttpResponseBody { fn from(value: Bytes) -> Self { Self::new(Full::new(value).map_err(|e| match e {})) diff --git a/object_store/src/client/http/mod.rs b/object_store/src/client/http/mod.rs index 512550aa354..7f4644a788e 100644 --- a/object_store/src/client/http/mod.rs +++ b/object_store/src/client/http/mod.rs @@ -18,10 +18,10 @@ //! Generic HTTP client abstraction mod body; -pub use body::{HttpRequest, HttpRequestBody, HttpResponse, HttpResponseBody}; +pub use body::*; mod connection; -pub(crate) use connection::http_connector; -#[cfg(not(target_arch = "wasm32"))] -pub use connection::ReqwestConnector; -pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService}; +pub use connection::*; + +mod spawn; +pub use spawn::*; diff --git a/object_store/src/client/http/spawn.rs b/object_store/src/client/http/spawn.rs new file mode 100644 index 00000000000..6c16251e15d --- /dev/null +++ b/object_store/src/client/http/spawn.rs @@ -0,0 +1,141 @@ +use crate::client::{ + HttpError, HttpErrorKind, HttpRequest, HttpResponse, HttpResponseBody, HttpService, +}; +use async_trait::async_trait; +use bytes::Bytes; +use http::Response; +use http_body_util::BodyExt; +use hyper::body::{Body, Frame}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use thiserror::Error; +use tokio::runtime::Handle; +use tokio::task::JoinHandle; + +/// Spawn error +#[derive(Debug, Error)] +#[error("SpawnError")] +struct SpawnError {} + +impl From for HttpError { + fn from(value: SpawnError) -> Self { + Self::new(HttpErrorKind::Interrupted, value) + } +} + +/// Wraps a provided [`HttpService`] and runs it on a separate tokio runtime +#[derive(Debug)] +pub struct SpawnService { + inner: T, + runtime: Handle, +} + +impl SpawnService { + /// Creates a new [`SpawnService`] from the provided + pub fn new(inner: T, runtime: Handle) -> Self { + Self { inner, runtime } + } +} + +#[async_trait] +impl HttpService for SpawnService { + async fn call(&self, req: HttpRequest) -> Result { + let inner = self.inner.clone(); + let (send, recv) = tokio::sync::oneshot::channel(); + + // We use an unbounded channel to prevent backpressure across the runtime boundary + // which could in turn starve the underlying IO operations + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + + let handle = SpawnHandle(self.runtime.spawn(async move { + let r = match HttpService::call(&inner, req).await { + Ok(resp) => resp, + Err(e) => { + let _ = send.send(Err(e)); + return; + } + }; + + let (parts, mut body) = r.into_parts(); + if send.send(Ok(parts)).is_err() { + return; + } + + while let Some(x) = body.frame().await { + sender.send(x).unwrap(); + } + })); + + let parts = recv.await.map_err(|_| SpawnError {})??; + + Ok(Response::from_parts( + parts, + HttpResponseBody::new(SpawnBody { + stream: receiver, + _worker: handle, + }), + )) + } +} + +/// A wrapper around a [`JoinHandle`] that aborts on drop +struct SpawnHandle(JoinHandle<()>); +impl Drop for SpawnHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +type StreamItem = Result, HttpError>; + +struct SpawnBody { + stream: tokio::sync::mpsc::UnboundedReceiver, + _worker: SpawnHandle, +} + +impl Body for SpawnBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.poll_recv(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::mock_server::MockServer; + use crate::client::retry::RetryExt; + use crate::client::HttpClient; + use crate::RetryConfig; + + async fn test_client(client: HttpClient) { + let (send, recv) = tokio::sync::oneshot::channel(); + + let mock = MockServer::new().await; + let url = mock.url().to_string(); + let thread = std::thread::spawn(|| { + futures::executor::block_on(async move { + let retry = RetryConfig::default(); + let _ = client.get(url).send_retry(&retry).await.unwrap(); + let _ = send.send(()); + }) + }); + recv.await.unwrap(); + thread.join().unwrap(); + } + + #[tokio::test] + async fn test_spawn() { + let client = HttpClient::new(SpawnService::new(reqwest::Client::new(), Handle::current())); + test_client(client).await; + } + + #[tokio::test] + #[should_panic] + async fn test_no_spawn() { + let client = HttpClient::new(reqwest::Client::new()); + test_client(client).await; + } +} diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 551837a0a37..c829f50b739 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -851,9 +851,9 @@ mod cloud { } } +use crate::client::builder::HttpRequestBuilder; #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))] pub(crate) use cloud::*; -use crate::client::builder::HttpRequestBuilder; #[cfg(test)] mod tests { From b16465e2a4422caab3fec498304790eaec3bb284 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 8 Mar 2025 12:27:39 +0000 Subject: [PATCH 3/4] RAT --- object_store/src/client/http/spawn.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/object_store/src/client/http/spawn.rs b/object_store/src/client/http/spawn.rs index 6c16251e15d..c85f09d4623 100644 --- a/object_store/src/client/http/spawn.rs +++ b/object_store/src/client/http/spawn.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::client::{ HttpError, HttpErrorKind, HttpRequest, HttpResponse, HttpResponseBody, HttpService, }; From ea48b657947312a3a5aa4097ab3a92c9d9e00e13 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 8 Mar 2025 12:29:04 +0000 Subject: [PATCH 4/4] More tests --- object_store/src/client/http/spawn.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/object_store/src/client/http/spawn.rs b/object_store/src/client/http/spawn.rs index c85f09d4623..e2423e816f8 100644 --- a/object_store/src/client/http/spawn.rs +++ b/object_store/src/client/http/spawn.rs @@ -131,11 +131,15 @@ mod tests { let (send, recv) = tokio::sync::oneshot::channel(); let mock = MockServer::new().await; + mock.push(Response::new("BANANAS".to_string())); + let url = mock.url().to_string(); let thread = std::thread::spawn(|| { futures::executor::block_on(async move { let retry = RetryConfig::default(); - let _ = client.get(url).send_retry(&retry).await.unwrap(); + let ret = client.get(url).send_retry(&retry).await.unwrap(); + let payload = ret.into_body().bytes().await.unwrap(); + assert_eq!(payload.as_ref(), b"BANANAS"); let _ = send.send(()); }) });