Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ks 119/remote transmission protocol #13293

Closed
wants to merge 43 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
b359bec
factor out transmission protocol
ettec May 20, 2024
bc87132
tidyup
ettec May 20, 2024
990f28c
wip
ettec May 20, 2024
a871757
add timeout handling
ettec May 21, 2024
5b1c2c9
move to using deterministically unique message ids
ettec May 21, 2024
13a861d
wip
ettec May 21, 2024
43eb46b
error handling
ettec May 21, 2024
04b820b
wip
ettec May 21, 2024
bd79c12
wip
ettec May 21, 2024
e80bea6
remote target receiver base tests
ettec May 22, 2024
fbf15cf
wip
ettec May 22, 2024
8ae4e73
wip wip
ettec May 22, 2024
393cdab
request timeout handling
ettec May 22, 2024
79d98b5
request timeout handling
ettec May 22, 2024
f2587c6
context cancellation
ettec May 22, 2024
0ce3943
update message id
ettec May 23, 2024
7b2be14
refactored caller to return on f + 1 responses
ettec May 23, 2024
f2ce195
wip
ettec May 23, 2024
460f608
refactor tests and test broker
ettec May 23, 2024
62dfe30
wip
ettec May 23, 2024
166524a
error codes
ettec May 23, 2024
21344ce
fix up expiring caller requests
ettec May 23, 2024
6f86dd0
test setup supports multiple workflow dons
ettec May 23, 2024
efe3e38
wip
ettec May 24, 2024
ed31fae
wip
ettec May 24, 2024
328ed68
caller test tidy
ettec May 24, 2024
5b23fe9
wip markers
ettec May 24, 2024
12062a6
refactor and tidyup
ettec May 27, 2024
573ea65
refactor and prep to multi thread the receiver
ettec May 27, 2024
e03d7a7
wip
ettec May 27, 2024
2994867
move to subpackage
ettec May 27, 2024
b7980c2
wip
ettec May 28, 2024
fa117f1
wip
ettec May 28, 2024
18fe585
more tests
ettec May 28, 2024
e1427a0
wip
ettec May 28, 2024
a0fc1a7
error case tests
ettec May 28, 2024
f80dcdc
more tests
ettec May 28, 2024
8fc2972
wip
ettec May 28, 2024
eba0669
wip
ettec May 28, 2024
4de7ff9
make caller and reciver multithreaded to prevent slow executor blocking
ettec May 28, 2024
ac07133
update error handling
ettec May 29, 2024
a505abc
additional error case tests
ettec May 29, 2024
a3e0c6a
tidyup
ettec May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 0 additions & 87 deletions core/capabilities/remote/target.go

This file was deleted.

177 changes: 177 additions & 0 deletions core/capabilities/remote/target_caller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package remote

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages
type remoteTargetCaller struct {
remoteCapabilityInfo commoncap.CapabilityInfo
remoteCapabilityDonInfo capabilities.DON
localDONInfo capabilities.DON
dispatcher types.Dispatcher
lggr logger.Logger
messageIDToWaitgroup sync.Map
messageIDToResponse sync.Map
}

var _ commoncap.TargetCapability = &remoteTargetCaller{}
var _ types.Receiver = &remoteTargetCaller{}

func NewRemoteTargetCaller(lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, remoteCapabilityDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher) (*remoteTargetCaller, error) {

return &remoteTargetCaller{
remoteCapabilityInfo: remoteCapabilityInfo,
remoteCapabilityDonInfo: remoteCapabilityDonInfo,
localDONInfo: localDonInfo,
dispatcher: dispatcher,
lggr: lggr,
}, nil
}

func (c *remoteTargetCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
}

func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
return errors.New("not implemented")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could these just be no-ops for targets?

}

func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error {
return errors.New("not implemented")
}

