Skip to content

Commit

Permalink
Merge branch 'develop' into chore/release-2.11.0-to-develop
Browse files Browse the repository at this point in the history
  • Loading branch information
snehaagni authored May 1, 2024
2 parents 57b7a15 + ded90f0 commit 80cb3f9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 30 deletions.
34 changes: 16 additions & 18 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type Engine struct {
stopCh services.StopChan
newWorkerTimeout time.Duration

// Used for testing to wait for an execution to complete
xxxExecutionFinished chan string
// testing lifecycle hook to signal when an execution is finished.
onExecutionFinished func(string)
// testing lifecycle hook to signal initialization status
afterInit func(success bool)
// Used for testing to control the number of retries
Expand Down Expand Up @@ -511,14 +511,7 @@ func (e *Engine) finishExecution(ctx context.Context, executionID string, status
return err
}

// Signal that an execution has finished in a
// non-blocking fashion. This is intended for
// testing purposes only.
select {
case e.xxxExecutionFinished <- executionID:
default:
}

e.onExecutionFinished(executionID)
return nil
}

Expand Down Expand Up @@ -688,9 +681,10 @@ type Config struct {
PeerID func() *p2ptypes.PeerID

// For testing purposes only
maxRetries int
retryMs int
afterInit func(success bool)
maxRetries int
retryMs int
afterInit func(success bool)
onExecutionFinished func(weid string)
}

const (
Expand Down Expand Up @@ -720,6 +714,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
cfg.afterInit = func(success bool) {}
}

if cfg.onExecutionFinished == nil {
cfg.onExecutionFinished = func(weid string) {}
}

// TODO: validation of the workflow spec
// We'll need to check, among other things:
// - that there are no step `ref` called `trigger` as this is reserved for any triggers
Expand Down Expand Up @@ -756,11 +754,11 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
triggerEvents: make(chan capabilities.CapabilityResponse),
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
// For testing purposes only
xxxExecutionFinished: make(chan string),
afterInit: cfg.afterInit,
maxRetries: cfg.maxRetries,
retryMs: cfg.retryMs,

onExecutionFinished: cfg.onExecutionFinished,
afterInit: cfg.afterInit,
maxRetries: cfg.maxRetries,
retryMs: cfg.retryMs,
}
return engine, nil
}
34 changes: 22 additions & 12 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,16 @@ targets:
abi: "receive(report bytes)"
`

type testHooks struct {
initFailed chan struct{}
executionFinished chan string
}

// newTestEngine creates a new engine with some test defaults.
func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string) (eng *Engine, initFailed chan struct{}) {
func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string) (*Engine, *testHooks) {
peerID := p2ptypes.PeerID{}
initFailed = make(chan struct{})
initFailed := make(chan struct{})
executionFinished := make(chan string, 100)
cfg := Config{
Lggr: logger.TestLogger(t),
Registry: reg,
Expand All @@ -82,24 +88,28 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string) (eng *Engin
close(initFailed)
}
},
onExecutionFinished: func(weid string) {
executionFinished <- weid
},
}
eng, err := NewEngine(cfg)
require.NoError(t, err)
return eng, initFailed
return eng, &testHooks{initFailed: initFailed, executionFinished: executionFinished}
}

// getExecutionId returns the execution id of the workflow that is
// currently being executed by the engine.
//
// If the engine fails to initialize, the test will fail rather
// than blocking indefinitely.
func getExecutionId(t *testing.T, eng *Engine, initFailed <-chan struct{}) string {
func getExecutionId(t *testing.T, eng *Engine, hooks *testHooks) string {
var eid string
select {
case <-initFailed:
case <-hooks.initFailed:
t.FailNow()
case eid = <-eng.xxxExecutionFinished:
case eid = <-hooks.executionFinished:
}

return eid
}

Expand Down Expand Up @@ -186,13 +196,13 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) {
)
require.NoError(t, reg.Add(ctx, target2))

eng, initFailed := newTestEngine(t, reg, hardcodedWorkflow)
eng, hooks := newTestEngine(t, reg, hardcodedWorkflow)

err := eng.Start(ctx)
require.NoError(t, err)
defer eng.Close()

eid := getExecutionId(t, eng, initFailed)
eid := getExecutionId(t, eng, hooks)
assert.Equal(t, cr, <-target1.response)
assert.Equal(t, cr, <-target2.response)

Expand Down Expand Up @@ -340,13 +350,13 @@ func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) {
require.NoError(t, reg.Add(ctx, mockFailingConsensus()))
require.NoError(t, reg.Add(ctx, mockTarget()))

eng, initFailed := newTestEngine(t, reg, simpleWorkflow)
eng, hooks := newTestEngine(t, reg, simpleWorkflow)

err := eng.Start(ctx)
require.NoError(t, err)
defer eng.Close()

eid := getExecutionId(t, eng, initFailed)
eid := getExecutionId(t, eng, hooks)
state, err := eng.executionStates.get(ctx, eid)
require.NoError(t, err)

Expand Down Expand Up @@ -439,12 +449,12 @@ func TestEngine_MultiStepDependencies(t *testing.T) {
action, out := mockAction()
require.NoError(t, reg.Add(ctx, action))

eng, initFailed := newTestEngine(t, reg, multiStepWorkflow)
eng, hooks := newTestEngine(t, reg, multiStepWorkflow)
err := eng.Start(ctx)
require.NoError(t, err)
defer eng.Close()

eid := getExecutionId(t, eng, initFailed)
eid := getExecutionId(t, eng, hooks)
state, err := eng.executionStates.get(ctx, eid)
require.NoError(t, err)

Expand Down

0 comments on commit 80cb3f9

Please sign in to comment.