Skip to content

Commit

Permalink
Initial multi scheduler created
Browse files Browse the repository at this point in the history
  • Loading branch information
maallen committed Aug 18, 2023
1 parent 1a09eae commit 66fb228
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 26 deletions.
21 changes: 12 additions & 9 deletions webapp/src/main/java/com/box/l10n/mojito/quartz/QuartzConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class QuartzConfig {

public static final String DYNAMIC_GROUP_NAME = "DYNAMIC";

@Autowired Scheduler scheduler;
@Autowired List<Scheduler> schedulers;

@Autowired(required = false)
List<Trigger> triggers = new ArrayList<>();
Expand All @@ -41,22 +41,25 @@ public class QuartzConfig {
* @throws SchedulerException
*/
@PostConstruct
void startScheduler() throws SchedulerException {
void startSchedulers() throws SchedulerException {
Properties quartzProps = quartzPropertiesConfig.getQuartzProperties();
removeOutdatedJobs();
if (Boolean.parseBoolean(quartzProps.getProperty("org.quartz.scheduler.enabled", "true"))) {
logger.info("Starting scheduler");
scheduler.startDelayed(2);
logger.info("Starting schedulers");
for (Scheduler scheduler : schedulers) {
scheduler.startDelayed(2);
}
}
}

void removeOutdatedJobs() throws SchedulerException {
scheduler.unscheduleJobs(new ArrayList<TriggerKey>(getOutdatedTriggerKeys()));
scheduler.deleteJobs(new ArrayList<JobKey>(getOutdatedJobKeys()));
for (Scheduler scheduler : schedulers) {
scheduler.unscheduleJobs(new ArrayList<TriggerKey>(getOutdatedTriggerKeys(scheduler)));
scheduler.deleteJobs(new ArrayList<JobKey>(getOutdatedJobKeys(scheduler)));
}
}

Set<JobKey> getOutdatedJobKeys() throws SchedulerException {

Set<JobKey> getOutdatedJobKeys(Scheduler scheduler) throws SchedulerException {
Set<JobKey> jobKeys =
scheduler.getJobKeys(GroupMatcher.jobGroupEquals(Scheduler.DEFAULT_GROUP));
Set<JobKey> newJobKeys = new HashSet<>();
Expand All @@ -72,7 +75,7 @@ Set<JobKey> getOutdatedJobKeys() throws SchedulerException {
return jobKeys;
}

Set<TriggerKey> getOutdatedTriggerKeys() throws SchedulerException {
Set<TriggerKey> getOutdatedTriggerKeys(Scheduler scheduler) throws SchedulerException {

Set<TriggerKey> triggerKeys =
scheduler.getTriggerKeys(GroupMatcher.triggerGroupEquals(Scheduler.DEFAULT_GROUP));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.box.l10n.mojito.quartz;

import static com.box.l10n.mojito.quartz.QuartzConfig.DYNAMIC_GROUP_NAME;
import static com.box.l10n.mojito.quartz.QuartzQueue.DEFAULT;

import com.box.l10n.mojito.entity.PollableTask;
import com.box.l10n.mojito.json.ObjectMapper;
Expand Down Expand Up @@ -29,7 +30,7 @@ public class QuartzPollableTaskScheduler {
/** logger */
static Logger logger = LoggerFactory.getLogger(QuartzPollableTaskScheduler.class);

@Autowired Scheduler scheduler;
@Autowired QuartzSchedulerManager schedulerManager;

@Autowired PollableTaskService pollableTaskService;

Expand All @@ -41,8 +42,7 @@ public <I, O> PollableFuture<O> scheduleJob(
Class<? extends QuartzPollableJob<I, O>> clazz, I input) {
QuartzJobInfo<I, O> quartzJobInfo =
QuartzJobInfo.newBuilder(clazz).withInput(input).withMessage(clazz.getSimpleName()).build();

return scheduleJob(quartzJobInfo);
return scheduleJob(quartzJobInfo, DEFAULT);
}

public <I, O> PollableFuture<O> scheduleJobWithCustomTimeout(
Expand All @@ -54,7 +54,11 @@ public <I, O> PollableFuture<O> scheduleJobWithCustomTimeout(
.withMessage(clazz.getSimpleName())
.build();

return scheduleJob(quartzJobInfo);
return scheduleJob(quartzJobInfo, DEFAULT);
}

public <I, O> PollableFuture<O> scheduleJob(QuartzJobInfo<I, O> quartzJobInfo) {
return scheduleJob(quartzJobInfo, DEFAULT);
}

/**
Expand All @@ -75,7 +79,12 @@ public <I, O> PollableFuture<O> scheduleJobWithCustomTimeout(
* @param <O>
* @return
*/
public <I, O> PollableFuture<O> scheduleJob(QuartzJobInfo<I, O> quartzJobInfo) {
public <I, O> PollableFuture<O> scheduleJob(
QuartzJobInfo<I, O> quartzJobInfo, QuartzQueue quartzQueue) {

Scheduler scheduler = schedulerManager.getScheduler(quartzQueue);

logger.info("Scheduling job on queue: {}", quartzQueue.getDescription());

String pollableTaskName = getPollableTaskName(quartzJobInfo.getClazz());

Expand Down
17 changes: 17 additions & 0 deletions webapp/src/main/java/com/box/l10n/mojito/quartz/QuartzQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.box.l10n.mojito.quartz;

public enum QuartzQueue {
LOW_PRIORITY("Low priority"),
DEFAULT("Default"),
HIGH_PRIORITY("High priority");

private String description;

QuartzQueue(String description) {
this.description = description;
}

public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import java.util.List;
import java.util.Properties;
import javax.sql.DataSource;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -50,22 +50,51 @@ public class QuartzSchedulerConfig {
* removed.
*
* @return
* @throws SchedulerException
*/
@Bean
public SchedulerFactoryBean scheduler() throws SchedulerException {

logger.info("Create SchedulerFactoryBean");
// TODO (maallen): Update config to read multiple schedulers config from app.properties so each
// scheduler can have it's own configured thread pool.
@Bean(name = "defaultScheduler")
public SchedulerFactoryBean defaultScheduler(
@Qualifier("defaultJobFactory") SpringBeanJobFactory jobFactory) {

logger.info("Create default Scheduler");

Properties quartzProperties = quartzPropertiesConfig.getQuartzProperties();
quartzProperties.put("org.quartz.scheduler.instanceName", "defaultScheduler");

SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
SchedulerFactoryBean factory = getSchedulerFactory(quartzProperties, jobFactory);
factory.setTriggers(triggers.toArray(new Trigger[] {}));
return factory;
}

@Bean(name = "lowPriorityScheduler")
public SchedulerFactoryBean lowPriorityScheduler(
@Qualifier("lowPriorityJobFactory") SpringBeanJobFactory jobFactory) {

logger.info("Create Low Priority Scheduler");

Properties quartzProperties = quartzPropertiesConfig.getQuartzProperties();
quartzProperties.put("org.quartz.scheduler.instanceName", "lowPriorityScheduler");

return getSchedulerFactory(quartzProperties, jobFactory);
}

@Bean(name = "highPriorityScheduler")
public SchedulerFactoryBean highPriorityScheduler(
@Qualifier("highPriorityJobFactory") SpringBeanJobFactory jobFactory) {
logger.info("Create High Priority Scheduler");
Properties quartzProperties = quartzPropertiesConfig.getQuartzProperties();
quartzProperties.put("org.quartz.scheduler.instanceName", "highPriorityScheduler");
return getSchedulerFactory(quartzProperties, jobFactory);
}

String dataSource = quartzProperties.getProperty("org.quartz.jobStore.dataSource");
private SchedulerFactoryBean getSchedulerFactory(
Properties quartzProperties, SpringBeanJobFactory springBeanJobFactory) {
SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
schedulerFactory.setQuartzProperties(quartzProperties);
schedulerFactory.setJobFactory(springBeanJobFactory());
schedulerFactory.setJobFactory(springBeanJobFactory);
schedulerFactory.setOverwriteExistingJobs(true);
schedulerFactory.setTriggers(triggers.toArray(new Trigger[] {}));
schedulerFactory.setAutoStartup(false);

if (quartzMetricsReportingJobListener != null) {
Expand All @@ -75,8 +104,22 @@ public SchedulerFactoryBean scheduler() throws SchedulerException {
return schedulerFactory;
}

@Bean
public SpringBeanJobFactory springBeanJobFactory() {
@Bean(name = "lowPriorityJobFactory")
public SpringBeanJobFactory lowPrioritySpringBeanJobFactory() {
AutoWiringSpringBeanJobFactory jobFactory = new AutoWiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}

@Bean(name = "defaultJobFactory")
public SpringBeanJobFactory defaultSpringBeanJobFactory() {
AutoWiringSpringBeanJobFactory jobFactory = new AutoWiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}

@Bean(name = "highPriorityJobFactory")
public SpringBeanJobFactory highPrioritySpringBeanJobFactory() {
AutoWiringSpringBeanJobFactory jobFactory = new AutoWiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.box.l10n.mojito.quartz;

import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
public class QuartzSchedulerManager {

@Autowired
@Qualifier("lowPriorityScheduler")
Scheduler lowPriorityScheduler;

@Autowired
@Qualifier("defaultScheduler")
Scheduler defaultScheduler;

@Autowired
@Qualifier("highPriorityScheduler")
Scheduler highPriorityScheduler;

public Scheduler getScheduler(QuartzQueue quartzQueue) {
switch (quartzQueue) {
case LOW_PRIORITY:
return lowPriorityScheduler;
case HIGH_PRIORITY:
return highPriorityScheduler;
case DEFAULT:
default:
return defaultScheduler;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
Expand All @@ -21,8 +22,11 @@ public class QuartzService {
/** logger */
static Logger logger = getLogger(QuartzService.class);

@Autowired Scheduler scheduler;
@Autowired
@Qualifier("lowPriorityScheduler")
Scheduler scheduler;

// TODO(mallen): Add handling for an injected list of Schedulers instead.
public List<String> getDynamicJobs() throws SchedulerException {
Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(DYNAMIC_GROUP_NAME));
return jobKeys.stream().map(jobKey -> jobKey.getName()).collect(Collectors.toList());
Expand Down

0 comments on commit 66fb228

Please sign in to comment.