Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keystone deployment pr #12546

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.2
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404151628-2fb437ef3814
github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
Expand Down
8 changes: 4 additions & 4 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1153,8 +1153,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 h1:WCcC4vZDS1tYNxjWlwRJZQy28r8CMoggKnxNzxsVDMQ=
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
Expand Down Expand Up @@ -1187,8 +1187,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI=
github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 h1:fY2wMtlr/VQxPyVVQdi1jFvQHi0VbDnGGVXzLKOZTOY=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404151628-2fb437ef3814 h1:8whef64m7tpUtT6Bm59f1ZVN/xtpV04ay1LwkVJ69iM=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404151628-2fb437ef3814/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
74 changes: 65 additions & 9 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package workflows
import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/pelletier/go-toml"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/mercury"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/targets"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
Expand All @@ -17,12 +20,12 @@ import (

const hardcodedWorkflow = `
triggers:
- type: "on_mercury_report"
- type: "mercury-trigger"
config:
feedlist:
- "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD
- "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD
- "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD
feedIds:
- "0x1111111111111111111100000000000000000000000000000000000000000000"
- "0x2222222222222222222200000000000000000000000000000000000000000000"
- "0x3333333333333333333300000000000000000000000000000000000000000000"

consensus:
- type: "offchain_reporting"
Expand All @@ -35,13 +38,13 @@ consensus:
aggregation_config:
"0x1111111111111111111100000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: "30m"
heartbeat: 3600
"0x2222222222222222222200000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: "30m"
heartbeat: 3600
"0x3333333333333333333300000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: "30m"
heartbeat: 3600
encoder: "EVM"
encoder_config:
abi: "mercury_reports bytes[]"
Expand Down Expand Up @@ -100,9 +103,62 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser

func NewDelegate(logger logger.Logger, registry types.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer) *Delegate {
// NOTE: we temporarily do registration inside NewDelegate, this will be moved out of job specs in the future
_ = targets.InitializeWrite(registry, legacyEVMChains, logger)
err := targets.InitializeWrite(registry, legacyEVMChains, logger)
if err != nil {
logger.Errorw("could not initialize writes", err)
}

trigger := triggers.NewMercuryTriggerService()
err = registry.Add(context.Background(), trigger)
if err != nil {
logger.Errorw("could not add mercury trigger to registry", err)
}

go mercuryEventLoop(trigger, logger)

return &Delegate{logger: logger, registry: registry}

}

func mercuryEventLoop(trigger *triggers.MercuryTriggerService, logger logger.Logger) {
sleepSec := 60
ticker := time.NewTicker(time.Duration(sleepSec) * time.Second)
defer ticker.Stop()

prices := []int64{300000, 2000, 5000000}

for range ticker.C {
for i := range prices {
prices[i] = prices[i] + 1
}

reports := []triggers.FeedReport{
{
FeedID: mercury.FeedID("0x1111111111111111111100000000000000000000000000000000000000000000").Bytes(),
FullReport: []byte{},
BenchmarkPrice: prices[0],
ObservationTimestamp: time.Now().Unix(),
},
{
FeedID: mercury.FeedID("0x2222222222222222222200000000000000000000000000000000000000000000").Bytes(),
FullReport: []byte{},
BenchmarkPrice: prices[1],
ObservationTimestamp: time.Now().Unix(),
},
{
FeedID: mercury.FeedID("0x3333333333333333333300000000000000000000000000000000000000000000").Bytes(),
FullReport: []byte{},
BenchmarkPrice: prices[2],
ObservationTimestamp: time.Now().Unix(),
},
}

logger.Infow("New set of Mercury reports", "timestamp", time.Now().Unix(), "payload", reports)
err := trigger.ProcessReport(reports)
if err != nil {
logger.Errorw("failed to process Mercury reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports)
}
}
}

func ValidatedWorkflowSpec(tomlString string) (job.Job, error) {
Expand Down
8 changes: 5 additions & 3 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ LOOP:
return nil
}

// If the capability is already cached, that means we've already registered it
// If the capability already exists, that means we've already registered it
if s.capability != nil {
return nil
}
Expand Down Expand Up @@ -129,7 +129,7 @@ LOOP:

innerErr = cc.RegisterToWorkflow(ctx, reg)
if innerErr != nil {
return fmt.Errorf("failed to register to workflow: %+v", reg)
return fmt.Errorf("failed to register to workflow (%+v): %w", reg, innerErr)
}

s.capability = cc
Expand Down Expand Up @@ -375,6 +375,7 @@ func (e *Engine) queueIfReady(state executionState, step *step) {
}

func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error {
e.logger.Infow("finishing execution", "executionID", executionID, "status", status)
err := e.executionStates.updateStatus(ctx, executionID, status)
if err != nil {
return err
Expand Down Expand Up @@ -408,7 +409,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
stepState.outputs.err = err
stepState.status = statusErrored
} else {
e.logger.Debugw("step executed successfully", "executionID", msg.state.executionID, "stepRef", msg.stepRef, "outputs", outputs)
e.logger.Infow("step executed successfully", "executionID", msg.state.executionID, "stepRef", msg.stepRef, "outputs", outputs)
stepState.outputs.value = outputs
stepState.status = statusCompleted
}
Expand Down Expand Up @@ -564,6 +565,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
// - that there are no step `ref` called `trigger` as this is reserved for any triggers
// - that there are no duplicate `ref`s
// - that the `ref` for any triggers is empty -- and filled in with `trigger`
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

workflow, err := Parse(cfg.Spec)
Expand Down
6 changes: 3 additions & 3 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) {
const (
simpleWorkflow = `
triggers:
- type: "on_mercury_report"
- type: "mercury-trigger"
config:
feedlist:
- "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD
Expand Down Expand Up @@ -164,7 +164,7 @@ targets:
func mockTrigger(t *testing.T) (capabilities.TriggerCapability, capabilities.CapabilityResponse) {
mt := &mockTriggerCapability{
CapabilityInfo: capabilities.MustNewCapabilityInfo(
"on_mercury_report",
"mercury-trigger",
capabilities.CapabilityTypeTrigger,
"issues a trigger when a mercury report is received.",
"v1.0.0",
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) {
const (
multiStepWorkflow = `
triggers:
- type: "on_mercury_report"
- type: "mercury-trigger"
config:
feedlist:
- "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD
Expand Down
14 changes: 14 additions & 0 deletions core/services/workflows/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,20 @@ func deepMap(input any, transform func(el string) (any, error)) (any, error) {
}

return nv, nil
case mapping:
// coerce mapping to map[string]any
mp := map[string]any(tv)

nm := map[string]any{}
for k, v := range mp {
nv, err := deepMap(v, transform)
if err != nil {
return nil, err
}

nm[k] = nv
}
return nm, nil
case map[string]any:
nm := map[string]any{}
for k, v := range tv {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
triggers:
- type: on_mercury_report@1
- type: mercury-trigger@1
ref: report_data
config:
boolean_coercion:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"$id": "https://github.com/smartcontractkit/chainlink/v2/core/services/workflows/workflow-spec",
"$ref": "#/$defs/workflowSpec",
"$defs": {
"mapping": {
"type": "object"
},
"stepDefinition": {
"properties": {
"type": {
Expand All @@ -14,10 +17,10 @@
"pattern": "^[a-z0-9_]+$"
},
"inputs": {
"type": "object"
"$ref": "#/$defs/mapping"
},
"config": {
"type": "object"
"$ref": "#/$defs/mapping"
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -63,4 +66,4 @@
]
}
}
}
}
Loading
Loading