diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index a9071026f..9ca32acc8 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -317,7 +317,33 @@ impl Response { #[cfg(feature = "stream")] #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] pub fn bytes_stream(self) -> impl futures_core::Stream> { - self.res.into_body() + use std::task::{Context, Poll}; + + struct DataStream(Decoder); + + impl futures_core::Stream for DataStream { + type Item = crate::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use hyper::body::Body as _; + loop { + return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) { + Some(Ok(frame)) => { + // skip non-data frames + if let Ok(buf) = frame.into_data() { + Poll::Ready(Some(Ok(buf))) + } else { + continue; + } + }, + Some(Err(err)) => Poll::Ready(Some(Err(err))), + None => Poll::Ready(None), + }; + } + } + } + + DataStream(self.res.into_body()) } // util methods