Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ks 119/remote transmission protocol #13293

Closed
wants to merge 43 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
b359bec
factor out transmission protocol
ettec May 20, 2024
bc87132
tidyup
ettec May 20, 2024
990f28c
wip
ettec May 20, 2024
a871757
add timeout handling
ettec May 21, 2024
5b1c2c9
move to using deterministically unique message ids
ettec May 21, 2024
13a861d
wip
ettec May 21, 2024
43eb46b
error handling
ettec May 21, 2024
04b820b
wip
ettec May 21, 2024
bd79c12
wip
ettec May 21, 2024
e80bea6
remote target receiver base tests
ettec May 22, 2024
fbf15cf
wip
ettec May 22, 2024
8ae4e73
wip wip
ettec May 22, 2024
393cdab
request timeout handling
ettec May 22, 2024
79d98b5
request timeout handling
ettec May 22, 2024
f2587c6
context cancellation
ettec May 22, 2024
0ce3943
update message id
ettec May 23, 2024
7b2be14
refactored caller to return on f + 1 responses
ettec May 23, 2024
f2ce195
wip
ettec May 23, 2024
460f608
refactor tests and test broker
ettec May 23, 2024
62dfe30
wip
ettec May 23, 2024
166524a
error codes
ettec May 23, 2024
21344ce
fix up expiring caller requests
ettec May 23, 2024
6f86dd0
test setup supports multiple workflow dons
ettec May 23, 2024
efe3e38
wip
ettec May 24, 2024
ed31fae
wip
ettec May 24, 2024
328ed68
caller test tidy
ettec May 24, 2024
5b23fe9
wip markers
ettec May 24, 2024
12062a6
refactor and tidyup
ettec May 27, 2024
573ea65
refactor and prep to multi thread the receiver
ettec May 27, 2024
e03d7a7
wip
ettec May 27, 2024
2994867
move to subpackage
ettec May 27, 2024
b7980c2
wip
ettec May 28, 2024
fa117f1
wip
ettec May 28, 2024
18fe585
more tests
ettec May 28, 2024
e1427a0
wip
ettec May 28, 2024
a0fc1a7
error case tests
ettec May 28, 2024
f80dcdc
more tests
ettec May 28, 2024
8fc2972
wip
ettec May 28, 2024
eba0669
wip
ettec May 28, 2024
4de7ff9
make caller and reciver multithreaded to prevent slow executor blocking
ettec May 28, 2024
ac07133
update error handling
ettec May 29, 2024
a505abc
additional error case tests
ettec May 29, 2024
a3e0c6a
tidyup
ettec May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/client/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Test_Poller(t *testing.T) {
require.NoError(t, poller.Start())
defer poller.Unsubscribe()

// Receive updates from the poller
// OnMessage updates from the poller
pollCount := 0
pollMax := 50
for ; pollCount < pollMax; pollCount++ {
Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestDispatcher_CleanStartClose(t *testing.T) {
ctx := testutils.Context(t)
peer := mocks.NewPeer(t)
recvCh := make(<-chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return(recvCh)
peer.On("OnMessage", mock.Anything).Return(recvCh)
peer.On("ID", mock.Anything).Return(p2ptypes.PeerID{})
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
Expand All @@ -55,7 +55,7 @@ func TestDispatcher_Receive(t *testing.T) {

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("OnMessage", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestDispatcher_RespondWithError(t *testing.T) {

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("OnMessage", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
sendCh := make(chan p2ptypes.PeerID)
peer.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
Expand Down
87 changes: 0 additions & 87 deletions core/capabilities/remote/target.go

This file was deleted.

140 changes: 140 additions & 0 deletions core/capabilities/remote/target/caller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package target

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type callerRequest interface {
OnMessage(ctx context.Context, msg *types.MessageBody) error
ResponseChan() <-chan commoncap.CapabilityResponse
Expired() bool
Cancel(reason string)
}

// caller/Receiver are shims translating between capability API calls and network messages
type caller struct {
lggr logger.Logger
remoteCapabilityInfo commoncap.CapabilityInfo
localDONInfo capabilities.DON
dispatcher types.Dispatcher
requestTimeout time.Duration

messageIDToExecuteRequest map[string]callerRequest
mutex sync.Mutex
}

var _ commoncap.TargetCapability = &caller{}
var _ types.Receiver = &caller{}

func NewCaller(ctx context.Context, lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) *caller {

c := &caller{
lggr: lggr,
remoteCapabilityInfo: remoteCapabilityInfo,
localDONInfo: localDonInfo,
dispatcher: dispatcher,
requestTimeout: requestTimeout,
messageIDToExecuteRequest: make(map[string]callerRequest),
}

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we convert to services.Service style with Start()/Stop() for consistency with other objects that launch their own coroutines?

ticker := time.NewTicker(requestTimeout)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.ExpireRequests()
}
}
}()

return c
}

func (c *caller) ExpireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()

for messageID, req := range c.messageIDToExecuteRequest {
if req.Expired() {
req.Cancel("request expired")
delete(c.messageIDToExecuteRequest, messageID)
}
}
}

func (c *caller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
}

func (c *caller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *caller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *caller) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

messageID, err := GetMessageIDForRequest(req)
if err != nil {
return nil, fmt.Errorf("failed to get message ID for request: %w", err)
}

if _, ok := c.messageIDToExecuteRequest[messageID]; ok {
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
}

execRequest, err := request.NewCallerRequest(ctx, c.lggr, req, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)

c.messageIDToExecuteRequest[messageID] = execRequest

return execRequest.ResponseChan(), nil
}

func (c *caller) Receive(msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()
// TODO should the dispatcher be passing in a context?
ctx := context.Background()

messageID := GetMessageID(msg)

req := c.messageIDToExecuteRequest[messageID]
if req == nil {
c.lggr.Warnw("received response for unknown message ID ", "messageID", messageID)
return
}

go func() {
if err := req.OnMessage(ctx, msg); err != nil {
c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", err)
}
}()

}

func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {
if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" {
return "", errors.New("workflow ID and workflow execution ID must be set in request metadata")
}

return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil
}
Loading
Loading