Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add support for realtime #1054

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bats/lana-sim-time.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ app:
time:
realtime: false
sim_time:
start_at: "2021-01-01T00:00:00Z"
start_at: "2024-01-01T00:00:00Z"
tick_interval_ms: 1
tick_duration_secs: 1000
transform_to_realtime: false

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions lib/job/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,30 @@ pub struct JobExecutorConfig {
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(default = "default_poll_interval")]
pub poll_interval: Duration,
#[serde(default = "default_max_jobs_per_process")]
pub max_jobs_per_process: usize,
#[serde(default = "default_min_jobs_per_process")]
pub min_jobs_per_process: usize,
}

impl Default for JobExecutorConfig {
fn default() -> Self {
Self {
poll_interval: default_poll_interval(),
max_jobs_per_process: default_max_jobs_per_process(),
min_jobs_per_process: default_min_jobs_per_process(),
}
}
}

fn default_poll_interval() -> Duration {
Duration::from_secs(5)
}

fn default_max_jobs_per_process() -> usize {
20
}

fn default_min_jobs_per_process() -> usize {
10
}
33 changes: 23 additions & 10 deletions lib/job/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,21 @@ impl JobExecutor {

pub async fn start_poll(&mut self) -> Result<(), JobError> {
let poll_interval = self.config.poll_interval;
let max_concurrency = self.config.max_jobs_per_process;
let min_concurrency = self.config.min_jobs_per_process;
let pg_interval = PgInterval::try_from(poll_interval * 4)
.map_err(|e| JobError::InvalidPollInterval(e.to_string()))?;
let running_jobs = Arc::clone(&self.running_jobs);
let registry = Arc::clone(&self.registry);
let jobs = self.jobs.clone();
let handle = tokio::spawn(async move {
let poll_limit = 2;
let mut keep_alive = false;
loop {
let _ = Self::poll_jobs(
&registry,
&mut keep_alive,
poll_limit,
max_concurrency,
min_concurrency,
pg_interval.clone(),
&running_jobs,
&jobs,
Expand All @@ -99,25 +101,27 @@ impl JobExecutor {
level = "trace",
name = "job_executor.poll_jobs",
skip(registry, running_jobs, jobs),
fields(n_jobs_to_spawn, n_jobs_running),
fields(n_jobs_to_spawn, n_jobs_running, n_jobs_to_poll),
err
)]
async fn poll_jobs(
registry: &Arc<RwLock<JobRegistry>>,
keep_alive: &mut bool,
poll_limit: u32,
max_concurrency: usize,
min_concurrency: usize,
pg_interval: PgInterval,
running_jobs: &Arc<RwLock<HashMap<JobId, JobHandle>>>,
jobs: &JobRepo,
) -> Result<(), JobError> {
let span = Span::current();
span.record("keep_alive", *keep_alive);
{
let now = crate::time::now();
let n_jobs_running = {
let running_jobs = running_jobs.read().await;
span.record("n_jobs_running", running_jobs.len());
let n_jobs_running = running_jobs.len();
span.record("n_jobs_running", n_jobs_running);
if *keep_alive {
let ids = running_jobs.keys().cloned().collect::<Vec<_>>();
let now = crate::time::now();
sqlx::query!(
r#"
UPDATE job_executions
Expand All @@ -143,8 +147,16 @@ impl JobExecutor {
.fetch_all(jobs.pool())
.await?;
}
}
n_jobs_running
};
*keep_alive = !*keep_alive;

if n_jobs_running > min_concurrency {
return Ok(());
}
let n_jobs_to_poll = max_concurrency - n_jobs_running;
span.record("n_jobs_to_poll", n_jobs_to_poll);

let rows = sqlx::query!(
r#"
WITH selected_jobs AS (
Expand All @@ -153,6 +165,7 @@ impl JobExecutor {
JOIN jobs ON je.id = jobs.id
WHERE reschedule_after < $2::timestamptz
AND je.state = 'pending'
ORDER BY reschedule_after ASC
LIMIT $1
FOR UPDATE
)
Expand All @@ -162,8 +175,8 @@ impl JobExecutor {
WHERE je.id = selected_jobs.id
RETURNING je.id AS "id!: JobId", selected_jobs.data_json, je.attempt_index
"#,
poll_limit as i32,
crate::time::now(),
n_jobs_to_poll as i32,
now,
pg_interval
)
.fetch_all(jobs.pool())
Expand Down
2 changes: 2 additions & 0 deletions lib/sim-time/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ pub struct SimTimeConfig {
pub(super) tick_interval_ms: u64,
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
pub(super) tick_duration_secs: std::time::Duration,
#[serde(default)]
pub(super) transform_to_realtime: bool,
}
45 changes: 43 additions & 2 deletions lib/sim-time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,15 @@ impl Time {
.as_ref()
.expect("sim_time required when realtime is false");
let elapsed_ms = self.elapsed_ms.load(Ordering::Relaxed);
sim_config.start_at + chrono::Duration::milliseconds(elapsed_ms as i64)

let simulated_time =
sim_config.start_at + chrono::Duration::milliseconds(elapsed_ms as i64);

if sim_config.transform_to_realtime && simulated_time >= Utc::now() {
Utc::now()
} else {
simulated_time
}
}
}

Expand All @@ -80,7 +88,14 @@ impl Time {
.as_ref()
.expect("sim_time required when realtime is false");

// Calculate how many real milliseconds we need to wait based on the simulation speed
let current_time = self.now();
let real_now = Utc::now();

if sim_config.transform_to_realtime && current_time >= real_now {
tokio::time::sleep(duration).await;
return;
}

let sim_ms_per_real_ms = sim_config.tick_duration_secs.as_millis() as f64
/ sim_config.tick_interval_ms as f64;

Expand All @@ -89,6 +104,31 @@ impl Time {
tokio::time::sleep(Duration::from_millis(real_ms)).await
}
}

pub async fn wait_until_realtime(&self) {
if self.config.realtime {
return;
}

let current = self.now();
let real_now = Utc::now();

if current >= real_now {
return;
}

let wait_duration =
std::time::Duration::from_millis((real_now - current).num_milliseconds() as u64);

self.sleep(wait_duration).await;
}
}

pub async fn wait_until_realtime() {
INSTANCE
.get_or_init(|| Time::new(TimeConfig::default()))
.wait_until_realtime()
.await
}

pub fn init(config: TimeConfig) {
Expand Down Expand Up @@ -123,6 +163,7 @@ mod tests {
start_at: Utc::now(),
tick_interval_ms: 10,
tick_duration_secs: StdDuration::from_secs(10 * 24 * 60 * 60), // 10 days in seconds
transform_to_realtime: false,
}),
};

Expand Down
Loading