From 996fdcb9d596e486dc48b2e83ba4423709e42df3 Mon Sep 17 00:00:00 2001 From: lou Date: Thu, 6 Feb 2025 20:50:00 +0800 Subject: [PATCH 1/2] fix scheduler restart Signed-off-by: lou --- scheduler.go | 12 +++++++++++- scheduler_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/scheduler.go b/scheduler.go index 7b3e72b..dc29e8d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -241,7 +241,17 @@ func (s *scheduler) stopScheduler() { for id, j := range s.jobs { <-j.ctx.Done() - j.ctx, j.cancel = context.WithCancel(s.shutdownCtx) + oldCtx := j.ctx + if j.parentCtx == nil { + j.parentCtx = s.shutdownCtx + } + j.ctx, j.cancel = context.WithCancel(j.parentCtx) + + // also replace the old context with the new one in the parameters + if len(j.parameters) > 0 && j.parameters[0] == oldCtx { + j.parameters[0] = j.ctx + } + s.jobs[id] = j } var err error diff --git a/scheduler_test.go b/scheduler_test.go index b9c591b..4fc7549 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -467,6 +467,55 @@ func TestScheduler_StopLongRunningJobs(t *testing.T) { }) } +func TestScheduler_StopAndStartLongRunningJobs(t *testing.T) { + t.Run("start, run job, stop jobs before job is completed", func(t *testing.T) { + s := newTestScheduler(t, + WithStopTimeout(50*time.Millisecond), + ) + + restart := false + restartP := &restart + + _, err := s.NewJob( + DurationJob( + 50*time.Millisecond, + ), + NewTask( + func(ctx context.Context) { + select { + case <-ctx.Done(): + if *restartP { + t.Fatal("job should not been canceled after restart") + } + case <-time.After(100 * time.Millisecond): + if !*restartP { + t.Fatal("job can not been canceled") + } + + } + }, + ), + WithStartAt( + WithStartImmediately(), + ), + WithSingletonMode(LimitModeReschedule), + ) + require.NoError(t, err) + + s.Start() + + time.Sleep(20 * time.Millisecond) + // the running job is canceled, no unexpected timeout error + require.NoError(t, s.StopJobs()) + + *restartP = true + + s.Start() + + time.Sleep(200 * time.Millisecond) + }) +} + func TestScheduler_Shutdown(t *testing.T) { defer verifyNoGoroutineLeaks(t) From 1d366cbab73e34d423189e66cdab19ce3cf92e59 Mon Sep 17 00:00:00 2001 From: lou Date: Fri, 7 Feb 2025 15:49:06 +0800 Subject: [PATCH 2/2] update after review Signed-off-by: lou --- scheduler.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/scheduler.go b/scheduler.go index dc29e8d..fd8fac2 100644 --- a/scheduler.go +++ b/scheduler.go @@ -238,9 +238,20 @@ func (s *scheduler) stopScheduler() { for _, j := range s.jobs { j.stop() } - for id, j := range s.jobs { + for _, j := range s.jobs { <-j.ctx.Done() - + } + var err error + if s.started { + t := time.NewTimer(s.exec.stopTimeout + 1*time.Second) + select { + case err = <-s.exec.done: + t.Stop() + case <-t.C: + err = ErrStopExecutorTimedOut + } + } + for id, j := range s.jobs { oldCtx := j.ctx if j.parentCtx == nil { j.parentCtx = s.shutdownCtx @@ -254,16 +265,7 @@ func (s *scheduler) stopScheduler() { s.jobs[id] = j } - var err error - if s.started { - t := time.NewTimer(s.exec.stopTimeout + 1*time.Second) - select { - case err = <-s.exec.done: - t.Stop() - case <-t.C: - err = ErrStopExecutorTimedOut - } - } + s.stopErrCh <- err s.started = false s.logger.Debug("gocron: scheduler stopped")