Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cron service #3399

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var (
Deployment *DeploymentMetrics
Ingress *IngressMetrics
PubSub *PubSubMetrics
Cron *CronMetrics
Controller *ControllerTracing
Timeline *TimelineMetrics
)
Expand All @@ -34,8 +33,6 @@ func init() {
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)
Cron, err = initCronMetrics()
errs = errors.Join(errs, err)
Controller = initControllerTracing()
Timeline, err = initTimelineMetrics()
errs = errors.Join(errs, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package observability
import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
Expand All @@ -20,6 +21,8 @@ const (

cronJobKilledStatus = "killed"
cronJobFailedStartStatus = "failed_start"

deploymentMeterName = "ftl.deployments.cron"
)

type CronMetrics struct {
Expand All @@ -28,6 +31,16 @@ type CronMetrics struct {
jobLatency metric.Int64Histogram
}

// Cron holds the cron metric instruments. It is populated once by this
// file's init function.
var Cron *CronMetrics

// init builds the cron metrics and publishes them through Cron. A failure to
// build them panics with the underlying error wrapped.
func init() {
	metrics, err := initCronMetrics()
	if err != nil {
		panic(fmt.Errorf("could not initialize cron metrics: %w", err))
	}
	Cron = metrics
}

func initCronMetrics() (*CronMetrics, error) {
result := &CronMetrics{
jobsActive: noop.Int64UpDownCounter{},
Expand Down Expand Up @@ -108,3 +121,11 @@ func cronAttributes(job model.CronJob, maybeStatus optional.Option[string]) metr
}
return metric.WithAttributes(attributes...)
}

// wrapErr wraps err with a message naming the signal that could not be
// created. The returned error unwraps to err.
func wrapErr(signalName string, err error) error {
	const format = "failed to create %q signal: %w"
	return fmt.Errorf(format, signalName, err)
}

// timeSinceMS reports how long ago start was, in whole milliseconds. The
// result is negative when start lies in the future.
func timeSinceMS(start time.Time) int64 {
	elapsed := time.Since(start)
	return elapsed.Milliseconds()
}
204 changes: 204 additions & 0 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package cron

import (
"context"
"fmt"
"sort"
"time"

"connectrpc.com/connect"
"github.com/jpillora/backoff"
"golang.org/x/sync/errgroup"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/slices"
)

// PullSchemaClient is the single RPC the cron service needs in order to watch
// for schema changes. Start passes its PullSchema method to
// rpc.RetryStreamingServerStream and forwards every response to the scheduler.
type PullSchemaClient interface {
// PullSchema opens a server stream of PullSchemaResponse messages.
PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error)
}

// CallClient is the single RPC the cron service needs in order to invoke a
// verb. callCronJob uses it to execute each scheduled job.
type CallClient interface {
// Call issues a CallRequest and returns the CallResponse or a transport error.
Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error)
}

// cronJob is one scheduled verb together with its parsed schedule and the time
// of its next run. Values are built by extractCronJobs.
type cronJob struct {
// module is the name of the module that declares the verb.
module string
// verb is the schema verb carrying the cron metadata.
verb *schema.Verb
// cronmd is the cron metadata entry found on the verb.
cronmd *schema.MetadataCronJob
// pattern is the result of parsing cronmd.Cron.
pattern cron.Pattern
// next is the next scheduled run time, as computed by cron.Next.
next time.Time
}

// String renders the job as "module.verb (pattern)". When the next run is
// still in the future, " (next run in <duration>)" is appended.
//
// The remaining time is sampled once so that the value tested and the value
// printed always agree; sampling twice could print a zero or negative
// duration if the deadline passed between the two reads.
func (c cronJob) String() string {
	desc := fmt.Sprintf("%s.%s (%s)", c.module, c.verb.Name, c.pattern)
	remaining := time.Until(c.next)
	if remaining <= 0 {
		return desc
	}
	return desc + fmt.Sprintf(" (next run in %s)", remaining)
}

