Skip to content

Commit

Permalink
[KS-79] Remote trigger setup (#12845)
Browse files Browse the repository at this point in the history
1. Fully set up a (hardcoded) capability DON in Syncer and move fake Mercury producer over there.
2. Refactor remote Aggregator interface to reduce marshaling steps.
3. Avoid redundant calls to RegisterTrigger.
4. Minor change to Prometheus config in Peer Wrapper.
  • Loading branch information
bolekk authored Apr 19, 2024
1 parent e52951f commit 63abd08
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 144 deletions.
5 changes: 5 additions & 0 deletions .changeset/orange-squids-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal Remote Trigger setup
24 changes: 17 additions & 7 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
sync "sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand Down Expand Up @@ -87,17 +88,21 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID}
nowMs := time.Now().UnixMilli()
p.mu.Lock()
defer p.mu.Unlock()
p.messageCache.Insert(key, sender, nowMs, msg.Payload)
_, exists := p.registrations[key]
if exists {
p.lggr.Debugw("trigger registration already exists", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID)
return
}
// NOTE: require 2F+1 by default, introduce different strategies later (KS-76)
minRequired := uint32(2*callerDon.F + 1)
ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-int64(p.config.RegistrationExpiryMs), false)
p.mu.Unlock()
if !ready {
p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired)
return
}
agg := NewDefaultModeAggregator(uint32(callerDon.F + 1))
aggregated, err := agg.Aggregate("", payloads)
aggregated, err := AggregateModeRaw(payloads, uint32(callerDon.F+1))
if err != nil {
p.lggr.Errorw("failed to aggregate trigger registrations", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
return
Expand All @@ -107,7 +112,6 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err)
return
}
p.mu.Lock()
callbackCh := make(chan commoncap.CapabilityResponse)
ctx, cancel := p.stopCh.NewCtx()
err = p.underlying.RegisterTrigger(ctx, callbackCh, unmarshaled)
Expand All @@ -123,7 +127,6 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
} else {
p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
}
p.mu.Unlock()
} else {
p.lggr.Errorw("received trigger request with unknown method", "method", msg.Method, "sender", sender)
}
Expand Down Expand Up @@ -171,7 +174,13 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.Capability
p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId)
return
}
p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId)
triggerEvent := capabilities.TriggerEvent{}
err := response.Value.UnwrapTo(&triggerEvent)
if err != nil {
p.lggr.Errorw("can't unwrap trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "err", err)
break
}
p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "triggerEventID", triggerEvent.ID)
marshaled, err := pb.MarshalCapabilityResponse(response)
if err != nil {
p.lggr.Debugw("can't marshal trigger event", "err", err)
Expand All @@ -186,7 +195,8 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.Capability
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
// NOTE: optionally introduce batching across workflows as an optimization
WorkflowIds: []string{key.workflowId},
WorkflowIds: []string{key.workflowId},
TriggerEventId: triggerEvent.ID,
},
},
}
Expand Down
15 changes: 6 additions & 9 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, callback chan<-
callback: callback,
rawRequest: rawRequest,
}
s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
return nil
}

