Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cronjobs db changes #1229

Merged
merged 9 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,14 +732,14 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
dname, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, nil)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
}
deploymentLogger := s.getDeploymentLogger(ctx, dname)
deploymentLogger.Debugf("Created deployment %s", dname)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dname.String()}), nil
deploymentLogger := s.getDeploymentLogger(ctx, dkey)
deploymentLogger.Debugf("Created deployment %s", dkey)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
}

// Load schemas for existing modules, combine with our new one, and validate the new module in the context
Expand Down
112 changes: 111 additions & 1 deletion backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ type IngressRoutingEntry struct {
// previously created artefacts with it.
//
// If an existing deployment with identical artefacts exists, it is returned.
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry) (key model.DeploymentKey, err error) {
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []CronJob) (key model.DeploymentKey, err error) {
logger := log.FromContext(ctx)

// Start the transaction
Expand Down Expand Up @@ -490,6 +490,23 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
}
}

for _, job := range cronJobs {
// Start time must be calculated by the caller rather than generated by db
// This ensures that nextExecution is after start time, otherwise the job will never be triggered
err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{
Key: job.Key,
DeploymentKey: deploymentKey,
ModuleName: job.Ref.Module,
Verb: job.Ref.Name,
StartTime: job.StartTime,
Schedule: job.Schedule,
NextExecution: job.NextExecution,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to create cron job: %w", translatePGError(err))
}
}

return deploymentKey, nil
}

Expand Down Expand Up @@ -899,6 +916,99 @@ func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error) {
return count, translatePGError(err)
}

type JobState string

const (
JobStateIdle = JobState(sql.CronJobStateIdle)
JobStateExecuting = JobState(sql.CronJobStateExecuting)
)

type CronJob struct {
Key model.CronJobKey
DeploymentKey model.DeploymentKey
Ref schema.Ref
Schedule string
StartTime time.Time
NextExecution time.Time
State JobState
}

type AttemptedCronJob struct {
DidStartExecution bool
HasMinReplicas bool
CronJob
}

func cronJobFromRow(row sql.GetCronJobsRow) CronJob {
return CronJob{
Key: row.Key,
DeploymentKey: row.DeploymentKey,
Ref: schema.Ref{Module: row.Module, Name: row.Verb},
Schedule: row.Schedule,
StartTime: row.StartTime,
NextExecution: row.NextExecution,
State: JobState(row.State),
}
}

func (d *DAL) GetCronJobs(ctx context.Context) ([]CronJob, error) {
rows, err := d.db.GetCronJobs(ctx)
if err != nil {
return nil, translatePGError(err)
}
return slices.Map(rows, cronJobFromRow), nil
}

// StartCronJobs returns a full list of results so that the caller can update their list of jobs whether or not they successfully updated the row
// Also returns a map of deployment keys that have no replicas, so the caller can remove them from the list of jobs
matt2e marked this conversation as resolved.
Show resolved Hide resolved
// Note: jobs that were started may overlap with deployments that have no replicas, and this must be handled specially.
func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs []AttemptedCronJob, err error) {
if len(jobs) == 0 {
return nil, nil
}
rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) string { return job.Key.String() }))
if err != nil {
return nil, translatePGError(err)
}

attemptedJobs = []AttemptedCronJob{}
for _, row := range rows {
job := AttemptedCronJob{
CronJob: CronJob{
Key: row.Key,
DeploymentKey: row.DeploymentKey,
Ref: schema.Ref{Module: row.Module, Name: row.Verb},
Schedule: row.Schedule,
StartTime: row.StartTime,
NextExecution: row.NextExecution,
State: JobState(row.State),
},
DidStartExecution: row.Updated,
HasMinReplicas: row.HasMinReplicas,
}
attemptedJobs = append(attemptedJobs, job)
}
return attemptedJobs, nil
}

func (d *DAL) EndCronJob(ctx context.Context, job CronJob, next time.Time) (CronJob, error) {
matt2e marked this conversation as resolved.
Show resolved Hide resolved
row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime)
if err != nil {
return CronJob{}, translatePGError(err)
}
return cronJobFromRow(sql.GetCronJobsRow(row)), nil
}

func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]CronJob, error) {
rows, err := d.db.GetStaleCronJobs(ctx, duration)
if err != nil {
return nil, translatePGError(err)
}
return slices.Map(rows, func(row sql.GetStaleCronJobsRow) CronJob {
return cronJobFromRow(sql.GetCronJobsRow(row))
}), nil
}

func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
attributes, err := json.Marshal(log.Attributes)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestDAL(t *testing.T) {
Digest: testSha,
Executable: true,
Path: "dir/filename",
}}, nil)
}}, nil, nil)
assert.NoError(t, err)
})

Expand Down
54 changes: 54 additions & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,66 @@ WITH rows AS (
SELECT COUNT(*)
FROM rows;

-- name: GetCronJobs :many
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
WHERE d.min_replicas > 0;

-- name: CreateCronJob :exec
INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution)
VALUES (
sqlc.arg('key')::cron_job_key,
(SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
sqlc.arg('module_name')::TEXT,
sqlc.arg('verb')::TEXT,
sqlc.arg('schedule')::TEXT,
sqlc.arg('start_time')::TIMESTAMPTZ,
sqlc.arg('next_execution')::TIMESTAMPTZ);

-- name: StartCronJobs :many
WITH updates AS (
UPDATE cron_jobs
SET state = 'executing',
start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
WHERE key = ANY (sqlc.arg('keys'))
AND state = 'idle'
AND start_time < next_execution
AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
RETURNING id, key, state, start_time, next_execution)
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule,
COALESCE(u.start_time, j.start_time) as start_time,
COALESCE(u.next_execution, j.next_execution) as next_execution,
COALESCE(u.state, j.state) as state,
d.min_replicas > 0 as has_min_replicas,
CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
LEFT JOIN updates u on j.id = u.id
WHERE j.key = ANY (sqlc.arg('keys'));

-- name: EndCronJob :one
WITH j AS (
UPDATE cron_jobs
SET state = 'idle',
next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ
WHERE key = sqlc.arg('key')::cron_job_key
AND state = 'executing'
AND start_time = sqlc.arg('start_time')::TIMESTAMPTZ
RETURNING *
)
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM j
INNER JOIN deployments d on j.deployment_id = d.id
LIMIT 1;

-- name: GetStaleCronJobs :many
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
WHERE state = 'executing'
AND start_time < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL;

-- name: InsertLogEvent :exec
INSERT INTO events (deployment_id, request_id, time_stamp, custom_key_1, type, payload)
VALUES ((SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
Expand Down Expand Up @@ -349,6 +409,10 @@ VALUES ((SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);

-- name: CreateCronRequest :exec
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);

-- name: UpsertController :one
INSERT INTO controller (key, endpoint)
VALUES ($1, $2)
Expand Down
Loading