From a9f577d5f91f6618096e398861bde2d884eff999 Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Thu, 15 Aug 2024 13:03:25 -0400 Subject: [PATCH] wip: feat(wasm): allow streaming incoming body Signed-off-by: Brooks Townsend io::copy instead of streams Signed-off-by: Brooks Townsend --- examples/wasm_component/Cargo.toml | 2 +- examples/wasm_component/src/lib.rs | 32 ++++-------------- src/wasm/component/client/future.rs | 14 +++++--- src/wasm/component/response.rs | 52 ++++++++++------------------- 4 files changed, 34 insertions(+), 66 deletions(-) diff --git a/examples/wasm_component/Cargo.toml b/examples/wasm_component/Cargo.toml index 117cb0a66..426633529 100644 --- a/examples/wasm_component/Cargo.toml +++ b/examples/wasm_component/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib"] [dependencies] futures = "0.3.30" reqwest = { version = "0.12.4", path = "../../", features = ["stream"] } -wasi = "0.13.2" +wasi = "=0.13.1" # For compatibility, pin to wasi@0.2.0 bindings [profile.release] # Optimize for small code size diff --git a/examples/wasm_component/src/lib.rs b/examples/wasm_component/src/lib.rs index 8c013d88e..d87d6e1f8 100644 --- a/examples/wasm_component/src/lib.rs +++ b/examples/wasm_component/src/lib.rs @@ -1,6 +1,5 @@ -use wasi::{ - http::types::{Fields, IncomingRequest, OutgoingBody, OutgoingResponse, ResponseOutparam}, - io::streams::{InputStream, OutputStream, StreamError}, +use wasi::http::types::{ + Fields, IncomingRequest, OutgoingBody, OutgoingResponse, ResponseOutparam, }; #[allow(unused)] @@ -15,14 +14,12 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent { .expect("should be able to get response body"); ResponseOutparam::set(response_out, Ok(response)); - let response = + let mut response = futures::executor::block_on(reqwest::Client::new().get("https://hyper.rs").send()) .expect("should get response bytes"); - let incoming_body = response.bytes_stream().expect("should get incoming body"); - let stream = incoming_body.stream().expect("should get bytes stream"); - stream_input_to_output( - stream, - response_body + std::io::copy( + &mut response.bytes_stream().expect("should get incoming body"), + &mut response_body .write() .expect("should be able to write to response body"), ) @@ -32,21 +29,4 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent { } } -pub fn stream_input_to_output(data: InputStream, out: OutputStream) -> Result<(), StreamError> { - loop { - match out.blocking_splice(&data, u64::MAX) { - Ok(bytes_spliced) if bytes_spliced == 0 => return Ok(()), - Ok(_) => {} - Err(e) => match e { - StreamError::Closed => { - return Ok(()); - } - StreamError::LastOperationFailed(e) => { - return Err(StreamError::LastOperationFailed(e)); - } - }, - } - } -} - wasi::http::proxy::export!(ReqwestComponent); diff --git a/src/wasm/component/client/future.rs b/src/wasm/component/client/future.rs index 5dfb0d3e6..8fb67ee02 100644 --- a/src/wasm/component/client/future.rs +++ b/src/wasm/component/client/future.rs @@ -5,11 +5,11 @@ use std::{ use futures_core::Future; use wasi::{ - self, - http::{ - outgoing_handler::{FutureIncomingResponse, OutgoingRequest}, - types::{OutgoingBody, OutputStream}, - }, + self, + http::{ + outgoing_handler::{FutureIncomingResponse, OutgoingRequest}, + types::{OutgoingBody, OutputStream}, + }, }; use crate::{Body, Request, Response}; @@ -69,6 +69,10 @@ impl Future for ResponseFuture { }, RequestState::Response(future) => { if !future.subscribe().ready() { + // NOTE(brooksmtownsend): We shouldn't be waking here since we don't know that + // the future is ready to be polled again. Sleeping for a nanosecond appears to + // allow the future to be polled again without causing a busy loop. + std::thread::sleep(std::time::Duration::from_nanos(1)); cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/src/wasm/component/response.rs b/src/wasm/component/response.rs index 07534a470..7cb47392c 100644 --- a/src/wasm/component/response.rs +++ b/src/wasm/component/response.rs @@ -13,6 +13,8 @@ pub struct Response { // Boxed to save space (11 words to 1 word), and it's not accessed // frequently internally. url: Box, + // The incoming body must be persisted if streaming to keep the stream open + incoming_body: Option, } impl Response { @@ -23,6 +25,7 @@ impl Response { Response { http: res, url: Box::new(url), + incoming_body: None, } } @@ -83,21 +86,9 @@ impl Response { /// Get the response text. pub async fn text(self) -> crate::Result { - // let p = self - // .http - // .body() - // .text() - // .map_err(crate::error::wasm) - // .map_err(crate::error::decode)?; - // let js_val = super::promise::(p) - // .await - // .map_err(crate::error::decode)?; - // if let Some(s) = js_val.as_string() { - // Ok(s) - // } else { - // Err(crate::error::decode("response.text isn't string")) - // } - Ok("str_resp".to_string()) + self.bytes() + .await + .map(|s| String::from_utf8(s.to_vec()).map_err(crate::error::decode))? } /// Get the response as bytes @@ -121,27 +112,20 @@ impl Response { Ok(body.into()) } - /// Convert the response into a `Stream` of `Bytes` from the body. + /// Convert the response into a [`wasi::http::types::IncomingBody`] resource which can + /// then be used to stream the body. #[cfg(feature = "stream")] - pub fn bytes_stream(self) -> impl futures_core::Stream> { - let web_response = self.http.into_body(); - let abort = self._abort; - let body = web_response + pub fn bytes_stream(&mut self) -> crate::Result { + let body = self + .http .body() - .expect("could not create wasm byte stream"); - let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into()); - Box::pin(body.into_stream().map(move |buf_js| { - // Keep the abort guard alive as long as this stream is. - let _abort = &abort; - let buffer = Uint8Array::new( - &buf_js - .map_err(crate::error::wasm) - .map_err(crate::error::decode)?, - ); - let mut bytes = vec![0; buffer.length() as usize]; - buffer.copy_to(&mut bytes); - Ok(bytes.into()) - })) + .consume() + .map_err(|_| crate::error::decode("failed to consume response body"))?; + let stream = body + .stream() + .map_err(|_| crate::error::decode("failed to stream response body")); + self.incoming_body = Some(body); + stream } // util methods