Skip to content

Commit

Permalink
Update core to be compatible with changes made in chainlink-common as…
Browse files Browse the repository at this point in the history
… part of KS-120 (moving capabilities to use streams) (#12857)

* Update core to be compatible with changes made in chainlink-common as part of KS-120

* upgrade common version
  • Loading branch information
ettec authored Apr 19, 2024
1 parent bfd9a8b commit d90229e
Show file tree
Hide file tree
Showing 20 changed files with 88 additions and 60 deletions.
5 changes: 5 additions & 0 deletions .changeset/fresh-lizards-love.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Updates required to work with chainlink-common changes to support grpc streams for capabilities
6 changes: 3 additions & 3 deletions core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type mockCapability struct {
capabilities.CapabilityInfo
}

func (m *mockCapability) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
return nil
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
return nil, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
{
name: "trigger",
newCapability: func(ctx context.Context, reg *coreCapabilities.Registry) (string, error) {
odt := triggers.NewOnDemand()
odt := triggers.NewOnDemand(logger.TestLogger(t))
info, err := odt.Info(ctx)
require.NoError(t, err)
return info.ID, reg.Add(ctx, odt)
Expand Down
8 changes: 5 additions & 3 deletions core/capabilities/remote/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request
return errors.New("not implemented")
}

func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (c *remoteTargetCaller) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members))
for _, peerID := range c.donInfo.Members {
m := &types.MessageBody{
Expand All @@ -60,10 +60,12 @@ func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- common
}
err := c.dispatcher.Send(peerID, m)
if err != nil {
return err
return nil, err
}
}
return nil

// TODO: return a channel that will be closed when all responses are received
return nil, nil
}

func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {
Expand Down
6 changes: 4 additions & 2 deletions core/capabilities/remote/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package remote_test
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand All @@ -24,5 +24,7 @@ func TestTarget_Placeholder(t *testing.T) {
dispatcher := remoteMocks.NewDispatcher(t)
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil)
target := remote.NewRemoteTargetCaller(commoncap.CapabilityInfo{}, donInfo, dispatcher, lggr)
require.NoError(t, target.Execute(ctx, nil, commoncap.CapabilityRequest{}))

_, err := target.Execute(ctx, commoncap.CapabilityRequest{})
assert.NoError(t, err)
}
8 changes: 3 additions & 5 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type registrationKey struct {
}

type pubRegState struct {
callback chan<- commoncap.CapabilityResponse
callback <-chan commoncap.CapabilityResponse
request commoncap.CapabilityRequest
}

Expand Down Expand Up @@ -112,9 +112,8 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err)
return
}
callbackCh := make(chan commoncap.CapabilityResponse)
ctx, cancel := p.stopCh.NewCtx()
err = p.underlying.RegisterTrigger(ctx, callbackCh, unmarshaled)
callbackCh, err := p.underlying.RegisterTrigger(ctx, unmarshaled)
cancel()
if err == nil {
p.registrations[key] = &pubRegState{
Expand Down Expand Up @@ -153,7 +152,6 @@ func (p *triggerPublisher) registrationCleanupLoop() {
cancel()
p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId, "err", err)
// after calling UnregisterTrigger, the underlying trigger will not send any more events to the channel
close(req.callback)
delete(p.registrations, key)
p.messageCache.Delete(key)
}
Expand All @@ -163,7 +161,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
}
}

func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.CapabilityResponse, key registrationKey) {
func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.CapabilityResponse, key registrationKey) {
defer p.wg.Done()
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (t *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error)
return t.info, nil
}

func (t *testTrigger) RegisterTrigger(_ context.Context, _ chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (t *testTrigger) RegisterTrigger(_ context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
t.registrationsCh <- request
return nil
return nil, nil
}

func (t *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.CapabilityRequest) error {
Expand Down
19 changes: 14 additions & 5 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ var _ commoncap.TriggerCapability = &triggerSubscriber{}
var _ types.Receiver = &triggerSubscriber{}
var _ services.Service = &triggerSubscriber{}

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
// TODO makes this configurable with a default
const defaultSendChannelBufferSize = 1000

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON,
dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
if aggregator == nil {
lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID)
aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1))
Expand Down Expand Up @@ -88,22 +92,25 @@ func (s *triggerSubscriber) Info(ctx context.Context) (commoncap.CapabilityInfo,
return s.capInfo, nil
}

