Skip to content

Commit

Permalink
remote target and transmission protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec committed May 29, 2024
1 parent 8491b24 commit e467d5e
Show file tree
Hide file tree
Showing 25 changed files with 2,561 additions and 342 deletions.
2 changes: 1 addition & 1 deletion common/client/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Test_Poller(t *testing.T) {
require.NoError(t, poller.Start())
defer poller.Unsubscribe()

// Receive updates from the poller
// OnMessage updates from the poller
pollCount := 0
pollMax := 50
for ; pollCount < pollMax; pollCount++ {
Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestDispatcher_CleanStartClose(t *testing.T) {
ctx := testutils.Context(t)
peer := mocks.NewPeer(t)
recvCh := make(<-chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return(recvCh)
peer.On("OnMessage", mock.Anything).Return(recvCh)
peer.On("ID", mock.Anything).Return(p2ptypes.PeerID{})
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
Expand All @@ -55,7 +55,7 @@ func TestDispatcher_Receive(t *testing.T) {

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("OnMessage", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestDispatcher_RespondWithError(t *testing.T) {

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("OnMessage", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
sendCh := make(chan p2ptypes.PeerID)
peer.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
Expand Down
87 changes: 0 additions & 87 deletions core/capabilities/remote/target.go

This file was deleted.

140 changes: 140 additions & 0 deletions core/capabilities/remote/target/caller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package target

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type callerRequest interface {
OnMessage(ctx context.Context, msg *types.MessageBody) error
ResponseChan() <-chan commoncap.CapabilityResponse
Expired() bool
Cancel(reason string)
}

// caller/Receiver are shims translating between capability API calls and network messages
type caller struct {
lggr logger.Logger
remoteCapabilityInfo commoncap.CapabilityInfo
localDONInfo capabilities.DON
dispatcher types.Dispatcher
requestTimeout time.Duration

messageIDToExecuteRequest map[string]callerRequest
mutex sync.Mutex
}

var _ commoncap.TargetCapability = &caller{}
var _ types.Receiver = &caller{}

func NewCaller(ctx context.Context, lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) *caller {

c := &caller{
lggr: lggr,
remoteCapabilityInfo: remoteCapabilityInfo,
localDONInfo: localDonInfo,
dispatcher: dispatcher,
requestTimeout: requestTimeout,
messageIDToExecuteRequest: make(map[string]callerRequest),
}

go func() {
ticker := time.NewTicker(requestTimeout)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.ExpireRequests()
}
}
}()

return c
}

func (c *caller) ExpireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()

for messageID, req := range c.messageIDToExecuteRequest {
if req.Expired() {
req.Cancel("request expired")
delete(c.messageIDToExecuteRequest, messageID)
}
}
}

func (c *caller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
}

func (c *caller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *caller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *caller) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

messageID, err := GetMessageIDForRequest(req)
if err != nil {
return nil, fmt.Errorf("failed to get message ID for request: %w", err)
}

if _, ok := c.messageIDToExecuteRequest[messageID]; ok {
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
}

execRequest, err := request.NewCallerRequest(ctx, c.lggr, req, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,

Check failure on line 104 in core/capabilities/remote/target/caller.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
c.requestTimeout)

c.messageIDToExecuteRequest[messageID] = execRequest

return execRequest.ResponseChan(), nil
}

func (c *caller) Receive(msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()
// TODO should the dispatcher be passing in a context?
ctx := context.Background()

messageID := GetMessageID(msg)

req := c.messageIDToExecuteRequest[messageID]
if req == nil {
c.lggr.Warnw("received response for unknown message ID ", "messageID", messageID)
return
}

go func() {
if err := req.OnMessage(ctx, msg); err != nil {
c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", err)
}
}()

}

func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {
if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" {
return "", errors.New("workflow ID and workflow execution ID must be set in request metadata")
}

return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil
}
Loading

0 comments on commit e467d5e

Please sign in to comment.