// Start the cron service. Blocks until the context is cancelled.
//
// Two goroutines run under a shared errgroup: one executes the scheduler loop
// (run), the other streams schema changes from pullSchemaClient and forwards
// them to the scheduler over a buffered channel. The returned error wraps
// whatever caused the group to stop; nil is returned only if both goroutines
// finish without error.
func Start(ctx context.Context, pullSchemaClient PullSchemaClient, verbClient CallClient) error {
	wg, ctx := errgroup.WithContext(ctx)
	changes := make(chan *ftlv1.PullSchemaResponse, 8)

	// Start processing cron jobs and schema changes.
	wg.Go(func() error {
		return run(ctx, verbClient, changes)
	})

	// Start watching for schema changes.
	wg.Go(func() error {
		rpc.RetryStreamingServerStream(ctx, "pull-schema", backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, pullSchemaClient.PullSchema, func(_ context.Context, resp *ftlv1.PullSchemaResponse) error {
			// Never block on the send alone: once the scheduler has stopped
			// nobody drains the channel, and a full buffer would otherwise
			// wedge this goroutine and keep wg.Wait from returning.
			select {
			case changes <- resp:
				return nil
			case <-ctx.Done():
				return fmt.Errorf("schema change not delivered: %w", ctx.Err())
			}
		}, rpc.AlwaysRetry())
		return nil
	})

	if err := wg.Wait(); err != nil {
		return fmt.Errorf("cron service stopped: %w", err)
	}
	return nil
}

// run is the scheduler loop of the cron service. It keeps the per-module set
// of cron jobs up to date from the changes channel and executes the job at the
// head of the queue when its timer fires. It returns only when ctx is done,
// with ctx.Err() wrapped; all other failures are logged and the loop goes on.
func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSchemaResponse) error {
logger := log.FromContext(ctx).Scope("cron")
// Map of cron jobs for each module.
cronJobs := map[string][]cronJob{}
// Cron jobs ordered by next execution.
cronQueue := []cronJob{}

logger.Debugf("Starting cron service")

for {
next, ok := scheduleNext(cronQueue)
// nextCh stays nil when the queue is empty; receiving from a nil channel
// blocks forever, so the timer case below is never selected in that state.
var nextCh <-chan time.Time
if ok {
logger.Tracef("Next cron job scheduled in %s", next)
nextCh = time.After(next)
}
select {
case <-ctx.Done():
return fmt.Errorf("cron service stopped: %w", ctx.Err())

// A schema change arrived: update the per-module jobs and rebuild the queue.
case resp := <-changes:
if err := updateCronJobs(cronJobs, resp); err != nil {
logger.Errorf(err, "Failed to update cron jobs")
continue
}
cronQueue = rebuildQueue(cronJobs)

// Execute scheduled cron job
case <-nextCh:
job := cronQueue[0]
logger.Debugf("Executing cron job %s", job)

// The job is rescheduled and the queue re-sorted before the call is made.
// NOTE(review): if cron.Next fails, the head job keeps its elapsed next
// time, so the next iteration's timer fires immediately — confirm this
// is acceptable.
nextRun, err := cron.Next(job.pattern, false)
if err != nil {
logger.Errorf(err, "Failed to calculate next run time")
continue
}
job.next = nextRun
cronQueue[0] = job
orderQueue(cronQueue)

// The call is made synchronously; a failure is logged and not retried here.
if err := callCronJob(ctx, verbClient, job); err != nil {
logger.Errorf(err, "Failed to execute cron job")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we're dropping support for retry policies on cron? We should probably make that a validation check to ensure no retry directives on cron job verbs if so.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe just a ticket to add retry logic back in

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems likely we'll be moving retries to Istio.

}
}
}
}