func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
rawRequest, err := pb.MarshalCapabilityRequest(request)
if err != nil {
return err
return nil, err
}
if request.Metadata.WorkflowID == "" {
return errors.New("empty workflowID")
return nil, errors.New("empty workflowID")
}
s.mu.Lock()
defer s.mu.Unlock()

callback := make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize)
s.registeredWorkflows[request.Metadata.WorkflowID] = &subRegState{
callback: callback,
rawRequest: rawRequest,
}

s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
return nil
return callback, nil
}

func (s *triggerSubscriber) registrationLoop() {
Expand Down Expand Up @@ -141,6 +148,8 @@ func (s *triggerSubscriber) registrationLoop() {
func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) error {
s.mu.Lock()
defer s.mu.Unlock()

close(s.registeredWorkflows[request.Metadata.WorkflowID].callback)
delete(s.registeredWorkflows, request.Metadata.WorkflowID)
// Registrations will quickly expire on all remote nodes.
// Alternatively, we could send UnregisterTrigger messages right away.
Expand Down
7 changes: 4 additions & 3 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
}
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))
triggerEventCallbackCh := make(chan commoncap.CapabilityResponse, 2)
require.NoError(t, subscriber.RegisterTrigger(ctx, triggerEventCallbackCh, commoncap.CapabilityRequest{

triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
},
}))
})
require.NoError(t, err)
<-awaitRegistrationMessageCh

