diff --git a/backend/controller/controller.go b/backend/controller/controller.go
index 8dbc3de88d..4e7fba86c2 100644
--- a/backend/controller/controller.go
+++ b/backend/controller/controller.go
@@ -178,7 +178,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
 		increaseReplicaFailures: map[string]int{},
 	}
 
-	cronSvc := cronjobs.New(ctx, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc)
+	cronSvc := cronjobs.New(ctx, key, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc)
 	svc.cronJobs = cronSvc
 
 	svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes)
@@ -739,7 +739,7 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
 		return nil, fmt.Errorf("%s: %w", "could not create deployment", err)
 	}
 
-	_ = s.cronJobs.CreatedDeployment(ctx, dname, module)
+	s.cronJobs.CreatedDeployment(ctx, dname, module)
 
 	deploymentLogger := s.getDeploymentLogger(ctx, dname)
 	deploymentLogger.Debugf("Created deployment %s", dname)
diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go
index 1d0bec76ff..732654368e 100644
--- a/backend/controller/cronjobs/cronjobs.go
+++ b/backend/controller/cronjobs/cronjobs.go
@@ -17,12 +17,19 @@ import (
 	"github.com/TBD54566975/ftl/internal/model"
 	"github.com/TBD54566975/ftl/internal/slices"
 	"github.com/adhocore/gronx"
+	"github.com/alecthomas/atomic"
 	"github.com/alecthomas/types/pubsub"
 	"github.com/benbjohnson/clock"
 	"github.com/jpillora/backoff"
+	"github.com/serialx/hashring"
 	sl "golang.org/x/exp/slices"
 )
 
+const (
+	controllersPerJob      = 2
+	hashRingUpdateInterval = 5 * time.Second
+)
+
 type Config struct {
 	Timeout time.Duration
 }
@@ -34,6 +41,7 @@ const (
 	createJobs
 	updateJobs
 	removedDeploymentKey
+	updatedHashring
 )
 
 type jobChange struct {
@@ -42,28 +50,33 @@ type jobChange struct {
 	deploymentKey model.DeploymentKey
 }
 
-type jobIdentifier struct {
-	deploymentKey model.DeploymentKey
-	verb          string
+type hashRingState struct {
+	hashRing    *hashring.HashRing
+	controllers []dal.Controller
+	idx         int
 }
 
 type Service struct {
 	config     Config
+	key        model.ControllerKey
 	dal        *dal.DAL
 	scheduler  *scheduledtask.Scheduler
 	executor   CallExecuter
 	clock      clock.Clock
 	jobChanges *pubsub.Topic[jobChange]
+
+	hashRingState atomic.Value[*hashRingState]
 }
 
 type CallExecuter interface {
 	Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error)
 }
 
-func New(ctx context.Context, config Config, dal *dal.DAL, scheduler *scheduledtask.Scheduler, executor CallExecuter) *Service {
+func New(ctx context.Context, key model.ControllerKey, config Config, dal *dal.DAL, scheduler *scheduledtask.Scheduler, executor CallExecuter) *Service {
 	svc := &Service{
 		config:    config,
+		key:       key,
 		dal:       dal,
 		scheduler: scheduler,
 		executor:  executor,
@@ -73,12 +86,14 @@ func New(ctx context.Context, config Config, dal *dal.DAL, scheduler *scheduledt
 	svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.resetJobs)
 	svc.scheduler.Singleton(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.timeOutJobs)
 
+	_, _ = svc.updateHashRing(ctx)
+	go svc.syncHashRing(ctx)
 	go svc.watchForUpdates(ctx)
 	return svc
 }
 
-func (s *Service) CreatedDeployment(ctx context.Context, deploymentKey model.DeploymentKey, module *schema.Module) error {
+func (s *Service) CreatedDeployment(ctx context.Context, deploymentKey model.DeploymentKey, module *schema.Module) {
 	logger := log.FromContext(ctx)
 
 	newJobs := []dal.CronJob{}
@@ -88,19 +103,17 @@ func (s *Service) CreatedDeployment(ctx context.Context, deploymentKey model.Dep
 			//TODO: remove prefix trimming when swapping out cron lib
 			schedule := strings.TrimPrefix(cron.String(), "+cron ")
-			now := time.Now().UTC()
-			next, err := gronx.NextTickAfter(schedule, now, false)
+			start := time.Now().UTC()
+			next, err := gronx.NextTickAfter(schedule, start, false)
 			if err != nil {
 				logger.Errorf(err, "failed to calculate next execution for cron job %v:%v with schedule %q", deploymentKey, verb.Name, schedule)
-				//TODO: error shouldn't propagate, just log it?
+				continue
+			}
+			created, err := s.dal.CreateCronJob(ctx, deploymentKey, module.Name, verb.Name, schedule, start, next)
+			if err != nil {
+				logger.Errorf(err, "failed to create cron job %v:%v", deploymentKey, verb.Name)
 			} else {
-				created, err := s.dal.CreateCronJob(ctx, deploymentKey, module.Name, verb.Name, schedule, now, next)
-				if err != nil {
-					logger.Errorf(err, "failed to create cron job %v:%v", deploymentKey, verb.Name)
-					//TODO: what should be the error logic?
-				} else {
-					newJobs = append(newJobs, created)
-				}
+				newJobs = append(newJobs, created)
 			}
 		}
 	}
@@ -112,9 +125,6 @@ func (s *Service) CreatedDeployment(ctx context.Context, deploymentKey model.Dep
 			jobs: newJobs,
 		})
 	}
-
-	// TODO: on error, do we need to delete rows we tried to create?
-	return nil
 }
 
 func (s *Service) UpdatedDeploymentMinReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) error {
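For reference, the `gronx.NextTickAfter` call used in `CreatedDeployment` above can be exercised standalone; a minimal sketch (the schedule string and times are invented, only the call shape matches the diff):

```go
package main

import (
	"fmt"
	"time"

	"github.com/adhocore/gronx"
)

func main() {
	// Mirrors CreatedDeployment: compute the first execution strictly after
	// start (the final argument controls whether start itself may count).
	start := time.Now().UTC()
	next, err := gronx.NextTickAfter("*/5 * * * *", start, false)
	if err != nil {
		// As in the diff, a bad schedule is logged and the job is skipped.
		fmt.Println("invalid schedule:", err)
		return
	}
	fmt.Printf("next execution after %v is %v\n", start, next)
}
```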
@@ -159,7 +169,6 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
 		Body: requestJSON,
 	})
 	_, err = s.executor.Call(ctx, req)
-	fmt.Printf("executed %v:%v\n", job.DeploymentKey, job.Verb)
 	if err != nil {
 		logger.Errorf(err, "failed to execute cron job %v:%v", job.DeploymentKey, job.Verb)
 	}
@@ -172,7 +181,6 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
 	updatedJob, err := s.dal.EndCronJob(ctx, job, next)
 	if err != nil {
 		logger.Errorf(err, "failed to end cronjob %v:%v", job.DeploymentKey, job.Verb)
-		// TODO: maybe due to timeout? do we fetch again?
 	} else {
 		s.jobChanges.Publish(jobChange{
 			changeType: updateJobs,
@@ -186,50 +194,54 @@ func (s *Service) timeOutJobs(ctx context.Context) (time.Duration, error) {
 	return time.Minute, nil
 }
 
-// watchForUpdates listens for CronJobs we know have been updated
-// Other parts of the service either update CronJobs or fetches new versions,
-// which are then processed here to maintain the current known list of jobs.
+// watchForUpdates listens for updates and schedules jobs.
+// Other parts of the service publish changes into this channel.
 func (s *Service) watchForUpdates(ctx context.Context) {
 	logger := log.FromContext(ctx)
 
-	jobs := []dal.CronJob{}
-
 	jobChanges := make(chan jobChange, 128)
 	s.jobChanges.Subscribe(jobChanges)
 	defer s.jobChanges.Unsubscribe(jobChanges)
 
-	for {
-		//TODO: how to handle jobs we didn't schedule because hash ring told us not to?
-		// ... maybe only do a timer in loop for jobs we know are assigned to controller, but with a max of 15s or so?
+	state := &State{
+		executing: map[jobIdentifier]bool{},
+		newJobs:   map[jobIdentifier]bool{},
+	}
 
-		//TODO: find next job's time (without looking at hashring, because hashring may change in that time)
+	//TODO: do we need some memory of errors, or something to prevent constant attempts?
+	for {
+		sl.SortFunc(state.jobs, func(i, j dal.CronJob) int {
+			return s.sortJobs(state, i, j)
+		})
-		//TODO: do we need some memroy of errors, or something to prevent constant attempts?
 		now := s.clock.Now()
-		next := now.Add(time.Minute)
-		if len(jobs) > 0 {
-			sl.SortFunc(jobs, func(i, j dal.CronJob) int {
-				return i.NextExecution.Compare(j.NextExecution)
-			})
-			for _, j := range jobs {
-				if j.State == dal.JobStateIdle {
-					next = jobs[0].NextExecution
-					break
-				}
+		next := now.Add(time.Hour) // should never be reached; we expect a different signal long beforehand
+		for _, j := range state.jobs {
+			if possibleNext, err := s.nextCheckForJob(j, state, false); err == nil {
+				next = *possibleNext
+				break
 			}
 		}
+
 		if next.Sub(now) < time.Second {
 			next = now.Add(time.Second)
+			logger.Tracef("loop while gated for 1s")
+		} else if next.Sub(now) > time.Minute*59 {
+			logger.Tracef("loop while idling")
+		} else {
+			logger.Tracef("loop with next %v, %d jobs", next.Sub(now), len(state.jobs))
 		}
-		fmt.Printf("loop with next %v, %d jobs\n", next.Sub(now), len(jobs))
 
 		select {
 		case <-ctx.Done():
 			return
 		case <-s.clock.After(next.Sub(now)):
 			// Try starting jobs in db
-			jobsToAttempt := slices.Filter(jobs, func(j dal.CronJob) bool {
-				return j.State == dal.JobStateIdle && !j.NextExecution.After(time.Now().UTC())
+			jobsToAttempt := slices.Filter(state.jobs, func(j dal.CronJob) bool {
+				if next, err := s.nextCheckForJob(j, state, true); err == nil {
+					return !next.After(time.Now().UTC())
+				}
+				return false
 			})
 			jobResults, removedDeploymentKeys, err := s.dal.StartCronJobs(ctx, jobsToAttempt)
 			if err != nil {
@@ -241,78 +253,174 @@ func (s *Service) watchForUpdates(ctx context.Context) {
 			// Start jobs that were successfully updated
 			updatedJobs := []dal.CronJob{}
 			for job, shouldStart := range jobResults {
-				if shouldStart {
-					if _, matched := removedDeploymentKeys[job.DeploymentKey]; matched {
-						// We successfully updated the db to start this job but the deployment has min replicas set to 0
-						// We need to update the db to end this job
-						_, err := s.dal.EndCronJob(ctx, job, next)
-						if err != nil {
-							logger.Errorf(err, "failed to end cronjob %v:%v", job.DeploymentKey, job.Verb)
-						}
-					} else {
-						go s.executeJob(ctx, job)
+				updatedJobs = append(updatedJobs, job)
+				if !shouldStart {
+					continue
+				}
+				if removedDeploymentKey := removedDeploymentKeys[job.DeploymentKey]; removedDeploymentKey {
+					// We successfully updated the db to start this job but the deployment has min replicas set to 0
+					// We need to update the db to end this job
+					_, err := s.dal.EndCronJob(ctx, job, next)
+					if err != nil {
+						logger.Errorf(err, "failed to end cronjob %v:%v", job.DeploymentKey, job.Verb)
 					}
+					continue
 				}
-				updatedJobs = append(updatedJobs, job)
+				logger.Infof("executing job %v:%v", job.DeploymentKey, job.Verb)
+				state.startedExecutingJob(job)
+				go s.executeJob(ctx, job)
 			}
 
 			// Update job list
-			updateJobsInList(&jobs, updatedJobs)
+			state.updateJobs(updatedJobs)
 			for key := range removedDeploymentKeys {
-				removeDeploymentKeyInList(&jobs, key)
+				state.removeDeploymentKey(key)
 			}
 		case event := <-jobChanges:
 			switch event.changeType {
 			case resetJobs:
 				logger.Tracef("resetting job list: %d jobs", len(event.jobs))
-				jobs = event.jobs
+				state.reset(event.jobs)
 			case createJobs:
 				logger.Tracef("adding %d jobs", len(event.jobs))
-				// check if created job already arrived due to a reset
-				existingMap := jobMap(jobs)
-				for _, job := range event.jobs {
-					if _, exists := existingMap[jobVerbForJob(job)]; !exists {
-						jobs = append(jobs, job)
-					}
-				}
+				state.addJobs(event.jobs)
 			case updateJobs:
 				logger.Tracef("updating %d jobs", len(event.jobs))
-				updateJobsInList(&jobs, event.jobs)
+				state.updateJobs(event.jobs)
 			case removedDeploymentKey:
 				logger.Tracef("removing jobs for %q", event.deploymentKey)
-				removeDeploymentKeyInList(&jobs, event.deploymentKey)
+				state.removeDeploymentKey(event.deploymentKey)
+			case updatedHashring:
+				// do another cycle through the loop to see if new jobs need to be scheduled
 			}
 		}
 	}
 }
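The loop above wakes at the earliest `nextCheckForJob` time, clamped so it never spins faster than once a second and idles roughly an hour out when nothing is due. A simplified, self-contained model of that wake-up computation (the `nextWake` helper is hypothetical, not part of the codebase):

```go
package main

import (
	"fmt"
	"time"
)

// nextWake models the computation in watchForUpdates: start from an hour
// out (a jobChange event is expected long before that), take the earliest
// candidate check time, and never wake more often than once per second.
func nextWake(now time.Time, candidates []time.Time) time.Time {
	next := now.Add(time.Hour)
	for _, c := range candidates {
		if c.Before(next) {
			next = c
		}
	}
	if next.Sub(now) < time.Second {
		next = now.Add(time.Second)
	}
	return next
}

func main() {
	now := time.Now()
	due := []time.Time{now.Add(-time.Minute), now.Add(10 * time.Minute)}
	// Overdue work is rate-limited to one attempt per second.
	fmt.Println(nextWake(now, due).Sub(now))
}
```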
 
-func updateJobsInList(list *[]dal.CronJob, jobs []dal.CronJob) {
-	updatedJobMap := jobMap(*list)
-	for idx, job := range jobs {
-		if updated, exists := updatedJobMap[jobVerbForJob(job)]; exists {
-			//TODO: compare to see if outdated
-			jobs[idx] = updated
+func (s *Service) sortJobs(state *State, i, j dal.CronJob) int {
+	iNext, err := s.nextCheckForJob(i, state, false)
+	if err != nil {
+		return 1
+	}
+	jNext, err := s.nextCheckForJob(j, state, false)
+	if err != nil {
+		return -1
+	}
+	return iNext.Compare(*jNext)
+}
+
+func (s *Service) nextCheckForJob(job dal.CronJob, state *State, allowsNow bool) (*time.Time, error) {
+	if !s.isResponsibleForJob(job, state) {
+		return nil, fmt.Errorf("controller is not responsible for job")
+	}
+	if job.State == dal.JobStateExecuting {
+		if state.isExecutingInCurrentController(job) {
+			// returning an error tells the caller not to schedule this job at this time
+			return nil, fmt.Errorf("controller is already waiting for job to finish")
+		}
+		// We don't know when the other controller will finish this job
+		// We should check again when the next execution date is, assuming the job finishes
+		next, err := gronx.NextTickAfter(job.Schedule, time.Now().UTC(), allowsNow)
+		if err == nil {
+			return &next, nil
+		}
-		}
 	}
+	return &job.NextExecution, nil
 }
 
-func removeDeploymentKeyInList(list *[]dal.CronJob, key model.DeploymentKey) {
-	*list = slices.Filter(*list, func(j dal.CronJob) bool {
-		return j.DeploymentKey != key
-	})
+// Synchronise the hash ring with the active controllers.
+func (s *Service) syncHashRing(ctx context.Context) {
+	// TODO: cronjobs and scheduledtask now poll the db for the same thing every 5 seconds. Can we combine this?
+	logger := log.FromContext(ctx).Scope("cron")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case <-s.clock.After(hashRingUpdateInterval):
+			updated, err := s.updateHashRing(ctx)
+			if err != nil {
+				logger.Warnf("Failed to get controllers: %s", err)
+				continue
+			}
+			if updated {
+				s.jobChanges.Publish(jobChange{
+					changeType: updatedHashring,
+				})
+			}
+		}
+	}
 }
 
-func jobMap(jobs []dal.CronJob) map[jobIdentifier]dal.CronJob {
-	m := map[jobIdentifier]dal.CronJob{}
-	for _, job := range jobs {
-		m[jobVerbForJob(job)] = job
+func (s *Service) updateHashRing(ctx context.Context) (bool, error) {
+	controllers, err := s.dal.GetControllers(ctx, false)
+	if err != nil {
+		return false, err
+	}
+
+	controllerIdx := -1
+	for idx, controller := range controllers {
+		if controller.Key == s.key {
+			controllerIdx = idx
+			break
+		}
+	}
+	if controllerIdx == -1 {
+		return false, fmt.Errorf("controller %v not found in list of controllers", s.key)
 	}
-	return m
+
+	oldState := s.hashRingState.Load()
+	if oldState != nil && len(oldState.controllers) == len(controllers) {
+		hasChanged := false
+		for idx, new := range controllers {
+			old := oldState.controllers[idx]
+			if new.Key != old.Key {
+				hasChanged = true
+				break
+			}
+		}
+		if !hasChanged {
+			return false, nil
+		}
+	}
+
+	hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
+	s.hashRingState.Store(&hashRingState{
+		hashRing:    hashRing,
+		controllers: controllers,
+		idx:         controllerIdx,
+	})
+	return true, nil
 }
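`hashRingState` is read on every scheduling decision but rebuilt only every `hashRingUpdateInterval`, so it is published as an immutable snapshot through the generic `atomic.Value` from `github.com/alecthomas/atomic`, exactly as the field declared earlier. A minimal sketch of the same snapshot pattern (the `snapshot` type here is illustrative, not from the codebase):

```go
package main

import (
	"fmt"

	"github.com/alecthomas/atomic"
)

// snapshot stands in for hashRingState: built once, never mutated.
type snapshot struct {
	controllers []string
}

func main() {
	var state atomic.Value[*snapshot]

	// Writer (syncHashRing's role): replace the whole snapshot atomically.
	state.Store(&snapshot{controllers: []string{"a", "b", "c"}})

	// Reader (isResponsibleForJob's role): Load and use without locks.
	// A nil snapshot means the ring is not ready yet; the diff treats that
	// case as "this controller is responsible".
	if s := state.Load(); s != nil {
		fmt.Println(s.controllers)
	}
}
```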
 
-func jobVerbForJob(job dal.CronJob) jobIdentifier {
-	return jobIdentifier{
-		deploymentKey: job.DeploymentKey,
-		verb:          job.Verb,
+func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool {
+	if state.isJobTooNewForHashRing(job) {
+		return true
+	}
+	hashringState := s.hashRingState.Load()
+	if hashringState == nil {
+		return true
+	}
+
+	initialKey, ok := hashringState.hashRing.GetNode(identifierForJob(job).String())
+	if !ok {
+		return true
+	}
+
+	initialIdx := -1
+	for idx, controller := range hashringState.controllers {
+		if controller.Key.String() == initialKey {
+			initialIdx = idx
+			break
+		}
+	}
+	if initialIdx == -1 {
+		return true
+	}
+
+	if initialIdx+controllersPerJob > len(hashringState.controllers) {
+		// wraps around
+		return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerJob)-len(hashringState.controllers)
 	}
+	return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerJob
 }
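To make the responsibility rule concrete: a job's identifier hashes to one controller on the ring, and that controller plus the next `controllersPerJob-1` entries in the controller list own the job, wrapping around the end of the list. This assumes every controller sees the same ordered list from `GetControllers`. A self-contained sketch of the same arithmetic (controller and job names invented):

```go
package main

import (
	"fmt"

	"github.com/serialx/hashring"
)

const controllersPerJob = 2

// responsible reimplements isResponsibleForJob's window check against a
// plain string slice: the job belongs to the controller its key hashes to
// and the following controllersPerJob-1 controllers, wrapping around.
func responsible(controllers []string, selfIdx int, jobKey string) bool {
	ring := hashring.New(controllers)
	initialKey, ok := ring.GetNode(jobKey)
	if !ok {
		return true // degraded ring: err on the side of executing
	}
	initialIdx := -1
	for idx, c := range controllers {
		if c == initialKey {
			initialIdx = idx
			break
		}
	}
	if initialIdx == -1 {
		return true
	}
	if initialIdx+controllersPerJob > len(controllers) {
		// window wraps past the end of the list
		return selfIdx >= initialIdx || selfIdx < (initialIdx+controllersPerJob)-len(controllers)
	}
	return selfIdx >= initialIdx && selfIdx < initialIdx+controllersPerJob
}

func main() {
	controllers := []string{"ctrl-a", "ctrl-b", "ctrl-c"}
	job := "dpl-echo-123:::echoVerb" // jobIdentifier.String() format from state.go
	for idx, c := range controllers {
		fmt.Printf("%s owns %s: %v\n", c, job, responsible(controllers, idx, job))
	}
}
```

Run as-is, exactly two of the three controllers report ownership, so one controller restarting does not make the job miss a tick; the `StartCronJobs` query below still ensures only one owner actually executes each run.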
diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go
new file mode 100644
index 0000000000..416c3fc36e
--- /dev/null
+++ b/backend/controller/cronjobs/state.go
@@ -0,0 +1,96 @@
+package cronjobs
+
+import (
+	"fmt"
+
+	"github.com/TBD54566975/ftl/backend/controller/dal"
+	"github.com/TBD54566975/ftl/internal/model"
+	"github.com/TBD54566975/ftl/internal/slices"
+)
+
+type jobIdentifier struct {
+	deploymentKey model.DeploymentKey
+	verb          string
+}
+
+func identifierForJob(job dal.CronJob) jobIdentifier {
+	return jobIdentifier{
+		deploymentKey: job.DeploymentKey,
+		verb:          job.Verb,
+	}
+}
+
+func (i jobIdentifier) String() string {
+	return fmt.Sprintf("%s:::%s", i.deploymentKey.String(), i.verb)
+}
+
+type State struct {
+	jobs []dal.CronJob
+
+	// Used to determine if this controller is currently executing a job
+	executing map[jobIdentifier]bool
+
+	// Newly created jobs should be attempted by the controller that created them until other controllers
+	// have a chance to reset their job lists and share responsibilities through the hash ring
+	newJobs map[jobIdentifier]bool
+}
+
+func (s *State) isExecutingInCurrentController(job dal.CronJob) bool {
+	return s.executing[identifierForJob(job)]
+}
+
+func (s *State) startedExecutingJob(job dal.CronJob) {
+	s.executing[identifierForJob(job)] = true
+}
+
+func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool {
+	return s.newJobs[identifierForJob(job)]
+}
+
+func (s *State) reset(jobs []dal.CronJob) {
+	s.jobs = jobs
+	for _, job := range s.jobs {
+		if job.State != dal.JobStateExecuting {
+			delete(s.executing, identifierForJob(job))
+		}
+	}
+	s.newJobs = map[jobIdentifier]bool{}
+}
+
+func (s *State) addJobs(added []dal.CronJob) {
+	// check whether a created job already arrived via a reset
+	existingMap := jobMap(s.jobs)
+	for _, job := range added {
+		if _, exists := existingMap[identifierForJob(job)]; !exists {
+			s.jobs = append(s.jobs, job)
+		}
+		s.newJobs[identifierForJob(job)] = true
+	}
+}
+
+func (s *State) updateJobs(jobs []dal.CronJob) {
+	updatedJobMap := jobMap(jobs)
+	for idx, old := range s.jobs {
+		if updated, exists := updatedJobMap[identifierForJob(old)]; exists {
+			//TODO: compare to see if outdated
+			s.jobs[idx] = updated
+			if updated.State != dal.JobStateExecuting {
+				delete(s.executing, identifierForJob(updated))
+			}
+		}
+	}
+}
+
+func (s *State) removeDeploymentKey(key model.DeploymentKey) {
+	s.jobs = slices.Filter(s.jobs, func(j dal.CronJob) bool {
+		return j.DeploymentKey != key
+	})
+}
+
+func jobMap(jobs []dal.CronJob) map[jobIdentifier]dal.CronJob {
+	m := map[jobIdentifier]dal.CronJob{}
+	for _, job := range jobs {
+		m[identifierForJob(job)] = job
+	}
+	return m
+}
diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql
index c7dd7da5c2..9f7521c225 100644
--- a/backend/controller/sql/queries.sql
+++ b/backend/controller/sql/queries.sql
@@ -303,6 +303,7 @@ WITH updates AS (
         WHERE id = ANY (sqlc.arg('ids'))
             AND state = 'idle'
             AND start_time < next_execution
+            AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
     RETURNING id, state, start_time, next_execution)
 SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule,
        COALESCE(u.start_time, j.start_time) as start_time,
@@ -312,7 +313,8 @@ SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.s
        CASE WHEN u.id IS NULL THEN FALSE ELSE TRUE END as updated
 FROM cron_jobs j
   INNER JOIN deployments d on j.deployment_id = d.id
-  LEFT JOIN updates u on j.id = u.id;
+  LEFT JOIN updates u on j.id = u.id
+WHERE j.id = ANY (sqlc.arg('ids'));
 
 -- name: EndCronJob :one
 WITH j AS (
diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go
index 493b018cff..0c06a7bf79 100644
--- a/backend/controller/sql/queries.sql.go
+++ b/backend/controller/sql/queries.sql.go
@@ -1444,6 +1444,7 @@ WITH updates AS (
         WHERE id = ANY ($1)
             AND state = 'idle'
             AND start_time < next_execution
+            AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
     RETURNING id, state, start_time, next_execution)
 SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule,
        COALESCE(u.start_time, j.start_time) as start_time,
@@ -1454,6 +1455,7 @@ SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.s
 FROM cron_jobs j
   INNER JOIN deployments d on j.deployment_id = d.id
   LEFT JOIN updates u on j.id = u.id
+WHERE j.id = ANY ($1)
 `
 
 type StartCronJobsRow struct {
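The query changes above act as two guards: the CTE no longer flips a job to `executing` before its `next_execution` has actually passed, and the new outer `WHERE` returns only the attempted jobs rather than every cron job in the table. Since both owners of a job may call `StartCronJobs` concurrently, callers branch on the returned `updated` flag; a hypothetical sketch of that consumption (types and values invented):

```go
package main

import "fmt"

// row models one StartCronJobs result: every attempted job comes back, but
// updated (the CASE WHEN u.id IS NULL THEN FALSE ELSE TRUE END column) is
// true only for rows this controller's UPDATE moved from 'idle' to
// 'executing'.
type row struct {
	verb    string
	updated bool
}

func main() {
	rows := []row{
		{verb: "hourlySync", updated: true},     // we won the race: execute it
		{verb: "nightlyReport", updated: false}, // not due yet, or started by another controller
	}
	for _, r := range rows {
		if r.updated {
			fmt.Println("executing", r.verb)
		} else {
			fmt.Println("skipping", r.verb)
		}
	}
}
```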