Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ks 119/remote transmission protocol #13293

Closed
wants to merge 43 commits into from
Closed
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
b359bec
factor out transmission protocol
ettec May 20, 2024
bc87132
tidyup
ettec May 20, 2024
990f28c
wip
ettec May 20, 2024
a871757
add timeout handling
ettec May 21, 2024
5b1c2c9
move to using deterministically unique message ids
ettec May 21, 2024
13a861d
wip
ettec May 21, 2024
43eb46b
error handling
ettec May 21, 2024
04b820b
wip
ettec May 21, 2024
bd79c12
wip
ettec May 21, 2024
e80bea6
remote target receiver base tests
ettec May 22, 2024
fbf15cf
wip
ettec May 22, 2024
8ae4e73
wip wip
ettec May 22, 2024
393cdab
request timeout handling
ettec May 22, 2024
79d98b5
request timeout handling
ettec May 22, 2024
f2587c6
context cancellation
ettec May 22, 2024
0ce3943
update message id
ettec May 23, 2024
7b2be14
refactored caller to return on f + 1 responses
ettec May 23, 2024
f2ce195
wip
ettec May 23, 2024
460f608
refactor tests and test broker
ettec May 23, 2024
62dfe30
wip
ettec May 23, 2024
166524a
error codes
ettec May 23, 2024
21344ce
fix up expiring caller requests
ettec May 23, 2024
6f86dd0
test setup supports multiple workflow dons
ettec May 23, 2024
efe3e38
wip
ettec May 24, 2024
ed31fae
wip
ettec May 24, 2024
328ed68
caller test tidy
ettec May 24, 2024
5b23fe9
wip markers
ettec May 24, 2024
12062a6
refactor and tidyup
ettec May 27, 2024
573ea65
refactor and prep to multi thread the receiver
ettec May 27, 2024
e03d7a7
wip
ettec May 27, 2024
2994867
move to subpackage
ettec May 27, 2024
b7980c2
wip
ettec May 28, 2024
fa117f1
wip
ettec May 28, 2024
18fe585
more tests
ettec May 28, 2024
e1427a0
wip
ettec May 28, 2024
a0fc1a7
error case tests
ettec May 28, 2024
f80dcdc
more tests
ettec May 28, 2024
8fc2972
wip
ettec May 28, 2024
eba0669
wip
ettec May 28, 2024
4de7ff9
make caller and reciver multithreaded to prevent slow executor blocking
ettec May 28, 2024
ac07133
update error handling
ettec May 29, 2024
a505abc
additional error case tests
ettec May 29, 2024
a3e0c6a
tidyup
ettec May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 0 additions & 87 deletions core/capabilities/remote/target.go

This file was deleted.

135 changes: 135 additions & 0 deletions core/capabilities/remote/target/caller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package target

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages
type remoteTargetCaller struct {
lggr logger.Logger
remoteCapabilityInfo commoncap.CapabilityInfo
localDONInfo capabilities.DON
dispatcher types.Dispatcher
requestTimeout time.Duration

requestIDToExecuteRequest map[string]*callerExecuteRequest
mutex sync.Mutex
}

var _ commoncap.TargetCapability = &remoteTargetCaller{}
var _ types.Receiver = &remoteTargetCaller{}

func NewRemoteTargetCaller(ctx context.Context, lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) *remoteTargetCaller {

caller := &remoteTargetCaller{
lggr: lggr,
remoteCapabilityInfo: remoteCapabilityInfo,
localDONInfo: localDonInfo,
dispatcher: dispatcher,
requestTimeout: requestTimeout,
requestIDToExecuteRequest: make(map[string]*callerExecuteRequest),
}

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we convert to services.Service style with Start()/Stop() for consistency with other objects that launch their own coroutines?

timer := time.NewTimer(requestTimeout)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
caller.ExpireRequests()
}
}
}()

return caller
}

func (c *remoteTargetCaller) ExpireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()

for messageID, req := range c.requestIDToExecuteRequest {
if time.Since(req.createdAt) > c.requestTimeout {
req.cancelRequest("request timed out")
}

delete(c.requestIDToExecuteRequest, messageID)
}
}

func (c *remoteTargetCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
}

func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *remoteTargetCaller) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

requestID, err := GetRequestID(req)
if err != nil {
return nil, fmt.Errorf("failed to get request ID: %w", err)
}

if _, ok := c.requestIDToExecuteRequest[requestID]; ok {
return nil, fmt.Errorf("request with ID %s already exists", requestID)
}

execRequest, err := newCallerExecuteRequest(ctx, c.lggr, req, requestID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher)

Check failure on line 98 in core/capabilities/remote/target/caller.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)

c.requestIDToExecuteRequest[requestID] = execRequest

return execRequest.responseCh, nil
}

func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()

requestID := GetMessageID(msg)
sender := remote.ToPeerID(msg.Sender)