func (c *remoteTargetCaller) Execute(parentCtx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {

// TODO Assuming here that the capability request is deterministically unique across the nodes, need to confirm this is reasonable assumption
// TODO also check pb marshalliing is by default deterministic in the version being used

rawRequest, err := pb.MarshalCapabilityRequest(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal capability request: %w", err)
}

deterministicMessageID := sha256.Sum256(rawRequest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your unique identified should be (workflowID, executionID). That is available inside req.Metadata - see engine.go:executeStep().


responseWaitGroup := &sync.WaitGroup{}
responseWaitGroup.Add(1)
c.messageIDToWaitgroup.Store(deterministicMessageID, responseWaitGroup)

responseReceived := make(chan struct{})
go func() {
responseWaitGroup.Wait()
close(responseReceived)
}()

// Once a response is received from a remote capability further transmission should be cancelled
ctx, cancelFn := context.WithCancel(parentCtx)
defer cancelFn()

if err := c.transmitRequestWithMessageID(ctx, req, deterministicMessageID); err != nil {
return nil, fmt.Errorf("failed to transmit request: %w", err)
}

select {
case <-responseReceived:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execute shouldn't block on that.


response, loaded := c.messageIDToResponse.LoadAndDelete(deterministicMessageID)
if !loaded {
return nil, fmt.Errorf("no response found for message ID %s", deterministicMessageID)
}

msg, ok := response.(*types.MessageBody)
if !ok {
return nil, fmt.Errorf("unexpected response type %T for message ID %s", response, deterministicMessageID)
}

if msg.Error != types.Error_OK {
return nil, fmt.Errorf("remote capability returned error: %s", msg.Error)
}

capabilityResponse, err := pb.UnmarshalCapabilityResponse(msg.Payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal capability response: %w", err)
}

// TODO handle the case where the capability returns a stream of responses
resultCh := make(chan commoncap.CapabilityResponse, 1)
resultCh <- capabilityResponse
close(resultCh)

return resultCh, nil
case <-ctx.Done():
return nil, ctx.Err()
}

}

// transmitRequestWithMessageID transmits a capability request to remote capabilities according to the transmission configuration
func (c *remoteTargetCaller) transmitRequestWithMessageID(ctx context.Context, req commoncap.CapabilityRequest, messageID [32]byte) error {
rawRequest, err := pb.MarshalCapabilityRequest(req)
if err != nil {
return fmt.Errorf("failed to marshal capability request: %w", err)
}

// TODO should the transmission config be passed into the constructor rather than pulled from the request?
tc, err := transmission.ExtractTransmissionConfig(req.Config)
if err != nil {
return fmt.Errorf("failed to extract transmission config from request config: %w", err)
}

message := &types.MessageBody{
CapabilityId: c.remoteCapabilityInfo.ID,
CapabilityDonId: c.remoteCapabilityDonInfo.ID,
CallerDonId: c.localDONInfo.ID,
Method: types.MethodExecute,
Payload: rawRequest,
MessageId: messageID[:],
}

peerIDToDelay, err := transmission.GetPeerIDToTransmissionDelay(c.remoteCapabilityDonInfo.Members, c.localDONInfo.Config.SharedSecret, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID, tc)
if err != nil {
return fmt.Errorf("failed to get peer ID to transmission delay: %w", err)
}

for peerID, delay := range peerIDToDelay {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's interesting that you put the strategy inside the shim. I initially though that it will exist outside of it but maybe this is better. Let me think about it more.

go func(peerID ragep2ptypes.PeerID, delay time.Duration) {
select {
case <-ctx.Done():
return
case <-time.After(delay):
c.lggr.Debugw("executing delayed execution for peer", "peerID", peerID)
err = c.dispatcher.Send(peerID, message)
if err != nil {
c.lggr.Errorw("failed to send message", "peerID", peerID, "err", err)
}
}
}(peerID, delay)
}

return nil
}

func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {

// TODO handle the case where the capability returns a stream of responses
messageID := getMessageID(msg)

wg, loaded := c.messageIDToWaitgroup.LoadAndDelete(messageID)
if loaded {
wg.(*sync.WaitGroup).Done()
c.messageIDToResponse.Store(messageID, msg)
return
}
}
Loading
Loading