From 2716ff7c4768b0c416c2dd2e4b3f86f7b0586b67 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Apr 2024 15:59:25 +1000 Subject: [PATCH] feat: cron job service --- backend/controller/controller.go | 48 +- backend/controller/cronjobs/cronjobs.go | 472 ++++++++++++++++++ backend/controller/cronjobs/cronjobs_test.go | 220 ++++++++ backend/controller/cronjobs/state.go | 82 +++ .../controller/scheduledtask/scheduledtask.go | 59 +-- .../scheduledtask/scheduledtask_test.go | 9 +- backend/controller/sql/querier.go | 1 - backend/controller/sql/queries.sql | 4 - backend/controller/sql/queries.sql.go | 10 - backend/schema/normalise.go | 6 +- backend/schema/validate.go | 6 + backend/schema/validate_test.go | 15 + 12 files changed, 859 insertions(+), 73 deletions(-) create mode 100644 backend/controller/cronjobs/cronjobs.go create mode 100644 backend/controller/cronjobs/cronjobs_test.go create mode 100644 backend/controller/cronjobs/state.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 563750bed4..57be46cf39 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -27,6 +27,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/TBD54566975/ftl/backend/controller/cronjobs" "github.com/TBD54566975/ftl/backend/controller/dal" "github.com/TBD54566975/ftl/backend/controller/ingress" "github.com/TBD54566975/ftl/backend/controller/scaling" @@ -49,10 +50,11 @@ import ( // CommonConfig between the production controller and development server. type CommonConfig struct { - AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"` - NoConsole bool `help:"Disable the console."` - IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"` - WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"` + AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"` + NoConsole bool `help:"Disable the console."` + IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"` + WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"` + CronJobTimeout time.Duration `help:"Timeout for cron jobs." default:"5m"` } type Config struct { @@ -138,12 +140,18 @@ type clients struct { runner ftlv1connect.RunnerServiceClient } +type ControllerListListener interface { + UpdatedControllerList(ctx context.Context, controllers []dal.Controller) +} + type Service struct { dal *dal.DAL key model.ControllerKey deploymentLogsSink *deploymentLogsSink - tasks *scheduledtask.Scheduler + tasks *scheduledtask.Scheduler + cronJobs *cronjobs.Service + controllerListListeners []ControllerListListener // Map from endpoint to client. clients *ttlcache.Cache[string, clients] @@ -163,7 +171,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. } config.SetDefaults() svc := &Service{ - tasks: scheduledtask.New(ctx, key, db), + tasks: scheduledtask.New(ctx, key), dal: db, key: key, deploymentLogsSink: newDeploymentLogsSink(ctx, db), @@ -174,8 +182,14 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. 
 		increaseReplicaFailures: map[string]int{},
 	}
 
+	cronSvc := cronjobs.New(ctx, key, svc.config.Advertise, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest)
+	svc.cronJobs = cronSvc
+	svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc)
+	_, _ = svc.updateControllersList(ctx)
+
 	svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes)
 	svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController)
+	svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, svc.updateControllersList)
 	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, svc.reapStaleRunners)
 	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.releaseExpiredReservations)
 	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileDeployments)
@@ -422,6 +436,9 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
 			return nil, fmt.Errorf("could not replace deployment: %w", err)
 		}
 	}
+
+	s.cronJobs.CreatedOrReplacedDeployment(ctx, newDeploymentKey)
+
 	return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
 }
 
@@ -732,11 +749,18 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
 	}
 
 	ingressRoutes := extractIngressRoutingEntries(req.Msg)
-	dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, nil)
+	cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema)
+	if err != nil {
+		logger.Errorf(err, "Could not generate cron jobs for new deployment")
+		return nil, fmt.Errorf("%s: %w", "could not generate cron jobs for new deployment", err)
+	}
+
+	dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs)
 	if err != nil {
 		logger.Errorf(err, "Could not create deployment")
 		return nil, fmt.Errorf("could not create deployment: %w", err)
 	}
+
 	deploymentLogger := s.getDeploymentLogger(ctx, dkey)
 	deploymentLogger.Debugf("Created deployment %s", dkey)
 	return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
@@ -999,7 +1023,17 @@ func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error
 		return 0, fmt.Errorf("failed to heartbeat controller: %w", err)
 	}
 	return time.Second * 3, nil
+}
 
+func (s *Service) updateControllersList(ctx context.Context) (time.Duration, error) {
+	controllers, err := s.dal.GetControllers(ctx, false)
+	if err != nil {
+		return 0, err
+	}
+	for _, listener := range s.controllerListListeners {
+		listener.UpdatedControllerList(ctx, controllers)
+	}
+	return time.Second * 5, nil
 }
 
 func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(response *ftlv1.PullSchemaResponse) error) error {
diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go
new file mode 100644
index 0000000000..2ecf7df1c3
--- /dev/null
+++ b/backend/controller/cronjobs/cronjobs.go
@@ -0,0 +1,472 @@
+package cronjobs
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/url"
+	"time"
+
+	"connectrpc.com/connect"
+	"github.com/TBD54566975/ftl/backend/controller/dal"
+	"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
+	ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
+	schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
+	"github.com/TBD54566975/ftl/backend/schema"
+	
"github.com/TBD54566975/ftl/internal/cron" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/atomic" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/pubsub" + "github.com/benbjohnson/clock" + "github.com/jpillora/backoff" + "github.com/serialx/hashring" + sl "golang.org/x/exp/slices" +) + +const ( + controllersPerJob = 2 + jobResetInterval = time.Minute + newJobHashRingOverrideInterval = time.Minute + time.Second*20 +) + +type Config struct { + Timeout time.Duration +} + +type jobChangeType int + +const ( + resetJobs jobChangeType = iota + finishedJobs + updatedHashring +) + +type jobChange struct { + changeType jobChangeType + jobs []dal.CronJob + addedDeploymentKey optional.Option[model.DeploymentKey] +} + +type hashRingState struct { + hashRing *hashring.HashRing + controllers []dal.Controller + idx int +} + +type DAL interface { + GetCronJobs(ctx context.Context) ([]dal.CronJob, error) + StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) + EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error) + GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error) +} + +type Scheduler interface { + Singleton(retry backoff.Backoff, job scheduledtask.Job) + Parallel(retry backoff.Backoff, job scheduledtask.Job) +} + +type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error) + +type Service struct { + config Config + key model.ControllerKey + originURL *url.URL + + dal DAL + scheduler Scheduler + call ExecuteCallFunc + + clock clock.Clock + jobChanges *pubsub.Topic[jobChange] + + hashRingState atomic.Value[*hashRingState] +} + +func New(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc) *Service { + return NewForTesting(ctx, key, originURL, config, dal, scheduler, call, clock.New()) +} + +func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { + svc := &Service{ + config: config, + key: key, + originURL: originURL, + dal: dal, + scheduler: scheduler, + call: call, + clock: clock, + jobChanges: pubsub.New[jobChange](), + } + svc.UpdatedControllerList(ctx, nil) + + svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: jobResetInterval}, svc.resetJobs) + svc.scheduler.Singleton(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.killOldJobs) + + go svc.watchForUpdates(ctx) + + return svc +} + +func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) (jobs []dal.CronJob, err error) { + logger := log.FromContext(ctx) + + start := s.clock.Now().UTC() + newJobs := []dal.CronJob{} + for _, decl := range module.Decls { + if verb, ok := decl.Value.(*schemapb.Decl_Verb); ok { + for _, metadata := range verb.Verb.Metadata { + if cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob); ok { + cronStr := cronMetadata.CronJob.Cron + schedule, err := cron.Parse(cronStr) + if err != nil { + logger.Errorf(err, "failed to parse cron schedule %q", cronStr) + continue + } + next, err := cron.NextAfter(schedule, start, false) + if err != nil { + logger.Errorf(err, "failed to calculate next execution for cron job 
%v:%v with schedule %q", module.Name, verb.Verb.Name, schedule)
+						continue
+					}
+					newJobs = append(newJobs, dal.CronJob{
+						Key:           model.NewCronJobKey(module.Name, verb.Verb.Name),
+						Ref:           schema.Ref{Module: module.Name, Name: verb.Verb.Name},
+						Schedule:      cronStr,
+						StartTime:     start,
+						NextExecution: next,
+						State:         dal.JobStateIdle,
+						// DeploymentKey: Filled in by DAL
+					})
+				}
+			}
+		}
+	}
+	return newJobs, nil
+}
+
+func (s *Service) CreatedOrReplacedDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey) {
+	// Rather than finding old/new cronjobs and updating our state, we can just reset the list of jobs
+	_ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey))
+}
+
+func (s *Service) resetJobs(ctx context.Context) (time.Duration, error) {
+	err := s.resetJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]())
+	if err != nil {
+		return 0, err
+	}
+	return jobResetInterval, nil
+}
+
+// resetJobsWithNewDeploymentKey resets the list of jobs and marks the deployment key as added so that it can overrule the hash ring for a short time.
+func (s *Service) resetJobsWithNewDeploymentKey(ctx context.Context, deploymentKey optional.Option[model.DeploymentKey]) error {
+	logger := log.FromContext(ctx)
+
+	jobs, err := s.dal.GetCronJobs(ctx)
+	if err != nil {
+		logger.Errorf(err, "failed to get cron jobs")
+		return fmt.Errorf("%s: %w", "failed to get cron jobs", err)
+	}
+	s.jobChanges.Publish(jobChange{
+		changeType:         resetJobs,
+		jobs:               jobs,
+		addedDeploymentKey: deploymentKey,
+	})
+	return nil
+}
+
+func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
+	logger := log.FromContext(ctx)
+	requestBody := map[string]any{}
+	requestJSON, err := json.Marshal(requestBody)
+	if err != nil {
+		logger.Errorf(err, "could not build cron job body %v:%v", job.DeploymentKey, job.Ref.String())
+		return
+	}
+
+	req := connect.NewRequest(&ftlv1.CallRequest{
+		Verb: &schemapb.Ref{Module: job.Ref.Module, Name: job.Ref.Name},
+		Body: requestJSON,
+	})
+
+	requestKey := model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Ref.Module, job.Ref.Name))
+
+	callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
+	defer cancel()
+	_, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host)
+	if err != nil {
+		logger.Errorf(err, "failed to execute cron job %s", job.Ref.String())
+	}
+
+	schedule, err := cron.Parse(job.Schedule)
+	if err != nil {
+		logger.Errorf(err, "failed to parse cron schedule %q", job.Schedule)
+		return
+	}
+	next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false)
+	if err != nil {
+		logger.Errorf(err, "failed to calculate next execution for cron job %s with schedule %q", job.Ref.String(), job.Schedule)
+	}
+
+	updatedJob, err := s.dal.EndCronJob(ctx, job, next)
+	if err != nil {
+		logger.Errorf(err, "failed to end cronjob %s", job.Ref.String())
+	} else {
+		s.jobChanges.Publish(jobChange{
+			changeType: finishedJobs,
+			jobs:       []dal.CronJob{updatedJob},
+		})
+	}
+}
+
+func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
+	logger := log.FromContext(ctx)
+	staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute)
+	if err != nil {
+		return 0, err
+	}
+
+	updatedJobs := []dal.CronJob{}
+	for _, stale := range staleJobs {
+		start := s.clock.Now().UTC()
+		pattern, err := cron.Parse(stale.Schedule)
+		if err != nil {
+			logger.Errorf(err, "Could not kill stale cron job %s because schedule could not be parsed: %q", stale.Ref.String(), stale.Schedule)
+			continue
+		}
+		next, err :=
cron.NextAfter(pattern, start, false) + if err != nil { + logger.Errorf(err, "Could not kill stale cron job %s because next date could not be calculated: %q", stale.Ref.String(), stale.Schedule) + continue + } + + updated, err := s.dal.EndCronJob(ctx, stale, next) + if err != nil { + logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Ref.String(), err) + continue + } + logger.Warnf("Killed stale cron job %s", stale.Ref.String()) + updatedJobs = append(updatedJobs, updated) + } + + s.jobChanges.Publish(jobChange{ + changeType: finishedJobs, + jobs: updatedJobs, + }) + + return time.Minute, nil +} + +// watchForUpdates is the centralized place that handles: +// - list of known jobs and their state +// - executing jobs when they are due +// - reacting to events that change the list of jobs, deployments or hash ring +func (s *Service) watchForUpdates(ctx context.Context) { + logger := log.FromContext(ctx) + + jobChanges := make(chan jobChange, 128) + s.jobChanges.Subscribe(jobChanges) + defer s.jobChanges.Unsubscribe(jobChanges) + + state := &State{ + executing: map[string]bool{}, + newJobs: map[string]time.Time{}, + blockedUntil: s.clock.Now(), + } + + for { + sl.SortFunc(state.jobs, func(i, j dal.CronJob) int { + return s.sortJobs(state, i, j) + }) + + now := s.clock.Now() + next := now.Add(time.Hour) // should never be reached, expect a different signal long beforehand + for _, j := range state.jobs { + if possibleNext, err := s.nextAttemptForJob(j, state, false); err == nil { + next = possibleNext + break + } + } + + if next.Before(state.blockedUntil) { + next = state.blockedUntil + logger.Tracef("loop blocked for %vs", next.Sub(now)) + } else if next.Sub(now) < time.Second { + next = now.Add(time.Second) + logger.Tracef("loop while gated for 1s") + } else if next.Sub(now) > time.Minute*59 { + logger.Tracef("loop while idling") + } else { + logger.Tracef("loop with next %v, %d jobs", next.Sub(now), len(state.jobs)) + } + + select { + case <-ctx.Done(): + return + case <-s.clock.After(next.Sub(now)): + // Try starting jobs in db + jobsToAttempt := slices.Filter(state.jobs, func(j dal.CronJob) bool { + if n, err := s.nextAttemptForJob(j, state, true); err == nil { + return !n.After(s.clock.Now().UTC()) + } + return false + }) + jobResults, err := s.dal.StartCronJobs(ctx, jobsToAttempt) + if err != nil { + logger.Errorf(err, "failed to start cron jobs in db") + state.blockedUntil = s.clock.Now().Add(time.Second * 5) + continue + } + + // Start jobs that were successfully updated + updatedJobs := []dal.CronJob{} + removedDeploymentKeys := map[string]model.DeploymentKey{} + + for _, job := range jobResults { + updatedJobs = append(updatedJobs, job.CronJob) + if !job.DidStartExecution { + continue + } + if !job.HasMinReplicas { + // We successfully updated the db to start this job but the deployment has min replicas set to 0 + // We need to update the db to end this job + removedDeploymentKeys[job.DeploymentKey.String()] = job.DeploymentKey + _, err := s.dal.EndCronJob(ctx, job.CronJob, next) + if err != nil { + logger.Errorf(err, "failed to end cronjob %s", job.Ref.String()) + } + continue + } + logger.Infof("executing job %s", job.Ref.String()) + state.startedExecutingJob(job.CronJob) + go s.executeJob(ctx, job.CronJob) + } + + // Update job list + state.updateJobs(updatedJobs) + for _, key := range removedDeploymentKeys { + state.removeDeploymentKey(key) + } + case event := <-jobChanges: + switch event.changeType { + case resetJobs: + logger.Tracef("resetting job list: 
%d jobs", len(event.jobs)) + state.reset(event.jobs, event.addedDeploymentKey) + case finishedJobs: + logger.Tracef("updating %d jobs", len(event.jobs)) + state.updateJobs(event.jobs) + case updatedHashring: + // do another cycle through the loop to see if new jobs need to be scheduled + } + } + } +} + +func (s *Service) sortJobs(state *State, i, j dal.CronJob) int { + iNext, err := s.nextAttemptForJob(i, state, false) + if err != nil { + return 1 + } + jNext, err := s.nextAttemptForJob(j, state, false) + if err != nil { + return -1 + } + return iNext.Compare(jNext) +} + +func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow bool) (time.Time, error) { + if !s.isResponsibleForJob(job, state) { + return s.clock.Now(), fmt.Errorf("controller is not responsible for job") + } + if job.State == dal.JobStateExecuting { + if state.isExecutingInCurrentController(job) { + // return a time in the future, meaning don't schedule at this time + return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish") + } + // We don't know when the other controller will finish this job + // We should check again when the next execution date is assuming the job finishes + pattern, err := cron.Parse(job.Schedule) + if err != nil { + return s.clock.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule) + } + next, err := cron.NextAfter(pattern, s.clock.Now().UTC(), allowsNow) + if err == nil { + return next, nil + } + } + return job.NextExecution, nil +} + +// UpdatedControllerList synchronises the hash ring with the active controllers. +func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) { + logger := log.FromContext(ctx).Scope("cron") + controllerIdx := -1 + for idx, controller := range controllers { + if controller.Key.String() == s.key.String() { + controllerIdx = idx + break + } + } + if controllerIdx == -1 { + logger.Tracef("controller %q not found in list of controllers", s.key) + } + + oldState := s.hashRingState.Load() + if oldState != nil && len(oldState.controllers) == len(controllers) { + hasChanged := false + for idx, new := range controllers { + old := oldState.controllers[idx] + if new.Key.String() != old.Key.String() { + hasChanged = true + break + } + } + if !hasChanged { + return + } + } + + hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() })) + s.hashRingState.Store(&hashRingState{ + hashRing: hashRing, + controllers: controllers, + idx: controllerIdx, + }) + + s.jobChanges.Publish(jobChange{ + changeType: updatedHashring, + }) +} + +func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool { + if state.isJobTooNewForHashRing(job) { + return true + } + hashringState := s.hashRingState.Load() + if hashringState == nil { + return true + } + + initialKey, ok := hashringState.hashRing.GetNode(job.Key.String()) + if !ok { + return true + } + + initialIdx := -1 + for idx, controller := range hashringState.controllers { + if controller.Key.String() == initialKey { + initialIdx = idx + break + } + } + if initialIdx == -1 { + return true + } + + if initialIdx+controllersPerJob > len(hashringState.controllers) { + // wraps around + return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerJob)-len(hashringState.controllers) + } + return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerJob +} diff --git a/backend/controller/cronjobs/cronjobs_test.go 
b/backend/controller/cronjobs/cronjobs_test.go
new file mode 100644
index 0000000000..424bcdf542
--- /dev/null
+++ b/backend/controller/cronjobs/cronjobs_test.go
@@ -0,0 +1,220 @@
+package cronjobs
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"connectrpc.com/connect"
+	"github.com/TBD54566975/ftl/backend/controller/dal"
+	"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
+	ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
+	"github.com/TBD54566975/ftl/backend/schema"
+	"github.com/TBD54566975/ftl/internal/cron"
+	"github.com/TBD54566975/ftl/internal/log"
+	"github.com/TBD54566975/ftl/internal/model"
+	"github.com/TBD54566975/ftl/internal/slices"
+	"github.com/alecthomas/assert/v2"
+	"github.com/alecthomas/types/optional"
+	"github.com/benbjohnson/clock"
+	"github.com/jpillora/backoff"
+)
+
+type mockDAL struct {
+	lock            sync.Mutex
+	clock           *clock.Mock
+	jobs            []dal.CronJob
+	attemptCountMap map[string]int
+}
+
+func (d *mockDAL) GetCronJobs(ctx context.Context) ([]dal.CronJob, error) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	return d.jobs, nil
+}
+
+func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	job := dal.CronJob{
+		Key:           model.NewCronJobKey(module, verb),
+		DeploymentKey: deploymentKey,
+		Ref:           schema.Ref{Module: module, Name: verb},
+		Schedule:      schedule,
+		StartTime:     startTime,
+		NextExecution: nextExecution,
+		State:         dal.JobStateIdle,
+	}
+	d.jobs = append(d.jobs, job)
+}
+
+func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) {
+	for i, j := range d.jobs {
+		if j.DeploymentKey.String() == job.DeploymentKey.String() && j.Ref.Name == job.Ref.Name {
+			return i, nil
+		}
+	}
+	return -1, fmt.Errorf("job not found")
+}
+
+func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	attemptedJobs = []dal.AttemptedCronJob{}
+	now := (*d.clock).Now()
+
+	for _, inputJob := range jobs {
+		i, err := d.indexForJob(inputJob)
+		if err != nil {
+			return nil, err
+		}
+		job := d.jobs[i]
+		if !job.NextExecution.After(now) && job.State == dal.JobStateIdle {
+			job.State = dal.JobStateExecuting
+			job.StartTime = (*d.clock).Now()
+			d.jobs[i] = job
+			attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{
+				CronJob:           job,
+				DidStartExecution: true,
+				HasMinReplicas:    true,
+			})
+		} else {
+			attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{
+				CronJob:           job,
+				DidStartExecution: false,
+				HasMinReplicas:    true,
+			})
+		}
+		d.attemptCountMap[job.Ref.String()]++
+	}
+	return attemptedJobs, nil
+}
+
+func (d *mockDAL) EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	i, err := d.indexForJob(job)
+	if err != nil {
+		return dal.CronJob{}, err
+	}
+	internalJob := d.jobs[i]
+	if internalJob.State != dal.JobStateExecuting {
+		return dal.CronJob{}, fmt.Errorf("job cannot be stopped, it isn't running")
+	}
+	if internalJob.StartTime != job.StartTime {
+		return dal.CronJob{}, fmt.Errorf("job cannot be stopped, start time does not match")
+	}
+	internalJob.State = dal.JobStateIdle
+	internalJob.NextExecution = next
+	d.jobs[i] = internalJob
+	return internalJob, nil
+}
+
+func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error) {
+	d.lock.Lock()
+	
defer d.lock.Unlock() + + return slices.Filter(d.jobs, func(job dal.CronJob) bool { + return (*d.clock).Now().After(job.StartTime.Add(duration)) + }), nil +} + +type mockScheduler struct { +} + +func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) { + // do nothing +} + +func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) { + // do nothing +} + +type controller struct { + key model.ControllerKey + DAL DAL + cronJobs *Service +} + +func TestService(t *testing.T) { + t.Parallel() + ctx := log.ContextWithNewDefaultLogger(context.Background()) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + config := Config{Timeout: time.Minute * 5} + clock := clock.NewMock() + mockDal := &mockDAL{ + clock: clock, + lock: sync.Mutex{}, + attemptCountMap: map[string]int{}, + } + scheduler := &mockScheduler{} + + verbCallCount := map[string]int{} + verbCallCountLock := sync.Mutex{} + + // initial jobs + for i := range 20 { + deploymentKey := model.NewDeploymentKey("initial") + now := clock.Now() + cronStr := "*/10 * * * * * *" + pattern, err := cron.Parse(cronStr) + assert.NoError(t, err) + next, err := cron.NextAfter(pattern, now, false) + assert.NoError(t, err) + mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next) + } + + controllers := []*controller{} + for i := range 5 { + key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) + controller := &controller{ + key: key, + DAL: mockDal, + cronJobs: NewForTesting(ctx, key, &url.URL{Host: "test.com"}, config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + verbRef := schema.RefFromProto(r.Msg.Verb) + + verbCallCountLock.Lock() + verbCallCount[verbRef.Name]++ + verbCallCountLock.Unlock() + + return &connect.Response[ftlv1.CallResponse]{}, nil + }, clock), + } + controllers = append(controllers, controller) + } + + time.Sleep(time.Millisecond * 100) + + for _, c := range controllers { + go func() { + c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller { + return dal.Controller{ + Key: ctrl.key, + } + })) + _, _ = c.cronJobs.resetJobs(ctx) + }() + } + + clock.Add(time.Second * 5) + time.Sleep(time.Millisecond * 100) + for range 3 { + clock.Add(time.Second * 10) + time.Sleep(time.Millisecond * 100) + } + + for _, j := range mockDal.jobs { + count := verbCallCount[j.Ref.Name] + assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Ref.Name) + } +} diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go new file mode 100644 index 0000000000..080c655360 --- /dev/null +++ b/backend/controller/cronjobs/state.go @@ -0,0 +1,82 @@ +package cronjobs + +import ( + "time" + + "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/types/optional" +) + +type State struct { + jobs []dal.CronJob + + // Used to determine if this controller is currently executing a job + executing map[string]bool + + // Newly created jobs should be attempted by the controller that created them until other controllers + // have a chance to reset their job lists and share responsibilities through the hash ring + newJobs map[string]time.Time + + blockedUntil time.Time +} + +func (s *State) isExecutingInCurrentController(job dal.CronJob) bool { 
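+	// s.executing only contains jobs this controller started itself (see startedExecutingJob),
+	// so a hit here means this controller is still waiting for its own call to the verb to finish.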
+ return s.executing[job.Key.String()] +} + +func (s *State) startedExecutingJob(job dal.CronJob) { + s.executing[job.Key.String()] = true +} + +func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool { + if t, ok := s.newJobs[job.Key.String()]; ok { + if time.Since(t) < newJobHashRingOverrideInterval { + return true + } + delete(s.newJobs, job.Key.String()) + } + return false +} + +func (s *State) reset(jobs []dal.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { + s.jobs = make([]dal.CronJob, len(jobs)) + copy(s.jobs, jobs) + for _, job := range s.jobs { + if job.State != dal.JobStateExecuting { + delete(s.executing, job.Key.String()) + } + if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey.String() == newKey.String() { + // This job is new and should be attempted by the current controller + s.newJobs[job.Key.String()] = time.Now() + } + } +} + +func (s *State) updateJobs(jobs []dal.CronJob) { + updatedJobMap := jobMap(jobs) + for idx, old := range s.jobs { + if updated, exists := updatedJobMap[old.Key.String()]; exists { + //TODO: compare to see if outdated + s.jobs[idx] = updated + if updated.State != dal.JobStateExecuting { + delete(s.executing, updated.Key.String()) + } + } + } +} + +func (s *State) removeDeploymentKey(key model.DeploymentKey) { + s.jobs = slices.Filter(s.jobs, func(j dal.CronJob) bool { + return j.DeploymentKey.String() != key.String() + }) +} + +func jobMap(jobs []dal.CronJob) map[string]dal.CronJob { + m := map[string]dal.CronJob{} + for _, job := range jobs { + m[job.Key.String()] = job + } + return m +} diff --git a/backend/controller/scheduledtask/scheduledtask.go b/backend/controller/scheduledtask/scheduledtask.go index e770197ba4..e3f3a049be 100644 --- a/backend/controller/scheduledtask/scheduledtask.go +++ b/backend/controller/scheduledtask/scheduledtask.go @@ -35,16 +35,8 @@ type descriptor struct { // run. type Job func(ctx context.Context) (time.Duration, error) -type DAL interface { - GetControllers(ctx context.Context, all bool) ([]dal.Controller, error) -} - type DALFunc func(ctx context.Context, all bool) ([]dal.Controller, error) -func (f DALFunc) GetControllers(ctx context.Context, all bool) ([]dal.Controller, error) { - return f(ctx, all) -} - // Scheduler is a task scheduler for the controller. // // Each job runs in its own goroutine. @@ -54,28 +46,25 @@ func (f DALFunc) GetControllers(ctx context.Context, all bool) ([]dal.Controller // as the hash ring is only updated periodically and controllers may have // inconsistent views of the hash ring. type Scheduler struct { - controller DAL - key model.ControllerKey - jobs chan *descriptor - clock clock.Clock + key model.ControllerKey + jobs chan *descriptor + clock clock.Clock hashring atomic.Value[*hashring.HashRing] } // New creates a new [Scheduler]. 
-func New(ctx context.Context, id model.ControllerKey, controller DAL) *Scheduler { - return NewForTesting(ctx, id, controller, clock.New()) +func New(ctx context.Context, id model.ControllerKey) *Scheduler { + return NewForTesting(ctx, id, clock.New()) } -func NewForTesting(ctx context.Context, id model.ControllerKey, controller DAL, clock clock.Clock) *Scheduler { +func NewForTesting(ctx context.Context, id model.ControllerKey, clock clock.Clock) *Scheduler { s := &Scheduler{ - controller: controller, - key: id, - jobs: make(chan *descriptor), - clock: clock, + key: id, + jobs: make(chan *descriptor), + clock: clock, } - _ = s.updateHashring(ctx) - go s.syncHashRing(ctx) + s.UpdatedControllerList(ctx, nil) go s.run(ctx) return s } @@ -107,7 +96,7 @@ func (s *Scheduler) schedule(retry backoff.Backoff, job Job, singlyHomed bool) { } func (s *Scheduler) run(ctx context.Context) { - logger := log.FromContext(ctx).Scope("cron") + logger := log.FromContext(ctx).Scope("scheduler") // List of jobs to run. // For singleton jobs running on a different host, this can include jobs // scheduled in the past. These are skipped on each run. @@ -147,7 +136,7 @@ func (s *Scheduler) run(ctx context.Context) { } } jobs[i] = nil // Zero out scheduled jobs. - logger.Scope(job.name).Tracef("Running cron job") + logger.Scope(job.name).Tracef("Running scheduled task") go func() { if delay, err := job.job(ctx); err != nil { logger.Scope(job.name).Warnf("%s", err) @@ -168,28 +157,8 @@ func (s *Scheduler) run(ctx context.Context) { } } -// Synchronise the hash ring with the active controllers. -func (s *Scheduler) syncHashRing(ctx context.Context) { - logger := log.FromContext(ctx).Scope("cron") - for { - select { - case <-ctx.Done(): - return - - case <-s.clock.After(time.Second * 5): - if err := s.updateHashring(ctx); err != nil { - logger.Warnf("Failed to get controllers: %s", err) - } - } - } -} - -func (s *Scheduler) updateHashring(ctx context.Context) error { - controllers, err := s.controller.GetControllers(ctx, false) - if err != nil { - return err - } +// UpdatedControllerList synchronises the hash ring with the active controllers. 
+func (s *Scheduler) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) { hashring := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() })) s.hashring.Store(hashring) - return nil } diff --git a/backend/controller/scheduledtask/scheduledtask_test.go b/backend/controller/scheduledtask/scheduledtask_test.go index 6f759a68fb..a9c36a953e 100644 --- a/backend/controller/scheduledtask/scheduledtask_test.go +++ b/backend/controller/scheduledtask/scheduledtask_test.go @@ -40,9 +40,7 @@ func TestCron(t *testing.T) { clock := clock.NewMock() for _, c := range controllers { - c.cron = NewForTesting(ctx, c.controller.Key, DALFunc(func(ctx context.Context, all bool) ([]dal.Controller, error) { - return slices.Map(controllers, func(c *controller) dal.Controller { return c.controller }), nil - }), clock) + c.cron = NewForTesting(ctx, c.controller.Key, clock) c.cron.Singleton(backoff.Backoff{}, func(ctx context.Context) (time.Duration, error) { singletonCount.Add(1) return time.Second, nil @@ -51,6 +49,11 @@ func TestCron(t *testing.T) { multiCount.Add(1) return time.Second, nil }) + c.cron.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller { + return dal.Controller{ + Key: ctrl.controller.Key, + } + })) } clock.Add(time.Second * 6) diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 85256f2dd0..eac52ac819 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -17,7 +17,6 @@ type Querier interface { // Create a new artefact and return the artefact ID. CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error - CreateCronRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index eb7e0c2421..9d728deedd 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -409,10 +409,6 @@ VALUES ((SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment INSERT INTO requests (origin, "key", source_addr) VALUES ($1, $2, $3); --- name: CreateCronRequest :exec -INSERT INTO requests (origin, "key", source_addr) -VALUES ($1, $2, $3); - -- name: UpsertController :one INSERT INTO controller (key, endpoint) VALUES ($1, $2) diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 4bbce650bf..728ca631b2 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -86,16 +86,6 @@ func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) er return err } -const createCronRequest = `-- name: CreateCronRequest :exec -INSERT INTO requests (origin, "key", source_addr) -VALUES ($1, $2, $3) -` - -func (q *Queries) CreateCronRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error { - _, err := q.db.Exec(ctx, createCronRequest, origin, key, sourceAddr) - return err -} - const createDeployment = `-- name: CreateDeployment :exec INSERT INTO deployments (module_id, "schema", "key") VALUES ((SELECT id FROM modules 
WHERE name = $1::TEXT LIMIT 1), $2::BYTEA, $3::deployment_key)
diff --git a/backend/schema/normalise.go b/backend/schema/normalise.go
index 1676e407ac..78cb0d2d19 100644
--- a/backend/schema/normalise.go
+++ b/backend/schema/normalise.go
@@ -111,6 +111,9 @@ func Normalise[T Node](n T) T {
 		c.Pos = zero
 		c.Path = normaliseSlice(c.Path)
 
+	case *MetadataCronJob:
+		c.Pos = zero
+
 	case *MetadataAlias:
 		c.Pos = zero
 
@@ -123,9 +126,6 @@ func Normalise[T Node](n T) T {
 	case *IngressPathParameter:
 		c.Pos = zero
 
-	case *MetadataCronJob:
-		c.Pos = zero
-
 	case *Config:
 		c.Pos = zero
 		c.Type = Normalise(c.Type)
diff --git a/backend/schema/validate.go b/backend/schema/validate.go
index 61cabfddd1..272047043f 100644
--- a/backend/schema/validate.go
+++ b/backend/schema/validate.go
@@ -480,6 +480,12 @@ func validateVerbMetadata(scopes Scopes, n *Verb) (merr []error) {
 			if err != nil {
 				merr = append(merr, err)
 			}
+			if _, ok := n.Request.(*Unit); !ok {
+				merr = append(merr, errorf(md, "verb %s: cron job cannot have a request type", n.Name))
+			}
+			if _, ok := n.Response.(*Unit); !ok {
+				merr = append(merr, errorf(md, "verb %s: cron job cannot have a response type", n.Name))
+			}
 		case *MetadataCalls, *MetadataDatabases, *MetadataAlias:
 		}
 	}
diff --git a/backend/schema/validate_test.go b/backend/schema/validate_test.go
index 92d8cd8377..c4788b1560 100644
--- a/backend/schema/validate_test.go
+++ b/backend/schema/validate_test.go
@@ -171,6 +171,21 @@ func TestValidate(t *testing.T) {
 				"6:10-10: verb can not have multiple instances of ingress",
 			},
 		},
+
+		{name: "CronOnNonEmptyVerb",
+			schema: `
+				module one {
+					verb verbWithWrongInput(Empty) Unit
+						+cron * * * * * * *
+					verb verbWithWrongOutput(Unit) Empty
+						+cron * * * * * * *
+				}
+			`,
+			errs: []string{
+				"4:7-7: verb verbWithWrongInput: cron job cannot have a request type",
+				"6:7-7: verb verbWithWrongOutput: cron job cannot have a response type",
+			},
+		},
 	}
 
 	for _, test := range tests {
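
A usage sketch, not part of the patch and with illustrative module/verb names only: under the new validate.go rule, a verb carrying a cron annotation must take Unit and return Unit, written in the same 7-field schedule syntax the tests above use:

	module example {
		verb cleanup(Unit) Unit
			+cron */10 * * * * * *
	}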