Skip to content

Commit

Permalink
Add --start-cycle and --end-cycle parameters
Browse files Browse the repository at this point in the history
Fixes #55
  • Loading branch information
pkolaczk committed Aug 3, 2024
1 parent 0589df7 commit 7e1dae9
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 22 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,19 @@ pub struct RunCommand {
)]
pub run_duration: Interval,

/// The initial value of the cycle counter.
///
/// Normally the cycle counter starts from 0, but you can start from a different value.
/// This is particularly useful for splitting the workload into
/// parts executed from different client nodes.
#[clap(long, default_value = "0")]
pub start_cycle: i64,

/// The maximum value of the cycle counter at which the cycle counter wraps-around back
/// to the start value.
#[clap(long, default_value = "9223372036854775807")]
pub end_cycle: i64,

/// Number of worker threads used by the driver.
#[clap(short('t'), long, default_value = "1", value_name = "COUNT")]
pub threads: NonZeroUsize,
Expand Down
25 changes: 18 additions & 7 deletions src/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,49 +57,60 @@ impl CycleCounter {
/// Decides when to stop the benchmark execution.
pub struct BoundedCycleCounter {
pub duration: config::Interval,
cycle_start: i64,
cycle_range_size: u64,
start_time: Instant,
cycle_counter: CycleCounter,
}

impl BoundedCycleCounter {
/// Creates a new counter based on configured benchmark duration.
/// For time-based deadline, the clock starts ticking when this object is created.
pub fn new(duration: config::Interval) -> Self {
pub fn new(duration: config::Interval, cycle_range: (i64, i64)) -> Self {
BoundedCycleCounter {
duration,
start_time: Instant::now(),
cycle_counter: CycleCounter::new(0),
cycle_start: cycle_range.0,
cycle_range_size: cycle_range.1.saturating_sub(cycle_range.0) as u64,
}
}

/// Returns the next cycle number or `None` if deadline or cycle count was exceeded.
pub fn next(&mut self) -> Option<u64> {
pub fn next(&mut self) -> Option<i64> {
match self.duration {
Interval::Count(count) => {
let result = self.cycle_counter.next();
if result < count {
Some(result)
Some(self.cycle_number(result))
} else {
None
}
}
Interval::Time(duration) => {
if Instant::now() < self.start_time + duration {
Some(self.cycle_counter.next())
let result = self.cycle_counter.next();
Some(self.cycle_number(result))
} else {
None
}
}
Interval::Unbounded => Some(self.cycle_counter.next()),
Interval::Unbounded => {
let result = self.cycle_counter.next();
Some(self.cycle_number(result))
}
}
}

fn cycle_number(&mut self, result: u64) -> i64 {
self.cycle_start + (result % self.cycle_range_size) as i64
}

/// Shares this counter e.g. with another thread.
pub fn share(&self) -> Self {
BoundedCycleCounter {
start_time: self.start_time,
duration: self.duration,
cycle_counter: self.cycle_counter.share(),
..*self
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ pub enum LatteError {

#[error(display = "Failed to launch external editor {}: {}", _0, _1)]
ExternalEditorLaunch(String, std::io::Error),

#[error(display = "Invalid configuration: {}", _0)]
Configuration(String),
}

impl LatteError {}

pub type Result<T> = std::result::Result<T, LatteError>;
11 changes: 10 additions & 1 deletion src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ where
pub struct ExecutionOptions {
/// How long to execute
pub duration: Interval,
/// Range of the cycle counter
pub cycle_range: (i64, i64),
/// Maximum rate of requests in requests per second, `None` means no limit
pub rate: Option<f64>,
/// Number of parallel threads of execution
Expand All @@ -176,6 +178,13 @@ pub async fn par_execute(
workload: Workload,
show_progress: bool,
) -> Result<BenchmarkStats> {
if exec_options.cycle_range.1 <= exec_options.cycle_range.0 {
return Err(LatteError::Configuration(format!(
"End cycle {} must not be lower than start cycle {}",
exec_options.cycle_range.1, exec_options.cycle_range.0
)));
}

let thread_count = exec_options.threads.get();
let concurrency = exec_options.concurrency;
let rate = exec_options.rate;
Expand All @@ -189,7 +198,7 @@ pub async fn par_execute(
..Default::default()
};
let progress = Arc::new(StatusLine::with_options(progress, progress_opts));
let deadline = BoundedCycleCounter::new(exec_options.duration);
let deadline = BoundedCycleCounter::new(exec_options.duration, exec_options.cycle_range);
let mut streams = Vec::with_capacity(thread_count);
let mut stats = Recorder::start(rate, concurrency);

Expand Down
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ async fn load(conf: LoadCommand) -> Result<()> {
let loader = Workload::new(session.clone()?, program.clone(), FnRef::new(LOAD_FN));
let load_options = ExecutionOptions {
duration: config::Interval::Count(load_count),
cycle_range: (0, i64::MAX),
rate: conf.rate,
threads: conf.threads,
concurrency: conf.concurrency,
Expand Down Expand Up @@ -238,6 +239,7 @@ async fn run(conf: RunCommand) -> Result<()> {
eprintln!("info: Warming up...");
let warmup_options = ExecutionOptions {
duration: conf.warmup_duration,
cycle_range: (conf.start_cycle, conf.end_cycle),
rate: None,
threads: conf.threads,
concurrency: conf.concurrency,
Expand All @@ -264,6 +266,7 @@ async fn run(conf: RunCommand) -> Result<()> {

let exec_options = ExecutionOptions {
duration: conf.run_duration,
cycle_range: (conf.start_cycle, conf.end_cycle),
concurrency: conf.concurrency,
rate: conf.rate,
threads: conf.threads,
Expand Down
4 changes: 2 additions & 2 deletions src/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,12 @@ impl Workload {
/// This should be idempotent –
/// the generated action should be a function of the iteration number.
/// Returns the cycle number and the end time of the query.
pub async fn run(&self, cycle: u64) -> Result<(u64, Instant), LatteError> {
pub async fn run(&self, cycle: i64) -> Result<(i64, Instant), LatteError> {
let start_time = Instant::now();
let context = SessionRef::new(&self.context);
let result = self
.program
.async_call(&self.function, (context, cycle as i64))
.async_call(&self.function, (context, cycle))
.await;
let end_time = Instant::now();
let mut state = self.state.try_lock().unwrap();
Expand Down

0 comments on commit 7e1dae9

Please sign in to comment.