Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix scheduler restart #825

Merged
merged 2 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,17 @@ func (s *scheduler) stopScheduler() {
for id, j := range s.jobs {
<-j.ctx.Done()

j.ctx, j.cancel = context.WithCancel(s.shutdownCtx)
oldCtx := j.ctx
if j.parentCtx == nil {
j.parentCtx = s.shutdownCtx
}
j.ctx, j.cancel = context.WithCancel(j.parentCtx)

// also replace the old context with the new one in the parameters
if len(j.parameters) > 0 && j.parameters[0] == oldCtx {
j.parameters[0] = j.ctx
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @27149chen thanks for the PR. Right now, this causes a race condition because we haven't waited for the jobs to fully stopped. If you duplicate and move this for loop block down right above s.stopErrCh <- err then we'll have waited for the <-s.exec.done chan to report back and update the job params race free.

Here is the full stopScheduler func with that change:

func (s *scheduler) stopScheduler() {
	s.logger.Debug("gocron: stopping scheduler")
	if s.started {
		s.exec.stopCh <- struct{}{}
	}

	for _, j := range s.jobs {
		j.stop()
	}
	for _, j := range s.jobs {
		<-j.ctx.Done()
	}
	var err error
	if s.started {
		t := time.NewTimer(s.exec.stopTimeout + 1*time.Second)
		select {
		case err = <-s.exec.done:
			t.Stop()
		case <-t.C:
			err = ErrStopExecutorTimedOut
		}
	}
	for id, j := range s.jobs {
		oldCtx := j.ctx
		if j.parentCtx == nil {
			j.parentCtx = s.shutdownCtx
		}
		j.ctx, j.cancel = context.WithCancel(j.parentCtx)

		// also replace the old context with the new one in the parameters
		if len(j.parameters) > 0 && j.parameters[0] == oldCtx {
			j.parameters[0] = j.ctx
		}

		s.jobs[id] = j
	}

	s.stopErrCh <- err
	s.started = false
	s.logger.Debug("gocron: scheduler stopped")
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JohnRoesler thanks, updated

}

s.jobs[id] = j
}
var err error
Expand Down
49 changes: 49 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,55 @@ func TestScheduler_StopLongRunningJobs(t *testing.T) {
})
}

func TestScheduler_StopAndStartLongRunningJobs(t *testing.T) {
t.Run("start, run job, stop jobs before job is completed", func(t *testing.T) {
s := newTestScheduler(t,
WithStopTimeout(50*time.Millisecond),
)

restart := false
restartP := &restart

_, err := s.NewJob(
DurationJob(
50*time.Millisecond,
),
NewTask(
func(ctx context.Context) {
select {
case <-ctx.Done():
if *restartP {
t.Fatal("job should not been canceled after restart")
}
case <-time.After(100 * time.Millisecond):
if !*restartP {
t.Fatal("job can not been canceled")
}

}
},
),
WithStartAt(
WithStartImmediately(),
),
WithSingletonMode(LimitModeReschedule),
)
require.NoError(t, err)

s.Start()

time.Sleep(20 * time.Millisecond)
// the running job is canceled, no unexpected timeout error
require.NoError(t, s.StopJobs())

*restartP = true

s.Start()

time.Sleep(200 * time.Millisecond)
})
}

func TestScheduler_Shutdown(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

Expand Down
Loading