Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: job scheduler delayed execution window handling and info [DHIS2-15027] #16112

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import javax.annotation.CheckForNull;
Expand Down Expand Up @@ -88,6 +89,14 @@
@ToString
public class JobConfiguration extends BaseIdentifiableObject implements SecondaryMetadataObject {

/**
* A CRON based job may trigger on the same day after it has missed its execution. If time has
* passed past this point the execution is skipped, and it will trigger on the intended execution
* after that. This is the default value for the setting giving a 4h window to succeed with the
* execution for each occurrence.
*/
public static final int MAX_CRON_DELAY_HOURS = 4;

/** The type of job. */
@JsonProperty(required = true)
private JobType jobType;
Expand Down Expand Up @@ -309,10 +318,20 @@ public JobParameters getJobParameters() {
/** Kept for backwards compatibility of the REST API */
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
public Date getNextExecutionTime() {
Instant next = nextExecutionTime(Instant.now(), Duration.ofDays(1));
// this is a "best guess" because the 4h max delay could have been changed in the settings
Instant next = nextExecutionTime(Instant.now(), Duration.ofHours(MAX_CRON_DELAY_HOURS));
return next == null ? null : Date.from(next);
}

@JsonProperty(access = JsonProperty.Access.READ_ONLY)
public Date getMaxDelayedExecutionTime() {
Duration maxCronDelay = Duration.ofHours(MAX_CRON_DELAY_HOURS);
Instant nextExecutionTime = nextExecutionTime(Instant.now(), maxCronDelay);
if (nextExecutionTime == null) return null;
Instant instant = maxDelayedExecutionTime(this, maxCronDelay, nextExecutionTime);
return instant == null ? null : Date.from(instant);
}

/** Kept for backwards compatibility of the REST API */
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
public String getLastRuntimeExecution() {
Expand Down Expand Up @@ -375,23 +394,29 @@ public Instant nextExecutionTime(@Nonnull Instant now, @Nonnull Duration maxCron
Instant nextExecutionTime(
@Nonnull ZoneId zone, @Nonnull Instant now, @Nonnull Duration maxCronDelay) {
// for good measure we offset the last time by 1 second
Instant since = lastExecuted == null ? now : lastExecuted.toInstant().plusSeconds(1);
boolean isFirstExecution = lastExecuted == null;
Instant since = isFirstExecution ? now : lastExecuted.toInstant().plusSeconds(1);
if (isUsedInQueue() && getQueuePosition() > 0) return null;
return switch (getSchedulingType()) {
case ONCE_ASAP -> nextOnceExecutionTime(since);
case FIXED_DELAY -> nextDelayExecutionTime(since);
case CRON -> nextCronExecutionTime(zone, since, now, maxCronDelay);
case CRON -> nextCronExecutionTime(
zone, isFirstExecution ? since.minus(maxCronDelay) : since, now, maxCronDelay);
};
}

private Instant nextCronExecutionTime(
@Nonnull ZoneId zone, @Nonnull Instant since, Instant now, @Nonnull Duration maxDelay) {
if (isUndefinedCronExpression(cronExpression)) return null;
SimpleTriggerContext context = new SimpleTriggerContext(Clock.fixed(since, zone));
// we use a no offset zone for the context as we want the given since value to be taken as is
// the zone is not actually used by this is closest to what would be required
ZoneOffset noOffsetZone = ZoneOffset.UTC;
SimpleTriggerContext context = new SimpleTriggerContext(Clock.fixed(since, noOffsetZone));
Date next = new CronTrigger(cronExpression, zone).nextExecutionTime(context);
if (next == null) return null;
while (next != null && now.isAfter(next.toInstant().plus(maxDelay))) {
context = new SimpleTriggerContext(Clock.fixed(next.toInstant().plusSeconds(1), zone));
context =
new SimpleTriggerContext(Clock.fixed(next.toInstant().plusSeconds(1), noOffsetZone));
next = new CronTrigger(cronExpression, zone).nextExecutionTime(context);
}
return next == null ? null : next.toInstant();
Expand All @@ -409,6 +434,14 @@ private Instant nextOnceExecutionTime(@Nonnull Instant since) {
return since;
}

@CheckForNull
public static Instant maxDelayedExecutionTime(
JobConfiguration config, Duration maxCronDelay, Instant nextExecutionTime) {
return nextExecutionTime == null || config.getSchedulingType() != SchedulingType.CRON
? null
: nextExecutionTime.plus(maxCronDelay);
}

private static boolean isUndefinedCronExpression(String cronExpression) {
return cronExpression == null
|| cronExpression.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,18 @@ void cronNextExecutionTime_MaxCronDelay() {
ZonedDateTime today10am = todayMidnight.withHour(10);
ZonedDateTime tomorrow8_40am = today8_40am.plusDays(1);

// when the job never executed the next execution is on the next day the intended time
// if now is already after the intended time
// when the job never executed the next execution is today's as long as
// now is in the window from intended execution time to max delayed execution time
assertEquals(
tomorrow8_40am.toInstant(),
today8_40am.toInstant(),
config.nextExecutionTime(zone, today10am.toInstant(), maxCronDelay));

// when the job never executed the next execution is tomorrow when
// now is after the window from intended execution time to max delayed execution time
assertEquals(
tomorrow8_40am.toInstant(),
config.nextExecutionTime(zone, today10am.plusHours(5).toInstant(), maxCronDelay));

// when the job did execute last yesterday the intended time,
// and we are still in the 2h window after 8:40am at 10am
// the job still wants to run today 8:40am (immediately as that time has passed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,10 @@ public List<JobConfiguration> getDueJobConfigurations(
Instant endOfWindow = now.plusSeconds(dueInNextSeconds);
Duration maxCronDelay =
Duration.ofHours(systemSettings.getIntSetting(SettingKey.JOBS_MAX_CRON_DELAY_HOURS));
Stream<JobConfiguration> dueJobs =
jobConfigurationStore
.getDueJobConfigurations(includeWaiting)
.filter(c -> c.isDueBetween(now, endOfWindow, maxCronDelay));
return dueJobs.toList();
return jobConfigurationStore
.getDueJobConfigurations(includeWaiting)
.filter(c -> c.isDueBetween(now, endOfWindow, maxCronDelay))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ public boolean tryStart(@Nonnull String jobId) {
"""
update jobconfiguration j1
set
lastupdated = now(),
jobstatus = 'RUNNING',
lastexecuted = now(),
lastalive = now(),
Expand All @@ -360,6 +361,7 @@ public boolean tryCancel(@Nonnull String jobId) {
"""
update jobconfiguration
set
lastupdated = now(),
cancel = case when jobstatus = 'RUNNING' then true else false end,
enabled = case
when queueposition is null and schedulingtype = 'ONCE_ASAP' and cronexpression is null and delay is null then false
Expand Down Expand Up @@ -389,6 +391,7 @@ public boolean tryFinish(@Nonnull String jobId, JobStatus status) {
"""
update jobconfiguration
set
lastupdated = now(),
lastexecutedstatus = case when cancel = true then 'STOPPED' else :status end,
lastfinished = now(),
lastalive = null,
Expand Down Expand Up @@ -421,6 +424,7 @@ public boolean trySkip(@Nonnull String queue) {
"""
update jobconfiguration
set
lastupdated = now(),
lastexecutedstatus = 'NOT_STARTED',
lastexecuted = now(),
lastfinished = now(),
Expand Down Expand Up @@ -463,7 +467,9 @@ public int updateDisabledJobs() {
String sql =
"""
update jobconfiguration
set jobstatus = 'DISABLED'
set
lastupdated = now(),
jobstatus = 'DISABLED'
where jobstatus = 'SCHEDULED'
and enabled = false
""";
Expand Down Expand Up @@ -505,6 +511,7 @@ public int rescheduleStaleJobs(int timeoutMinutes) {
"""
update jobconfiguration
set
lastupdated = now(),
jobstatus = 'SCHEDULED',
cancel = false,
lastexecutedstatus = 'FAILED',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
*/
@Component
@RequiredArgsConstructor
public class HeartbeatJob implements Job {
public class HousekeepingJob implements Job {

private final JobSchedulerLoopService jobSchedulerService;
private final JobConfigurationService jobConfigurationService;
Expand All @@ -59,7 +59,7 @@ public JobType getJobType() {

@Override
public void execute(JobConfiguration config, JobProgress progress) {
progress.startingProcess("Heartbeat");
progress.startingProcess("Housekeeping");

progress.startingStage("Apply job cancellation", SKIP_STAGE);
progress.runStage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class JobScheduler implements Runnable, JobRunner {

public void start() {
long loopTimeMs = LOOP_SECONDS * 1000L;
long alignment = currentTimeMillis() % loopTimeMs;
long alignment = loopTimeMs - (currentTimeMillis() % loopTimeMs);
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this, alignment, loopTimeMs, TimeUnit.MILLISECONDS);
scheduling.set(true);
Expand Down Expand Up @@ -221,6 +221,7 @@ private void runDueJob(JobConfiguration config, Instant start) {
jobId, start.atZone(ZoneId.systemDefault())));
return;
}
log.debug("Running job %s");
JobProgress progress = null;
try {
AtomicLong lastAlive = new AtomicLong(currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.hisp.dhis.fileresource.FileResourceRetentionStrategy;
import org.hisp.dhis.i18n.locale.LocaleManager;
import org.hisp.dhis.period.RelativePeriodEnum;
import org.hisp.dhis.scheduling.JobConfiguration;
import org.hisp.dhis.sms.config.SmsConfiguration;

/**
Expand Down Expand Up @@ -253,7 +254,8 @@ public enum SettingKey {
* its intended time of the day to trigger. If time has passed past this point the execution for
* that day is skipped, and it will trigger on the intended time the day after.
*/
JOBS_MAX_CRON_DELAY_HOURS("jobsMaxCronDelayHours", 4, Integer.class),
JOBS_MAX_CRON_DELAY_HOURS(
"jobsMaxCronDelayHours", JobConfiguration.MAX_CRON_DELAY_HOURS, Integer.class),

/**
* A job running with a smaller delay than the given value is logged on debug level instead of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@

import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static org.hisp.dhis.scheduling.JobConfiguration.maxDelayedExecutionTime;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import javax.annotation.CheckForNull;
import lombok.Value;
import org.hisp.dhis.scheduling.JobConfiguration;
import org.hisp.dhis.scheduling.JobStatus;
Expand All @@ -51,6 +53,27 @@ class SchedulerEntry {

@JsonProperty Date nextExecutionTime;

/**
* The end of the window for the current {@link #nextExecutionTime}. This means if execution is
* missed on the intended time this will be the latest time an attempt is made for that occurrence
* before the next time will be the occurrence after that.
*/
@JsonProperty Date maxDelayedExecutionTime;

/**
* The number of seconds until the next execution will happen. This is purely for convenience to
* spare the client to fetch the server time to compute this value as the client can not use the
* client local time to compute it from {@link #nextExecutionTime}
*/
@JsonProperty Long secondsToNextExecutionTime;

/**
* The number of seconds until the end of the execution window. This is purely for convenience to
* * spare the client to fetch the server time to compute this value as the client can not use the
* * client local time to compute it from {@link #maxDelayedExecutionTime}
*/
@JsonProperty Long secondsToMaxDelayedExecutionTime;

@JsonProperty JobStatus status;

@JsonProperty boolean enabled;
Expand All @@ -61,12 +84,17 @@ class SchedulerEntry {

static SchedulerEntry of(JobConfiguration config, Duration maxCronDelay) {
Instant nextExecutionTime = config.nextExecutionTime(Instant.now(), maxCronDelay);
Instant maxDelayedExecutionTime =
maxDelayedExecutionTime(config, maxCronDelay, nextExecutionTime);
return new SchedulerEntry(
config.getName(),
config.getJobType().name(),
config.getCronExpression(),
config.getDelay(),
nextExecutionTime == null ? null : Date.from(nextExecutionTime),
dateOf(nextExecutionTime),
dateOf(maxDelayedExecutionTime),
secondsUntil(nextExecutionTime),
secondsUntil(maxDelayedExecutionTime),
config.getJobStatus(),
config.isEnabled(),
config.getJobType().isUserDefined(),
Expand All @@ -87,12 +115,17 @@ static SchedulerEntry of(List<JobConfiguration> jobs, Duration maxCronDelay) {
.findAny()
.orElse(trigger.getJobStatus());
Instant nextExecutionTime = trigger.nextExecutionTime(Instant.now(), maxCronDelay);
Instant maxDelayedExecutionTime =
maxDelayedExecutionTime(trigger, maxCronDelay, nextExecutionTime);
return new SchedulerEntry(
trigger.getQueueName(),
"Sequence",
trigger.getCronExpression(),
trigger.getDelay(),
nextExecutionTime == null ? null : Date.from(nextExecutionTime),
dateOf(nextExecutionTime),
dateOf(maxDelayedExecutionTime),
secondsUntil(nextExecutionTime),
secondsUntil(maxDelayedExecutionTime),
queueStatus,
trigger.isEnabled(),
true,
Expand All @@ -101,4 +134,14 @@ static SchedulerEntry of(List<JobConfiguration> jobs, Duration maxCronDelay) {
.map(SchedulerEntryJob::of)
.collect(toList()));
}

private static Date dateOf(Instant instant) {
return instant == null ? null : Date.from(instant);
}

@CheckForNull
public static Long secondsUntil(Instant instant) {
if (instant == null) return null;
return Duration.between(Instant.now(), instant).getSeconds();
}
}
Loading