Skip to content

Commit

Permalink
Return immediately if no-one is receiving
Browse files Browse the repository at this point in the history
(cherry picked from commit ed2c220)
  • Loading branch information
pkolaczk authored and vponomaryov committed Oct 29, 2024
1 parent cbe243e commit 94dca5f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
10 changes: 9 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,15 @@ impl Interval {
}
}

pub fn seconds(&self) -> Option<f32> {
pub fn period(&self) -> Option<tokio::time::Duration> {
if let Interval::Time(d) = self {
Some(*d)
} else {
None
}
}

pub fn period_secs(&self) -> Option<f32> {
if let Interval::Time(d) = self {
Some(d.as_secs_f32())
} else {
Expand Down
19 changes: 7 additions & 12 deletions src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,11 @@ async fn run_stream<T>(
progress: Arc<StatusLine<Progress>>,
mut out: Sender<Result<WorkloadStats>>,
) {
workload.reset(Instant::now());

let mut iter_counter = cycle_counter;
let sample_size = sampling.count().unwrap_or(u64::MAX);
let sample_duration = sampling.period().unwrap_or(tokio::time::Duration::MAX);

let (sample_size, sample_duration) = match sampling {
Interval::Count(cnt) => (cnt, tokio::time::Duration::MAX),
Interval::Time(duration) => (u64::MAX, duration),
Interval::Unbounded => (u64::MAX, tokio::time::Duration::MAX),
};

let mut result_stream = stream
let mut stats_stream = stream
.map(|_| iter_counter.next())
.take_while(|i| ready(i.is_some()))
// unconstrained to workaround quadratic complexity of buffer_unordered ()
Expand All @@ -77,13 +71,14 @@ async fn run_stream<T>(
})
.map(|errors| (workload.take_stats(Instant::now()), errors));

while let Some((stats, errors)) = result_stream.next().await {
workload.reset(Instant::now());
while let Some((stats, errors)) = stats_stream.next().await {
if out.send(Ok(stats)).await.is_err() {
break;
return;
}
for err in errors {
if out.send(Err(err)).await.is_err() {
break;
return;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,19 +547,19 @@ impl<'a> Display for RunConfigCmp<'a> {
}),
self.line("Max rate", "op/s", |conf| Quantity::from(conf.rate)),
self.line("Warmup", "s", |conf| {
Quantity::from(conf.warmup_duration.seconds())
Quantity::from(conf.warmup_duration.period_secs())
}),
self.line("└─", "op", |conf| {
Quantity::from(conf.warmup_duration.count())
}),
self.line("Run time", "s", |conf| {
Quantity::from(conf.run_duration.seconds()).with_precision(1)
Quantity::from(conf.run_duration.period_secs()).with_precision(1)
}),
self.line("└─", "op", |conf| {
Quantity::from(conf.run_duration.count())
}),
self.line("Sampling", "s", |conf| {
Quantity::from(conf.sampling_interval.seconds()).with_precision(1)
Quantity::from(conf.sampling_interval.period_secs()).with_precision(1)
}),
self.line("└─", "op", |conf| {
Quantity::from(conf.sampling_interval.count())
Expand Down

0 comments on commit 94dca5f

Please sign in to comment.