-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create a buffered wrapper around BytesStream #1501
Merged
akoshelev
merged 3 commits into
private-attribution:main
from
akoshelev:rc-round-robin-buffered
Dec 17, 2024
+313
−0
Merged
Changes from 2 commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,313 @@ | ||
use std::{ | ||
mem, | ||
num::NonZeroUsize, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
use bytes::Bytes; | ||
use futures::Stream; | ||
use pin_project::pin_project; | ||
|
||
use crate::helpers::BytesStream; | ||
|
||
/// An adaptor to buffer items coming from the upstream | ||
/// [`BytesStream`](BytesStream) until the buffer is full, or the upstream is | ||
/// done. This may need to be used when writing into HTTP streams as Hyper | ||
/// does not provide any buffering functionality and we turn NODELAY on | ||
#[pin_project] | ||
pub struct BufferedBytesStream<S> { | ||
/// Inner stream to poll | ||
#[pin] | ||
inner: S, | ||
/// Buffer of bytes pending release | ||
buffer: Vec<u8>, | ||
/// Number of bytes released per single poll. | ||
/// All items except the last one are guaranteed to have | ||
/// exactly this number of bytes written to them. | ||
sz: usize, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: chunk_size? |
||
} | ||
|
||
impl<S> BufferedBytesStream<S> { | ||
fn new(inner: S, buf_size: NonZeroUsize) -> Self { | ||
Self { | ||
inner, | ||
buffer: Vec::with_capacity(buf_size.get()), | ||
sz: buf_size.get(), | ||
} | ||
} | ||
} | ||
|
||
impl<S: BytesStream> Stream for BufferedBytesStream<S> { | ||
type Item = S::Item; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
fn take_next(buf: &mut Vec<u8>) -> Vec<u8> { | ||
mem::replace(buf, Vec::with_capacity(buf.len())) | ||
} | ||
|
||
let mut this = self.as_mut().project(); | ||
loop { | ||
// If we are at capacity, return what we have | ||
if this.buffer.len() >= *this.sz { | ||
// if we have more than we need in the buffer, split it | ||
// otherwise, return the whole buffer to the reader | ||
let next = if this.buffer.len() > *this.sz { | ||
this.buffer.drain(..*this.sz).collect() | ||
} else { | ||
take_next(this.buffer) | ||
}; | ||
break Poll::Ready(Some(Ok(Bytes::from(next)))); | ||
} | ||
|
||
match this.inner.as_mut().poll_next(cx) { | ||
Poll::Ready(Some(item)) => { | ||
// Received next portion of data, buffer it | ||
match item { | ||
Ok(bytes) => { | ||
this.buffer.extend(bytes); | ||
} | ||
Err(e) => { | ||
break Poll::Ready(Some(Err(e))); | ||
} | ||
} | ||
} | ||
Poll::Ready(None) => { | ||
// yield what we have because the upstream is done | ||
let next = if this.buffer.is_empty() { | ||
None | ||
} else { | ||
Some(Ok(Bytes::from(take_next(this.buffer)))) | ||
}; | ||
|
||
break Poll::Ready(next); | ||
} | ||
Poll::Pending => { | ||
// we don't have enough data in the buffer (otherwise we wouldn't be here) | ||
break Poll::Pending; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[cfg(all(test, unit_test))] | ||
mod tests { | ||
use std::{ | ||
cmp::min, | ||
mem, | ||
num::NonZeroUsize, | ||
pin::Pin, | ||
sync::{Arc, Mutex}, | ||
task::Poll, | ||
}; | ||
|
||
use bytes::Bytes; | ||
use futures::{stream::TryStreamExt, FutureExt, Stream, StreamExt}; | ||
use proptest::{ | ||
prop_compose, proptest, | ||
strategy::{Just, Strategy}, | ||
}; | ||
|
||
use crate::{ | ||
error::BoxError, helpers::transport::stream::buffered::BufferedBytesStream, | ||
test_executor::run, | ||
}; | ||
|
||
#[test] | ||
fn success() { | ||
run(|| async move { | ||
verify_success(infallible_stream(11, 2), 3).await; | ||
verify_success(infallible_stream(12, 3), 3).await; | ||
verify_success(infallible_stream(12, 5), 12).await; | ||
verify_success(infallible_stream(12, 12), 12).await; | ||
verify_success(infallible_stream(24, 12), 12).await; | ||
verify_success(infallible_stream(24, 12), 1).await; | ||
}); | ||
} | ||
|
||
#[test] | ||
fn fails_on_first_error() { | ||
run(|| async move { | ||
let stream = fallible_stream(12, 3, 5); | ||
let mut buffered = BufferedBytesStream::new(stream, NonZeroUsize::try_from(2).unwrap()); | ||
let mut buf = Vec::new(); | ||
while let Some(next) = buffered.next().await { | ||
match next { | ||
Ok(bytes) => { | ||
assert_eq!(2, bytes.len()); | ||
buf.extend(bytes); | ||
} | ||
Err(_) => { | ||
break; | ||
} | ||
} | ||
} | ||
|
||
// we could only receive 2 bytes from the stream and here is why. | ||
// first read puts 3 bytes into the buffer and we take 2 bytes off it. | ||
// second read does not have sufficient bytes in the buffer, and we need | ||
// to read from the stream again. Next read results in an error and we | ||
// return it immediately | ||
assert_eq!(2, buf.len()); | ||
}); | ||
} | ||
|
||
#[test] | ||
fn pending() { | ||
let status = Arc::new(Mutex::new(vec![1, 2])); | ||
let stream = futures::stream::poll_fn({ | ||
let status = Arc::clone(&status); | ||
move |_cx| { | ||
let mut vec = status.lock().unwrap(); | ||
if vec.is_empty() { | ||
Poll::Pending | ||
} else { | ||
Poll::Ready(Some(Ok(Bytes::from(mem::take(&mut *vec))))) | ||
} | ||
} | ||
}); | ||
|
||
let mut buffered = BufferedBytesStream::new(stream, NonZeroUsize::try_from(4).unwrap()); | ||
let mut fut = std::pin::pin!(buffered.next()); | ||
assert!(fut.as_mut().now_or_never().is_none()); | ||
|
||
status.lock().unwrap().extend([3, 4]); | ||
let actual = fut.now_or_never().flatten().unwrap().unwrap(); | ||
assert_eq!(Bytes::from(vec![1, 2, 3, 4]), actual); | ||
} | ||
|
||
async fn verify_success(input: TestStream, chunk_size: usize) { | ||
let total_size = input.total_size; | ||
assert!(total_size >= chunk_size); | ||
let expected = input.clone(); | ||
let mut buffered = BufferedBytesStream::new(input, chunk_size.try_into().unwrap()); | ||
|
||
let mut last_chunk_size = None; | ||
let mut actual = Vec::new(); | ||
while let Ok(Some(bytes)) = buffered.try_next().await { | ||
assert!(bytes.len() <= chunk_size); | ||
// All chunks except the last one must be exactly of `chunk_size` size. | ||
if let Some(last) = last_chunk_size { | ||
assert_eq!(last, chunk_size); | ||
} | ||
last_chunk_size = Some(bytes.len()); | ||
actual.extend(bytes); | ||
} | ||
|
||
// compare with what the original stream returned | ||
assert_eq!(actual.len(), total_size); | ||
let expected = expected | ||
.try_collect::<Vec<_>>() | ||
.await | ||
.unwrap() | ||
.into_iter() | ||
.flatten() | ||
.collect::<Vec<_>>(); | ||
assert_eq!(expected, actual); | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
struct TestStream { | ||
total_size: usize, | ||
remaining: usize, | ||
chunk: usize, | ||
} | ||
|
||
struct FallibleTestStream { | ||
total_size: usize, | ||
remaining: usize, | ||
chunk: usize, | ||
error_after: usize, | ||
} | ||
|
||
fn infallible_stream(total_size: usize, chunk: usize) -> TestStream { | ||
TestStream { | ||
total_size, | ||
remaining: total_size, | ||
chunk, | ||
} | ||
} | ||
|
||
fn fallible_stream(total_size: usize, chunk: usize, error_after: usize) -> FallibleTestStream { | ||
FallibleTestStream { | ||
total_size, | ||
remaining: total_size, | ||
chunk, | ||
error_after, | ||
} | ||
} | ||
|
||
impl Stream for TestStream { | ||
type Item = Result<Bytes, BoxError>; | ||
|
||
fn poll_next( | ||
mut self: Pin<&mut Self>, | ||
_cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
if self.remaining == 0 { | ||
return Poll::Ready(None); | ||
} | ||
let next_chunk_size = min(self.remaining, self.chunk); | ||
let next_chunk = (0..next_chunk_size) | ||
.map(|v| u8::try_from(v % 256).unwrap()) | ||
.collect::<Vec<_>>(); | ||
|
||
self.remaining -= next_chunk_size; | ||
Poll::Ready(Some(Ok(Bytes::from(next_chunk)))) | ||
} | ||
} | ||
|
||
impl Stream for FallibleTestStream { | ||
akoshelev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
type Item = Result<Bytes, BoxError>; | ||
|
||
fn poll_next( | ||
mut self: Pin<&mut Self>, | ||
_cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
if self.remaining == 0 { | ||
return Poll::Ready(None); | ||
} | ||
let next_chunk_size = min(self.remaining, self.chunk); | ||
let next_chunk = (0..next_chunk_size) | ||
.map(|v| u8::try_from(v % 256).unwrap()) | ||
.collect::<Vec<_>>(); | ||
|
||
self.remaining -= next_chunk_size; | ||
if self.total_size - self.remaining >= self.error_after { | ||
Poll::Ready(Some(Err("error".into()))) | ||
} else { | ||
Poll::Ready(Some(Ok(Bytes::from(next_chunk)))) | ||
} | ||
} | ||
} | ||
|
||
prop_compose! { | ||
fn arb_infallible_stream(max_size: u16) | ||
(total_size in 1..max_size) | ||
(total_size in Just(total_size), chunk in 1..total_size) | ||
-> TestStream { | ||
TestStream { | ||
total_size: total_size as usize, | ||
remaining: total_size as usize, | ||
chunk: chunk as usize, | ||
} | ||
} | ||
} | ||
|
||
fn stream_and_chunk() -> impl Strategy<Value = (TestStream, usize)> { | ||
arb_infallible_stream(24231).prop_flat_map(|stream| { | ||
let len = stream.total_size; | ||
(Just(stream), 1..len) | ||
}) | ||
} | ||
|
||
proptest! { | ||
#[test] | ||
fn proptest_success((stream, chunk) in stream_and_chunk()) { | ||
run(move || async move { | ||
verify_success(stream, chunk).await; | ||
}); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few nits
To me
BufferedBytesStream
doesn't tell why this stream is special. I think the key thing is the poll with is being "chunked". I would rather call thisBufferedChunkedStream
why not have the trait bound
S: BytesStream
here as well? Thinking more about documentation/readabilityThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bounds need to be repeated everywhere if put on the struct - generally we try to avoid that if that's not necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this does not attempt to chunk the inner stream, the whole purpose of it is to accumulate enough bytes (buffer) before sending them down for processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But doesn't the output get chunked? This adapter is buffering and chunking... another name would be paginating.
I still think BufferedBytesStream is vague. My 2 cents.