Skip to content

Commit

Permalink
ks-265 wrap local target in capability registry with transmission log…
Browse files Browse the repository at this point in the history
…ic (#13396)

* wrap local target in capability registry with transmission logic

* review comments

* review comments

* test fix
  • Loading branch information
ettec authored Jun 3, 2024
1 parent bc087f1 commit 5ff32bd
Show file tree
Hide file tree
Showing 16 changed files with 351 additions and 268 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-kings-compete.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal keystone: handle local target transmission logic in capability wrapper
30 changes: 24 additions & 6 deletions core/capabilities/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@ import (
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

// Registry is a struct for the registry of capabilities.
// Registry is safe for concurrent use.
type Registry struct {
m map[string]capabilities.BaseCapability
mu sync.RWMutex
lggr logger.Logger
lggr logger.Logger
peerID p2ptypes.PeerID
don capabilities.DON

m map[string]capabilities.BaseCapability
mu sync.RWMutex
}

// Get gets a capability from the registry.
Expand Down Expand Up @@ -134,6 +139,17 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error
if !ok {
return fmt.Errorf("target capability does not satisfy TargetCapability interface")
}

capInfo, err := c.Info(ctx)
if err != nil {
return fmt.Errorf("failed to get info of target capability: %w", err)
}

// If the DON is nil this is a local capability and requires wrapping in a local target transmission capability
if capInfo.DON == nil {
c = transmission.NewLocalTargetCapability(r.lggr, r.peerID, r.don, c.(capabilities.TargetCapability))
}

default:
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
}
Expand All @@ -150,9 +166,11 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error
}

// NewRegistry returns a new Registry.
func NewRegistry(lggr logger.Logger) *Registry {
func NewRegistry(lggr logger.Logger, peerID p2ptypes.PeerID, don capabilities.DON) *Registry {
return &Registry{
m: map[string]capabilities.BaseCapability{},
lggr: lggr.Named("CapabilityRegistry"),
m: map[string]capabilities.BaseCapability{},
lggr: lggr.Named("CapabilityRegistry"),
peerID: peerID,
don: don,
}
}
53 changes: 50 additions & 3 deletions core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
"github.com/smartcontractkit/chainlink-common/pkg/values"
coreCapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

type mockCapability struct {
Expand All @@ -34,7 +37,7 @@ func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request cap
func TestRegistry(t *testing.T) {
ctx := testutils.Context(t)

r := coreCapabilities.NewRegistry(logger.TestLogger(t))
r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{})

id := "capability-1"
ci, err := capabilities.NewCapabilityInfo(
Expand Down Expand Up @@ -62,7 +65,7 @@ func TestRegistry(t *testing.T) {

func TestRegistry_NoDuplicateIDs(t *testing.T) {
ctx := testutils.Context(t)
r := coreCapabilities.NewRegistry(logger.TestLogger(t))
r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{})

id := "capability-1"
ci, err := capabilities.NewCapabilityInfo(
Expand Down Expand Up @@ -173,7 +176,7 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
}

ctx := testutils.Context(t)
reg := coreCapabilities.NewRegistry(logger.TestLogger(t))
reg := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{})
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
id, err := tc.newCapability(ctx, reg)
Expand All @@ -184,3 +187,47 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
})
}
}

func TestRegistry_ReturnsLocalTargetCapabilityForLocalTargets(t *testing.T) {
ctx := testutils.Context(t)
r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{})

id := "capability-1"
ci, err := capabilities.NewRemoteCapabilityInfo(
id,
capabilities.CapabilityTypeTarget,
"capability-1-description",
"v1.0.0",
nil,
)
require.NoError(t, err)

c := &mockCapability{CapabilityInfo: ci}
err = r.Add(ctx, c)
require.NoError(t, err)

targetCapability, err := r.GetTarget(ctx, id)
require.NoError(t, err)

duffTransmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_AllAtOnce,
"deltaStage": "10banana",
})
require.NoError(t, err)

_, err = targetCapability.Execute(ctx, capabilities.CapabilityRequest{
Config: duffTransmissionSchedule,
})
assert.NotNil(t, err)

validTransmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_OneAtATime,
"deltaStage": "10ms",
})
require.NoError(t, err)

_, err = targetCapability.Execute(ctx, capabilities.CapabilityRequest{
Config: validTransmissionSchedule,
})
assert.NoError(t, err)
}
Loading

0 comments on commit 5ff32bd

Please sign in to comment.