From 093c1cb8719236016e83dbfe79e1f5b55c5235f9 Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Thu, 15 Aug 2024 13:03:25 -0400 Subject: [PATCH] wip: feat(wasm): allow streaming incoming body Signed-off-by: Brooks Townsend --- examples/wasm_component/src/lib.rs | 8 ++--- src/wasm/component/client/future.rs | 14 +++++--- src/wasm/component/response.rs | 52 ++++++++++------------------- 3 files changed, 31 insertions(+), 43 deletions(-) diff --git a/examples/wasm_component/src/lib.rs b/examples/wasm_component/src/lib.rs index 8c013d88e..d329423c9 100644 --- a/examples/wasm_component/src/lib.rs +++ b/examples/wasm_component/src/lib.rs @@ -15,13 +15,12 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent { .expect("should be able to get response body"); ResponseOutparam::set(response_out, Ok(response)); - let response = + let mut response = futures::executor::block_on(reqwest::Client::new().get("https://hyper.rs").send()) .expect("should get response bytes"); - let incoming_body = response.bytes_stream().expect("should get incoming body"); - let stream = incoming_body.stream().expect("should get bytes stream"); + let body_stream = response.bytes_stream().expect("should get incoming body"); stream_input_to_output( - stream, + body_stream, response_body .write() .expect("should be able to write to response body"), @@ -32,6 +31,7 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent { } } +/// Helper function to stream all data from an input stream to an output stream. pub fn stream_input_to_output(data: InputStream, out: OutputStream) -> Result<(), StreamError> { loop { match out.blocking_splice(&data, u64::MAX) { diff --git a/src/wasm/component/client/future.rs b/src/wasm/component/client/future.rs index 5dfb0d3e6..8fb67ee02 100644 --- a/src/wasm/component/client/future.rs +++ b/src/wasm/component/client/future.rs @@ -5,11 +5,11 @@ use std::{ use futures_core::Future; use wasi::{ - self, - http::{ - outgoing_handler::{FutureIncomingResponse, OutgoingRequest}, - types::{OutgoingBody, OutputStream}, - }, + self, + http::{ + outgoing_handler::{FutureIncomingResponse, OutgoingRequest}, + types::{OutgoingBody, OutputStream}, + }, }; use crate::{Body, Request, Response}; @@ -69,6 +69,10 @@ impl Future for ResponseFuture { }, RequestState::Response(future) => { if !future.subscribe().ready() { + // NOTE(brooksmtownsend): We shouldn't be waking here since we don't know that + // the future is ready to be polled again. Sleeping for a nanosecond appears to + // allow the future to be polled again without causing a busy loop. + std::thread::sleep(std::time::Duration::from_nanos(1)); cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/src/wasm/component/response.rs b/src/wasm/component/response.rs index 07534a470..7cb47392c 100644 --- a/src/wasm/component/response.rs +++ b/src/wasm/component/response.rs @@ -13,6 +13,8 @@ pub struct Response { // Boxed to save space (11 words to 1 word), and it's not accessed // frequently internally. url: Box, + // The incoming body must be persisted if streaming to keep the stream open + incoming_body: Option, } impl Response { @@ -23,6 +25,7 @@ impl Response { Response { http: res, url: Box::new(url), + incoming_body: None, } } @@ -83,21 +86,9 @@ impl Response { /// Get the response text. pub async fn text(self) -> crate::Result { - // let p = self - // .http - // .body() - // .text() - // .map_err(crate::error::wasm) - // .map_err(crate::error::decode)?; - // let js_val = super::promise::(p) - // .await - // .map_err(crate::error::decode)?; - // if let Some(s) = js_val.as_string() { - // Ok(s) - // } else { - // Err(crate::error::decode("response.text isn't string")) - // } - Ok("str_resp".to_string()) + self.bytes() + .await + .map(|s| String::from_utf8(s.to_vec()).map_err(crate::error::decode))? } /// Get the response as bytes @@ -121,27 +112,20 @@ impl Response { Ok(body.into()) } - /// Convert the response into a `Stream` of `Bytes` from the body. + /// Convert the response into a [`wasi::http::types::IncomingBody`] resource which can + /// then be used to stream the body. #[cfg(feature = "stream")] - pub fn bytes_stream(self) -> impl futures_core::Stream> { - let web_response = self.http.into_body(); - let abort = self._abort; - let body = web_response + pub fn bytes_stream(&mut self) -> crate::Result { + let body = self + .http .body() - .expect("could not create wasm byte stream"); - let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into()); - Box::pin(body.into_stream().map(move |buf_js| { - // Keep the abort guard alive as long as this stream is. - let _abort = &abort; - let buffer = Uint8Array::new( - &buf_js - .map_err(crate::error::wasm) - .map_err(crate::error::decode)?, - ); - let mut bytes = vec![0; buffer.length() as usize]; - buffer.copy_to(&mut bytes); - Ok(bytes.into()) - })) + .consume() + .map_err(|_| crate::error::decode("failed to consume response body"))?; + let stream = body + .stream() + .map_err(|_| crate::error::decode("failed to stream response body")); + self.incoming_body = Some(body); + stream } // util methods