diff --git a/object_store/src/client/builder.rs b/object_store/src/client/builder.rs index fcbc6e8baee..4de29ca7102 100644 --- a/object_store/src/client/builder.rs +++ b/object_store/src/client/builder.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::connection::HttpErrorKind; +use crate::client::HttpErrorKind; use crate::client::{HttpClient, HttpError, HttpRequest, HttpRequestBody}; use http::header::{InvalidHeaderName, InvalidHeaderValue}; use http::uri::InvalidUri; diff --git a/object_store/src/client/body.rs b/object_store/src/client/http/body.rs similarity index 95% rename from object_store/src/client/body.rs rename to object_store/src/client/http/body.rs index 8f62afa4ff2..66d0575e347 100644 --- a/object_store/src/client/body.rs +++ b/object_store/src/client/http/body.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::connection::{HttpError, HttpErrorKind}; +use crate::client::{HttpError, HttpErrorKind}; use crate::{collect_bytes, PutPayload}; use bytes::Bytes; use futures::stream::BoxStream; @@ -195,6 +195,18 @@ impl HttpResponseBody { } } +impl Body for HttpResponseBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.0).poll_frame(cx) + } +} + impl From for HttpResponseBody { fn from(value: Bytes) -> Self { Self::new(Full::new(value).map_err(|e| match e {})) diff --git a/object_store/src/client/connection.rs b/object_store/src/client/http/connection.rs similarity index 98% rename from object_store/src/client/connection.rs rename to object_store/src/client/http/connection.rs index 7e2daf4cdb8..4ad965a221c 100644 --- a/object_store/src/client/connection.rs +++ b/object_store/src/client/http/connection.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::client::body::{HttpRequest, HttpResponse}; use crate::client::builder::{HttpRequestBuilder, RequestBuilderError}; -use crate::client::HttpResponseBody; +use crate::client::{HttpRequest, HttpResponse, HttpResponseBody}; use crate::ClientOptions; use async_trait::async_trait; use http::{Method, Uri}; diff --git a/object_store/src/client/http/mod.rs b/object_store/src/client/http/mod.rs new file mode 100644 index 00000000000..7f4644a788e --- /dev/null +++ b/object_store/src/client/http/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Generic HTTP client abstraction + +mod body; +pub use body::*; + +mod connection; +pub use connection::*; + +mod spawn; +pub use spawn::*; diff --git a/object_store/src/client/http/spawn.rs b/object_store/src/client/http/spawn.rs new file mode 100644 index 00000000000..e2423e816f8 --- /dev/null +++ b/object_store/src/client/http/spawn.rs @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::{ + HttpError, HttpErrorKind, HttpRequest, HttpResponse, HttpResponseBody, HttpService, +}; +use async_trait::async_trait; +use bytes::Bytes; +use http::Response; +use http_body_util::BodyExt; +use hyper::body::{Body, Frame}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use thiserror::Error; +use tokio::runtime::Handle; +use tokio::task::JoinHandle; + +/// Spawn error +#[derive(Debug, Error)] +#[error("SpawnError")] +struct SpawnError {} + +impl From for HttpError { + fn from(value: SpawnError) -> Self { + Self::new(HttpErrorKind::Interrupted, value) + } +} + +/// Wraps a provided [`HttpService`] and runs it on a separate tokio runtime +#[derive(Debug)] +pub struct SpawnService { + inner: T, + runtime: Handle, +} + +impl SpawnService { + /// Creates a new [`SpawnService`] from the provided + pub fn new(inner: T, runtime: Handle) -> Self { + Self { inner, runtime } + } +} + +#[async_trait] +impl HttpService for SpawnService { + async fn call(&self, req: HttpRequest) -> Result { + let inner = self.inner.clone(); + let (send, recv) = tokio::sync::oneshot::channel(); + + // We use an unbounded channel to prevent backpressure across the runtime boundary + // which could in turn starve the underlying IO operations + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + + let handle = SpawnHandle(self.runtime.spawn(async move { + let r = match HttpService::call(&inner, req).await { + Ok(resp) => resp, + Err(e) => { + let _ = send.send(Err(e)); + return; + } + }; + + let (parts, mut body) = r.into_parts(); + if send.send(Ok(parts)).is_err() { + return; + } + + while let Some(x) = body.frame().await { + sender.send(x).unwrap(); + } + })); + + let parts = recv.await.map_err(|_| SpawnError {})??; + + Ok(Response::from_parts( + parts, + HttpResponseBody::new(SpawnBody { + stream: receiver, + _worker: handle, + }), + )) + } +} + +/// A wrapper around a [`JoinHandle`] that aborts on drop +struct SpawnHandle(JoinHandle<()>); +impl Drop for SpawnHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +type StreamItem = Result, HttpError>; + +struct SpawnBody { + stream: tokio::sync::mpsc::UnboundedReceiver, + _worker: SpawnHandle, +} + +impl Body for SpawnBody { + type Data = Bytes; + type Error = HttpError; + + fn poll_frame(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.poll_recv(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::mock_server::MockServer; + use crate::client::retry::RetryExt; + use crate::client::HttpClient; + use crate::RetryConfig; + + async fn test_client(client: HttpClient) { + let (send, recv) = tokio::sync::oneshot::channel(); + + let mock = MockServer::new().await; + mock.push(Response::new("BANANAS".to_string())); + + let url = mock.url().to_string(); + let thread = std::thread::spawn(|| { + futures::executor::block_on(async move { + let retry = RetryConfig::default(); + let ret = client.get(url).send_retry(&retry).await.unwrap(); + let payload = ret.into_body().bytes().await.unwrap(); + assert_eq!(payload.as_ref(), b"BANANAS"); + let _ = send.send(()); + }) + }); + recv.await.unwrap(); + thread.join().unwrap(); + } + + #[tokio::test] + async fn test_spawn() { + let client = HttpClient::new(SpawnService::new(reqwest::Client::new(), Handle::current())); + test_client(client).await; + } + + #[tokio::test] + #[should_panic] + async fn test_no_spawn() { + let client = HttpClient::new(reqwest::Client::new()); + test_client(client).await; + } +} diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index bd0347b4311..c829f50b739 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -43,19 +43,11 @@ pub(crate) mod header; #[cfg(any(feature = "aws", feature = "gcp"))] pub(crate) mod s3; -mod body; -pub use body::{HttpRequest, HttpRequestBody, HttpResponse, HttpResponseBody}; - pub(crate) mod builder; - -mod connection; -pub(crate) use connection::http_connector; -#[cfg(not(target_arch = "wasm32"))] -pub use connection::ReqwestConnector; -pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService}; - +mod http; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] pub(crate) mod parts; +pub use http::*; use async_trait::async_trait; use reqwest::header::{HeaderMap, HeaderValue}; diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index 96244aac9b0..dff263797ac 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -19,8 +19,7 @@ use crate::client::backoff::{Backoff, BackoffConfig}; use crate::client::builder::HttpRequestBuilder; -use crate::client::connection::HttpErrorKind; -use crate::client::{HttpClient, HttpError, HttpRequest, HttpResponse}; +use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpResponse}; use crate::PutPayload; use futures::future::BoxFuture; use http::{Method, Uri};