diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 040e123567e..9629e845336 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -43,8 +43,8 @@ type Engine struct { stopCh services.StopChan newWorkerTimeout time.Duration - // Used for testing to wait for an execution to complete - xxxExecutionFinished chan string + // testing lifecycle hook to signal when an execution is finished. + onExecutionFinished func(string) // testing lifecycle hook to signal initialization status afterInit func(success bool) // Used for testing to control the number of retries @@ -511,14 +511,7 @@ func (e *Engine) finishExecution(ctx context.Context, executionID string, status return err } - // Signal that an execution has finished in a - // non-blocking fashion. This is intended for - // testing purposes only. - select { - case e.xxxExecutionFinished <- executionID: - default: - } - + e.onExecutionFinished(executionID) return nil } @@ -688,9 +681,10 @@ type Config struct { PeerID func() *p2ptypes.PeerID // For testing purposes only - maxRetries int - retryMs int - afterInit func(success bool) + maxRetries int + retryMs int + afterInit func(success bool) + onExecutionFinished func(weid string) } const ( @@ -720,6 +714,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) { cfg.afterInit = func(success bool) {} } + if cfg.onExecutionFinished == nil { + cfg.onExecutionFinished = func(weid string) {} + } + // TODO: validation of the workflow spec // We'll need to check, among other things: // - that there are no step `ref` called `trigger` as this is reserved for any triggers @@ -756,11 +754,11 @@ func NewEngine(cfg Config) (engine *Engine, err error) { triggerEvents: make(chan capabilities.CapabilityResponse), stopCh: make(chan struct{}), newWorkerTimeout: cfg.NewWorkerTimeout, - // For testing purposes only - xxxExecutionFinished: make(chan string), - afterInit: cfg.afterInit, - maxRetries: cfg.maxRetries, - retryMs: cfg.retryMs, + + onExecutionFinished: cfg.onExecutionFinished, + afterInit: cfg.afterInit, + maxRetries: cfg.maxRetries, + retryMs: cfg.retryMs, } return engine, nil } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 4821b5800c7..3896f840ee8 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -65,10 +65,16 @@ targets: abi: "receive(report bytes)" ` +type testHooks struct { + initFailed chan struct{} + executionFinished chan string +} + // newTestEngine creates a new engine with some test defaults. -func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string) (eng *Engine, initFailed chan struct{}) { +func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string) (*Engine, *testHooks) { peerID := p2ptypes.PeerID{} - initFailed = make(chan struct{}) + initFailed := make(chan struct{}) + executionFinished := make(chan string, 100) cfg := Config{ Lggr: logger.TestLogger(t), Registry: reg, @@ -82,10 +88,13 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string) (eng *Engin close(initFailed) } }, + onExecutionFinished: func(weid string) { + executionFinished <- weid + }, } eng, err := NewEngine(cfg) require.NoError(t, err) - return eng, initFailed + return eng, &testHooks{initFailed: initFailed, executionFinished: executionFinished} } // getExecutionId returns the execution id of the workflow that is @@ -93,13 +102,14 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string) (eng *Engin // // If the engine fails to initialize, the test will fail rather // than blocking indefinitely. -func getExecutionId(t *testing.T, eng *Engine, initFailed <-chan struct{}) string { +func getExecutionId(t *testing.T, eng *Engine, hooks *testHooks) string { var eid string select { - case <-initFailed: + case <-hooks.initFailed: t.FailNow() - case eid = <-eng.xxxExecutionFinished: + case eid = <-hooks.executionFinished: } + return eid } @@ -186,13 +196,13 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { ) require.NoError(t, reg.Add(ctx, target2)) - eng, initFailed := newTestEngine(t, reg, hardcodedWorkflow) + eng, hooks := newTestEngine(t, reg, hardcodedWorkflow) err := eng.Start(ctx) require.NoError(t, err) defer eng.Close() - eid := getExecutionId(t, eng, initFailed) + eid := getExecutionId(t, eng, hooks) assert.Equal(t, cr, <-target1.response) assert.Equal(t, cr, <-target2.response) @@ -340,13 +350,13 @@ func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) { require.NoError(t, reg.Add(ctx, mockFailingConsensus())) require.NoError(t, reg.Add(ctx, mockTarget())) - eng, initFailed := newTestEngine(t, reg, simpleWorkflow) + eng, hooks := newTestEngine(t, reg, simpleWorkflow) err := eng.Start(ctx) require.NoError(t, err) defer eng.Close() - eid := getExecutionId(t, eng, initFailed) + eid := getExecutionId(t, eng, hooks) state, err := eng.executionStates.get(ctx, eid) require.NoError(t, err) @@ -439,12 +449,12 @@ func TestEngine_MultiStepDependencies(t *testing.T) { action, out := mockAction() require.NoError(t, reg.Add(ctx, action)) - eng, initFailed := newTestEngine(t, reg, multiStepWorkflow) + eng, hooks := newTestEngine(t, reg, multiStepWorkflow) err := eng.Start(ctx) require.NoError(t, err) defer eng.Close() - eid := getExecutionId(t, eng, initFailed) + eid := getExecutionId(t, eng, hooks) state, err := eng.executionStates.get(ctx, eid) require.NoError(t, err)