Skip to content

Commit

Permalink
add cron job key
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 12, 2024
1 parent 9190345 commit cbd2e5d
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 47 deletions.
11 changes: 6 additions & 5 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
// Start time must be calculated by the caller rather than generated by db
// This ensures that nextExecution is after start time, otherwise the job will never be triggered
err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{
Key: job.Key,
DeploymentKey: deploymentKey,
ModuleName: job.Ref.Module,
Verb: job.Ref.Name,
Expand Down Expand Up @@ -923,7 +924,7 @@ const (
)

type CronJob struct {
id int64
Key model.CronJobKey
DeploymentKey model.DeploymentKey
Ref schema.Ref
Schedule string
Expand All @@ -940,7 +941,7 @@ type AttemptedCronJob struct {

func cronJobFromRow(row sql.GetCronJobsRow) CronJob {
return CronJob{
id: row.ID,
Key: row.Key,
DeploymentKey: row.DeploymentKey,
Ref: schema.Ref{Module: row.Module, Name: row.Verb},
Schedule: row.Schedule,
Expand All @@ -965,7 +966,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs
if len(jobs) == 0 {
return nil, nil
}
rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) int64 { return job.id }))
rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) string { return job.Key.String() }))
if err != nil {
return nil, translatePGError(err)
}
Expand All @@ -974,7 +975,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs
for _, row := range rows {
job := AttemptedCronJob{
CronJob: CronJob{
id: row.ID,
Key: row.Key,
DeploymentKey: row.DeploymentKey,
Ref: schema.Ref{Module: row.Module, Name: row.Verb},
Schedule: row.Schedule,
Expand All @@ -991,7 +992,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs
}

func (d *DAL) EndCronJob(ctx context.Context, job CronJob, next time.Time) (CronJob, error) {
row, err := d.db.EndCronJob(ctx, next, job.id, job.StartTime)
row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime)
if err != nil {
return CronJob{}, translatePGError(err)
}
Expand Down
1 change: 1 addition & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 13 additions & 11 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,16 @@ SELECT COUNT(*)
FROM rows;

-- name: GetCronJobs :many
SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
WHERE d.min_replicas > 0;

-- name: CreateCronJob :exec
INSERT INTO cron_jobs (deployment_id, module_name, verb, schedule, start_time, next_execution)
VALUES ((SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution)
VALUES (
sqlc.arg('key')::cron_job_key,
(SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
sqlc.arg('module_name')::TEXT,
sqlc.arg('verb')::TEXT,
sqlc.arg('schedule')::TEXT,
Expand All @@ -300,39 +302,39 @@ WITH updates AS (
UPDATE cron_jobs
SET state = 'executing',
start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
WHERE id = ANY (sqlc.arg('ids'))
WHERE key = ANY (sqlc.arg('keys'))
AND state = 'idle'
AND start_time < next_execution
AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
RETURNING id, state, start_time, next_execution)
SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule,
RETURNING id, key, state, start_time, next_execution)
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule,
COALESCE(u.start_time, j.start_time) as start_time,
COALESCE(u.next_execution, j.next_execution) as next_execution,
COALESCE(u.state, j.state) as state,
d.min_replicas > 0 as has_min_replicas,
CASE WHEN u.id IS NULL THEN FALSE ELSE TRUE END as updated
CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
LEFT JOIN updates u on j.id = u.id
WHERE j.id = ANY (sqlc.arg('ids'));
WHERE j.key = ANY (sqlc.arg('keys'));

-- name: EndCronJob :one
WITH j AS (
UPDATE cron_jobs
SET state = 'idle',
next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ
WHERE id = sqlc.arg('id')::BIGINT
WHERE key = sqlc.arg('key')::cron_job_key
AND state = 'executing'
AND start_time = sqlc.arg('start_time')::TIMESTAMPTZ
RETURNING *
)
SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM j
INNER JOIN deployments d on j.deployment_id = d.id
LIMIT 1;

-- name: GetStaleCronJobs :many
SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
WHERE state = 'executing'
Expand Down
62 changes: 33 additions & 29 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions backend/controller/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,12 @@ CREATE TYPE cron_job_state AS ENUM (
'executing'
);

CREATE DOMAIN cron_job_key AS VARCHAR;

CREATE TABLE cron_jobs
(
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
key cron_job_key UNIQUE NOT NULL,
deployment_id BIGINT NOT NULL REFERENCES deployments (id) ON DELETE CASCADE,
verb VARCHAR NOT NULL,
schedule VARCHAR NOT NULL,
Expand All @@ -252,6 +255,7 @@ CREATE TABLE cron_jobs
);

CREATE INDEX cron_jobs_executing_start_time_idx ON cron_jobs (start_time) WHERE state = 'executing';
CREATE UNIQUE INDEX cron_jobs_key_idx ON cron_jobs (key);

CREATE TYPE event_type AS ENUM (
'call',
Expand Down
4 changes: 4 additions & 0 deletions backend/controller/sql/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ import (
type NullTime = optional.Option[time.Time]
type NullDuration = optional.Option[time.Duration]
type NullRunnerKey = optional.Option[model.RunnerKey]
type NullCronJobKey = optional.Option[model.CronJobKey]
type NullDeploymentKey = optional.Option[model.DeploymentKey]
type NullRequestKey = optional.Option[model.RequestKey]

var _ sql.Scanner = (*NullRunnerKey)(nil)
var _ driver.Valuer = (*NullRunnerKey)(nil)

var _ sql.Scanner = (*NullCronJobKey)(nil)
var _ driver.Valuer = (*NullCronJobKey)(nil)

var _ sql.Scanner = (*NullDeploymentKey)(nil)
var _ driver.Valuer = (*NullDeploymentKey)(nil)

Expand Down
31 changes: 31 additions & 0 deletions internal/model/cron_job_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package model

import (
"errors"
"strings"
)

type CronJobKey = KeyType[CronJobPayload, *CronJobPayload]

func NewCronJobKey(module, verb string) CronJobKey {
return newKey[CronJobPayload](strings.Join([]string{module, verb}, "-"))
}

func ParseCronJobKey(key string) (CronJobKey, error) { return parseKey[CronJobPayload](key) }

type CronJobPayload struct {
Ref string
}

var _ KeyPayload = (*CronJobPayload)(nil)

func (d *CronJobPayload) Kind() string { return "crn" }
func (d *CronJobPayload) String() string { return d.Ref }
func (d *CronJobPayload) Parse(parts []string) error {
if len(parts) == 0 {
return errors.New("expected <module>-<verb> but got empty string")
}
d.Ref = strings.Join(parts, "-")
return nil
}
func (d *CronJobPayload) RandomBytes() int { return 10 }
Loading

0 comments on commit cbd2e5d

Please sign in to comment.