diff --git a/examples/queue/file_system.go b/examples/queue/file_system.go index d11421b..be95915 100644 --- a/examples/queue/file_system.go +++ b/examples/queue/file_system.go @@ -234,6 +234,32 @@ func findHead() (quartz.ScheduledJob, error) { return job, nil } +// Get returns the scheduled job with the specified key without removing it +// from the queue. +func (jq *jobQueue) Get(jobKey *quartz.JobKey) (quartz.ScheduledJob, error) { + jq.mtx.Lock() + defer jq.mtx.Unlock() + logger.Trace("Get") + fileInfo, err := os.ReadDir(dataFolder) + if err != nil { + return nil, err + } + for _, file := range fileInfo { + if !file.IsDir() { + data, err := os.ReadFile(fmt.Sprintf("%s/%s", dataFolder, file.Name())) + if err == nil { + job, err := unmarshal(data) + if err == nil { + if jobKey.Equals(job.JobDetail().JobKey()) { + return job, nil + } + } + } + } + } + return nil, errors.New("no jobs found") +} + // Remove removes and returns the scheduled job with the specified key. func (jq *jobQueue) Remove(jobKey *quartz.JobKey) (quartz.ScheduledJob, error) { jq.mtx.Lock() diff --git a/quartz/error.go b/quartz/error.go index 14c2c28..338871c 100644 --- a/quartz/error.go +++ b/quartz/error.go @@ -8,6 +8,7 @@ import ( // Errors var ( ErrIllegalArgument = errors.New("illegal argument") + ErrIllegalState = errors.New("illegal state") ErrCronParse = errors.New("parse cron expression") ErrJobNotFound = errors.New("job not found") ) @@ -18,6 +19,12 @@ func illegalArgumentError(message string) error { return fmt.Errorf("%w: %s", ErrIllegalArgument, message) } +// illegalStateError returns an illegal state error with a custom +// error message, which unwraps to ErrIllegalState. +func illegalStateError(message string) error { + return fmt.Errorf("%w: %s", ErrIllegalState, message) +} + // cronParseError returns a cron parse error with a custom error message, // which unwraps to ErrCronParse. func cronParseError(message string) error { diff --git a/quartz/error_test.go b/quartz/error_test.go index bad168d..d335c84 100644 --- a/quartz/error_test.go +++ b/quartz/error_test.go @@ -17,6 +17,15 @@ func TestIllegalArgumentError(t *testing.T) { assert.Equal(t, err.Error(), fmt.Sprintf("%s: %s", ErrIllegalArgument, message)) } +func TestIllegalStateError(t *testing.T) { + message := "job already exists" + err := illegalStateError(message) + if !errors.Is(err, ErrIllegalState) { + t.Fatal("error must match ErrIllegalState") + } + assert.Equal(t, err.Error(), fmt.Sprintf("%s: %s", ErrIllegalState, message)) +} + func TestCronParseError(t *testing.T) { message := "invalid field" err := cronParseError(message) diff --git a/quartz/queue.go b/quartz/queue.go index df2e50b..dc81c78 100644 --- a/quartz/queue.go +++ b/quartz/queue.go @@ -52,6 +52,10 @@ type JobQueue interface { // Head returns the first scheduled job without removing it from the queue. Head() (ScheduledJob, error) + // Get returns the scheduled job with the specified key without removing it + // from the queue. + Get(jobKey *JobKey) (ScheduledJob, error) + // Remove removes and returns the scheduled job with the specified key. Remove(jobKey *JobKey) (ScheduledJob, error) @@ -136,8 +140,8 @@ func (jq *jobQueue) Push(job ScheduledJob) error { heap.Remove(&jq.delegate, i) break } - return fmt.Errorf("job with the key %s already exists", - job.JobDetail().jobKey) + return illegalStateError(fmt.Sprintf("job with the key %s already exists", + job.JobDetail().jobKey)) } } heap.Push(&jq.delegate, job) @@ -164,6 +168,19 @@ func (jq *jobQueue) Head() (ScheduledJob, error) { return jq.delegate[0], nil } +// Get returns the scheduled job with the specified key without removing it +// from the queue. +func (jq *jobQueue) Get(jobKey *JobKey) (ScheduledJob, error) { + jq.mtx.Lock() + defer jq.mtx.Unlock() + for _, scheduled := range jq.delegate { + if scheduled.JobDetail().jobKey.Equals(jobKey) { + return scheduled, nil + } + } + return nil, jobNotFoundError(jobKey.String()) +} + // Remove removes and returns the scheduled job with the specified key. func (jq *jobQueue) Remove(jobKey *JobKey) (ScheduledJob, error) { jq.mtx.Lock() @@ -174,7 +191,7 @@ func (jq *jobQueue) Remove(jobKey *JobKey) (ScheduledJob, error) { return heap.Remove(&jq.delegate, i).(ScheduledJob), nil } } - return nil, jobNotFoundError(fmt.Sprintf("for key %s", jobKey)) + return nil, jobNotFoundError(jobKey.String()) } // ScheduledJobs returns the slice of all scheduled jobs in the queue.