Skip to content

Commit

Permalink
[KS-241] More metadata passed to encoder (#13377)
Browse files Browse the repository at this point in the history
* [KS-241] More metadata passed to encoder

* Pass the step ref in the RequestMetadata

* Bump common

* Use updated NewCapabilityInfo

* Pass DON ID to RequestMetadata

* Bump common

* Write_Target fix and hardcoded DON ID

* fix test

* fix engine test

---------

Co-authored-by: Cedric Cordenier <[email protected]>
  • Loading branch information
bolekk and cedric-cordenier authored May 31, 2024
1 parent 527fdd5 commit 390ee19
Show file tree
Hide file tree
Showing 19 changed files with 123 additions and 91 deletions.
5 changes: 5 additions & 0 deletions .changeset/angry-spiders-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal handle new metadata fields
6 changes: 0 additions & 6 deletions core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func TestRegistry(t *testing.T) {
capabilities.CapabilityTypeAction,
"capability-1-description",
"v1.0.0",
nil,
)
require.NoError(t, err)

Expand Down Expand Up @@ -71,7 +70,6 @@ func TestRegistry_NoDuplicateIDs(t *testing.T) {
capabilities.CapabilityTypeAction,
"capability-1-description",
"v1.0.0",
nil,
)
require.NoError(t, err)

Expand All @@ -84,7 +82,6 @@ func TestRegistry_NoDuplicateIDs(t *testing.T) {
capabilities.CapabilityTypeConsensus,
"capability-2-description",
"v1.0.0",
nil,
)
require.NoError(t, err)
c2 := &mockCapability{CapabilityInfo: ci}
Expand All @@ -109,7 +106,6 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
capabilities.CapabilityTypeAction,
"capability-1-description",
"v1.0.0",
nil,
)
require.NoError(t, err)

Expand All @@ -130,7 +126,6 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
capabilities.CapabilityTypeTarget,
"capability-1-description",
"v1.0.0",
nil,
)
require.NoError(t, err)

Expand Down Expand Up @@ -164,7 +159,6 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
capabilities.CapabilityTypeConsensus,
"capability-1-description",
"v1.0.0",
nil,
)
require.NoError(t, err)

