Improved sampling
Browse files Browse the repository at this point in the history
Previously, switching to the next sample could only happen
at the moment a cycle completed. If the database didn't
respond for a long time, the collected samples were of
unequal sizes or weren't recorded at all.

Now the timer for sampling (chunking) ticks independently
of the workload progress.

(cherry picked from commit 5a62794)
pkolaczk authored and vponomaryov committed Oct 29, 2024
1 parent 467a47d commit aaacee4
Showing 6 changed files with 288 additions and 104 deletions.
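To illustrate the idea from the commit message (a sampling timer that fires on its own schedule instead of waiting for cycles to complete), here is a minimal, hypothetical Rust sketch; it is not code from this commit, and run_one_cycle is just a stand-in for a single workload request:

use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};

// Stand-in for a single workload cycle (e.g. one database request).
async fn run_one_cycle() {
    tokio::time::sleep(Duration::from_millis(300)).await;
}

#[tokio::main]
async fn main() {
    let mut ticker = interval(Duration::from_secs(1));
    // Skip missed ticks instead of bursting several samples at once;
    // the new chunks.rs below uses the same policy.
    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

    let mut completed_cycles = 0u64;
    let mut samples_emitted = 0;
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                // A sample is emitted even if no cycle finished in this period.
                println!("sample: {completed_cycles} cycles completed");
                completed_cycles = 0;
                samples_emitted += 1;
                if samples_emitted == 5 {
                    break;
                }
            }
            _ = run_one_cycle() => {
                completed_cycles += 1;
            }
        }
    }
}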
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -33,6 +33,7 @@ metrohash = "1.0"
 num_cpus = "1.13.0"
 openssl = "0.10.38"
 parse_duration = "2.1.1"
+pin-project = "1.1"
 plotters = "0.3.4"
 plotters-svg = "0.3.3"
 rand = "0.8"
200 changes: 200 additions & 0 deletions src/chunks.rs
@@ -0,0 +1,200 @@
use futures::stream::{Fuse, Skip};
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::interval;
use tokio::time::{Duration, MissedTickBehavior};
use tokio_stream::wrappers::IntervalStream;

pub trait ChunksExt: Stream {
    /// Splits the stream into chunks delimited by time or by number of items.
    ///
    /// When polled, it collects the items from the original stream into the current chunk
    /// until the desired number of items is collected or until the period of time passes.
    /// Then it emits the chunk and sets a new one as the current one and the cycle repeats.
    /// Can emit an empty chunk if no items from the original stream were ready before the
    /// period of time elapses.
    ///
    /// # Parameters
    /// - `count`: maximum number of items added to a chunk
    /// - `period`: maximum amount of time a chunk can be kept before releasing it
    /// - `new_chunk`: a function to create an empty chunk
    /// - `accumulate`: a function to add original stream items to the current chunk
    fn chunks_aggregated<Chunk, NewChunkFn, AccumulateFn>(
        self,
        count: u64,
        period: Duration,
        new_chunk: NewChunkFn,
        accumulate: AccumulateFn,
    ) -> ChunksAggregated<Self, Chunk, NewChunkFn, AccumulateFn>
    where
        Self: Sized,
        NewChunkFn: Fn() -> Chunk,
        AccumulateFn: Fn(&mut Chunk, Self::Item),
    {
        ChunksAggregated::new(self, count, period, new_chunk, accumulate)
    }
}

impl<S: Stream> ChunksExt for S {}

#[pin_project]
pub struct ChunksAggregated<Src, Chunk, NewChunkFn, AddFn> {
    #[pin]
    src: Fuse<Src>,
    new_chunk: NewChunkFn,
    accumulate: AddFn,
    max_chunk_size: u64,
    #[pin]
    clock: Clock,
    current_chunk: Option<Chunk>,
    current_chunk_size: u64,
}

#[pin_project(project = ClockProj)]
enum Clock {
    Some(#[pin] Skip<IntervalStream>),
    None,
}

impl<Src, Item, Chunk, NewChunkFn, AccumulateFn>
    ChunksAggregated<Src, Chunk, NewChunkFn, AccumulateFn>
where
    Src: Stream<Item = Item>,
    NewChunkFn: Fn() -> Chunk,
    AccumulateFn: Fn(&mut Chunk, Item),
{
    pub fn new(
        src: Src,
        max_chunk_size: u64,
        period: Duration,
        new_chunk: NewChunkFn,
        accumulate: AccumulateFn,
    ) -> Self {
        let clock = if period < Duration::MAX {
            let mut interval = interval(period);
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            Clock::Some(IntervalStream::new(interval).skip(1))
        } else {
            Clock::None
        };

        let current_chunk = Some(new_chunk());

        Self {
            new_chunk,
            accumulate,
            src: src.fuse(),
            max_chunk_size,
            clock,
            current_chunk,
            current_chunk_size: 0,
        }
    }

    fn next_batch(self: Pin<&mut Self>) -> Option<Chunk> {
        let this = self.project();
        *this.current_chunk_size = 0;
        this.current_chunk.replace((this.new_chunk)())
    }

    fn final_batch(self: Pin<&mut Self>) -> Option<Chunk> {
        let this = self.project();
        *this.current_chunk_size = 0;
        this.current_chunk.take()
    }
}

impl<Src, Item, Chunk, NewChunkFn, AddFn> Stream for ChunksAggregated<Src, Chunk, NewChunkFn, AddFn>
where
    Item: Debug,
    Src: Stream<Item = Item>,
    NewChunkFn: Fn() -> Chunk,
    AddFn: Fn(&mut Chunk, Item),
{
    type Item = Chunk;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.as_mut().project();

        // Add all ready items in the source stream to the current chunk:
        while this.current_chunk_size < this.max_chunk_size {
            match this.src.as_mut().poll_next(cx) {
                Poll::Ready(Some(item)) => {
                    *this.current_chunk_size += 1;
                    let chunk = this.current_chunk.as_mut().expect("chunk must be set");
                    (this.accumulate)(chunk, item);
                }
                Poll::Ready(None) => {
                    return Poll::Ready(self.final_batch());
                }
                Poll::Pending => {
                    // No more items, but we can't leave yet, we need to check the clock
                    // at the end of the loop
                    break;
                }
            }
        }
        let deadline_reached = match this.clock.as_mut().project() {
            ClockProj::Some(clock) => clock.poll_next(cx).is_ready(),
            ClockProj::None => false,
        };

        if deadline_reached || this.current_chunk_size >= this.max_chunk_size {
            Poll::Ready(self.next_batch())
        } else {
            Poll::Pending
        }
    }
}

#[cfg(test)]
mod test {
    use crate::chunks::{ChunksAggregated, ChunksExt};
    use futures::{stream, FutureExt, StreamExt};
    use std::time::Duration;
    use tokio::time::interval;
    use tokio_stream::wrappers::IntervalStream;

    #[tokio::test]
    async fn test_empty() {
        let s = stream::empty::<u64>();
        let batched = ChunksAggregated::new(s, 2, Duration::from_secs(100), Vec::new, Vec::push);
        let results: Vec<_> = batched.collect().await;
        assert_eq!(results, vec![vec![0; 0]]);
    }

    #[tokio::test]
    async fn test_count() {
        let s = stream::iter(vec![1, 2, 3, 4, 5]);
        let batched = ChunksAggregated::new(s, 2, Duration::from_secs(100), Vec::new, Vec::push);
        let results: Vec<_> = batched.collect().await;
        assert_eq!(results, vec![vec![1, 2], vec![3, 4], vec![5]]);
    }

    #[tokio::test]
    async fn test_period() {
        tokio::time::pause();

        let s = IntervalStream::new(interval(Duration::from_secs(1)))
            .enumerate()
            .map(|x| x.0)
            .skip(1)
            .take(5);
        let mut batched =
            s.chunks_aggregated(u64::MAX, Duration::from_secs(2), Vec::new, Vec::push);
        assert!(batched.next().now_or_never().is_none());
        tokio::time::advance(Duration::from_secs(1)).await;
        assert!(batched.next().now_or_never().is_none());
        tokio::time::advance(Duration::from_secs(1)).await;
        assert_eq!(batched.next().await, Some(vec![1, 2]));
        tokio::time::advance(Duration::from_secs(1)).await;
        assert!(batched.next().now_or_never().is_none());
        tokio::time::advance(Duration::from_secs(1)).await;
        assert_eq!(batched.next().await, Some(vec![3, 4]));
        tokio::time::advance(Duration::from_secs(1)).await;
        assert_eq!(batched.next().await, Some(vec![5]));
    }
}
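For reference, a minimal usage sketch of the chunks_aggregated combinator defined above (a hypothetical snippet, assuming the ChunksExt trait is reachable as in the tests). The accumulator here is a (count, sum) pair rather than a Vec, to show that a chunk can be an arbitrary aggregate:

use std::time::Duration;

use futures::{stream, StreamExt};

// Assumes the ChunksExt trait from src/chunks.rs above is importable like this.
use crate::chunks::ChunksExt;

#[tokio::main]
async fn main() {
    // At most 3 items per chunk, or whatever arrived within 60 s,
    // whichever limit is hit first; each chunk folds into (count, sum).
    let summaries: Vec<(u64, u64)> = stream::iter(1..=7u64)
        .chunks_aggregated(
            3,
            Duration::from_secs(60),
            || (0u64, 0u64),
            |acc: &mut (u64, u64), item| {
                acc.0 += 1;
                acc.1 += item;
            },
        )
        .collect()
        .await;

    // 1+2+3, 4+5+6, then the final partial chunk containing only 7.
    assert_eq!(summaries, vec![(3, 6), (3, 15), (1, 7)]);
}

The final partial chunk is emitted when the source stream ends, matching the behavior exercised by test_count above.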
98 changes: 85 additions & 13 deletions src/exec.rs
@@ -3,18 +3,21 @@
 use futures::channel::mpsc::{channel, Receiver, Sender};
 use futures::{SinkExt, Stream, StreamExt};
 use itertools::Itertools;
+use pin_project::pin_project;
 use status_line::StatusLine;
 use std::cmp::max;
 use std::future::ready;
 use std::num::NonZeroUsize;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 use std::time::Instant;
 use tokio_stream::wrappers::IntervalStream;
 
+use crate::chunks::ChunksExt;
 use crate::error::{LatteError, Result};
 use crate::{
-    BenchmarkStats, BoundedCycleCounter, Interval, Progress, Recorder, Sampler, Workload,
-    WorkloadStats,
+    BenchmarkStats, BoundedCycleCounter, Interval, Progress, Recorder, Workload, WorkloadStats,
 };
 
 /// Returns a stream emitting `rate` events per second.
@@ -49,27 +52,38 @@ async fn run_stream<T>(
     workload.reset(Instant::now());
 
     let mut iter_counter = cycle_counter;
-    let mut sampler = Sampler::new(iter_counter.duration, sampling, &workload, &mut out);
+
+    let (sample_size, sample_duration) = match sampling {
+        Interval::Count(cnt) => (cnt, tokio::time::Duration::MAX),
+        Interval::Time(duration) => (u64::MAX, duration),
+        Interval::Unbounded => (u64::MAX, tokio::time::Duration::MAX),
+    };
 
     let mut result_stream = stream
         .map(|_| iter_counter.next())
         .take_while(|i| ready(i.is_some()))
         // unconstrained to workaround quadratic complexity of buffer_unordered ()
        .map(|i| tokio::task::unconstrained(workload.run(i.unwrap())))
         .buffer_unordered(concurrency.get())
-        .inspect(|_| progress.tick());
-
-    while let Some(res) = result_stream.next().await {
-        match res {
-            Ok((iter, end_time)) => sampler.cycle_completed(iter, end_time).await,
-            Err(e) => {
-                out.send(Err(e)).await.unwrap();
-                return;
+        .inspect(|_| progress.tick())
+        .terminate_after_error()
+        .chunks_aggregated(sample_size, sample_duration, Vec::new, |errors, result| {
+            if let Err(e) = result {
+                errors.push(e)
+            }
+        })
+        .map(|errors| (workload.take_stats(Instant::now()), errors));
+
+    while let Some((stats, errors)) = result_stream.next().await {
+        if out.send(Ok(stats)).await.is_err() {
+            break;
+        }
+        for err in errors {
+            if out.send(Err(err)).await.is_err() {
+                break;
             }
         }
     }
-    // Send the statistics of remaining requests
-    sampler.finish().await;
 }
 
 /// Launches a new worker task that runs a series of invocations of the workload function.
@@ -217,3 +231,61 @@ pub async fn par_execute(
}
}
}

trait TerminateAfterErrorExt: Stream + Sized {
    /// Terminates the stream immediately after returning the first error.
    fn terminate_after_error(self) -> TerminateAfterError<Self>;
}

impl<S, Item, E> TerminateAfterErrorExt for S
where
    S: Stream<Item = std::result::Result<Item, E>>,
{
    fn terminate_after_error(self) -> TerminateAfterError<Self> {
        TerminateAfterError {
            stream: self,
            error: false,
        }
    }
}

#[pin_project]
struct TerminateAfterError<S: Stream> {
    #[pin]
    stream: S,
    error: bool,
}

impl<S, Item, E> Stream for TerminateAfterError<S>
where
    S: Stream<Item = std::result::Result<Item, E>>,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.error {
            return Poll::Ready(None);
        }
        let this = self.project();
        match this.stream.poll_next(cx) {
            Poll::Ready(Some(Err(e))) => {
                *this.error = true;
                Poll::Ready(Some(Err(e)))
            }
            other => other,
        }
    }
}

#[cfg(test)]
mod test {
    use crate::exec::TerminateAfterErrorExt;
    use futures::stream;
    use futures::StreamExt;

    #[tokio::test]
    async fn test_terminate() {
        let s = stream::iter(vec![Ok(1), Ok(2), Err(3), Ok(4), Err(5)]).terminate_after_error();
        assert_eq!(s.collect::<Vec<_>>().await, vec![Ok(1), Ok(2), Err(3)])
    }
}
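The shape of the new run_stream pipeline can be sketched in isolation as follows. This is a hypothetical, self-contained example assuming both extension traits are reachable from the calling module; it pairs terminate_after_error with chunks_aggregated so each emitted chunk carries only the errors observed during one sampling period:

use std::time::Duration;

use futures::{stream, StreamExt};

// Assumes the two extension traits introduced by this commit are visible here.
use crate::chunks::ChunksExt;
use crate::exec::TerminateAfterErrorExt;

#[tokio::main]
async fn main() {
    // Stand-in for the per-cycle workload results.
    let results = stream::iter(vec![Ok(1u64), Ok(2), Err("timeout"), Ok(4)]);

    let samples: Vec<Vec<&str>> = results
        .terminate_after_error()
        .chunks_aggregated(2, Duration::from_secs(60), Vec::new, |errors, result| {
            if let Err(e) = result {
                errors.push(e)
            }
        })
        .collect()
        .await;

    // The first sample saw two successes, the second one saw the error;
    // Ok(4) was never yielded because the stream terminated after Err.
    assert_eq!(samples, vec![vec![], vec!["timeout"]]);
}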
3 changes: 1 addition & 2 deletions src/main.rs
@@ -31,11 +31,11 @@ use crate::exec::{par_execute, ExecutionOptions};
 use crate::plot::plot_graph;
 use crate::progress::Progress;
 use crate::report::{PathAndSummary, Report, RunConfigCmp};
-use crate::sampler::Sampler;
 use crate::stats::{BenchmarkCmp, BenchmarkStats, Recorder};
 use crate::table::{Alignment, Table};
 use crate::workload::{FnRef, Program, Workload, WorkloadStats, LOAD_FN};
 
+mod chunks;
 mod config;
 mod context;
 mod cycle;
@@ -45,7 +45,6 @@ mod histogram;
 mod plot;
 mod progress;
 mod report;
-mod sampler;
 mod stats;
 mod table;
 mod workload;