req := c.requestIDToExecuteRequest[requestID]
if req == nil {
c.lggr.Warnw("received response for unknown request ID", "requestID", requestID, "sender", sender)
return
}

if msg.Error != types.Error_OK {
c.lggr.Warnw("received error response for pending request", "requestID", requestID, "sender", sender, "receiver", msg.Receiver, "error", msg.Error)
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not aggregate like successful responses? If all remote nodes return the same error, shouldn't we also pass it back to the underlying caller?

}

if err := req.addResponse(sender, msg.Payload); err != nil {
c.lggr.Errorw("failed to add response to request", "requestID", requestID, "sender", sender, "err", err)
}
}

// Move this into common?
func GetRequestID(req commoncap.CapabilityRequest) (string, error) {
if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" {
return "", errors.New("workflow ID and workflow execution ID must be set in request metadata")
}

return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil
}
136 changes: 136 additions & 0 deletions core/capabilities/remote/target/caller_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package target

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

type callerExecuteRequest struct {
transmissionCtx context.Context

Check failure on line 21 in core/capabilities/remote/target/caller_request.go

View workflow job for this annotation

GitHub Actions / lint

field `transmissionCtx` is unused (unused)
responseCh chan commoncap.CapabilityResponse
transmissionCancelFn context.CancelFunc
createdAt time.Time
responseIDCount map[[32]byte]int
responseReceived map[p2ptypes.PeerID]bool

requiredIdenticalResponses int

respSent bool
}

func newCallerExecuteRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest, messageID string,
remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher) (*callerExecuteRequest, error) {

remoteCapabilityDonInfo := remoteCapabilityInfo.DON
if remoteCapabilityDonInfo == nil {
return nil, errors.New("remote capability info missing DON")
}

rawRequest, err := pb.MarshalCapabilityRequest(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal capability request: %w", err)
}

tc, err := transmission.ExtractTransmissionConfig(req.Config)
if err != nil {
return nil, fmt.Errorf("failed to extract transmission config from request config: %w", err)
}

peerIDToTransmissionDelay, err := transmission.GetPeerIDToTransmissionDelay(remoteCapabilityDonInfo.Members, localDonInfo.Config.SharedSecret,
messageID, tc)
if err != nil {
return nil, fmt.Errorf("failed to get peer ID to transmission delay: %w", err)
}

transmissionCtx, transmissionCancelFn := context.WithCancel(ctx)
responseReceived := make(map[p2ptypes.PeerID]bool)
for peerID, delay := range peerIDToTransmissionDelay {
responseReceived[peerID] = false
go func(peerID ragep2ptypes.PeerID, delay time.Duration) {
message := &types.MessageBody{
CapabilityId: remoteCapabilityInfo.ID,
CapabilityDonId: remoteCapabilityDonInfo.ID,
CallerDonId: localDonInfo.ID,
Method: types.MethodExecute,
Payload: rawRequest,
MessageId: []byte(messageID),
}

select {
case <-transmissionCtx.Done():
return
case <-time.After(delay):
err = dispatcher.Send(peerID, message)
if err != nil {
lggr.Errorw("failed to send message", "peerID", peerID, "err", err)
}
}
}(peerID, delay)
}

return &callerExecuteRequest{
createdAt: time.Now(),
transmissionCancelFn: transmissionCancelFn,
requiredIdenticalResponses: int(remoteCapabilityDonInfo.F + 1),
responseIDCount: make(map[[32]byte]int),
responseReceived: responseReceived,
responseCh: make(chan commoncap.CapabilityResponse, 1),
}, nil
}

func (c *callerExecuteRequest) responseSent() bool {
return c.respSent
}

// TODO addResponse assumes that only one response is received from each peer, if streaming responses need to be supported this will need to be updated
func (c *callerExecuteRequest) addResponse(sender p2ptypes.PeerID, response []byte) error {
if _, ok := c.responseReceived[sender]; !ok {
return fmt.Errorf("response from peer %s not expected", sender)
}

if c.responseReceived[sender] {
return fmt.Errorf("response from peer %s already received", sender)
}

c.responseReceived[sender] = true

payloadId := sha256.Sum256(response)
c.responseIDCount[payloadId]++

if c.responseIDCount[payloadId] == c.requiredIdenticalResponses {
capabilityResponse, err := pb.UnmarshalCapabilityResponse(response)
if err != nil {
c.sendResponse(commoncap.CapabilityResponse{Err: fmt.Errorf("failed to unmarshal capability response: %w", err)})
} else {
c.sendResponse(commoncap.CapabilityResponse{Value: capabilityResponse.Value})
}
}

return nil
}

func (c *callerExecuteRequest) sendResponse(response commoncap.CapabilityResponse) {
c.responseCh <- response
close(c.responseCh)
c.transmissionCancelFn()
c.respSent = true
}

func (c *callerExecuteRequest) cancelRequest(reason string) {
c.transmissionCancelFn()
if !c.responseSent() {
c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(reason)})
}
}
Loading
Loading