-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ks 119/remote transmission protocol #13293
Changes from 12 commits
b359bec
bc87132
990f28c
a871757
5b1c2c9
13a861d
43eb46b
04b820b
bd79c12
e80bea6
fbf15cf
8ae4e73
393cdab
79d98b5
f2587c6
0ce3943
7b2be14
f2ce195
460f608
62dfe30
166524a
21344ce
6f86dd0
efe3e38
ed31fae
328ed68
5b23fe9
12062a6
573ea65
e03d7a7
2994867
b7980c2
fa117f1
18fe585
e1427a0
a0fc1a7
f80dcdc
8fc2972
eba0669
4de7ff9
ac07133
a505abc
a3e0c6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
package remote | ||
|
||
import ( | ||
"context" | ||
"crypto/sha256" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" | ||
) | ||
|
||
// remoteTargetCaller/Receiver are shims translating between capability API calls and network messages | ||
type remoteTargetCaller struct { | ||
remoteCapabilityInfo commoncap.CapabilityInfo | ||
remoteCapabilityDonInfo capabilities.DON | ||
localDONInfo capabilities.DON | ||
dispatcher types.Dispatcher | ||
lggr logger.Logger | ||
messageIDToWaitgroup sync.Map | ||
messageIDToResponse sync.Map | ||
} | ||
|
||
var _ commoncap.TargetCapability = &remoteTargetCaller{} | ||
var _ types.Receiver = &remoteTargetCaller{} | ||
|
||
func NewRemoteTargetCaller(lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, remoteCapabilityDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher) (*remoteTargetCaller, error) { | ||
|
||
return &remoteTargetCaller{ | ||
remoteCapabilityInfo: remoteCapabilityInfo, | ||
remoteCapabilityDonInfo: remoteCapabilityDonInfo, | ||
localDONInfo: localDonInfo, | ||
dispatcher: dispatcher, | ||
lggr: lggr, | ||
}, nil | ||
} | ||
|
||
func (c *remoteTargetCaller) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { | ||
return c.remoteCapabilityInfo, nil | ||
} | ||
|
||
func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error { | ||
return errors.New("not implemented") | ||
} | ||
|
||
func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request commoncap.UnregisterFromWorkflowRequest) error { | ||
return errors.New("not implemented") | ||
} | ||
|
||
func (c *remoteTargetCaller) Execute(parentCtx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { | ||
|
||
// TODO Assuming here that the capability request is deterministically unique across the nodes, need to confirm this is reasonable assumption | ||
// TODO also check pb marshalliing is by default deterministic in the version being used | ||
|
||
rawRequest, err := pb.MarshalCapabilityRequest(req) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to marshal capability request: %w", err) | ||
} | ||
|
||
deterministicMessageID := sha256.Sum256(rawRequest) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your unique identified should be (workflowID, executionID). That is available inside req.Metadata - see engine.go:executeStep(). |
||
|
||
responseWaitGroup := &sync.WaitGroup{} | ||
responseWaitGroup.Add(1) | ||
c.messageIDToWaitgroup.Store(deterministicMessageID, responseWaitGroup) | ||
|
||
responseReceived := make(chan struct{}) | ||
go func() { | ||
responseWaitGroup.Wait() | ||
close(responseReceived) | ||
}() | ||
|
||
// Once a response is received from a remote capability further transmission should be cancelled | ||
ctx, cancelFn := context.WithCancel(parentCtx) | ||
defer cancelFn() | ||
|
||
if err := c.transmitRequestWithMessageID(ctx, req, deterministicMessageID); err != nil { | ||
return nil, fmt.Errorf("failed to transmit request: %w", err) | ||
} | ||
|
||
select { | ||
case <-responseReceived: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Execute shouldn't block on that. |
||
|
||
response, loaded := c.messageIDToResponse.LoadAndDelete(deterministicMessageID) | ||
if !loaded { | ||
return nil, fmt.Errorf("no response found for message ID %s", deterministicMessageID) | ||
} | ||
|
||
msg, ok := response.(*types.MessageBody) | ||
if !ok { | ||
return nil, fmt.Errorf("unexpected response type %T for message ID %s", response, deterministicMessageID) | ||
} | ||
|
||
if msg.Error != types.Error_OK { | ||
return nil, fmt.Errorf("remote capability returned error: %s", msg.Error) | ||
} | ||
|
||
capabilityResponse, err := pb.UnmarshalCapabilityResponse(msg.Payload) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to unmarshal capability response: %w", err) | ||
} | ||
|
||
// TODO handle the case where the capability returns a stream of responses | ||
resultCh := make(chan commoncap.CapabilityResponse, 1) | ||
resultCh <- capabilityResponse | ||
close(resultCh) | ||
|
||
return resultCh, nil | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
} | ||
|
||
} | ||
|
||
// transmitRequestWithMessageID transmits a capability request to remote capabilities according to the transmission configuration | ||
func (c *remoteTargetCaller) transmitRequestWithMessageID(ctx context.Context, req commoncap.CapabilityRequest, messageID [32]byte) error { | ||
rawRequest, err := pb.MarshalCapabilityRequest(req) | ||
if err != nil { | ||
return fmt.Errorf("failed to marshal capability request: %w", err) | ||
} | ||
|
||
// TODO should the transmission config be passed into the constructor rather than pulled from the request? | ||
tc, err := transmission.ExtractTransmissionConfig(req.Config) | ||
if err != nil { | ||
return fmt.Errorf("failed to extract transmission config from request config: %w", err) | ||
} | ||
|
||
message := &types.MessageBody{ | ||
CapabilityId: c.remoteCapabilityInfo.ID, | ||
CapabilityDonId: c.remoteCapabilityDonInfo.ID, | ||
CallerDonId: c.localDONInfo.ID, | ||
Method: types.MethodExecute, | ||
Payload: rawRequest, | ||
MessageId: messageID[:], | ||
} | ||
|
||
peerIDToDelay, err := transmission.GetPeerIDToTransmissionDelay(c.remoteCapabilityDonInfo.Members, c.localDONInfo.Config.SharedSecret, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID, tc) | ||
if err != nil { | ||
return fmt.Errorf("failed to get peer ID to transmission delay: %w", err) | ||
} | ||
|
||
for peerID, delay := range peerIDToDelay { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's interesting that you put the strategy inside the shim. I initially though that it will exist outside of it but maybe this is better. Let me think about it more. |
||
go func(peerID ragep2ptypes.PeerID, delay time.Duration) { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-time.After(delay): | ||
c.lggr.Debugw("executing delayed execution for peer", "peerID", peerID) | ||
err = c.dispatcher.Send(peerID, message) | ||
if err != nil { | ||
c.lggr.Errorw("failed to send message", "peerID", peerID, "err", err) | ||
} | ||
} | ||
}(peerID, delay) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *remoteTargetCaller) Receive(msg *types.MessageBody) { | ||
|
||
// TODO handle the case where the capability returns a stream of responses | ||
messageID := getMessageID(msg) | ||
|
||
wg, loaded := c.messageIDToWaitgroup.LoadAndDelete(messageID) | ||
if loaded { | ||
wg.(*sync.WaitGroup).Done() | ||
c.messageIDToResponse.Store(messageID, msg) | ||
return | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could these just be no-ops for targets?