Skip to content

Commit

Permalink
test: integration test for cron jobs service (#1269)
Browse files Browse the repository at this point in the history
Tests the real service with the real dal / postgres.
Executes jobs with a 2 second schedule 3 times.
  • Loading branch information
matt2e authored Apr 18, 2024
1 parent 34650d7 commit 096c12d
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 238 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
id: extract-tests
run: |
# shellcheck disable=SC2046
echo "matrix={\"test\":$(jq -c -n '$ARGS.positional' --args $(grep '^func Test' integration/*_test.go | awk '{print $2}' | cut -d'(' -f1))}" >> "$GITHUB_OUTPUT"
echo "matrix={\"test\":$(jq -c -n '$ARGS.positional' --args $(grep '^func Test' integration/*_test.go ./backend/controller/cronjobs | awk '{print $2}' | cut -d'(' -f1))}" >> "$GITHUB_OUTPUT"
integration:
needs: prepare
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ integration-tests *test:
#!/bin/bash
set -euo pipefail
testName=${1:-}
go test -fullpath -count 1 -v -tags integration -run "$testName" ./integration
go test -fullpath -count 1 -v -tags integration -run "$testName" ./integration ./backend/controller/cronjobs

# Run README doc tests
test-readme *args:
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentK
_ = s.syncJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey))
}

// syncJobs is run periodically via a scheduled task
// SyncJobs is run periodically via a scheduled task
func (s *Service) syncJobs(ctx context.Context) (time.Duration, error) {
err := s.syncJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]())
if err != nil {
Expand Down
43 changes: 43 additions & 0 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//go:build integration

package cronjobs

import (
"context"
"sync"
"testing"
"time"

"connectrpc.com/connect"
db "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
)

func TestServiceWithRealDal(t *testing.T) {
t.Parallel()
ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

conn := sqltest.OpenForTesting(ctx, t)
dal, err := db.New(ctx, conn)
assert.NoError(t, err)

// Using a real clock because real db queries use db clock
// delay until we are on an odd second
clk := clock.New()
if clk.Now().Second()%2 == 0 {
time.Sleep(time.Second - time.Duration(clk.Now().Nanosecond())*time.Nanosecond)
} else {
time.Sleep(2*time.Second - time.Duration(clk.Now().Nanosecond())*time.Nanosecond)
}

testServiceWithDal(ctx, t, dal, clk)
}
260 changes: 25 additions & 235 deletions backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
@@ -1,224 +1,42 @@
//go:build !integration

package cronjobs

import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"

"connectrpc.com/connect"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
db "github.com/TBD54566975/ftl/backend/controller/dal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"
xslices "golang.org/x/exp/slices"
)

type mockDAL struct {
lock sync.Mutex
clock *clock.Mock
jobs []model.CronJob
attemptCountMap map[string]int
}

func (d *mockDAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) {
d.lock.Lock()
defer d.lock.Unlock()

return d.jobs, nil
}

func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) {
d.lock.Lock()
defer d.lock.Unlock()

job := model.CronJob{
Key: model.NewCronJobKey(module, verb),
DeploymentKey: deploymentKey,
Verb: model.VerbRef{Module: module, Name: verb},
Schedule: schedule,
StartTime: startTime,
NextExecution: nextExecution,
State: model.CronJobStateIdle,
}
d.jobs = append(d.jobs, job)
}

func (d *mockDAL) indexForJob(job model.CronJob) (int, error) {
for i, j := range d.jobs {
if j.Key.String() == job.Key.String() {
return i, nil
}
}
return -1, fmt.Errorf("job not found")
}

func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) {
d.lock.Lock()
defer d.lock.Unlock()

attemptedJobs = []dal.AttemptedCronJob{}
now := (*d.clock).Now()

for _, inputJob := range jobs {
i, err := d.indexForJob(inputJob)
if err != nil {
return nil, err
}
job := d.jobs[i]
if !job.NextExecution.After(now) && job.State == model.CronJobStateIdle {
job.State = model.CronJobStateExecuting
job.StartTime = (*d.clock).Now()
d.jobs[i] = job
attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{
CronJob: job,
DidStartExecution: true,
HasMinReplicas: true,
})
} else {
attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{
CronJob: job,
DidStartExecution: false,
HasMinReplicas: true,
})
}
d.attemptCountMap[job.Key.String()]++
}
return attemptedJobs, nil
}

func (d *mockDAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) {
d.lock.Lock()
defer d.lock.Unlock()

i, err := d.indexForJob(job)
if err != nil {
return model.CronJob{}, err
}
internalJob := d.jobs[i]
if internalJob.State != model.CronJobStateExecuting {
return model.CronJob{}, fmt.Errorf("job can not be stopped, it isnt running")
}
if internalJob.StartTime != job.StartTime {
return model.CronJob{}, fmt.Errorf("job can not be stopped, start time does not match")
}
internalJob.State = model.CronJobStateIdle
internalJob.NextExecution = next
d.jobs[i] = internalJob
return internalJob, nil
}

func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) {
d.lock.Lock()
defer d.lock.Unlock()

return slices.Filter(d.jobs, func(job model.CronJob) bool {
return (*d.clock).Now().After(job.StartTime.Add(duration))
}), nil
}

