Skip to content

Commit

Permalink
fix: run continuous jobs immediately [DHIS2-15276]
Browse files Browse the repository at this point in the history
  • Loading branch information
jbee committed Nov 8, 2023
1 parent c5086fe commit 8278d6c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public void executeNow(@Nonnull String jobId) throws NotFoundException, Conflict
if (job == null) throw new NotFoundException(JobConfiguration.class, jobId);
// run "execute now" request directly when scheduling is not active (tests)
jobRunner.runDueJob(job);
} else {
JobConfiguration job = jobConfigurationStore.getByUid(jobId);
if (job == null) throw new NotFoundException(JobConfiguration.class, jobId);
if (job.getJobType().isUsingContinuousExecution()) {
jobRunner.runIfDue(job);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,27 @@
*/
public interface JobRunner {

/**
* During testing the scheduler might not be active in which case this is false. Otherwise, this
* should always be true in a production environment.
*
* @return true, if the scheduler is running a scheduling loop cycle, otherwise false
*/
boolean isScheduling();

/**
* Runs a job if it should now run according to its {@link SchedulingType} and related information
* like the CRON expression or the delay time.
*
* @param config the job to check and potentially run
*/
void runIfDue(JobConfiguration config);

/**
* Manually runs a job. OBS! This bypasses any actual checking if the job is due to run. When this
* is called the job will run.
*
* @param config The job to run.
*/
void runDueJob(JobConfiguration config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,27 @@ private void createHousekeepingJob() {
}
}

@Override
public void runIfDue(JobConfiguration job) {
runIfDue(Instant.now().truncatedTo(ChronoUnit.SECONDS), job.getJobType(), List.of(job));
}

private void runIfDue(Instant now, JobType type, List<JobConfiguration> jobs) {
if (!type.isUsingContinuousExecution()) {
runIfDue(now, jobs.get(0));
return;
}
Queue<String> jobIds =
continuousJobsByType.computeIfAbsent(type, key -> new ConcurrentLinkedQueue<>());
// add a worker either if no worker is on it (empty new queue) or if there are many jobs
boolean spawnWorker = jobIds.isEmpty();
Queue<String> jobIds = continuousJobsByType.get(type);
boolean spawnWorker = false;
if (jobIds == null) {
Queue<String> localQueue = new ConcurrentLinkedQueue<>();
Queue<String> sharedQueue = continuousJobsByType.putIfAbsent(type, localQueue);
spawnWorker = sharedQueue == null; // no previous queue => this thread put the queue
jobIds = continuousJobsByType.get(type);
}
// add those IDs to the queue that are not yet in it
jobs.stream()
.map(JobConfiguration::getUid)
.filter(jobId -> !jobIds.contains(jobId))
.forEach(jobIds::add);
jobs.stream().map(JobConfiguration::getUid).forEach(jobIds::add);

if (spawnWorker) {
// we want to prevent starting more than one worker per job type
// but if this does happen it is no issue as both will be pulling
Expand Down

0 comments on commit 8278d6c

Please sign in to comment.