// callCronJob invokes the verb behind cronJob through verbClient with an
// empty JSON object as the request body. It returns an error when the RPC
// itself fails or when the response carries an error payload; every other
// response is treated as success.
func callCronJob(ctx context.Context, verbClient CallClient, cronJob cronJob) error {
	logger := log.FromContext(ctx).Scope("cron")
	ref := schema.Ref{Module: cronJob.module, Name: cronJob.verb.Name}
	logger.Debugf("Calling cron job %s", cronJob)

	request := connect.NewRequest(&ftlv1.CallRequest{
		Verb:     ref.ToProto().(*schemapb.Ref),
		Body:     []byte(`{}`),
		Metadata: &ftlv1.Metadata{},
	})
	resp, err := verbClient.Call(ctx, request)
	if err != nil {
		return fmt.Errorf("%s: call to cron job failed: %w", ref, err)
	}

	// Only an explicit error payload counts as a failed job.
	if failure, ok := resp.Msg.Response.(*ftlv1.CallResponse_Error_); ok {
		return fmt.Errorf("%s: cron job failed: %s", ref, failure.Error.Message)
	}
	return nil
}

// scheduleNext reports how long to wait before the job at the head of
// cronQueue is due. The boolean is false when the queue is empty. The duration
// is zero or negative when the head job is already due.
func scheduleNext(cronQueue []cronJob) (time.Duration, bool) {
	if len(cronQueue) > 0 {
		head := cronQueue[0]
		return time.Until(head.next), true
	}
	return 0, false
}

// updateCronJobs applies one schema change to cronJobs in place. A removed
// deployment drops the module's entry; an added or changed deployment replaces
// the entry with the jobs extracted from the new schema. Any other change type
// leaves the map untouched. On error the map is not modified.
func updateCronJobs(cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaResponse) error {
	change := resp.ChangeType
	if change == ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED {
		delete(cronJobs, resp.ModuleName)
		return nil
	}
	if change != ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED && change != ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED {
		return nil
	}

	moduleSchema, err := schema.ModuleFromProto(resp.Schema)
	if err != nil {
		return fmt.Errorf("failed to extract module schema: %w", err)
	}
	extracted, err := extractCronJobs(moduleSchema)
	if err != nil {
		return fmt.Errorf("failed to extract cron jobs: %w", err)
	}
	cronJobs[resp.ModuleName] = extracted
	return nil
}

// orderQueue sorts queue in place so that the job with the earliest next run
// comes first. The sort is stable, so jobs with equal run times keep their
// relative order.
func orderQueue(queue []cronJob) {
	runsEarlier := func(a, b int) bool {
		return queue[a].next.Before(queue[b].next)
	}
	sort.SliceStable(queue, runsEarlier)
}

// rebuildQueue flattens the per-module job lists into one new slice ordered by
// next run time. The input map and its slices are not modified.
func rebuildQueue(cronJobs map[string][]cronJob) []cronJob {
	// Capacity hint only: assume 2 cron jobs per module.
	const assumedJobsPerModule = 2
	merged := make([]cronJob, 0, len(cronJobs)*assumedJobsPerModule)
	for module := range cronJobs {
		merged = append(merged, cronJobs[module]...)
	}
	orderQueue(merged)
	return merged
}

// extractCronJobs collects one cronJob for every verb in module that carries
// cron metadata. Each job's pattern is parsed from the metadata and its first
// run time is computed straight away. A parse or scheduling failure aborts the
// extraction, and the error is prefixed with the metadata's position.
func extractCronJobs(module *schema.Module) ([]cronJob, error) {
	found := make([]cronJob, 0)
	for verb := range slices.FilterVariants[*schema.Verb](module.Decls) {
		metadata, isCron := slices.FindVariant[*schema.MetadataCronJob](verb.Metadata)
		if !isCron {
			continue
		}

		schedule, err := cron.Parse(metadata.Cron)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", metadata.Pos, err)
		}
		firstRun, err := cron.Next(schedule, false)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", metadata.Pos, err)
		}

		entry := cronJob{
			module:  module.Name,
			verb:    verb,
			cronmd:  metadata,
			pattern: schedule,
			next:    firstRun,
		}
		found = append(found, entry)
	}
	return found, nil
}
109 changes: 109 additions & 0 deletions backend/cron/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package cron

