From 12962e4d8af2944cfc2d08ac1dd90a53f904a72b Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Mon, 20 May 2024 15:16:27 +0100 Subject: [PATCH] remote target and transmission protocol --- .changeset/few-suns-occur.md | 5 + core/capabilities/remote/target.go | 87 ---- core/capabilities/remote/target/client.go | 142 ++++++ .../capabilities/remote/target/client_test.go | 312 ++++++++++++++ .../remote/target/endtoend_test.go | 404 ++++++++++++++++++ .../remote/target/request/client_request.go | 161 +++++++ .../target/request/client_request_test.go | 311 ++++++++++++++ .../remote/target/request/server_request.go | 222 ++++++++++ .../target/request/server_request_test.go | 261 +++++++++++ core/capabilities/remote/target/server.go | 131 ++++++ .../capabilities/remote/target/server_test.go | 224 ++++++++++ core/capabilities/remote/target_test.go | 30 -- core/capabilities/remote/types/message.pb.go | 102 +++-- core/capabilities/remote/types/message.proto | 11 +- core/capabilities/remote/types/types.go | 1 + .../capabilities/transmission/transmission.go | 109 +++++ .../transmission/transmission_test.go | 101 +++++ core/services/workflows/execution_strategy.go | 108 +---- 18 files changed, 2470 insertions(+), 252 deletions(-) create mode 100644 .changeset/few-suns-occur.md delete mode 100644 core/capabilities/remote/target.go create mode 100644 core/capabilities/remote/target/client.go create mode 100644 core/capabilities/remote/target/client_test.go create mode 100644 core/capabilities/remote/target/endtoend_test.go create mode 100644 core/capabilities/remote/target/request/client_request.go create mode 100644 core/capabilities/remote/target/request/client_request_test.go create mode 100644 core/capabilities/remote/target/request/server_request.go create mode 100644 core/capabilities/remote/target/request/server_request_test.go create mode 100644 core/capabilities/remote/target/server.go create mode 100644 core/capabilities/remote/target/server_test.go delete mode 100644 core/capabilities/remote/target_test.go create mode 100644 core/capabilities/transmission/transmission.go create mode 100644 core/capabilities/transmission/transmission_test.go diff --git a/.changeset/few-suns-occur.md b/.changeset/few-suns-occur.md new file mode 100644 index 00000000000..a0b55e27e61 --- /dev/null +++ b/.changeset/few-suns-occur.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#internal remote target capability and transmission protocol diff --git a/core/capabilities/remote/target.go b/core/capabilities/remote/target.go deleted file mode 100644 index 655f4f84abb..00000000000 --- a/core/capabilities/remote/target.go +++ /dev/null @@ -1,87 +0,0 @@ -package remote - -import ( - "context" - "errors" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" - "github.com/smartcontractkit/chainlink/v2/core/logger" -) - -// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages -type remoteTargetCaller struct { - capInfo commoncap.CapabilityInfo - donInfo *capabilities.DON - dispatcher types.Dispatcher - lggr logger.Logger -} - -var _ commoncap.TargetCapability = &remoteTargetCaller{} -var _ types.Receiver = &remoteTargetCaller{} - -type remoteTargetReceiver struct { - capInfo commoncap.CapabilityInfo - donInfo *capabilities.DON - dispatcher types.Dispatcher - lggr logger.Logger -} - -var _ types.Receiver = &remoteTargetReceiver{} - -func NewRemoteTargetCaller(capInfo commoncap.CapabilityInfo, donInfo *capabilities.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetCaller { - return &remoteTargetCaller{ - capInfo: capInfo, - donInfo: donInfo, - dispatcher: dispatcher, - lggr: lggr, - } -} - -func (c *remoteTargetCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { - return c.capInfo, nil -} - -func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { - return errors.New("not implemented") -} - -func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { - return errors.New("not implemented") -} - -func (c *remoteTargetCaller) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { - c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members)) - for _, peerID := range c.donInfo.Members { - m := &types.MessageBody{ - CapabilityId: c.capInfo.ID, - CapabilityDonId: c.donInfo.ID, - Payload: []byte{0x01, 0x02, 0x03}, - } - err := c.dispatcher.Send(peerID, m) - if err != nil { - return nil, err - } - } - - // TODO: return a channel that will be closed when all responses are received - return nil, nil -} - -func (c *remoteTargetCaller) Receive(msg *types.MessageBody) { - c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload) -} - -func NewRemoteTargetReceiver(capInfo commoncap.CapabilityInfo, donInfo *capabilities.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetReceiver { - return &remoteTargetReceiver{ - capInfo: capInfo, - donInfo: donInfo, - dispatcher: dispatcher, - lggr: lggr, - } -} - -func (c *remoteTargetReceiver) Receive(msg *types.MessageBody) { - c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload) -} diff --git a/core/capabilities/remote/target/client.go b/core/capabilities/remote/target/client.go new file mode 100644 index 00000000000..ceab11dfcb0 --- /dev/null +++ b/core/capabilities/remote/target/client.go @@ -0,0 +1,142 @@ +package target + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// client is a shim for remote target capabilities. +// It translates between capability API calls and network messages. +// Its responsibilities are: +// 1. Transmit capability requests to remote nodes according to a transmission schedule +// 2. Aggregate responses from remote nodes and return the aggregated response +// +// client communicates with corresponding server on remote nodes. +type client struct { + lggr logger.Logger + remoteCapabilityInfo commoncap.CapabilityInfo + localDONInfo capabilities.DON + dispatcher types.Dispatcher + requestTimeout time.Duration + + messageIDToCallerRequest map[string]*request.ClientRequest + mutex sync.Mutex +} + +var _ commoncap.TargetCapability = &client{} +var _ types.Receiver = &client{} + +func NewClient(ctx context.Context, lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, + requestTimeout time.Duration) *client { + c := &client{ + lggr: lggr, + remoteCapabilityInfo: remoteCapabilityInfo, + localDONInfo: localDonInfo, + dispatcher: dispatcher, + requestTimeout: requestTimeout, + messageIDToCallerRequest: make(map[string]*request.ClientRequest), + } + + go func() { + ticker := time.NewTicker(requestTimeout) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.expireRequests() + } + } + }() + + return c +} + +func (c *client) expireRequests() { + c.mutex.Lock() + defer c.mutex.Unlock() + + for messageID, req := range c.messageIDToCallerRequest { + if req.Expired() { + req.Cancel(errors.New("request expired")) + delete(c.messageIDToCallerRequest, messageID) + } + } +} + +func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + return c.remoteCapabilityInfo, nil +} + +func (c *client) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + // do nothing + return nil +} + +func (c *client) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + // do nothing + return nil +} + +func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + c.mutex.Lock() + defer c.mutex.Unlock() + + messageID, err := GetMessageIDForRequest(capReq) + if err != nil { + return nil, fmt.Errorf("failed to get message ID for request: %w", err) + } + + if _, ok := c.messageIDToCallerRequest[messageID]; ok { + return nil, fmt.Errorf("request for message ID %s already exists", messageID) + } + + req, err := request.NewClientRequest(ctx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, + c.requestTimeout) + if err != nil { + return nil, fmt.Errorf("failed to create client request: %w", err) + } + + c.messageIDToCallerRequest[messageID] = req + + return req.ResponseChan(), nil +} + +func (c *client) Receive(msg *types.MessageBody) { + c.mutex.Lock() + defer c.mutex.Unlock() + // TODO should the dispatcher be passing in a context? + ctx := context.Background() + + messageID := GetMessageID(msg) + + req := c.messageIDToCallerRequest[messageID] + if req == nil { + c.lggr.Warnw("received response for unknown message ID ", "messageID", messageID) + return + } + + go func() { + if err := req.OnMessage(ctx, msg); err != nil { + c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", err) + } + }() +} + +func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) { + if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" { + return "", errors.New("workflow ID and workflow execution ID must be set in request metadata") + } + + return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil +} diff --git a/core/capabilities/remote/target/client_test.go b/core/capabilities/remote/target/client_test.go new file mode 100644 index 00000000000..5bfbb0c7a0c --- /dev/null +++ b/core/capabilities/remote/target/client_test.go @@ -0,0 +1,312 @@ +package target_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_Client_DonTopologies(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + responseValue, err := response.Value.Unwrap() + require.NoError(t, err) + assert.Equal(t, "aValue1", responseValue.(string)) + } + + capability := &TestCapability{} + + responseTimeOut := 10 * time.Minute + + testClient(ctx, t, 1, responseTimeOut, 1, 0, + capability, transmissionSchedule, responseTest) + + testClient(ctx, t, 10, responseTimeOut, 1, 0, + capability, transmissionSchedule, responseTest) + + testClient(ctx, t, 1, responseTimeOut, 10, 3, + capability, transmissionSchedule, responseTest) + + testClient(ctx, t, 10, responseTimeOut, 10, 3, + capability, transmissionSchedule, responseTest) + + testClient(ctx, t, 10, responseTimeOut, 10, 9, + capability, transmissionSchedule, responseTest) +} + +func Test_Client_TransmissionSchedules(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + responseValue, err := response.Value.Unwrap() + require.NoError(t, err) + assert.Equal(t, "aValue1", responseValue.(string)) + } + + capability := &TestCapability{} + + responseTimeOut := 10 * time.Minute + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testClient(ctx, t, 1, responseTimeOut, 1, 0, + capability, transmissionSchedule, responseTest) + testClient(ctx, t, 10, responseTimeOut, 10, 3, + capability, transmissionSchedule, responseTest) + + transmissionSchedule, err = values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testClient(ctx, t, 1, responseTimeOut, 1, 0, + capability, transmissionSchedule, responseTest) + testClient(ctx, t, 10, responseTimeOut, 10, 3, + capability, transmissionSchedule, responseTest) +} + +func Test_Client_TimesOutIfInsufficientCapabilityPeerResponses(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.NotNil(t, response.Err) + } + + capability := &TestCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + // number of capability peers is less than F + 1 + + testClient(ctx, t, 10, 1*time.Second, 10, 11, + capability, transmissionSchedule, responseTest) +} + +func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout time.Duration, + numCapabilityPeers int, capabilityDonF uint8, underlying commoncap.TargetCapability, transmissionSchedule *values.Map, + responseTest func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error)) { + lggr := logger.TestLogger(t) + + capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeers[i] = NewP2PPeerID(t) + } + + capDonInfo := commoncap.DON{ + ID: "capability-don", + Members: capabilityPeers, + F: capabilityDonF, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + DON: &capDonInfo, + } + + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeers[i] = NewP2PPeerID(t) + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + } + + broker := newTestMessageBroker() + + receivers := make([]remotetypes.Receiver, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeers[i]) + receiver := newTestServer(capabilityPeers[i], capabilityDispatcher, workflowDonInfo, underlying) + broker.RegisterReceiverNode(capabilityPeers[i], receiver) + receivers[i] = receiver + } + + callers := make([]commoncap.TargetCapability, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) + caller := target.NewClient(ctx, lggr, capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout) + broker.RegisterReceiverNode(workflowPeers[i], caller) + callers[i] = caller + } + + executeInputs, err := values.NewMap( + map[string]any{ + "executeValue1": "aValue1", + }, + ) + + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(len(callers)) + + // Fire off all the requests + for _, caller := range callers { + go func(caller commoncap.TargetCapability) { + responseCh, err := caller.Execute(ctx, + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Config: transmissionSchedule, + Inputs: executeInputs, + }) + + responseTest(t, responseCh, err) + wg.Done() + }(caller) + } + + wg.Wait() +} + +// Simple client that only responds once it has received a message from each workflow peer +type clientTestServer struct { + peerID p2ptypes.PeerID + dispatcher remotetypes.Dispatcher + workflowDonInfo commoncap.DON + messageIDToSenders map[string]map[p2ptypes.PeerID]bool + + targetCapability commoncap.TargetCapability + + mux sync.Mutex +} + +func newTestServer(peerID p2ptypes.PeerID, dispatcher remotetypes.Dispatcher, workflowDonInfo commoncap.DON, + targetCapability commoncap.TargetCapability) *clientTestServer { + return &clientTestServer{ + dispatcher: dispatcher, + workflowDonInfo: workflowDonInfo, + peerID: peerID, + messageIDToSenders: make(map[string]map[p2ptypes.PeerID]bool), + targetCapability: targetCapability, + } +} + +func (t *clientTestServer) Receive(msg *remotetypes.MessageBody) { + t.mux.Lock() + defer t.mux.Unlock() + + sender := toPeerID(msg.Sender) + messageID := target.GetMessageID(msg) + + if t.messageIDToSenders[messageID] == nil { + t.messageIDToSenders[messageID] = make(map[p2ptypes.PeerID]bool) + } + + sendersOfMessageID := t.messageIDToSenders[messageID] + if sendersOfMessageID[sender] { + panic("received duplicate message") + } + + sendersOfMessageID[sender] = true + + if len(t.messageIDToSenders[messageID]) == len(t.workflowDonInfo.Members) { + capabilityRequest, err := pb.UnmarshalCapabilityRequest(msg.Payload) + if err != nil { + panic(err) + } + + respCh, responseErr := t.targetCapability.Execute(context.Background(), capabilityRequest) + resp := <-respCh + + for receiver := range t.messageIDToSenders[messageID] { + var responseMsg = &remotetypes.MessageBody{ + CapabilityId: "cap_id", + CapabilityDonId: "capability-don", + CallerDonId: t.workflowDonInfo.ID, + Method: remotetypes.MethodExecute, + MessageId: []byte(messageID), + Sender: t.peerID[:], + Receiver: receiver[:], + } + + if responseErr != nil { + responseMsg.Error = remotetypes.Error_INTERNAL_ERROR + } else { + payload, marshalErr := pb.MarshalCapabilityResponse(resp) + if marshalErr != nil { + panic(marshalErr) + } + responseMsg.Payload = payload + } + + err = t.dispatcher.Send(receiver, responseMsg) + if err != nil { + panic(err) + } + } + } +} + +type TestDispatcher struct { + sentMessagesCh chan *remotetypes.MessageBody + receiver remotetypes.Receiver +} + +func NewTestDispatcher() *TestDispatcher { + return &TestDispatcher{ + sentMessagesCh: make(chan *remotetypes.MessageBody, 1), + } +} + +func (t *TestDispatcher) SendToReceiver(msgBody *remotetypes.MessageBody) { + t.receiver.Receive(msgBody) +} + +func (t *TestDispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error { + t.receiver = receiver + return nil +} + +func (t *TestDispatcher) RemoveReceiver(capabilityId string, donId string) {} + +func (t *TestDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { + t.sentMessagesCh <- msgBody + return nil +} diff --git a/core/capabilities/remote/target/endtoend_test.go b/core/capabilities/remote/target/endtoend_test.go new file mode 100644 index 00000000000..998c9532871 --- /dev/null +++ b/core/capabilities/remote/target/endtoend_test.go @@ -0,0 +1,404 @@ +package target_test + +import ( + "context" + "crypto/rand" + "errors" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/mr-tron/base58" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_RemoteTargetCapability_InsufficientCapabilityResponses(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.NotNil(t, response.Err) + } + + capability := &TestCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 10, 10*time.Minute, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_InsufficientWorkflowRequests(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.NotNil(t, response.Err) + } + + timeOut := 10 * time.Minute + + capability := &TestCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(ctx, t, capability, 10, 10, 10*time.Millisecond, 10, 9, timeOut, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_TransmissionSchedules(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + responseValue, err := response.Value.Unwrap() + require.NoError(t, err) + assert.Equal(t, "aValue1", responseValue.(string)) + } + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + timeOut := 10 * time.Minute + + capability := &TestCapability{} + + testRemoteTarget(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, transmissionSchedule, responseTest) + + transmissionSchedule, err = values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_DonTopologies(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + responseValue, err := response.Value.Unwrap() + require.NoError(t, err) + assert.Equal(t, "aValue1", responseValue.(string)) + } + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + timeOut := 10 * time.Minute + + capability := &TestCapability{} + + // Test scenarios where the number of submissions is greater than or equal to F + 1 + testRemoteTarget(ctx, t, capability, 1, 0, timeOut, 1, 0, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(ctx, t, capability, 4, 3, timeOut, 1, 0, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(ctx, t, capability, 10, 3, timeOut, 1, 0, timeOut, transmissionSchedule, responseTest) + + testRemoteTarget(ctx, t, capability, 1, 0, timeOut, 1, 0, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(ctx, t, capability, 1, 0, timeOut, 4, 3, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(ctx, t, capability, 1, 0, timeOut, 10, 3, timeOut, transmissionSchedule, responseTest) + + testRemoteTarget(ctx, t, capability, 4, 3, timeOut, 4, 3, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(ctx, t, capability, 10, 3, timeOut, 10, 3, timeOut, transmissionSchedule, responseTest) + testRemoteTarget(ctx, t, capability, 10, 9, timeOut, 10, 9, timeOut, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_CapabilityError(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.Equal(t, "failed to execute capability: an error", response.Err.Error()) + } + + capability := &TestErrorCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(ctx, t, capability, 10, 9, 10*time.Minute, 10, 9, 10*time.Minute, transmissionSchedule, responseTest) +} + +func Test_RemoteTargetCapability_RandomCapabilityError(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) { + require.NoError(t, responseError) + response := <-responseCh + assert.Equal(t, "request expired", response.Err.Error()) + } + + capability := &TestRandomErrorCapability{} + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + testRemoteTarget(ctx, t, capability, 10, 9, 10*time.Millisecond, 10, 9, 10*time.Minute, transmissionSchedule, responseTest) +} + +func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.TargetCapability, numWorkflowPeers int, workflowDonF uint8, workflowNodeTimeout time.Duration, + numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration, transmissionSchedule *values.Map, + responseTest func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error)) { + lggr := logger.TestLogger(t) + + capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeerID := p2ptypes.PeerID{} + require.NoError(t, capabilityPeerID.UnmarshalText([]byte(NewPeerID()))) + capabilityPeers[i] = capabilityPeerID + } + + capabilityPeerID := p2ptypes.PeerID{} + require.NoError(t, capabilityPeerID.UnmarshalText([]byte(NewPeerID()))) + + capDonInfo := commoncap.DON{ + ID: "capability-don", + Members: capabilityPeers, + F: capabilityDonF, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + DON: &capDonInfo, + } + + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeerID := p2ptypes.PeerID{} + require.NoError(t, workflowPeerID.UnmarshalText([]byte(NewPeerID()))) + workflowPeers[i] = workflowPeerID + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + F: workflowDonF, + } + + broker := newTestMessageBroker() + + workflowDONs := map[string]commoncap.DON{ + workflowDonInfo.ID: workflowDonInfo, + } + + capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeer := capabilityPeers[i] + capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) + capabilityNode := target.NewReceiver(ctx, lggr, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, + capabilityNodeResponseTimeout) + broker.RegisterReceiverNode(capabilityPeer, capabilityNode) + capabilityNodes[i] = capabilityNode + } + + workflowNodes := make([]commoncap.TargetCapability, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) + workflowNode := target.NewClient(ctx, lggr, capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout) + broker.RegisterReceiverNode(workflowPeers[i], workflowNode) + workflowNodes[i] = workflowNode + } + + executeInputs, err := values.NewMap( + map[string]any{ + "executeValue1": "aValue1", + }, + ) + + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(len(workflowNodes)) + + for _, caller := range workflowNodes { + go func(caller commoncap.TargetCapability) { + responseCh, err := caller.Execute(ctx, + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Config: transmissionSchedule, + Inputs: executeInputs, + }) + + responseTest(t, responseCh, err) + wg.Done() + }(caller) + } + + wg.Wait() +} + +type testMessageBroker struct { + nodes map[p2ptypes.PeerID]remotetypes.Receiver +} + +func newTestMessageBroker() *testMessageBroker { + return &testMessageBroker{ + nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver), + } +} + +func (r *testMessageBroker) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) remotetypes.Dispatcher { + return &nodeDispatcher{ + callerPeerID: nodePeerID, + broker: r, + } +} + +func (r *testMessageBroker) RegisterReceiverNode(nodePeerID p2ptypes.PeerID, node remotetypes.Receiver) { + if _, ok := r.nodes[nodePeerID]; ok { + panic("node already registered") + } + + r.nodes[nodePeerID] = node +} + +func (r *testMessageBroker) Send(msg *remotetypes.MessageBody) { + receiverId := toPeerID(msg.Receiver) + + receiver, ok := r.nodes[receiverId] + if !ok { + panic("server not found for peer id") + } + + receiver.Receive(msg) +} + +func toPeerID(id []byte) p2ptypes.PeerID { + return [32]byte(id) +} + +type nodeDispatcher struct { + callerPeerID p2ptypes.PeerID + broker *testMessageBroker +} + +func (t *nodeDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { + msgBody.Version = 1 + msgBody.Sender = t.callerPeerID[:] + msgBody.Receiver = peerID[:] + msgBody.Timestamp = time.Now().UnixMilli() + t.broker.Send(msgBody) + return nil +} + +func (t *nodeDispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error { + return nil +} +func (t *nodeDispatcher) RemoveReceiver(capabilityId string, donId string) {} + +type abstractTestCapability struct { +} + +func (t abstractTestCapability) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + return commoncap.CapabilityInfo{}, nil +} + +func (t abstractTestCapability) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + return nil +} + +func (t abstractTestCapability) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + return nil +} + +type TestCapability struct { + abstractTestCapability +} + +func (t TestCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + ch := make(chan commoncap.CapabilityResponse, 1) + + value := request.Inputs.Underlying["executeValue1"] + + ch <- commoncap.CapabilityResponse{ + Value: value, + } + + return ch, nil +} + +type TestErrorCapability struct { + abstractTestCapability +} + +func (t TestErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + return nil, errors.New("an error") +} + +type TestRandomErrorCapability struct { + abstractTestCapability +} + +func (t TestRandomErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + return nil, errors.New(uuid.New().String()) +} + +func NewP2PPeerID(t *testing.T) p2ptypes.PeerID { + id := p2ptypes.PeerID{} + require.NoError(t, id.UnmarshalText([]byte(NewPeerID()))) + return id +} + +func NewPeerID() string { + var privKey [32]byte + _, err := rand.Read(privKey[:]) + if err != nil { + panic(err) + } + + peerID := append(libp2pMagic(), privKey[:]...) + + return base58.Encode(peerID[:]) +} + +func libp2pMagic() []byte { + return []byte{0x00, 0x24, 0x08, 0x01, 0x12, 0x20} +} diff --git a/core/capabilities/remote/target/request/client_request.go b/core/capabilities/remote/target/request/client_request.go new file mode 100644 index 00000000000..1355932c833 --- /dev/null +++ b/core/capabilities/remote/target/request/client_request.go @@ -0,0 +1,161 @@ +package request + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "sync" + "time" + + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +type ClientRequest struct { + responseCh chan commoncap.CapabilityResponse + createdAt time.Time + responseIDCount map[[32]byte]int + errorCount map[string]int + responseReceived map[p2ptypes.PeerID]bool + + requiredIdenticalResponses int + + requestTimeout time.Duration + + respSent bool + mux sync.Mutex +} + +func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest, messageID string, + remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, + requestTimeout time.Duration) (*ClientRequest, error) { + remoteCapabilityDonInfo := remoteCapabilityInfo.DON + if remoteCapabilityDonInfo == nil { + return nil, errors.New("remote capability info missing DON") + } + + rawRequest, err := pb.MarshalCapabilityRequest(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal capability request: %w", err) + } + + tc, err := transmission.ExtractTransmissionConfig(req.Config) + if err != nil { + return nil, fmt.Errorf("failed to extract transmission config from request config: %w", err) + } + + peerIDToTransmissionDelay, err := transmission.GetPeerIDToTransmissionDelay(remoteCapabilityDonInfo.Members, localDonInfo.Config.SharedSecret, + messageID, tc) + if err != nil { + return nil, fmt.Errorf("failed to get peer ID to transmission delay: %w", err) + } + + responseReceived := make(map[p2ptypes.PeerID]bool) + for peerID, delay := range peerIDToTransmissionDelay { + responseReceived[peerID] = false + go func(peerID ragep2ptypes.PeerID, delay time.Duration) { + message := &types.MessageBody{ + CapabilityId: remoteCapabilityInfo.ID, + CapabilityDonId: remoteCapabilityDonInfo.ID, + CallerDonId: localDonInfo.ID, + Method: types.MethodExecute, + Payload: rawRequest, + MessageId: []byte(messageID), + } + + select { + case <-ctx.Done(): + return + case <-time.After(delay): + err := dispatcher.Send(peerID, message) + if err != nil { + lggr.Errorw("failed to send message", "peerID", peerID, "err", err) + } + } + }(peerID, delay) + } + + return &ClientRequest{ + createdAt: time.Now(), + requestTimeout: requestTimeout, + requiredIdenticalResponses: int(remoteCapabilityDonInfo.F + 1), + responseIDCount: make(map[[32]byte]int), + errorCount: make(map[string]int), + responseReceived: responseReceived, + responseCh: make(chan commoncap.CapabilityResponse, 1), + }, nil +} + +func (c *ClientRequest) ResponseChan() <-chan commoncap.CapabilityResponse { + return c.responseCh +} + +func (c *ClientRequest) Expired() bool { + return time.Since(c.createdAt) > c.requestTimeout +} + +func (c *ClientRequest) Cancel(err error) { + c.mux.Lock() + defer c.mux.Unlock() + if !c.respSent { + c.sendResponse(commoncap.CapabilityResponse{Err: err}) + } +} + +// TODO OnMessage assumes that only one response is received from each peer, if streaming responses need to be supported this will need to be updated +func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) error { + c.mux.Lock() + defer c.mux.Unlock() + + if msg.Sender == nil { + return fmt.Errorf("sender missing from message") + } + + sender := remote.ToPeerID(msg.Sender) + + received, expected := c.responseReceived[sender] + if !expected { + return fmt.Errorf("response from peer %s not expected", sender) + } + + if received { + return fmt.Errorf("response from peer %s already received", sender) + } + + c.responseReceived[sender] = true + + if msg.Error == types.Error_OK { + responseID := sha256.Sum256(msg.Payload) + c.responseIDCount[responseID]++ + + if c.responseIDCount[responseID] == c.requiredIdenticalResponses { + capabilityResponse, err := pb.UnmarshalCapabilityResponse(msg.Payload) + if err != nil { + c.sendResponse(commoncap.CapabilityResponse{Err: fmt.Errorf("failed to unmarshal capability response: %w", err)}) + } else { + c.sendResponse(commoncap.CapabilityResponse{Value: capabilityResponse.Value}) + } + } + } else { + c.errorCount[msg.ErrorMsg]++ + if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses { + c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(msg.ErrorMsg)}) + } + } + return nil +} + +func (c *ClientRequest) sendResponse(response commoncap.CapabilityResponse) { + c.responseCh <- response + close(c.responseCh) + c.respSent = true +} diff --git a/core/capabilities/remote/target/request/client_request_test.go b/core/capabilities/remote/target/request/client_request_test.go new file mode 100644 index 00000000000..930ba595625 --- /dev/null +++ b/core/capabilities/remote/target/request/client_request_test.go @@ -0,0 +1,311 @@ +package request_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_ClientRequest_MessageValidation(t *testing.T) { + lggr := logger.TestLogger(t) + + numCapabilityPeers := 2 + capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeers[i] = NewP2PPeerID(t) + } + + capDonInfo := commoncap.DON{ + ID: "capability-don", + Members: capabilityPeers, + F: 1, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + DON: &capDonInfo, + } + + numWorkflowPeers := 2 + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeers[i] = NewP2PPeerID(t) + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + } + + executeInputs, err := values.NewMap( + map[string]any{ + "executeValue1": "aValue1", + }, + ) + require.NoError(t, err) + + transmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "1000ms", + }) + require.NoError(t, err) + + capabilityRequest := commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Inputs: executeInputs, + Config: transmissionSchedule, + } + + capabilityResponse := commoncap.CapabilityResponse{ + Value: values.NewString("response1"), + Err: nil, + } + + rawResponse, err := pb.MarshalCapabilityResponse(capabilityResponse) + require.NoError(t, err) + + messageID, err := target.GetMessageIDForRequest(capabilityRequest) + require.NoError(t, err) + + msg := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse, + MessageId: []byte("messageID"), + } + + t.Run("Send second message with different response", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewClientRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + capabilityResponse2 := commoncap.CapabilityResponse{ + Value: values.NewString("response2"), + Err: nil, + } + + rawResponse2, err := pb.MarshalCapabilityResponse(capabilityResponse2) + require.NoError(t, err) + msg2 := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse2, + MessageId: []byte("messageID"), + } + + msg.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + + msg2.Sender = capabilityPeers[1][:] + err = request.OnMessage(ctx, msg2) + require.NoError(t, err) + + select { + case <-request.ResponseChan(): + t.Fatal("expected no response") + default: + } + }) + + t.Run("Send second message from non calling Don peer", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewClientRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + msg.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + + nonDonPeer := NewP2PPeerID(t) + msg.Sender = nonDonPeer[:] + err = request.OnMessage(ctx, msg) + require.NotNil(t, err) + + select { + case <-request.ResponseChan(): + t.Fatal("expected no response") + default: + } + }) + + t.Run("Send second message from same peer as first message", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewClientRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + msg.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + err = request.OnMessage(ctx, msg) + require.NotNil(t, err) + + select { + case <-request.ResponseChan(): + t.Fatal("expected no response") + default: + } + }) + + t.Run("Send second message with same error as first", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewClientRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + <-dispatcher.msgs + <-dispatcher.msgs + assert.Equal(t, 0, len(dispatcher.msgs)) + + msgWithError := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse, + MessageId: []byte("messageID"), + Error: types.Error_INTERNAL_ERROR, + ErrorMsg: "an error", + } + + msgWithError.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msgWithError) + require.NoError(t, err) + + msgWithError.Sender = capabilityPeers[1][:] + err = request.OnMessage(ctx, msgWithError) + require.NoError(t, err) + + response := <-request.ResponseChan() + + assert.Equal(t, "an error", response.Err.Error()) + }) + + t.Run("Send second message with different error to first", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewClientRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + <-dispatcher.msgs + <-dispatcher.msgs + assert.Equal(t, 0, len(dispatcher.msgs)) + + msgWithError := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse, + MessageId: []byte("messageID"), + Error: types.Error_INTERNAL_ERROR, + ErrorMsg: "an error", + Sender: capabilityPeers[0][:], + } + + msgWithError2 := &types.MessageBody{ + CapabilityId: capInfo.ID, + CapabilityDonId: capDonInfo.ID, + CallerDonId: workflowDonInfo.ID, + Method: types.MethodExecute, + Payload: rawResponse, + MessageId: []byte("messageID"), + Error: types.Error_INTERNAL_ERROR, + ErrorMsg: "an error2", + Sender: capabilityPeers[1][:], + } + + err = request.OnMessage(ctx, msgWithError) + require.NoError(t, err) + err = request.OnMessage(ctx, msgWithError2) + require.NoError(t, err) + + select { + case <-request.ResponseChan(): + t.Fatal("expected no response") + default: + } + }) + + t.Run("Send second valid message", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} + request, err := request.NewClientRequest(ctx, lggr, capabilityRequest, messageID, capInfo, + workflowDonInfo, dispatcher, 10*time.Minute) + require.NoError(t, err) + + <-dispatcher.msgs + <-dispatcher.msgs + assert.Equal(t, 0, len(dispatcher.msgs)) + + msg.Sender = capabilityPeers[0][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + + msg.Sender = capabilityPeers[1][:] + err = request.OnMessage(ctx, msg) + require.NoError(t, err) + + response := <-request.ResponseChan() + + assert.Equal(t, response.Value, values.NewString("response1")) + }) +} + +type clientRequestTestDispatcher struct { + msgs chan *types.MessageBody +} + +func (t *clientRequestTestDispatcher) SetReceiver(capabilityId string, donId string, receiver types.Receiver) error { + return nil +} + +func (t *clientRequestTestDispatcher) RemoveReceiver(capabilityId string, donId string) {} + +func (t *clientRequestTestDispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { + t.msgs <- msgBody + return nil +} diff --git a/core/capabilities/remote/target/request/server_request.go b/core/capabilities/remote/target/request/server_request.go new file mode 100644 index 00000000000..84968de9f11 --- /dev/null +++ b/core/capabilities/remote/target/request/server_request.go @@ -0,0 +1,222 @@ +package request + +import ( + "context" + "fmt" + "sync" + "time" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) + +type response struct { + response []byte + error types.Error + errorMsg string +} + +type ServerRequest struct { + capability capabilities.TargetCapability + + capabilityPeerId p2ptypes.PeerID + capabilityID string + capabilityDonID string + + dispatcher types.Dispatcher + + requesters map[p2ptypes.PeerID]bool + responseSentToRequester map[p2ptypes.PeerID]bool + + createdTime time.Time + + response *response + + callingDon commoncap.DON + + requestMessageID string + requestTimeout time.Duration + + mux sync.Mutex +} + +func NewServerRequest(capability capabilities.TargetCapability, capabilityID string, capabilityDonID string, capabilityPeerId p2ptypes.PeerID, + callingDon commoncap.DON, requestMessageID string, + dispatcher types.Dispatcher, requestTimeout time.Duration) *ServerRequest { + return &ServerRequest{ + capability: capability, + createdTime: time.Now(), + capabilityID: capabilityID, + capabilityDonID: capabilityDonID, + capabilityPeerId: capabilityPeerId, + dispatcher: dispatcher, + requesters: map[p2ptypes.PeerID]bool{}, + responseSentToRequester: map[p2ptypes.PeerID]bool{}, + callingDon: callingDon, + requestMessageID: requestMessageID, + requestTimeout: requestTimeout, + } +} + +func (e *ServerRequest) OnMessage(ctx context.Context, msg *types.MessageBody) error { + e.mux.Lock() + defer e.mux.Unlock() + + if msg.Sender == nil { + return fmt.Errorf("sender missing from message") + } + + requester := remote.ToPeerID(msg.Sender) + if err := e.addRequester(requester); err != nil { + return fmt.Errorf("failed to add requester to request: %w", err) + } + + if e.minimumRequiredRequestsReceived() && !e.hasResponse() { + if err := e.executeRequest(ctx, msg.Payload); err != nil { + e.setError(types.Error_INTERNAL_ERROR, err.Error()) + } + } + + if err := e.sendResponses(); err != nil { + return fmt.Errorf("failed to send responses: %w", err) + } + + return nil +} + +func (e *ServerRequest) Expired() bool { + return time.Since(e.createdTime) > e.requestTimeout +} + +func (e *ServerRequest) Cancel(err types.Error, msg string) error { + e.mux.Lock() + defer e.mux.Unlock() + + if e.hasResponse() { + return fmt.Errorf("request already has response") + } + + e.setError(err, msg) + if err := e.sendResponses(); err != nil { + return fmt.Errorf("failed to send responses: %w", err) + } + + return nil +} + +func (e *ServerRequest) executeRequest(ctx context.Context, payload []byte) error { + ctxWithTimeout, cancel := context.WithTimeout(ctx, e.requestTimeout) + defer cancel() + + capabilityRequest, err := pb.UnmarshalCapabilityRequest(payload) + if err != nil { + return fmt.Errorf("failed to unmarshal capability request: %w", err) + } + + capResponseCh, err := e.capability.Execute(ctxWithTimeout, capabilityRequest) + + if err != nil { + return fmt.Errorf("failed to execute capability: %w", err) + } + + // TODO working on the assumption that the capability will only ever return one response from its channel (for now at least) + capResponse := <-capResponseCh + responsePayload, err := pb.MarshalCapabilityResponse(capResponse) + if err != nil { + return fmt.Errorf("failed to marshal capability response: %w", err) + } + + e.setResult(responsePayload) + + return nil +} + +func (e *ServerRequest) addRequester(from p2ptypes.PeerID) error { + fromPeerInCallingDon := false + for _, member := range e.callingDon.Members { + if member == from { + fromPeerInCallingDon = true + break + } + } + + if !fromPeerInCallingDon { + return fmt.Errorf("request received from peer %s not in calling don", from) + } + + if e.requesters[from] { + return fmt.Errorf("request already received from peer %s", from) + } + + e.requesters[from] = true + + return nil +} + +func (e *ServerRequest) minimumRequiredRequestsReceived() bool { + return len(e.requesters) >= int(e.callingDon.F+1) +} + +func (e *ServerRequest) setResult(result []byte) { + e.response = &response{ + response: result, + } +} + +func (e *ServerRequest) setError(err types.Error, errMsg string) { + e.response = &response{ + error: err, + errorMsg: errMsg, + } +} + +func (e *ServerRequest) hasResponse() bool { + return e.response != nil +} + +func (e *ServerRequest) sendResponses() error { + if e.hasResponse() { + for requester := range e.requesters { + if !e.responseSentToRequester[requester] { + e.responseSentToRequester[requester] = true + if err := e.sendResponse(requester); err != nil { + return fmt.Errorf("failed to send response to requester %s: %w", requester, err) + } + } + } + } + + return nil +} + +func (e *ServerRequest) sendResponse(requester p2ptypes.PeerID) error { + responseMsg := types.MessageBody{ + CapabilityId: e.capabilityID, + CapabilityDonId: e.capabilityDonID, + CallerDonId: e.callingDon.ID, + Method: types.MethodExecute, + MessageId: []byte(e.requestMessageID), + Sender: e.capabilityPeerId[:], + Receiver: requester[:], + } + + if e.response.error != types.Error_OK { + responseMsg.Error = e.response.error + responseMsg.ErrorMsg = e.response.errorMsg + } else { + responseMsg.Payload = e.response.response + } + + if err := e.dispatcher.Send(requester, &responseMsg); err != nil { + return fmt.Errorf("failed to send response to dispatcher: %w", err) + } + + e.responseSentToRequester[requester] = true + + return nil +} diff --git a/core/capabilities/remote/target/request/server_request_test.go b/core/capabilities/remote/target/request/server_request_test.go new file mode 100644 index 00000000000..0529a1e9004 --- /dev/null +++ b/core/capabilities/remote/target/request/server_request_test.go @@ -0,0 +1,261 @@ +package request_test + +import ( + "context" + "crypto/rand" + "errors" + "testing" + "time" + + "github.com/mr-tron/base58" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_ServerRequest_MessageValidation(t *testing.T) { + capability := TestCapability{} + capabilityPeerID := NewP2PPeerID(t) + + numWorkflowPeers := 2 + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeers[i] = NewP2PPeerID(t) + } + + callingDon := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + F: 1, + } + + dispatcher := &testDispatcher{} + + executeInputs, err := values.NewMap( + map[string]any{ + "executeValue1": "aValue1", + }, + ) + require.NoError(t, err) + + capabilityRequest := commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + Inputs: executeInputs, + } + + rawRequest, err := pb.MarshalCapabilityRequest(capabilityRequest) + require.NoError(t, err) + + t.Run("Send duplicate message", func(t *testing.T) { + req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + err = sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + assert.NotNil(t, err) + }) + + t.Run("Send message with non calling don peer", func(t *testing.T) { + req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + + nonDonPeer := NewP2PPeerID(t) + err = req.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: nonDonPeer[:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: rawRequest, + }) + + assert.NotNil(t, err) + }) + + t.Run("Send message invalid payload", func(t *testing.T) { + req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + + err = req.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: workflowPeers[1][:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: append(rawRequest, []byte("asdf")...), + }) + assert.NoError(t, err) + assert.Equal(t, 2, len(dispatcher.msgs)) + assert.Equal(t, dispatcher.msgs[0].Error, types.Error_INTERNAL_ERROR) + assert.Equal(t, dispatcher.msgs[1].Error, types.Error_INTERNAL_ERROR) + }) + + t.Run("Send second valid request when capability errors", func(t *testing.T) { + dispatcher := &testDispatcher{} + req := request.NewServerRequest(TestErrorCapability{}, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + + err = req.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: workflowPeers[1][:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: rawRequest, + }) + assert.NoError(t, err) + assert.Equal(t, 2, len(dispatcher.msgs)) + assert.Equal(t, dispatcher.msgs[0].Error, types.Error_INTERNAL_ERROR) + assert.Equal(t, dispatcher.msgs[0].ErrorMsg, "failed to execute capability: an error") + assert.Equal(t, dispatcher.msgs[1].Error, types.Error_INTERNAL_ERROR) + assert.Equal(t, dispatcher.msgs[1].ErrorMsg, "failed to execute capability: an error") + }) + + t.Run("Send second valid request", func(t *testing.T) { + dispatcher := &testDispatcher{} + request := request.NewServerRequest(capability, "capabilityID", "capabilityDonID", + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + + err := sendValidRequest(request, workflowPeers, capabilityPeerID, rawRequest) + require.NoError(t, err) + + err = request.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: workflowPeers[1][:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: rawRequest, + }) + assert.NoError(t, err) + assert.Equal(t, 2, len(dispatcher.msgs)) + assert.Equal(t, dispatcher.msgs[0].Error, types.Error_OK) + assert.Equal(t, dispatcher.msgs[1].Error, types.Error_OK) + }) +} + +type serverRequest interface { + OnMessage(ctx context.Context, msg *types.MessageBody) error +} + +func sendValidRequest(request serverRequest, workflowPeers []p2ptypes.PeerID, capabilityPeerID p2ptypes.PeerID, + rawRequest []byte) error { + return request.OnMessage(context.Background(), &types.MessageBody{ + Version: 0, + Sender: workflowPeers[0][:], + Receiver: capabilityPeerID[:], + MessageId: []byte("workflowID" + "workflowExecutionID"), + CapabilityId: "capabilityID", + CapabilityDonId: "capabilityDonID", + CallerDonId: "workflow-don", + Method: types.MethodExecute, + Payload: rawRequest, + }) +} + +type testDispatcher struct { + msgs []*types.MessageBody +} + +func (t *testDispatcher) SetReceiver(capabilityId string, donId string, receiver types.Receiver) error { + return nil +} + +func (t *testDispatcher) RemoveReceiver(capabilityId string, donId string) {} + +func (t *testDispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { + t.msgs = append(t.msgs, msgBody) + return nil +} + +type abstractTestCapability struct { +} + +func (t abstractTestCapability) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + return commoncap.CapabilityInfo{}, nil +} + +func (t abstractTestCapability) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + return nil +} + +func (t abstractTestCapability) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + return nil +} + +type TestCapability struct { + abstractTestCapability +} + +func (t TestCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + ch := make(chan commoncap.CapabilityResponse, 1) + + value := request.Inputs.Underlying["executeValue1"] + + ch <- commoncap.CapabilityResponse{ + Value: value, + } + + return ch, nil +} + +type TestErrorCapability struct { + abstractTestCapability +} + +func (t TestErrorCapability) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + return nil, errors.New("an error") +} + +func NewP2PPeerID(t *testing.T) p2ptypes.PeerID { + id := p2ptypes.PeerID{} + require.NoError(t, id.UnmarshalText([]byte(NewPeerID()))) + return id +} + +func NewPeerID() string { + var privKey [32]byte + _, err := rand.Read(privKey[:]) + if err != nil { + panic(err) + } + + peerID := append(libp2pMagic(), privKey[:]...) + + return base58.Encode(peerID[:]) +} + +func libp2pMagic() []byte { + return []byte{0x00, 0x24, 0x08, 0x01, 0x12, 0x20} +} diff --git a/core/capabilities/remote/target/server.go b/core/capabilities/remote/target/server.go new file mode 100644 index 00000000000..bb3bc6e4edc --- /dev/null +++ b/core/capabilities/remote/target/server.go @@ -0,0 +1,131 @@ +package target + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "sync" + "time" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// server manages all external users of a local target capability. +// Its responsibilities are: +// 1. Manage requests from external nodes executing the target capability once sufficient requests are received. +// 2. Send out responses produced by an underlying capability to all requesters. +// +// server communicates with corresponding client on remote nodes. +type server struct { + lggr logger.Logger + peerID p2ptypes.PeerID + underlying commoncap.TargetCapability + capInfo commoncap.CapabilityInfo + localDonInfo capabilities.DON + workflowDONs map[string]commoncap.DON + dispatcher types.Dispatcher + + requestIDToRequest map[string]*request.ServerRequest + requestTimeout time.Duration + + receiveLock sync.Mutex +} + +var _ types.Receiver = &server{} + +func NewReceiver(ctx context.Context, lggr logger.Logger, peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, + workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration) *server { + r := &server{ + underlying: underlying, + peerID: peerID, + capInfo: capInfo, + localDonInfo: localDonInfo, + workflowDONs: workflowDONs, + dispatcher: dispatcher, + + requestIDToRequest: map[string]*request.ServerRequest{}, + requestTimeout: requestTimeout, + + lggr: lggr, + } + + go func() { + ticker := time.NewTicker(requestTimeout) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.expireRequests() + } + } + }() + + return r +} + +func (r *server) expireRequests() { + r.receiveLock.Lock() + defer r.receiveLock.Unlock() + + for requestID, executeReq := range r.requestIDToRequest { + if executeReq.Expired() { + err := executeReq.Cancel(types.Error_TIMEOUT, "request expired") + if err != nil { + r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err) + } + delete(r.requestIDToRequest, requestID) + } + } +} + +// Receive handles incoming messages from remote nodes and dispatches them to the corresponding request without blocking +// the client. +func (r *server) Receive(msg *types.MessageBody) { + r.receiveLock.Lock() + defer r.receiveLock.Unlock() + // TODO should the dispatcher be passing in a context? + ctx := context.Background() + + if msg.Method != types.MethodExecute { + r.lggr.Errorw("received request for unsupported method type", "method", msg.Method) + return + } + + // A request is uniquely identified by the message id and the hash of the payload to prevent a malicious + // actor from sending a different payload with the same message id + messageId := GetMessageID(msg) + hash := sha256.Sum256(msg.Payload) + requestID := messageId + hex.EncodeToString(hash[:]) + + if _, ok := r.requestIDToRequest[requestID]; !ok { + callingDon, ok := r.workflowDONs[msg.CallerDonId] + if !ok { + r.lggr.Errorw("received request from unregistered don", "donId", msg.CallerDonId) + return + } + + r.requestIDToRequest[requestID] = request.NewServerRequest(r.underlying, r.capInfo.ID, r.localDonInfo.ID, r.peerID, + callingDon, messageId, r.dispatcher, r.requestTimeout) + } + + req := r.requestIDToRequest[requestID] + + go func() { + err := req.OnMessage(ctx, msg) + if err != nil { + r.lggr.Errorw("request failed to OnMessage new message", "request", req, "err", err) + } + }() +} + +func GetMessageID(msg *types.MessageBody) string { + return string(msg.MessageId) +} diff --git a/core/capabilities/remote/target/server_test.go b/core/capabilities/remote/target/server_test.go new file mode 100644 index 00000000000..ed80e760951 --- /dev/null +++ b/core/capabilities/remote/target/server_test.go @@ -0,0 +1,224 @@ +package target_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_Server_RespondsAfterSufficientRequests(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + numCapabilityPeers := 4 + + callers := testRemoteTargetServer(ctx, t, &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) + + for _, caller := range callers { + _, err := caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + }) + require.NoError(t, err) + } + + for _, caller := range callers { + for i := 0; i < numCapabilityPeers; i++ { + msg := <-caller.receivedMessages + assert.Equal(t, remotetypes.Error_OK, msg.Error) + } + } +} + +func Test_Server_InsufficientCallers(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + numCapabilityPeers := 4 + + callers := testRemoteTargetServer(ctx, t, &TestCapability{}, 10, 10, numCapabilityPeers, 3, 100*time.Millisecond) + + for _, caller := range callers { + _, err := caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + }) + require.NoError(t, err) + } + + for _, caller := range callers { + for i := 0; i < numCapabilityPeers; i++ { + msg := <-caller.receivedMessages + assert.Equal(t, remotetypes.Error_TIMEOUT, msg.Error) + } + } +} + +func Test_Server_CapabilityError(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + defer cancel() + + numCapabilityPeers := 4 + + callers := testRemoteTargetServer(ctx, t, &TestErrorCapability{}, 10, 9, numCapabilityPeers, 3, 100*time.Millisecond) + + for _, caller := range callers { + _, err := caller.Execute(context.Background(), + commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflowID", + WorkflowExecutionID: "workflowExecutionID", + }, + }) + require.NoError(t, err) + } + + for _, caller := range callers { + for i := 0; i < numCapabilityPeers; i++ { + msg := <-caller.receivedMessages + assert.Equal(t, remotetypes.Error_INTERNAL_ERROR, msg.Error) + } + } +} + +func testRemoteTargetServer(ctx context.Context, t *testing.T, + underlying commoncap.TargetCapability, + numWorkflowPeers int, workflowDonF uint8, + numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration) []*serverTestClient { + lggr := logger.TestLogger(t) + + capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeerID := NewP2PPeerID(t) + capabilityPeers[i] = capabilityPeerID + } + + capDonInfo := commoncap.DON{ + ID: "capability-don", + Members: capabilityPeers, + F: capabilityDonF, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + Version: "0.0.1", + DON: &capDonInfo, + } + + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeers[i] = NewP2PPeerID(t) + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: "workflow-don", + F: workflowDonF, + } + + broker := newTestMessageBroker() + + workflowDONs := map[string]commoncap.DON{ + workflowDonInfo.ID: workflowDonInfo, + } + + capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers) + for i := 0; i < numCapabilityPeers; i++ { + capabilityPeer := capabilityPeers[i] + capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) + capabilityNode := target.NewReceiver(ctx, lggr, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, + capabilityNodeResponseTimeout) + broker.RegisterReceiverNode(capabilityPeer, capabilityNode) + capabilityNodes[i] = capabilityNode + } + + workflowNodes := make([]*serverTestClient, numWorkflowPeers) + for i := 0; i < numWorkflowPeers; i++ { + workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) + workflowNode := newServerTestClient(workflowPeers[i], capDonInfo, workflowPeerDispatcher) + broker.RegisterReceiverNode(workflowPeers[i], workflowNode) + workflowNodes[i] = workflowNode + } + + return workflowNodes +} + +type serverTestClient struct { + peerID p2ptypes.PeerID + dispatcher remotetypes.Dispatcher + capabilityDonInfo commoncap.DON + receivedMessages chan *remotetypes.MessageBody + callerDonID string +} + +func (r *serverTestClient) Receive(msg *remotetypes.MessageBody) { + r.receivedMessages <- msg +} + +func newServerTestClient(peerID p2ptypes.PeerID, capabilityDonInfo commoncap.DON, + dispatcher remotetypes.Dispatcher) *serverTestClient { + return &serverTestClient{peerID: peerID, dispatcher: dispatcher, capabilityDonInfo: capabilityDonInfo, + receivedMessages: make(chan *remotetypes.MessageBody, 100), callerDonID: "workflow-don"} +} + +func (r *serverTestClient) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { + panic("not implemented") +} + +func (r *serverTestClient) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { + panic("not implemented") +} + +func (r *serverTestClient) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { + panic("not implemented") +} + +func (r *serverTestClient) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { + rawRequest, err := pb.MarshalCapabilityRequest(req) + if err != nil { + return nil, err + } + + messageID, err := target.GetMessageIDForRequest(req) + if err != nil { + return nil, err + } + + for _, node := range r.capabilityDonInfo.Members { + message := &remotetypes.MessageBody{ + CapabilityId: "capability-id", + CapabilityDonId: "capability-don", + CallerDonId: "workflow-don", + Method: remotetypes.MethodExecute, + Payload: rawRequest, + MessageId: []byte(messageID), + Sender: r.peerID[:], + Receiver: node[:], + } + + if err = r.dispatcher.Send(node, message); err != nil { + return nil, err + } + } + + return nil, nil +} diff --git a/core/capabilities/remote/target_test.go b/core/capabilities/remote/target_test.go deleted file mode 100644 index 0f9bad51f67..00000000000 --- a/core/capabilities/remote/target_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package remote_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" - remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" -) - -func TestTarget_Placeholder(t *testing.T) { - lggr := logger.TestLogger(t) - ctx := testutils.Context(t) - donInfo := &capabilities.DON{ - Members: []p2ptypes.PeerID{{}}, - } - dispatcher := remoteMocks.NewDispatcher(t) - dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil) - target := remote.NewRemoteTargetCaller(commoncap.CapabilityInfo{}, donInfo, dispatcher, lggr) - - _, err := target.Execute(ctx, commoncap.CapabilityRequest{}) - assert.NoError(t, err) -} diff --git a/core/capabilities/remote/types/message.pb.go b/core/capabilities/remote/types/message.pb.go index d8e9579e96c..7cef9d45748 100644 --- a/core/capabilities/remote/types/message.pb.go +++ b/core/capabilities/remote/types/message.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.8 +// protoc-gen-go v1.33.0 +// protoc v4.25.1 // source: core/capabilities/remote/types/message.proto package types @@ -26,6 +26,9 @@ const ( Error_OK Error = 0 Error_VALIDATION_FAILED Error = 1 Error_CAPABILITY_NOT_FOUND Error = 2 + Error_INVALID_REQUEST Error = 3 + Error_TIMEOUT Error = 4 + Error_INTERNAL_ERROR Error = 5 ) // Enum value maps for Error. @@ -34,11 +37,17 @@ var ( 0: "OK", 1: "VALIDATION_FAILED", 2: "CAPABILITY_NOT_FOUND", + 3: "INVALID_REQUEST", + 4: "TIMEOUT", + 5: "INTERNAL_ERROR", } Error_value = map[string]int32{ "OK": 0, "VALIDATION_FAILED": 1, "CAPABILITY_NOT_FOUND": 2, + "INVALID_REQUEST": 3, + "TIMEOUT": 4, + "INTERNAL_ERROR": 5, } ) @@ -129,7 +138,6 @@ type MessageBody struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // header fields set and validated by the Dispatcher Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` Sender []byte `protobuf:"bytes,2,opt,name=sender,proto3" json:"sender,omitempty"` Receiver []byte `protobuf:"bytes,3,opt,name=receiver,proto3" json:"receiver,omitempty"` @@ -140,9 +148,11 @@ type MessageBody struct { CallerDonId string `protobuf:"bytes,8,opt,name=caller_don_id,json=callerDonId,proto3" json:"caller_don_id,omitempty"` Method string `protobuf:"bytes,9,opt,name=method,proto3" json:"method,omitempty"` Error Error `protobuf:"varint,10,opt,name=error,proto3,enum=remote.Error" json:"error,omitempty"` + ErrorMsg string `protobuf:"bytes,11,opt,name=errorMsg,proto3" json:"errorMsg,omitempty"` // payload contains a CapabilityRequest or CapabilityResponse - Payload []byte `protobuf:"bytes,11,opt,name=payload,proto3" json:"payload,omitempty"` + Payload []byte `protobuf:"bytes,12,opt,name=payload,proto3" json:"payload,omitempty"` // Types that are assignable to Metadata: + // // *MessageBody_TriggerRegistrationMetadata // *MessageBody_TriggerEventMetadata Metadata isMessageBody_Metadata `protobuf_oneof:"metadata"` @@ -250,6 +260,13 @@ func (x *MessageBody) GetError() Error { return Error_OK } +func (x *MessageBody) GetErrorMsg() string { + if x != nil { + return x.ErrorMsg + } + return "" +} + func (x *MessageBody) GetPayload() []byte { if x != nil { return x.Payload @@ -283,11 +300,11 @@ type isMessageBody_Metadata interface { } type MessageBody_TriggerRegistrationMetadata struct { - TriggerRegistrationMetadata *TriggerRegistrationMetadata `protobuf:"bytes,12,opt,name=trigger_registration_metadata,json=triggerRegistrationMetadata,proto3,oneof"` + TriggerRegistrationMetadata *TriggerRegistrationMetadata `protobuf:"bytes,13,opt,name=trigger_registration_metadata,json=triggerRegistrationMetadata,proto3,oneof"` } type MessageBody_TriggerEventMetadata struct { - TriggerEventMetadata *TriggerEventMetadata `protobuf:"bytes,13,opt,name=trigger_event_metadata,json=triggerEventMetadata,proto3,oneof"` + TriggerEventMetadata *TriggerEventMetadata `protobuf:"bytes,14,opt,name=trigger_event_metadata,json=triggerEventMetadata,proto3,oneof"` } func (*MessageBody_TriggerRegistrationMetadata) isMessageBody_Metadata() {} @@ -406,7 +423,7 @@ var file_core_capabilities_remote_types_message_proto_rawDesc = []byte{ 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, - 0x6f, 0x64, 0x79, 0x22, 0xb1, 0x04, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, + 0x6f, 0x64, 0x79, 0x22, 0xcd, 0x04, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, @@ -427,39 +444,44 @@ var file_core_capabilities_remote_types_message_proto_rawDesc = []byte{ 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x23, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, - 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x69, 0x0a, 0x1d, 0x74, 0x72, - 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0c, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x23, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67, - 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x1b, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, - 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, - 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x54, 0x0a, 0x16, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, - 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, - 0x0d, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x54, - 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x14, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, 0x0a, 0x0a, 0x08, 0x6d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x52, 0x0a, 0x1b, 0x54, 0x72, 0x69, 0x67, 0x67, - 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x33, 0x0a, 0x16, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x72, - 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x6c, 0x61, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, - 0x69, 0x76, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x63, 0x0a, 0x14, 0x54, - 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0x12, 0x28, 0x0a, 0x10, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x74, - 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, - 0x0c, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x02, 0x20, - 0x03, 0x28, 0x09, 0x52, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x73, - 0x2a, 0x40, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, - 0x00, 0x12, 0x15, 0x0a, 0x11, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, - 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x41, 0x50, 0x41, - 0x42, 0x49, 0x4c, 0x49, 0x54, 0x59, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, - 0x10, 0x02, 0x42, 0x20, 0x5a, 0x1e, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, - 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2f, 0x74, - 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x18, 0x0b, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x18, 0x0a, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x69, 0x0a, 0x1d, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, + 0x72, 0x5f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, + 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x48, 0x00, 0x52, 0x1b, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0x12, 0x54, 0x0a, 0x16, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0e, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67, + 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x48, + 0x00, 0x52, 0x14, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x42, 0x0a, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x22, 0x52, 0x0a, 0x1b, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x12, 0x33, 0x0a, 0x16, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x72, 0x65, 0x63, 0x65, 0x69, + 0x76, 0x65, 0x64, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x13, 0x6c, 0x61, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x63, 0x0a, 0x14, 0x54, 0x72, 0x69, 0x67, 0x67, + 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, + 0x28, 0x0a, 0x10, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x74, 0x72, 0x69, 0x67, 0x67, + 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x77, 0x6f, 0x72, + 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x73, 0x2a, 0x76, 0x0a, 0x05, + 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x12, 0x15, 0x0a, + 0x11, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x46, 0x41, 0x49, 0x4c, + 0x45, 0x44, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x41, 0x50, 0x41, 0x42, 0x49, 0x4c, 0x49, + 0x54, 0x59, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x02, 0x12, 0x13, + 0x0a, 0x0f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x52, 0x45, 0x51, 0x55, 0x45, 0x53, + 0x54, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x54, 0x49, 0x4d, 0x45, 0x4f, 0x55, 0x54, 0x10, 0x04, + 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x5f, 0x45, 0x52, 0x52, + 0x4f, 0x52, 0x10, 0x05, 0x42, 0x20, 0x5a, 0x1e, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x70, + 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, + 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/core/capabilities/remote/types/message.proto b/core/capabilities/remote/types/message.proto index 072accedbc0..4d0507fd1e0 100644 --- a/core/capabilities/remote/types/message.proto +++ b/core/capabilities/remote/types/message.proto @@ -8,6 +8,9 @@ enum Error { OK = 0; VALIDATION_FAILED = 1; CAPABILITY_NOT_FOUND = 2; + INVALID_REQUEST = 3; + TIMEOUT = 4; + INTERNAL_ERROR = 5; } message Message { @@ -26,13 +29,15 @@ message MessageBody { string caller_don_id = 8; string method = 9; Error error = 10; + string errorMsg = 11; // payload contains a CapabilityRequest or CapabilityResponse - bytes payload = 11; + bytes payload = 12; oneof metadata { - TriggerRegistrationMetadata trigger_registration_metadata = 12; - TriggerEventMetadata trigger_event_metadata = 13; + TriggerRegistrationMetadata trigger_registration_metadata = 13; + TriggerEventMetadata trigger_event_metadata = 14; } + } message TriggerRegistrationMetadata { diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go index d8307d09f80..a825c42be56 100644 --- a/core/capabilities/remote/types/types.go +++ b/core/capabilities/remote/types/types.go @@ -9,6 +9,7 @@ const ( MethodRegisterTrigger = "RegisterTrigger" MethodUnRegisterTrigger = "UnregisterTrigger" MethodTriggerEvent = "TriggerEvent" + MethodExecute = "Execute" ) //go:generate mockery --quiet --name Dispatcher --output ./mocks/ --case=underscore diff --git a/core/capabilities/transmission/transmission.go b/core/capabilities/transmission/transmission.go new file mode 100644 index 00000000000..0129d600265 --- /dev/null +++ b/core/capabilities/transmission/transmission.go @@ -0,0 +1,109 @@ +package transmission + +import ( + "fmt" + "time" + + "golang.org/x/crypto/sha3" + + "github.com/smartcontractkit/libocr/permutation" + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/values" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +// TODO determine location for this code + +var ( + // S = [N] + Schedule_AllAtOnce = "allAtOnce" + // S = [1 * N] + Schedule_OneAtATime = "oneAtATime" +) + +type TransmissionConfig struct { + Schedule string + DeltaStage time.Duration +} + +func ExtractTransmissionConfig(config *values.Map) (TransmissionConfig, error) { + var tc struct { + DeltaStage string + Schedule string + } + err := config.UnwrapTo(&tc) + if err != nil { + return TransmissionConfig{}, fmt.Errorf("failed to unwrap tranmission config from value map: %w", err) + } + + duration, err := time.ParseDuration(tc.DeltaStage) + if err != nil { + return TransmissionConfig{}, fmt.Errorf("failed to parse DeltaStage %s as duration: %w", tc.DeltaStage, err) + } + + return TransmissionConfig{ + Schedule: tc.Schedule, + DeltaStage: duration, + }, nil +} + +// GetPeerIDToTransmissionDelay returns a map of PeerID to the time.Duration that the node with that PeerID should wait +// before transmitting. If a node is not in the map, it should not transmit. The sharedSecret is shared by nodes in the +// same DON and used to generate a deterministic schedule for the transmission delays. +func GetPeerIDToTransmissionDelay(donPeerIDs []ragep2ptypes.PeerID, sharedSecret [16]byte, transmissionID string, tc TransmissionConfig) (map[p2ptypes.PeerID]time.Duration, error) { + donMemberCount := len(donPeerIDs) + key := transmissionScheduleSeed(sharedSecret, transmissionID) + schedule, err := createTransmissionSchedule(tc.Schedule, donMemberCount) + if err != nil { + return nil, err + } + + picked := permutation.Permutation(donMemberCount, key) + + peerIDToTransmissionDelay := map[p2ptypes.PeerID]time.Duration{} + for i, peerID := range donPeerIDs { + delay := delayFor(i, schedule, picked, tc.DeltaStage) + if delay != nil { + peerIDToTransmissionDelay[peerID] = *delay + } + } + return peerIDToTransmissionDelay, nil +} + +func delayFor(position int, schedule []int, permutation []int, deltaStage time.Duration) *time.Duration { + sum := 0 + for i, s := range schedule { + sum += s + if permutation[position] < sum { + result := time.Duration(i) * deltaStage + return &result + } + } + + return nil +} + +func createTransmissionSchedule(scheduleType string, N int) ([]int, error) { + switch scheduleType { + case Schedule_AllAtOnce: + return []int{N}, nil + case Schedule_OneAtATime: + sch := []int{} + for i := 0; i < N; i++ { + sch = append(sch, 1) + } + return sch, nil + } + return nil, fmt.Errorf("unknown schedule type %s", scheduleType) +} + +func transmissionScheduleSeed(sharedSecret [16]byte, transmissionID string) [16]byte { + hash := sha3.NewLegacyKeccak256() + hash.Write(sharedSecret[:]) + hash.Write([]byte(transmissionID)) + + var key [16]byte + copy(key[:], hash.Sum(nil)) + return key +} diff --git a/core/capabilities/transmission/transmission_test.go b/core/capabilities/transmission/transmission_test.go new file mode 100644 index 00000000000..bbdaaa27fe2 --- /dev/null +++ b/core/capabilities/transmission/transmission_test.go @@ -0,0 +1,101 @@ +package transmission + +import ( + "encoding/hex" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/values" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func Test_GetPeerIDToTransmissionDelay(t *testing.T) { + peer1 := [32]byte([]byte(fmt.Sprintf("%-32s", "one"))) + peer2 := [32]byte([]byte(fmt.Sprintf("%-32s", "two"))) + peer3 := [32]byte([]byte(fmt.Sprintf("%-32s", "three"))) + peer4 := [32]byte([]byte(fmt.Sprintf("%-32s", "four"))) + + ids := []p2ptypes.PeerID{ + peer1, peer2, peer3, peer4, + } + + testCases := []struct { + name string + peerName string + sharedSecret string + schedule string + deltaStage string + workflowExecutionID string + expectedDelays map[string]time.Duration + }{ + { + "TestOneAtATime", + "one", + "fb13ca015a9ec60089c7141e9522de79", + "oneAtATime", + "100ms", + "mock-execution-id", + map[string]time.Duration{ + "one": 300 * time.Millisecond, + "two": 200 * time.Millisecond, + "three": 0 * time.Millisecond, + "four": 100 * time.Millisecond, + }, + }, + { + "TestAllAtOnce", + "one", + "fb13ca015a9ec60089c7141e9522de79", + "allAtOnce", + "100ms", + "mock-execution-id", + map[string]time.Duration{ + "one": 0 * time.Millisecond, + "two": 0 * time.Millisecond, + "three": 0 * time.Millisecond, + "four": 0 * time.Millisecond, + }, + }, + { + "TestOneAtATimeWithDifferentExecutionID", + "one", + "fb13ca015a9ec60089c7141e9522de79", + "oneAtATime", + "100ms", + "mock-execution-id2", + map[string]time.Duration{ + "one": 0 * time.Millisecond, + "two": 300 * time.Millisecond, + "three": 100 * time.Millisecond, + "four": 200 * time.Millisecond, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + sharedSecret, err := hex.DecodeString(tc.sharedSecret) + require.NoError(t, err) + + m, err := values.NewMap(map[string]any{ + "schedule": tc.schedule, + "deltaStage": tc.deltaStage, + }) + require.NoError(t, err) + transmissionCfg, err := ExtractTransmissionConfig(m) + require.NoError(t, err) + + peerIdToDelay, err := GetPeerIDToTransmissionDelay(ids, [16]byte(sharedSecret), "mock-workflow-id"+tc.workflowExecutionID, transmissionCfg) + require.NoError(t, err) + + assert.Equal(t, tc.expectedDelays["one"], peerIdToDelay[peer1]) + assert.Equal(t, tc.expectedDelays["two"], peerIdToDelay[peer2]) + assert.Equal(t, tc.expectedDelays["three"], peerIdToDelay[peer3]) + assert.Equal(t, tc.expectedDelays["four"], peerIdToDelay[peer4]) + }) + } +} diff --git a/core/services/workflows/execution_strategy.go b/core/services/workflows/execution_strategy.go index f5da8bca4be..5cc8164c4f7 100644 --- a/core/services/workflows/execution_strategy.go +++ b/core/services/workflows/execution_strategy.go @@ -7,12 +7,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" - - "github.com/smartcontractkit/libocr/permutation" - - "golang.org/x/crypto/sha3" ) type executionStrategy interface { @@ -47,19 +44,12 @@ type scheduledExecution struct { Position int } -var ( - // S = [N] - Schedule_AllAtOnce = "allAtOnce" - // S = [1 * N] - Schedule_OneAtATime = "oneAtATime" -) - // scheduledExecution generates a pseudo-random transmission schedule, // and delays execution until a node is required to transmit. func (d scheduledExecution) Apply(ctx context.Context, lggr logger.Logger, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) { - tc, err := d.transmissionConfig(req.Config) + tc, err := transmission.ExtractTransmissionConfig(req.Config) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to extract transmission config from request config: %w", err) } info, err := cap.Info(ctx) @@ -70,102 +60,36 @@ func (d scheduledExecution) Apply(ctx context.Context, lggr logger.Logger, cap c switch { // Case 1: Local DON case info.DON == nil: - n := len(d.DON.Members) - key := d.key(d.DON.Config.SharedSecret, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID) - sched, err := schedule(tc.Schedule, n) + + // The transmission ID is created using the workflow ID and the workflow execution ID which nodes don't know + // ahead of time and ensures a malicious node cannot game the schedule. + peerIDToTransmissionDelay, err := transmission.GetPeerIDToTransmissionDelay(d.DON.Members, d.DON.Config.SharedSecret, + req.Metadata.WorkflowID+req.Metadata.WorkflowExecutionID, tc) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get peer ID to transmission delay map: %w", err) } - picked := permutation.Permutation(n, key) - delay := d.delayFor(d.Position, sched, picked, tc.DeltaStage) - if delay == nil { + delay, existsForPeerID := peerIDToTransmissionDelay[*d.PeerID] + if !existsForPeerID { lggr.Debugw("skipping transmission: node is not included in schedule") return nil, nil } - lggr.Debugf("execution delayed by %+v", *delay) + lggr.Debugf("execution delayed by %+v", delay) select { case <-ctx.Done(): return nil, ctx.Err() - case <-time.After(*delay): + case <-time.After(delay): lggr.Debugw("executing delayed execution") return immediateExecution{}.Apply(ctx, lggr, cap, req) } // Case 2: Remote DON default: + + // In this case just execute immediately on the capability and the shims will handle the scheduling and f+1 aggregation + // TODO: fill in the remote DON case once consensus has been reach on what to do. lggr.Debugw("remote DON transmission not implemented: using immediate execution") return immediateExecution{}.Apply(ctx, lggr, cap, req) } } - -// `key` uses a shared secret, combined with a workflowID and a workflowExecutionID to generate -// a secret that can later be used to pseudo-randomly determine a schedule for a set of nodes in a DON. -// The addition of the workflowExecutionID -- which nodes don't know ahead of time -- additionally guarantees -// that a malicious coalition of nodes can't "game" the schedule. -// IMPORTANT: changing this function should happen carefully to maintain the guarantee that all nodes -// arrive at the same secret. -func (d scheduledExecution) key(sharedSecret [16]byte, workflowID, workflowExecutionID string) [16]byte { - hash := sha3.NewLegacyKeccak256() - hash.Write(sharedSecret[:]) - hash.Write([]byte(workflowID)) - hash.Write([]byte(workflowExecutionID)) - - var key [16]byte - copy(key[:], hash.Sum(nil)) - return key -} - -type transmissionConfig struct { - Schedule string - DeltaStage time.Duration -} - -func (d scheduledExecution) transmissionConfig(config *values.Map) (transmissionConfig, error) { - var tc struct { - DeltaStage string - Schedule string - } - err := config.UnwrapTo(&tc) - if err != nil { - return transmissionConfig{}, err - } - - duration, err := time.ParseDuration(tc.DeltaStage) - if err != nil { - return transmissionConfig{}, fmt.Errorf("failed to parse DeltaStage %s as duration: %w", tc.DeltaStage, err) - } - - return transmissionConfig{ - Schedule: tc.Schedule, - DeltaStage: duration, - }, nil -} - -func (d scheduledExecution) delayFor(position int, schedule []int, permutation []int, deltaStage time.Duration) *time.Duration { - sum := 0 - for i, s := range schedule { - sum += s - if permutation[position] < sum { - result := time.Duration(i) * deltaStage - return &result - } - } - - return nil -} - -func schedule(sched string, N int) ([]int, error) { - switch sched { - case Schedule_AllAtOnce: - return []int{N}, nil - case Schedule_OneAtATime: - sch := []int{} - for i := 0; i < N; i++ { - sch = append(sch, 1) - } - return sch, nil - } - return nil, fmt.Errorf("unknown schedule %s", sched) -}