Skip to content

Commit

Permalink
Avoid running all suites at the same time at scheduling start
Browse files Browse the repository at this point in the history
Starting all suites at the same moment could lead to a high system load. Instead, each
suite now starts as soon as the current timestamp is evenly divisible by its execution interval.
  • Loading branch information
jherbel committed Dec 7, 2023
1 parent 53300ef commit 2a57121
Showing 1 changed file with 45 additions and 3 deletions.
48 changes: 45 additions & 3 deletions v2/robotmk/src/bin/scheduler/scheduling/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use crate::internal_config::{GlobalConfig, Suite};
use crate::logging::log_and_return_error;

use anyhow::{bail, Result};
use chrono::Utc;
use log::error;
use std::time::Duration;
use tokio::task::{spawn_blocking, JoinSet};
use tokio::time::interval;
use tokio::time::{interval_at, Instant};
use tokio_util::sync::CancellationToken;

#[tokio::main]
Expand Down Expand Up @@ -35,7 +36,17 @@ pub async fn run_suites_and_cleanup(global_config: &GlobalConfig, suites: &[Suit
}

async fn run_suite_scheduler(suite: Suite) {
let mut clock = interval(Duration::from_secs(suite.execution_interval_seconds));
// It is debatable whether MissedTickBehavior::Burst (the default) is correct. In practice, as
// long as timeout * number of attempts is shorter than the execution interval, it shouldn't
// make a difference anyway. However, in case we consider changing this, note that using
// `MissedTickBehavior::Delay` leads to a strange sort of lag on Windows (as if we added ~10 ms
// to the scheduling interval). See also:
// https://www.reddit.com/r/rust/comments/13yymkh/weird_tokiotimeinterval_tick_behavior/
// https://github.com/tokio-rs/tokio/issues/5021
let mut clock = interval_at(
compute_start_time(suite.execution_interval_seconds),
Duration::from_secs(suite.execution_interval_seconds),
);
loop {
let suite = suite.clone();
tokio::select! {
Expand All @@ -47,7 +58,7 @@ async fn run_suite_scheduler(suite: Suite) {
}

async fn run_cleanup_job(cancellation_token: CancellationToken, suites: Vec<Suite>) {
let mut clock = interval(Duration::from_secs(300));
let mut clock = interval_at(compute_start_time(300), Duration::from_secs(300));
loop {
let suites = suites.clone();
tokio::select! {
Expand All @@ -57,3 +68,34 @@ async fn run_cleanup_job(cancellation_token: CancellationToken, suites: Vec<Suit
spawn_blocking(move || cleanup_working_directories(suites.iter()));
}
}

/// Computes the instant at which a periodic job should fire for the first time.
///
/// The start is pushed into the future by the offset that
/// `compute_start_time_offset_millis` derives from the current UTC wall-clock
/// timestamp and the interval, so that the first tick lands on a wall-clock
/// multiple of the interval instead of "right now".
///
/// * `execution_interval_secs` - length of one scheduling period in seconds.
///
/// Returns `Instant::now()` plus the computed offset. If that sum cannot be
/// represented, the current instant is returned and the job starts immediately.
fn compute_start_time(execution_interval_secs: u64) -> Instant {
    let now = Instant::now();

    // A system clock set before the Unix epoch yields a negative timestamp.
    // Clamp it to zero rather than letting an `as u64` cast wrap it around
    // into an enormous value.
    let now_millis = Utc::now().timestamp_millis().max(0).unsigned_abs();

    // Saturate instead of overflowing (panic in debug builds, silent wrap in
    // release builds) for absurdly large configured intervals.
    let execution_interval_millis = execution_interval_secs.saturating_mul(1000);

    let offset = Duration::from_millis(compute_start_time_offset_millis(
        now_millis,
        execution_interval_millis,
    ));

    // Fall back to an immediate start if the target instant is out of range.
    now.checked_add(offset).unwrap_or(now)
}

/// Returns how many milliseconds must pass after `now_millis` until the next
/// timestamp that is an exact multiple of `execution_interval_millis`.
///
/// * `now_millis` - current timestamp in milliseconds.
/// * `execution_interval_millis` - length of one scheduling period in milliseconds.
///
/// The result lies in `1..=execution_interval_millis`: when `now_millis` is
/// already an exact multiple of the interval, a full interval is returned
/// rather than zero. An interval of zero has no meaningful alignment, so the
/// function returns 0 in that case instead of panicking on a remainder by zero.
fn compute_start_time_offset_millis(now_millis: u64, execution_interval_millis: u64) -> u64 {
    match now_millis.checked_rem(execution_interval_millis) {
        // The remainder is strictly smaller than the interval, so this
        // subtraction cannot underflow.
        Some(elapsed_in_period) => execution_interval_millis - elapsed_in_period,
        // `checked_rem` only yields `None` for a zero divisor.
        None => 0,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the offset for a fixed timestamp and a five-minute interval:
    /// the computed value must match the known-good offset, adding that offset
    /// to the timestamp must land on a multiple of the interval, and the
    /// offset must not exceed one interval.
    #[test]
    fn test_compute_start_time_offset_millis() {
        // Five minutes, expressed in milliseconds.
        let interval_millis: u64 = 5 * 60 * 1000;
        let timestamp_millis: u64 = 1701942935796;
        let wanted_offset: u64 = 264204;

        let actual_offset = compute_start_time_offset_millis(timestamp_millis, interval_millis);
        assert_eq!(actual_offset, wanted_offset);

        let aligned_timestamp = timestamp_millis + wanted_offset;
        assert_eq!(aligned_timestamp % interval_millis, 0);
        assert!(wanted_offset <= interval_millis);
    }
}

0 comments on commit 2a57121

Please sign in to comment.