chore: max_concurrency for jobs
bodymindarts authored and vindard committed Nov 26, 2024
1 parent d938e12 commit fad17d4
Showing 4 changed files with 59 additions and 12 deletions.

Some generated files are not rendered by default.

14 changes: 14 additions & 0 deletions lib/job/src/config.rs
@@ -8,16 +8,30 @@ pub struct JobExecutorConfig {
     #[serde_as(as = "serde_with::DurationSeconds<u64>")]
     #[serde(default = "default_poll_interval")]
     pub poll_interval: Duration,
+    #[serde(default = "default_max_jobs_per_process")]
+    pub max_jobs_per_process: usize,
+    #[serde(default = "default_min_jobs_per_process")]
+    pub min_jobs_per_process: usize,
 }
 
 impl Default for JobExecutorConfig {
     fn default() -> Self {
         Self {
             poll_interval: default_poll_interval(),
+            max_jobs_per_process: default_max_jobs_per_process(),
+            min_jobs_per_process: default_min_jobs_per_process(),
         }
     }
 }
 
 fn default_poll_interval() -> Duration {
     Duration::from_secs(5)
 }
+
+fn default_max_jobs_per_process() -> usize {
+    20
+}
+
+fn default_min_jobs_per_process() -> usize {
+    10
+}
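
For reference, a minimal sketch (not part of the commit) of how the serde defaults behave: a partial config only needs to name the fields it overrides, and the new knobs fall back to 20 and 10. The mirror struct below drops poll_interval and the serde_with attributes to keep the example self-contained, and the JSON input is an assumption about how the config might be loaded.

use serde::Deserialize;

// Stripped-down mirror of JobExecutorConfig, for illustration only.
#[derive(Deserialize, Debug)]
struct JobExecutorConfig {
    #[serde(default = "default_max_jobs_per_process")]
    max_jobs_per_process: usize,
    #[serde(default = "default_min_jobs_per_process")]
    min_jobs_per_process: usize,
}

fn default_max_jobs_per_process() -> usize {
    20
}

fn default_min_jobs_per_process() -> usize {
    10
}

fn main() {
    // Only the maximum is overridden; the minimum falls back to its default.
    let cfg: JobExecutorConfig =
        serde_json::from_str(r#"{ "max_jobs_per_process": 50 }"#).unwrap();
    assert_eq!(cfg.max_jobs_per_process, 50);
    assert_eq!(cfg.min_jobs_per_process, 10);
}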
33 changes: 23 additions & 10 deletions lib/job/src/executor.rs
@@ -69,19 +69,21 @@ impl JobExecutor {
 
     pub async fn start_poll(&mut self) -> Result<(), JobError> {
         let poll_interval = self.config.poll_interval;
+        let max_concurrency = self.config.max_jobs_per_process;
+        let min_concurrency = self.config.min_jobs_per_process;
         let pg_interval = PgInterval::try_from(poll_interval * 4)
             .map_err(|e| JobError::InvalidPollInterval(e.to_string()))?;
         let running_jobs = Arc::clone(&self.running_jobs);
         let registry = Arc::clone(&self.registry);
         let jobs = self.jobs.clone();
         let handle = tokio::spawn(async move {
-            let poll_limit = 2;
             let mut keep_alive = false;
             loop {
                 let _ = Self::poll_jobs(
                     &registry,
                     &mut keep_alive,
-                    poll_limit,
+                    max_concurrency,
+                    min_concurrency,
                     pg_interval.clone(),
                     &running_jobs,
                     &jobs,
@@ -99,25 +101,27 @@
         level = "trace",
         name = "job_executor.poll_jobs",
         skip(registry, running_jobs, jobs),
-        fields(n_jobs_to_spawn, n_jobs_running),
+        fields(n_jobs_to_spawn, n_jobs_running, n_jobs_to_poll),
         err
     )]
     async fn poll_jobs(
         registry: &Arc<RwLock<JobRegistry>>,
         keep_alive: &mut bool,
-        poll_limit: u32,
+        max_concurrency: usize,
+        min_concurrency: usize,
         pg_interval: PgInterval,
         running_jobs: &Arc<RwLock<HashMap<JobId, JobHandle>>>,
         jobs: &JobRepo,
     ) -> Result<(), JobError> {
         let span = Span::current();
         span.record("keep_alive", *keep_alive);
-        {
+        let now = crate::time::now();
+        let n_jobs_running = {
             let running_jobs = running_jobs.read().await;
-            span.record("n_jobs_running", running_jobs.len());
+            let n_jobs_running = running_jobs.len();
+            span.record("n_jobs_running", n_jobs_running);
             if *keep_alive {
                 let ids = running_jobs.keys().cloned().collect::<Vec<_>>();
-                let now = crate::time::now();
                 sqlx::query!(
                     r#"
                     UPDATE job_executions
@@ -143,8 +147,16 @@ impl JobExecutor {
                 .fetch_all(jobs.pool())
                 .await?;
             }
-        }
+            n_jobs_running
+        };
         *keep_alive = !*keep_alive;
+
+        if n_jobs_running > min_concurrency {
+            return Ok(());
+        }
+        let n_jobs_to_poll = max_concurrency - n_jobs_running;
+        span.record("n_jobs_to_poll", n_jobs_to_poll);
+
         let rows = sqlx::query!(
             r#"
             WITH selected_jobs AS (
@@ -153,6 +165,7 @@ impl JobExecutor {
                 JOIN jobs ON je.id = jobs.id
                 WHERE reschedule_after < $2::timestamptz
                 AND je.state = 'pending'
+                ORDER BY reschedule_after ASC
                 LIMIT $1
                 FOR UPDATE
             )
@@ -162,8 +175,8 @@ impl JobExecutor {
             WHERE je.id = selected_jobs.id
             RETURNING je.id AS "id!: JobId", selected_jobs.data_json, je.attempt_index
             "#,
-            poll_limit as i32,
-            crate::time::now(),
+            n_jobs_to_poll as i32,
+            now,
             pg_interval
         )
         .fetch_all(jobs.pool())
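
The effect of the change, restated as a standalone sketch (the helper function and its name are hypothetical; in the commit the check lives inline in poll_jobs): instead of fetching at most 2 jobs per cycle as the old poll_limit did, the executor only goes back to the database once the running count has drained to min_jobs_per_process or below, and then claims enough pending jobs to fill back up to max_jobs_per_process. The added ORDER BY makes it claim the longest-overdue jobs first.

// Hypothetical standalone version of the gating arithmetic added to poll_jobs.
fn jobs_to_poll(n_jobs_running: usize, min_concurrency: usize, max_concurrency: usize) -> Option<usize> {
    if n_jobs_running > min_concurrency {
        // Enough work in flight; skip this poll cycle entirely.
        return None;
    }
    // Otherwise top the process back up to the configured maximum.
    Some(max_concurrency - n_jobs_running)
}

fn main() {
    // With the defaults (min = 10, max = 20):
    assert_eq!(jobs_to_poll(15, 10, 20), None);     // above the low-water mark: don't poll
    assert_eq!(jobs_to_poll(10, 10, 20), Some(10)); // at the mark: claim up to 10 more
    assert_eq!(jobs_to_poll(0, 10, 20), Some(20));  // idle process: claim a full batch
}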
