diff --git a/common/client/poller_test.go b/common/client/poller_test.go index 82a05b5dfc7..4221b232108 100644 --- a/common/client/poller_test.go +++ b/common/client/poller_test.go @@ -52,7 +52,7 @@ func Test_Poller(t *testing.T) { require.NoError(t, poller.Start()) defer poller.Unsubscribe() - // Receive updates from the poller + // OnMessage updates from the poller pollCount := 0 pollMax := 50 for ; pollCount < pollMax; pollCount++ { diff --git a/core/capabilities/remote/dispatcher_test.go b/core/capabilities/remote/dispatcher_test.go index b6ba31aa8f2..b3a2a0b3412 100644 --- a/core/capabilities/remote/dispatcher_test.go +++ b/core/capabilities/remote/dispatcher_test.go @@ -35,7 +35,7 @@ func TestDispatcher_CleanStartClose(t *testing.T) { ctx := testutils.Context(t) peer := mocks.NewPeer(t) recvCh := make(<-chan p2ptypes.Message) - peer.On("Receive", mock.Anything).Return(recvCh) + peer.On("OnMessage", mock.Anything).Return(recvCh) peer.On("ID", mock.Anything).Return(p2ptypes.PeerID{}) wrapper := mocks.NewPeerWrapper(t) wrapper.On("GetPeer").Return(peer) @@ -55,7 +55,7 @@ func TestDispatcher_Receive(t *testing.T) { peer := mocks.NewPeer(t) recvCh := make(chan p2ptypes.Message) - peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh)) + peer.On("OnMessage", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh)) peer.On("ID", mock.Anything).Return(peerId2) wrapper := mocks.NewPeerWrapper(t) wrapper.On("GetPeer").Return(peer) @@ -98,7 +98,7 @@ func TestDispatcher_RespondWithError(t *testing.T) { peer := mocks.NewPeer(t) recvCh := make(chan p2ptypes.Message) - peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh)) + peer.On("OnMessage", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh)) peer.On("ID", mock.Anything).Return(peerId2) sendCh := make(chan p2ptypes.PeerID) peer.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { diff --git a/core/capabilities/remote/target.go b/core/capabilities/remote/target.go deleted file mode 100644 index 655f4f84abb..00000000000 --- a/core/capabilities/remote/target.go +++ /dev/null @@ -1,87 +0,0 @@ -package remote - -import ( - "context" - "errors" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" - "github.com/smartcontractkit/chainlink/v2/core/logger" -) - -// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages -type remoteTargetCaller struct { - capInfo commoncap.CapabilityInfo - donInfo *capabilities.DON - dispatcher types.Dispatcher - lggr logger.Logger -} - -var _ commoncap.TargetCapability = &remoteTargetCaller{} -var _ types.Receiver = &remoteTargetCaller{} - -type remoteTargetReceiver struct { - capInfo commoncap.CapabilityInfo - donInfo *capabilities.DON - dispatcher types.Dispatcher - lggr logger.Logger -} - -var _ types.Receiver = &remoteTargetReceiver{} - -func NewRemoteTargetCaller(capInfo commoncap.CapabilityInfo, donInfo *capabilities.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetCaller { - return &remoteTargetCaller{ - capInfo: capInfo, - donInfo: donInfo, - dispatcher: dispatcher, - lggr: lggr, - } -} - -func (c *remoteTargetCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { - return c.capInfo, nil -} - -func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { - return errors.New("not implemented") -} - -func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { - return errors.New("not implemented") -} - -func (c *remoteTargetCaller) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { - c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members)) - for _, peerID := range c.donInfo.Members { - m := &types.MessageBody{ - CapabilityId: c.capInfo.ID, - CapabilityDonId: c.donInfo.ID, - Payload: []byte{0x01, 0x02, 0x03}, - } - err := c.dispatcher.Send(peerID, m) - if err != nil { - return nil, err - } - } - - // TODO: return a channel that will be closed when all responses are received - return nil, nil -} - -func (c *remoteTargetCaller) Receive(msg *types.MessageBody) { - c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload) -} - -func NewRemoteTargetReceiver(capInfo commoncap.CapabilityInfo, donInfo *capabilities.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetReceiver { - return &remoteTargetReceiver{ - capInfo: capInfo, - donInfo: donInfo, - dispatcher: dispatcher, - lggr: lggr, - } -} - -func (c *remoteTargetReceiver) Receive(msg *types.MessageBody) { - c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload) -} diff --git a/core/capabilities/remote/target/caller.go b/core/capabilities/remote/target/caller.go new file mode 100644 index 00000000000..816505fbfd2 --- /dev/null +++ b/core/capabilities/remote/target/caller.go @@ -0,0 +1,140 @@ +package target + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +type callerRequest interface { + OnMessage(ctx context.Context, msg *types.MessageBody) error + ResponseChan() <-chan commoncap.CapabilityResponse + Expired() bool + Cancel(reason string) +} + +// caller/Receiver are shims translating between capability API calls and network messages +type caller struct { + lggr logger.Logger + remoteCapabilityInfo commoncap.CapabilityInfo + localDONInfo capabilities.DON + dispatcher types.Dispatcher + requestTimeout time.Duration + + messageIDToExecuteRequest map[string]callerRequest + mutex sync.Mutex +} + +var _ commoncap.TargetCapability = &caller{} +var _ types.Receiver = &caller{} + +func NewCaller(ctx context.Context, lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, + requestTimeout time.Duration) *caller { + + c := &caller{ + lggr: lggr, + remoteCapabilityInfo: remoteCapabilityInfo, + localDONInfo: localDonInfo, + dispatcher: dispatcher, + requestTimeout: requestTimeout, + messageIDToExecuteRequest: make(map[string]callerRequest), + } + + go func() { + ticker := time.NewTicker(requestTimeout) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.ExpireRequests() + } + } + }() + + return c +} + +func (c *caller) ExpireRequests() { + c.mutex.Lock() + defer c.mutex.Unlock() + + for messageID, req := range c.messageIDToExecuteRequest { + if req.Expired() { + req.Cancel("request expired") + delete(c.messageIDToExecuteRequest, messageID) + } + } +} + +func (c *caller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + return c.remoteCapabilityInfo, nil +} + +func (c *caller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + return errors.New("not implemented") +} + +func (c *caller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + return errors.New("not implemented") +} + +func (c *caller) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + c.mutex.Lock() + defer c.mutex.Unlock() + + messageID, err := GetMessageIDForRequest(req) + if err != nil { + return nil, fmt.Errorf("failed to get message ID for request: %w", err) + } + + if _, ok := c.messageIDToExecuteRequest[messageID]; ok { + return nil, fmt.Errorf("request for message ID %s already exists", messageID) + } + + execRequest, err := request.NewCallerRequest(ctx, c.lggr, req, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, + c.requestTimeout) + + c.messageIDToExecuteRequest[messageID] = execRequest + + return execRequest.ResponseChan(), nil +} + +func (c *caller) Receive(msg *types.MessageBody) { + c.mutex.Lock() + defer c.mutex.Unlock() + // TODO should the dispatcher be passing in a context? + ctx := context.Background() + + messageID := GetMessageID(msg) + + req := c.messageIDToExecuteRequest[messageID] + if req == nil { + c.lggr.Warnw("received response for unknown message ID ", "messageID", messageID) + return + } + + go func() { + if err := req.OnMessage(ctx, msg); err != nil { + c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", err) + } + }() + +} + +func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) { + if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" { + return "", errors.New("workflow ID and workflow execution ID must be set in request metadata") + } + + return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil +} diff --git a/core/capabilities/remote/target/caller_test.go b/core/capabilities/remote/target/caller_test.go new file mode 100644 index 00000000000..f5a9efd6c02 --- /dev/null +++ b/core/capabilities/remote/target/caller_test.go @@ -0,0 +1,317 @@ +package target_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_Caller_DonTopologies(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + responseValue, err := response.Value.Unwrap() + require.NoError(t, err) + assert.Equal(t, "aValue1", responseValue.(string)) + } + + capability := &TestCapability{} + + responseTimeOut := 10 * time.Minute + + testCaller(t, ctx, 1, responseTimeOut, 1, 0, + capability, transmissionSchedule, responseTest) + + testCaller(t, ctx, 10, responseTimeOut, 1, 0, + capability, transmissionSchedule, responseTest) + + testCaller(t, ctx, 1, responseTimeOut, 10, 3, + capability, transmissionSchedule, responseTest) + + testCaller(t, ctx, 10, responseTimeOut, 10, 3, + capability, transmissionSchedule, responseTest) + + testCaller(t, ctx, 10, responseTimeOut, 10, 9, + capability, transmissionSchedule, responseTest) + +} + +func Test_Caller_TransmissionSchedules(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + responseValue, err := response.Value.Unwrap() + require.NoError(t, err) + assert.Equal(t, "aValue1", responseValue.(string)) + } + + capability := &TestCapability{} + + responseTimeOut := 10 * time.Minute + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testCaller(t, ctx, 1, responseTimeOut, 1, 0, + capability, transmissionSchedule, responseTest) + testCaller(t, ctx, 10, responseTimeOut, 10, 3, + capability, transmissionSchedule, responseTest) + + transmissionSchedule, err = values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testCaller(t, ctx, 1, responseTimeOut, 1, 0, + capability, transmissionSchedule, responseTest) + testCaller(t, ctx, 10, responseTimeOut, 10, 3, + capability, transmissionSchedule, responseTest) + +} + +func Test_Caller_TimesOutIfInsufficientCapabilityPeerResponses(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.NotNil(t, response.Err) + } + + capability := &TestCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + // number of capability peers is less than F + 1 + + testCaller(t, ctx, 10, 1*time.Second, 10, 11, + capability, transmissionSchedule, responseTest) + +} + +func testCaller(t *testing.T, ctx context.Context, numWorkflowPeers int, workflowNodeResponseTimeout time.Duration, + numCapabilityPeers int, capabilityDonF uint8, underlying commoncap.TargetCapability, transmissionSchedule *values.Map, + responseTest func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error)) { + lggr := logger.TestLogger(t) + + capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeers[i] = NewP2PPeerID(t) + } + + capDonInfo := commoncap.DON{ + ID: "capability-don", + Members: capabilityPeers, + F: capabilityDonF, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + DON: &capDonInfo, + } + + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeers[i] = NewP2PPeerID(t) + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + } + + broker := newTestMessageBroker() + + receivers := make([]remotetypes.Receiver, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeers[i]) + receiver := newTestReceiver(capabilityPeers[i], capabilityDispatcher, workflowDonInfo, underlying) + broker.RegisterReceiverNode(capabilityPeers[i], receiver) + receivers[i] = receiver + } + + callers := make([]commoncap.TargetCapability, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) + caller := target.NewCaller(ctx, lggr, capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout) + broker.RegisterReceiverNode(workflowPeers[i], caller) + callers[i] = caller + } + + executeInputs, err := values.NewMap( + map[string]any{ + "executeValue1": "aValue1", + }, + ) + + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(len(callers)) + + // Fire off all the requests + for _, caller := range callers { + go func(caller commoncap.TargetCapability) { + responseCh, err := caller.Execute(ctx, + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Config: transmissionSchedule, + Inputs: executeInputs, + }) + + responseTest(t, responseCh, err) + wg.Done() + }(caller) + } + + wg.Wait() +} + +// Simple receiver that only responds once it has received a message from each workflow peer +type callerTestReceiver struct { + peerID p2ptypes.PeerID + dispatcher remotetypes.Dispatcher + workflowDonInfo commoncap.DON + messageIDToSenders map[string]map[p2ptypes.PeerID]bool + + targetCapability commoncap.TargetCapability + + mux sync.Mutex +} + +func newTestReceiver(peerID p2ptypes.PeerID, dispatcher remotetypes.Dispatcher, workflowDonInfo commoncap.DON, + targetCapability commoncap.TargetCapability) *callerTestReceiver { + + return &callerTestReceiver{ + dispatcher: dispatcher, + workflowDonInfo: workflowDonInfo, + peerID: peerID, + messageIDToSenders: make(map[string]map[p2ptypes.PeerID]bool), + targetCapability: targetCapability, + } +} + +func (t *callerTestReceiver) Receive(msg *remotetypes.MessageBody) { + t.mux.Lock() + defer t.mux.Unlock() + + sender := toPeerID(msg.Sender) + messageID := target.GetMessageID(msg) + + if t.messageIDToSenders[messageID] == nil { + t.messageIDToSenders[messageID] = make(map[p2ptypes.PeerID]bool) + } + + sendersOfMessageID := t.messageIDToSenders[messageID] + if sendersOfMessageID[sender] { + panic("received duplicate message") + } + + sendersOfMessageID[sender] = true + + if len(t.messageIDToSenders[messageID]) == len(t.workflowDonInfo.Members) { + + capabilityRequest, err := pb.UnmarshalCapabilityRequest(msg.Payload) + if err != nil { + panic(err) + } + + respCh, responseErr := t.targetCapability.Execute(context.Background(), capabilityRequest) + resp := <-respCh + + for receiver := range t.messageIDToSenders[messageID] { + var responseMsg = &remotetypes.MessageBody{ + CapabilityId: "cap_id", + CapabilityDonId: "capability-don", + CallerDonId: t.workflowDonInfo.ID, + Method: remotetypes.MethodExecute, + MessageId: []byte(messageID), + Sender: t.peerID[:], + Receiver: receiver[:], + } + + if responseErr != nil { + responseMsg.Error = remotetypes.Error_INTERNAL_ERROR + } else { + payload, err := pb.MarshalCapabilityResponse(resp) + if err != nil { + panic(err) + } + responseMsg.Payload = payload + } + + err = t.dispatcher.Send(receiver, responseMsg) + if err != nil { + panic(err) + } + } + } +} + +type TestDispatcher struct { + sentMessagesCh chan *remotetypes.MessageBody + receiver remotetypes.Receiver +} + +func NewTestDispatcher() *TestDispatcher { + return &TestDispatcher{ + sentMessagesCh: make(chan *remotetypes.MessageBody, 1), + } +} + +func (t *TestDispatcher) SendToReceiver(msgBody *remotetypes.MessageBody) { + t.receiver.Receive(msgBody) +} + +func (t *TestDispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error { + t.receiver = receiver + return nil +} + +func (t *TestDispatcher) RemoveReceiver(capabilityId string, donId string) {} + +func (t *TestDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { + t.sentMessagesCh <- msgBody + return nil +} diff --git a/core/capabilities/remote/target/endtoend_test.go b/core/capabilities/remote/target/endtoend_test.go new file mode 100644 index 00000000000..a95de5e2d24 --- /dev/null +++ b/core/capabilities/remote/target/endtoend_test.go @@ -0,0 +1,405 @@ +package target_test + +import ( + "context" + "crypto/rand" + "errors" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/mr-tron/base58" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_RemoteTargetCapability_InsufficientCapabilityResponses(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.NotNil(t, response.Err) + } + + capability := &TestCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(t, ctx, capability, 10, 9, 10*time.Millisecond, 10, 10, 10*time.Minute, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_InsufficientWorkflowRequests(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.NotNil(t, response.Err) + } + + timeOut := 10 * time.Minute + + capability := &TestCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(t, ctx, capability, 10, 10, 10*time.Millisecond, 10, 9, timeOut, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + responseValue, err := response.Value.Unwrap() + require.NoError(t, err) + assert.Equal(t, "aValue1", responseValue.(string)) + } + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + timeOut := 10 * time.Minute + + capability := &TestCapability{} + + testRemoteTarget(t, ctx, capability, 10, 9, timeOut, 10, 9, timeOut, transmissionSchedule, responseTest) + + transmissionSchedule, err = values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(t, ctx, capability, 10, 9, timeOut, 10, 9, timeOut, transmissionSchedule, responseTest) + +} + +func Test_RemoteTargetCapability_DonTopologies(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + responseValue, err := response.Value.Unwrap() + require.NoError(t, err) + assert.Equal(t, "aValue1", responseValue.(string)) + } + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + timeOut := 10 * time.Minute + + capability := &TestCapability{} + + // Test scenarios where the number of submissions is greater than or equal to F + 1 + testRemoteTarget(t, ctx, capability, 1, 0, timeOut, 1, 0, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(t, ctx, capability, 4, 3, timeOut, 1, 0, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(t, ctx, capability, 10, 3, timeOut, 1, 0, timeOut, transmissionSchedule, responseTest) + + testRemoteTarget(t, ctx, capability, 1, 0, timeOut, 1, 0, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(t, ctx, capability, 1, 0, timeOut, 4, 3, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(t, ctx, capability, 1, 0, timeOut, 10, 3, timeOut, transmissionSchedule, responseTest) + + testRemoteTarget(t, ctx, capability, 4, 3, timeOut, 4, 3, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(t, ctx, capability, 10, 3, timeOut, 10, 3, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(t, ctx, capability, 10, 9, timeOut, 10, 9, timeOut, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_CapabilityError(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.Equal(t, "failed to execute capability: an error", response.Err.Error()) + } + + capability := &TestErrorCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(t, ctx, capability, 10, 9, 10*time.Minute, 10, 9, 10*time.Minute, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_RandomCapabilityError(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.Equal(t, "request expired", response.Err.Error()) + } + + capability := &TestRandomErrorCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(t, ctx, capability, 10, 9, 10*time.Millisecond, 10, 9, 10*time.Minute, transmissionSchedule, responseTest) +} + +func testRemoteTarget(t *testing.T, ctx context.Context, underlying commoncap.TargetCapability, numWorkflowPeers int, workflowDonF uint8, workflowNodeTimeout time.Duration, + numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration, transmissionSchedule *values.Map, + responseTest func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error)) { + lggr := logger.TestLogger(t) + + capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeerID := p2ptypes.PeerID{} + require.NoError(t, capabilityPeerID.UnmarshalText([]byte(NewPeerID()))) + capabilityPeers[i] = capabilityPeerID + } + + capabilityPeerID := p2ptypes.PeerID{} + require.NoError(t, capabilityPeerID.UnmarshalText([]byte(NewPeerID()))) + + capDonInfo := commoncap.DON{ + ID: "capability-don", + Members: capabilityPeers, + F: capabilityDonF, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + DON: &capDonInfo, + } + + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeerID := p2ptypes.PeerID{} + require.NoError(t, workflowPeerID.UnmarshalText([]byte(NewPeerID()))) + workflowPeers[i] = workflowPeerID + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + F: workflowDonF, + } + + broker := newTestMessageBroker() + + workflowDONs := map[string]commoncap.DON{ + workflowDonInfo.ID: workflowDonInfo, + } + + capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeer := capabilityPeers[i] + capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) + capabilityNode := target.NewReceiver(ctx, lggr, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, + capabilityNodeResponseTimeout) + broker.RegisterReceiverNode(capabilityPeer, capabilityNode) + capabilityNodes[i] = capabilityNode + } + + workflowNodes := make([]commoncap.TargetCapability, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) + workflowNode := target.NewCaller(ctx, lggr, capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout) + broker.RegisterReceiverNode(workflowPeers[i], workflowNode) + workflowNodes[i] = workflowNode + } + + executeInputs, err := values.NewMap( + map[string]any{ + "executeValue1": "aValue1", + }, + ) + + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(len(workflowNodes)) + + for _, caller := range workflowNodes { + go func(caller commoncap.TargetCapability) { + responseCh, err := caller.Execute(ctx, + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Config: transmissionSchedule, + Inputs: executeInputs, + }) + + responseTest(t, responseCh, err) + wg.Done() + }(caller) + } + + wg.Wait() +} + +type testMessageBroker struct { + nodes map[p2ptypes.PeerID]remotetypes.Receiver +} + +func newTestMessageBroker() *testMessageBroker { + return &testMessageBroker{ + nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver), + } +} + +func (r *testMessageBroker) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) remotetypes.Dispatcher { + return &nodeDispatcher{ + callerPeerID: nodePeerID, + broker: r, + } +} + +func (r *testMessageBroker) RegisterReceiverNode(nodePeerID p2ptypes.PeerID, node remotetypes.Receiver) { + if _, ok := r.nodes[nodePeerID]; ok { + panic("node already registered") + } + + r.nodes[nodePeerID] = node +} + +func (r *testMessageBroker) Send(msg *remotetypes.MessageBody) { + receiverId := toPeerID(msg.Receiver) + + if receiver, ok := r.nodes[receiverId]; ok { + receiver.Receive(msg) + } else { + panic("receiver not found for peer id") + } + +} + +func toPeerID(id []byte) p2ptypes.PeerID { + return [32]byte(id) +} + +type nodeDispatcher struct { + callerPeerID p2ptypes.PeerID + broker *testMessageBroker +} + +func (t *nodeDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { + msgBody.Version = 1 + msgBody.Sender = t.callerPeerID[:] + msgBody.Receiver = peerID[:] + msgBody.Timestamp = time.Now().UnixMilli() + t.broker.Send(msgBody) + return nil +} + +func (t *nodeDispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error { + return nil +} +func (t *nodeDispatcher) RemoveReceiver(capabilityId string, donId string) {} + +type abstractTestCapability struct { +} + +func (t abstractTestCapability) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + return commoncap.CapabilityInfo{}, nil +} + +func (t abstractTestCapability) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + return nil +} + +func (t abstractTestCapability) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + return nil +} + +type TestCapability struct { + abstractTestCapability +} + +func (t TestCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + ch := make(chan commoncap.CapabilityResponse, 1) + + value := request.Inputs.Underlying["executeValue1"] + + ch <- commoncap.CapabilityResponse{ + Value: value, + } + + return ch, nil +} + +type TestErrorCapability struct { + abstractTestCapability +} + +func (t TestErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + return nil, errors.New("an error") +} + +type TestRandomErrorCapability struct { + abstractTestCapability +} + +func (t TestRandomErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + return nil, errors.New(uuid.New().String()) +} + +func NewP2PPeerID(t *testing.T) p2ptypes.PeerID { + id := p2ptypes.PeerID{} + require.NoError(t, id.UnmarshalText([]byte(NewPeerID()))) + return id +} + +func NewPeerID() string { + var privKey [32]byte + _, err := rand.Read(privKey[:]) + if err != nil { + panic(err) + } + + peerID := append(libp2pMagic(), privKey[:]...) + + return base58.Encode(peerID[:]) +} + +func libp2pMagic() []byte { + return []byte{0x00, 0x24, 0x08, 0x01, 0x12, 0x20} +} diff --git a/core/capabilities/remote/target/receiver.go b/core/capabilities/remote/target/receiver.go new file mode 100644 index 00000000000..63e6825e961 --- /dev/null +++ b/core/capabilities/remote/target/receiver.go @@ -0,0 +1,130 @@ +package target + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "sync" + "time" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +type receiverRequest interface { + OnMessage(ctx context.Context, msg *types.MessageBody) error + Expired() bool + Cancel(err types.Error, msg string) error +} + +type receiver struct { + lggr logger.Logger + peerID p2ptypes.PeerID + underlying commoncap.TargetCapability + capInfo commoncap.CapabilityInfo + localDonInfo capabilities.DON + workflowDONs map[string]commoncap.DON + dispatcher types.Dispatcher + + requestIDToRequest map[string]receiverRequest + requestTimeout time.Duration + + receiveLock sync.Mutex +} + +var _ types.Receiver = &receiver{} + +func NewReceiver(ctx context.Context, lggr logger.Logger, peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, + workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration) *receiver { + + r := &receiver{ + underlying: underlying, + peerID: peerID, + capInfo: capInfo, + localDonInfo: localDonInfo, + workflowDONs: workflowDONs, + dispatcher: dispatcher, + + requestIDToRequest: map[string]receiverRequest{}, + requestTimeout: requestTimeout, + + lggr: lggr, + } + + go func() { + ticker := time.NewTicker(requestTimeout) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.ExpireRequests() + } + } + }() + + return r +} + +func (r *receiver) ExpireRequests() { + r.receiveLock.Lock() + defer r.receiveLock.Unlock() + + for requestID, executeReq := range r.requestIDToRequest { + if executeReq.Expired() { + err := executeReq.Cancel(types.Error_TIMEOUT, "request expired") + if err != nil { + r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err) + } + delete(r.requestIDToRequest, requestID) + } + } + +} + +func (r *receiver) Receive(msg *types.MessageBody) { + r.receiveLock.Lock() + defer r.receiveLock.Unlock() + // TODO should the dispatcher be passing in a context? + ctx := context.Background() + + if msg.Method != types.MethodExecute { + r.lggr.Errorw("received request for unsupported method type", "method", msg.Method) + return + } + + // A request is uniquely identified by the message id and the hash of the payload to prevent a malicious + // actor from sending a different payload with the same message id + messageId := GetMessageID(msg) + hash := sha256.Sum256(msg.Payload) + requestID := messageId + hex.EncodeToString(hash[:]) + + if _, ok := r.requestIDToRequest[requestID]; !ok { + if callingDon, ok := r.workflowDONs[msg.CallerDonId]; ok { + r.requestIDToRequest[requestID] = request.NewReceiverRequest(r.underlying, r.capInfo.ID, r.localDonInfo.ID, r.peerID, + callingDon, messageId, r.dispatcher, r.requestTimeout) + } else { + r.lggr.Errorw("received request from unregistered don", "donId", msg.CallerDonId) + return + } + } + + req := r.requestIDToRequest[requestID] + + go func() { + err := req.OnMessage(ctx, msg) + if err != nil { + r.lggr.Errorw("request failed to OnMessage new message", "request", req, "err", err) + } + }() +} + +func GetMessageID(msg *types.MessageBody) string { + return string(msg.MessageId) +} diff --git a/core/capabilities/remote/target/receiver_test.go b/core/capabilities/remote/target/receiver_test.go new file mode 100644 index 00000000000..d23cc46b93f --- /dev/null +++ b/core/capabilities/remote/target/receiver_test.go @@ -0,0 +1,222 @@ +package target_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_Receiver_RespondsAfterSufficientRequests(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + numCapabilityPeers := 4 + + callers := testRemoteTargetReceiver(t, ctx, &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) + + for _, caller := range callers { + caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + }) + } + + for _, caller := range callers { + for i := 0; i < numCapabilityPeers; i++ { + msg := <-caller.receivedMessages + assert.Equal(t, remotetypes.Error_OK, msg.Error) + } + } +} + +func Test_Receiver_InsufficientCallers(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + numCapabilityPeers := 4 + + callers := testRemoteTargetReceiver(t, ctx, &TestCapability{}, 10, 10, numCapabilityPeers, 3, 100*time.Millisecond) + + for _, caller := range callers { + caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + }) + } + + for _, caller := range callers { + for i := 0; i < numCapabilityPeers; i++ { + msg := <-caller.receivedMessages + assert.Equal(t, remotetypes.Error_TIMEOUT, msg.Error) + } + } +} + +func Test_Receiver_CapabilityError(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + numCapabilityPeers := 4 + + callers := testRemoteTargetReceiver(t, ctx, &TestErrorCapability{}, 10, 9, numCapabilityPeers, 3, 100*time.Millisecond) + + for _, caller := range callers { + caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + }) + } + + for _, caller := range callers { + for i := 0; i < numCapabilityPeers; i++ { + msg := <-caller.receivedMessages + assert.Equal(t, remotetypes.Error_INTERNAL_ERROR, msg.Error) + } + } +} + +func testRemoteTargetReceiver(t *testing.T, ctx context.Context, + underlying commoncap.TargetCapability, + numWorkflowPeers int, workflowDonF uint8, + numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration) []*receiverTestCaller { + + lggr := logger.TestLogger(t) + + capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeerID := NewP2PPeerID(t) + capabilityPeers[i] = capabilityPeerID + } + + capDonInfo := commoncap.DON{ + ID: "capability-don", + Members: capabilityPeers, + F: capabilityDonF, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + DON: &capDonInfo, + } + + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeers[i] = NewP2PPeerID(t) + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + F: workflowDonF, + } + + broker := newTestMessageBroker() + + workflowDONs := map[string]commoncap.DON{ + workflowDonInfo.ID: workflowDonInfo, + } + + capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeer := capabilityPeers[i] + capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) + capabilityNode := target.NewReceiver(ctx, lggr, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, + capabilityNodeResponseTimeout) + broker.RegisterReceiverNode(capabilityPeer, capabilityNode) + capabilityNodes[i] = capabilityNode + } + + workflowNodes := make([]*receiverTestCaller, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) + workflowNode := newReceiverTestCaller(workflowPeers[i], capDonInfo, workflowPeerDispatcher) + broker.RegisterReceiverNode(workflowPeers[i], workflowNode) + workflowNodes[i] = workflowNode + } + + return workflowNodes +} + +type receiverTestCaller struct { + peerID p2ptypes.PeerID + dispatcher remotetypes.Dispatcher + capabilityDonInfo commoncap.DON + receivedMessages chan *remotetypes.MessageBody + callerDonID string +} + +func (r *receiverTestCaller) Receive(msg *remotetypes.MessageBody) { + r.receivedMessages <- msg +} + +func newReceiverTestCaller(peerID p2ptypes.PeerID, capabilityDonInfo commoncap.DON, + dispatcher remotetypes.Dispatcher) *receiverTestCaller { + return &receiverTestCaller{peerID: peerID, dispatcher: dispatcher, capabilityDonInfo: capabilityDonInfo, + receivedMessages: make(chan *remotetypes.MessageBody, 100), callerDonID: "workflow-don"} +} + +func (r *receiverTestCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + panic("not implemented") +} + +func (r *receiverTestCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + panic("not implemented") +} + +func (r *receiverTestCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + panic("not implemented") +} + +func (r *receiverTestCaller) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + + rawRequest, err := pb.MarshalCapabilityRequest(req) + if err != nil { + return nil, err + } + + messageID, err := target.GetMessageIDForRequest(req) + if err != nil { + return nil, err + } + + for _, node := range r.capabilityDonInfo.Members { + message := &remotetypes.MessageBody{ + CapabilityId: "capability-id", + CapabilityDonId: "capability-don", + CallerDonId: "workflow-don", + Method: remotetypes.MethodExecute, + Payload: rawRequest, + MessageId: []byte(messageID), + Sender: r.peerID[:], + Receiver: node[:], + } + + if err = r.dispatcher.Send(node, message); err != nil { + return nil, err + } + } + + return nil, nil +} diff --git a/core/capabilities/remote/target/request/caller_request.go b/core/capabilities/remote/target/request/caller_request.go new file mode 100644 index 00000000000..c12f7419154 --- /dev/null +++ b/core/capabilities/remote/target/request/caller_request.go @@ -0,0 +1,160 @@ +package request + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" +) + +type callerRequest struct { + responseCh chan commoncap.CapabilityResponse + createdAt time.Time + responseIDCount map[[32]byte]int + errorCount map[string]int + responseReceived map[p2ptypes.PeerID]bool + + requiredIdenticalResponses int + + requestTimeout time.Duration + + respSent bool + mux sync.Mutex +} + +func NewCallerRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest, messageID string, + remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, + requestTimeout time.Duration) (*callerRequest, error) { + + remoteCapabilityDonInfo := remoteCapabilityInfo.DON + if remoteCapabilityDonInfo == nil { + return nil, errors.New("remote capability info missing DON") + } + + rawRequest, err := pb.MarshalCapabilityRequest(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal capability request: %w", err) + } + + tc, err := transmission.ExtractTransmissionConfig(req.Config) + if err != nil { + return nil, fmt.Errorf("failed to extract transmission config from request config: %w", err) + } + + peerIDToTransmissionDelay, err := transmission.GetPeerIDToTransmissionDelay(remoteCapabilityDonInfo.Members, localDonInfo.Config.SharedSecret, + messageID, tc) + if err != nil { + return nil, fmt.Errorf("failed to get peer ID to transmission delay: %w", err) + } + + responseReceived := make(map[p2ptypes.PeerID]bool) + for peerID, delay := range peerIDToTransmissionDelay { + responseReceived[peerID] = false + go func(peerID ragep2ptypes.PeerID, delay time.Duration) { + message := &types.MessageBody{ + CapabilityId: remoteCapabilityInfo.ID, + CapabilityDonId: remoteCapabilityDonInfo.ID, + CallerDonId: localDonInfo.ID, + Method: types.MethodExecute, + Payload: rawRequest, + MessageId: []byte(messageID), + } + + select { + case <-ctx.Done(): + return + case <-time.After(delay): + err := dispatcher.Send(peerID, message) + if err != nil { + lggr.Errorw("failed to send message", "peerID", peerID, "err", err) + } + } + }(peerID, delay) + } + + return &callerRequest{ + createdAt: time.Now(), + requestTimeout: requestTimeout, + requiredIdenticalResponses: int(remoteCapabilityDonInfo.F + 1), + responseIDCount: make(map[[32]byte]int), + errorCount: make(map[string]int), + responseReceived: responseReceived, + responseCh: make(chan commoncap.CapabilityResponse, 1), + }, nil +} + +func (c *callerRequest) ResponseChan() <-chan commoncap.CapabilityResponse { + return c.responseCh +} + +func (c *callerRequest) Expired() bool { + return time.Since(c.createdAt) > c.requestTimeout +} + +func (c *callerRequest) Cancel(reason string) { + c.mux.Lock() + defer c.mux.Unlock() + if !c.respSent { + c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(reason)}) + } +} + +// TODO addResponse assumes that only one response is received from each peer, if streaming responses need to be supported this will need to be updated +func (c *callerRequest) OnMessage(_ context.Context, msg *types.MessageBody) error { + c.mux.Lock() + defer c.mux.Unlock() + + if msg.Sender == nil { + return fmt.Errorf("sender missing from message") + } + + sender := remote.ToPeerID(msg.Sender) + + if _, ok := c.responseReceived[sender]; !ok { + return fmt.Errorf("response from peer %s not expected", sender) + } + + if c.responseReceived[sender] { + return fmt.Errorf("response from peer %s already received", sender) + } + + c.responseReceived[sender] = true + + if msg.Error == types.Error_OK { + responseID := sha256.Sum256(msg.Payload) + c.responseIDCount[responseID]++ + + if c.responseIDCount[responseID] == c.requiredIdenticalResponses { + capabilityResponse, err := pb.UnmarshalCapabilityResponse(msg.Payload) + if err != nil { + c.sendResponse(commoncap.CapabilityResponse{Err: fmt.Errorf("failed to unmarshal capability response: %w", err)}) + } else { + c.sendResponse(commoncap.CapabilityResponse{Value: capabilityResponse.Value}) + } + } + } else { + c.errorCount[msg.ErrorMsg]++ + if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses { + c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(msg.ErrorMsg)}) + } + } + return nil +} + +func (c *callerRequest) sendResponse(response commoncap.CapabilityResponse) { + c.responseCh <- response + close(c.responseCh) + c.respSent = true +} diff --git a/core/capabilities/remote/target/request/caller_request_test.go b/core/capabilities/remote/target/request/caller_request_test.go new file mode 100644 index 00000000000..6287399be92 --- /dev/null +++ b/core/capabilities/remote/target/request/caller_request_test.go @@ -0,0 +1,311 @@ +package request_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_CallerRequest_MessageValidation(t *testing.T) { + lggr := logger.TestLogger(t) + + numCapabilityPeers := 2 + capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeers[i] = NewP2PPeerID(t) + } + + capDonInfo := commoncap.DON{ + ID: "capability-don", + Members: capabilityPeers, + F: 1, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + DON: &capDonInfo, + } + + numWorkflowPeers := 2 + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeers[i] = NewP2PPeerID(t) + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + } + + executeInputs, err := values.NewMap( + map[string]any{ + "executeValue1": "aValue1", + }, + ) + require.NoError(t, err) + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "1000ms", + }) + capabilityRequest := commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Inputs: executeInputs, + Config: transmissionSchedule, + } + + capabilityResponse := commoncap.CapabilityResponse{ + Value: values.NewString("response1"), + Err: nil, + } + + rawResponse, err := pb.MarshalCapabilityResponse(capabilityResponse) + require.NoError(t, err) + + messageID, err := target.GetMessageIDForRequest(capabilityRequest) + require.NoError(t, err) + + msg := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse, + MessageId: []byte("messageID"), + } + + t.Run("Send second message with different response", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &callerRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewCallerRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + capabilityResponse2 := commoncap.CapabilityResponse{ + Value: values.NewString("response2"), + Err: nil, + } + + rawResponse2, err := pb.MarshalCapabilityResponse(capabilityResponse2) + require.NoError(t, err) + msg2 := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse2, + MessageId: []byte("messageID"), + } + + msg.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + + msg2.Sender = capabilityPeers[1][:] + err = request.OnMessage(ctx, msg2) + require.NoError(t, err) + + select { + case <-request.ResponseChan(): + t.Fatal("expected no response") + default: + } + }) + + t.Run("Send second message from non calling Don peer", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &callerRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewCallerRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + msg.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + + nonDonPeer := NewP2PPeerID(t) + msg.Sender = nonDonPeer[:] + err = request.OnMessage(ctx, msg) + require.NotNil(t, err) + + select { + case <-request.ResponseChan(): + t.Fatal("expected no response") + default: + } + + }) + + t.Run("Send second message from same peer as first message", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &callerRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewCallerRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + msg.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + err = request.OnMessage(ctx, msg) + require.NotNil(t, err) + + select { + case <-request.ResponseChan(): + t.Fatal("expected no response") + default: + } + + }) + + t.Run("Send second message with same error as first", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &callerRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewCallerRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + <-dispatcher.msgs + <-dispatcher.msgs + assert.Equal(t, 0, len(dispatcher.msgs)) + + msgWithError := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse, + MessageId: []byte("messageID"), + Error: types.Error_INTERNAL_ERROR, + ErrorMsg: "an error", + } + + msgWithError.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msgWithError) + require.NoError(t, err) + + msgWithError.Sender = capabilityPeers[1][:] + err = request.OnMessage(ctx, msgWithError) + require.NoError(t, err) + + response := <-request.ResponseChan() + + assert.Equal(t, "an error", response.Err.Error()) + }) + + t.Run("Send second message with different error to first", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &callerRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewCallerRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + <-dispatcher.msgs + <-dispatcher.msgs + assert.Equal(t, 0, len(dispatcher.msgs)) + + msgWithError := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse, + MessageId: []byte("messageID"), + Error: types.Error_INTERNAL_ERROR, + ErrorMsg: "an error", + Sender: capabilityPeers[0][:], + } + + msgWithError2 := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse, + MessageId: []byte("messageID"), + Error: types.Error_INTERNAL_ERROR, + ErrorMsg: "an error2", + Sender: capabilityPeers[1][:], + } + + err = request.OnMessage(ctx, msgWithError) + require.NoError(t, err) + err = request.OnMessage(ctx, msgWithError2) + require.NoError(t, err) + + select { + case <-request.ResponseChan(): + t.Fatal("expected no response") + default: + } + }) + + t.Run("Send second valid message", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &callerRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewCallerRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + <-dispatcher.msgs + <-dispatcher.msgs + assert.Equal(t, 0, len(dispatcher.msgs)) + + msg.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + + msg.Sender = capabilityPeers[1][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + + response := <-request.ResponseChan() + + assert.Equal(t, response.Value, values.NewString("response1")) + }) +} + +type callerRequestTestDispatcher struct { + msgs chan *types.MessageBody +} + +func (t *callerRequestTestDispatcher) SetReceiver(capabilityId string, donId string, receiver types.Receiver) error { + return nil +} + +func (t *callerRequestTestDispatcher) RemoveReceiver(capabilityId string, donId string) {} + +func (t *callerRequestTestDispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { + t.msgs <- msgBody + return nil +} diff --git a/core/capabilities/remote/target/request/receiver_request.go b/core/capabilities/remote/target/request/receiver_request.go new file mode 100644 index 00000000000..0c83f09c2ce --- /dev/null +++ b/core/capabilities/remote/target/request/receiver_request.go @@ -0,0 +1,224 @@ +package request + +import ( + "context" + "fmt" + "sync" + "time" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) + +type response struct { + response []byte + error types.Error + errorMsg string +} + +type receiverRequest struct { + capability capabilities.TargetCapability + + capabilityPeerId p2ptypes.PeerID + capabilityID string + capabilityDonID string + + dispatcher types.Dispatcher + + requesters map[p2ptypes.PeerID]bool + responseSentToRequester map[p2ptypes.PeerID]bool + + createdTime time.Time + + response *response + + callingDon commoncap.DON + + requestMessageID string + requestTimeout time.Duration + + mux sync.Mutex +} + +func NewReceiverRequest(capability capabilities.TargetCapability, capabilityID string, capabilityDonID string, capabilityPeerId p2ptypes.PeerID, + callingDon commoncap.DON, requestMessageID string, + dispatcher types.Dispatcher, requestTimeout time.Duration) *receiverRequest { + return &receiverRequest{ + capability: capability, + createdTime: time.Now(), + capabilityID: capabilityID, + capabilityDonID: capabilityDonID, + capabilityPeerId: capabilityPeerId, + dispatcher: dispatcher, + requesters: map[p2ptypes.PeerID]bool{}, + responseSentToRequester: map[p2ptypes.PeerID]bool{}, + callingDon: callingDon, + requestMessageID: requestMessageID, + requestTimeout: requestTimeout, + } +} + +func (e *receiverRequest) OnMessage(ctx context.Context, msg *types.MessageBody) error { + e.mux.Lock() + defer e.mux.Unlock() + + if msg.Sender == nil { + return fmt.Errorf("sender missing from message") + } + + requester := remote.ToPeerID(msg.Sender) + if err := e.addRequester(requester); err != nil { + return fmt.Errorf("failed to add requester to request: %w", err) + } + + if e.minimumRequiredRequestsReceived() && !e.hasResponse() { + if err := e.executeRequest(ctx, msg.Payload); err != nil { + e.setError(types.Error_INTERNAL_ERROR, err.Error()) + } + } + + if err := e.sendResponses(); err != nil { + return fmt.Errorf("failed to send responses: %w", err) + } + + return nil +} + +func (e *receiverRequest) Expired() bool { + return time.Since(e.createdTime) > e.requestTimeout +} + +func (e *receiverRequest) Cancel(err types.Error, msg string) error { + e.mux.Lock() + defer e.mux.Unlock() + + if e.hasResponse() { + return fmt.Errorf("request already has response") + } + + e.setError(err, msg) + if err := e.sendResponses(); err != nil { + return fmt.Errorf("failed to send responses: %w", err) + } + + return nil +} + +func (e *receiverRequest) executeRequest(ctx context.Context, payload []byte) error { + ctxWithTimeout, cancel := context.WithTimeout(ctx, e.requestTimeout) + defer cancel() + + capabilityRequest, err := pb.UnmarshalCapabilityRequest(payload) + if err != nil { + return fmt.Errorf("failed to unmarshal capability request: %w", err) + } + + capResponseCh, err := e.capability.Execute(ctxWithTimeout, capabilityRequest) + + if err != nil { + return fmt.Errorf("failed to execute capability: %w", err) + } + + // TODO working on the assumption that the capability will only ever return one response from its channel (for now at least) + capResponse := <-capResponseCh + responsePayload, err := pb.MarshalCapabilityResponse(capResponse) + if err != nil { + return fmt.Errorf("failed to marshal capability response: %w", err) + } + + e.setResult(responsePayload) + + return nil +} + +func (e *receiverRequest) addRequester(from p2ptypes.PeerID) error { + + fromPeerInCallingDon := false + for _, member := range e.callingDon.Members { + if member == from { + fromPeerInCallingDon = true + break + } + } + + if !fromPeerInCallingDon { + return fmt.Errorf("request received from peer %s not in calling don", from) + } + + if e.requesters[from] { + return fmt.Errorf("request already received from peer %s", from) + } + + e.requesters[from] = true + + return nil +} + +func (e *receiverRequest) minimumRequiredRequestsReceived() bool { + return len(e.requesters) >= int(e.callingDon.F+1) +} + +func (e *receiverRequest) setResult(result []byte) { + e.response = &response{ + response: result, + } +} + +func (e *receiverRequest) setError(err types.Error, errMsg string) { + e.response = &response{ + error: err, + errorMsg: errMsg, + } +} + +func (e *receiverRequest) hasResponse() bool { + return e.response != nil +} + +func (e *receiverRequest) sendResponses() error { + if e.hasResponse() { + for requester := range e.requesters { + if !e.responseSentToRequester[requester] { + e.responseSentToRequester[requester] = true + if err := e.sendResponse(requester); err != nil { + return fmt.Errorf("failed to send response to requester %s: %w", requester, err) + } + } + } + } + + return nil +} + +func (e *receiverRequest) sendResponse(requester p2ptypes.PeerID) error { + + responseMsg := types.MessageBody{ + CapabilityId: e.capabilityID, + CapabilityDonId: e.capabilityDonID, + CallerDonId: e.callingDon.ID, + Method: types.MethodExecute, + MessageId: []byte(e.requestMessageID), + Sender: e.capabilityPeerId[:], + Receiver: requester[:], + } + + if e.response.error != types.Error_OK { + responseMsg.Error = e.response.error + responseMsg.ErrorMsg = e.response.errorMsg + } else { + responseMsg.Payload = e.response.response + } + + if err := e.dispatcher.Send(requester, &responseMsg); err != nil { + return fmt.Errorf("failed to send response to dispatcher: %w", err) + } + + e.responseSentToRequester[requester] = true + + return nil +} diff --git a/core/capabilities/remote/target/request/receiver_request_test.go b/core/capabilities/remote/target/request/receiver_request_test.go new file mode 100644 index 00000000000..4689d72de82 --- /dev/null +++ b/core/capabilities/remote/target/request/receiver_request_test.go @@ -0,0 +1,265 @@ +package request_test + +import ( + "context" + "crypto/rand" + "errors" + "testing" + "time" + + "github.com/mr-tron/base58" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_ReceiverRequest_MessageValidation(t *testing.T) { + capability := TestCapability{} + capabilityPeerID := NewP2PPeerID(t) + + numWorkflowPeers := 2 + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeers[i] = NewP2PPeerID(t) + } + + callingDon := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + F: 1, + } + + dispatcher := &testDispatcher{} + + executeInputs, err := values.NewMap( + map[string]any{ + "executeValue1": "aValue1", + }, + ) + require.NoError(t, err) + + capabilityRequest := commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Inputs: executeInputs, + } + + rawRequest, err := pb.MarshalCapabilityRequest(capabilityRequest) + require.NoError(t, err) + + t.Run("Send duplicate message", func(t *testing.T) { + req := request.NewReceiverRequest(capability, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + err = sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + assert.NotNil(t, err) + }) + + t.Run("Send message with non calling don peer", func(t *testing.T) { + req := request.NewReceiverRequest(capability, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + + nonDonPeer := NewP2PPeerID(t) + err = req.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: nonDonPeer[:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: rawRequest, + }) + + assert.NotNil(t, err) + }) + + t.Run("Send message invalid payload", func(t *testing.T) { + req := request.NewReceiverRequest(capability, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + + err = req.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: workflowPeers[1][:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: append(rawRequest, []byte("asdf")...), + }) + assert.NoError(t, err) + assert.Equal(t, 2, len(dispatcher.msgs)) + assert.Equal(t, dispatcher.msgs[0].Error, types.Error_INTERNAL_ERROR) + assert.Equal(t, dispatcher.msgs[1].Error, types.Error_INTERNAL_ERROR) + + }) + + t.Run("Send second valid request when capability errors", func(t *testing.T) { + + dispatcher := &testDispatcher{} + req := request.NewReceiverRequest(TestErrorCapability{}, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + + err = req.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: workflowPeers[1][:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: rawRequest, + }) + assert.NoError(t, err) + assert.Equal(t, 2, len(dispatcher.msgs)) + assert.Equal(t, dispatcher.msgs[0].Error, types.Error_INTERNAL_ERROR) + assert.Equal(t, dispatcher.msgs[0].ErrorMsg, "failed to execute capability: an error") + assert.Equal(t, dispatcher.msgs[1].Error, types.Error_INTERNAL_ERROR) + assert.Equal(t, dispatcher.msgs[1].ErrorMsg, "failed to execute capability: an error") + + }) + + t.Run("Send second valid request", func(t *testing.T) { + dispatcher := &testDispatcher{} + request := request.NewReceiverRequest(capability, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(request, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + + err = request.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: workflowPeers[1][:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: rawRequest, + }) + assert.NoError(t, err) + assert.Equal(t, 2, len(dispatcher.msgs)) + assert.Equal(t, dispatcher.msgs[0].Error, types.Error_OK) + assert.Equal(t, dispatcher.msgs[1].Error, types.Error_OK) + }) +} + +type receiverRequest interface { + OnMessage(ctx context.Context, msg *types.MessageBody) error +} + +func sendValidRequest(request receiverRequest, workflowPeers []p2ptypes.PeerID, capabilityPeerID p2ptypes.PeerID, + rawRequest []byte) error { + return request.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: workflowPeers[0][:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: rawRequest, + }) + +} + +type testDispatcher struct { + msgs []*types.MessageBody +} + +func (t *testDispatcher) SetReceiver(capabilityId string, donId string, receiver types.Receiver) error { + return nil +} + +func (t *testDispatcher) RemoveReceiver(capabilityId string, donId string) {} + +func (t *testDispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { + t.msgs = append(t.msgs, msgBody) + return nil +} + +type abstractTestCapability struct { +} + +func (t abstractTestCapability) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + return commoncap.CapabilityInfo{}, nil +} + +func (t abstractTestCapability) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + return nil +} + +func (t abstractTestCapability) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + return nil +} + +type TestCapability struct { + abstractTestCapability +} + +func (t TestCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + ch := make(chan commoncap.CapabilityResponse, 1) + + value := request.Inputs.Underlying["executeValue1"] + + ch <- commoncap.CapabilityResponse{ + Value: value, + } + + return ch, nil +} + +type TestErrorCapability struct { + abstractTestCapability +} + +func (t TestErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + return nil, errors.New("an error") +} + +func NewP2PPeerID(t *testing.T) p2ptypes.PeerID { + id := p2ptypes.PeerID{} + require.NoError(t, id.UnmarshalText([]byte(NewPeerID()))) + return id +} + +func NewPeerID() string { + var privKey [32]byte + _, err := rand.Read(privKey[:]) + if err != nil { + panic(err) + } + + peerID := append(libp2pMagic(), privKey[:]...) + + return base58.Encode(peerID[:]) +} + +func libp2pMagic() []byte { + return []byte{0x00, 0x24, 0x08, 0x01, 0x12, 0x20} +} diff --git a/core/capabilities/remote/target_test.go b/core/capabilities/remote/target_test.go deleted file mode 100644 index 0f9bad51f67..00000000000 --- a/core/capabilities/remote/target_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package remote_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" - remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" -) - -func TestTarget_Placeholder(t *testing.T) { - lggr := logger.TestLogger(t) - ctx := testutils.Context(t) - donInfo := &capabilities.DON{ - Members: []p2ptypes.PeerID{{}}, - } - dispatcher := remoteMocks.NewDispatcher(t) - dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil) - target := remote.NewRemoteTargetCaller(commoncap.CapabilityInfo{}, donInfo, dispatcher, lggr) - - _, err := target.Execute(ctx, commoncap.CapabilityRequest{}) - assert.NoError(t, err) -} diff --git a/core/capabilities/remote/trigger_publisher_test.go b/core/capabilities/remote/trigger_publisher_test.go index 71a5174c07f..81f3d737c69 100644 --- a/core/capabilities/remote/trigger_publisher_test.go +++ b/core/capabilities/remote/trigger_publisher_test.go @@ -26,9 +26,9 @@ func TestTriggerPublisher_Register(t *testing.T) { Version: "0.0.1", } p1 := p2ptypes.PeerID{} - require.NoError(t, p1.UnmarshalText([]byte(peerID1))) + require.NoError(t, p1.UnmarshalText([]byte(PeerID1))) p2 := p2ptypes.PeerID{} - require.NoError(t, p2.UnmarshalText([]byte(peerID2))) + require.NoError(t, p2.UnmarshalText([]byte(PeerID2))) capDonInfo := commoncap.DON{ ID: "capability-don", Members: []p2ptypes.PeerID{p1}, @@ -60,7 +60,7 @@ func TestTriggerPublisher_Register(t *testing.T) { // trigger registration event capRequest := commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: workflowID1, + WorkflowID: WorkflowID1, }, } marshaled, err := pb.MarshalCapabilityRequest(capRequest) diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index 4d251d49dc8..1a58c9c4eb6 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -18,9 +18,9 @@ import ( ) const ( - peerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC" - peerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8" - workflowID1 = "workflowID1" + PeerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC" + PeerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8" + WorkflowID1 = "workflowID1" triggerEvent1 = "triggerEvent1" triggerEvent2 = "triggerEvent2" ) @@ -35,9 +35,9 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { Version: "0.0.1", } p1 := p2ptypes.PeerID{} - require.NoError(t, p1.UnmarshalText([]byte(peerID1))) + require.NoError(t, p1.UnmarshalText([]byte(PeerID1))) p2 := p2ptypes.PeerID{} - require.NoError(t, p2.UnmarshalText([]byte(peerID2))) + require.NoError(t, p2.UnmarshalText([]byte(PeerID2))) capDonInfo := commoncap.DON{ ID: "capability-don", Members: []p2ptypes.PeerID{p1}, @@ -70,7 +70,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: workflowID1, + WorkflowID: WorkflowID1, }, }) require.NoError(t, err) @@ -90,7 +90,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { Method: remotetypes.MethodTriggerEvent, Metadata: &remotetypes.MessageBody_TriggerEventMetadata{ TriggerEventMetadata: &remotetypes.TriggerEventMetadata{ - WorkflowIds: []string{workflowID1}, + WorkflowIds: []string{WorkflowID1}, }, }, Payload: marshaled, diff --git a/core/capabilities/remote/types/generate.go b/core/capabilities/remote/types/generate.go new file mode 100644 index 00000000000..845c53b2f43 --- /dev/null +++ b/core/capabilities/remote/types/generate.go @@ -0,0 +1,4 @@ +//go:generate protoc --proto_path=.:.. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative message.proto + + +package types diff --git a/core/capabilities/remote/types/message.pb.go b/core/capabilities/remote/types/message.pb.go index d8e9579e96c..78356b864d7 100644 --- a/core/capabilities/remote/types/message.pb.go +++ b/core/capabilities/remote/types/message.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.8 -// source: core/capabilities/remote/types/message.proto +// protoc-gen-go v1.33.0 +// protoc v4.25.1 +// source: message.proto package types @@ -26,6 +26,9 @@ const ( Error_OK Error = 0 Error_VALIDATION_FAILED Error = 1 Error_CAPABILITY_NOT_FOUND Error = 2 + Error_INVALID_REQUEST Error = 3 + Error_TIMEOUT Error = 4 + Error_INTERNAL_ERROR Error = 5 ) // Enum value maps for Error. @@ -34,11 +37,17 @@ var ( 0: "OK", 1: "VALIDATION_FAILED", 2: "CAPABILITY_NOT_FOUND", + 3: "INVALID_REQUEST", + 4: "TIMEOUT", + 5: "INTERNAL_ERROR", } Error_value = map[string]int32{ "OK": 0, "VALIDATION_FAILED": 1, "CAPABILITY_NOT_FOUND": 2, + "INVALID_REQUEST": 3, + "TIMEOUT": 4, + "INTERNAL_ERROR": 5, } ) @@ -53,11 +62,11 @@ func (x Error) String() string { } func (Error) Descriptor() protoreflect.EnumDescriptor { - return file_core_capabilities_remote_types_message_proto_enumTypes[0].Descriptor() + return file_message_proto_enumTypes[0].Descriptor() } func (Error) Type() protoreflect.EnumType { - return &file_core_capabilities_remote_types_message_proto_enumTypes[0] + return &file_message_proto_enumTypes[0] } func (x Error) Number() protoreflect.EnumNumber { @@ -66,7 +75,7 @@ func (x Error) Number() protoreflect.EnumNumber { // Deprecated: Use Error.Descriptor instead. func (Error) EnumDescriptor() ([]byte, []int) { - return file_core_capabilities_remote_types_message_proto_rawDescGZIP(), []int{0} + return file_message_proto_rawDescGZIP(), []int{0} } type Message struct { @@ -81,7 +90,7 @@ type Message struct { func (x *Message) Reset() { *x = Message{} if protoimpl.UnsafeEnabled { - mi := &file_core_capabilities_remote_types_message_proto_msgTypes[0] + mi := &file_message_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -94,7 +103,7 @@ func (x *Message) String() string { func (*Message) ProtoMessage() {} func (x *Message) ProtoReflect() protoreflect.Message { - mi := &file_core_capabilities_remote_types_message_proto_msgTypes[0] + mi := &file_message_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -107,7 +116,7 @@ func (x *Message) ProtoReflect() protoreflect.Message { // Deprecated: Use Message.ProtoReflect.Descriptor instead. func (*Message) Descriptor() ([]byte, []int) { - return file_core_capabilities_remote_types_message_proto_rawDescGZIP(), []int{0} + return file_message_proto_rawDescGZIP(), []int{0} } func (x *Message) GetSignature() []byte { @@ -129,7 +138,6 @@ type MessageBody struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // header fields set and validated by the Dispatcher Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` Sender []byte `protobuf:"bytes,2,opt,name=sender,proto3" json:"sender,omitempty"` Receiver []byte `protobuf:"bytes,3,opt,name=receiver,proto3" json:"receiver,omitempty"` @@ -140,9 +148,11 @@ type MessageBody struct { CallerDonId string `protobuf:"bytes,8,opt,name=caller_don_id,json=callerDonId,proto3" json:"caller_don_id,omitempty"` Method string `protobuf:"bytes,9,opt,name=method,proto3" json:"method,omitempty"` Error Error `protobuf:"varint,10,opt,name=error,proto3,enum=remote.Error" json:"error,omitempty"` + ErrorMsg string `protobuf:"bytes,11,opt,name=errorMsg,proto3" json:"errorMsg,omitempty"` // payload contains a CapabilityRequest or CapabilityResponse - Payload []byte `protobuf:"bytes,11,opt,name=payload,proto3" json:"payload,omitempty"` + Payload []byte `protobuf:"bytes,12,opt,name=payload,proto3" json:"payload,omitempty"` // Types that are assignable to Metadata: + // // *MessageBody_TriggerRegistrationMetadata // *MessageBody_TriggerEventMetadata Metadata isMessageBody_Metadata `protobuf_oneof:"metadata"` @@ -151,7 +161,7 @@ type MessageBody struct { func (x *MessageBody) Reset() { *x = MessageBody{} if protoimpl.UnsafeEnabled { - mi := &file_core_capabilities_remote_types_message_proto_msgTypes[1] + mi := &file_message_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -164,7 +174,7 @@ func (x *MessageBody) String() string { func (*MessageBody) ProtoMessage() {} func (x *MessageBody) ProtoReflect() protoreflect.Message { - mi := &file_core_capabilities_remote_types_message_proto_msgTypes[1] + mi := &file_message_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -177,7 +187,7 @@ func (x *MessageBody) ProtoReflect() protoreflect.Message { // Deprecated: Use MessageBody.ProtoReflect.Descriptor instead. func (*MessageBody) Descriptor() ([]byte, []int) { - return file_core_capabilities_remote_types_message_proto_rawDescGZIP(), []int{1} + return file_message_proto_rawDescGZIP(), []int{1} } func (x *MessageBody) GetVersion() uint32 { @@ -250,6 +260,13 @@ func (x *MessageBody) GetError() Error { return Error_OK } +func (x *MessageBody) GetErrorMsg() string { + if x != nil { + return x.ErrorMsg + } + return "" +} + func (x *MessageBody) GetPayload() []byte { if x != nil { return x.Payload @@ -283,11 +300,11 @@ type isMessageBody_Metadata interface { } type MessageBody_TriggerRegistrationMetadata struct { - TriggerRegistrationMetadata *TriggerRegistrationMetadata `protobuf:"bytes,12,opt,name=trigger_registration_metadata,json=triggerRegistrationMetadata,proto3,oneof"` + TriggerRegistrationMetadata *TriggerRegistrationMetadata `protobuf:"bytes,13,opt,name=trigger_registration_metadata,json=triggerRegistrationMetadata,proto3,oneof"` } type MessageBody_TriggerEventMetadata struct { - TriggerEventMetadata *TriggerEventMetadata `protobuf:"bytes,13,opt,name=trigger_event_metadata,json=triggerEventMetadata,proto3,oneof"` + TriggerEventMetadata *TriggerEventMetadata `protobuf:"bytes,14,opt,name=trigger_event_metadata,json=triggerEventMetadata,proto3,oneof"` } func (*MessageBody_TriggerRegistrationMetadata) isMessageBody_Metadata() {} @@ -305,7 +322,7 @@ type TriggerRegistrationMetadata struct { func (x *TriggerRegistrationMetadata) Reset() { *x = TriggerRegistrationMetadata{} if protoimpl.UnsafeEnabled { - mi := &file_core_capabilities_remote_types_message_proto_msgTypes[2] + mi := &file_message_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -318,7 +335,7 @@ func (x *TriggerRegistrationMetadata) String() string { func (*TriggerRegistrationMetadata) ProtoMessage() {} func (x *TriggerRegistrationMetadata) ProtoReflect() protoreflect.Message { - mi := &file_core_capabilities_remote_types_message_proto_msgTypes[2] + mi := &file_message_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -331,7 +348,7 @@ func (x *TriggerRegistrationMetadata) ProtoReflect() protoreflect.Message { // Deprecated: Use TriggerRegistrationMetadata.ProtoReflect.Descriptor instead. func (*TriggerRegistrationMetadata) Descriptor() ([]byte, []int) { - return file_core_capabilities_remote_types_message_proto_rawDescGZIP(), []int{2} + return file_message_proto_rawDescGZIP(), []int{2} } func (x *TriggerRegistrationMetadata) GetLastReceivedEventId() string { @@ -353,7 +370,7 @@ type TriggerEventMetadata struct { func (x *TriggerEventMetadata) Reset() { *x = TriggerEventMetadata{} if protoimpl.UnsafeEnabled { - mi := &file_core_capabilities_remote_types_message_proto_msgTypes[3] + mi := &file_message_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -366,7 +383,7 @@ func (x *TriggerEventMetadata) String() string { func (*TriggerEventMetadata) ProtoMessage() {} func (x *TriggerEventMetadata) ProtoReflect() protoreflect.Message { - mi := &file_core_capabilities_remote_types_message_proto_msgTypes[3] + mi := &file_message_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -379,7 +396,7 @@ func (x *TriggerEventMetadata) ProtoReflect() protoreflect.Message { // Deprecated: Use TriggerEventMetadata.ProtoReflect.Descriptor instead. func (*TriggerEventMetadata) Descriptor() ([]byte, []int) { - return file_core_capabilities_remote_types_message_proto_rawDescGZIP(), []int{3} + return file_message_proto_rawDescGZIP(), []int{3} } func (x *TriggerEventMetadata) GetTriggerEventId() string { @@ -396,94 +413,97 @@ func (x *TriggerEventMetadata) GetWorkflowIds() []string { return nil } -var File_core_capabilities_remote_types_message_proto protoreflect.FileDescriptor - -var file_core_capabilities_remote_types_message_proto_rawDesc = []byte{ - 0x0a, 0x2c, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, - 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, - 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, - 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x22, 0x3b, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, - 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, - 0x6f, 0x64, 0x79, 0x22, 0xb1, 0x04, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, - 0x6f, 0x64, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, - 0x06, 0x73, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, - 0x65, 0x6e, 0x64, 0x65, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, - 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, - 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, - 0x1d, 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x12, 0x23, - 0x0a, 0x0d, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x5f, 0x69, 0x64, 0x18, - 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, - 0x79, 0x49, 0x64, 0x12, 0x2a, 0x0a, 0x11, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, - 0x79, 0x5f, 0x64, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, - 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x44, 0x6f, 0x6e, 0x49, 0x64, 0x12, - 0x22, 0x0a, 0x0d, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x64, 0x6f, 0x6e, 0x5f, 0x69, 0x64, - 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x44, 0x6f, - 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x09, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x23, 0x0a, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x72, 0x65, 0x6d, - 0x6f, 0x74, 0x65, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, - 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x69, 0x0a, 0x1d, 0x74, 0x72, - 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0c, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x23, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67, - 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x1b, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, - 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, - 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x54, 0x0a, 0x16, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, - 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, - 0x0d, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x54, - 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x14, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, 0x0a, 0x0a, 0x08, 0x6d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x52, 0x0a, 0x1b, 0x54, 0x72, 0x69, 0x67, 0x67, - 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x33, 0x0a, 0x16, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x72, - 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x6c, 0x61, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, - 0x69, 0x76, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x63, 0x0a, 0x14, 0x54, - 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0x12, 0x28, 0x0a, 0x10, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x74, - 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, - 0x0c, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x02, 0x20, - 0x03, 0x28, 0x09, 0x52, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x73, - 0x2a, 0x40, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, - 0x00, 0x12, 0x15, 0x0a, 0x11, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, - 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x41, 0x50, 0x41, - 0x42, 0x49, 0x4c, 0x49, 0x54, 0x59, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, - 0x10, 0x02, 0x42, 0x20, 0x5a, 0x1e, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, - 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2f, 0x74, - 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var File_message_proto protoreflect.FileDescriptor + +var file_message_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x06, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x22, 0x3b, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, + 0x62, 0x6f, 0x64, 0x79, 0x22, 0xcd, 0x04, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x42, 0x6f, 0x64, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x16, + 0x0a, 0x06, 0x73, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, + 0x73, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, + 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, + 0x65, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x12, + 0x23, 0x0a, 0x0d, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x5f, 0x69, 0x64, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, + 0x74, 0x79, 0x49, 0x64, 0x12, 0x2a, 0x0a, 0x11, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, + 0x74, 0x79, 0x5f, 0x64, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0f, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x44, 0x6f, 0x6e, 0x49, 0x64, + 0x12, 0x22, 0x0a, 0x0d, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x64, 0x6f, 0x6e, 0x5f, 0x69, + 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x44, + 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x09, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x23, 0x0a, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x72, 0x65, + 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x18, 0x0b, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x18, 0x0a, + 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x69, 0x0a, 0x1d, 0x74, 0x72, 0x69, 0x67, 0x67, + 0x65, 0x72, 0x5f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, + 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, + 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x1b, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x12, 0x54, 0x0a, 0x16, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0e, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x54, 0x72, 0x69, 0x67, + 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x48, 0x00, 0x52, 0x14, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, 0x0a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x22, 0x52, 0x0a, 0x1b, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x12, 0x33, 0x0a, 0x16, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x72, 0x65, 0x63, 0x65, + 0x69, 0x76, 0x65, 0x64, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x13, 0x6c, 0x61, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x63, 0x0a, 0x14, 0x54, 0x72, 0x69, 0x67, + 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x12, 0x28, 0x0a, 0x10, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x74, 0x72, 0x69, 0x67, + 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x77, 0x6f, + 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x73, 0x2a, 0x76, 0x0a, + 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x12, 0x15, + 0x0a, 0x11, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x41, 0x49, + 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x41, 0x50, 0x41, 0x42, 0x49, 0x4c, + 0x49, 0x54, 0x59, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x02, 0x12, + 0x13, 0x0a, 0x0f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x52, 0x45, 0x51, 0x55, 0x45, + 0x53, 0x54, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x54, 0x49, 0x4d, 0x45, 0x4f, 0x55, 0x54, 0x10, + 0x04, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x5f, 0x45, 0x52, + 0x52, 0x4f, 0x52, 0x10, 0x05, 0x42, 0x20, 0x5a, 0x1e, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, + 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, + 0x65, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_core_capabilities_remote_types_message_proto_rawDescOnce sync.Once - file_core_capabilities_remote_types_message_proto_rawDescData = file_core_capabilities_remote_types_message_proto_rawDesc + file_message_proto_rawDescOnce sync.Once + file_message_proto_rawDescData = file_message_proto_rawDesc ) -func file_core_capabilities_remote_types_message_proto_rawDescGZIP() []byte { - file_core_capabilities_remote_types_message_proto_rawDescOnce.Do(func() { - file_core_capabilities_remote_types_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_core_capabilities_remote_types_message_proto_rawDescData) +func file_message_proto_rawDescGZIP() []byte { + file_message_proto_rawDescOnce.Do(func() { + file_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_message_proto_rawDescData) }) - return file_core_capabilities_remote_types_message_proto_rawDescData + return file_message_proto_rawDescData } -var file_core_capabilities_remote_types_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_core_capabilities_remote_types_message_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_core_capabilities_remote_types_message_proto_goTypes = []interface{}{ +var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_message_proto_goTypes = []interface{}{ (Error)(0), // 0: remote.Error (*Message)(nil), // 1: remote.Message (*MessageBody)(nil), // 2: remote.MessageBody (*TriggerRegistrationMetadata)(nil), // 3: remote.TriggerRegistrationMetadata (*TriggerEventMetadata)(nil), // 4: remote.TriggerEventMetadata } -var file_core_capabilities_remote_types_message_proto_depIdxs = []int32{ +var file_message_proto_depIdxs = []int32{ 0, // 0: remote.MessageBody.error:type_name -> remote.Error 3, // 1: remote.MessageBody.trigger_registration_metadata:type_name -> remote.TriggerRegistrationMetadata 4, // 2: remote.MessageBody.trigger_event_metadata:type_name -> remote.TriggerEventMetadata @@ -494,13 +514,13 @@ var file_core_capabilities_remote_types_message_proto_depIdxs = []int32{ 0, // [0:3] is the sub-list for field type_name } -func init() { file_core_capabilities_remote_types_message_proto_init() } -func file_core_capabilities_remote_types_message_proto_init() { - if File_core_capabilities_remote_types_message_proto != nil { +func init() { file_message_proto_init() } +func file_message_proto_init() { + if File_message_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_core_capabilities_remote_types_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Message); i { case 0: return &v.state @@ -512,7 +532,7 @@ func file_core_capabilities_remote_types_message_proto_init() { return nil } } - file_core_capabilities_remote_types_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*MessageBody); i { case 0: return &v.state @@ -524,7 +544,7 @@ func file_core_capabilities_remote_types_message_proto_init() { return nil } } - file_core_capabilities_remote_types_message_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_message_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*TriggerRegistrationMetadata); i { case 0: return &v.state @@ -536,7 +556,7 @@ func file_core_capabilities_remote_types_message_proto_init() { return nil } } - file_core_capabilities_remote_types_message_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_message_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*TriggerEventMetadata); i { case 0: return &v.state @@ -549,7 +569,7 @@ func file_core_capabilities_remote_types_message_proto_init() { } } } - file_core_capabilities_remote_types_message_proto_msgTypes[1].OneofWrappers = []interface{}{ + file_message_proto_msgTypes[1].OneofWrappers = []interface{}{ (*MessageBody_TriggerRegistrationMetadata)(nil), (*MessageBody_TriggerEventMetadata)(nil), } @@ -557,19 +577,19 @@ func file_core_capabilities_remote_types_message_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_core_capabilities_remote_types_message_proto_rawDesc, + RawDescriptor: file_message_proto_rawDesc, NumEnums: 1, NumMessages: 4, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_core_capabilities_remote_types_message_proto_goTypes, - DependencyIndexes: file_core_capabilities_remote_types_message_proto_depIdxs, - EnumInfos: file_core_capabilities_remote_types_message_proto_enumTypes, - MessageInfos: file_core_capabilities_remote_types_message_proto_msgTypes, + GoTypes: file_message_proto_goTypes, + DependencyIndexes: file_message_proto_depIdxs, + EnumInfos: file_message_proto_enumTypes, + MessageInfos: file_message_proto_msgTypes, }.Build() - File_core_capabilities_remote_types_message_proto = out.File - file_core_capabilities_remote_types_message_proto_rawDesc = nil - file_core_capabilities_remote_types_message_proto_goTypes = nil - file_core_capabilities_remote_types_message_proto_depIdxs = nil + File_message_proto = out.File + file_message_proto_rawDesc = nil + file_message_proto_goTypes = nil + file_message_proto_depIdxs = nil } diff --git a/core/capabilities/remote/types/message.proto b/core/capabilities/remote/types/message.proto index 072accedbc0..4d0507fd1e0 100644 --- a/core/capabilities/remote/types/message.proto +++ b/core/capabilities/remote/types/message.proto @@ -8,6 +8,9 @@ enum Error { OK = 0; VALIDATION_FAILED = 1; CAPABILITY_NOT_FOUND = 2; + INVALID_REQUEST = 3; + TIMEOUT = 4; + INTERNAL_ERROR = 5; } message Message { @@ -26,13 +29,15 @@ message MessageBody { string caller_don_id = 8; string method = 9; Error error = 10; + string errorMsg = 11; // payload contains a CapabilityRequest or CapabilityResponse - bytes payload = 11; + bytes payload = 12; oneof metadata { - TriggerRegistrationMetadata trigger_registration_metadata = 12; - TriggerEventMetadata trigger_event_metadata = 13; + TriggerRegistrationMetadata trigger_registration_metadata = 13; + TriggerEventMetadata trigger_event_metadata = 14; } + } message TriggerRegistrationMetadata { diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go index d8307d09f80..a825c42be56 100644 --- a/core/capabilities/remote/types/types.go +++ b/core/capabilities/remote/types/types.go @@ -9,6 +9,7 @@ const ( MethodRegisterTrigger = "RegisterTrigger" MethodUnRegisterTrigger = "UnregisterTrigger" MethodTriggerEvent = "TriggerEvent" + MethodExecute = "Execute" ) //go:generate mockery --quiet --name Dispatcher --output ./mocks/ --case=underscore diff --git a/core/capabilities/transmission/transmission.go b/core/capabilities/transmission/transmission.go new file mode 100644 index 00000000000..dba0aa1f746 --- /dev/null +++ b/core/capabilities/transmission/transmission.go @@ -0,0 +1,107 @@ +package transmission + +import ( + "fmt" + "time" + + "golang.org/x/crypto/sha3" + + "github.com/smartcontractkit/chainlink-common/pkg/values" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + "github.com/smartcontractkit/libocr/permutation" + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" +) + +// TODO determine location for this code + +var ( + // S = [N] + Schedule_AllAtOnce = "allAtOnce" + // S = [1 * N] + Schedule_OneAtATime = "oneAtATime" +) + +type TransmissionConfig struct { + Schedule string + DeltaStage time.Duration +} + +func ExtractTransmissionConfig(config *values.Map) (TransmissionConfig, error) { + var tc struct { + DeltaStage string + Schedule string + } + err := config.UnwrapTo(&tc) + if err != nil { + return TransmissionConfig{}, fmt.Errorf("failed to unwrap tranmission config from value map: %w", err) + } + + duration, err := time.ParseDuration(tc.DeltaStage) + if err != nil { + return TransmissionConfig{}, fmt.Errorf("failed to parse DeltaStage %s as duration: %w", tc.DeltaStage, err) + } + + return TransmissionConfig{ + Schedule: tc.Schedule, + DeltaStage: duration, + }, nil +} + +// GetPeerIDToTransmissionDelay returns a map of PeerID to the time.Duration that the node with that PeerID should wait +// before transmitting. If a node is not in the map, it should not transmit. +func GetPeerIDToTransmissionDelay(donPeerIDs []ragep2ptypes.PeerID, sharedSecret [16]byte, transmissionID string, tc TransmissionConfig) (map[p2ptypes.PeerID]time.Duration, error) { + donMemberCount := len(donPeerIDs) + key := transmissionScheduleSeed(sharedSecret, transmissionID) + schedule, err := createTransmissionSchedule(tc.Schedule, donMemberCount) + if err != nil { + return nil, err + } + + picked := permutation.Permutation(donMemberCount, key) + + peerIDToTransmissionDelay := map[p2ptypes.PeerID]time.Duration{} + for i, peerID := range donPeerIDs { + delay := delayFor(i, schedule, picked, tc.DeltaStage) + if delay != nil { + peerIDToTransmissionDelay[peerID] = *delay + } + } + return peerIDToTransmissionDelay, nil +} + +func delayFor(position int, schedule []int, permutation []int, deltaStage time.Duration) *time.Duration { + sum := 0 + for i, s := range schedule { + sum += s + if permutation[position] < sum { + result := time.Duration(i) * deltaStage + return &result + } + } + + return nil +} + +func createTransmissionSchedule(scheduleType string, N int) ([]int, error) { + switch scheduleType { + case Schedule_AllAtOnce: + return []int{N}, nil + case Schedule_OneAtATime: + sch := []int{} + for i := 0; i < N; i++ { + sch = append(sch, 1) + } + return sch, nil + } + return nil, fmt.Errorf("unknown schedule type %s", scheduleType) +} + +func transmissionScheduleSeed(sharedSecret [16]byte, transmissionID string) [16]byte { + hash := sha3.NewLegacyKeccak256() + hash.Write(sharedSecret[:]) + hash.Write([]byte(transmissionID)) + + var key [16]byte + copy(key[:], hash.Sum(nil)) + return key +} diff --git a/core/capabilities/transmission/transmission_test.go b/core/capabilities/transmission/transmission_test.go new file mode 100644 index 00000000000..6c4494c407a --- /dev/null +++ b/core/capabilities/transmission/transmission_test.go @@ -0,0 +1,102 @@ +package transmission + +import ( + "encoding/hex" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/values" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_GetPeerIDToTransmissionDelay(t *testing.T) { + + peer1 := [32]byte([]byte(fmt.Sprintf("%-32s", "one"))) + peer2 := [32]byte([]byte(fmt.Sprintf("%-32s", "two"))) + peer3 := [32]byte([]byte(fmt.Sprintf("%-32s", "three"))) + peer4 := [32]byte([]byte(fmt.Sprintf("%-32s", "four"))) + + ids := []p2ptypes.PeerID{ + peer1, peer2, peer3, peer4, + } + + testCases := []struct { + name string + peerName string + sharedSecret string + schedule string + deltaStage string + workflowExecutionID string + expectedDelays map[string]time.Duration + }{ + { + "TestOneAtATime", + "one", + "fb13ca015a9ec60089c7141e9522de79", + "oneAtATime", + "100ms", + "mock-execution-id", + map[string]time.Duration{ + "one": 300 * time.Millisecond, + "two": 200 * time.Millisecond, + "three": 0 * time.Millisecond, + "four": 100 * time.Millisecond, + }, + }, + { + "TestAllAtOnce", + "one", + "fb13ca015a9ec60089c7141e9522de79", + "allAtOnce", + "100ms", + "mock-execution-id", + map[string]time.Duration{ + "one": 0 * time.Millisecond, + "two": 0 * time.Millisecond, + "three": 0 * time.Millisecond, + "four": 0 * time.Millisecond, + }, + }, + { + "TestOneAtATimeWithDifferentExecutionID", + "one", + "fb13ca015a9ec60089c7141e9522de79", + "oneAtATime", + "100ms", + "mock-execution-id2", + map[string]time.Duration{ + "one": 0 * time.Millisecond, + "two": 300 * time.Millisecond, + "three": 100 * time.Millisecond, + "four": 200 * time.Millisecond, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + sharedSecret, err := hex.DecodeString(tc.sharedSecret) + require.NoError(t, err) + + m, err := values.NewMap(map[string]any{ + "schedule": tc.schedule, + "deltaStage": tc.deltaStage, + }) + require.NoError(t, err) + transmissionCfg, err := ExtractTransmissionConfig(m) + + peerIdToDelay, err := GetPeerIDToTransmissionDelay(ids, [16]byte(sharedSecret), "mock-workflow-id"+tc.workflowExecutionID, transmissionCfg) + require.NoError(t, err) + + assert.Equal(t, tc.expectedDelays["one"], peerIdToDelay[peer1]) + assert.Equal(t, tc.expectedDelays["two"], peerIdToDelay[peer2]) + assert.Equal(t, tc.expectedDelays["three"], peerIdToDelay[peer3]) + assert.Equal(t, tc.expectedDelays["four"], peerIdToDelay[peer4]) + }) + } +} diff --git a/core/services/p2p/types/mocks/peer.go b/core/services/p2p/types/mocks/peer.go index 3a2e218c170..52845e5a999 100644 --- a/core/services/p2p/types/mocks/peer.go +++ b/core/services/p2p/types/mocks/peer.go @@ -115,7 +115,7 @@ func (_m *Peer) Receive() <-chan types.Message { ret := _m.Called() if len(ret) == 0 { - panic("no return value specified for Receive") + panic("no return value specified for OnMessage") } var r0 <-chan types.Message diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go index 0c31a1d7ac9..4d05db4380f 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go @@ -11,7 +11,6 @@ import ( ) // MercuryClient is the client API for Mercury service. -// type MercuryClient interface { Transmit(ctx context.Context, in *TransmitRequest) (*TransmitResponse, error) LatestReport(ctx context.Context, in *LatestReportRequest) (*LatestReportResponse, error) diff --git a/core/services/workflows/execution_strategy.go b/core/services/workflows/execution_strategy.go index f5da8bca4be..5cc8164c4f7 100644 --- a/core/services/workflows/execution_strategy.go +++ b/core/services/workflows/execution_strategy.go @@ -7,12 +7,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" - - "github.com/smartcontractkit/libocr/permutation" - - "golang.org/x/crypto/sha3" ) type executionStrategy interface { @@ -47,19 +44,12 @@ type scheduledExecution struct { Position int } -var ( - // S = [N] - Schedule_AllAtOnce = "allAtOnce" - // S = [1 * N] - Schedule_OneAtATime = "oneAtATime" -) - // scheduledExecution generates a pseudo-random transmission schedule, // and delays execution until a node is required to transmit. func (d scheduledExecution) Apply(ctx context.Context, lggr logger.Logger, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) { - tc, err := d.transmissionConfig(req.Config) + tc, err := transmission.ExtractTransmissionConfig(req.Config) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to extract transmission config from request config: %w", err) } info, err := cap.Info(ctx) @@ -70,102 +60,36 @@ func (d scheduledExecution) Apply(ctx context.Context, lggr logger.Logger, cap c switch { // Case 1: Local DON case info.DON == nil: - n := len(d.DON.Members) - key := d.key(d.DON.Config.SharedSecret, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID) - sched, err := schedule(tc.Schedule, n) + + // The transmission ID is created using the workflow ID and the workflow execution ID which nodes don't know + // ahead of time and ensures a malicious node cannot game the schedule. + peerIDToTransmissionDelay, err := transmission.GetPeerIDToTransmissionDelay(d.DON.Members, d.DON.Config.SharedSecret, + req.Metadata.WorkflowID+req.Metadata.WorkflowExecutionID, tc) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get peer ID to transmission delay map: %w", err) } - picked := permutation.Permutation(n, key) - delay := d.delayFor(d.Position, sched, picked, tc.DeltaStage) - if delay == nil { + delay, existsForPeerID := peerIDToTransmissionDelay[*d.PeerID] + if !existsForPeerID { lggr.Debugw("skipping transmission: node is not included in schedule") return nil, nil } - lggr.Debugf("execution delayed by %+v", *delay) + lggr.Debugf("execution delayed by %+v", delay) select { case <-ctx.Done(): return nil, ctx.Err() - case <-time.After(*delay): + case <-time.After(delay): lggr.Debugw("executing delayed execution") return immediateExecution{}.Apply(ctx, lggr, cap, req) } // Case 2: Remote DON default: + + // In this case just execute immediately on the capability and the shims will handle the scheduling and f+1 aggregation + // TODO: fill in the remote DON case once consensus has been reach on what to do. lggr.Debugw("remote DON transmission not implemented: using immediate execution") return immediateExecution{}.Apply(ctx, lggr, cap, req) } } - -// `key` uses a shared secret, combined with a workflowID and a workflowExecutionID to generate -// a secret that can later be used to pseudo-randomly determine a schedule for a set of nodes in a DON. -// The addition of the workflowExecutionID -- which nodes don't know ahead of time -- additionally guarantees -// that a malicious coalition of nodes can't "game" the schedule. -// IMPORTANT: changing this function should happen carefully to maintain the guarantee that all nodes -// arrive at the same secret. -func (d scheduledExecution) key(sharedSecret [16]byte, workflowID, workflowExecutionID string) [16]byte { - hash := sha3.NewLegacyKeccak256() - hash.Write(sharedSecret[:]) - hash.Write([]byte(workflowID)) - hash.Write([]byte(workflowExecutionID)) - - var key [16]byte - copy(key[:], hash.Sum(nil)) - return key -} - -type transmissionConfig struct { - Schedule string - DeltaStage time.Duration -} - -func (d scheduledExecution) transmissionConfig(config *values.Map) (transmissionConfig, error) { - var tc struct { - DeltaStage string - Schedule string - } - err := config.UnwrapTo(&tc) - if err != nil { - return transmissionConfig{}, err - } - - duration, err := time.ParseDuration(tc.DeltaStage) - if err != nil { - return transmissionConfig{}, fmt.Errorf("failed to parse DeltaStage %s as duration: %w", tc.DeltaStage, err) - } - - return transmissionConfig{ - Schedule: tc.Schedule, - DeltaStage: duration, - }, nil -} - -func (d scheduledExecution) delayFor(position int, schedule []int, permutation []int, deltaStage time.Duration) *time.Duration { - sum := 0 - for i, s := range schedule { - sum += s - if permutation[position] < sum { - result := time.Duration(i) * deltaStage - return &result - } - } - - return nil -} - -func schedule(sched string, N int) ([]int, error) { - switch sched { - case Schedule_AllAtOnce: - return []int{N}, nil - case Schedule_OneAtATime: - sch := []int{} - for i := 0; i < N; i++ { - sch = append(sch, 1) - } - return sch, nil - } - return nil, fmt.Errorf("unknown schedule %s", sched) -} diff --git a/dashboard-lib/k8s-pods/component.go b/dashboard-lib/k8s-pods/component.go index df9a6ac6a69..551eccd1d31 100644 --- a/dashboard-lib/k8s-pods/component.go +++ b/dashboard-lib/k8s-pods/component.go @@ -143,7 +143,7 @@ func New(p Props) []dashboard.Option { ), ), row.WithTimeSeries( - "Receive Bandwidth", + "OnMessage Bandwidth", timeseries.Span(6), timeseries.Height("200px"), timeseries.DataSource(p.PrometheusDataSource),