From cbd2e5dbb110c5308a1c3302ecdf66243328aa0c Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Apr 2024 13:32:51 +1000 Subject: [PATCH] add cron job key --- backend/controller/dal/dal.go | 11 ++-- backend/controller/sql/models.go | 1 + backend/controller/sql/querier.go | 4 +- backend/controller/sql/queries.sql | 24 +++++---- backend/controller/sql/queries.sql.go | 62 ++++++++++++---------- backend/controller/sql/schema/001_init.sql | 4 ++ backend/controller/sql/types.go | 4 ++ internal/model/cron_job_key.go | 31 +++++++++++ sqlc.yaml | 6 +++ 9 files changed, 100 insertions(+), 47 deletions(-) create mode 100644 internal/model/cron_job_key.go diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index e446f309fb..9ba0c2d614 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -494,6 +494,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem // Start time must be calculated by the caller rather than generated by db // This ensures that nextExecution is after start time, otherwise the job will never be triggered err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{ + Key: job.Key, DeploymentKey: deploymentKey, ModuleName: job.Ref.Module, Verb: job.Ref.Name, @@ -923,7 +924,7 @@ const ( ) type CronJob struct { - id int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Ref schema.Ref Schedule string @@ -940,7 +941,7 @@ type AttemptedCronJob struct { func cronJobFromRow(row sql.GetCronJobsRow) CronJob { return CronJob{ - id: row.ID, + Key: row.Key, DeploymentKey: row.DeploymentKey, Ref: schema.Ref{Module: row.Module, Name: row.Verb}, Schedule: row.Schedule, @@ -965,7 +966,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs if len(jobs) == 0 { return nil, nil } - rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) int64 { return job.id })) + rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) string { return job.Key.String() })) if err != nil { return nil, translatePGError(err) } @@ -974,7 +975,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs for _, row := range rows { job := AttemptedCronJob{ CronJob: CronJob{ - id: row.ID, + Key: row.Key, DeploymentKey: row.DeploymentKey, Ref: schema.Ref{Module: row.Module, Name: row.Verb}, Schedule: row.Schedule, @@ -991,7 +992,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs } func (d *DAL) EndCronJob(ctx context.Context, job CronJob, next time.Time) (CronJob, error) { - row, err := d.db.EndCronJob(ctx, next, job.id, job.StartTime) + row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime) if err != nil { return CronJob{}, translatePGError(err) } diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 5a0a8a75fd..6626f34cf9 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -248,6 +248,7 @@ type Controller struct { type CronJob struct { ID int64 + Key model.CronJobKey DeploymentID int64 Verb string Schedule string diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 7a910a184c..85256f2dd0 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -22,7 +22,7 @@ type Querier interface { CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) - EndCronJob(ctx context.Context, nextExecution time.Time, iD int64, startTime time.Time) (EndCronJobRow, error) + EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) ExpireRunnerReservations(ctx context.Context) (int64, error) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error) @@ -67,7 +67,7 @@ type Querier interface { // Find an idle runner and reserve it for the given deployment. ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error - StartCronJobs(ctx context.Context, ids []int64) ([]StartCronJobsRow, error) + StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) UpsertModule(ctx context.Context, language string, name string) (int64, error) // Upsert a runner and return the deployment ID that it is assigned to, if any. diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index c7bfd4927d..eb7e0c2421 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -281,14 +281,16 @@ SELECT COUNT(*) FROM rows; -- name: GetCronJobs :many -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0; -- name: CreateCronJob :exec -INSERT INTO cron_jobs (deployment_id, module_name, verb, schedule, start_time, next_execution) - VALUES ((SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1), +INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) + VALUES ( + sqlc.arg('key')::cron_job_key, + (SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1), sqlc.arg('module_name')::TEXT, sqlc.arg('verb')::TEXT, sqlc.arg('schedule')::TEXT, @@ -300,39 +302,39 @@ WITH updates AS ( UPDATE cron_jobs SET state = 'executing', start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - WHERE id = ANY (sqlc.arg('ids')) + WHERE key = ANY (sqlc.arg('keys')) AND state = 'idle' AND start_time < next_execution AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - RETURNING id, state, start_time, next_execution) -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, + RETURNING id, key, state, start_time, next_execution) +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, COALESCE(u.start_time, j.start_time) as start_time, COALESCE(u.next_execution, j.next_execution) as next_execution, COALESCE(u.state, j.state) as state, d.min_replicas > 0 as has_min_replicas, - CASE WHEN u.id IS NULL THEN FALSE ELSE TRUE END as updated + CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id LEFT JOIN updates u on j.id = u.id -WHERE j.id = ANY (sqlc.arg('ids')); +WHERE j.key = ANY (sqlc.arg('keys')); -- name: EndCronJob :one WITH j AS ( UPDATE cron_jobs SET state = 'idle', next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ - WHERE id = sqlc.arg('id')::BIGINT + WHERE key = sqlc.arg('key')::cron_job_key AND state = 'executing' AND start_time = sqlc.arg('start_time')::TIMESTAMPTZ RETURNING * ) -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM j INNER JOIN deployments d on j.deployment_id = d.id LIMIT 1; -- name: GetStaleCronJobs :many -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE state = 'executing' diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index de42d85d7f..4bbce650bf 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -52,16 +52,19 @@ func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []b } const createCronJob = `-- name: CreateCronJob :exec - INSERT INTO cron_jobs (deployment_id, module_name, verb, schedule, start_time, next_execution) - VALUES ((SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1), - $2::TEXT, - $3::TEXT, - $4::TEXT, - $5::TIMESTAMPTZ, - $6::TIMESTAMPTZ) +INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) + VALUES ( + $1::cron_job_key, + (SELECT id FROM deployments WHERE key = $2::deployment_key LIMIT 1), + $3::TEXT, + $4::TEXT, + $5::TEXT, + $6::TIMESTAMPTZ, + $7::TIMESTAMPTZ) ` type CreateCronJobParams struct { + Key model.CronJobKey DeploymentKey model.DeploymentKey ModuleName string Verb string @@ -72,6 +75,7 @@ type CreateCronJobParams struct { func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error { _, err := q.db.Exec(ctx, createCronJob, + arg.Key, arg.DeploymentKey, arg.ModuleName, arg.Verb, @@ -159,19 +163,19 @@ WITH j AS ( UPDATE cron_jobs SET state = 'idle', next_execution = $1::TIMESTAMPTZ - WHERE id = $2::BIGINT + WHERE key = $2::cron_job_key AND state = 'executing' AND start_time = $3::TIMESTAMPTZ - RETURNING id, deployment_id, verb, schedule, start_time, next_execution, state, module_name + RETURNING id, key, deployment_id, verb, schedule, start_time, next_execution, state, module_name ) -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM j INNER JOIN deployments d on j.deployment_id = d.id LIMIT 1 ` type EndCronJobRow struct { - ID int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Module string Verb string @@ -181,11 +185,11 @@ type EndCronJobRow struct { State CronJobState } -func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, iD int64, startTime time.Time) (EndCronJobRow, error) { - row := q.db.QueryRow(ctx, endCronJob, nextExecution, iD, startTime) +func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) { + row := q.db.QueryRow(ctx, endCronJob, nextExecution, key, startTime) var i EndCronJobRow err := row.Scan( - &i.ID, + &i.Key, &i.DeploymentKey, &i.Module, &i.Verb, @@ -472,14 +476,14 @@ func (q *Queries) GetControllers(ctx context.Context, all bool) ([]Controller, e } const getCronJobs = `-- name: GetCronJobs :many -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0 ` type GetCronJobsRow struct { - ID int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Module string Verb string @@ -499,7 +503,7 @@ func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { for rows.Next() { var i GetCronJobsRow if err := rows.Scan( - &i.ID, + &i.Key, &i.DeploymentKey, &i.Module, &i.Verb, @@ -1159,7 +1163,7 @@ func (q *Queries) GetRunnersForDeployment(ctx context.Context, key model.Deploym } const getStaleCronJobs = `-- name: GetStaleCronJobs :many -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE state = 'executing' @@ -1167,7 +1171,7 @@ WHERE state = 'executing' ` type GetStaleCronJobsRow struct { - ID int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Module string Verb string @@ -1187,7 +1191,7 @@ func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) for rows.Next() { var i GetStaleCronJobsRow if err := rows.Scan( - &i.ID, + &i.Key, &i.DeploymentKey, &i.Module, &i.Verb, @@ -1514,25 +1518,25 @@ WITH updates AS ( UPDATE cron_jobs SET state = 'executing', start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - WHERE id = ANY ($1) + WHERE key = ANY ($1) AND state = 'idle' AND start_time < next_execution AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - RETURNING id, state, start_time, next_execution) -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, + RETURNING id, key, state, start_time, next_execution) +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, COALESCE(u.start_time, j.start_time) as start_time, COALESCE(u.next_execution, j.next_execution) as next_execution, COALESCE(u.state, j.state) as state, d.min_replicas > 0 as has_min_replicas, - CASE WHEN u.id IS NULL THEN FALSE ELSE TRUE END as updated + CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id LEFT JOIN updates u on j.id = u.id -WHERE j.id = ANY ($1) +WHERE j.key = ANY ($1) ` type StartCronJobsRow struct { - ID int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Module string Verb string @@ -1544,8 +1548,8 @@ type StartCronJobsRow struct { Updated bool } -func (q *Queries) StartCronJobs(ctx context.Context, ids []int64) ([]StartCronJobsRow, error) { - rows, err := q.db.Query(ctx, startCronJobs, ids) +func (q *Queries) StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) { + rows, err := q.db.Query(ctx, startCronJobs, keys) if err != nil { return nil, err } @@ -1554,7 +1558,7 @@ func (q *Queries) StartCronJobs(ctx context.Context, ids []int64) ([]StartCronJo for rows.Next() { var i StartCronJobsRow if err := rows.Scan( - &i.ID, + &i.Key, &i.DeploymentKey, &i.Module, &i.Verb, diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index 3023009700..82bdef4c51 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -237,9 +237,12 @@ CREATE TYPE cron_job_state AS ENUM ( 'executing' ); +CREATE DOMAIN cron_job_key AS VARCHAR; + CREATE TABLE cron_jobs ( id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + key cron_job_key UNIQUE NOT NULL, deployment_id BIGINT NOT NULL REFERENCES deployments (id) ON DELETE CASCADE, verb VARCHAR NOT NULL, schedule VARCHAR NOT NULL, @@ -252,6 +255,7 @@ CREATE TABLE cron_jobs ); CREATE INDEX cron_jobs_executing_start_time_idx ON cron_jobs (start_time) WHERE state = 'executing'; +CREATE UNIQUE INDEX cron_jobs_key_idx ON cron_jobs (key); CREATE TYPE event_type AS ENUM ( 'call', diff --git a/backend/controller/sql/types.go b/backend/controller/sql/types.go index 6f4d66bd56..9c23751131 100644 --- a/backend/controller/sql/types.go +++ b/backend/controller/sql/types.go @@ -13,12 +13,16 @@ import ( type NullTime = optional.Option[time.Time] type NullDuration = optional.Option[time.Duration] type NullRunnerKey = optional.Option[model.RunnerKey] +type NullCronJobKey = optional.Option[model.CronJobKey] type NullDeploymentKey = optional.Option[model.DeploymentKey] type NullRequestKey = optional.Option[model.RequestKey] var _ sql.Scanner = (*NullRunnerKey)(nil) var _ driver.Valuer = (*NullRunnerKey)(nil) +var _ sql.Scanner = (*NullCronJobKey)(nil) +var _ driver.Valuer = (*NullCronJobKey)(nil) + var _ sql.Scanner = (*NullDeploymentKey)(nil) var _ driver.Valuer = (*NullDeploymentKey)(nil) diff --git a/internal/model/cron_job_key.go b/internal/model/cron_job_key.go new file mode 100644 index 0000000000..fea4d2984d --- /dev/null +++ b/internal/model/cron_job_key.go @@ -0,0 +1,31 @@ +package model + +import ( + "errors" + "strings" +) + +type CronJobKey = KeyType[CronJobPayload, *CronJobPayload] + +func NewCronJobKey(module, verb string) CronJobKey { + return newKey[CronJobPayload](strings.Join([]string{module, verb}, "-")) +} + +func ParseCronJobKey(key string) (CronJobKey, error) { return parseKey[CronJobPayload](key) } + +type CronJobPayload struct { + Ref string +} + +var _ KeyPayload = (*CronJobPayload)(nil) + +func (d *CronJobPayload) Kind() string { return "crn" } +func (d *CronJobPayload) String() string { return d.Ref } +func (d *CronJobPayload) Parse(parts []string) error { + if len(parts) == 0 { + return errors.New("expected - but got empty string") + } + d.Ref = strings.Join(parts, "-") + return nil +} +func (d *CronJobPayload) RandomBytes() int { return 10 } diff --git a/sqlc.yaml b/sqlc.yaml index 050c7648b6..efaabc9c0a 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -36,6 +36,12 @@ sql: nullable: true go_type: type: "NullRunnerKey" + - db_type: "cron_job_key" + go_type: "github.com/TBD54566975/ftl/internal/model.CronJobKey" + - db_type: "cron_job_key" + nullable: true + go_type: + type: "NullCronJobKey" - db_type: "deployment_key" go_type: "github.com/TBD54566975/ftl/internal/model.DeploymentKey" - db_type: "deployment_key"