-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ks 119/remote transmission protocol #13293
Closed
Closed
Changes from 31 commits
Commits
Show all changes
43 commits
Select commit
Hold shift + click to select a range
b359bec
factor out transmission protocol
ettec bc87132
tidyup
ettec 990f28c
wip
ettec a871757
add timeout handling
ettec 5b1c2c9
move to using deterministically unique message ids
ettec 13a861d
wip
ettec 43eb46b
error handling
ettec 04b820b
wip
ettec bd79c12
wip
ettec e80bea6
remote target receiver base tests
ettec fbf15cf
wip
ettec 8ae4e73
wip wip
ettec 393cdab
request timeout handling
ettec 79d98b5
request timeout handling
ettec f2587c6
context cancellation
ettec 0ce3943
update message id
ettec 7b2be14
refactored caller to return on f + 1 responses
ettec f2ce195
wip
ettec 460f608
refactor tests and test broker
ettec 62dfe30
wip
ettec 166524a
error codes
ettec 21344ce
fix up expiring caller requests
ettec 6f86dd0
test setup supports multiple workflow dons
ettec efe3e38
wip
ettec ed31fae
wip
ettec 328ed68
caller test tidy
ettec 5b23fe9
wip markers
ettec 12062a6
refactor and tidyup
ettec 573ea65
refactor and prep to multi thread the receiver
ettec e03d7a7
wip
ettec 2994867
move to subpackage
ettec b7980c2
wip
ettec fa117f1
wip
ettec 18fe585
more tests
ettec e1427a0
wip
ettec a0fc1a7
error case tests
ettec f80dcdc
more tests
ettec 8fc2972
wip
ettec eba0669
wip
ettec 4de7ff9
make caller and reciver multithreaded to prevent slow executor blocking
ettec ac07133
update error handling
ettec a505abc
additional error case tests
ettec a3e0c6a
tidyup
ettec File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package target | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
) | ||
|
||
// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages | ||
type remoteTargetCaller struct { | ||
lggr logger.Logger | ||
remoteCapabilityInfo commoncap.CapabilityInfo | ||
localDONInfo capabilities.DON | ||
dispatcher types.Dispatcher | ||
requestTimeout time.Duration | ||
|
||
requestIDToExecuteRequest map[string]*callerExecuteRequest | ||
mutex sync.Mutex | ||
} | ||
|
||
var _ commoncap.TargetCapability = &remoteTargetCaller{} | ||
var _ types.Receiver = &remoteTargetCaller{} | ||
|
||
func NewRemoteTargetCaller(ctx context.Context, lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, | ||
requestTimeout time.Duration) *remoteTargetCaller { | ||
|
||
caller := &remoteTargetCaller{ | ||
lggr: lggr, | ||
remoteCapabilityInfo: remoteCapabilityInfo, | ||
localDONInfo: localDonInfo, | ||
dispatcher: dispatcher, | ||
requestTimeout: requestTimeout, | ||
requestIDToExecuteRequest: make(map[string]*callerExecuteRequest), | ||
} | ||
|
||
go func() { | ||
timer := time.NewTimer(requestTimeout) | ||
defer timer.Stop() | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-timer.C: | ||
caller.ExpireRequests() | ||
} | ||
} | ||
}() | ||
|
||
return caller | ||
} | ||
|
||
func (c *remoteTargetCaller) ExpireRequests() { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
|
||
for messageID, req := range c.requestIDToExecuteRequest { | ||
if time.Since(req.createdAt) > c.requestTimeout { | ||
req.cancelRequest("request timed out") | ||
} | ||
|
||
delete(c.requestIDToExecuteRequest, messageID) | ||
} | ||
} | ||
|
||
func (c *remoteTargetCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { | ||
return c.remoteCapabilityInfo, nil | ||
} | ||
|
||
func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { | ||
return errors.New("not implemented") | ||
} | ||
|
||
func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { | ||
return errors.New("not implemented") | ||
} | ||
|
||
func (c *remoteTargetCaller) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
|
||
requestID, err := GetRequestID(req) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to get request ID: %w", err) | ||
} | ||
|
||
if _, ok := c.requestIDToExecuteRequest[requestID]; ok { | ||
return nil, fmt.Errorf("request with ID %s already exists", requestID) | ||
} | ||
|
||
execRequest, err := newCallerExecuteRequest(ctx, c.lggr, req, requestID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher) | ||
|
||
c.requestIDToExecuteRequest[requestID] = execRequest | ||
|
||
return execRequest.responseCh, nil | ||
} | ||
|
||
func (c *remoteTargetCaller) Receive(msg *types.MessageBody) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
|
||
requestID := GetMessageID(msg) | ||
sender := remote.ToPeerID(msg.Sender) | ||
|
||
req := c.requestIDToExecuteRequest[requestID] | ||
if req == nil { | ||
c.lggr.Warnw("received response for unknown request ID", "requestID", requestID, "sender", sender) | ||
return | ||
} | ||
|
||
if msg.Error != types.Error_OK { | ||
c.lggr.Warnw("received error response for pending request", "requestID", requestID, "sender", sender, "receiver", msg.Receiver, "error", msg.Error) | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not aggregate like successful responses? If all remote nodes return the same error, shouldn't we also pass it back to the underlying caller? |
||
} | ||
|
||
if err := req.addResponse(sender, msg.Payload); err != nil { | ||
c.lggr.Errorw("failed to add response to request", "requestID", requestID, "sender", sender, "err", err) | ||
} | ||
} | ||
|
||
// Move this into common? | ||
func GetRequestID(req commoncap.CapabilityRequest) (string, error) { | ||
if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" { | ||
return "", errors.New("workflow ID and workflow execution ID must be set in request metadata") | ||
} | ||
|
||
return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
package target | ||
|
||
import ( | ||
"context" | ||
"crypto/sha256" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" | ||
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" | ||
) | ||
|
||
type callerExecuteRequest struct { | ||
transmissionCtx context.Context | ||
responseCh chan commoncap.CapabilityResponse | ||
transmissionCancelFn context.CancelFunc | ||
createdAt time.Time | ||
responseIDCount map[[32]byte]int | ||
responseReceived map[p2ptypes.PeerID]bool | ||
|
||
requiredIdenticalResponses int | ||
|
||
respSent bool | ||
} | ||
|
||
func newCallerExecuteRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest, messageID string, | ||
remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher) (*callerExecuteRequest, error) { | ||
|
||
remoteCapabilityDonInfo := remoteCapabilityInfo.DON | ||
if remoteCapabilityDonInfo == nil { | ||
return nil, errors.New("remote capability info missing DON") | ||
} | ||
|
||
rawRequest, err := pb.MarshalCapabilityRequest(req) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to marshal capability request: %w", err) | ||
} | ||
|
||
tc, err := transmission.ExtractTransmissionConfig(req.Config) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to extract transmission config from request config: %w", err) | ||
} | ||
|
||
peerIDToTransmissionDelay, err := transmission.GetPeerIDToTransmissionDelay(remoteCapabilityDonInfo.Members, localDonInfo.Config.SharedSecret, | ||
messageID, tc) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to get peer ID to transmission delay: %w", err) | ||
} | ||
|
||
transmissionCtx, transmissionCancelFn := context.WithCancel(ctx) | ||
responseReceived := make(map[p2ptypes.PeerID]bool) | ||
for peerID, delay := range peerIDToTransmissionDelay { | ||
responseReceived[peerID] = false | ||
go func(peerID ragep2ptypes.PeerID, delay time.Duration) { | ||
message := &types.MessageBody{ | ||
CapabilityId: remoteCapabilityInfo.ID, | ||
CapabilityDonId: remoteCapabilityDonInfo.ID, | ||
CallerDonId: localDonInfo.ID, | ||
Method: types.MethodExecute, | ||
Payload: rawRequest, | ||
MessageId: []byte(messageID), | ||
} | ||
|
||
select { | ||
case <-transmissionCtx.Done(): | ||
return | ||
case <-time.After(delay): | ||
err = dispatcher.Send(peerID, message) | ||
if err != nil { | ||
lggr.Errorw("failed to send message", "peerID", peerID, "err", err) | ||
} | ||
} | ||
}(peerID, delay) | ||
} | ||
|
||
return &callerExecuteRequest{ | ||
createdAt: time.Now(), | ||
transmissionCancelFn: transmissionCancelFn, | ||
requiredIdenticalResponses: int(remoteCapabilityDonInfo.F + 1), | ||
responseIDCount: make(map[[32]byte]int), | ||
responseReceived: responseReceived, | ||
responseCh: make(chan commoncap.CapabilityResponse, 1), | ||
}, nil | ||
} | ||
|
||
func (c *callerExecuteRequest) responseSent() bool { | ||
return c.respSent | ||
} | ||
|
||
// TODO addResponse assumes that only one response is received from each peer, if streaming responses need to be supported this will need to be updated | ||
func (c *callerExecuteRequest) addResponse(sender p2ptypes.PeerID, response []byte) error { | ||
if _, ok := c.responseReceived[sender]; !ok { | ||
return fmt.Errorf("response from peer %s not expected", sender) | ||
} | ||
|
||
if c.responseReceived[sender] { | ||
return fmt.Errorf("response from peer %s already received", sender) | ||
} | ||
|
||
c.responseReceived[sender] = true | ||
|
||
payloadId := sha256.Sum256(response) | ||
c.responseIDCount[payloadId]++ | ||
|
||
if c.responseIDCount[payloadId] == c.requiredIdenticalResponses { | ||
capabilityResponse, err := pb.UnmarshalCapabilityResponse(response) | ||
if err != nil { | ||
c.sendResponse(commoncap.CapabilityResponse{Err: fmt.Errorf("failed to unmarshal capability response: %w", err)}) | ||
} else { | ||
c.sendResponse(commoncap.CapabilityResponse{Value: capabilityResponse.Value}) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *callerExecuteRequest) sendResponse(response commoncap.CapabilityResponse) { | ||
c.responseCh <- response | ||
close(c.responseCh) | ||
c.transmissionCancelFn() | ||
c.respSent = true | ||
} | ||
|
||
func (c *callerExecuteRequest) cancelRequest(reason string) { | ||
c.transmissionCancelFn() | ||
if !c.responseSent() { | ||
c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(reason)}) | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we convert to services.Service style with Start()/Stop() for consistency with other objects that launch their own coroutines?