Skip to content

Commit

Permalink
[CAPPL-324] rehydrate artifacts from db if they haven't changed (#15668)
Browse files Browse the repository at this point in the history
* fix: rehydrate artifacts from db if they haven't changed

* feat: implement GetWorkflowSpecByID
  • Loading branch information
agparadiso authored Dec 13, 2024
1 parent ea2d4f7 commit bc77243
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 16 deletions.
56 changes: 40 additions & 16 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,25 +400,13 @@ func (h *eventHandler) workflowRegisteredEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowRegisteredV1,
) error {
// Download the contents of binaryURL, configURL and secretsURL and cache them locally.
binary, err := h.fetcher(ctx, payload.BinaryURL)
// Fetch the workflow artifacts from the database or download them from the specified URLs
decodedBinary, config, err := h.getWorkflowArtifacts(ctx, payload)
if err != nil {
return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}

decodedBinary, err := base64.StdEncoding.DecodeString(string(binary))
if err != nil {
return fmt.Errorf("failed to decode binary: %w", err)
}

var config []byte
if payload.ConfigURL != "" {
config, err = h.fetcher(ctx, payload.ConfigURL)
if err != nil {
return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
}
return err
}

// Always fetch secrets from the SecretsURL
var secrets []byte
if payload.SecretsURL != "" {
secrets, err = h.fetcher(ctx, payload.SecretsURL)
Expand Down Expand Up @@ -499,6 +487,42 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

// getWorkflowArtifacts retrieves the workflow artifacts from the database if they exist,
// or downloads them from the specified URLs if they are not found in the database.
func (h *eventHandler) getWorkflowArtifacts(
ctx context.Context,
payload WorkflowRegistryWorkflowRegisteredV1,
) ([]byte, []byte, error) {
spec, err := h.orm.GetWorkflowSpecByID(ctx, hex.EncodeToString(payload.WorkflowID[:]))
if err != nil {
binary, err2 := h.fetcher(ctx, payload.BinaryURL)
if err2 != nil {
return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}

decodedBinary, err2 := base64.StdEncoding.DecodeString(string(binary))
if err2 != nil {
return nil, nil, fmt.Errorf("failed to decode binary: %w", err)
}

var config []byte
if payload.ConfigURL != "" {
config, err2 = h.fetcher(ctx, payload.ConfigURL)
if err2 != nil {
return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
}
}
return decodedBinary, config, nil
}

// there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts
decodedBinary, err := hex.DecodeString(spec.Workflow)
if err != nil {
return nil, nil, fmt.Errorf("failed to decode stored workflow spec: %w", err)
}
return decodedBinary, []byte(spec.Config), nil
}

func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter}
sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config)
Expand Down
59 changes: 59 additions & 0 deletions core/services/workflows/syncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type WorkflowSpecsDS interface {

// DeleteWorkflowSpec deletes the workflow spec for the given owner and name.
DeleteWorkflowSpec(ctx context.Context, owner, name string) error

// GetWorkflowSpecByID returns the workflow spec for the given workflowID.
GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error)
}

type ORM interface {
Expand Down Expand Up @@ -370,6 +373,22 @@ func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.W
return &spec, nil
}

func (orm *orm) GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error) {
query := `
SELECT *
FROM workflow_specs
WHERE workflow_id = $1
`

var spec job.WorkflowSpec
err := orm.ds.GetContext(ctx, &spec, query, id)
if err != nil {
return nil, err
}

return &spec, nil
}

func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error {
query := `
DELETE FROM workflow_specs
Expand Down
39 changes: 39 additions & 0 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,45 @@ func Test_GetWorkflowSpec(t *testing.T) {
})
}

func Test_GetWorkflowSpecByID(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
orm := &orm{ds: db, lggr: lggr}

t.Run("gets a workflow spec by ID", func(t *testing.T) {
spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

id, err := orm.UpsertWorkflowSpec(ctx, spec)
require.NoError(t, err)
require.NotZero(t, id)

dbSpec, err := orm.GetWorkflowSpecByID(ctx, spec.WorkflowID)
require.NoError(t, err)
require.Equal(t, spec.Workflow, dbSpec.Workflow)

err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
})

t.Run("fails if no workflow spec exists", func(t *testing.T) {
dbSpec, err := orm.GetWorkflowSpecByID(ctx, "inexistent-workflow-id")
require.Error(t, err)
require.Nil(t, dbSpec)
})
}

func Test_GetContentsByWorkflowID(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
Expand Down

0 comments on commit bc77243

Please sign in to comment.