Skip to content

Commit

Permalink
[KS-370] Pass config from onchain registry to execute calls (#13750)
Browse files Browse the repository at this point in the history
* [KS-370] Add ConfigForCapability to registry

- This allows us to set some config inside the registry without needing
  to specify it on the capability registry

* Update common

* Update PR

* Linting

* Rename ExecuteConfig -> DefaultConfig

* Add IsLocal to CapabilityInfo

* Minor review feedback; rename GetLocalNode -> LocalNode

* More comments

* Remote CapabilityID type

* Stop coercing capability.ID since it's already a string

* Update common

* Linting / Go generate

* Update common

* Update common
  • Loading branch information
cedric-cordenier authored Jul 24, 2024
1 parent c5224df commit d777fd8
Show file tree
Hide file tree
Showing 30 changed files with 1,084 additions and 941 deletions.
207 changes: 59 additions & 148 deletions core/capabilities/launcher.go

Large diffs are not rendered by default.

408 changes: 152 additions & 256 deletions core/capabilities/launcher_test.go

Large diffs are not rendered by default.

36 changes: 31 additions & 5 deletions core/capabilities/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,42 @@ var (
ErrCapabilityAlreadyExists = errors.New("capability already exists")
)

type metadataRegistry interface {
LocalNode(ctx context.Context) (capabilities.Node, error)
ConfigForCapability(ctx context.Context, capabilityID string, donID uint32) (capabilities.CapabilityConfiguration, error)
}

// Registry is a struct for the registry of capabilities.
// Registry is safe for concurrent use.
type Registry struct {
lggr logger.Logger
m map[string]capabilities.BaseCapability
mu sync.RWMutex
metadataRegistry metadataRegistry
lggr logger.Logger
m map[string]capabilities.BaseCapability
mu sync.RWMutex
}

func (r *Registry) LocalNode(ctx context.Context) (capabilities.Node, error) {
if r.metadataRegistry == nil {
return capabilities.Node{}, errors.New("metadataRegistry information not available")
}

return r.metadataRegistry.LocalNode(ctx)
}

func (r *Registry) GetLocalNode(_ context.Context) (capabilities.Node, error) {
return capabilities.Node{}, nil
func (r *Registry) ConfigForCapability(ctx context.Context, capabilityID string, donID uint32) (capabilities.CapabilityConfiguration, error) {
if r.metadataRegistry == nil {
return capabilities.CapabilityConfiguration{}, errors.New("metadataRegistry information not available")
}

return r.metadataRegistry.ConfigForCapability(ctx, capabilityID, donID)
}

// SetLocalRegistry sets a local copy of the offchain registry for the registry to use.
// This is only public for testing purposes; the only production use should be from the CapabilitiesLauncher.
func (r *Registry) SetLocalRegistry(lr metadataRegistry) {
r.mu.Lock()
defer r.mu.Unlock()
r.metadataRegistry = lr
}

// Get gets a capability from the registry.
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks"

commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
)

type testReceiver struct {
Expand Down
10 changes: 5 additions & 5 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes.
type triggerPublisher struct {
config *types.RemoteTriggerConfig
config capabilities.RemoteTriggerConfig
underlying commoncap.TriggerCapability
capInfo commoncap.CapabilityInfo
capDonInfo commoncap.DON
Expand All @@ -48,7 +48,7 @@ type pubRegState struct {
var _ types.Receiver = &triggerPublisher{}
var _ services.Service = &triggerPublisher{}

func NewTriggerPublisher(config *types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
func NewTriggerPublisher(config capabilities.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
config.ApplyDefaults()
return &triggerPublisher{
config: config,
Expand Down Expand Up @@ -97,7 +97,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
}
// NOTE: require 2F+1 by default, introduce different strategies later (KS-76)
minRequired := uint32(2*callerDon.F + 1)
ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-int64(p.config.RegistrationExpiryMs), false)
ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-p.config.RegistrationExpiry.Milliseconds(), false)
if !ready {
p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired)
return
Expand Down Expand Up @@ -133,7 +133,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {

func (p *triggerPublisher) registrationCleanupLoop() {
defer p.wg.Done()
ticker := time.NewTicker(time.Duration(p.config.RegistrationExpiryMs) * time.Millisecond)
ticker := time.NewTicker(p.config.RegistrationExpiry)
defer ticker.Stop()
for {
select {
Expand All @@ -144,7 +144,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
p.mu.RLock()
for key, req := range p.registrations {
callerDon := p.workflowDONs[key.callerDonId]
ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-int64(p.config.RegistrationExpiryMs), false)
ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-p.config.RegistrationExpiry.Milliseconds(), false)
if !ready {
p.lggr.Infow("trigger registration expired", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId)
ctx, cancel := p.stopCh.NewCtx()
Expand Down
10 changes: 6 additions & 4 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package remote_test
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand Down Expand Up @@ -40,11 +42,11 @@ func TestTriggerPublisher_Register(t *testing.T) {
}

dispatcher := remoteMocks.NewDispatcher(t)
config := &remotetypes.RemoteTriggerConfig{
RegistrationRefreshMs: 100,
RegistrationExpiryMs: 100_000,
config := capabilities.RemoteTriggerConfig{
RegistrationRefresh: 100 * time.Millisecond,
RegistrationExpiry: 100 * time.Second,
MinResponsesToAggregate: 1,
MessageExpiryMs: 100_000,
MessageExpiry: 100 * time.Second,
}
workflowDONs := map[uint32]commoncap.DON{
workflowDonInfo.ID: workflowDonInfo,
Expand Down
14 changes: 7 additions & 7 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
//
// TriggerSubscriber communicates with corresponding TriggerReceivers on remote nodes.
type triggerSubscriber struct {
config *types.RemoteTriggerConfig
config capabilities.RemoteTriggerConfig
capInfo commoncap.CapabilityInfo
capDonInfo capabilities.DON
capDonMembers map[p2ptypes.PeerID]struct{}
Expand Down Expand Up @@ -55,7 +55,7 @@ var _ services.Service = &triggerSubscriber{}
// TODO makes this configurable with a default
const defaultSendChannelBufferSize = 1000

func NewTriggerSubscriber(config *types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
func NewTriggerSubscriber(config capabilities.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
if aggregator == nil {
lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID)
aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1))
Expand Down Expand Up @@ -121,7 +121,7 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc

func (s *triggerSubscriber) registrationLoop() {
defer s.wg.Done()
ticker := time.NewTicker(time.Duration(s.config.RegistrationRefreshMs) * time.Millisecond)
ticker := time.NewTicker(s.config.RegistrationRefresh)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -195,9 +195,9 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
nowMs := time.Now().UnixMilli()
s.mu.RLock()
creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload)
ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-int64(s.config.MessageExpiryMs), true)
ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true)
s.mu.RUnlock()
if nowMs-creationTs > int64(s.config.RegistrationExpiryMs) {
if nowMs-creationTs > s.config.RegistrationExpiry.Milliseconds() {
s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "sender", sender)
continue
}
Expand All @@ -219,15 +219,15 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {

func (s *triggerSubscriber) eventCleanupLoop() {
defer s.wg.Done()
ticker := time.NewTicker(time.Duration(s.config.MessageExpiryMs) * time.Millisecond)
ticker := time.NewTicker(s.config.MessageExpiry)
defer ticker.Stop()
for {
select {
case <-s.stopCh:
return
case <-ticker.C:
s.mu.Lock()
s.messageCache.DeleteOlderThan(time.Now().UnixMilli() - int64(s.config.MessageExpiryMs))
s.messageCache.DeleteOlderThan(time.Now().UnixMilli() - s.config.MessageExpiry.Milliseconds())
s.mu.Unlock()
}
}
Expand Down
10 changes: 6 additions & 4 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package remote_test

import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/values"
Expand Down Expand Up @@ -61,11 +63,11 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
})

// register trigger
config := &remotetypes.RemoteTriggerConfig{
RegistrationRefreshMs: 100,
RegistrationExpiryMs: 100,
config := capabilities.RemoteTriggerConfig{
RegistrationRefresh: 100 * time.Millisecond,
RegistrationExpiry: 100 * time.Second,
MinResponsesToAggregate: 1,
MessageExpiryMs: 100_000,
MessageExpiry: 100 * time.Second,
}
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))
Expand Down
21 changes: 0 additions & 21 deletions core/capabilities/remote/types/config.go

This file was deleted.

Loading

0 comments on commit d777fd8

Please sign in to comment.