Skip to content

Commit

Permalink
fix(workflows/syncer): check that no engine is running before registe…
Browse files Browse the repository at this point in the history
…ring new one (#15574)

* fix(workflows/syncer): skip handling new registration if engine running

* refactor(syncer): address test changes

* refactor: swap to services.Service
  • Loading branch information
MStreet3 authored Dec 10, 2024
1 parent 10ef5a4 commit b5e3d9f
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 34 deletions.
5 changes: 5 additions & 0 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@ func (h *eventHandler) workflowRegisteredEvent(
return fmt.Errorf("workflowID mismatch: %x != %x", hash, payload.WorkflowID)
}

// Ensure that there is no running workflow engine for the given workflow ID.
if h.engineRegistry.IsRunning(hex.EncodeToString(payload.WorkflowID[:])) {
return fmt.Errorf("workflow is already running, so not starting it : %s", hex.EncodeToString(payload.WorkflowID[:]))
}

// Save the workflow secrets
urlHash, err := h.orm.GetSecretsURLHash(payload.WorkflowOwner, []byte(payload.SecretsURL))
if err != nil {
Expand Down
162 changes: 128 additions & 34 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets"
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
Expand Down Expand Up @@ -47,6 +48,28 @@ func newMockFetcher(m map[string]mockFetchResp) FetcherFunc {
return (&mockFetcher{responseMap: m}).Fetch
}

type mockEngine struct {
CloseErr error
ReadyErr error
StartErr error
}

func (m *mockEngine) Ready() error {
return m.ReadyErr
}

func (m *mockEngine) Close() error {
return m.CloseErr
}

func (m *mockEngine) Start(_ context.Context) error {
return m.StartErr
}

func (m *mockEngine) HealthReport() map[string]error { return nil }

func (m *mockEngine) Name() string { return "mockEngine" }

func Test_Handler(t *testing.T) {
lggr := logger.TestLogger(t)
emitter := custmsg.NewLabeler()
Expand Down Expand Up @@ -181,7 +204,11 @@ func Test_workflowRegisteredHandler(t *testing.T) {
var wfOwner = []byte("0xOwner")
var binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
var encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary))
defaultValidationFn := func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) {

defaultValidationFn := func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
err := h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
require.NoError(t, err)
Expand All @@ -204,6 +231,9 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{}, nil
},
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand All @@ -223,6 +253,71 @@ func Test_workflowRegisteredHandler(t *testing.T) {
},
validationFn: defaultValidationFn,
},
{
Name: "fails to start engine",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: encodedBinary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{StartErr: assert.AnError}, nil
},
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
},
validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
err := h.workflowRegisteredEvent(ctx, event)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
},
},
{
Name: "fails if running engine exists",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: encodedBinary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
},
validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
me := &mockEngine{}
h.engineRegistry.Add(wfID, me)
err := h.workflowRegisteredEvent(ctx, event)
require.Error(t, err)
require.ErrorContains(t, err, "workflow is already running")
},
},
{
Name: "success with paused workflow registered",
fetcher: newMockFetcher(map[string]mockFetchResp{
Expand All @@ -247,7 +342,10 @@ func Test_workflowRegisteredHandler(t *testing.T) {
SecretsURL: secretsURL,
}
},
validationFn: func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
err := h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
require.NoError(t, err)
Expand Down Expand Up @@ -315,62 +413,58 @@ func Test_workflowRegisteredHandler(t *testing.T) {
}

type testCase struct {
Name string
SecretsURL string
BinaryURL string
GiveBinary []byte
GiveConfig []byte
ConfigURL string
WFOwner []byte
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string)
Name string
SecretsURL string
BinaryURL string
GiveBinary []byte
GiveConfig []byte
ConfigURL string
WFOwner []byte
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error)
}

func testRunningWorkflow(t *testing.T, cmd testCase) {
func testRunningWorkflow(t *testing.T, tc testCase) {
t.Helper()
t.Run(cmd.Name, func(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
var (
ctx = testutils.Context(t)
lggr = logger.TestLogger(t)
db = pgtest.NewSqlxDB(t)
orm = NewWorkflowRegistryDS(db, lggr)
emitter = custmsg.NewLabeler()

binary = cmd.GiveBinary
config = cmd.GiveConfig
secretsURL = cmd.SecretsURL
wfOwner = cmd.WFOwner
binary = tc.GiveBinary
config = tc.GiveConfig
secretsURL = tc.SecretsURL
wfOwner = tc.WFOwner

fetcher = cmd.fetcher
fetcher = tc.fetcher
)

giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL)
require.NoError(t, err)

wfID := hex.EncodeToString(giveWFID[:])

event := cmd.Event(giveWFID[:])
event := tc.Event(giveWFID[:])

er := NewEngineRegistry()
opts := []func(*eventHandler){
WithEngineRegistry(er),
}
if tc.engineFactoryFn != nil {
opts = append(opts, WithEngineFactoryFn(tc.engineFactoryFn))
}
store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock())
registry := capabilities.NewRegistry(lggr)
registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{})
h := NewEventHandler(
lggr,
orm,
fetcher,
store,
registry,
emitter,
clockwork.NewFakeClock(),
workflowkey.Key{},
WithEngineRegistry(er),
)
err = h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)
h := NewEventHandler(lggr, orm, fetcher, store, registry, emitter, clockwork.NewFakeClock(),
workflowkey.Key{}, opts...)

cmd.validationFn(t, ctx, h, wfOwner, "workflow-name", wfID)
tc.validationFn(t, ctx, event, h, wfOwner, "workflow-name", wfID)
})
}

Expand Down

0 comments on commit b5e3d9f

Please sign in to comment.