Skip to content

Commit

Permalink
wip: feat(wasm): allow streaming incoming body
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Aug 15, 2024
1 parent b494dff commit 093c1cb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 43 deletions.
8 changes: 4 additions & 4 deletions examples/wasm_component/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent {
.expect("should be able to get response body");
ResponseOutparam::set(response_out, Ok(response));

let response =
let mut response =
futures::executor::block_on(reqwest::Client::new().get("https://hyper.rs").send())
.expect("should get response bytes");
let incoming_body = response.bytes_stream().expect("should get incoming body");
let stream = incoming_body.stream().expect("should get bytes stream");
let body_stream = response.bytes_stream().expect("should get incoming body");
stream_input_to_output(
stream,
body_stream,
response_body
.write()
.expect("should be able to write to response body"),
Expand All @@ -32,6 +31,7 @@ impl wasi::exports::http::incoming_handler::Guest for ReqwestComponent {
}
}

/// Helper function to stream all data from an input stream to an output stream.
pub fn stream_input_to_output(data: InputStream, out: OutputStream) -> Result<(), StreamError> {
loop {
match out.blocking_splice(&data, u64::MAX) {
Expand Down
14 changes: 9 additions & 5 deletions src/wasm/component/client/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::{

use futures_core::Future;
use wasi::{
self,
http::{
outgoing_handler::{FutureIncomingResponse, OutgoingRequest},
types::{OutgoingBody, OutputStream},
},
self,
http::{
outgoing_handler::{FutureIncomingResponse, OutgoingRequest},
types::{OutgoingBody, OutputStream},
},
};

use crate::{Body, Request, Response};
Expand Down Expand Up @@ -69,6 +69,10 @@ impl Future for ResponseFuture {
},
RequestState::Response(future) => {
if !future.subscribe().ready() {
// NOTE(brooksmtownsend): We shouldn't be waking here since we don't know that
// the future is ready to be polled again. Sleeping for a nanosecond appears to
// allow the future to be polled again without causing a busy loop.
std::thread::sleep(std::time::Duration::from_nanos(1));
cx.waker().wake_by_ref();
return Poll::Pending;
}
Expand Down
52 changes: 18 additions & 34 deletions src/wasm/component/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub struct Response {
// Boxed to save space (11 words to 1 word), and it's not accessed
// frequently internally.
url: Box<Url>,
// The incoming body must be persisted if streaming to keep the stream open
incoming_body: Option<wasi::http::types::IncomingBody>,
}

impl Response {
Expand All @@ -23,6 +25,7 @@ impl Response {
Response {
http: res,
url: Box::new(url),
incoming_body: None,
}
}

Expand Down Expand Up @@ -83,21 +86,9 @@ impl Response {

/// Get the response text.
pub async fn text(self) -> crate::Result<String> {
// let p = self
// .http
// .body()
// .text()
// .map_err(crate::error::wasm)
// .map_err(crate::error::decode)?;
// let js_val = super::promise::<wasm_bindgen::JsValue>(p)
// .await
// .map_err(crate::error::decode)?;
// if let Some(s) = js_val.as_string() {
// Ok(s)
// } else {
// Err(crate::error::decode("response.text isn't string"))
// }
Ok("str_resp".to_string())
self.bytes()
.await
.map(|s| String::from_utf8(s.to_vec()).map_err(crate::error::decode))?
}

/// Get the response as bytes
Expand All @@ -121,27 +112,20 @@ impl Response {
Ok(body.into())
}

/// Convert the response into a `Stream` of `Bytes` from the body.
/// Convert the response into a [`wasi::http::types::IncomingBody`] resource which can
/// then be used to stream the body.
#[cfg(feature = "stream")]
pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
let web_response = self.http.into_body();
let abort = self._abort;
let body = web_response
pub fn bytes_stream(&mut self) -> crate::Result<wasi::io::streams::InputStream> {
let body = self
.http
.body()
.expect("could not create wasm byte stream");
let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
Box::pin(body.into_stream().map(move |buf_js| {
// Keep the abort guard alive as long as this stream is.
let _abort = &abort;
let buffer = Uint8Array::new(
&buf_js
.map_err(crate::error::wasm)
.map_err(crate::error::decode)?,
);
let mut bytes = vec![0; buffer.length() as usize];
buffer.copy_to(&mut bytes);
Ok(bytes.into())
}))
.consume()
.map_err(|_| crate::error::decode("failed to consume response body"))?;
let stream = body
.stream()
.map_err(|_| crate::error::decode("failed to stream response body"));
self.incoming_body = Some(body);
stream
}

// util methods
Expand Down

0 comments on commit 093c1cb

Please sign in to comment.