import (
"context"
"os"
"sort"
"testing"
"time"

"connectrpc.com/connect"
"golang.org/x/sync/errgroup"

"github.com/alecthomas/assert/v2"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
)

// verbClient is a test double for CallClient that records every request it
// receives on a channel.
type verbClient struct {
// requests receives the message of each Call invocation.
requests chan *ftlv1.CallRequest
}

// Compile-time check that *verbClient satisfies CallClient.
var _ CallClient = (*verbClient)(nil)

// Call publishes the request message on v.requests and then answers with an
// empty JSON object body. It never returns an error.
func (v *verbClient) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
	v.requests <- req.Msg
	reply := &ftlv1.CallResponse{
		Response: &ftlv1.CallResponse_Body{Body: []byte("{}")},
	}
	return connect.NewResponse(reply), nil
}

func TestCron(t *testing.T) {
changes := make(chan *ftlv1.PullSchemaResponse, 8)
module := &schema.Module{
Name: "echo",
Decls: []schema.Decl{
&schema.Verb{
Name: "echo",
Request: &schema.Unit{},
Response: &schema.Unit{},
Metadata: []schema.Metadata{
&schema.MetadataCronJob{Cron: "*/2 * * * * *"},
},
},
&schema.Verb{
Name: "time",
Request: &schema.Unit{},
Response: &schema.Unit{},
Metadata: []schema.Metadata{
&schema.MetadataCronJob{Cron: "*/2 * * * * *"},
},
},
},
}
changes <- &ftlv1.PullSchemaResponse{
ModuleName: "echo",
Schema: module.ToProto().(*schemapb.Module), //nolint:forcetypeassert
}

ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace}))
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
t.Cleanup(cancel)

wg, ctx := errgroup.WithContext(ctx)

requestsch := make(chan *ftlv1.CallRequest, 8)
client := &verbClient{
requests: requestsch,
}

wg.Go(func() error { return run(ctx, client, changes) })

requests := make([]*ftlv1.CallRequest, 0, 2)

done:
for range 2 {
select {
case <-ctx.Done():
t.Fatalf("timed out: %s", ctx.Err())

case request := <-requestsch:
requests = append(requests, request)
if len(requests) == 2 {
break done
}
}
}

cancel()

sort.SliceStable(requests, func(i, j int) bool {
return requests[i].Verb.Name < requests[j].Verb.Name
})
assert.Equal(t, []*ftlv1.CallRequest{
{
Metadata: &ftlv1.Metadata{},
Verb: &schemapb.Ref{Module: "echo", Name: "echo"},
Body: []byte("{}"),
},
{
Metadata: &ftlv1.Metadata{},
Verb: &schemapb.Ref{Module: "echo", Name: "time"},
Body: []byte("{}"),
},
}, requests, assert.Exclude[*schemapb.Position]())

err := wg.Wait()
assert.IsError(t, err, context.Canceled)
}
1 change: 1 addition & 0 deletions internal/cron/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
parser = participle.MustBuild[Pattern](parserOptions...)
)

// Pattern represents a cron schedule.
type Pattern struct {
Duration *string `parser:"@(Number ('s' | 'm' | 'h'))"`
DayOfWeek *DayOfWeek `parser:"| @('Mon' | 'Tue' | 'Wed' | 'Thu' | 'Fri' | 'Sat' | 'Sun')"`
Expand Down
4 changes: 2 additions & 2 deletions internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ func AlwaysRetry() func(error) bool {
}

// RetryStreamingServerStream will repeatedly call handler with responses from
// the stream returned by "rpc" until handler returns an error or the context is
// cancelled.
// the stream returned by "rpc" until either the context is cancelled or the
// errorRetryCallback returns false.
func RetryStreamingServerStream[Req, Resp any](
ctx context.Context,
name string,
Expand Down
Loading
Loading