Skip to content

Commit

Permalink
[KS-72] Dispatcher for external messages (#12502)
Browse files Browse the repository at this point in the history
1. Message types for external communication
2. Dispatcher object that en/decodes messages and routes traffic between peers and capabilities

Co-authored-by: Cedric <[email protected]>
  • Loading branch information
bolekk and cedric-cordenier authored Mar 22, 2024
1 parent d806030 commit ca14ccd
Show file tree
Hide file tree
Showing 20 changed files with 1,191 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/chilled-buses-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Dispatcher service for external peering
168 changes: 168 additions & 0 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package remote

import (
"context"
"fmt"
sync "sync"
"time"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/services"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

// dispatcher en/decodes messages and routes traffic between peers and capabilities
type dispatcher struct {
peerWrapper p2ptypes.PeerWrapper
peer p2ptypes.Peer
peerID p2ptypes.PeerID
signer p2ptypes.Signer
registry commontypes.CapabilitiesRegistry
receivers map[key]remotetypes.Receiver
mu sync.RWMutex
stopCh services.StopChan
wg sync.WaitGroup
lggr logger.Logger
}

type key struct {
capId string
donId string
}

var _ services.Service = &dispatcher{}

const supportedVersion = 1

func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry commontypes.CapabilitiesRegistry, lggr logger.Logger) *dispatcher {
return &dispatcher{
peerWrapper: peerWrapper,
signer: signer,
registry: registry,
receivers: make(map[key]remotetypes.Receiver),
stopCh: make(services.StopChan),
lggr: lggr.Named("Dispatcher"),
}
}

func (d *dispatcher) Start(ctx context.Context) error {
d.peer = d.peerWrapper.GetPeer()
d.peerID = d.peer.ID()
if d.peer == nil {
return fmt.Errorf("peer is not initialized")
}
d.wg.Add(1)
go d.receive()
d.lggr.Info("dispatcher started")
return nil
}

func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error {
d.mu.Lock()
defer d.mu.Unlock()
k := key{capabilityId, donId}
_, ok := d.receivers[k]
if ok {
return fmt.Errorf("receiver already exists for capability %s and don %s", capabilityId, donId)
}
d.receivers[k] = receiver
return nil
}

func (d *dispatcher) RemoveReceiver(capabilityId string, donId string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.receivers, key{capabilityId, donId})
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
msgBody.Version = supportedVersion
msgBody.Sender = d.peerID[:]
msgBody.Receiver = peerID[:]
msgBody.Timestamp = time.Now().UnixMilli()
rawBody, err := proto.Marshal(msgBody)
if err != nil {
return err
}
signature, err := d.signer.Sign(rawBody)
if err != nil {
return err
}
msg := &remotetypes.Message{Signature: signature, Body: rawBody}
rawMsg, err := proto.Marshal(msg)
if err != nil {
return err
}
return d.peer.Send(peerID, rawMsg)
}

func (d *dispatcher) receive() {
defer d.wg.Done()
recvCh := d.peer.Receive()
for {
select {
case <-d.stopCh:
d.lggr.Info("stopped - exiting receive")
return
case msg := <-recvCh:
body, err := ValidateMessage(msg, d.peerID)
if err != nil {
d.lggr.Debugw("received invalid message", "error", err)
d.tryRespondWithError(msg.Sender, body, types.Error_VALIDATION_FAILED)
continue
}
k := key{body.CapabilityId, body.DonId}
d.mu.RLock()
receiver, ok := d.receivers[k]
d.mu.RUnlock()
if !ok {
d.lggr.Debugw("received message for unregistered capability", "capabilityId", k.capId, "donId", k.donId)
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
continue
}
receiver.Receive(body)
}
}
}

func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetypes.MessageBody, errType types.Error) {
if body == nil {
return
}
if body.Error != types.Error_OK {
d.lggr.Debug("received an invalid message with error field set - not responding to avoid an infinite loop")
return
}
body.Error = errType
// clear payload to reduce message size
body.Payload = nil
err := d.Send(peerID, body)
if err != nil {
d.lggr.Debugw("failed to send error response", "error", err)
}
}

func (d *dispatcher) Close() error {
close(d.stopCh)
d.wg.Wait()
d.lggr.Info("dispatcher closed")
return nil
}

func (d *dispatcher) Ready() error {
return nil
}

