From ca14ccd3c64bea128e12a0d37d399f400ff62584 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Fri, 22 Mar 2024 08:01:40 -0700 Subject: [PATCH] [KS-72] Dispatcher for external messages (#12502) 1. Message types for external communication 2. Dispatcher object that en/decodes messages and routes traffic between peers and capabilities Co-authored-by: Cedric --- .changeset/chilled-buses-reflect.md | 5 + core/capabilities/remote/dispatcher.go | 168 ++++++++ core/capabilities/remote/dispatcher_test.go | 123 ++++++ core/capabilities/remote/target.go | 84 ++++ core/capabilities/remote/target_test.go | 28 ++ core/capabilities/remote/types/message.pb.go | 362 ++++++++++++++++++ core/capabilities/remote/types/message.proto | 29 ++ .../remote/types/mocks/dispatcher.go | 69 ++++ .../remote/types/mocks/receiver.go | 32 ++ core/capabilities/remote/types/types.go | 24 ++ core/capabilities/remote/utils.go | 39 ++ core/capabilities/remote/utils_test.go | 80 ++++ core/capabilities/syncer.go | 38 +- core/capabilities/syncer_test.go | 6 +- core/services/chainlink/application.go | 7 +- core/services/p2p/peer.go | 4 + core/services/p2p/types/mocks/peer.go | 20 + core/services/p2p/types/mocks/signer.go | 54 +++ core/services/p2p/types/types.go | 16 +- core/services/p2p/wrapper/wrapper.go | 13 +- 20 files changed, 1191 insertions(+), 10 deletions(-) create mode 100644 .changeset/chilled-buses-reflect.md create mode 100644 core/capabilities/remote/dispatcher.go create mode 100644 core/capabilities/remote/dispatcher_test.go create mode 100644 core/capabilities/remote/target.go create mode 100644 core/capabilities/remote/target_test.go create mode 100644 core/capabilities/remote/types/message.pb.go create mode 100644 core/capabilities/remote/types/message.proto create mode 100644 core/capabilities/remote/types/mocks/dispatcher.go create mode 100644 core/capabilities/remote/types/mocks/receiver.go create mode 100644 core/capabilities/remote/types/types.go create mode 100644 core/capabilities/remote/utils.go create mode 100644 core/capabilities/remote/utils_test.go create mode 100644 core/services/p2p/types/mocks/signer.go diff --git a/.changeset/chilled-buses-reflect.md b/.changeset/chilled-buses-reflect.md new file mode 100644 index 00000000000..eccac3b7f5b --- /dev/null +++ b/.changeset/chilled-buses-reflect.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Dispatcher service for external peering diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go new file mode 100644 index 00000000000..e594e45445e --- /dev/null +++ b/core/capabilities/remote/dispatcher.go @@ -0,0 +1,168 @@ +package remote + +import ( + "context" + "fmt" + sync "sync" + "time" + + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +// dispatcher en/decodes messages and routes traffic between peers and capabilities +type dispatcher struct { + peerWrapper p2ptypes.PeerWrapper + peer p2ptypes.Peer + peerID p2ptypes.PeerID + signer p2ptypes.Signer + registry commontypes.CapabilitiesRegistry + receivers map[key]remotetypes.Receiver + mu sync.RWMutex + stopCh services.StopChan + wg sync.WaitGroup + lggr logger.Logger +} + +type key struct { + capId string + donId string +} + +var _ services.Service = &dispatcher{} + +const supportedVersion = 1 + +func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry commontypes.CapabilitiesRegistry, lggr logger.Logger) *dispatcher { + return &dispatcher{ + peerWrapper: peerWrapper, + signer: signer, + registry: registry, + receivers: make(map[key]remotetypes.Receiver), + stopCh: make(services.StopChan), + lggr: lggr.Named("Dispatcher"), + } +} + +func (d *dispatcher) Start(ctx context.Context) error { + d.peer = d.peerWrapper.GetPeer() + d.peerID = d.peer.ID() + if d.peer == nil { + return fmt.Errorf("peer is not initialized") + } + d.wg.Add(1) + go d.receive() + d.lggr.Info("dispatcher started") + return nil +} + +func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error { + d.mu.Lock() + defer d.mu.Unlock() + k := key{capabilityId, donId} + _, ok := d.receivers[k] + if ok { + return fmt.Errorf("receiver already exists for capability %s and don %s", capabilityId, donId) + } + d.receivers[k] = receiver + return nil +} + +func (d *dispatcher) RemoveReceiver(capabilityId string, donId string) { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.receivers, key{capabilityId, donId}) +} + +func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { + msgBody.Version = supportedVersion + msgBody.Sender = d.peerID[:] + msgBody.Receiver = peerID[:] + msgBody.Timestamp = time.Now().UnixMilli() + rawBody, err := proto.Marshal(msgBody) + if err != nil { + return err + } + signature, err := d.signer.Sign(rawBody) + if err != nil { + return err + } + msg := &remotetypes.Message{Signature: signature, Body: rawBody} + rawMsg, err := proto.Marshal(msg) + if err != nil { + return err + } + return d.peer.Send(peerID, rawMsg) +} + +func (d *dispatcher) receive() { + defer d.wg.Done() + recvCh := d.peer.Receive() + for { + select { + case <-d.stopCh: + d.lggr.Info("stopped - exiting receive") + return + case msg := <-recvCh: + body, err := ValidateMessage(msg, d.peerID) + if err != nil { + d.lggr.Debugw("received invalid message", "error", err) + d.tryRespondWithError(msg.Sender, body, types.Error_VALIDATION_FAILED) + continue + } + k := key{body.CapabilityId, body.DonId} + d.mu.RLock() + receiver, ok := d.receivers[k] + d.mu.RUnlock() + if !ok { + d.lggr.Debugw("received message for unregistered capability", "capabilityId", k.capId, "donId", k.donId) + d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND) + continue + } + receiver.Receive(body) + } + } +} + +func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetypes.MessageBody, errType types.Error) { + if body == nil { + return + } + if body.Error != types.Error_OK { + d.lggr.Debug("received an invalid message with error field set - not responding to avoid an infinite loop") + return + } + body.Error = errType + // clear payload to reduce message size + body.Payload = nil + err := d.Send(peerID, body) + if err != nil { + d.lggr.Debugw("failed to send error response", "error", err) + } +} + +func (d *dispatcher) Close() error { + close(d.stopCh) + d.wg.Wait() + d.lggr.Info("dispatcher closed") + return nil +} + +func (d *dispatcher) Ready() error { + return nil +} + +func (d *dispatcher) HealthReport() map[string]error { + return nil +} + +func (d *dispatcher) Name() string { + return "Dispatcher" +} diff --git a/core/capabilities/remote/dispatcher_test.go b/core/capabilities/remote/dispatcher_test.go new file mode 100644 index 00000000000..b6ba31aa8f2 --- /dev/null +++ b/core/capabilities/remote/dispatcher_test.go @@ -0,0 +1,123 @@ +package remote_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/mocks" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks" +) + +type testReceiver struct { + ch chan *remotetypes.MessageBody +} + +func newReceiver() *testReceiver { + return &testReceiver{ + ch: make(chan *remotetypes.MessageBody, 100), + } +} + +func (r *testReceiver) Receive(msg *remotetypes.MessageBody) { + r.ch <- msg +} + +func TestDispatcher_CleanStartClose(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + peer := mocks.NewPeer(t) + recvCh := make(<-chan p2ptypes.Message) + peer.On("Receive", mock.Anything).Return(recvCh) + peer.On("ID", mock.Anything).Return(p2ptypes.PeerID{}) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + signer := mocks.NewSigner(t) + registry := commonMocks.NewCapabilitiesRegistry(t) + + dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + require.NoError(t, dispatcher.Start(ctx)) + require.NoError(t, dispatcher.Close()) +} + +func TestDispatcher_Receive(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + privKey1, peerId1 := newKeyPair(t) + _, peerId2 := newKeyPair(t) + + peer := mocks.NewPeer(t) + recvCh := make(chan p2ptypes.Message) + peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh)) + peer.On("ID", mock.Anything).Return(peerId2) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + signer := mocks.NewSigner(t) + signer.On("Sign", mock.Anything).Return(nil, errors.New("not implemented")) + registry := commonMocks.NewCapabilitiesRegistry(t) + + dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + require.NoError(t, dispatcher.Start(ctx)) + + rcv := newReceiver() + err := dispatcher.SetReceiver(capId1, donId1, rcv) + require.NoError(t, err) + + // supported capability + recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) + // unknown capability + recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1)) + // sender doesn't match + invalid := encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1)) + invalid.Sender = peerId2 + recvCh <- invalid + // supported capability again + recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload2)) + + m := <-rcv.ch + require.Equal(t, payload1, string(m.Payload)) + m = <-rcv.ch + require.Equal(t, payload2, string(m.Payload)) + + dispatcher.RemoveReceiver(capId1, donId1) + require.NoError(t, dispatcher.Close()) +} + +func TestDispatcher_RespondWithError(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + privKey1, peerId1 := newKeyPair(t) + _, peerId2 := newKeyPair(t) + + peer := mocks.NewPeer(t) + recvCh := make(chan p2ptypes.Message) + peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh)) + peer.On("ID", mock.Anything).Return(peerId2) + sendCh := make(chan p2ptypes.PeerID) + peer.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + peerID := args.Get(0).(p2ptypes.PeerID) + sendCh <- peerID + }).Return(nil) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + signer := mocks.NewSigner(t) + signer.On("Sign", mock.Anything).Return([]byte{}, nil) + registry := commonMocks.NewCapabilitiesRegistry(t) + + dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr) + require.NoError(t, dispatcher.Start(ctx)) + + // unknown capability + recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) + responseDestPeerID := <-sendCh + require.Equal(t, peerId1, responseDestPeerID) + + require.NoError(t, dispatcher.Close()) +} diff --git a/core/capabilities/remote/target.go b/core/capabilities/remote/target.go new file mode 100644 index 00000000000..4f24aaf20ab --- /dev/null +++ b/core/capabilities/remote/target.go @@ -0,0 +1,84 @@ +package remote + +import ( + "context" + "errors" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages +type remoteTargetCaller struct { + capInfo commoncap.CapabilityInfo + donInfo *types.DON + dispatcher types.Dispatcher + lggr logger.Logger +} + +var _ commoncap.TargetCapability = &remoteTargetCaller{} +var _ types.Receiver = &remoteTargetCaller{} + +type remoteTargetReceiver struct { + capInfo commoncap.CapabilityInfo + donInfo *types.DON + dispatcher types.Dispatcher + lggr logger.Logger +} + +var _ types.Receiver = &remoteTargetReceiver{} + +func NewRemoteTargetCaller(capInfo commoncap.CapabilityInfo, donInfo *types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetCaller { + return &remoteTargetCaller{ + capInfo: capInfo, + donInfo: donInfo, + dispatcher: dispatcher, + lggr: lggr, + } +} + +func (c *remoteTargetCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + return c.capInfo, nil +} + +func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + return errors.New("not implemented") +} + +func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + return errors.New("not implemented") +} + +func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error { + c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members)) + for _, peerID := range c.donInfo.Members { + m := &types.MessageBody{ + CapabilityId: c.capInfo.ID, + DonId: c.donInfo.ID, + Payload: []byte{0x01, 0x02, 0x03}, + } + err := c.dispatcher.Send(peerID, m) + if err != nil { + return err + } + } + return nil +} + +func (c *remoteTargetCaller) Receive(msg *types.MessageBody) { + c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload) +} + +func NewRemoteTargetReceiver(capInfo commoncap.CapabilityInfo, donInfo *types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetReceiver { + return &remoteTargetReceiver{ + capInfo: capInfo, + donInfo: donInfo, + dispatcher: dispatcher, + lggr: lggr, + } +} + +func (c *remoteTargetReceiver) Receive(msg *types.MessageBody) { + c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload) +} diff --git a/core/capabilities/remote/target_test.go b/core/capabilities/remote/target_test.go new file mode 100644 index 00000000000..904cd5b9c71 --- /dev/null +++ b/core/capabilities/remote/target_test.go @@ -0,0 +1,28 @@ +package remote_test + +import ( + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func TestTarget_Placeholder(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + donInfo := &types.DON{ + Members: []p2ptypes.PeerID{{}}, + } + dispatcher := remoteMocks.NewDispatcher(t) + dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil) + target := remote.NewRemoteTargetCaller(commoncap.CapabilityInfo{}, donInfo, dispatcher, lggr) + require.NoError(t, target.Execute(ctx, nil, commoncap.CapabilityRequest{})) +} diff --git a/core/capabilities/remote/types/message.pb.go b/core/capabilities/remote/types/message.pb.go new file mode 100644 index 00000000000..c15b79b9c21 --- /dev/null +++ b/core/capabilities/remote/types/message.pb.go @@ -0,0 +1,362 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.8 +// source: core/capabilities/remote/types/message.proto + +package types + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Error int32 + +const ( + Error_OK Error = 0 + Error_VALIDATION_FAILED Error = 1 + Error_CAPABILITY_NOT_FOUND Error = 2 +) + +// Enum value maps for Error. +var ( + Error_name = map[int32]string{ + 0: "OK", + 1: "VALIDATION_FAILED", + 2: "CAPABILITY_NOT_FOUND", + } + Error_value = map[string]int32{ + "OK": 0, + "VALIDATION_FAILED": 1, + "CAPABILITY_NOT_FOUND": 2, + } +) + +func (x Error) Enum() *Error { + p := new(Error) + *p = x + return p +} + +func (x Error) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Error) Descriptor() protoreflect.EnumDescriptor { + return file_core_capabilities_remote_types_message_proto_enumTypes[0].Descriptor() +} + +func (Error) Type() protoreflect.EnumType { + return &file_core_capabilities_remote_types_message_proto_enumTypes[0] +} + +func (x Error) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Error.Descriptor instead. +func (Error) EnumDescriptor() ([]byte, []int) { + return file_core_capabilities_remote_types_message_proto_rawDescGZIP(), []int{0} +} + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Signature []byte `protobuf:"bytes,1,opt,name=signature,proto3" json:"signature,omitempty"` + Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` // proto-encoded MessageBody to sign +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_core_capabilities_remote_types_message_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_core_capabilities_remote_types_message_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_core_capabilities_remote_types_message_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetSignature() []byte { + if x != nil { + return x.Signature + } + return nil +} + +func (x *Message) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +type MessageBody struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` + Sender []byte `protobuf:"bytes,2,opt,name=sender,proto3" json:"sender,omitempty"` + Receiver []byte `protobuf:"bytes,3,opt,name=receiver,proto3" json:"receiver,omitempty"` + MessageId []byte `protobuf:"bytes,4,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` // scoped to (don_id, capability_id) + CapabilityId string `protobuf:"bytes,5,opt,name=capability_id,json=capabilityId,proto3" json:"capability_id,omitempty"` + DonId string `protobuf:"bytes,6,opt,name=don_id,json=donId,proto3" json:"don_id,omitempty"` // where the capability actually lives + Method string `protobuf:"bytes,7,opt,name=method,proto3" json:"method,omitempty"` + Timestamp int64 `protobuf:"varint,8,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Payload []byte `protobuf:"bytes,9,opt,name=payload,proto3" json:"payload,omitempty"` + Error Error `protobuf:"varint,10,opt,name=error,proto3,enum=remote.Error" json:"error,omitempty"` +} + +func (x *MessageBody) Reset() { + *x = MessageBody{} + if protoimpl.UnsafeEnabled { + mi := &file_core_capabilities_remote_types_message_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MessageBody) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MessageBody) ProtoMessage() {} + +func (x *MessageBody) ProtoReflect() protoreflect.Message { + mi := &file_core_capabilities_remote_types_message_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MessageBody.ProtoReflect.Descriptor instead. +func (*MessageBody) Descriptor() ([]byte, []int) { + return file_core_capabilities_remote_types_message_proto_rawDescGZIP(), []int{1} +} + +func (x *MessageBody) GetVersion() uint32 { + if x != nil { + return x.Version + } + return 0 +} + +func (x *MessageBody) GetSender() []byte { + if x != nil { + return x.Sender + } + return nil +} + +func (x *MessageBody) GetReceiver() []byte { + if x != nil { + return x.Receiver + } + return nil +} + +func (x *MessageBody) GetMessageId() []byte { + if x != nil { + return x.MessageId + } + return nil +} + +func (x *MessageBody) GetCapabilityId() string { + if x != nil { + return x.CapabilityId + } + return "" +} + +func (x *MessageBody) GetDonId() string { + if x != nil { + return x.DonId + } + return "" +} + +func (x *MessageBody) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + +func (x *MessageBody) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *MessageBody) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *MessageBody) GetError() Error { + if x != nil { + return x.Error + } + return Error_OK +} + +var File_core_capabilities_remote_types_message_proto protoreflect.FileDescriptor + +var file_core_capabilities_remote_types_message_proto_rawDesc = []byte{ + 0x0a, 0x2c, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, + 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, + 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, + 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x22, 0x3b, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, + 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, + 0x6f, 0x64, 0x79, 0x22, 0xab, 0x02, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, + 0x6f, 0x64, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, + 0x06, 0x73, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, + 0x65, 0x6e, 0x64, 0x65, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x72, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, + 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x5f, 0x69, + 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, + 0x69, 0x74, 0x79, 0x49, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x64, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, + 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, + 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x09, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x23, 0x0a, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x72, 0x65, + 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x2a, 0x40, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, + 0x10, 0x00, 0x12, 0x15, 0x0a, 0x11, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, + 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x41, 0x50, + 0x41, 0x42, 0x49, 0x4c, 0x49, 0x54, 0x59, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, + 0x44, 0x10, 0x02, 0x42, 0x20, 0x5a, 0x1e, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x70, 0x61, + 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2f, + 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_core_capabilities_remote_types_message_proto_rawDescOnce sync.Once + file_core_capabilities_remote_types_message_proto_rawDescData = file_core_capabilities_remote_types_message_proto_rawDesc +) + +func file_core_capabilities_remote_types_message_proto_rawDescGZIP() []byte { + file_core_capabilities_remote_types_message_proto_rawDescOnce.Do(func() { + file_core_capabilities_remote_types_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_core_capabilities_remote_types_message_proto_rawDescData) + }) + return file_core_capabilities_remote_types_message_proto_rawDescData +} + +var file_core_capabilities_remote_types_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_core_capabilities_remote_types_message_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_core_capabilities_remote_types_message_proto_goTypes = []interface{}{ + (Error)(0), // 0: remote.Error + (*Message)(nil), // 1: remote.Message + (*MessageBody)(nil), // 2: remote.MessageBody +} +var file_core_capabilities_remote_types_message_proto_depIdxs = []int32{ + 0, // 0: remote.MessageBody.error:type_name -> remote.Error + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_core_capabilities_remote_types_message_proto_init() } +func file_core_capabilities_remote_types_message_proto_init() { + if File_core_capabilities_remote_types_message_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_core_capabilities_remote_types_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_core_capabilities_remote_types_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MessageBody); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_core_capabilities_remote_types_message_proto_rawDesc, + NumEnums: 1, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_core_capabilities_remote_types_message_proto_goTypes, + DependencyIndexes: file_core_capabilities_remote_types_message_proto_depIdxs, + EnumInfos: file_core_capabilities_remote_types_message_proto_enumTypes, + MessageInfos: file_core_capabilities_remote_types_message_proto_msgTypes, + }.Build() + File_core_capabilities_remote_types_message_proto = out.File + file_core_capabilities_remote_types_message_proto_rawDesc = nil + file_core_capabilities_remote_types_message_proto_goTypes = nil + file_core_capabilities_remote_types_message_proto_depIdxs = nil +} diff --git a/core/capabilities/remote/types/message.proto b/core/capabilities/remote/types/message.proto new file mode 100644 index 00000000000..98617528121 --- /dev/null +++ b/core/capabilities/remote/types/message.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +option go_package = "core/capabilities/remote/types"; + +package remote; + +enum Error { + OK = 0; + VALIDATION_FAILED = 1; + CAPABILITY_NOT_FOUND = 2; +} + +message Message { + bytes signature = 1; + bytes body = 2; // proto-encoded MessageBody to sign +} + +message MessageBody { + uint32 version = 1; + bytes sender = 2; + bytes receiver = 3; + bytes message_id = 4; // scoped to (don_id, capability_id) + string capability_id = 5; + string don_id = 6; // where the capability actually lives + string method = 7; + int64 timestamp = 8; + bytes payload = 9; + Error error = 10; +} diff --git a/core/capabilities/remote/types/mocks/dispatcher.go b/core/capabilities/remote/types/mocks/dispatcher.go new file mode 100644 index 00000000000..8675e6153ac --- /dev/null +++ b/core/capabilities/remote/types/mocks/dispatcher.go @@ -0,0 +1,69 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + types "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" + mock "github.com/stretchr/testify/mock" +) + +// Dispatcher is an autogenerated mock type for the Dispatcher type +type Dispatcher struct { + mock.Mock +} + +// RemoveReceiver provides a mock function with given fields: capabilityId, donId +func (_m *Dispatcher) RemoveReceiver(capabilityId string, donId string) { + _m.Called(capabilityId, donId) +} + +// Send provides a mock function with given fields: peerID, msgBody +func (_m *Dispatcher) Send(peerID ragep2ptypes.PeerID, msgBody *types.MessageBody) error { + ret := _m.Called(peerID, msgBody) + + if len(ret) == 0 { + panic("no return value specified for Send") + } + + var r0 error + if rf, ok := ret.Get(0).(func(ragep2ptypes.PeerID, *types.MessageBody) error); ok { + r0 = rf(peerID, msgBody) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetReceiver provides a mock function with given fields: capabilityId, donId, receiver +func (_m *Dispatcher) SetReceiver(capabilityId string, donId string, receiver types.Receiver) error { + ret := _m.Called(capabilityId, donId, receiver) + + if len(ret) == 0 { + panic("no return value specified for SetReceiver") + } + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, types.Receiver) error); ok { + r0 = rf(capabilityId, donId, receiver) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewDispatcher creates a new instance of Dispatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDispatcher(t interface { + mock.TestingT + Cleanup(func()) +}) *Dispatcher { + mock := &Dispatcher{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/capabilities/remote/types/mocks/receiver.go b/core/capabilities/remote/types/mocks/receiver.go new file mode 100644 index 00000000000..a15c464450e --- /dev/null +++ b/core/capabilities/remote/types/mocks/receiver.go @@ -0,0 +1,32 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + types "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + mock "github.com/stretchr/testify/mock" +) + +// Receiver is an autogenerated mock type for the Receiver type +type Receiver struct { + mock.Mock +} + +// Receive provides a mock function with given fields: msg +func (_m *Receiver) Receive(msg *types.MessageBody) { + _m.Called(msg) +} + +// NewReceiver creates a new instance of Receiver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReceiver(t interface { + mock.TestingT + Cleanup(func()) +}) *Receiver { + mock := &Receiver{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go new file mode 100644 index 00000000000..c7ffe123348 --- /dev/null +++ b/core/capabilities/remote/types/types.go @@ -0,0 +1,24 @@ +package types + +import ( + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +//go:generate mockery --quiet --name Dispatcher --output ./mocks/ --case=underscore +type Dispatcher interface { + SetReceiver(capabilityId string, donId string, receiver Receiver) error + RemoveReceiver(capabilityId string, donId string) + Send(peerID p2ptypes.PeerID, msgBody *MessageBody) error +} + +//go:generate mockery --quiet --name Receiver --output ./mocks/ --case=underscore +type Receiver interface { + Receive(msg *MessageBody) +} + +// NOTE: this type will become part of the Registry (KS-108) +type DON struct { + ID string + Members []p2ptypes.PeerID + F uint8 +} diff --git a/core/capabilities/remote/utils.go b/core/capabilities/remote/utils.go new file mode 100644 index 00000000000..d8e7187e60a --- /dev/null +++ b/core/capabilities/remote/utils.go @@ -0,0 +1,39 @@ +package remote + +import ( + "bytes" + "crypto/ed25519" + "fmt" + + "google.golang.org/protobuf/proto" + + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func ValidateMessage(msg p2ptypes.Message, expectedReceiver p2ptypes.PeerID) (*remotetypes.MessageBody, error) { + var topLevelMessage remotetypes.Message + err := proto.Unmarshal(msg.Payload, &topLevelMessage) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal message, err: %v", err) + } + var body remotetypes.MessageBody + err = proto.Unmarshal(topLevelMessage.Body, &body) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal message body, err: %v", err) + } + if len(body.Sender) != p2ptypes.PeerIDLength || len(body.Receiver) != p2ptypes.PeerIDLength { + return &body, fmt.Errorf("invalid sender length (%d) or receiver length (%d)", len(body.Sender), len(body.Receiver)) + } + if !ed25519.Verify(body.Sender, topLevelMessage.Body, topLevelMessage.Signature) { + return &body, fmt.Errorf("failed to verify message signature") + } + // NOTE we currently don't support relaying messages so the p2p message sender needs to be the message author + if !bytes.Equal(body.Sender, msg.Sender[:]) { + return &body, fmt.Errorf("sender in message body does not match sender of p2p message") + } + if !bytes.Equal(body.Receiver, expectedReceiver[:]) { + return &body, fmt.Errorf("receiver in message body does not match expected receiver") + } + return &body, nil +} diff --git a/core/capabilities/remote/utils_test.go b/core/capabilities/remote/utils_test.go new file mode 100644 index 00000000000..b19fc6a8b0f --- /dev/null +++ b/core/capabilities/remote/utils_test.go @@ -0,0 +1,80 @@ +package remote_test + +import ( + "crypto/ed25519" + "crypto/rand" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +const ( + capId1 = "cap1" + capId2 = "cap2" + donId1 = "donA" + payload1 = "hello world" + payload2 = "goodbye world" +) + +func TestValidateMessage(t *testing.T) { + privKey1, peerId1 := newKeyPair(t) + _, peerId2 := newKeyPair(t) + + // valid + p2pMsg := encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) + body, err := remote.ValidateMessage(p2pMsg, peerId2) + require.NoError(t, err) + require.Equal(t, peerId1[:], body.Sender) + require.Equal(t, payload1, string(body.Payload)) + + // invalid sender + p2pMsg = encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) + p2pMsg.Sender = peerId2 + _, err = remote.ValidateMessage(p2pMsg, peerId2) + require.Error(t, err) + + // invalid receiver + p2pMsg = encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) + _, err = remote.ValidateMessage(p2pMsg, peerId1) + require.Error(t, err) +} + +func newKeyPair(t *testing.T) (ed25519.PrivateKey, ragetypes.PeerID) { + _, privKey, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + peerID, err := ragetypes.PeerIDFromPrivateKey(privKey) + require.NoError(t, err) + return privKey, peerID +} + +func encodeAndSign(t *testing.T, senderPrivKey ed25519.PrivateKey, senderId p2ptypes.PeerID, receiverId p2ptypes.PeerID, capabilityId string, donId string, payload []byte) p2ptypes.Message { + body := remotetypes.MessageBody{ + Sender: senderId[:], + Receiver: receiverId[:], + CapabilityId: capabilityId, + DonId: donId, + Payload: payload, + } + rawBody, err := proto.Marshal(&body) + require.NoError(t, err) + signature := ed25519.Sign(senderPrivKey, rawBody) + + msg := remotetypes.Message{ + Signature: signature, + Body: rawBody, + } + rawMsg, err := proto.Marshal(&msg) + require.NoError(t, err) + + return p2ptypes.Message{ + Sender: senderId, + Payload: rawMsg, + } +} diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go index a8cfb2c56f8..67a9306d1d8 100644 --- a/core/capabilities/syncer.go +++ b/core/capabilities/syncer.go @@ -3,12 +3,15 @@ package capabilities import ( "context" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/libocr/ragep2p" ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -16,22 +19,24 @@ import ( type registrySyncer struct { peerWrapper p2ptypes.PeerWrapper registry types.CapabilitiesRegistry + dispatcher remotetypes.Dispatcher lggr logger.Logger } var _ services.Service = ®istrySyncer{} // RegistrySyncer updates local Registry to match its onchain counterpart -func NewRegistrySyncer(peerWrapper p2ptypes.PeerWrapper, registry types.CapabilitiesRegistry, lggr logger.Logger) *registrySyncer { +func NewRegistrySyncer(peerWrapper p2ptypes.PeerWrapper, registry types.CapabilitiesRegistry, dispatcher remotetypes.Dispatcher, lggr logger.Logger) *registrySyncer { return ®istrySyncer{ peerWrapper: peerWrapper, registry: registry, + dispatcher: dispatcher, lggr: lggr, } } func (s *registrySyncer) Start(ctx context.Context) error { - // NOTE: temporary hard-coded values + // NOTE: temporary hard-coded DONs defaultStreamConfig := p2ptypes.StreamConfig{ IncomingMessageBufferSize: 1000000, OutgoingMessageBufferSize: 1000000, @@ -52,6 +57,9 @@ func (s *registrySyncer) Start(ctx context.Context) error { "12D3KooWGqfSPhHKmQycfhRjgUDE2vg9YWZN27Eue8idb2ZUk6EH", } peers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) + donInfo := &remotetypes.DON{ + ID: "don1", + } for _, peerID := range peerIDs { var p ragetypes.PeerID err := p.UnmarshalText([]byte(peerID)) @@ -59,8 +67,32 @@ func (s *registrySyncer) Start(ctx context.Context) error { return err } peers[p] = defaultStreamConfig + donInfo.Members = append(donInfo.Members, p) + } + err := s.peerWrapper.GetPeer().UpdateConnections(peers) + if err != nil { + return err + } + // NOTE: temporary hard-coded capabilities + capId := "sample_remote_target" + targetCap := remote.NewRemoteTargetCaller(commoncap.CapabilityInfo{ + ID: capId, + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + }, donInfo, s.dispatcher, s.lggr) + err = s.registry.Add(ctx, targetCap) + if err != nil { + s.lggr.Error("failed to add remote target capability to registry") + return err } - return s.peerWrapper.GetPeer().UpdateConnections(peers) + err = s.dispatcher.SetReceiver(capId, donInfo.ID, targetCap) + if err != nil { + s.lggr.Errorw("failed to set receiver", "capabilityId", capId, "donId", donInfo.ID, "error", err) + return err + } + s.lggr.Info("registry syncer started") + return nil } func (s *registrySyncer) Close() error { diff --git a/core/capabilities/syncer_test.go b/core/capabilities/syncer_test.go index acfe0f00233..dbc621d6a8e 100644 --- a/core/capabilities/syncer_test.go +++ b/core/capabilities/syncer_test.go @@ -8,6 +8,7 @@ import ( commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/mocks" coreCapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities" + remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks" @@ -21,8 +22,11 @@ func TestSyncer_CleanStartClose(t *testing.T) { wrapper := mocks.NewPeerWrapper(t) wrapper.On("GetPeer").Return(peer) registry := commonMocks.NewCapabilitiesRegistry(t) + registry.On("Add", mock.Anything, mock.Anything).Return(nil) + dispatcher := remoteMocks.NewDispatcher(t) + dispatcher.On("SetReceiver", mock.Anything, mock.Anything, mock.Anything).Return(nil) - syncer := coreCapabilities.NewRegistrySyncer(wrapper, registry, lggr) + syncer := coreCapabilities.NewRegistrySyncer(wrapper, registry, dispatcher, lggr) require.NoError(t, syncer.Start(ctx)) require.NoError(t, syncer.Close()) } diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index de06c891407..2a8cfb3f00f 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -28,6 +28,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/build" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" evmutils "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" @@ -194,11 +195,13 @@ func NewApplication(opts ApplicationOpts) (Application, error) { if cfg.Capabilities().Peering().Enabled() { externalPeerWrapper := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger) + signer := externalPeerWrapper srvcs = append(srvcs, externalPeerWrapper) // NOTE: RegistrySyncer will depend on a Relayer when fully implemented - registrySyncer := capabilities.NewRegistrySyncer(externalPeerWrapper, registry, globalLogger) - srvcs = append(srvcs, registrySyncer) + dispatcher := remote.NewDispatcher(externalPeerWrapper, signer, registry, globalLogger) + registrySyncer := capabilities.NewRegistrySyncer(externalPeerWrapper, registry, dispatcher, globalLogger) + srvcs = append(srvcs, dispatcher, registrySyncer) } // LOOPs can be created as options, in the case of LOOP relayers, or diff --git a/core/services/p2p/peer.go b/core/services/p2p/peer.go index 2ed84f6a3f1..e4a6e52f930 100644 --- a/core/services/p2p/peer.go +++ b/core/services/p2p/peer.go @@ -102,6 +102,10 @@ func NewPeer(cfg PeerConfig, lggr logger.Logger) (*peer, error) { }, nil } +func (p *peer) ID() ragetypes.PeerID { + return p.myID +} + func (p *peer) UpdateConnections(peers map[ragetypes.PeerID]p2ptypes.StreamConfig) error { p.lggr.Infow("updating peer addresses", "peers", peers) if !p.isBootstrap { diff --git a/core/services/p2p/types/mocks/peer.go b/core/services/p2p/types/mocks/peer.go index ac4e4eee73d..23824b99a44 100644 --- a/core/services/p2p/types/mocks/peer.go +++ b/core/services/p2p/types/mocks/peer.go @@ -54,6 +54,26 @@ func (_m *Peer) HealthReport() map[string]error { return r0 } +// ID provides a mock function with given fields: +func (_m *Peer) ID() ragep2ptypes.PeerID { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ID") + } + + var r0 ragep2ptypes.PeerID + if rf, ok := ret.Get(0).(func() ragep2ptypes.PeerID); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ragep2ptypes.PeerID) + } + } + + return r0 +} + // Name provides a mock function with given fields: func (_m *Peer) Name() string { ret := _m.Called() diff --git a/core/services/p2p/types/mocks/signer.go b/core/services/p2p/types/mocks/signer.go new file mode 100644 index 00000000000..274116be57c --- /dev/null +++ b/core/services/p2p/types/mocks/signer.go @@ -0,0 +1,54 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Signer is an autogenerated mock type for the Signer type +type Signer struct { + mock.Mock +} + +// Sign provides a mock function with given fields: data +func (_m *Signer) Sign(data []byte) ([]byte, error) { + ret := _m.Called(data) + + if len(ret) == 0 { + panic("no return value specified for Sign") + } + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok { + return rf(data) + } + if rf, ok := ret.Get(0).(func([]byte) []byte); ok { + r0 = rf(data) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(data) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewSigner creates a new instance of Signer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewSigner(t interface { + mock.TestingT + Cleanup(func()) +}) *Signer { + mock := &Signer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/p2p/types/types.go b/core/services/p2p/types/types.go index 0f395d75409..837e075860a 100644 --- a/core/services/p2p/types/types.go +++ b/core/services/p2p/types/types.go @@ -7,11 +7,16 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" ) +const PeerIDLength = 32 + +type PeerID = ragetypes.PeerID + //go:generate mockery --quiet --name Peer --output ./mocks/ --case=underscore type Peer interface { services.Service - UpdateConnections(peers map[ragetypes.PeerID]StreamConfig) error - Send(peerID ragetypes.PeerID, msg []byte) error + ID() PeerID + UpdateConnections(peers map[PeerID]StreamConfig) error + Send(peerID PeerID, msg []byte) error Receive() <-chan Message } @@ -21,8 +26,13 @@ type PeerWrapper interface { GetPeer() Peer } +//go:generate mockery --quiet --name Signer --output ./mocks/ --case=underscore +type Signer interface { + Sign(data []byte) ([]byte, error) +} + type Message struct { - Sender ragetypes.PeerID + Sender PeerID Payload []byte } diff --git a/core/services/p2p/wrapper/wrapper.go b/core/services/p2p/wrapper/wrapper.go index 138d1ef21fc..fd47c6c2dd2 100644 --- a/core/services/p2p/wrapper/wrapper.go +++ b/core/services/p2p/wrapper/wrapper.go @@ -2,6 +2,7 @@ package wrapper import ( "context" + "crypto/ed25519" "fmt" "github.com/prometheus/client_golang/prometheus" @@ -20,10 +21,12 @@ type peerWrapper struct { peer types.Peer keystoreP2P keystore.P2P p2pConfig config.P2P + privateKey ed25519.PrivateKey lggr logger.Logger } var _ types.PeerWrapper = &peerWrapper{} +var _ types.Signer = &peerWrapper{} func NewExternalPeerWrapper(keystoreP2P keystore.P2P, p2pConfig config.P2P, lggr logger.Logger) *peerWrapper { return &peerWrapper{ @@ -76,7 +79,7 @@ func convertBootstrapperLocators(bootstrappers []commontypes.BootstrapperLocator for i, a := range b.Addrs { addrs[i] = ragetypes.Address(a) } - var rageID ragetypes.PeerID + var rageID types.PeerID err := rageID.UnmarshalText([]byte(b.PeerID)) if err != nil { return nil, fmt.Errorf("failed to unmarshal v2 peer ID (%q) from BootstrapperLocator: %w", b.PeerID, err) @@ -94,6 +97,7 @@ func (e *peerWrapper) Start(ctx context.Context) error { if err != nil { return err } + e.privateKey = cfg.PrivateKey e.lggr.Info("Starting external P2P peer") peer, err := p2p.NewPeer(cfg, e.lggr) if err != nil { @@ -118,3 +122,10 @@ func (e *peerWrapper) HealthReport() map[string]error { func (e *peerWrapper) Name() string { return "PeerWrapper" } + +func (e *peerWrapper) Sign(msg []byte) ([]byte, error) { + if e.privateKey == nil { + return nil, fmt.Errorf("private key not set") + } + return ed25519.Sign(e.privateKey, msg), nil +}