Skip to content

Commit

Permalink
feat: support streaming http responses
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Aug 2, 2024
1 parent 1681844 commit 355e305
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes = "1.6.0"
cacache = "13"
clap = { version = "4", features = ["derive"] }
fjall = "1"
futures = "0.3.30"
http = "1.1.0"
http-body-util = "0.1"
http-serde = "2.1.1"
Expand Down
148 changes: 123 additions & 25 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use serde::{Deserialize, Serialize};

use tokio::io::AsyncWriteExt;

// needed to convert async-std Async to a tokio Async
use tokio_stream::StreamExt;
use tokio_stream::{Stream, StreamExt};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use tokio_util::io::ReaderStream;
Expand Down Expand Up @@ -122,8 +121,7 @@ async fn handle(
)
.await;

let (hash, meta) = wait_for_response(&store, frame.id).await.unwrap();
let hash = hash.unwrap();
let (meta, hashes) = wait_for_response(&store, frame.id).await.unwrap();

let res = hyper::Response::builder();
let mut res = res.status(meta.status.unwrap_or(200));
Expand All @@ -143,18 +141,8 @@ async fn handle(
}
}

let reader = store.cas_reader(hash).await?;
// convert reader from async-std -> tokio
let reader = reader.compat();
let stream = ReaderStream::new(reader);

let stream = stream.map(|frame| {
let frame = frame.unwrap();
Ok(hyper::body::Frame::data(frame))
});

let stream = transform_hash_stream(store.clone(), hashes).await;
let body = StreamBody::new(stream).boxed();

Ok(res.body(body)?)
}

Expand Down Expand Up @@ -192,29 +180,139 @@ pub async fn serve(
}
}

use tokio_stream::wrappers::ReceiverStream;

async fn wait_for_response(
store: &Store,
frame_id: Scru128Id,
) -> Result<(Option<ssri::Integrity>, ResponseMeta), &str> {
let mut recver = store
) -> Result<(ResponseMeta, impl Stream<Item = ssri::Integrity>), &'static str> {
let recver = store
.read(ReadOptions {
follow: FollowOption::On,
tail: false,
last_id: Some(frame_id),
})
.await;

while let Some(frame) = recver.recv().await {
if frame.topic == "http.response" {
if let Some(meta) = frame.meta {
if let Ok(res) = serde_json::from_value::<ResponseMeta>(meta) {
if res.request_id == frame_id {
return Ok((frame.hash, res));
}
let mut stream = ReceiverStream::new(recver)
.filter(|frame| frame.topic == "http.response")
.filter_map(move |frame| {
frame.meta.and_then(|meta| {
serde_json::from_value::<ResponseMeta>(meta.clone())
.ok()
.and_then(|res| {
if res.request_id == frame_id {
Some((frame.hash, res))
} else {
None
}
})
})
});

if let Some((first_hash, meta)) = stream.next().await {
let hash_stream = tokio_stream::once((first_hash, meta.clone()))
.chain(stream)
.take_while_inclusive(|(_, meta)| meta.more.unwrap_or(false))
.filter_map(|(hash, _)| hash);
Ok((meta, hash_stream))
} else {
Err("timeout")
}
}

type ResultFrame = Result<hyper::body::Frame<bytes::Bytes>, Box<dyn Error + Send + Sync>>;

async fn transform_hash_stream(
store: Store,
hash_stream: impl futures::Stream<Item = ssri::Integrity>,
) -> impl futures::Stream<Item = ResultFrame> {
let mapped_stream = hash_stream.then(move |hash| {
let store = store.clone();
async move {
match store.cas_reader(hash).await {
Ok(reader) => {
let reader = reader.compat();
let stream = ReaderStream::new(reader);
Ok::<_, Box<dyn Error + Send + Sync>>(futures::StreamExt::map(
stream,
|frame| {
let frame = frame.unwrap();
Ok(hyper::body::Frame::data(frame))
},
))
}
Err(e) => Err(Box::new(e) as Box<dyn Error + Send + Sync>),
}
}
});

futures::stream::TryStreamExt::try_flatten(mapped_stream)
}

use std::pin::Pin;
use std::task::{Context, Poll};

pub struct TakeWhileInclusive<St, F> {
stream: St,
predicate: F,
done: bool,
}

impl<St, F> TakeWhileInclusive<St, F>
where
St: Stream,
F: FnMut(&St::Item) -> bool,
{
pub fn new(stream: St, predicate: F) -> Self {
Self {
stream,
predicate,
done: false,
}
}
}

impl<St, F> Stream for TakeWhileInclusive<St, F>
where
St: Stream,
F: FnMut(&St::Item) -> bool,
{
type Item = St::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}

let this = unsafe { self.as_mut().get_unchecked_mut() };
let stream = unsafe { Pin::new_unchecked(&mut this.stream) };

match stream.poll_next(cx) {
Poll::Ready(Some(item)) => {
let keep = (this.predicate)(&item);
if !keep {
this.done = true;
}
Poll::Ready(Some(item))
}
other => other,
}
}

Err("event stream ended")
fn size_hint(&self) -> (usize, Option<usize>) {
let (_, upper) = self.stream.size_hint();
(0, upper)
}
}

pub trait TakeWhileInclusiveExt: Stream + Sized {
fn take_while_inclusive<F>(self, predicate: F) -> TakeWhileInclusive<Self, F>
where
F: FnMut(&Self::Item) -> bool,
{
TakeWhileInclusive::new(self, predicate)
}
}

impl<T: Stream> TakeWhileInclusiveExt for T {}

0 comments on commit 355e305

Please sign in to comment.