Expand Down
17 changes: 11 additions & 6 deletions core/capabilities/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,18 @@ func (s *registrySyncer) launch(ctx context.Context) {
}
// NOTE: temporary hard-coded capabilities
capId := "streams-trigger"
triggerInfo := capabilities.CapabilityInfo{
ID: capId,
CapabilityType: capabilities.CapabilityTypeTrigger,
Description: "Remote Trigger",
Version: "0.0.1",
DON: &triggerCapabilityDonInfo,
triggerInfo, err := capabilities.NewRemoteCapabilityInfo(
capId,
capabilities.CapabilityTypeTrigger,
"Remote Trigger",
"v0.0.1",
&triggerCapabilityDonInfo,
)
if err != nil {
s.lggr.Errorw("failed to create capability info for streams-trigger", "error", err)
return
}

myId := s.peerWrapper.GetPeer().ID().String()
config := remotetypes.RemoteTriggerConfig{
RegistrationRefreshMs: 20000,
Expand Down
8 changes: 6 additions & 2 deletions core/capabilities/targets/write_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package targets

import (
"context"
"encoding/hex"
"fmt"
"math/big"

Expand Down Expand Up @@ -34,7 +35,6 @@ func NewWriteTarget(lggr logger.Logger, name string, cr commontypes.ContractRead
capabilities.CapabilityTypeTarget,
"Write target.",
"v1.0.0",
nil,
)

logger := lggr.Named("WriteTarget")
Expand Down Expand Up @@ -102,13 +102,17 @@ func (cap *WriteTarget) Execute(ctx context.Context, request capabilities.Capabi

// TODO: validate encoded report is prefixed with workflowID and executionID that match the request meta

rawExecutionID, err := hex.DecodeString(request.Metadata.WorkflowExecutionID)
if err != nil {
return nil, err
}
// Check whether value was already transmitted on chain
queryInputs := struct {
Receiver string
WorkflowExecutionID []byte
}{
Receiver: reqConfig.Address,
WorkflowExecutionID: []byte(request.Metadata.WorkflowExecutionID),
WorkflowExecutionID: rawExecutionID,
}
var transmitter common.Address
if err = cap.cr.GetLatestValue(ctx, "forwarder", "getTransmitter", queryInputs, &transmitter); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.3
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240530140143-638cade4f7f9
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240531161624-ddf27dafed93
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1185,8 +1185,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs=
github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240530140143-638cade4f7f9 h1:owNVqkBje7u+q7eATcvEVfaEsGC3nPladp6Gu19+W10=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240530140143-638cade4f7f9/go.mod h1:DUZccDEW98n+J1mhdWGO7wr/Njad9p9Fzks839JN7Rs=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240531161624-ddf27dafed93 h1:LEukS7+ZNK61mfmZiieD2csK52n/RGg+M60LCuc6zPQ=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240531161624-ddf27dafed93/go.mod h1:DUZccDEW98n+J1mhdWGO7wr/Njad9p9Fzks839JN7Rs=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
61 changes: 35 additions & 26 deletions core/services/relay/evm/cap_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,48 +68,57 @@ func (c *capEncoder) Encode(ctx context.Context, input values.Map) ([]byte, erro
if err != nil {
return nil, err
}
// prepend workflowID and workflowExecutionID to the encoded user data
workflowIDbytes, donIDBytes, executionIDBytes, workflowOwnerBytes, err := extractIDs(unwrappedMap)

metaMap, ok := input.Underlying[consensustypes.MetadataFieldName]
if !ok {
return nil, fmt.Errorf("expected metadata field to be present: %s", consensustypes.MetadataFieldName)
}

var meta consensustypes.Metadata
err = metaMap.UnwrapTo(&meta)
if err != nil {
return nil, err
}
return append(append(append(append(workflowIDbytes, donIDBytes...), executionIDBytes...), workflowOwnerBytes...), userPayload...), nil

return prependMetadataFields(meta, userPayload)
}

func decodeID(input map[string]any, key string, idLen int) ([]byte, error) {
id, ok := input[key].(string)
if !ok {
return nil, fmt.Errorf("expected %s to be a string", key)
func prependMetadataFields(meta consensustypes.Metadata, userPayload []byte) ([]byte, error) {
// TODO: use all 7 fields from Metadata struct
result := []byte{}
workflowID, err := decodeID(meta.WorkflowID, idLen)
if err != nil {
return nil, err
}
result = append(result, workflowID...)

b, err := hex.DecodeString(id)
donID, err := decodeID(meta.DONID, 4)
if err != nil {
return nil, err
}
result = append(result, donID...)

if len(b) != idLen {
return nil, fmt.Errorf("incorrect length for id %s, expected %d bytes, got %d", id, idLen, len(b))
executionID, err := decodeID(meta.ExecutionID, idLen)
if err != nil {
return nil, err
}
result = append(result, executionID...)

return b, nil
}

// extract workflowID and executionID from the input map, validate and align to 32 bytes
// NOTE: consider requiring them to be exactly 32 bytes to avoid issues with padding
func extractIDs(input map[string]any) ([]byte, []byte, []byte, []byte, error) {
workflowID, err := decodeID(input, consensustypes.WorkflowIDFieldName, idLen)
workflowOwner, err := decodeID(meta.WorkflowOwner, 20)
if err != nil {
return nil, nil, nil, nil, err
return nil, err
}
result = append(result, workflowOwner...)
return append(result, userPayload...), nil
}

// TODO: source donID and workflowOwner from somewhere
donID := []byte{0, 1, 2, 3}
workflowOwner := make([]byte, 20)

executionID, err := decodeID(input, consensustypes.ExecutionIDFieldName, idLen)
func decodeID(id string, expectedLen int) ([]byte, error) {
b, err := hex.DecodeString(id)
if err != nil {
return nil, nil, nil, nil, err
return nil, err
}

return workflowID, donID, executionID, workflowOwner, nil
if len(b) != expectedLen {
return nil, fmt.Errorf("incorrect length for id %s, expected %d bytes, got %d", id, expectedLen, len(b))
}
return b, nil
}
37 changes: 20 additions & 17 deletions core/services/relay/evm/cap_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ func TestEVMEncoder_SingleField(t *testing.T) {

// output of a DF2.0 aggregator + metadata fields appended by OCR
input := map[string]any{
"Full_reports": []any{reportA, reportB},
consensustypes.WorkflowIDFieldName: workflowID,
consensustypes.ExecutionIDFieldName: executionID,
"Full_reports": []any{reportA, reportB},
consensustypes.MetadataFieldName: getMetadata(workflowID),
}
wrapped, err = values.NewMap(input)
require.NoError(t, err)
Expand Down Expand Up @@ -77,10 +76,9 @@ func TestEVMEncoder_TwoFields(t *testing.T) {

// output of a DF2.0 aggregator + metadata fields appended by OCR
input := map[string]any{
"Prices": []any{big.NewInt(234), big.NewInt(456)},
"Timestamps": []any{int64(111), int64(222)},
consensustypes.WorkflowIDFieldName: workflowID,
consensustypes.ExecutionIDFieldName: executionID,
"Prices": []any{big.NewInt(234), big.NewInt(456)},
"Timestamps": []any{int64(111), int64(222)},
consensustypes.MetadataFieldName: getMetadata(workflowID),
}
wrapped, err = values.NewMap(input)
require.NoError(t, err)
Expand Down Expand Up @@ -121,8 +119,7 @@ func TestEVMEncoder_Tuple(t *testing.T) {
"Prices": []any{big.NewInt(234), big.NewInt(456)},
"Timestamps": []any{int64(111), int64(222)},
},
consensustypes.WorkflowIDFieldName: workflowID,
consensustypes.ExecutionIDFieldName: executionID,
consensustypes.MetadataFieldName: getMetadata(workflowID),
}
wrapped, err = values.NewMap(input)
require.NoError(t, err)
Expand Down Expand Up @@ -170,8 +167,7 @@ func TestEVMEncoder_ListOfTuples(t *testing.T) {
"Timestamp": int64(222),
},
},
consensustypes.WorkflowIDFieldName: workflowID,
consensustypes.ExecutionIDFieldName: executionID,
consensustypes.MetadataFieldName: getMetadata(workflowID),
}
wrapped, err = values.NewMap(input)
require.NoError(t, err)
Expand Down Expand Up @@ -207,9 +203,8 @@ func TestEVMEncoder_InvalidIDs(t *testing.T) {
// output of a DF2.0 aggregator + metadata fields appended by OCR
// using an invalid ID
input := map[string]any{
"Full_reports": []any{reportA, reportB},
consensustypes.WorkflowIDFieldName: invalidID,
consensustypes.ExecutionIDFieldName: executionID,
"Full_reports": []any{reportA, reportB},
consensustypes.MetadataFieldName: getMetadata(invalidID),
}
wrapped, err = values.NewMap(input)
require.NoError(t, err)
Expand All @@ -218,12 +213,20 @@ func TestEVMEncoder_InvalidIDs(t *testing.T) {

// using valid hex string of wrong length
input = map[string]any{
"full_reports": []any{reportA, reportB},
consensustypes.WorkflowIDFieldName: wrongLength,
consensustypes.ExecutionIDFieldName: executionID,
"Full_reports": []any{reportA, reportB},
consensustypes.MetadataFieldName: getMetadata(wrongLength),
}
wrapped, err = values.NewMap(input)
require.NoError(t, err)
_, err = enc.Encode(testutils.Context(t), *wrapped)
assert.ErrorContains(t, err, "incorrect length for id")
}

func getMetadata(cid string) consensustypes.Metadata {
return consensustypes.Metadata{
WorkflowID: cid,
DONID: donID,
ExecutionID: executionID,
WorkflowOwner: workflowOwnerID,
}
}
17 changes: 10 additions & 7 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
}

cfg := Config{
Lggr: d.logger,
Spec: spec.WorkflowSpec.Workflow,
WorkflowID: spec.WorkflowSpec.WorkflowID,
Registry: d.registry,
DONInfo: dinfo,
PeerID: d.peerID,
Store: d.store,
Lggr: d.logger,
Spec: spec.WorkflowSpec.Workflow,
WorkflowID: spec.WorkflowSpec.WorkflowID,
WorkflowOwner: spec.WorkflowSpec.WorkflowOwner,
WorkflowName: spec.WorkflowSpec.WorkflowName,
Registry: d.registry,
DONInfo: dinfo,
PeerID: d.peerID,
Store: d.store,
}
engine, err := NewEngine(cfg)
if err != nil {
Expand Down Expand Up @@ -90,6 +92,7 @@ func initializeDONInfo(lggr logger.Logger) (*capabilities.DON, error) {
}

return &capabilities.DON{
ID: "00010203",
Members: p2pIDs,
Config: capabilities.DONConfig{
SharedSecret: key,
Expand Down
17 changes: 15 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro

triggerRegRequest := capabilities.CapabilityRequest{
Metadata: capabilities.RequestMetadata{
WorkflowID: e.workflow.id,
WorkflowID: e.workflow.id,
WorkflowDonID: e.donInfo.ID,
WorkflowName: e.workflow.name,
WorkflowOwner: e.workflow.owner,
},
Config: tc,
Inputs: triggerInputs,
Expand Down Expand Up @@ -651,6 +654,9 @@ func (e *Engine) executeStep(ctx context.Context, l logger.Logger, msg stepReque
Metadata: capabilities.RequestMetadata{
WorkflowID: msg.state.WorkflowID,
WorkflowExecutionID: msg.state.ExecutionID,
WorkflowOwner: e.workflow.owner,
WorkflowName: e.workflow.name,
WorkflowDonID: e.donInfo.ID,
},
}

Expand All @@ -673,7 +679,10 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) er
}
deregRequest := capabilities.CapabilityRequest{
Metadata: capabilities.RequestMetadata{
WorkflowID: e.workflow.id,
WorkflowID: e.workflow.id,
WorkflowDonID: e.donInfo.ID,
WorkflowName: e.workflow.name,
WorkflowOwner: e.workflow.owner,
},
Inputs: triggerInputs,
Config: t.config,
Expand Down Expand Up @@ -744,6 +753,8 @@ func (e *Engine) Close() error {
type Config struct {
Spec string
WorkflowID string
WorkflowOwner string
WorkflowName string
Lggr logger.Logger
Registry core.CapabilitiesRegistry
MaxWorkerLimit int
Expand Down Expand Up @@ -820,6 +831,8 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
}

workflow.id = cfg.WorkflowID
workflow.owner = cfg.WorkflowOwner
workflow.name = cfg.WorkflowName

// Instantiate semaphore to put a limit on the number of workers
newWorkerCh := make(chan struct{}, cfg.MaxWorkerLimit)
Expand Down
Loading

0 comments on commit 390ee19

Please sign in to comment.