Expand All @@ -114,8 +115,8 @@ func (s *triggerSubscriber) registrationLoop() {
case <-s.stopCh:
return
case <-ticker.C:
s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members))
s.mu.RLock()
s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows))
for _, registration := range s.registeredWorkflows {
// NOTE: send to all by default, introduce different strategies later (KS-76)
for _, peerID := range s.capDonInfo.Members {
Expand Down Expand Up @@ -180,18 +181,14 @@ func (s *triggerSubscriber) Receive(msg *types.MessageBody) {
continue
}
if ready {
s.lggr.Debugw("trigger event ready to aggregate", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId)
aggregatedResponse, err := s.aggregator.Aggregate(meta.TriggerEventId, payloads)
if err != nil {
s.lggr.Errorw("failed to aggregate responses", "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err)
s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err)
continue
}
unmarshaled, err := pb.UnmarshalCapabilityResponse(aggregatedResponse)
if err != nil {
s.lggr.Errorw("failed to unmarshal responses", "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err)
continue
}
s.lggr.Info("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId)
registration.callback <- unmarshaled
s.lggr.Infow("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId)
registration.callback <- aggregatedResponse
}
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/remote/types/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

Expand All @@ -23,7 +24,7 @@ type Receiver interface {
}

type Aggregator interface {
Aggregate(eventID string, responses [][]byte) ([]byte, error)
Aggregate(eventID string, responses [][]byte) (commoncap.CapabilityResponse, error)
}

// NOTE: this type will become part of the Registry (KS-108)
Expand Down
25 changes: 20 additions & 5 deletions core/capabilities/remote/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"google.golang.org/protobuf/proto"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)
Expand Down Expand Up @@ -60,16 +62,29 @@ func NewDefaultModeAggregator(minIdenticalResponses uint32) *defaultModeAggregat
}
}

func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) ([]byte, error) {
func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.CapabilityResponse, error) {
found, err := AggregateModeRaw(responses, a.minIdenticalResponses)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err)
}

unmarshaled, err := pb.UnmarshalCapabilityResponse(found)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err)
}
return unmarshaled, nil
}

func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, error) {
hashToCount := make(map[string]uint32)
var found []byte
for _, resp := range responses {
for _, elem := range elemList {
hasher := sha256.New()
hasher.Write(resp)
hasher.Write(elem)
sha := hex.EncodeToString(hasher.Sum(nil))
hashToCount[sha]++
if hashToCount[sha] >= a.minIdenticalResponses {
found = resp
if hashToCount[sha] >= minIdenticalResponses {
found = elem
break
}
}
Expand Down
40 changes: 21 additions & 19 deletions core/capabilities/remote/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package remote_test

import (
"bytes"
"crypto/ed25519"
"crypto/rand"
"testing"
Expand Down Expand Up @@ -90,29 +89,32 @@ func TestToPeerID(t *testing.T) {
}

func TestDefaultModeAggregator_Aggregate(t *testing.T) {
capResponse1 := marshalCapabilityResponse(t, triggerEvent1, nil)
capResponse2 := marshalCapabilityResponse(t, triggerEvent2, nil)
val, err := values.Wrap(triggerEvent1)
require.NoError(t, err)
capResponse1 := commoncap.CapabilityResponse{
Value: val,
Err: nil,
}
marshaled1, err := pb.MarshalCapabilityResponse(capResponse1)
require.NoError(t, err)

val2, err := values.Wrap(triggerEvent2)
require.NoError(t, err)
capResponse2 := commoncap.CapabilityResponse{
Value: val2,
Err: nil,
}
marshaled2, err := pb.MarshalCapabilityResponse(capResponse2)
require.NoError(t, err)

agg := remote.NewDefaultModeAggregator(2)
_, err := agg.Aggregate("", [][]byte{capResponse1})
_, err = agg.Aggregate("", [][]byte{marshaled1})
require.Error(t, err)

_, err = agg.Aggregate("", [][]byte{capResponse1, capResponse2})
_, err = agg.Aggregate("", [][]byte{marshaled1, marshaled2})
require.Error(t, err)

res, err := agg.Aggregate("", [][]byte{capResponse1, capResponse2, capResponse1})
require.NoError(t, err)
require.True(t, bytes.Equal(res, capResponse1))
}

func marshalCapabilityResponse(t *testing.T, capValue any, capError error) []byte {
val, err := values.Wrap(capValue)
require.NoError(t, err)
capResponse := commoncap.CapabilityResponse{
Value: val,
Err: capError,
}
marshaled, err := pb.MarshalCapabilityResponse(capResponse)
res, err := agg.Aggregate("", [][]byte{marshaled1, marshaled2, marshaled1})
require.NoError(t, err)
return marshaled
require.Equal(t, res, capResponse1)
}
Loading

0 comments on commit 63abd08

Please sign in to comment.