Skip to content

Commit

Permalink
[fix] Only wrap targets with local transmission (#13537)
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier authored Jun 13, 2024
1 parent bc01abe commit c55db97
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 13 deletions.
25 changes: 14 additions & 11 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,21 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
return fmt.Errorf("failed to get capability with ref %s: %s", step.ID, err)
}

// Special treatment for local targets - wrap into a transmission capability
target, isTarget := cp.(capabilities.TargetCapability)
if isTarget {
capInfo, err2 := target.Info(ctx)
if err2 != nil {
return fmt.Errorf("failed to get info of target capability: %w", err2)
}
info, err := cp.Info(ctx)
if err != nil {
return fmt.Errorf("failed to get info of capability with id %s: %w", step.ID, err)
}

// If the DON is nil this is a local target
if capInfo.DON == nil {
cp = transmission.NewLocalTargetCapability(e.logger, *e.donInfo.PeerID(), *e.donInfo.DON, target)
}
// Special treatment for local targets - wrap into a transmission capability
// If the DON is nil, this is a local target.
if info.CapabilityType == capabilities.CapabilityTypeTarget && info.DON == nil {
e.logger.Debugf("wrapping capability %s in local transmission protocol", info.ID)
cp = transmission.NewLocalTargetCapability(
e.logger,
*e.donInfo.PeerID(),
*e.donInfo.DON,
cp.(capabilities.TargetCapability),
)
}

// We configure actions, consensus and targets here, and
Expand Down
101 changes: 99 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workflows
import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -72,13 +73,15 @@ targets:

type testHooks struct {
initFailed chan struct{}
initSuccessful chan struct{}
executionFinished chan string
}

// newTestEngine creates a new engine with some test defaults.
func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string, opts ...func(c *Config)) (*Engine, *testHooks) {
peerID := p2ptypes.PeerID{}
initFailed := make(chan struct{})
initSuccessful := make(chan struct{})
executionFinished := make(chan string, 100)
clock := clockwork.NewFakeClock()
cfg := Config{
Expand All @@ -92,7 +95,9 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string, opts ...fun
maxRetries: 1,
retryMs: 100,
afterInit: func(success bool) {
if !success {
if success {
close(initSuccessful)
} else {
close(initFailed)
}
},
Expand All @@ -107,7 +112,7 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string, opts ...fun
}
eng, err := NewEngine(cfg)
require.NoError(t, err)
return eng, &testHooks{initFailed: initFailed, executionFinished: executionFinished}
return eng, &testHooks{initSuccessful: initSuccessful, initFailed: initFailed, executionFinished: executionFinished}
}

// getExecutionId returns the execution id of the workflow that is
Expand Down Expand Up @@ -651,3 +656,95 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, store.StatusTimeout, gotEx.Status)
}

const (
delayedWorkflow = `
triggers:
- id: "[email protected]"
config:
feedlist:
- "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD
- "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD
- "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD
consensus:
- id: "[email protected]"
ref: "evm_median"
inputs:
observations:
- "$(trigger.outputs)"
config:
aggregation_method: "data_feeds_2_0"
aggregation_config:
"0x1111111111111111111100000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: "30m"
"0x2222222222222222222200000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: "30m"
"0x3333333333333333333300000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: "30m"
encoder: "EVM"
encoder_config:
abi: "mercury_reports bytes[]"
targets:
- id: "[email protected]"
inputs:
report: "$(evm_median.outputs.report)"
config:
address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef"
params: ["$(report)"]
abi: "receive(report bytes)"
deltaStage: 2s
schedule: allAtOnce
`
)

func TestEngine_WrapsTargets(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)
reg := coreCap.NewRegistry(logger.TestLogger(t))

trigger, _ := mockTrigger(t)

require.NoError(t, reg.Add(ctx, trigger))
require.NoError(t, reg.Add(ctx, mockConsensus()))
require.NoError(t, reg.Add(ctx, mockTarget()))

clock := clockwork.NewFakeClock()
dbstore := store.NewDBStore(pgtest.NewSqlxDB(t), clock)

eng, hooks := newTestEngine(
t,
reg,
delayedWorkflow,
func(c *Config) {
c.Store = dbstore
c.clock = clock
},
)
err := eng.Start(ctx)
require.NoError(t, err)

<-hooks.initSuccessful

err = eng.workflow.walkDo(workflows.KeywordTrigger, func(s *step) error {
if s.Ref == workflows.KeywordTrigger {
return nil
}

info, err2 := s.capability.Info(ctx)
require.NoError(t, err2)

if info.CapabilityType == capabilities.CapabilityTypeTarget {
assert.Equal(t, "*transmission.LocalTargetCapability", fmt.Sprintf("%T", s.capability))
} else {
assert.NotEqual(t, "*transmission.LocalTargetCapability", fmt.Sprintf("%T", s.capability))
}

return nil
})
require.NoError(t, err)
}

0 comments on commit c55db97

Please sign in to comment.