Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default wasm32-unknown-unknown client #7310

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-ut
[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.29.0", features = ["fs"] }

[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
web-time = { version = "1.1.0" }
wasm-bindgen-futures = "0.4.18"

[features]
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded"]
Expand Down
7 changes: 7 additions & 0 deletions object_store/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ impl HttpRequestBody {
)),
}
}
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
pub(crate) fn into_reqwest(self) -> reqwest::Body {
match self.0 {
Inner::Bytes(b) => b.into(),
Inner::PutPayload(_, payload) => Into::<Bytes>::into(payload).into(),
}
}

/// Returns true if this body is empty
pub fn is_empty(&self) -> bool {
Expand Down
61 changes: 56 additions & 5 deletions object_store/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,57 @@ impl HttpService for reqwest::Client {
}
}

#[async_trait]
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl HttpService for reqwest::Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
let (parts, body) = req.into_parts();
let url = parts.uri.to_string().parse().unwrap();
let mut req = reqwest::Request::new(parts.method, url);
*req.headers_mut() = parts.headers;
*req.body_mut() = Some(body.into_reqwest());

use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt, TryStreamExt,
};
use http_body_util::{Empty, StreamBody};
use wasm_bindgen_futures::spawn_local;

let (mut tx, rx) = mpsc::channel(1);
let (tx_parts, rx_parts) = oneshot::channel();
let res_fut = self.execute(req);

spawn_local(async move {
match res_fut.await.map_err(HttpError::reqwest) {
Err(err) => {
let _ = tx_parts.send(Err(err));
drop(tx);
}
Ok(res) => {
let (mut parts, _) = http::Response::new(Empty::<()>::new()).into_parts();
parts.headers = res.headers().clone();
parts.status = res.status();
let _ = tx_parts.send(Ok(parts));
let mut stream = res.bytes_stream().map_err(HttpError::reqwest);
while let Some(chunk) = stream.next().await {
tx.send(chunk).await.unwrap();
}
}
}
});

let parts = rx_parts.await.unwrap()?;
let safe_stream = rx.map(|chunk| {
let frame = hyper::body::Frame::data(chunk?);
Ok(frame)
});
let body = HttpResponseBody::new(StreamBody::new(safe_stream));

Ok(HttpResponse::from_parts(parts, body))
}
}

/// A factory for [`HttpClient`]
pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
/// Create a new [`HttpClient`] with the provided [`ClientOptions`]
Expand All @@ -233,32 +284,32 @@ pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
/// [`HttpConnector`] using [`reqwest::Client`]
#[derive(Debug, Default)]
#[allow(missing_copy_implementations)]
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub struct ReqwestConnector {}

#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
impl HttpConnector for ReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let client = options.client()?;
Ok(HttpClient::new(client))
}
}

#[cfg(target_arch = "wasm32")]
#[cfg(all(target_arch = "wasm32", target_os = "wasi"))]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
match custom {
Some(x) => Ok(x),
None => Err(crate::Error::NotSupported {
source: "WASM32 architectures must provide an HTTPConnector"
source: "WASI architectures must provide an HTTPConnector"
.to_string()
.into(),
}),
}
}

#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
Expand Down
18 changes: 17 additions & 1 deletion object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) mod builder;

mod connection;
pub(crate) use connection::http_connector;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))]
pub use connection::ReqwestConnector;
pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService};

Expand Down Expand Up @@ -718,6 +718,22 @@ impl ClientOptions {
.build()
.map_err(map_client_error)
}

#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
pub(crate) fn client(&self) -> Result<reqwest::Client> {
let mut builder = reqwest::ClientBuilder::new();

match &self.user_agent {
Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
None => builder = builder.user_agent(DEFAULT_USER_AGENT),
}

if let Some(headers) = &self.default_headers {
builder = builder.default_headers(headers.clone())
}

builder.build().map_err(map_client_error)
}
}

pub(crate) trait GetOptionsExt {
Expand Down
4 changes: 4 additions & 0 deletions object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ use futures::future::BoxFuture;
use http::{Method, Uri};
use reqwest::header::LOCATION;
use reqwest::StatusCode;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use std::time::{Duration, Instant};
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
use web_time::{Duration, Instant};

use tracing::info;

/// Retry request error
Expand Down
Loading