Skip to content

Commit

Permalink
Upgrade to Jedis 3.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
jlinn committed Jul 2, 2019
1 parent 9352e51 commit c1e4349
Show file tree
Hide file tree
Showing 9 changed files with 801 additions and 32 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Maven dependency:
<dependency>
<groupId>net.joelinn</groupId>
<artifactId>quartz-redis-jobstore</artifactId>
<version>1.1.14</version>
<version>1.2.0</version>
</dependency>
```

Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Changelog
### 2019-07-02
* Upgrade to Jedis 3.0.1

### 2019-06-26
* Delete job data map set from Redis prior to storing new job data when updating / overwriting a job.
This will prevent keys which were removed from the job's data map prior to storage from being preserved.
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>net.joelinn</groupId>
<artifactId>quartz-redis-jobstore</artifactId>
<version>1.1.16-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>quartz-redis-jobstore</name>
<description>A Quartz Scheduler JobStore using Redis.</description>
Expand Down Expand Up @@ -63,7 +63,7 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<version>3.0.1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import org.quartz.utils.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.params.SetParams;

import java.io.IOException;
import java.util.*;
Expand Down Expand Up @@ -82,7 +83,7 @@ public AbstractRedisStorage setClusterCheckInterval(long clusterCheckInterval) {
*/
public boolean lock(T jedis){
UUID lockId = UUID.randomUUID();
final String setResponse = jedis.set(redisSchema.lockKey(), lockId.toString(), "NX", "PX", lockTimeout);
final String setResponse = jedis.set(redisSchema.lockKey(), lockId.toString(), SetParams.setParams().nx().px(lockTimeout));
boolean lockAcquired = !isNullOrEmpty(setResponse) && setResponse.equals("OK");
if(lockAcquired){
// save the random value used to lock so that we can successfully unlock later
Expand Down Expand Up @@ -787,7 +788,7 @@ protected boolean isJobConcurrentExecutionDisallowed(Class<? extends Job> jobCla
* @return true if lock was acquired successfully; false otherwise
*/
protected boolean lockTrigger(TriggerKey triggerKey, T jedis){
return jedis.set(redisSchema.triggerLockKey(triggerKey), schedulerInstanceId, "NX", "PX", TRIGGER_LOCK_TIMEOUT).equals("OK");
return jedis.set(redisSchema.triggerLockKey(triggerKey), schedulerInstanceId, SetParams.setParams().nx().px(TRIGGER_LOCK_TIMEOUT)).equals("OK");
}

/**
Expand Down
40 changes: 20 additions & 20 deletions src/main/java/net/joelinn/quartz/jobstore/RedisClusterStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.joelinn.quartz.jobstore.jedis.JedisClusterCommandsWrapper;
import org.quartz.Calendar;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
Expand All @@ -13,15 +14,14 @@
import org.quartz.spi.TriggerFiredResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;

import java.util.*;

/**
* Joe Linn
* 8/22/2015
*/
public class RedisClusterStorage extends AbstractRedisStorage<JedisCluster> {
public class RedisClusterStorage extends AbstractRedisStorage<JedisClusterCommandsWrapper> {
private static final Logger logger = LoggerFactory.getLogger(RedisClusterStorage.class);

public RedisClusterStorage(RedisJobStoreSchema redisSchema, ObjectMapper mapper, SchedulerSignaler signaler, String schedulerInstanceId, int lockTimeout) {
Expand All @@ -38,7 +38,7 @@ public RedisClusterStorage(RedisJobStoreSchema redisSchema, ObjectMapper mapper,
*/
@Override
@SuppressWarnings("unchecked")
public void storeJob(JobDetail jobDetail, boolean replaceExisting, JedisCluster jedis) throws ObjectAlreadyExistsException {
public void storeJob(JobDetail jobDetail, boolean replaceExisting, JedisClusterCommandsWrapper jedis) throws ObjectAlreadyExistsException {
final String jobHashKey = redisSchema.jobHashKey(jobDetail.getKey());
final String jobDataMapHashKey = redisSchema.jobDataMapHashKey(jobDetail.getKey());
final String jobGroupSetKey = redisSchema.jobGroupSetKey(jobDetail.getKey());
Expand Down Expand Up @@ -67,7 +67,7 @@ public void storeJob(JobDetail jobDetail, boolean replaceExisting, JedisCluster
* @return true if the job was removed; false if it did not exist
*/
@Override
public boolean removeJob(JobKey jobKey, JedisCluster jedis) throws JobPersistenceException {
public boolean removeJob(JobKey jobKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
final String jobHashKey = redisSchema.jobHashKey(jobKey);
final String jobBlockedKey = redisSchema.jobBlockedKey(jobKey);
final String jobDataMapHashKey = redisSchema.jobDataMapHashKey(jobKey);
Expand Down Expand Up @@ -123,7 +123,7 @@ public boolean removeJob(JobKey jobKey, JedisCluster jedis) throws JobPersistenc
* @throws ObjectAlreadyExistsException
*/
@Override
public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, JedisCluster jedis) throws JobPersistenceException {
public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey());
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(trigger.getKey());
final String jobTriggerSetKey = redisSchema.jobTriggersSetKey(trigger.getJobKey());
Expand Down Expand Up @@ -185,7 +185,7 @@ public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, Jedis
* @return true if the trigger was found and removed
*/
@Override
protected boolean removeTrigger(TriggerKey triggerKey, boolean removeNonDurableJob, JedisCluster jedis) throws JobPersistenceException, ClassNotFoundException {
protected boolean removeTrigger(TriggerKey triggerKey, boolean removeNonDurableJob, JedisClusterCommandsWrapper jedis) throws JobPersistenceException, ClassNotFoundException {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(triggerKey);

Expand Down Expand Up @@ -240,7 +240,7 @@ protected boolean removeTrigger(TriggerKey triggerKey, boolean removeNonDurableJ
* @throws JobPersistenceException if the unset operation failed
*/
@Override
public boolean unsetTriggerState(String triggerHashKey, JedisCluster jedis) throws JobPersistenceException {
public boolean unsetTriggerState(String triggerHashKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
boolean removed = false;
List<Long> responses = new ArrayList<>(RedisTriggerState.values().length);
for (RedisTriggerState state : RedisTriggerState.values()) {
Expand All @@ -267,7 +267,7 @@ public boolean unsetTriggerState(String triggerHashKey, JedisCluster jedis) thro
* @throws JobPersistenceException
*/
@Override
public void storeCalendar(String name, Calendar calendar, boolean replaceExisting, boolean updateTriggers, JedisCluster jedis) throws JobPersistenceException {
public void storeCalendar(String name, Calendar calendar, boolean replaceExisting, boolean updateTriggers, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
final String calendarHashKey = redisSchema.calendarHashKey(name);
if (!replaceExisting && jedis.exists(calendarHashKey)) {
throw new ObjectAlreadyExistsException(String.format("Calendar with key %s already exists.", calendarHashKey));
Expand Down Expand Up @@ -305,7 +305,7 @@ public void storeCalendar(String name, Calendar calendar, boolean replaceExistin
* @return true if a calendar with the given name was found and removed
*/
@Override
public boolean removeCalendar(String calendarName, JedisCluster jedis) throws JobPersistenceException {
public boolean removeCalendar(String calendarName, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
final String calendarTriggersSetKey = redisSchema.calendarTriggersSetKey(calendarName);

if (jedis.scard(calendarTriggersSetKey) > 0) {
Expand All @@ -326,7 +326,7 @@ public boolean removeCalendar(String calendarName, JedisCluster jedis) throws Jo
* @return the set of all JobKeys which have the given group name
*/
@Override
public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher, JedisCluster jedis) {
public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher, JedisClusterCommandsWrapper jedis) {
Set<JobKey> jobKeys = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String jobGroupSetKey = redisSchema.jobGroupSetKey(new JobKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -362,7 +362,7 @@ public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher, JedisCluster jedis)
* @return the set of all TriggerKeys which have the given group name
*/
@Override
public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher, JedisCluster jedis) {
public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher, JedisClusterCommandsWrapper jedis) {
Set<TriggerKey> triggerKeys = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(new TriggerKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -398,7 +398,7 @@ public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher, JedisClu
* @return the state of the trigger
*/
@Override
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey, JedisCluster jedis) {
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey, JedisClusterCommandsWrapper jedis) {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
Map<RedisTriggerState, Double> scores = new HashMap<>(RedisTriggerState.values().length);
for (RedisTriggerState redisTriggerState : RedisTriggerState.values()) {
Expand All @@ -420,7 +420,7 @@ public Trigger.TriggerState getTriggerState(TriggerKey triggerKey, JedisCluster
* @throws JobPersistenceException if the desired trigger does not exist
*/
@Override
public void pauseTrigger(TriggerKey triggerKey, JedisCluster jedis) throws JobPersistenceException {
public void pauseTrigger(TriggerKey triggerKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
Boolean exists = jedis.exists(triggerHashKey);
Double completedScore = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.COMPLETED), triggerHashKey);
Expand Down Expand Up @@ -453,7 +453,7 @@ public void pauseTrigger(TriggerKey triggerKey, JedisCluster jedis) throws JobPe
* @throws JobPersistenceException
*/
@Override
public Collection<String> pauseTriggers(GroupMatcher<TriggerKey> matcher, JedisCluster jedis) throws JobPersistenceException {
public Collection<String> pauseTriggers(GroupMatcher<TriggerKey> matcher, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
Set<String> pausedTriggerGroups = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(new TriggerKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -494,7 +494,7 @@ public Collection<String> pauseTriggers(GroupMatcher<TriggerKey> matcher, JedisC
* @throws JobPersistenceException
*/
@Override
public Collection<String> pauseJobs(GroupMatcher<JobKey> groupMatcher, JedisCluster jedis) throws JobPersistenceException {
public Collection<String> pauseJobs(GroupMatcher<JobKey> groupMatcher, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
Set<String> pausedJobGroups = new HashSet<>();
if (groupMatcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String jobGroupSetKey = redisSchema.jobGroupSetKey(new JobKey("", groupMatcher.getCompareToValue()));
Expand Down Expand Up @@ -531,7 +531,7 @@ public Collection<String> pauseJobs(GroupMatcher<JobKey> groupMatcher, JedisClus
* @param jedis a thread-safe Redis connection
*/
@Override
public void resumeTrigger(TriggerKey triggerKey, JedisCluster jedis) throws JobPersistenceException {
public void resumeTrigger(TriggerKey triggerKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
Boolean exists = jedis.sismember(redisSchema.triggersSet(), triggerHashKey);
Double isPaused = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.PAUSED), triggerHashKey);
Expand Down Expand Up @@ -567,7 +567,7 @@ public void resumeTrigger(TriggerKey triggerKey, JedisCluster jedis) throws JobP
* @return the names of trigger groups which were resumed
*/
@Override
public Collection<String> resumeTriggers(GroupMatcher<TriggerKey> matcher, JedisCluster jedis) throws JobPersistenceException {
public Collection<String> resumeTriggers(GroupMatcher<TriggerKey> matcher, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
Set<String> resumedTriggerGroups = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(new TriggerKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -596,7 +596,7 @@ public Collection<String> resumeTriggers(GroupMatcher<TriggerKey> matcher, Jedis
* @return the set of job groups which were matched and resumed
*/
@Override
public Collection<String> resumeJobs(GroupMatcher<JobKey> matcher, JedisCluster jedis) throws JobPersistenceException {
public Collection<String> resumeJobs(GroupMatcher<JobKey> matcher, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
Set<String> resumedJobGroups = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String jobGroupSetKey = redisSchema.jobGroupSetKey(new JobKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -631,7 +631,7 @@ public Collection<String> resumeJobs(GroupMatcher<JobKey> matcher, JedisCluster
* could be fired.
*/
@Override
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, JedisCluster jedis) throws JobPersistenceException, ClassNotFoundException {
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, JedisClusterCommandsWrapper jedis) throws JobPersistenceException, ClassNotFoundException {
List<TriggerFiredResult> results = new ArrayList<>();
for (OperableTrigger trigger : triggers) {
final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey());
Expand Down Expand Up @@ -716,7 +716,7 @@ public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, Je
* @param jedis a thread-safe Redis connection
*/
@Override
public void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, Trigger.CompletedExecutionInstruction triggerInstCode, JedisCluster jedis) throws JobPersistenceException, ClassNotFoundException {
public void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, Trigger.CompletedExecutionInstruction triggerInstCode, JedisClusterCommandsWrapper jedis) throws JobPersistenceException, ClassNotFoundException {
final String jobHashKey = redisSchema.jobHashKey(jobDetail.getKey());
final String jobDataMapHashKey = redisSchema.jobDataMapHashKey(jobDetail.getKey());
final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey());
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/net/joelinn/quartz/jobstore/RedisJobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.joelinn.quartz.jobstore.jedis.JedisClusterCommandsWrapper;
import net.joelinn.quartz.jobstore.mixin.CronTriggerMixin;
import net.joelinn.quartz.jobstore.mixin.HolidayCalendarMixin;
import net.joelinn.quartz.jobstore.mixin.JobDetailMixin;
Expand All @@ -14,7 +15,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.util.Pool;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.util.Pool;

import java.util.*;

Expand All @@ -28,7 +30,7 @@ public class RedisJobStore implements JobStore {

private Pool<Jedis> jedisPool;

private JedisCluster jedisCluster;
private JedisClusterCommandsWrapper jedisCluster;

/**
* Redis lock timeout in milliseconds
Expand Down Expand Up @@ -122,7 +124,7 @@ public RedisJobStore setJedisPool(Pool<Jedis> jedisPool) {
}


public RedisJobStore setJedisCluster(JedisCluster jedisCluster) {
public RedisJobStore setJedisCluster(JedisClusterCommandsWrapper jedisCluster) {
this.jedisCluster = jedisCluster;
return this;
}
Expand Down Expand Up @@ -162,7 +164,7 @@ public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) t
if (redisCluster && jedisCluster == null) {
Set<HostAndPort> nodes = buildNodesSetFromHost();
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisCluster = new JedisCluster(nodes, this.conTimeout, this.soTimeout, this.conRetries, this.password,jedisPoolConfig);
jedisCluster = new JedisClusterCommandsWrapper(new JedisCluster(nodes, this.conTimeout, this.soTimeout, this.conRetries, this.password,jedisPoolConfig));
storage = new RedisClusterStorage(redisSchema, mapper, signaler, instanceId, lockTimeout);
} else if (jedisPool == null) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
Expand Down
Loading

0 comments on commit c1e4349

Please sign in to comment.