type mockScheduler struct {
}

func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) {
// do nothing
}

func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) {
// do nothing
}

type controller struct {
key model.ControllerKey
DAL DAL
clock *clock.Mock
cronJobs *Service
}

func TestService(t *testing.T) {
func TestServiceWithMockDal(t *testing.T) {
t.Parallel()
ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

config := Config{Timeout: time.Minute * 5}
clock := clock.NewMock()
clk := clock.NewMock()
clk.Add(time.Second) // half way between cron job executions

mockDal := &mockDAL{
clock: clock,
clock: clk,
lock: sync.Mutex{},
attemptCountMap: map[string]int{},
}
scheduler := &mockScheduler{}

verbCallCount := map[string]int{}
verbCallCountLock := sync.Mutex{}

// initial jobs
for i := range 20 {
deploymentKey := model.NewDeploymentKey("initial")
now := clock.Now()
cronStr := "*/10 * * * * * *"
pattern, err := cron.Parse(cronStr)
assert.NoError(t, err)
next, err := cron.NextAfter(pattern, now, false)
assert.NoError(t, err)
mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next)
}

controllers := []*controller{}
for i := range 5 {
key := model.NewControllerKey("localhost", strconv.Itoa(8080+i))
controller := &controller{
key: key,
DAL: mockDal,
clock: clock,
cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(r.Msg.Verb)

verbCallCountLock.Lock()
verbCallCount[verbRef.Name]++
verbCallCountLock.Unlock()

return &connect.Response[ftlv1.CallResponse]{}, nil
}, clock),
}
controllers = append(controllers, controller)
}

time.Sleep(time.Millisecond * 100)

for _, c := range controllers {
go func() {
c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller {
return dal.Controller{
Key: ctrl.key,
}
}))
_, _ = c.cronJobs.syncJobs(ctx)
}()
}

clock.Add(time.Second * 5)
time.Sleep(time.Millisecond * 100)
for range 3 {
clock.Add(time.Second * 10)
time.Sleep(time.Millisecond * 100)
}

for _, j := range mockDal.jobs {
count := verbCallCount[j.Verb.Name]
assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name)
}
testServiceWithDal(ctx, t, mockDal, clk)
}

func TestHashRing(t *testing.T) {
Expand All @@ -229,63 +47,35 @@ func TestHashRing(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

config := Config{Timeout: time.Minute * 5}
mockDal := &mockDAL{
clock: clock.NewMock(),
lock: sync.Mutex{},
attemptCountMap: map[string]int{},
}
scheduler := &mockScheduler{}

verbCallCount := map[string]int{}
verbCallCountLock := sync.Mutex{}

// initial jobs
for i := range 100 {
deploymentKey := model.NewDeploymentKey("initial")
now := mockDal.clock.Now()
cronStr := "*/10 * * * * * *"
pattern, err := cron.Parse(cronStr)
assert.NoError(t, err)
next, err := cron.NextAfter(pattern, now, false)
assert.NoError(t, err)
mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next)
}
moduleName := "initial"
jobsToCreate := newJobs(t, moduleName, "*/10 * * * * * *", mockDal.clock, 100)

controllers := []*controller{}
for i := range 20 {
key := model.NewControllerKey("localhost", strconv.Itoa(8080+i))
clock := clock.NewMock()
controller := &controller{
key: key,
DAL: mockDal,
clock: clock,
cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(r.Msg.Verb)
deploymentKey, err := mockDal.CreateDeployment(ctx, "go", &schema.Module{
Name: moduleName,
}, []db.DeploymentArtefact{}, []db.IngressRoutingEntry{}, jobsToCreate)
assert.NoError(t, err)

verbCallCountLock.Lock()
verbCallCount[verbRef.Name]++
verbCallCountLock.Unlock()
err = mockDal.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

return &connect.Response[ftlv1.CallResponse]{}, nil
}, clock),
}
controllers = append(controllers, controller)
}
controllers := newControllers(ctx, 20, mockDal, func() clock.Clock { return clock.NewMock() }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(r.Msg.Verb)

time.Sleep(time.Millisecond * 100)
verbCallCountLock.Lock()
verbCallCount[verbRef.Name]++
verbCallCountLock.Unlock()

for _, c := range controllers {
go func() {
c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller {
return dal.Controller{
Key: ctrl.key,
}
}))
_, _ = c.cronJobs.syncJobs(ctx)
}()
}
time.Sleep(time.Millisecond * 100)
return &connect.Response[ftlv1.CallResponse]{}, nil
})

// progress time for each controller one at a time, noting which verbs got attempted each time
// to build a map of verb to controller keys
Expand All @@ -296,7 +86,7 @@ func TestHashRing(t *testing.T) {
beforeAttemptCount[k] = v
}

c.clock.Add(time.Second * 15)
c.mockClock.Add(time.Second * 15)
time.Sleep(time.Millisecond * 100)

for k, v := range mockDal.attemptCountMap {
Expand Down
Loading

0 comments on commit 096c12d

Please sign in to comment.