Skip to content

Commit

Permalink
Use stream buffering in report collector
Browse files Browse the repository at this point in the history
Continuation of #1501: we want to avoid submitting reports one by one from the report collector (RC) to each individual shard. That likely leads to fragmentation, and we've been observing slow execution on the client side.

This sets the buffer size to a multiple of the TCP MSS, but I don't have any real evidence that this will work well; we would need to experiment with it.
  • Loading branch information
akoshelev committed Dec 17, 2024
1 parent f148eac commit d1cbb58
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 28 deletions.
10 changes: 7 additions & 3 deletions ipa-core/src/bin/report_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
fs::{File, OpenOptions},
io,
io::{stdout, BufReader, Write},
num::NonZeroUsize,
ops::Deref,
path::{Path, PathBuf},
};
Expand All @@ -15,8 +16,8 @@ use ipa_core::{
cli::{
playbook::{
make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate,
run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource,
RoundRobinSubmission, StreamingSubmission,
run_query_and_validate, validate, validate_dp, BufferedRoundRobinSubmission,
HybridQueryResult, InputSource, StreamingSubmission,
},
CsvSerializer, IpaQueryResult, Verbosity,
},
Expand Down Expand Up @@ -430,6 +431,9 @@ async fn hybrid(
count: usize,
set_fixed_polling_ms: Option<u64>,
) -> Result<(), Box<dyn Error>> {
// Twice the size of the TCP MSS (2 x 536 bytes). TCP options, which are not in our
// control, may reduce the effective MSS, but hopefully fragmentation is not too bad.
const BUF_SIZE: NonZeroUsize = NonZeroUsize::new(1072).unwrap();
let query_type = QueryType::MaliciousHybrid(hybrid_query_config);

let [h1_streams, h2_streams, h3_streams] = [
Expand All @@ -439,7 +443,7 @@ async fn hybrid(
]
.map(|path| {
let file = File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
RoundRobinSubmission::new(BufReader::new(file))
BufferedRoundRobinSubmission::new(BufReader::new(file), BUF_SIZE)

Check warning on line 446 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L446

Added line #L446 was not covered by tests
})
.map(|s| s.into_byte_streams(args.shard_count));

Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/cli/playbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::time::sleep;
pub use self::{
hybrid::{run_hybrid_query_and_validate, HybridQueryResult},
ipa::{playbook_oprf_ipa, run_query_and_validate},
streaming::{RoundRobinSubmission, StreamingSubmission},
streaming::{BufferedRoundRobinSubmission, StreamingSubmission},
};
use crate::{
cli::config_parse::HelperNetworkConfigParseExt,
Expand Down
155 changes: 144 additions & 11 deletions ipa-core/src/cli/playbook/streaming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
io::BufRead,
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll, Waker},
};
Expand All @@ -9,7 +10,7 @@ use futures::Stream;

use crate::{
error::BoxError,
helpers::BytesStream,
helpers::{BufferedBytesStream, BytesStream},
sync::{Arc, Mutex},
};

Expand All @@ -20,6 +21,34 @@ pub trait StreamingSubmission {
fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream>;
}

/// Same as [`RoundRobinSubmission`] but buffers the destination stream
/// until it accumulates at least `buf_size` bytes of data
pub struct BufferedRoundRobinSubmission<R> {
    /// Source to read input from; expected to be hex-encoded and newline-delimited,
    /// same as for [`RoundRobinSubmission`].
    inner: R,
    /// Buffer size given to each output stream produced by
    /// [`StreamingSubmission::into_byte_streams`].
    buf_size: NonZeroUsize,
}

impl<R: BufRead> BufferedRoundRobinSubmission<R> {
/// Creates a new instance with the specified buffer size. All streams created
/// using [`StreamingSubmission::into_byte_streams`] will have their own buffer set.
pub fn new(read_from: R, buf_size: NonZeroUsize) -> Self {
Self {
inner: read_from,
buf_size,
}
}
}

impl<R: BufRead + Send> StreamingSubmission for BufferedRoundRobinSubmission<R> {
    /// Delegates the round-robin split to [`RoundRobinSubmission`] and wraps each
    /// resulting stream in a [`BufferedBytesStream`] with this submission's buffer size.
    fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream> {
        let buf_size = self.buf_size;
        let raw_streams = RoundRobinSubmission::new(self.inner).into_byte_streams(count);
        raw_streams
            .into_iter()
            .map(move |stream| BufferedBytesStream::new(stream, buf_size))
            .collect()
    }
}

/// Round-Robin strategy to read off the provided buffer
and distribute them. Inputs are expected to be hex-encoded
/// and delimited by newlines. The output streams will have
Expand Down Expand Up @@ -149,6 +178,7 @@ impl<R: BufRead> State<R> {
#[cfg(all(test, unit_test))]
mod tests {
use std::{
collections::HashSet,
fs::File,
io::{BufReader, Write},
iter,
Expand All @@ -159,24 +189,98 @@ mod tests {
use tempfile::TempDir;

use crate::{
cli::playbook::streaming::{RoundRobinSubmission, StreamingSubmission},
cli::playbook::streaming::{
BufferedRoundRobinSubmission, RoundRobinSubmission, StreamingSubmission,
},
helpers::BytesStream,
test_executor::run,
};

async fn drain_all<S: BytesStream>(streams: Vec<S>) -> Vec<String> {
async fn drain_all_buffered<S: BytesStream>(
streams: Vec<S>,
buf_size: Option<usize>,
) -> Vec<Vec<u8>> {
let mut futs = FuturesOrdered::default();
for s in streams {
futs.push_back(s.try_fold(String::new(), |mut acc, chunk| async move {
// remove RLE decoding
let len = usize::from(u16::from_le_bytes(chunk[..2].try_into().unwrap()));
assert_eq!(len, chunk.len() - 2);
acc.push_str(&String::from_utf8_lossy(&chunk[2..]));
Ok(acc)
}));
futs.push_back(s.try_fold(
(Vec::new(), HashSet::new(), 0, 0),
|(mut acc, mut sizes, mut leftover, mut pending_len), mut chunk| async move {
// keep track of chunk sizes we've seen from the stream. Only the last chunk
// can have size that is not equal to `buf_size`
sizes.insert(chunk.len());

// if we have a leftover from previous buffer, push it first
if leftover > 0 {
let next_chunk = std::cmp::min(leftover, chunk.len());
leftover -= next_chunk;
acc.extend(&chunk.split_to(next_chunk));
}

while !chunk.is_empty() {
// remove RLE decoding
let len = if pending_len > 0 {
// len (2 byte value) can be fragmented as well
let next_byte =
u8::from_le_bytes(chunk.split_to(1).as_ref().try_into().unwrap());
let r = u16::from_le_bytes([pending_len, next_byte]);
pending_len = 0;
r
} else if chunk.len() > 1 {
let len =
u16::from_le_bytes(chunk.split_to(2).as_ref().try_into().unwrap());
len
} else {
pending_len =
u8::from_le_bytes(chunk.split_to(1).as_ref().try_into().unwrap());
assert!(chunk.is_empty());
break;
};

let len = usize::from(len);

// the next item may span across multiple buffers
let take_len = if len > chunk.len() {
leftover = len - chunk.len();
chunk.len()
} else {
len
};
acc.extend(&chunk.split_to(take_len));
}

Ok((acc, sizes, leftover, pending_len))
},
));
}
futs.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.map(|(s, sizes, leftover, pending_len)| {
assert_eq!(0, leftover);
assert_eq!(0, pending_len);

// We can have only one chunk that can be at or less than `buf_size`.
// If there are multiple chunks, then at least one must have `buf_size` and there
// can be at most two chunks.
if let Some(buf_size) = buf_size {
assert!(sizes.len() <= 2);
if sizes.len() > 1 {
assert!(sizes.contains(&buf_size));
}
}

s
})
.collect()
}

futs.try_collect::<Vec<_>>().await.unwrap()
async fn drain_all<S: BytesStream>(streams: Vec<S>) -> Vec<String> {
drain_all_buffered(streams, None)
.await
.into_iter()
.map(|v| String::from_utf8_lossy(&v).to_string())
.collect()
}

fn encoded<I: IntoIterator<Item: AsRef<[u8]>>>(input: I) -> Vec<String> {
Expand All @@ -188,6 +292,12 @@ mod tests {
run(|| verify_one(vec!["foo", "bar", "baz", "qux", "quux"], 3));
}

#[test]
fn basic_buffered() {
run(|| verify_buffered(vec!["foo", "bar", "baz", "qux", "quux"], 1, 1));
run(|| verify_buffered(vec!["foo", "bar", "baz", "qux", "quux"], 3, 5));
}

#[test]
#[should_panic(expected = "InvalidHexCharacter")]
fn non_hex() {
Expand Down Expand Up @@ -272,12 +382,35 @@ mod tests {
assert_eq!(expected, drain_all(streams).await);
}

/// The reason we work with bytes is that string character may span multiple bytes,
/// making [`String::from_utf8`] method work incorrectly as it is not commutative with
/// buffering.
async fn verify_buffered<R: AsRef<[u8]>>(input: Vec<R>, count: usize, buf_size: usize) {
assert!(count > 0);
let data = encoded(input.iter().map(AsRef::as_ref)).join("\n");
let streams =
BufferedRoundRobinSubmission::new(data.as_bytes(), buf_size.try_into().unwrap())
.into_byte_streams(count);
let mut expected: Vec<Vec<u8>> = vec![vec![]; count];
for (i, next) in input.into_iter().enumerate() {
expected[i % count].extend(next.as_ref());
}
assert_eq!(expected, drain_all_buffered(streams, Some(buf_size)).await);
}

    proptest! {
        // Randomized inputs for the unbuffered path: arbitrary strings split
        // round-robin across up to 952 streams.
        #[test]
        fn proptest_round_robin(input: Vec<String>, count in 1_usize..953) {
            run(move || async move {
                verify_one(input, count).await;
            });
        }

        // Buffered variant: additionally randomizes the buffer size to shake out
        // chunk-boundary handling (e.g. the 2-byte length prefix or an item being
        // split across consecutive buffers).
        #[test]
        fn proptest_round_robin_buffered(input: Vec<Vec<u8>>, count in 1_usize..953, buf_size in 1_usize..1024) {
            run(move || async move {
                verify_buffered(input, count, buf_size).await;
            });
        }
    }
}
10 changes: 5 additions & 5 deletions ipa-core/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ pub use transport::{
InMemoryTransportError,
};
pub use transport::{
make_owned_handler, query, routing, ApiError, BodyStream, BroadcastError, BytesStream,
HandlerBox, HandlerRef, HelperResponse, Identity as TransportIdentity, LengthDelimitedStream,
LogErrors, NoQueryId, NoResourceIdentifier, NoStep, QueryIdBinding, ReceiveRecords,
RecordsStream, RequestHandler, RouteParams, SingleRecordStream, StepBinding, StreamCollection,
StreamKey, Transport, WrappedBoxBodyStream,
make_owned_handler, query, routing, ApiError, BodyStream, BroadcastError, BufferedBytesStream,
BytesStream, HandlerBox, HandlerRef, HelperResponse, Identity as TransportIdentity,
LengthDelimitedStream, LogErrors, NoQueryId, NoResourceIdentifier, NoStep, QueryIdBinding,
ReceiveRecords, RecordsStream, RequestHandler, RouteParams, SingleRecordStream, StepBinding,
StreamCollection, StreamKey, Transport, WrappedBoxBodyStream,
};
use typenum::{Const, ToUInt, Unsigned, U8};
use x25519_dalek::PublicKey;
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/helpers/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use receive::{LogErrors, ReceiveRecords};
#[cfg(feature = "web-app")]
pub use stream::WrappedAxumBodyStream;
pub use stream::{
BodyStream, BytesStream, LengthDelimitedStream, RecordsStream, SingleRecordStream,
StreamCollection, StreamKey, WrappedBoxBodyStream,
BodyStream, BufferedBytesStream, BytesStream, LengthDelimitedStream, RecordsStream,
SingleRecordStream, StreamCollection, StreamKey, WrappedBoxBodyStream,
};

/// An identity of a peer that can be communicated with using [`Transport`]. There are currently two
Expand Down
10 changes: 5 additions & 5 deletions ipa-core/src/helpers/transport/stream/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use bytes::Bytes;
use futures::Stream;
use futures::{stream::Fuse, Stream, StreamExt};
use pin_project::pin_project;

use crate::helpers::BytesStream;
Expand All @@ -19,7 +19,7 @@ use crate::helpers::BytesStream;
pub struct BufferedBytesStream<S> {
/// Inner stream to poll
#[pin]
inner: S,
inner: Fuse<S>,
/// Buffer of bytes pending release
buffer: Vec<u8>,
/// Number of bytes released per single poll.
Expand All @@ -28,10 +28,10 @@ pub struct BufferedBytesStream<S> {
sz: usize,
}

impl<S> BufferedBytesStream<S> {
fn new(inner: S, buf_size: NonZeroUsize) -> Self {
impl<S: BytesStream> BufferedBytesStream<S> {
pub fn new(inner: S, buf_size: NonZeroUsize) -> Self {
Self {
inner,
inner: inner.fuse(),
buffer: Vec::with_capacity(buf_size.get()),
sz: buf_size.get(),
}
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/helpers/transport/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#[cfg(feature = "web-app")]
mod axum_body;
mod box_body;
#[allow(dead_code)]
mod buffered;
mod collection;
mod input;
Expand All @@ -14,6 +13,7 @@ use std::{
#[cfg(feature = "web-app")]
pub use axum_body::WrappedAxumBodyStream;
pub use box_body::WrappedBoxBodyStream;
pub use buffered::BufferedBytesStream;
use bytes::Bytes;
pub use collection::{StreamCollection, StreamKey};
use futures::{stream::iter, Stream};
Expand Down

0 comments on commit d1cbb58

Please sign in to comment.