Skip to content

Commit

Permalink
changes to support deterministic message hash in the remote target (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec authored Aug 5, 2024
1 parent 1ac2902 commit 7ec99ef
Show file tree
Hide file tree
Showing 26 changed files with 202 additions and 82 deletions.
5 changes: 5 additions & 0 deletions .changeset/polite-crabs-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal ensure remote target request hash is deterministic
60 changes: 35 additions & 25 deletions core/capabilities/integration_tests/keystone_contracts_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func peerToNode(nopID uint32, p peer) (kcr.CapabilitiesRegistryNodeParams, error
}, nil
}

func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workflowDonPeers []peer, triggerDonPeers []peer,
targetDonPeerIDs []peer,
func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workflowDon donInfo, triggerDon donInfo,
targetDon donInfo,
transactOpts *bind.TransactOpts, backend *ethBackend) common.Address {
addr, _, reg, err := kcr.DeployCapabilitiesRegistry(transactOpts, backend)
require.NoError(t, err)
Expand Down Expand Up @@ -157,23 +157,23 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl

nopID := recLog.NodeOperatorId
nodes := []kcr.CapabilitiesRegistryNodeParams{}
for _, wfPeer := range workflowDonPeers {
for _, wfPeer := range workflowDon.peerIDs {
n, innerErr := peerToNode(nopID, wfPeer)
require.NoError(t, innerErr)

n.HashedCapabilityIds = [][32]byte{ocrid}
nodes = append(nodes, n)
}

for _, triggerPeer := range triggerDonPeers {
for _, triggerPeer := range triggerDon.peerIDs {
n, innerErr := peerToNode(nopID, triggerPeer)
require.NoError(t, innerErr)

n.HashedCapabilityIds = [][32]byte{sid}
nodes = append(nodes, n)
}

for _, targetPeer := range targetDonPeerIDs {
for _, targetPeer := range targetDon.peerIDs {
n, innerErr := peerToNode(nopID, targetPeer)
require.NoError(t, innerErr)

Expand All @@ -185,7 +185,7 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
require.NoError(t, err)

// workflow DON
ps, err := peers(workflowDonPeers)
ps, err := peers(workflowDon.peerIDs)
require.NoError(t, err)

cc := newCapabilityConfig()
Expand All @@ -199,22 +199,24 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
},
}

workflowDonF := uint8(2)
_, err = reg.AddDON(transactOpts, ps, cfgs, false, true, workflowDonF)
_, err = reg.AddDON(transactOpts, ps, cfgs, false, true, workflowDon.F)
require.NoError(t, err)

// trigger DON
ps, err = peers(triggerDonPeers)
ps, err = peers(triggerDon.peerIDs)
require.NoError(t, err)

triggerDonF := 1
config := &pb.RemoteTriggerConfig{
RegistrationRefresh: durationpb.New(20000 * time.Millisecond),
RegistrationExpiry: durationpb.New(60000 * time.Millisecond),
// F + 1
MinResponsesToAggregate: uint32(triggerDonF) + 1,
triggerCapabilityConfig := newCapabilityConfig()
triggerCapabilityConfig.RemoteConfig = &pb.CapabilityConfig_RemoteTriggerConfig{
RemoteTriggerConfig: &pb.RemoteTriggerConfig{
RegistrationRefresh: durationpb.New(60000 * time.Millisecond),
RegistrationExpiry: durationpb.New(60000 * time.Millisecond),
// F + 1
MinResponsesToAggregate: uint32(triggerDon.F) + 1,
},
}
configb, err := proto.Marshal(config)

configb, err := proto.Marshal(triggerCapabilityConfig)
require.NoError(t, err)

cfgs = []kcr.CapabilitiesRegistryCapabilityConfiguration{
Expand All @@ -224,22 +226,31 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
},
}

_, err = reg.AddDON(transactOpts, ps, cfgs, true, false, uint8(triggerDonF))
_, err = reg.AddDON(transactOpts, ps, cfgs, true, false, triggerDon.F)
require.NoError(t, err)

// target DON
ps, err = peers(targetDonPeerIDs)
ps, err = peers(targetDon.peerIDs)
require.NoError(t, err)

targetCapabilityConfig := newCapabilityConfig()
targetCapabilityConfig.RemoteConfig = &pb.CapabilityConfig_RemoteTargetConfig{
RemoteTargetConfig: &pb.RemoteTargetConfig{
RequestHashExcludedAttributes: []string{"signed_report.Signatures"},
},
}

remoteTargetConfigBytes, err := proto.Marshal(targetCapabilityConfig)
require.NoError(t, err)

cfgs = []kcr.CapabilitiesRegistryCapabilityConfiguration{
{
CapabilityId: wid,
Config: ccb,
Config: remoteTargetConfigBytes,
},
}

targetDonF := uint8(1)
_, err = reg.AddDON(transactOpts, ps, cfgs, true, false, targetDonF)
_, err = reg.AddDON(transactOpts, ps, cfgs, true, false, targetDon.F)
require.NoError(t, err)

backend.Commit()
Expand All @@ -253,19 +264,18 @@ func newCapabilityConfig() *pb.CapabilityConfig {
}
}

func setupForwarderContract(t *testing.T, workflowDonPeers []peer, workflowDonId uint32,
configVersion uint32, f uint8,
func setupForwarderContract(t *testing.T, workflowDon donInfo,
transactOpts *bind.TransactOpts, backend *ethBackend) (common.Address, *forwarder.KeystoneForwarder) {
addr, _, fwd, err := forwarder.DeployKeystoneForwarder(transactOpts, backend)
require.NoError(t, err)
backend.Commit()

var signers []common.Address
for _, p := range workflowDonPeers {
for _, p := range workflowDon.peerIDs {
signers = append(signers, common.HexToAddress(p.Signer))
}

_, err = fwd.SetConfig(transactOpts, workflowDonId, configVersion, f, signers)
_, err = fwd.SetConfig(transactOpts, workflowDon.ID, workflowDon.ConfigVersion, workflowDon.F, signers)
require.NoError(t, err)
backend.Commit()

Expand Down
15 changes: 10 additions & 5 deletions core/capabilities/integration_tests/mock_libocr.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ func (m *mockLibOCR) simulateProtocolRound(ctx context.Context) error {
Signer: commontypes.OracleID(i),
Signature: sig,
})

if uint8(len(signatures)) == m.f+1 {
break
}
}

for _, node := range m.nodes {
Expand All @@ -181,7 +177,16 @@ func (m *mockLibOCR) simulateProtocolRound(ctx context.Context) error {
continue
}

err = node.Transmit(ctx, types.ConfigDigest{}, 0, report, signatures)
// For each node select a random set of f+1 signatures to mimic libocr behaviour
s := rand.NewSource(time.Now().UnixNano())
r := rand.New(s)
indices := r.Perm(len(signatures))
selectedSignatures := make([]types.AttributedOnchainSignature, m.f+1)
for i := 0; i < int(m.f+1); i++ {
selectedSignatures[i] = signatures[indices[i]]
}

err = node.Transmit(ctx, types.ConfigDigest{}, 0, report, selectedSignatures)
if err != nil {
return fmt.Errorf("failed to transmit report: %w", err)
}
Expand Down
11 changes: 6 additions & 5 deletions core/capabilities/integration_tests/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func setupStreamDonsWithTransmissionSchedule(ctx context.Context, t *testing.T,
lggr.SetLogLevel(TestLogLevel)

ethBlockchain, transactor := setupBlockchain(t, 1000, 1*time.Second)
capabilitiesRegistryAddr := setupCapabilitiesRegistryContract(ctx, t, workflowDonInfo.peerIDs, triggerDonInfo.peerIDs, targetDonInfo.peerIDs, transactor, ethBlockchain)
forwarderAddr, _ := setupForwarderContract(t, workflowDonInfo.peerIDs, workflowDonInfo.ID, 1, workflowDonInfo.F, transactor, ethBlockchain)
capabilitiesRegistryAddr := setupCapabilitiesRegistryContract(ctx, t, workflowDonInfo, triggerDonInfo, targetDonInfo, transactor, ethBlockchain)
forwarderAddr, _ := setupForwarderContract(t, workflowDonInfo, transactor, ethBlockchain)
consumerAddr, consumer := setupConsumerContract(t, transactor, ethBlockchain, forwarderAddr, workflowOwnerID, workflowName)

var feedIDs []string
Expand Down Expand Up @@ -259,9 +259,10 @@ func createDonInfo(t *testing.T, don don) donInfo {

triggerDonInfo := donInfo{
DON: commoncap.DON{
ID: don.id,
Members: donPeers,
F: don.f,
ID: don.id,
Members: donPeers,
F: don.f,
ConfigVersion: 1,
},
peerIDs: peerIDs,
keys: donKeys,
Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/integration_tests/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func Test_AllAtOnceTransmissionSchedule(t *testing.T) {

// The don IDs set in the below calls are inferred from the order in which the dons are added to the capabilities registry
// in the setupCapabilitiesRegistryContract function, should this order change the don IDs will need updating.
workflowDonInfo := createDonInfo(t, don{id: 1, numNodes: 5, f: 1})
triggerDonInfo := createDonInfo(t, don{id: 2, numNodes: 7, f: 1})
targetDonInfo := createDonInfo(t, don{id: 3, numNodes: 4, f: 1})
workflowDonInfo := createDonInfo(t, don{id: 1, numNodes: 7, f: 2})
triggerDonInfo := createDonInfo(t, don{id: 2, numNodes: 7, f: 2})
targetDonInfo := createDonInfo(t, don{id: 3, numNodes: 4, f: 2})

consumer, feedIDs, triggerSink := setupStreamDonsWithTransmissionSchedule(ctx, t, workflowDonInfo, triggerDonInfo, targetDonInfo, 3,
"2s", "allAtOnce")
Expand Down
2 changes: 2 additions & 0 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
int(remoteDON.F+1),
w.lggr,
)

// TODO: We need to implement a custom, Mercury-specific
// aggregator here, because there is no guarantee that
// all trigger events in the workflow will have the same
Expand Down Expand Up @@ -358,6 +359,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
case capabilities.CapabilityTypeTarget:
newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) {
return target.NewServer(
c.RemoteTargetConfig,
myPeerID,
capability.(capabilities.TargetCapability),
info,
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) {
// The below state describes a Workflow DON (AcceptsWorkflows = true),
// which exposes the streams-trigger and write_chain capabilities.
// We expect receivers to be wired up and both capabilities to be added to the registry.
var rtc capabilities.RemoteTriggerConfig
rtc := &capabilities.RemoteTriggerConfig{}
rtc.ApplyDefaults()

state := &registrysyncer.LocalRegistry{
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
for i := 0; i < numCapabilityPeers; i++ {
capabilityPeer := capabilityPeers[i]
capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer)
capabilityNode := target.NewServer(capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher,
capabilityNode := target.NewServer(&commoncap.RemoteTargetConfig{RequestHashExcludedAttributes: []string{}}, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher,
capabilityNodeResponseTimeout, lggr)
servicetest.Run(t, capabilityNode)
broker.RegisterReceiverNode(capabilityPeer, capabilityNode)
Expand Down
44 changes: 39 additions & 5 deletions core/capabilities/remote/target/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"sync"
"time"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand All @@ -24,7 +26,9 @@ import (
// server communicates with corresponding client on remote nodes.
type server struct {
services.StateMachine
lggr logger.Logger
lggr logger.Logger

config *commoncap.RemoteTargetConfig
peerID p2ptypes.PeerID
underlying commoncap.TargetCapability
capInfo commoncap.CapabilityInfo
Expand All @@ -51,9 +55,14 @@ type requestAndMsgID struct {
messageID string
}

func NewServer(peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON,
func NewServer(config *commoncap.RemoteTargetConfig, peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON,
workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *server {
if config == nil {
lggr.Info("no config provided, using default values")
config = &commoncap.RemoteTargetConfig{}
}
return &server{
config: config,
underlying: underlying,
peerID: peerID,
capInfo: capInfo,
Expand Down Expand Up @@ -126,11 +135,16 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
return
}

msgHash, err := r.getMessageHash(msg)
if err != nil {
r.lggr.Errorw("failed to get message hash", "err", err)
return
}

// A request is uniquely identified by the message id and the hash of the payload to prevent a malicious
// actor from sending a different payload with the same message id
messageId := GetMessageID(msg)
hash := sha256.Sum256(msg.Payload)
requestID := messageId + hex.EncodeToString(hash[:])
requestID := messageId + hex.EncodeToString(msgHash[:])

if requestIDs, ok := r.messageIDToRequestIDsCount[messageId]; ok {
requestIDs[requestID] = requestIDs[requestID] + 1
Expand Down Expand Up @@ -161,12 +175,32 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {

reqAndMsgID := r.requestIDToRequest[requestID]

err := reqAndMsgID.request.OnMessage(ctx, msg)
err = reqAndMsgID.request.OnMessage(ctx, msg)
if err != nil {
r.lggr.Errorw("request failed to OnMessage new message", "request", reqAndMsgID, "err", err)
}
}

func (r *server) getMessageHash(msg *types.MessageBody) ([32]byte, error) {
req, err := pb.UnmarshalCapabilityRequest(msg.Payload)
if err != nil {
return [32]byte{}, fmt.Errorf("failed to unmarshal capability request: %w", err)
}

for _, path := range r.config.RequestHashExcludedAttributes {
if !req.Inputs.DeleteAtPath(path) {
return [32]byte{}, fmt.Errorf("failed to delete attribute from map at path: %s", path)
}
}

reqBytes, err := pb.MarshalCapabilityRequest(req)
if err != nil {
return [32]byte{}, fmt.Errorf("failed to marshal capability request: %w", err)
}
hash := sha256.Sum256(reqBytes)
return hash, nil
}

func GetMessageID(msg *types.MessageBody) string {
return string(msg.MessageId)
}
Expand Down
Loading

0 comments on commit 7ec99ef

Please sign in to comment.