From 096c12d8bf4a4ae33fb1cd33f3d421581ebb3987 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 18 Apr 2024 10:31:33 +1000 Subject: [PATCH] test: integration test for cron jobs service (#1269) Tests the real service with the real dal / postgres. Executes jobs with a 2 second schedule 3 times. --- .github/workflows/integration.yml | 2 +- Justfile | 2 +- backend/controller/cronjobs/cronjobs.go | 2 +- .../cronjobs/cronjobs_integration_test.go | 43 +++ backend/controller/cronjobs/cronjobs_test.go | 260 ++---------------- .../cronjobs/cronjobs_utils_test.go | 259 +++++++++++++++++ 6 files changed, 330 insertions(+), 238 deletions(-) create mode 100644 backend/controller/cronjobs/cronjobs_integration_test.go create mode 100644 backend/controller/cronjobs/cronjobs_utils_test.go diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 425ea0d824..faccdf78cc 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -19,7 +19,7 @@ jobs: id: extract-tests run: | # shellcheck disable=SC2046 - echo "matrix={\"test\":$(jq -c -n '$ARGS.positional' --args $(grep '^func Test' integration/*_test.go | awk '{print $2}' | cut -d'(' -f1))}" >> "$GITHUB_OUTPUT" + echo "matrix={\"test\":$(jq -c -n '$ARGS.positional' --args $(grep '^func Test' integration/*_test.go ./backend/controller/cronjobs | awk '{print $2}' | cut -d'(' -f1))}" >> "$GITHUB_OUTPUT" integration: needs: prepare runs-on: ubuntu-latest diff --git a/Justfile b/Justfile index 462fdc762f..0ff3ebedb8 100644 --- a/Justfile +++ b/Justfile @@ -99,7 +99,7 @@ integration-tests *test: #!/bin/bash set -euo pipefail testName=${1:-} - go test -fullpath -count 1 -v -tags integration -run "$testName" ./integration + go test -fullpath -count 1 -v -tags integration -run "$testName" ./integration ./backend/controller/cronjobs # Run README doc tests test-readme *args: diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 1d5f35ecce..26003ab193 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -168,7 +168,7 @@ func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentK _ = s.syncJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) } -// syncJobs is run periodically via a scheduled task +// SyncJobs is run periodically via a scheduled task func (s *Service) syncJobs(ctx context.Context) (time.Duration, error) { err := s.syncJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]()) if err != nil { diff --git a/backend/controller/cronjobs/cronjobs_integration_test.go b/backend/controller/cronjobs/cronjobs_integration_test.go new file mode 100644 index 0000000000..debc018f36 --- /dev/null +++ b/backend/controller/cronjobs/cronjobs_integration_test.go @@ -0,0 +1,43 @@ +//go:build integration + +package cronjobs + +import ( + "context" + "sync" + "testing" + "time" + + "connectrpc.com/connect" + db "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" + "github.com/benbjohnson/clock" +) + +func TestServiceWithRealDal(t *testing.T) { + t.Parallel() + ctx := log.ContextWithNewDefaultLogger(context.Background()) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + conn := sqltest.OpenForTesting(ctx, t) + dal, err := db.New(ctx, conn) + assert.NoError(t, err) + + // Using a real clock because real db queries use db clock + // delay until we are on an odd second + clk := clock.New() + if clk.Now().Second()%2 == 0 { + time.Sleep(time.Second - time.Duration(clk.Now().Nanosecond())*time.Nanosecond) + } else { + time.Sleep(2*time.Second - time.Duration(clk.Now().Nanosecond())*time.Nanosecond) + } + + testServiceWithDal(ctx, t, dal, clk) +} diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 04e045099b..00a2f7b0f1 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -1,224 +1,42 @@ +//go:build !integration + package cronjobs import ( "context" - "fmt" - "strconv" "sync" "testing" "time" "connectrpc.com/connect" - "github.com/TBD54566975/ftl/backend/controller/dal" - "github.com/TBD54566975/ftl/backend/controller/scheduledtask" + db "github.com/TBD54566975/ftl/backend/controller/dal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/slices" "github.com/alecthomas/assert/v2" "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" - "github.com/jpillora/backoff" xslices "golang.org/x/exp/slices" ) -type mockDAL struct { - lock sync.Mutex - clock *clock.Mock - jobs []model.CronJob - attemptCountMap map[string]int -} - -func (d *mockDAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) { - d.lock.Lock() - defer d.lock.Unlock() - - return d.jobs, nil -} - -func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) { - d.lock.Lock() - defer d.lock.Unlock() - - job := model.CronJob{ - Key: model.NewCronJobKey(module, verb), - DeploymentKey: deploymentKey, - Verb: model.VerbRef{Module: module, Name: verb}, - Schedule: schedule, - StartTime: startTime, - NextExecution: nextExecution, - State: model.CronJobStateIdle, - } - d.jobs = append(d.jobs, job) -} - -func (d *mockDAL) indexForJob(job model.CronJob) (int, error) { - for i, j := range d.jobs { - if j.Key.String() == job.Key.String() { - return i, nil - } - } - return -1, fmt.Errorf("job not found") -} - -func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) { - d.lock.Lock() - defer d.lock.Unlock() - - attemptedJobs = []dal.AttemptedCronJob{} - now := (*d.clock).Now() - - for _, inputJob := range jobs { - i, err := d.indexForJob(inputJob) - if err != nil { - return nil, err - } - job := d.jobs[i] - if !job.NextExecution.After(now) && job.State == model.CronJobStateIdle { - job.State = model.CronJobStateExecuting - job.StartTime = (*d.clock).Now() - d.jobs[i] = job - attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{ - CronJob: job, - DidStartExecution: true, - HasMinReplicas: true, - }) - } else { - attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{ - CronJob: job, - DidStartExecution: false, - HasMinReplicas: true, - }) - } - d.attemptCountMap[job.Key.String()]++ - } - return attemptedJobs, nil -} - -func (d *mockDAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) { - d.lock.Lock() - defer d.lock.Unlock() - - i, err := d.indexForJob(job) - if err != nil { - return model.CronJob{}, err - } - internalJob := d.jobs[i] - if internalJob.State != model.CronJobStateExecuting { - return model.CronJob{}, fmt.Errorf("job can not be stopped, it isnt running") - } - if internalJob.StartTime != job.StartTime { - return model.CronJob{}, fmt.Errorf("job can not be stopped, start time does not match") - } - internalJob.State = model.CronJobStateIdle - internalJob.NextExecution = next - d.jobs[i] = internalJob - return internalJob, nil -} - -func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) { - d.lock.Lock() - defer d.lock.Unlock() - - return slices.Filter(d.jobs, func(job model.CronJob) bool { - return (*d.clock).Now().After(job.StartTime.Add(duration)) - }), nil -} - -type mockScheduler struct { -} - -func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) { - // do nothing -} - -func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) { - // do nothing -} - -type controller struct { - key model.ControllerKey - DAL DAL - clock *clock.Mock - cronJobs *Service -} - -func TestService(t *testing.T) { +func TestServiceWithMockDal(t *testing.T) { t.Parallel() ctx := log.ContextWithNewDefaultLogger(context.Background()) ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) - config := Config{Timeout: time.Minute * 5} - clock := clock.NewMock() + clk := clock.NewMock() + clk.Add(time.Second) // half way between cron job executions + mockDal := &mockDAL{ - clock: clock, + clock: clk, lock: sync.Mutex{}, attemptCountMap: map[string]int{}, } - scheduler := &mockScheduler{} - - verbCallCount := map[string]int{} - verbCallCountLock := sync.Mutex{} - - // initial jobs - for i := range 20 { - deploymentKey := model.NewDeploymentKey("initial") - now := clock.Now() - cronStr := "*/10 * * * * * *" - pattern, err := cron.Parse(cronStr) - assert.NoError(t, err) - next, err := cron.NextAfter(pattern, now, false) - assert.NoError(t, err) - mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next) - } - - controllers := []*controller{} - for i := range 5 { - key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) - controller := &controller{ - key: key, - DAL: mockDal, - clock: clock, - cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { - verbRef := schema.RefFromProto(r.Msg.Verb) - - verbCallCountLock.Lock() - verbCallCount[verbRef.Name]++ - verbCallCountLock.Unlock() - - return &connect.Response[ftlv1.CallResponse]{}, nil - }, clock), - } - controllers = append(controllers, controller) - } - - time.Sleep(time.Millisecond * 100) - - for _, c := range controllers { - go func() { - c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller { - return dal.Controller{ - Key: ctrl.key, - } - })) - _, _ = c.cronJobs.syncJobs(ctx) - }() - } - - clock.Add(time.Second * 5) - time.Sleep(time.Millisecond * 100) - for range 3 { - clock.Add(time.Second * 10) - time.Sleep(time.Millisecond * 100) - } - for _, j := range mockDal.jobs { - count := verbCallCount[j.Verb.Name] - assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name) - } + testServiceWithDal(ctx, t, mockDal, clk) } func TestHashRing(t *testing.T) { @@ -229,63 +47,35 @@ func TestHashRing(t *testing.T) { ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) - config := Config{Timeout: time.Minute * 5} mockDal := &mockDAL{ clock: clock.NewMock(), lock: sync.Mutex{}, attemptCountMap: map[string]int{}, } - scheduler := &mockScheduler{} verbCallCount := map[string]int{} verbCallCountLock := sync.Mutex{} - // initial jobs - for i := range 100 { - deploymentKey := model.NewDeploymentKey("initial") - now := mockDal.clock.Now() - cronStr := "*/10 * * * * * *" - pattern, err := cron.Parse(cronStr) - assert.NoError(t, err) - next, err := cron.NextAfter(pattern, now, false) - assert.NoError(t, err) - mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next) - } + moduleName := "initial" + jobsToCreate := newJobs(t, moduleName, "*/10 * * * * * *", mockDal.clock, 100) - controllers := []*controller{} - for i := range 20 { - key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) - clock := clock.NewMock() - controller := &controller{ - key: key, - DAL: mockDal, - clock: clock, - cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { - verbRef := schema.RefFromProto(r.Msg.Verb) + deploymentKey, err := mockDal.CreateDeployment(ctx, "go", &schema.Module{ + Name: moduleName, + }, []db.DeploymentArtefact{}, []db.IngressRoutingEntry{}, jobsToCreate) + assert.NoError(t, err) - verbCallCountLock.Lock() - verbCallCount[verbRef.Name]++ - verbCallCountLock.Unlock() + err = mockDal.ReplaceDeployment(ctx, deploymentKey, 1) + assert.NoError(t, err) - return &connect.Response[ftlv1.CallResponse]{}, nil - }, clock), - } - controllers = append(controllers, controller) - } + controllers := newControllers(ctx, 20, mockDal, func() clock.Clock { return clock.NewMock() }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + verbRef := schema.RefFromProto(r.Msg.Verb) - time.Sleep(time.Millisecond * 100) + verbCallCountLock.Lock() + verbCallCount[verbRef.Name]++ + verbCallCountLock.Unlock() - for _, c := range controllers { - go func() { - c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller { - return dal.Controller{ - Key: ctrl.key, - } - })) - _, _ = c.cronJobs.syncJobs(ctx) - }() - } - time.Sleep(time.Millisecond * 100) + return &connect.Response[ftlv1.CallResponse]{}, nil + }) // progress time for each controller one at a time, noting which verbs got attempted each time // to build a map of verb to controller keys @@ -296,7 +86,7 @@ func TestHashRing(t *testing.T) { beforeAttemptCount[k] = v } - c.clock.Add(time.Second * 15) + c.mockClock.Add(time.Second * 15) time.Sleep(time.Millisecond * 100) for k, v := range mockDal.attemptCountMap { diff --git a/backend/controller/cronjobs/cronjobs_utils_test.go b/backend/controller/cronjobs/cronjobs_utils_test.go new file mode 100644 index 0000000000..0f7738e0cc --- /dev/null +++ b/backend/controller/cronjobs/cronjobs_utils_test.go @@ -0,0 +1,259 @@ +package cronjobs + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + "time" + + "connectrpc.com/connect" + db "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/cron" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" + "github.com/benbjohnson/clock" + "github.com/jpillora/backoff" +) + +type ExtendedDAL interface { + DAL + CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []db.DeploymentArtefact, ingressRoutes []db.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) + ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error) +} + +type mockDAL struct { + lock sync.Mutex + clock clock.Clock + jobs []model.CronJob + attemptCountMap map[string]int +} + +var _ ExtendedDAL = &mockDAL{} + +func (d *mockDAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []db.DeploymentArtefact, ingressRoutes []db.IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) { + deploymentKey := model.NewDeploymentKey(moduleSchema.Name) + d.jobs = []model.CronJob{} + for _, job := range cronJobs { + job.DeploymentKey = deploymentKey + d.jobs = append(d.jobs, job) + } + return deploymentKey, nil +} + +func (d *mockDAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error) { + return nil +} + +func (d *mockDAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) { + d.lock.Lock() + defer d.lock.Unlock() + + return d.jobs, nil +} + +func (d *mockDAL) indexForJob(job model.CronJob) (int, error) { + for i, j := range d.jobs { + if j.Key.String() == job.Key.String() { + return i, nil + } + } + return -1, fmt.Errorf("job not found") +} + +func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []db.AttemptedCronJob, err error) { + d.lock.Lock() + defer d.lock.Unlock() + + attemptedJobs = []db.AttemptedCronJob{} + now := d.clock.Now() + + for _, inputJob := range jobs { + i, err := d.indexForJob(inputJob) + if err != nil { + return nil, err + } + job := d.jobs[i] + if !job.NextExecution.After(now) && job.State == model.CronJobStateIdle { + job.State = model.CronJobStateExecuting + job.StartTime = d.clock.Now() + d.jobs[i] = job + attemptedJobs = append(attemptedJobs, db.AttemptedCronJob{ + CronJob: job, + DidStartExecution: true, + HasMinReplicas: true, + }) + } else { + attemptedJobs = append(attemptedJobs, db.AttemptedCronJob{ + CronJob: job, + DidStartExecution: false, + HasMinReplicas: true, + }) + } + d.attemptCountMap[job.Key.String()]++ + } + return attemptedJobs, nil +} + +func (d *mockDAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) { + d.lock.Lock() + defer d.lock.Unlock() + + i, err := d.indexForJob(job) + if err != nil { + return model.CronJob{}, err + } + internalJob := d.jobs[i] + if internalJob.State != model.CronJobStateExecuting { + return model.CronJob{}, fmt.Errorf("job can not be stopped, it isnt running") + } + if internalJob.StartTime != job.StartTime { + return model.CronJob{}, fmt.Errorf("job can not be stopped, start time does not match") + } + internalJob.State = model.CronJobStateIdle + internalJob.NextExecution = next + d.jobs[i] = internalJob + return internalJob, nil +} + +func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) { + d.lock.Lock() + defer d.lock.Unlock() + + return slices.Filter(d.jobs, func(job model.CronJob) bool { + return d.clock.Now().After(job.StartTime.Add(duration)) + }), nil +} + +type mockScheduler struct { +} + +func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) { + // do nothing +} + +func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) { + // do nothing +} + +type controller struct { + key model.ControllerKey + dal DAL + clock clock.Clock + mockClock *clock.Mock // only set when clock is a mock + cronJobs *Service +} + +func newJobs(t *testing.T, moduleName string, cronPattern string, clock clock.Clock, count int) []model.CronJob { + t.Helper() + newJobs := []model.CronJob{} + for i := range count { + now := clock.Now() + pattern, err := cron.Parse(cronPattern) + assert.NoError(t, err) + next, err := cron.NextAfter(pattern, now, false) + assert.NoError(t, err) + newJobs = append(newJobs, model.CronJob{ + Key: model.NewCronJobKey(moduleName, fmt.Sprintf("verb%d", i)), + Verb: model.VerbRef{Module: moduleName, Name: fmt.Sprintf("verb%d", i)}, + Schedule: pattern.String(), + StartTime: now, + NextExecution: next, + State: model.CronJobStateIdle, + }) + } + return newJobs +} + +func newControllers(ctx context.Context, count int, dal DAL, clockFactory func() clock.Clock, call ExecuteCallFunc) []*controller { + controllers := []*controller{} + for i := range count { + key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) + clk := clockFactory() + controller := &controller{ + key: key, + dal: dal, + clock: clk, + cronJobs: NewForTesting(ctx, + key, "test.com", + Config{Timeout: time.Minute * 5}, + dal, + &mockScheduler{}, + call, + clk), + } + if mockClock, ok := clk.(*clock.Mock); ok { + controller.mockClock = mockClock + } + controllers = append(controllers, controller) + } + + time.Sleep(time.Millisecond * 100) + + for _, c := range controllers { + s := c.cronJobs + go func() { + s.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) db.Controller { + return db.Controller{ + Key: ctrl.key, + } + })) + _, _ = s.syncJobs(ctx) + }() + } + + time.Sleep(time.Millisecond * 100) + + return controllers +} + +// should be called when clk is half way between cron job executions (ie on an odd second) +func testServiceWithDal(ctx context.Context, t *testing.T, dal ExtendedDAL, clk clock.Clock) { + t.Helper() + + verbCallCount := map[string]int{} + verbCallCountLock := sync.Mutex{} + + moduleName := "initial" + jobsToCreate := newJobs(t, moduleName, "*/2 * * * * * *", clk, 20) + + deploymentKey, err := dal.CreateDeployment(ctx, "go", &schema.Module{ + Name: moduleName, + }, []db.DeploymentArtefact{}, []db.IngressRoutingEntry{}, jobsToCreate) + assert.NoError(t, err) + + err = dal.ReplaceDeployment(ctx, deploymentKey, 1) + assert.NoError(t, err) + + _ = newControllers(ctx, 5, dal, func() clock.Clock { return clk }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + verbRef := schema.RefFromProto(r.Msg.Verb) + + verbCallCountLock.Lock() + verbCallCount[verbRef.Name]++ + verbCallCountLock.Unlock() + + return &connect.Response[ftlv1.CallResponse]{}, nil + }) + + if mockClock, ok := clk.(*clock.Mock); ok { + // We don't need to wait in real-time + time.Sleep(time.Millisecond * 100) + for range 3 { + mockClock.Add(time.Second * 2) + time.Sleep(time.Millisecond * 100) + } + } else { + time.Sleep(time.Second * 2 * 3) + } + + for _, j := range jobsToCreate { + count := verbCallCount[j.Verb.Name] + assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name) + } +}