// receive trigger event
Expand Down
26 changes: 15 additions & 11 deletions core/capabilities/targets/write_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (

var forwardABI = evmtypes.MustGetABI(forwarder.KeystoneForwarderMetaData.ABI)

func InitializeWrite(registry commontypes.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer, lggr logger.Logger) error {
func InitializeWrite(registry commontypes.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer,
lggr logger.Logger) error {
for _, chain := range legacyEVMChains.Slice() {
capability := NewEvmWrite(chain, lggr)
if err := registry.Add(context.TODO(), capability); err != nil {
Expand Down Expand Up @@ -157,7 +158,7 @@ func encodePayload(args []any, rawSelector string) ([]byte, error) {
// return append(method.ID, arguments...), nil
}

func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, request capabilities.CapabilityRequest) error {
func (cap *EvmWrite) Execute(ctx context.Context, request capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
cap.lggr.Debugw("Execute", "request", request)
// TODO: idempotency

Expand All @@ -168,22 +169,23 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C

reqConfig, err := parseConfig(request.Config)
if err != nil {
return err
return nil, err
}

inputsAny, err := request.Inputs.Unwrap()
if err != nil {
return err
return nil, err
}
inputs := inputsAny.(map[string]any)
rep, ok := inputs["report"]
if !ok {
return errors.New("malformed data: inputs doesn't contain a report key")
return nil, errors.New("malformed data: inputs doesn't contain a report key")
}

if rep == nil {
// We received any empty report -- this means we should skip transmission.
cap.lggr.Debugw("Skipping empty report", "request", request)
callback := make(chan capabilities.CapabilityResponse)
go func() {
// TODO: cast tx.Error to Err (or Value to Value?)
callback <- capabilities.CapabilityResponse{
Expand All @@ -192,18 +194,18 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
close(callback)
}()
return nil
return callback, nil
}

// evaluate any variables in reqConfig.Params
args, err := evaluateParams(reqConfig.Params, inputs)
if err != nil {
return err
return nil, err
}

data, err := encodePayload(args, reqConfig.ABI)
if err != nil {
return err
return nil, err
}

// TODO: validate encoded report is prefixed with workflowID and executionID that match the request meta
Expand All @@ -214,7 +216,7 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
// construct forwarding payload
calldata, err := forwardABI.Pack("report", common.HexToAddress(reqConfig.Address), data, signatures)
if err != nil {
return err
return nil, err
}

txMeta := &txmgr.TxMeta{
Expand All @@ -238,9 +240,11 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
tx, err := txm.CreateTransaction(ctx, req)
if err != nil {
return err
return nil, err
}
cap.lggr.Debugw("Transaction submitted", "request", request, "transaction", tx)

callback := make(chan capabilities.CapabilityResponse)
go func() {
// TODO: cast tx.Error to Err (or Value to Value?)
callback <- capabilities.CapabilityResponse{
Expand All @@ -249,7 +253,7 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
close(callback)
}()
return nil
return callback, nil
}

func (cap *EvmWrite) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand Down
8 changes: 2 additions & 6 deletions core/capabilities/targets/write_target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func TestEvmWrite(t *testing.T) {

})

ch := make(chan capabilities.CapabilityResponse)

err = capability.Execute(ctx, ch, req)
ch, err := capability.Execute(ctx, req)
require.NoError(t, err)

response := <-ch
Expand Down Expand Up @@ -134,9 +132,7 @@ func TestEvmWrite_EmptyReport(t *testing.T) {
Inputs: inputs,
}

ch := make(chan capabilities.CapabilityResponse)

err = capability.Execute(ctx, ch, req)
ch, err := capability.Execute(ctx, req)
require.NoError(t, err)

response := <-ch
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.3
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419105123-fc5d616c7d2e
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1185,8 +1185,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs=
github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 h1:K7Y+gd0lEumrhgaIQFjC8reXadJOMVgDI4xsCAsiuSo=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0/go.mod h1:GTDBbovHUSAUk+fuGIySF2A/whhdtHGaWmU61BoERks=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419105123-fc5d616c7d2e h1:nHs5mFOR7FPII20GrCGIPywDW43MhEUlD7DqHnTgu6Q=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419105123-fc5d616c7d2e/go.mod h1:GTDBbovHUSAUk+fuGIySF2A/whhdtHGaWmU61BoERks=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
9 changes: 8 additions & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,17 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro
Config: tc,
Inputs: triggerInputs,
}
err = t.trigger.RegisterTrigger(ctx, e.triggerEvents, triggerRegRequest)
eventsCh, err := t.trigger.RegisterTrigger(ctx, triggerRegRequest)
if err != nil {
return fmt.Errorf("failed to instantiate trigger %s, %s", t.Type, err)
}

go func() {
for event := range eventsCh {
e.triggerEvents <- event
}
}()

return nil
}

Expand Down
18 changes: 11 additions & 7 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,18 @@ func newMockCapability(info capabilities.CapabilityInfo, transform func(capabili
}
}

func (m *mockCapability) Execute(ctx context.Context, ch chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
cr, err := m.transform(req)
if err != nil {
return err
return nil, err
}

ch := make(chan capabilities.CapabilityResponse, 10)

m.response <- cr
ch <- cr
close(ch)
m.response <- cr
return nil
return ch, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand All @@ -102,13 +104,14 @@ func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request cap
type mockTriggerCapability struct {
capabilities.CapabilityInfo
triggerEvent capabilities.CapabilityResponse
ch chan capabilities.CapabilityResponse
}

var _ capabilities.TriggerCapability = (*mockTriggerCapability)(nil)

func (m *mockTriggerCapability) RegisterTrigger(ctx context.Context, ch chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
ch <- m.triggerEvent
return nil
func (m *mockTriggerCapability) RegisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
m.ch <- m.triggerEvent
return m.ch, nil
}

func (m *mockTriggerCapability) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error {
Expand Down Expand Up @@ -217,6 +220,7 @@ func mockTrigger(t *testing.T) (capabilities.TriggerCapability, capabilities.Cap
"issues a trigger when a mercury report is received.",
"v1.0.0",
),
ch: make(chan capabilities.CapabilityResponse, 10),
}
resp, err := values.NewMap(map[string]any{
"123": decimal.NewFromFloat(1.00),
Expand Down
Loading

0 comments on commit d90229e

Please sign in to comment.