Skip to content

Commit

Permalink
[KS-412] Validate called DON membership in TriggerPublisher
Browse files Browse the repository at this point in the history
Sender needs to actually belong to the DON on behalf of which it is subscribing trigger events.
  • Loading branch information
bolekk committed Aug 8, 2024
1 parent 2ac9d98 commit 01be507
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
14 changes: 14 additions & 0 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type triggerPublisher struct {
capInfo commoncap.CapabilityInfo
capDonInfo commoncap.DON
workflowDONs map[uint32]commoncap.DON
membersCache map[uint32]map[p2ptypes.PeerID]bool
dispatcher types.Dispatcher
messageCache *messageCache[registrationKey, p2ptypes.PeerID]
registrations map[registrationKey]*pubRegState
Expand Down Expand Up @@ -54,12 +55,21 @@ func NewTriggerPublisher(config *capabilities.RemoteTriggerConfig, underlying co
config = &capabilities.RemoteTriggerConfig{}
}
config.ApplyDefaults()
membersCache := make(map[uint32]map[p2ptypes.PeerID]bool)
for id, don := range workflowDONs {
cache := make(map[p2ptypes.PeerID]bool)
for _, member := range don.Members {
cache[member] = true
}
membersCache[id] = cache
}
return &triggerPublisher{
config: config,
underlying: underlying,
capInfo: capInfo,
capDonInfo: capDonInfo,
workflowDONs: workflowDONs,
membersCache: membersCache,
dispatcher: dispatcher,
messageCache: NewMessageCache[registrationKey, p2ptypes.PeerID](),
registrations: make(map[registrationKey]*pubRegState),
Expand Down Expand Up @@ -88,6 +98,10 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
p.lggr.Errorw("received a message from unsupported workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId)
return
}
if !p.membersCache[msg.CallerDonId][sender] {
p.lggr.Errorw("sender not a member of its workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId, "sender", sender)
return
}
p.lggr.Debugw("received trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "sender", sender)
key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID}
nowMs := time.Now().UnixMilli()
Expand Down
9 changes: 7 additions & 2 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand Down Expand Up @@ -42,7 +41,7 @@ func TestTriggerPublisher_Register(t *testing.T) {
}

dispatcher := remoteMocks.NewDispatcher(t)
config := &capabilities.RemoteTriggerConfig{
config := &commoncap.RemoteTriggerConfig{
RegistrationRefresh: 100 * time.Millisecond,
RegistrationExpiry: 100 * time.Second,
MinResponsesToAggregate: 1,
Expand Down Expand Up @@ -73,6 +72,12 @@ func TestTriggerPublisher_Register(t *testing.T) {
Payload: marshaled,
}
publisher.Receive(ctx, regEvent)
// node p1 is not a member of the workflow DON so registration shoudn't happen
require.Empty(t, underlying.registrationsCh)

regEvent.Sender = p2[:]
publisher.Receive(ctx, regEvent)
require.NotEmpty(t, underlying.registrationsCh)
forwarded := <-underlying.registrationsCh
require.Equal(t, capRequest.Metadata.WorkflowID, forwarded.Metadata.WorkflowID)

Expand Down

0 comments on commit 01be507

Please sign in to comment.