Skip to content

Commit

Permalink
fix streaming feature
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 27, 2023
1 parent d5efcd3 commit 34a0d78
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion src/async_impl/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,33 @@ impl Response {
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
self.res.into_body()
use std::task::{Context, Poll};

struct DataStream(Decoder);

impl futures_core::Stream for DataStream {
type Item = crate::Result<Bytes>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use hyper::body::Body as _;
loop {
return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) {
Some(Ok(frame)) => {
// skip non-data frames
if let Ok(buf) = frame.into_data() {
Poll::Ready(Some(Ok(buf)))
} else {
continue;
}
},
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
};
}
}
}

DataStream(self.res.into_body())
}

// util methods
Expand Down

0 comments on commit 34a0d78

Please sign in to comment.