Skip to content

Commit

Permalink
wait for new job to be fully created before returning (#658)
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler authored Jan 17, 2024
1 parent ae366d9 commit 800821c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 21 deletions.
6 changes: 1 addition & 5 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,6 @@ func ExampleScheduler_removeByTags() {
)
fmt.Println(len(s.Jobs()))

time.Sleep(20 * time.Millisecond)

s.RemoveByTags("tag1", "tag2")

fmt.Println(len(s.Jobs()))
Expand All @@ -391,7 +389,6 @@ func ExampleScheduler_removeJob() {
)

fmt.Println(len(s.Jobs()))
time.Sleep(20 * time.Millisecond)

_ = s.RemoveJob(j.ID())

Expand Down Expand Up @@ -664,8 +661,8 @@ func ExampleWithLimitedRuns() {
s.Start()

time.Sleep(100 * time.Millisecond)
fmt.Printf("no jobs in scheduler: %v\n", s.Jobs())
_ = s.StopJobs()
fmt.Printf("no jobs in scheduler: %v\n", s.Jobs())
// Output:
// one, 2
// no jobs in scheduler: []
Expand Down Expand Up @@ -748,7 +745,6 @@ func ExampleWithStartAt() {
),
)
s.Start()
time.Sleep(20 * time.Millisecond)

next, _ := j.NextRun()
fmt.Println(next)
Expand Down
30 changes: 24 additions & 6 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,17 @@ type scheduler struct {
allJobsOutRequest chan allJobsOutRequest
jobOutRequestCh chan jobOutRequest
runJobRequestCh chan runJobRequest
newJobCh chan internalJob
newJobCh chan newJobIn
removeJobCh chan uuid.UUID
removeJobsByTagsCh chan []string
}

type newJobIn struct {
ctx context.Context
cancel context.CancelFunc
job internalJob
}

type jobOutRequest struct {
id uuid.UUID
outChan chan internalJob
Expand Down Expand Up @@ -118,7 +124,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
clock: clockwork.NewRealClock(),
logger: &noOpLogger{},

newJobCh: make(chan internalJob),
newJobCh: make(chan newJobIn),
removeJobCh: make(chan uuid.UUID),
removeJobsByTagsCh: make(chan []string),
startCh: make(chan struct{}),
Expand All @@ -144,8 +150,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
case id := <-s.exec.jobIDsOut:
s.selectExecJobIDsOut(id)

case j := <-s.newJobCh:
s.selectNewJob(j)
case in := <-s.newJobCh:
s.selectNewJob(in)

case id := <-s.removeJobCh:
s.selectRemoveJob(id)
Expand Down Expand Up @@ -346,7 +352,8 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
close(out.outChan)
}

func (s *scheduler) selectNewJob(j internalJob) {
func (s *scheduler) selectNewJob(in newJobIn) {
j := in.job
if s.started {
next := j.startTime
if j.startImmediately {
Expand Down Expand Up @@ -378,6 +385,7 @@ func (s *scheduler) selectNewJob(j internalJob) {
}

s.jobs[j.id] = j
in.cancel()
}

func (s *scheduler) selectRemoveJobsByTags(tags []string) {
Expand Down Expand Up @@ -548,9 +556,19 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
return nil, err
}

newJobCtx, newJobCancel := context.WithCancel(context.Background())
select {
case <-s.shutdownCtx.Done():
case s.newJobCh <- newJobIn{
ctx: newJobCtx,
cancel: newJobCancel,
job: j,
}:
}

select {
case <-newJobCtx.Done():
case <-s.shutdownCtx.Done():
case s.newJobCh <- j:
}

return &job{
Expand Down
11 changes: 1 addition & 10 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,7 @@ func TestScheduler_StopTimeout(t *testing.T) {
require.NoError(t, err)

s.Start()
time.Sleep(time.Millisecond * 200)
err = s.Shutdown()
assert.ErrorIs(t, err, ErrStopJobsTimedOut)
assert.ErrorIs(t, err, s.Shutdown())
cancel()
time.Sleep(2 * time.Second)
})
Expand Down Expand Up @@ -332,15 +330,11 @@ func TestScheduler_Shutdown(t *testing.T) {
require.NoError(t, err)

s.Start()
time.Sleep(50 * time.Millisecond)
require.NoError(t, s.StopJobs())

time.Sleep(200 * time.Millisecond)
s.Start()

time.Sleep(50 * time.Millisecond)
require.NoError(t, s.Shutdown())
time.Sleep(200 * time.Millisecond)
})

t.Run("calling Job methods after shutdown errors", func(t *testing.T) {
Expand All @@ -361,7 +355,6 @@ func TestScheduler_Shutdown(t *testing.T) {
require.NoError(t, err)

s.Start()
time.Sleep(50 * time.Millisecond)
require.NoError(t, s.Shutdown())

_, err = j.LastRun()
Expand Down Expand Up @@ -465,7 +458,6 @@ func TestScheduler_NewJob(t *testing.T) {

s.Start()
require.NoError(t, s.Shutdown())
time.Sleep(50 * time.Millisecond)
})
}
}
Expand Down Expand Up @@ -1303,7 +1295,6 @@ func TestScheduler_RemoveJob(t *testing.T) {
id = uuid.New()
}

time.Sleep(50 * time.Millisecond)
err := s.RemoveJob(id)
assert.ErrorIs(t, err, err)
require.NoError(t, s.Shutdown())
Expand Down

0 comments on commit 800821c

Please sign in to comment.