From 7ec99efc64832750825f8bc6711fb9794d6e40df Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Mon, 5 Aug 2024 14:29:14 +0100 Subject: [PATCH] changes to support deterministic message hash in the remote target (#13935) --- .changeset/polite-crabs-pretend.md | 5 ++ .../keystone_contracts_setup.go | 60 +++++++++++-------- .../integration_tests/mock_libocr.go | 15 +++-- core/capabilities/integration_tests/setup.go | 11 ++-- .../integration_tests/streams_test.go | 6 +- core/capabilities/launcher.go | 2 + core/capabilities/launcher_test.go | 2 +- .../remote/target/endtoend_test.go | 2 +- core/capabilities/remote/target/server.go | 44 ++++++++++++-- .../capabilities/remote/target/server_test.go | 47 +++++++++++++-- core/capabilities/remote/trigger_publisher.go | 8 ++- .../remote/trigger_publisher_test.go | 2 +- .../capabilities/remote/trigger_subscriber.go | 8 ++- .../remote/trigger_subscriber_test.go | 2 +- core/capabilities/streams/trigger_test.go | 2 +- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 +- ...deploy_initialize_capabilities_registry.go | 15 +++-- core/services/registrysyncer/syncer.go | 26 +++++--- core/services/registrysyncer/syncer_test.go | 3 +- go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 +- 26 files changed, 202 insertions(+), 82 deletions(-) create mode 100644 .changeset/polite-crabs-pretend.md diff --git a/.changeset/polite-crabs-pretend.md b/.changeset/polite-crabs-pretend.md new file mode 100644 index 00000000000..f8ea63b45c1 --- /dev/null +++ b/.changeset/polite-crabs-pretend.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal ensure remote target request hash is deterministic diff --git a/core/capabilities/integration_tests/keystone_contracts_setup.go b/core/capabilities/integration_tests/keystone_contracts_setup.go index 42269d1bd45..004a4c32a3a 100644 --- a/core/capabilities/integration_tests/keystone_contracts_setup.go +++ b/core/capabilities/integration_tests/keystone_contracts_setup.go @@ -91,8 +91,8 @@ func peerToNode(nopID uint32, p peer) (kcr.CapabilitiesRegistryNodeParams, error }, nil } -func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workflowDonPeers []peer, triggerDonPeers []peer, - targetDonPeerIDs []peer, +func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workflowDon donInfo, triggerDon donInfo, + targetDon donInfo, transactOpts *bind.TransactOpts, backend *ethBackend) common.Address { addr, _, reg, err := kcr.DeployCapabilitiesRegistry(transactOpts, backend) require.NoError(t, err) @@ -157,7 +157,7 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl nopID := recLog.NodeOperatorId nodes := []kcr.CapabilitiesRegistryNodeParams{} - for _, wfPeer := range workflowDonPeers { + for _, wfPeer := range workflowDon.peerIDs { n, innerErr := peerToNode(nopID, wfPeer) require.NoError(t, innerErr) @@ -165,7 +165,7 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl nodes = append(nodes, n) } - for _, triggerPeer := range triggerDonPeers { + for _, triggerPeer := range triggerDon.peerIDs { n, innerErr := peerToNode(nopID, triggerPeer) require.NoError(t, innerErr) @@ -173,7 +173,7 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl nodes = append(nodes, n) } - for _, targetPeer := range targetDonPeerIDs { + for _, targetPeer := range targetDon.peerIDs { n, innerErr := peerToNode(nopID, targetPeer) require.NoError(t, innerErr) @@ -185,7 +185,7 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl require.NoError(t, err) // workflow DON - ps, err := peers(workflowDonPeers) + ps, err := peers(workflowDon.peerIDs) require.NoError(t, err) cc := newCapabilityConfig() @@ -199,22 +199,24 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl }, } - workflowDonF := uint8(2) - _, err = reg.AddDON(transactOpts, ps, cfgs, false, true, workflowDonF) + _, err = reg.AddDON(transactOpts, ps, cfgs, false, true, workflowDon.F) require.NoError(t, err) // trigger DON - ps, err = peers(triggerDonPeers) + ps, err = peers(triggerDon.peerIDs) require.NoError(t, err) - triggerDonF := 1 - config := &pb.RemoteTriggerConfig{ - RegistrationRefresh: durationpb.New(20000 * time.Millisecond), - RegistrationExpiry: durationpb.New(60000 * time.Millisecond), - // F + 1 - MinResponsesToAggregate: uint32(triggerDonF) + 1, + triggerCapabilityConfig := newCapabilityConfig() + triggerCapabilityConfig.RemoteConfig = &pb.CapabilityConfig_RemoteTriggerConfig{ + RemoteTriggerConfig: &pb.RemoteTriggerConfig{ + RegistrationRefresh: durationpb.New(60000 * time.Millisecond), + RegistrationExpiry: durationpb.New(60000 * time.Millisecond), + // F + 1 + MinResponsesToAggregate: uint32(triggerDon.F) + 1, + }, } - configb, err := proto.Marshal(config) + + configb, err := proto.Marshal(triggerCapabilityConfig) require.NoError(t, err) cfgs = []kcr.CapabilitiesRegistryCapabilityConfiguration{ @@ -224,22 +226,31 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl }, } - _, err = reg.AddDON(transactOpts, ps, cfgs, true, false, uint8(triggerDonF)) + _, err = reg.AddDON(transactOpts, ps, cfgs, true, false, triggerDon.F) require.NoError(t, err) // target DON - ps, err = peers(targetDonPeerIDs) + ps, err = peers(targetDon.peerIDs) + require.NoError(t, err) + + targetCapabilityConfig := newCapabilityConfig() + targetCapabilityConfig.RemoteConfig = &pb.CapabilityConfig_RemoteTargetConfig{ + RemoteTargetConfig: &pb.RemoteTargetConfig{ + RequestHashExcludedAttributes: []string{"signed_report.Signatures"}, + }, + } + + remoteTargetConfigBytes, err := proto.Marshal(targetCapabilityConfig) require.NoError(t, err) cfgs = []kcr.CapabilitiesRegistryCapabilityConfiguration{ { CapabilityId: wid, - Config: ccb, + Config: remoteTargetConfigBytes, }, } - targetDonF := uint8(1) - _, err = reg.AddDON(transactOpts, ps, cfgs, true, false, targetDonF) + _, err = reg.AddDON(transactOpts, ps, cfgs, true, false, targetDon.F) require.NoError(t, err) backend.Commit() @@ -253,19 +264,18 @@ func newCapabilityConfig() *pb.CapabilityConfig { } } -func setupForwarderContract(t *testing.T, workflowDonPeers []peer, workflowDonId uint32, - configVersion uint32, f uint8, +func setupForwarderContract(t *testing.T, workflowDon donInfo, transactOpts *bind.TransactOpts, backend *ethBackend) (common.Address, *forwarder.KeystoneForwarder) { addr, _, fwd, err := forwarder.DeployKeystoneForwarder(transactOpts, backend) require.NoError(t, err) backend.Commit() var signers []common.Address - for _, p := range workflowDonPeers { + for _, p := range workflowDon.peerIDs { signers = append(signers, common.HexToAddress(p.Signer)) } - _, err = fwd.SetConfig(transactOpts, workflowDonId, configVersion, f, signers) + _, err = fwd.SetConfig(transactOpts, workflowDon.ID, workflowDon.ConfigVersion, workflowDon.F, signers) require.NoError(t, err) backend.Commit() diff --git a/core/capabilities/integration_tests/mock_libocr.go b/core/capabilities/integration_tests/mock_libocr.go index 39c53d48aff..14ccdce6000 100644 --- a/core/capabilities/integration_tests/mock_libocr.go +++ b/core/capabilities/integration_tests/mock_libocr.go @@ -157,10 +157,6 @@ func (m *mockLibOCR) simulateProtocolRound(ctx context.Context) error { Signer: commontypes.OracleID(i), Signature: sig, }) - - if uint8(len(signatures)) == m.f+1 { - break - } } for _, node := range m.nodes { @@ -181,7 +177,16 @@ func (m *mockLibOCR) simulateProtocolRound(ctx context.Context) error { continue } - err = node.Transmit(ctx, types.ConfigDigest{}, 0, report, signatures) + // For each node select a random set of f+1 signatures to mimic libocr behaviour + s := rand.NewSource(time.Now().UnixNano()) + r := rand.New(s) + indices := r.Perm(len(signatures)) + selectedSignatures := make([]types.AttributedOnchainSignature, m.f+1) + for i := 0; i < int(m.f+1); i++ { + selectedSignatures[i] = signatures[indices[i]] + } + + err = node.Transmit(ctx, types.ConfigDigest{}, 0, report, selectedSignatures) if err != nil { return fmt.Errorf("failed to transmit report: %w", err) } diff --git a/core/capabilities/integration_tests/setup.go b/core/capabilities/integration_tests/setup.go index 0095d2fd9de..69b8c3eaa0a 100644 --- a/core/capabilities/integration_tests/setup.go +++ b/core/capabilities/integration_tests/setup.go @@ -68,8 +68,8 @@ func setupStreamDonsWithTransmissionSchedule(ctx context.Context, t *testing.T, lggr.SetLogLevel(TestLogLevel) ethBlockchain, transactor := setupBlockchain(t, 1000, 1*time.Second) - capabilitiesRegistryAddr := setupCapabilitiesRegistryContract(ctx, t, workflowDonInfo.peerIDs, triggerDonInfo.peerIDs, targetDonInfo.peerIDs, transactor, ethBlockchain) - forwarderAddr, _ := setupForwarderContract(t, workflowDonInfo.peerIDs, workflowDonInfo.ID, 1, workflowDonInfo.F, transactor, ethBlockchain) + capabilitiesRegistryAddr := setupCapabilitiesRegistryContract(ctx, t, workflowDonInfo, triggerDonInfo, targetDonInfo, transactor, ethBlockchain) + forwarderAddr, _ := setupForwarderContract(t, workflowDonInfo, transactor, ethBlockchain) consumerAddr, consumer := setupConsumerContract(t, transactor, ethBlockchain, forwarderAddr, workflowOwnerID, workflowName) var feedIDs []string @@ -259,9 +259,10 @@ func createDonInfo(t *testing.T, don don) donInfo { triggerDonInfo := donInfo{ DON: commoncap.DON{ - ID: don.id, - Members: donPeers, - F: don.f, + ID: don.id, + Members: donPeers, + F: don.f, + ConfigVersion: 1, }, peerIDs: peerIDs, keys: donKeys, diff --git a/core/capabilities/integration_tests/streams_test.go b/core/capabilities/integration_tests/streams_test.go index 6216e36c856..7be392932f8 100644 --- a/core/capabilities/integration_tests/streams_test.go +++ b/core/capabilities/integration_tests/streams_test.go @@ -22,9 +22,9 @@ func Test_AllAtOnceTransmissionSchedule(t *testing.T) { // The don IDs set in the below calls are inferred from the order in which the dons are added to the capabilities registry // in the setupCapabilitiesRegistryContract function, should this order change the don IDs will need updating. - workflowDonInfo := createDonInfo(t, don{id: 1, numNodes: 5, f: 1}) - triggerDonInfo := createDonInfo(t, don{id: 2, numNodes: 7, f: 1}) - targetDonInfo := createDonInfo(t, don{id: 3, numNodes: 4, f: 1}) + workflowDonInfo := createDonInfo(t, don{id: 1, numNodes: 7, f: 2}) + triggerDonInfo := createDonInfo(t, don{id: 2, numNodes: 7, f: 2}) + targetDonInfo := createDonInfo(t, don{id: 3, numNodes: 4, f: 2}) consumer, feedIDs, triggerSink := setupStreamDonsWithTransmissionSchedule(ctx, t, workflowDonInfo, triggerDonInfo, targetDonInfo, 3, "2s", "allAtOnce") diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index b4ade04127b..b30477e4c83 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -216,6 +216,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync int(remoteDON.F+1), w.lggr, ) + // TODO: We need to implement a custom, Mercury-specific // aggregator here, because there is no guarantee that // all trigger events in the workflow will have the same @@ -358,6 +359,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee case capabilities.CapabilityTypeTarget: newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) { return target.NewServer( + c.RemoteTargetConfig, myPeerID, capability.(capabilities.TargetCapability), info, diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index fb3e6837d00..82b03edcecb 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -323,7 +323,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) { // The below state describes a Workflow DON (AcceptsWorkflows = true), // which exposes the streams-trigger and write_chain capabilities. // We expect receivers to be wired up and both capabilities to be added to the registry. - var rtc capabilities.RemoteTriggerConfig + rtc := &capabilities.RemoteTriggerConfig{} rtc.ApplyDefaults() state := ®istrysyncer.LocalRegistry{ diff --git a/core/capabilities/remote/target/endtoend_test.go b/core/capabilities/remote/target/endtoend_test.go index 9bbb53d4f66..cfab50f0fe7 100644 --- a/core/capabilities/remote/target/endtoend_test.go +++ b/core/capabilities/remote/target/endtoend_test.go @@ -226,7 +226,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta for i := 0; i < numCapabilityPeers; i++ { capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) - capabilityNode := target.NewServer(capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, + capabilityNode := target.NewServer(&commoncap.RemoteTargetConfig{RequestHashExcludedAttributes: []string{}}, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, capabilityNodeResponseTimeout, lggr) servicetest.Run(t, capabilityNode) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) diff --git a/core/capabilities/remote/target/server.go b/core/capabilities/remote/target/server.go index ea9caf81eff..39023ffb3fa 100644 --- a/core/capabilities/remote/target/server.go +++ b/core/capabilities/remote/target/server.go @@ -4,10 +4,12 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "sync" "time" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -24,7 +26,9 @@ import ( // server communicates with corresponding client on remote nodes. type server struct { services.StateMachine - lggr logger.Logger + lggr logger.Logger + + config *commoncap.RemoteTargetConfig peerID p2ptypes.PeerID underlying commoncap.TargetCapability capInfo commoncap.CapabilityInfo @@ -51,9 +55,14 @@ type requestAndMsgID struct { messageID string } -func NewServer(peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, +func NewServer(config *commoncap.RemoteTargetConfig, peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *server { + if config == nil { + lggr.Info("no config provided, using default values") + config = &commoncap.RemoteTargetConfig{} + } return &server{ + config: config, underlying: underlying, peerID: peerID, capInfo: capInfo, @@ -126,11 +135,16 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { return } + msgHash, err := r.getMessageHash(msg) + if err != nil { + r.lggr.Errorw("failed to get message hash", "err", err) + return + } + // A request is uniquely identified by the message id and the hash of the payload to prevent a malicious // actor from sending a different payload with the same message id messageId := GetMessageID(msg) - hash := sha256.Sum256(msg.Payload) - requestID := messageId + hex.EncodeToString(hash[:]) + requestID := messageId + hex.EncodeToString(msgHash[:]) if requestIDs, ok := r.messageIDToRequestIDsCount[messageId]; ok { requestIDs[requestID] = requestIDs[requestID] + 1 @@ -161,12 +175,32 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { reqAndMsgID := r.requestIDToRequest[requestID] - err := reqAndMsgID.request.OnMessage(ctx, msg) + err = reqAndMsgID.request.OnMessage(ctx, msg) if err != nil { r.lggr.Errorw("request failed to OnMessage new message", "request", reqAndMsgID, "err", err) } } +func (r *server) getMessageHash(msg *types.MessageBody) ([32]byte, error) { + req, err := pb.UnmarshalCapabilityRequest(msg.Payload) + if err != nil { + return [32]byte{}, fmt.Errorf("failed to unmarshal capability request: %w", err) + } + + for _, path := range r.config.RequestHashExcludedAttributes { + if !req.Inputs.DeleteAtPath(path) { + return [32]byte{}, fmt.Errorf("failed to delete attribute from map at path: %s", path) + } + } + + reqBytes, err := pb.MarshalCapabilityRequest(req) + if err != nil { + return [32]byte{}, fmt.Errorf("failed to marshal capability request: %w", err) + } + hash := sha256.Sum256(reqBytes) + return hash, nil +} + func GetMessageID(msg *types.MessageBody) string { return string(msg.MessageId) } diff --git a/core/capabilities/remote/target/server_test.go b/core/capabilities/remote/target/server_test.go index a5aa45efd06..2460a2dd0f7 100644 --- a/core/capabilities/remote/target/server_test.go +++ b/core/capabilities/remote/target/server_test.go @@ -2,6 +2,7 @@ package target_test import ( "context" + "strconv" "testing" "time" @@ -11,6 +12,7 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -18,12 +20,48 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) +func Test_Server_ExcludesNonDeterministicInputAttributes(t *testing.T) { + ctx := testutils.Context(t) + + numCapabilityPeers := 4 + + callers, srvcs := testRemoteTargetServer(ctx, t, &commoncap.RemoteTargetConfig{RequestHashExcludedAttributes: []string{"signed_report.Signatures"}}, + &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) + + for idx, caller := range callers { + rawInputs := map[string]any{ + "signed_report": map[string]any{"Signatures": "sig" + strconv.Itoa(idx), "Price": 20}, + } + + inputs, err := values.NewMap(rawInputs) + require.NoError(t, err) + + _, err = caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Inputs: inputs, + }) + require.NoError(t, err) + } + + for _, caller := range callers { + for i := 0; i < numCapabilityPeers; i++ { + msg := <-caller.receivedMessages + assert.Equal(t, remotetypes.Error_OK, msg.Error) + } + } + closeServices(t, srvcs) +} + func Test_Server_RespondsAfterSufficientRequests(t *testing.T) { ctx := testutils.Context(t) numCapabilityPeers := 4 - callers, srvcs := testRemoteTargetServer(ctx, t, &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) + callers, srvcs := testRemoteTargetServer(ctx, t, &commoncap.RemoteTargetConfig{}, &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) for _, caller := range callers { _, err := caller.Execute(context.Background(), @@ -50,7 +88,7 @@ func Test_Server_InsufficientCallers(t *testing.T) { numCapabilityPeers := 4 - callers, srvcs := testRemoteTargetServer(ctx, t, &TestCapability{}, 10, 10, numCapabilityPeers, 3, 100*time.Millisecond) + callers, srvcs := testRemoteTargetServer(ctx, t, &commoncap.RemoteTargetConfig{}, &TestCapability{}, 10, 10, numCapabilityPeers, 3, 100*time.Millisecond) for _, caller := range callers { _, err := caller.Execute(context.Background(), @@ -77,7 +115,7 @@ func Test_Server_CapabilityError(t *testing.T) { numCapabilityPeers := 4 - callers, srvcs := testRemoteTargetServer(ctx, t, &TestErrorCapability{}, 10, 9, numCapabilityPeers, 3, 100*time.Millisecond) + callers, srvcs := testRemoteTargetServer(ctx, t, &commoncap.RemoteTargetConfig{}, &TestErrorCapability{}, 10, 9, numCapabilityPeers, 3, 100*time.Millisecond) for _, caller := range callers { _, err := caller.Execute(context.Background(), @@ -100,6 +138,7 @@ func Test_Server_CapabilityError(t *testing.T) { } func testRemoteTargetServer(ctx context.Context, t *testing.T, + config *commoncap.RemoteTargetConfig, underlying commoncap.TargetCapability, numWorkflowPeers int, workflowDonF uint8, numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration) ([]*serverTestClient, []services.Service) { @@ -150,7 +189,7 @@ func testRemoteTargetServer(ctx context.Context, t *testing.T, for i := 0; i < numCapabilityPeers; i++ { capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) - capabilityNode := target.NewServer(capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, + capabilityNode := target.NewServer(config, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, capabilityNodeResponseTimeout, lggr) require.NoError(t, capabilityNode.Start(ctx)) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index 35ce41118f5..c1f2fb32c5a 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -21,7 +21,7 @@ import ( // // TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes. type triggerPublisher struct { - config capabilities.RemoteTriggerConfig + config *capabilities.RemoteTriggerConfig underlying commoncap.TriggerCapability capInfo commoncap.CapabilityInfo capDonInfo commoncap.DON @@ -48,7 +48,11 @@ type pubRegState struct { var _ types.Receiver = &triggerPublisher{} var _ services.Service = &triggerPublisher{} -func NewTriggerPublisher(config capabilities.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher { +func NewTriggerPublisher(config *capabilities.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher { + if config == nil { + lggr.Info("no config provided, using default values") + config = &capabilities.RemoteTriggerConfig{} + } config.ApplyDefaults() return &triggerPublisher{ config: config, diff --git a/core/capabilities/remote/trigger_publisher_test.go b/core/capabilities/remote/trigger_publisher_test.go index 1e3000d20ca..2c4a8518965 100644 --- a/core/capabilities/remote/trigger_publisher_test.go +++ b/core/capabilities/remote/trigger_publisher_test.go @@ -42,7 +42,7 @@ func TestTriggerPublisher_Register(t *testing.T) { } dispatcher := remoteMocks.NewDispatcher(t) - config := capabilities.RemoteTriggerConfig{ + config := &capabilities.RemoteTriggerConfig{ RegistrationRefresh: 100 * time.Millisecond, RegistrationExpiry: 100 * time.Second, MinResponsesToAggregate: 1, diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index 0ccbf37c61a..2d038e45c08 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -23,7 +23,7 @@ import ( // // TriggerSubscriber communicates with corresponding TriggerReceivers on remote nodes. type triggerSubscriber struct { - config capabilities.RemoteTriggerConfig + config *capabilities.RemoteTriggerConfig capInfo commoncap.CapabilityInfo capDonInfo capabilities.DON capDonMembers map[p2ptypes.PeerID]struct{} @@ -55,11 +55,15 @@ var _ services.Service = &triggerSubscriber{} // TODO makes this configurable with a default const defaultSendChannelBufferSize = 1000 -func NewTriggerSubscriber(config capabilities.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber { +func NewTriggerSubscriber(config *capabilities.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber { if aggregator == nil { lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID) aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1)) } + if config == nil { + lggr.Info("no config provided, using default values") + config = &capabilities.RemoteTriggerConfig{} + } config.ApplyDefaults() capDonMembers := make(map[p2ptypes.PeerID]struct{}) for _, member := range capDonInfo.Members { diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index 93e962215ab..2e34b03ec5c 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -63,7 +63,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { }) // register trigger - config := capabilities.RemoteTriggerConfig{ + config := &capabilities.RemoteTriggerConfig{ RegistrationRefresh: 100 * time.Millisecond, RegistrationExpiry: 100 * time.Second, MinResponsesToAggregate: 1, diff --git a/core/capabilities/streams/trigger_test.go b/core/capabilities/streams/trigger_test.go index cb4cfaa36bc..853f07f2aae 100644 --- a/core/capabilities/streams/trigger_test.go +++ b/core/capabilities/streams/trigger_test.go @@ -87,7 +87,7 @@ func TestStreamsTrigger(t *testing.T) { Members: capMembers, F: uint8(F), } - config := capabilities.RemoteTriggerConfig{ + config := &capabilities.RemoteTriggerConfig{ MinResponsesToAggregate: uint32(F + 1), } subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, capabilities.DON{}, nil, agg, lggr) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 4ee443d46f8..4e250038710 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -22,7 +22,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc + github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7 github.com/spf13/cobra v1.8.0 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 3ae26beb633..4c8eee4a1db 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1184,8 +1184,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc h1:nNZqLasN8y5huDKX76JUZtni7WkUI36J61//czbJpDM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409 h1:rwo/bzqzbhSPBn1CHFfHiQPcMlpBV/hau4TrpJngTJc= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM= diff --git a/core/scripts/keystone/src/05_deploy_initialize_capabilities_registry.go b/core/scripts/keystone/src/05_deploy_initialize_capabilities_registry.go index 87622415430..3352267d149 100644 --- a/core/scripts/keystone/src/05_deploy_initialize_capabilities_registry.go +++ b/core/scripts/keystone/src/05_deploy_initialize_capabilities_registry.go @@ -11,10 +11,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/values" @@ -373,8 +374,14 @@ func (c *deployAndInitializeCapabilitiesRegistryCommand) Run(args []string) { panic(err) } - cc = newCapabilityConfig() - ccb, err = proto.Marshal(cc) + targetCapabilityConfig := newCapabilityConfig() + targetCapabilityConfig.RemoteConfig = &capabilitiespb.CapabilityConfig_RemoteTargetConfig{ + RemoteTargetConfig: &capabilitiespb.RemoteTargetConfig{ + RequestHashExcludedAttributes: []string{"signed_report.Signatures"}, + }, + } + + remoteTargetConfigBytes, err := proto.Marshal(targetCapabilityConfig) if err != nil { panic(err) } @@ -382,7 +389,7 @@ func (c *deployAndInitializeCapabilitiesRegistryCommand) Run(args []string) { cfgs = []kcr.CapabilitiesRegistryCapabilityConfiguration{ { CapabilityId: wid, - Config: ccb, + Config: remoteTargetConfigBytes, }, } _, err = reg.AddDON(env.Owner, ps, cfgs, true, false, 1) diff --git a/core/services/registrysyncer/syncer.go b/core/services/registrysyncer/syncer.go index 4bbfaef5040..9675d86dc86 100644 --- a/core/services/registrysyncer/syncer.go +++ b/core/services/registrysyncer/syncer.go @@ -165,12 +165,21 @@ func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguratio return capabilities.CapabilityConfiguration{}, err } - var rtc capabilities.RemoteTriggerConfig - if prtc := cconf.GetRemoteTriggerConfig(); prtc != nil { - rtc.RegistrationRefresh = prtc.RegistrationRefresh.AsDuration() - rtc.RegistrationExpiry = prtc.RegistrationExpiry.AsDuration() - rtc.MinResponsesToAggregate = prtc.MinResponsesToAggregate - rtc.MessageExpiry = prtc.MessageExpiry.AsDuration() + var remoteTriggerConfig *capabilities.RemoteTriggerConfig + var remoteTargetConfig *capabilities.RemoteTargetConfig + + switch cconf.GetRemoteConfig().(type) { + case *capabilitiespb.CapabilityConfig_RemoteTriggerConfig: + prtc := cconf.GetRemoteTriggerConfig() + remoteTriggerConfig = &capabilities.RemoteTriggerConfig{} + remoteTriggerConfig.RegistrationRefresh = prtc.RegistrationRefresh.AsDuration() + remoteTriggerConfig.RegistrationExpiry = prtc.RegistrationExpiry.AsDuration() + remoteTriggerConfig.MinResponsesToAggregate = prtc.MinResponsesToAggregate + remoteTriggerConfig.MessageExpiry = prtc.MessageExpiry.AsDuration() + case *capabilitiespb.CapabilityConfig_RemoteTargetConfig: + prtc := cconf.GetRemoteTargetConfig() + remoteTargetConfig = &capabilities.RemoteTargetConfig{} + remoteTargetConfig.RequestHashExcludedAttributes = prtc.RequestHashExcludedAttributes } dc, err := values.FromMapValueProto(cconf.DefaultConfig) @@ -180,7 +189,8 @@ func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguratio return capabilities.CapabilityConfiguration{ DefaultConfig: dc, - RemoteTriggerConfig: rtc, + RemoteTriggerConfig: remoteTriggerConfig, + RemoteTargetConfig: remoteTargetConfig, }, nil } @@ -223,8 +233,6 @@ func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, err return nil, innerErr } - cconf.RemoteTriggerConfig.ApplyDefaults() - cc[cid] = cconf } diff --git a/core/services/registrysyncer/syncer_test.go b/core/services/registrysyncer/syncer_test.go index b926183394e..c13cc904909 100644 --- a/core/services/registrysyncer/syncer_test.go +++ b/core/services/registrysyncer/syncer_test.go @@ -210,6 +210,7 @@ func TestReader_Integration(t *testing.T) { RegistrationExpiry: durationpb.New(60 * time.Second), // F + 1 MinResponsesToAggregate: uint32(1) + 1, + MessageExpiry: durationpb.New(120 * time.Second), }, }, } @@ -256,7 +257,7 @@ func TestReader_Integration(t *testing.T) { }, gotCap) assert.Len(t, s.IDsToDONs, 1) - rtc := capabilities.RemoteTriggerConfig{ + rtc := &capabilities.RemoteTriggerConfig{ RegistrationRefresh: 20 * time.Second, MinResponsesToAggregate: 2, RegistrationExpiry: 60 * time.Second, diff --git a/go.mod b/go.mod index 8e2103eb246..3aa878d1ab2 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc + github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f github.com/smartcontractkit/chainlink-feeds v0.0.0-20240710170203-5b41615da827 diff --git a/go.sum b/go.sum index 73d6d5b227a..0264dcc01c3 100644 --- a/go.sum +++ b/go.sum @@ -1136,8 +1136,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc h1:nNZqLasN8y5huDKX76JUZtni7WkUI36J61//czbJpDM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409 h1:rwo/bzqzbhSPBn1CHFfHiQPcMlpBV/hau4TrpJngTJc= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index ed693f4fccc..d7c1918c927 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -28,7 +28,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc + github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409 github.com/smartcontractkit/chainlink-testing-framework v1.33.0 github.com/smartcontractkit/chainlink-testing-framework/grafana v0.0.0-20240405215812-5a72bc9af239 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index ca3ce8d903e..2854b2d599b 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1486,8 +1486,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc h1:nNZqLasN8y5huDKX76JUZtni7WkUI36J61//czbJpDM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409 h1:rwo/bzqzbhSPBn1CHFfHiQPcMlpBV/hau4TrpJngTJc= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 3d1ae6c7a98..f0485082ecb 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,7 +16,7 @@ require ( github.com/rs/zerolog v1.31.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc + github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409 github.com/smartcontractkit/chainlink-testing-framework v1.33.0 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 2a54ec9254f..647a02b9b0e 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1468,8 +1468,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc h1:nNZqLasN8y5huDKX76JUZtni7WkUI36J61//czbJpDM= -github.com/smartcontractkit/chainlink-common v0.2.2-0.20240731184516-249ef7ad0cdc/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409 h1:rwo/bzqzbhSPBn1CHFfHiQPcMlpBV/hau4TrpJngTJc= +github.com/smartcontractkit/chainlink-common v0.2.2-0.20240801092904-114abb088409/go.mod h1:Jg1sCTsbxg76YByI8ifpFby3FvVqISStHT8ypy9ocmY= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45 h1:NBQLtqk8zsyY4qTJs+NElI3aDFTcAo83JHvqD04EvB0= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240710121324-3ed288aa9b45/go.mod h1:LV0h7QBQUpoC2UUi6TcUvcIFm1xjP/DtEcqV8+qeLUs= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240801131703-fd75761c982f h1:I9fTBJpHkeldFplXUy71eLIn6A6GxuR4xrABoUeD+CM=