diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 8985f9d1599..3261bdd3fce 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -2,12 +2,9 @@ package workflows import ( "context" - "errors" "fmt" "time" - "github.com/shopspring/decimal" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -24,23 +21,14 @@ const ( type Engine struct { services.StateMachine - logger logger.Logger - registry types.CapabilitiesRegistry - triggerType string - triggerConfig *values.Map - trigger capabilities.TriggerCapability - consensusType string - consensusConfig *values.Map - consensus capabilities.ConsensusCapability - targets []target - callbackCh chan capabilities.CapabilityResponse - cancel func() -} - -type target struct { - typeStr string - config *values.Map - capability capabilities.TargetCapability + logger logger.Logger + registry types.CapabilitiesRegistry + trigger capabilities.TriggerCapability + consensus capabilities.ConsensusCapability + targets []capabilities.TargetCapability + workflow *Workflow + callbackCh chan capabilities.CapabilityResponse + cancel func() } func (e *Engine) Start(ctx context.Context) error { @@ -58,6 +46,12 @@ func (e *Engine) init(ctx context.Context) { retrySec := 5 ticker := time.NewTicker(time.Duration(retrySec) * time.Second) defer ticker.Stop() + + // Note: in our hardcoded workflow, there is only one trigger, + // and one consensus step. + trigger := e.workflow.Triggers[0] + consensus := e.workflow.Consensus[0] + var err error LOOP: for { @@ -65,19 +59,21 @@ LOOP: case <-ctx.Done(): return case <-ticker.C: - e.trigger, err = e.registry.GetTrigger(ctx, e.triggerType) + e.trigger, err = e.registry.GetTrigger(ctx, trigger.Type) if err != nil { e.logger.Errorf("failed to get trigger capability: %s, retrying in %d seconds", err, retrySec) break } - e.consensus, err = e.registry.GetConsensus(ctx, e.consensusType) + + e.consensus, err = e.registry.GetConsensus(ctx, consensus.Type) if err != nil { e.logger.Errorf("failed to get consensus capability: %s, retrying in %d seconds", err, retrySec) break } failed := false - for i := range e.targets { - e.targets[i].capability, err = e.registry.GetTarget(ctx, e.targets[i].typeStr) + e.targets = make([]capabilities.TargetCapability, len(e.workflow.Targets)) + for i, target := range e.workflow.Targets { + e.targets[i], err = e.registry.GetTarget(ctx, target.Type) if err != nil { e.logger.Errorf("failed to get target capability: %s, retrying in %d seconds", err, retrySec) failed = true @@ -97,11 +93,15 @@ LOOP: } // also register for consensus + cm, err := values.NewMap(consensus.Config) + if err != nil { + e.logger.Errorf("failed to convert config to values.Map: %s", err) + } reg := capabilities.RegisterToWorkflowRequest{ Metadata: capabilities.RegistrationMetadata{ WorkflowID: mockedWorkflowID, }, - Config: e.consensusConfig, + Config: cm, } err = e.consensus.RegisterToWorkflow(ctx, reg) if err != nil { @@ -112,6 +112,7 @@ LOOP: } func (e *Engine) registerTrigger(ctx context.Context) error { + trigger := e.workflow.Triggers[0] triggerInputs, err := values.NewMap( map[string]any{ "triggerId": mockedTriggerID, @@ -121,11 +122,16 @@ func (e *Engine) registerTrigger(ctx context.Context) error { return err } + tc, err := values.NewMap(trigger.Config) + if err != nil { + return err + } + triggerRegRequest := capabilities.CapabilityRequest{ Metadata: capabilities.RequestMetadata{ WorkflowID: mockedWorkflowID, }, - Config: e.triggerConfig, + Config: tc, Inputs: triggerInputs, } err = e.trigger.RegisterTrigger(ctx, e.callbackCh, triggerRegRequest) @@ -148,73 +154,99 @@ func (e *Engine) triggerHandlerLoop(ctx context.Context) { func (e *Engine) handleExecution(ctx context.Context, event capabilities.CapabilityResponse) { e.logger.Debugw("executing on a trigger event", "event", event) - result, err := e.handleConsensus(ctx, event) - if err != nil { - e.logger.Errorf("error in handleConsensus %v", err) + trigger := e.workflow.Triggers[0] + if event.Err != nil { + e.logger.Errorf("trigger event was an error; not executing", event.Err) return } - err = e.handleTargets(ctx, result) - if err != nil { - e.logger.Error("error in handleTargets %v", err) - } -} -func (e *Engine) handleConsensus(ctx context.Context, event capabilities.CapabilityResponse) (values.Value, error) { - e.logger.Debugw("running consensus", "event", event) - cr := capabilities.CapabilityRequest{ - Metadata: capabilities.RequestMetadata{ - WorkflowID: mockedWorkflowID, - WorkflowExecutionID: mockedExecutionID, - }, - Inputs: &values.Map{ - Underlying: map[string]values.Value{ - // each node provides a single observation - outputs of mercury trigger - "observations": &values.List{ - Underlying: []values.Value{event.Value}, + ec := &executionState{ + steps: map[string]*stepState{ + trigger.Ref: { + outputs: &stepOutput{ + value: event.Value, }, }, }, - Config: e.consensusConfig, + workflowID: mockedWorkflowID, + executionID: mockedExecutionID, } - chReports := make(chan capabilities.CapabilityResponse, 10) - newCtx, cancel := context.WithCancel(ctx) - defer cancel() - err := e.consensus.Execute(newCtx, chReports, cr) + + consensus := e.workflow.Consensus[0] + err := e.handleStep(ctx, ec, consensus) if err != nil { - return nil, err + e.logger.Errorf("error in handleConsensus %v", err) + return } - select { - case resp := <-chReports: - if resp.Err != nil { - return nil, resp.Err + + for _, trg := range e.workflow.Targets { + err := e.handleStep(ctx, ec, trg) + if err != nil { + e.logger.Errorf("error in handleTargets %v", err) + return } - return resp.Value, nil - case <-ctx.Done(): - return nil, ctx.Err() } } -func (e *Engine) handleTargets(ctx context.Context, resp values.Value) error { - e.logger.Debugw("handle targets") - inputs := map[string]values.Value{ - "report": resp, +func (e *Engine) handleStep(ctx context.Context, es *executionState, node Capability) error { + stepState := &stepState{ + outputs: &stepOutput{}, } + es.steps[node.Ref] = stepState - var combinedErr error - for _, t := range e.targets { - e.logger.Debugw("sending to target", "target", t.typeStr, "inputs", inputs) - tr := capabilities.CapabilityRequest{ - Inputs: &values.Map{Underlying: inputs}, - Config: t.config, - Metadata: capabilities.RequestMetadata{ - WorkflowID: mockedWorkflowID, - WorkflowExecutionID: mockedExecutionID, - }, - } - _, err := capabilities.ExecuteSync(ctx, t.capability, tr) - combinedErr = errors.Join(combinedErr, err) + // Let's get the capability. If we fail here, we'll bail out + // and try to handle it at the next execution. + cp, err := e.registry.Get(ctx, node.Type) + if err != nil { + return err + } + + api, ok := cp.(capabilities.CallbackExecutable) + if !ok { + return fmt.Errorf("capability %s must be an action, consensus or target", node.Type) } - return combinedErr + + i, err := findAndInterpolateAllKeys(node.Inputs, es) + if err != nil { + return err + } + + inputs, err := values.NewMap(i.(map[string]any)) + if err != nil { + return err + } + + stepState.inputs = inputs + + config, err := values.NewMap(node.Config) + if err != nil { + return err + } + + tr := capabilities.CapabilityRequest{ + Inputs: inputs, + Config: config, + Metadata: capabilities.RequestMetadata{ + WorkflowID: es.workflowID, + WorkflowExecutionID: es.executionID, + }, + } + + resp, err := capabilities.ExecuteSync(ctx, api, tr) + if err != nil { + stepState.outputs.err = err + return err + } + + // `ExecuteSync` returns a `values.List` even if there was + // just one return value. If that is the case, let's unwrap the + // single value to make it easier to use in -- for example -- variable interpolation. + if len(resp.Underlying) > 1 { + stepState.outputs.value = resp + } else { + stepState.outputs.value = resp.Underlying[0] + } + return nil } func (e *Engine) Close() error { @@ -240,76 +272,66 @@ func (e *Engine) Close() error { } func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine *Engine, err error) { - engine = &Engine{ - logger: lggr.Named("WorkflowEngine"), - registry: registry, - callbackCh: make(chan capabilities.CapabilityResponse), - } + yamlWorkflowSpec := ` +triggers: + - type: "on_mercury_report" + ref: report_data + config: + feedlist: + - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD + - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD + - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD + +consensus: + - type: "offchain_reporting" + ref: evm_median + inputs: + observations: + - $(report_data.outputs) + config: + aggregation_method: data_feeds_2_0 + aggregation_config: + 0x1111111111111111111100000000000000000000000000000000000000000000: + deviation: "0.001" + heartbeat: "30m" + 0x2222222222222222222200000000000000000000000000000000000000000000: + deviation: "0.001" + heartbeat: "30m" + 0x3333333333333333333300000000000000000000000000000000000000000000: + deviation: "0.001" + heartbeat: "30m" + encoder: EVM + encoder_config: + abi: "mercury_reports bytes[]" - // Trigger - engine.triggerType = "on_mercury_report" - engine.triggerConfig, err = values.NewMap( - map[string]any{ - "feedlist": []any{ - "0x1111111111111111111100000000000000000000000000000000000000000000", // ETHUSD - "0x2222222222222222222200000000000000000000000000000000000000000000", // LINKUSD - "0x3333333333333333333300000000000000000000000000000000000000000000", // BTCUSD - }, - }, - ) - if err != nil { - return nil, err - } +targets: + - type: write_polygon-testnet-mumbai + inputs: + report: + - $(evm_median.outputs.reports) + config: + address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef" + params: [($inputs.report)] + abi: "receive(report bytes)" + - type: write_ethereum-testnet-sepolia + inputs: + report: + - $(evm_median.outputs.reports) + config: + address: "0x54e220867af6683aE6DcBF535B4f952cB5116510" + params: ["$(inputs.report)"] + abi: "receive(report bytes)" +` - // Consensus - engine.consensusType = "offchain_reporting" - engine.consensusConfig, err = values.NewMap(map[string]any{ - "aggregation_method": "data_feeds_2_0", - "aggregation_config": map[string]any{ - // ETHUSD - "0x1111111111111111111100000000000000000000000000000000000000000000": map[string]any{ - "deviation": decimal.NewFromFloat(0.001), - "heartbeat": 1800, - }, - // LINKUSD - "0x2222222222222222222200000000000000000000000000000000000000000000": map[string]any{ - "deviation": decimal.NewFromFloat(0.001), - "heartbeat": 1800, - }, - // BTCUSD - "0x3333333333333333333300000000000000000000000000000000000000000000": map[string]any{ - "deviation": decimal.NewFromFloat(0.001), - "heartbeat": 1800, - }, - }, - "encoder": "EVM", - "encoder_config": map[string]any{ - "abi": "mercury_reports bytes[]", - }, - }) + workflow, err := Parse(yamlWorkflowSpec) if err != nil { return nil, err } - - // Targets - engine.targets = make([]target, 2) - engine.targets[0].typeStr = "write_polygon-testnet-mumbai" - engine.targets[0].config, err = values.NewMap(map[string]any{ - "address": "0x3F3554832c636721F1fD1822Ccca0354576741Ef", - "params": []any{"$(report)"}, - "abi": "receive(report bytes)", - }) - if err != nil { - return nil, err - } - engine.targets[1].typeStr = "write_ethereum-testnet-sepolia" - engine.targets[1].config, err = values.NewMap(map[string]any{ - "address": "0x54e220867af6683aE6DcBF535B4f952cB5116510", - "params": []any{"$(report)"}, - "abi": "receive(report bytes)", - }) - if err != nil { - return nil, err + engine = &Engine{ + logger: lggr.Named("WorkflowEngine"), + registry: registry, + workflow: workflow, + callbackCh: make(chan capabilities.CapabilityResponse), } - return + return engine, nil } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index e63264c789f..74a2093c0d2 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -88,8 +88,18 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { "v3.0.0", ), func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + obs := req.Inputs.Underlying["observations"] + reports := obs.(*values.List) + rm := map[string]any{ + "reports": reports.Underlying[0], + } + rv, err := values.NewMap(rm) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + return capabilities.CapabilityResponse{ - Value: req.Inputs.Underlying["observations"], + Value: rv, }, nil }, ) diff --git a/core/services/workflows/state.go b/core/services/workflows/state.go new file mode 100644 index 00000000000..e002fa90501 --- /dev/null +++ b/core/services/workflows/state.go @@ -0,0 +1,144 @@ +package workflows + +import ( + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/smartcontractkit/chainlink-common/pkg/values" +) + +type stepOutput struct { + err error + value values.Value +} + +type stepState struct { + inputs *values.Map + outputs *stepOutput +} + +type executionState struct { + steps map[string]*stepState + executionID string + workflowID string +} + +// interpolateKey takes a multi-part, dot-separated key and attempts to replace +// it with its corresponding value in `state`. +// A key is valid if: +// - it contains at least two parts, with the first part being the workflow step's `ref` variable, and the second being one of `inputs` or `outputs` +// - any subsequent parts will be processed as a list index (if the current element is a list) or a map key (if it's a map) +func interpolateKey(key string, state *executionState) (any, error) { + parts := strings.Split(key, ".") + + if len(parts) < 2 { + return "", fmt.Errorf("cannot interpolate %s: must have at least two parts", key) + } + + sc, ok := state.steps[parts[0]] + if !ok { + return "", fmt.Errorf("could not find ref `%s`", parts[0]) + } + + var value values.Value + switch parts[1] { + case "inputs": + value = sc.inputs + case "outputs": + if sc.outputs.err != nil { + return "", fmt.Errorf("cannot interpolate ref part `%s` in `%+v`: step has errored", parts[1], sc) + } + + value = sc.outputs.value + default: + return "", fmt.Errorf("cannot interpolate ref part `%s` in `%+v`: second part must be `inputs` or `outputs`", parts[1], sc) + } + + val, err := values.Unwrap(value) + if err != nil { + return "", err + } + + remainingParts := parts[2:] + for _, r := range remainingParts { + switch v := val.(type) { + case map[string]any: + inner, ok := v[r] + if !ok { + return "", fmt.Errorf("could not find ref part `%s` in `%+v`", r, v) + } + + val = inner + case []any: + d, err := strconv.Atoi(r) + if err != nil { + return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: `%s` is not convertible to an int", r, v, r) + } + + if d > len(v)-1 { + return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: cannot fetch index %d", r, v, d) + } + + if d < 0 { + return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: index %d must be a positive number", r, v, d) + } + + val = v[d] + default: + return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`", r, val) + } + } + + return val, nil +} + +var ( + interpolationTokenRe = regexp.MustCompile(`^\$\((\S+)\)$`) +) + +// findAndInterpolateAllKeys takes an `input` any value, and recursively +// identifies any values that should be replaced from `state`. +// A value `v` should be replaced if it is wrapped as follows `$(v)`. +func findAndInterpolateAllKeys(input any, state *executionState) (any, error) { + switch tv := input.(type) { + case string: + matches := interpolationTokenRe.FindStringSubmatch(tv) + if len(matches) < 2 { + return tv, nil + } + + interpolatedVar := matches[1] + nv, err := interpolateKey(interpolatedVar, state) + if err != nil { + return nil, err + } + + return nv, nil + case map[string]any: + nm := map[string]any{} + for k, v := range tv { + nv, err := findAndInterpolateAllKeys(v, state) + if err != nil { + return nil, err + } + + nm[k] = nv + } + return nm, nil + case []any: + a := []any{} + for _, el := range tv { + ne, err := findAndInterpolateAllKeys(el, state) + if err != nil { + return nil, err + } + + a = append(a, ne) + } + return a, nil + } + + return nil, fmt.Errorf("cannot interpolate item %+v of type %T", input, input) +} diff --git a/core/services/workflows/state_test.go b/core/services/workflows/state_test.go new file mode 100644 index 00000000000..9a0fadd02bd --- /dev/null +++ b/core/services/workflows/state_test.go @@ -0,0 +1,265 @@ +package workflows + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/values" +) + +func TestInterpolateKey(t *testing.T) { + val, err := values.NewMap( + map[string]any{ + "reports": map[string]any{ + "inner": "key", + }, + "reportsList": []any{ + "listElement", + }, + }, + ) + require.NoError(t, err) + + testCases := []struct { + name string + key string + state *executionState + expected any + errMsg string + }{ + { + name: "digging into a string", + key: "evm_median.outputs.reports", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: values.NewString(""), + }, + }, + }, + }, + errMsg: "could not interpolate ref part `reports` in ``", + }, + { + name: "ref doesn't exist", + key: "evm_median.outputs.reports", + state: &executionState{ + steps: map[string]*stepState{}, + }, + errMsg: "could not find ref `evm_median`", + }, + { + name: "less than 2 parts", + key: "evm_median", + state: &executionState{ + steps: map[string]*stepState{}, + }, + errMsg: "must have at least two parts", + }, + { + name: "second part isn't `inputs` or `outputs`", + key: "evm_median.foo", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: values.NewString(""), + }, + }, + }, + }, + errMsg: "second part must be `inputs` or `outputs`", + }, + { + name: "outputs has errored", + key: "evm_median.outputs", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + err: errors.New("catastrophic error"), + }, + }, + }, + }, + errMsg: "step has errored", + }, + { + name: "digging into a recursive map", + key: "evm_median.outputs.reports.inner", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: val, + }, + }, + }, + }, + expected: "key", + }, + { + name: "missing key in map", + key: "evm_median.outputs.reports.missing", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: val, + }, + }, + }, + }, + errMsg: "could not find ref part `missing` in", + }, + { + name: "digging into an array", + key: "evm_median.outputs.reportsList.0", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: val, + }, + }, + }, + }, + expected: "listElement", + }, + { + name: "digging into an array that's too small", + key: "evm_median.outputs.reportsList.2", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: val, + }, + }, + }, + }, + errMsg: "cannot fetch index 2", + }, + { + name: "digging into an array with a string key", + key: "evm_median.outputs.reportsList.notAString", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: val, + }, + }, + }, + }, + errMsg: "could not interpolate ref part `notAString` in `[listElement]`: `notAString` is not convertible to an int", + }, + { + name: "digging into an array with a negative index", + key: "evm_median.outputs.reportsList.-1", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: val, + }, + }, + }, + }, + errMsg: "could not interpolate ref part `-1` in `[listElement]`: index -1 must be a positive number", + }, + { + name: "empty element", + key: "evm_median.outputs..notAString", + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: val, + }, + }, + }, + }, + errMsg: "could not find ref part `` in", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(st *testing.T) { + got, err := interpolateKey(tc.key, tc.state) + if tc.errMsg != "" { + require.ErrorContains(st, err, tc.errMsg) + } else { + require.NoError(t, err) + assert.Equal(t, tc.expected, got) + } + }) + } +} + +func TestInterpolateInputsFromState(t *testing.T) { + testCases := []struct { + name string + inputs map[string]any + state *executionState + expected any + errMsg string + }{ + { + name: "substituting with a variable that exists", + inputs: map[string]any{ + "shouldnotinterpolate": map[string]any{ + "shouldinterpolate": "$(evm_median.outputs)", + }, + }, + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: values.NewString(""), + }, + }, + }, + }, + expected: map[string]any{ + "shouldnotinterpolate": map[string]any{ + "shouldinterpolate": "", + }, + }, + }, + { + name: "no substitution required", + inputs: map[string]any{ + "foo": "bar", + }, + state: &executionState{ + steps: map[string]*stepState{ + "evm_median": { + outputs: &stepOutput{ + value: values.NewString(""), + }, + }, + }, + }, + expected: map[string]any{ + "foo": "bar", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(st *testing.T) { + got, err := findAndInterpolateAllKeys(tc.inputs, tc.state) + if tc.errMsg != "" { + require.ErrorContains(st, err, tc.errMsg) + } else { + require.NoError(t, err) + assert.Equal(t, tc.expected, got) + } + }) + } +} diff --git a/core/services/workflows/workflow.go b/core/services/workflows/workflow.go new file mode 100644 index 00000000000..bf8394af610 --- /dev/null +++ b/core/services/workflows/workflow.go @@ -0,0 +1,23 @@ +package workflows + +import "gopkg.in/yaml.v3" + +type Capability struct { + Type string `yaml:"type"` + Ref string `yaml:"ref"` + Inputs map[string]any `yaml:"inputs"` + Config map[string]any `yaml:"config"` +} + +type Workflow struct { + Triggers []Capability `yaml:"triggers"` + Actions []Capability `yaml:"actions"` + Consensus []Capability `yaml:"consensus"` + Targets []Capability `yaml:"targets"` +} + +func Parse(yamlWorkflow string) (*Workflow, error) { + wf := &Workflow{} + err := yaml.Unmarshal([]byte(yamlWorkflow), wf) + return wf, err +} diff --git a/go.mod b/go.mod index effdb90b062..979f6b03a4e 100644 --- a/go.mod +++ b/go.mod @@ -108,6 +108,7 @@ require ( google.golang.org/protobuf v1.32.0 gopkg.in/guregu/null.v4 v4.0.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -326,7 +327,6 @@ require ( gopkg.in/guregu/null.v2 v2.1.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect nhooyr.io/websocket v1.8.7 // indirect pgregory.net/rapid v0.5.5 // indirect rsc.io/tmplfunc v0.0.3 // indirect