func (d *dispatcher) HealthReport() map[string]error {
return nil
}

func (d *dispatcher) Name() string {
return "Dispatcher"
}
123 changes: 123 additions & 0 deletions core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package remote_test

import (
"errors"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/mocks"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks"
)

type testReceiver struct {
ch chan *remotetypes.MessageBody
}

func newReceiver() *testReceiver {
return &testReceiver{
ch: make(chan *remotetypes.MessageBody, 100),
}
}

func (r *testReceiver) Receive(msg *remotetypes.MessageBody) {
r.ch <- msg
}

func TestDispatcher_CleanStartClose(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
peer := mocks.NewPeer(t)
recvCh := make(<-chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return(recvCh)
peer.On("ID", mock.Anything).Return(p2ptypes.PeerID{})
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
signer := mocks.NewSigner(t)
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
require.NoError(t, dispatcher.Start(ctx))
require.NoError(t, dispatcher.Close())
}

func TestDispatcher_Receive(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
privKey1, peerId1 := newKeyPair(t)
_, peerId2 := newKeyPair(t)

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
signer := mocks.NewSigner(t)
signer.On("Sign", mock.Anything).Return(nil, errors.New("not implemented"))
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
require.NoError(t, dispatcher.Start(ctx))

rcv := newReceiver()
err := dispatcher.SetReceiver(capId1, donId1, rcv)
require.NoError(t, err)

// supported capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1))
// unknown capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1))
// sender doesn't match
invalid := encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1))
invalid.Sender = peerId2
recvCh <- invalid
// supported capability again
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload2))

m := <-rcv.ch
require.Equal(t, payload1, string(m.Payload))
m = <-rcv.ch
require.Equal(t, payload2, string(m.Payload))

dispatcher.RemoveReceiver(capId1, donId1)
require.NoError(t, dispatcher.Close())
}

func TestDispatcher_RespondWithError(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
privKey1, peerId1 := newKeyPair(t)
_, peerId2 := newKeyPair(t)

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
sendCh := make(chan p2ptypes.PeerID)
peer.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
peerID := args.Get(0).(p2ptypes.PeerID)
sendCh <- peerID
}).Return(nil)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
signer := mocks.NewSigner(t)
signer.On("Sign", mock.Anything).Return([]byte{}, nil)
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
require.NoError(t, dispatcher.Start(ctx))

// unknown capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1))
responseDestPeerID := <-sendCh
require.Equal(t, peerId1, responseDestPeerID)

require.NoError(t, dispatcher.Close())
}
84 changes: 84 additions & 0 deletions core/capabilities/remote/target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package remote

import (
"context"
"errors"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages
type remoteTargetCaller struct {
capInfo commoncap.CapabilityInfo
donInfo *types.DON
dispatcher types.Dispatcher
lggr logger.Logger
}

var _ commoncap.TargetCapability = &remoteTargetCaller{}
var _ types.Receiver = &remoteTargetCaller{}

type remoteTargetReceiver struct {
capInfo commoncap.CapabilityInfo
donInfo *types.DON
dispatcher types.Dispatcher
lggr logger.Logger
}

var _ types.Receiver = &remoteTargetReceiver{}

func NewRemoteTargetCaller(capInfo commoncap.CapabilityInfo, donInfo *types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetCaller {
return &remoteTargetCaller{
capInfo: capInfo,
donInfo: donInfo,
dispatcher: dispatcher,
lggr: lggr,
}
}

func (c *remoteTargetCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.capInfo, nil
}

func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members))
for _, peerID := range c.donInfo.Members {
m := &types.MessageBody{
CapabilityId: c.capInfo.ID,
DonId: c.donInfo.ID,
Payload: []byte{0x01, 0x02, 0x03},
}
err := c.dispatcher.Send(peerID, m)
if err != nil {
return err
}
}
return nil
}

func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {
c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload)
}

func NewRemoteTargetReceiver(capInfo commoncap.CapabilityInfo, donInfo *types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetReceiver {
return &remoteTargetReceiver{
capInfo: capInfo,
donInfo: donInfo,
dispatcher: dispatcher,
lggr: lggr,
}
}

func (c *remoteTargetReceiver) Receive(msg *types.MessageBody) {
c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload)
}
Loading

0 comments on commit ca14ccd

Please sign in to comment.