From a833d392e7e512191ccaa5481a3399e795f57b5c Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Thu, 2 May 2024 13:59:55 +1000 Subject: [PATCH] feat: implement DAL functions for full async lifecycle (#1382) Creating an async call, acquiring a call for execution, and finalising it either successfully or through error. Also implemented the code in the controller to perform this execution and finalisation. --- backend/controller/controller.go | 72 +++++++++++++++++-- backend/controller/cronjobs/cronjobs.go | 3 +- .../cronjobs/cronjobs_utils_test.go | 11 +-- backend/controller/dal/async_calls.go | 64 +++++++++++++++-- backend/controller/dal/async_calls_test.go | 29 ++++++-- backend/controller/dal/dal.go | 4 +- .../controller/scheduledtask/scheduledtask.go | 2 +- backend/controller/sql/models.go | 2 +- backend/controller/sql/querier.go | 4 ++ backend/controller/sql/queries.sql | 24 ++++++- backend/controller/sql/queries.sql.go | 69 ++++++++++++++++-- backend/controller/sql/schema/001_init.sql | 5 +- backend/controller/sql/types.go | 2 + backend/schema/ref.go | 37 ++++++++-- internal/model/cron_job.go | 4 +- internal/model/verb_ref.go | 12 ---- sqlc.yaml | 6 ++ 17 files changed, 299 insertions(+), 51 deletions(-) delete mode 100644 internal/model/verb_ref.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 1cf7c16b9c..eb4981ecd4 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -190,21 +190,27 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. svc.cronJobs = cronSvc svc.controllerListListeners = append(svc.controllerListListeners, cronSvc) - // Use min, max backoff if we are running in production, otherwise use develBackoff if available. + // Use min, max backoff if we are running in production, otherwise use + // (1s, 1s) or develBackoff if available. maybeDevelBackoff := func(min, max time.Duration, develBackoff ...backoff.Backoff) backoff.Backoff { if len(develBackoff) > 1 { panic("too many devel backoffs") } - if _, devel := runnerScaling.(*localscaling.LocalScaling); devel && len(develBackoff) == 1 { - return develBackoff[0] + if _, devel := runnerScaling.(*localscaling.LocalScaling); devel { + if len(develBackoff) == 1 { + return develBackoff[0] + } + return backoff.Backoff{Min: time.Second, Max: time.Second} } return makeBackoff(min, max) } // Parallel tasks. svc.tasks.Parallel(maybeDevelBackoff(time.Second, time.Second*5), svc.syncRoutes) - svc.tasks.Parallel(maybeDevelBackoff(time.Second*3, time.Second*5, makeBackoff(time.Second*2, time.Second*2)), svc.heartbeatController) + svc.tasks.Parallel(maybeDevelBackoff(time.Second*3, time.Second*5), svc.heartbeatController) svc.tasks.Parallel(maybeDevelBackoff(time.Second*5, time.Second*5), svc.updateControllersList) + svc.tasks.Parallel(maybeDevelBackoff(time.Second*5, time.Second*10), svc.executeAsyncCalls) + // This should be a singleton task, but because this is the task that // actually expires the leases used to run singleton tasks, it must be // parallel. @@ -649,7 +655,12 @@ func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), "") } -func (s *Service) callWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], key optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) { +func (s *Service) callWithRequest( + ctx context.Context, + req *connect.Request[ftlv1.CallRequest], + key optional.Option[model.RequestKey], + sourceAddress string, +) (*connect.Response[ftlv1.CallResponse], error) { start := time.Now() if req.Msg.Verb == nil { return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("verb is required")) @@ -709,7 +720,7 @@ func (s *Service) callWithRequest(ctx context.Context, req *connect.Request[ftlv return nil, err } else if !ok { requestKey = model.NewRequestKey(model.OriginIngress, "grpc") - requestSource = req.Peer().Addr + sourceAddress = req.Peer().Addr isNewRequestKey = true } else { requestKey = k @@ -717,7 +728,7 @@ func (s *Service) callWithRequest(ctx context.Context, req *connect.Request[ftlv } if isNewRequestKey { headers.SetRequestKey(req.Header(), requestKey) - if err = s.dal.CreateRequest(ctx, requestKey, requestSource); err != nil { + if err = s.dal.CreateRequest(ctx, requestKey, sourceAddress); err != nil { return nil, err } } @@ -1002,6 +1013,53 @@ func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) { return time.Second, nil } +func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) { + logger := log.FromContext(ctx) + logger.Tracef("Acquiring async call") + + call, err := s.dal.AcquireAsyncCall(ctx) + if errors.Is(err, dal.ErrNotFound) { + return time.Second * 2, nil + } else if err != nil { + return 0, err + } + defer call.Release() //nolint:errcheck + + logger = logger.Scope(fmt.Sprintf("%s:%s:%s", call.Origin, call.OriginKey, call.Verb)) + + logger.Tracef("Executing async call") + req := &ftlv1.CallRequest{ //nolint:forcetypeassert + Verb: call.Verb.ToProto().(*schemapb.Ref), + Body: call.Request, + } + resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String()) + if err != nil { + return 0, fmt.Errorf("async call failed: %w", err) + } + var callError optional.Option[string] + if perr := resp.Msg.GetError(); perr != nil { + logger.Warnf("Async call failed: %s", perr.Message) + callError = optional.Some(perr.Message) + } else { + logger.Debugf("Async call succeeded") + } + err = s.dal.CompleteAsyncCall(ctx, call, resp.Msg.GetBody(), callError) + if err != nil { + return 0, fmt.Errorf("failed to complete async call: %w", err) + } + switch call.Origin { + case dal.AsyncCallOriginFSM: + return time.Second * 2, s.onAsyncFSMCallCompletion(ctx, call, resp.Msg) + + default: + panic(fmt.Sprintf("unexpected async call origin: %s", call.Origin)) + } +} + +func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, call *dal.AsyncCall, response *ftlv1.CallResponse) error { + return nil +} + func (s *Service) expireStaleLeases(ctx context.Context) (time.Duration, error) { err := s.dal.ExpireLeases(ctx) if err != nil { diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 26003ab193..cef9d69ffc 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -19,6 +19,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/scheduledtask" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" @@ -145,7 +146,7 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod } newJobs = append(newJobs, model.CronJob{ Key: model.NewCronJobKey(module.Name, verb.Verb.Name), - Verb: model.VerbRef{Module: module.Name, Name: verb.Verb.Name}, + Verb: schema.Ref{Module: module.Name, Name: verb.Verb.Name}, Schedule: cronStr, StartTime: start, NextExecution: next, diff --git a/backend/controller/cronjobs/cronjobs_utils_test.go b/backend/controller/cronjobs/cronjobs_utils_test.go index 70faf82fa4..c616897218 100644 --- a/backend/controller/cronjobs/cronjobs_utils_test.go +++ b/backend/controller/cronjobs/cronjobs_utils_test.go @@ -9,6 +9,11 @@ import ( "time" "connectrpc.com/connect" + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" + "github.com/benbjohnson/clock" + "github.com/jpillora/backoff" + db "github.com/TBD54566975/ftl/backend/controller/dal" "github.com/TBD54566975/ftl/backend/controller/scheduledtask" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" @@ -16,10 +21,6 @@ import ( "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/slices" - "github.com/alecthomas/assert/v2" - "github.com/alecthomas/types/optional" - "github.com/benbjohnson/clock" - "github.com/jpillora/backoff" ) type ExtendedDAL interface { @@ -161,7 +162,7 @@ func newJobs(t *testing.T, moduleName string, cronPattern string, clock clock.Cl assert.NoError(t, err) newJobs = append(newJobs, model.CronJob{ Key: model.NewCronJobKey(moduleName, fmt.Sprintf("verb%d", i)), - Verb: model.VerbRef{Module: moduleName, Name: fmt.Sprintf("verb%d", i)}, + Verb: schema.Ref{Module: moduleName, Name: fmt.Sprintf("verb%d", i)}, Schedule: pattern.String(), StartTime: now, NextExecution: next, diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 35ea7a3b1d..d96c09e50d 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -7,6 +7,8 @@ import ( "fmt" "time" + "github.com/alecthomas/types/optional" + "github.com/TBD54566975/ftl/backend/controller/sql" "github.com/TBD54566975/ftl/backend/schema" ) @@ -29,16 +31,35 @@ func (d *DAL) SendFSMEvent(ctx context.Context, name, executionKey, destinationS Key: executionKey, Name: name, State: destinationState, - Verb: verb.String(), + Verb: verb, Request: request, }) return translatePGError(err) } +// AsyncCallOrigin represents the kind of originator of the async call. +type AsyncCallOrigin sql.AsyncCallOrigin + +const ( + AsyncCallOriginFSM = AsyncCallOrigin(sql.AsyncCallOriginFsm) + AsyncCallOriginCron = AsyncCallOrigin(sql.AsyncCallOriginCron) + AsyncCallOriginPubSub = AsyncCallOrigin(sql.AsyncCallOriginPubsub) +) + +type AsyncCall struct { + *Lease + ID int64 + Origin AsyncCallOrigin + // A key identifying the origin, e.g. the key of the FSM, cron job reference, etc. + OriginKey string + Verb schema.Ref + Request json.RawMessage +} + // AcquireAsyncCall acquires a pending async call to execute. // // Returns ErrNotFound if there are no async calls to acquire. -func (d *DAL) AcquireAsyncCall(ctx context.Context) (*Lease, error) { +func (d *DAL) AcquireAsyncCall(ctx context.Context) (*AsyncCall, error) { ttl := time.Second * 5 row, err := d.db.AcquireAsyncCall(ctx, ttl) if err != nil { @@ -47,7 +68,42 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (*Lease, error) { if errors.Is(err, ErrConstraint) { return nil, fmt.Errorf("no pending async calls: %w", ErrNotFound) } - return nil, err + return nil, fmt.Errorf("failed to acquire async call: %w", err) + } + return &AsyncCall{ + ID: row.AsyncCallID, + Verb: row.Verb, + Origin: AsyncCallOrigin(row.Origin), + OriginKey: row.OriginKey, + Request: row.Request, + Lease: d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl), + }, nil +} + +// CompleteAsyncCall completes an async call. +// +// Either [response] or [responseError] must be provided, but not both. +func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, response []byte, responseError optional.Option[string]) error { + if (response == nil) != responseError.Ok() { + return fmt.Errorf("must provide exactly one of response or error") + } + _, err := d.db.CompleteAsyncCall(ctx, response, responseError, call.ID) + if err != nil { + return translatePGError(err) + } + return nil +} + +func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) { + row, err := d.db.LoadAsyncCall(ctx, id) + if err != nil { + return nil, translatePGError(err) } - return d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl), nil + return &AsyncCall{ + ID: row.ID, + Verb: row.Verb, + Origin: AsyncCallOrigin(row.Origin), + OriginKey: row.OriginKey, + Request: row.Request, + }, nil } diff --git a/backend/controller/dal/async_calls_test.go b/backend/controller/dal/async_calls_test.go index b28ac1f502..0463c3d80e 100644 --- a/backend/controller/dal/async_calls_test.go +++ b/backend/controller/dal/async_calls_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/schema" @@ -20,15 +21,35 @@ func TestSendFSMEvent(t *testing.T) { _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, ErrNotFound) - err = dal.SendFSMEvent(ctx, "test", "test", "state", schema.Ref{Module: "module", Name: "verb"}, []byte(`{}`)) + ref := schema.Ref{Module: "module", Name: "verb"} + err = dal.SendFSMEvent(ctx, "test", "invoiceID", "state", ref, []byte(`{}`)) assert.NoError(t, err) - lease, err := dal.AcquireAsyncCall(ctx) + call, err := dal.AcquireAsyncCall(ctx) assert.NoError(t, err) t.Cleanup(func() { - err := lease.Release() + err := call.Lease.Release() assert.NoError(t, err) }) - assert.HasPrefix(t, lease.String(), "/system/async_call/1:") + assert.HasPrefix(t, call.Lease.String(), "/system/async_call/1:") + expectedCall := &AsyncCall{ + ID: 1, + Lease: call.Lease, + Origin: AsyncCallOriginFSM, + OriginKey: "invoiceID", + Verb: ref, + Request: []byte(`{}`), + } + assert.Equal(t, expectedCall, call) + + err = dal.CompleteAsyncCall(ctx, call, nil, optional.None[string]()) + assert.EqualError(t, err, "must provide exactly one of response or error") + + err = dal.CompleteAsyncCall(ctx, call, []byte(`{}`), optional.None[string]()) + assert.NoError(t, err) + + actual, err := dal.LoadAsyncCall(ctx, call.ID) + assert.NoError(t, err) + assert.Equal(t, call, actual, assert.Exclude[*Lease]()) } diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 625b1c094a..1690114b1a 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -910,7 +910,7 @@ func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob { return model.CronJob{ Key: row.Key, DeploymentKey: row.DeploymentKey, - Verb: model.VerbRef{Module: row.Module, Name: row.Verb}, + Verb: schema.Ref{Module: row.Module, Name: row.Verb}, Schedule: row.Schedule, StartTime: row.StartTime, NextExecution: row.NextExecution, @@ -949,7 +949,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attempte CronJob: model.CronJob{ Key: row.Key, DeploymentKey: row.DeploymentKey, - Verb: model.VerbRef{Module: row.Module, Name: row.Verb}, + Verb: schema.Ref{Module: row.Module, Name: row.Verb}, Schedule: row.Schedule, StartTime: row.StartTime, NextExecution: row.NextExecution, diff --git a/backend/controller/scheduledtask/scheduledtask.go b/backend/controller/scheduledtask/scheduledtask.go index 4a58cb69e3..f4a949b1fb 100644 --- a/backend/controller/scheduledtask/scheduledtask.go +++ b/backend/controller/scheduledtask/scheduledtask.go @@ -133,7 +133,7 @@ func (s *Scheduler) run(ctx context.Context) { if errors.Is(err, leases.ErrConflict) { logger.Scope(job.name).Tracef("Lease is held by another controller, will try again shortly.") } else { - logger.Scope(job.name).Warnf("Failed to acquire lease: %s", err) + logger.Scope(job.name).Debugf("Failed to acquire lease: %s", err) } job.next = s.clock.Now().Add(job.retry.Duration()) continue diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 03cf3d4ddb..5d9e029dc9 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -373,7 +373,7 @@ type AsyncCall struct { ID int64 CreatedAt time.Time LeaseID optional.Option[int64] - Verb string + Verb schema.Ref State AsyncCallState Origin AsyncCallOrigin OriginKey string diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index a5f1796efa..248e493dcc 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -20,6 +20,7 @@ type Querier interface { AcquireAsyncCall(ctx context.Context, ttl time.Duration) (AcquireAsyncCallRow, error) AddAsyncCall(ctx context.Context, arg AddAsyncCallParams) (bool, error) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error + CompleteAsyncCall(ctx context.Context, response []byte, error optional.Option[string], iD int64) (bool, error) // Create a new artefact and return the artefact ID. CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error @@ -69,6 +70,7 @@ type Querier interface { // Mark any controller entries that haven't been updated recently as dead. KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error) KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) + LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) NewLease(ctx context.Context, key leases.Key, ttl time.Duration) (uuid.UUID, error) ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) RenewLease(ctx context.Context, ttl time.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) @@ -76,6 +78,8 @@ type Querier interface { // Find an idle runner and reserve it for the given deployment. ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error) // Creates a new FSM execution, including initial async call and transition. + // + // "key" is the unique identifier for the FSM execution. SendFSMEvent(ctx context.Context, arg SendFSMEventParams) (int64, error) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 3491433e94..c85170dec7 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -505,10 +505,32 @@ WITH async_call AS ( UPDATE async_calls SET state = 'executing', lease_id = (SELECT id FROM lease) WHERE id = (SELECT id FROM async_call) -RETURNING id AS async_call_id, (SELECT idempotency_key FROM lease) AS lease_idempotency_key, (SELECT key FROM lease) AS lease_key; +RETURNING + id AS async_call_id, + (SELECT idempotency_key FROM lease) AS lease_idempotency_key, + (SELECT key FROM lease) AS lease_key, + origin, + origin_key, + verb, + request; + +-- name: CompleteAsyncCall :one +UPDATE async_calls +SET state = 'success', + response = @response, + error = @error +WHERE id = @id +RETURNING true; + +-- name: LoadAsyncCall :one +SELECT * +FROM async_calls +WHERE id = @id; -- name: SendFSMEvent :one -- Creates a new FSM execution, including initial async call and transition. +-- +-- "key" is the unique identifier for the FSM execution. WITH execution AS ( INSERT INTO fsm_executions (key, name, state) VALUES (@key, @name, @state) diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 934b07bc41..114dee0bae 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -31,13 +31,24 @@ WITH async_call AS ( UPDATE async_calls SET state = 'executing', lease_id = (SELECT id FROM lease) WHERE id = (SELECT id FROM async_call) -RETURNING id AS async_call_id, (SELECT idempotency_key FROM lease) AS lease_idempotency_key, (SELECT key FROM lease) AS lease_key +RETURNING + id AS async_call_id, + (SELECT idempotency_key FROM lease) AS lease_idempotency_key, + (SELECT key FROM lease) AS lease_key, + origin, + origin_key, + verb, + request ` type AcquireAsyncCallRow struct { AsyncCallID int64 LeaseIdempotencyKey uuid.UUID LeaseKey leases.Key + Origin AsyncCallOrigin + OriginKey string + Verb schema.Ref + Request []byte } // Reserve a pending async call for execution, returning the associated lease @@ -45,7 +56,15 @@ type AcquireAsyncCallRow struct { func (q *Queries) AcquireAsyncCall(ctx context.Context, ttl time.Duration) (AcquireAsyncCallRow, error) { row := q.db.QueryRow(ctx, acquireAsyncCall, ttl) var i AcquireAsyncCallRow - err := row.Scan(&i.AsyncCallID, &i.LeaseIdempotencyKey, &i.LeaseKey) + err := row.Scan( + &i.AsyncCallID, + &i.LeaseIdempotencyKey, + &i.LeaseKey, + &i.Origin, + &i.OriginKey, + &i.Verb, + &i.Request, + ) return i, err } @@ -56,7 +75,7 @@ RETURNING true ` type AddAsyncCallParams struct { - Verb string + Verb schema.Ref Origin AsyncCallOrigin OriginKey string Request []byte @@ -96,6 +115,22 @@ func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg Assoc return err } +const completeAsyncCall = `-- name: CompleteAsyncCall :one +UPDATE async_calls +SET state = 'success', + response = $1, + error = $2 +WHERE id = $3 +RETURNING true +` + +func (q *Queries) CompleteAsyncCall(ctx context.Context, response []byte, error optional.Option[string], iD int64) (bool, error) { + row := q.db.QueryRow(ctx, completeAsyncCall, response, error, iD) + var column_1 bool + err := row.Scan(&column_1) + return column_1, err +} + const createArtefact = `-- name: CreateArtefact :one INSERT INTO artefacts (digest, content) VALUES ($1, $2) @@ -1507,6 +1542,30 @@ func (q *Queries) KillStaleRunners(ctx context.Context, timeout time.Duration) ( return count, err } +const loadAsyncCall = `-- name: LoadAsyncCall :one +SELECT id, created_at, lease_id, verb, state, origin, origin_key, request, response, error +FROM async_calls +WHERE id = $1 +` + +func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) { + row := q.db.QueryRow(ctx, loadAsyncCall, id) + var i AsyncCall + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.LeaseID, + &i.Verb, + &i.State, + &i.Origin, + &i.OriginKey, + &i.Request, + &i.Response, + &i.Error, + ) + return i, err +} + const newLease = `-- name: NewLease :one INSERT INTO leases (idempotency_key, key, expires_at) VALUES (gen_random_uuid(), $1::lease_key, (NOW() AT TIME ZONE 'utc') + $2::interval) @@ -1624,11 +1683,13 @@ type SendFSMEventParams struct { Key string Name string State string - Verb string + Verb schema.Ref Request []byte } // Creates a new FSM execution, including initial async call and transition. +// +// "key" is the unique identifier for the FSM execution. func (q *Queries) SendFSMEvent(ctx context.Context, arg SendFSMEventParams) (int64, error) { row := q.db.QueryRow(ctx, sendFSMEvent, arg.Key, diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index 365236aa7a..b1e7ebfa4c 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -40,6 +40,9 @@ CREATE TABLE modules name TEXT UNIQUE NOT NULL ); +-- [.] represented as a schema.Ref +CREATE DOMAIN schema_ref AS TEXT; + -- Proto-encoded module schema. CREATE DOMAIN module_schema_pb AS BYTEA; @@ -405,7 +408,7 @@ CREATE TABLE async_calls ( -- with (lease_id IS NULL AND async_call_state != 'pending') lease_id BIGINT REFERENCES leases(id) ON DELETE SET NULL, - verb TEXT NOT NULL, + verb schema_ref NOT NULL, state async_call_state NOT NULL DEFAULT 'pending', -- Origin of the call. origin async_call_origin NOT NULL, diff --git a/backend/controller/sql/types.go b/backend/controller/sql/types.go index 49b1c9fdc0..5ac6a5fc02 100644 --- a/backend/controller/sql/types.go +++ b/backend/controller/sql/types.go @@ -9,9 +9,11 @@ import ( "github.com/google/uuid" "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" ) +type NullRef = optional.Option[schema.Ref] type NullUUID = optional.Option[uuid.UUID] type NullLeaseKey = optional.Option[leases.Key] type NullTime = optional.Option[time.Time] diff --git a/backend/schema/ref.go b/backend/schema/ref.go index 36e4c9869e..a590bf0a98 100644 --- a/backend/schema/ref.go +++ b/backend/schema/ref.go @@ -1,12 +1,23 @@ package schema import ( + "database/sql" + "database/sql/driver" + "google.golang.org/protobuf/proto" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/internal/slices" ) +// RefKey is a map key for a reference. +type RefKey struct { + Module string + Name string +} + +func (r RefKey) String() string { return makeRef(r.Module, r.Name) } + // Ref is an untyped reference to a symbol. type Ref struct { Pos Position `parser:"" protobuf:"1,optional"` @@ -17,17 +28,24 @@ type Ref struct { TypeParameters []Type `parser:"[ '<' @@ (',' @@)* '>' ]" protobuf:"4"` } -// RefKey is a map key for a reference. -type RefKey struct { - Module string - Name string -} +var _ sql.Scanner = (*Ref)(nil) +var _ driver.Valuer = (*Ref)(nil) -func (r RefKey) String() string { return makeRef(r.Module, r.Name) } +func (r Ref) Value() (driver.Value, error) { return r.String(), nil } + +func (r *Ref) Scan(src any) error { + p, err := ParseRef(src.(string)) + if err != nil { + return err + } + *r = *p + return nil +} func (r Ref) ToRefKey() RefKey { return RefKey{Module: r.Module, Name: r.Name} } + func (r *Ref) ToProto() proto.Message { return &schemapb.Ref{ Pos: posToProto(r.Pos), @@ -75,7 +93,12 @@ func RefFromProto(s *schemapb.Ref) *Ref { } func ParseRef(ref string) (*Ref, error) { - return refParser.ParseString("", ref) + out, err := refParser.ParseString("", ref) + if err != nil { + return nil, err + } + out.Pos = Position{} + return out, nil } func refListToSchema(s []*schemapb.Ref) []*Ref { diff --git a/internal/model/cron_job.go b/internal/model/cron_job.go index 42c5bf8347..8b190f437e 100644 --- a/internal/model/cron_job.go +++ b/internal/model/cron_job.go @@ -2,6 +2,8 @@ package model import ( "time" + + "github.com/TBD54566975/ftl/backend/schema" ) type CronJobState string @@ -14,7 +16,7 @@ const ( type CronJob struct { Key CronJobKey DeploymentKey DeploymentKey - Verb VerbRef + Verb schema.Ref Schedule string StartTime time.Time NextExecution time.Time diff --git a/internal/model/verb_ref.go b/internal/model/verb_ref.go deleted file mode 100644 index 3f4e507e33..0000000000 --- a/internal/model/verb_ref.go +++ /dev/null @@ -1,12 +0,0 @@ -package model - -import "fmt" - -type VerbRef struct { - Module string - Name string -} - -func (v VerbRef) String() string { - return fmt.Sprintf("%s.%s", v.Module, v.Name) -} diff --git a/sqlc.yaml b/sqlc.yaml index 61197979fb..420fd6a6cc 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -42,6 +42,12 @@ sql: nullable: true go_type: type: "NullRunnerKey" + - db_type: "schema_ref" + go_type: "github.com/TBD54566975/ftl/backend/schema.Ref" + - db_type: "schema_ref" + nullable: true + go_type: + type: "NullSchemaRef" - db_type: "cron_job_key" go_type: "github.com/TBD54566975/ftl/internal/model.CronJobKey" - db_type: "cron_job_key"