Ks 119/remote transmission protocol #13293
Conversation
```go
	return nil, fmt.Errorf("failed to marshal capability request: %w", err)
}

deterministicMessageID := sha256.Sum256(rawRequest)
```
Your unique identifier should be (workflowID, executionID). That is available inside req.Metadata - see engine.go:executeStep().
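A minimal sketch of what that could look like, assuming the metadata fields are named WorkflowID and WorkflowExecutionID (check engine.go:executeStep() for the authoritative names):

```go
// Sketch: build the deterministic ID from workflow metadata instead of
// hashing the raw payload. Field names are assumptions, not verified here.
func messageID(req commoncap.CapabilityRequest) string {
	return req.Metadata.WorkflowID + "::" + req.Metadata.WorkflowExecutionID
}
```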
```go
}

select {
case <-responseReceived:
```
Execute shouldn't block on that.
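One non-blocking shape this could take, purely as a sketch: Execute returns its response channel immediately and a goroutine does the waiting. The responseReceived field and the exact channel types are illustrative assumptions.

```go
// Sketch: Execute returns immediately; a goroutine forwards the eventual response.
func (c *remoteTargetCaller) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
	out := make(chan commoncap.CapabilityResponse, 1)
	go func() {
		defer close(out)
		select {
		case resp := <-c.responseReceived: // hypothetical per-request channel
			out <- resp
		case <-ctx.Done(): // caller gave up; don't leak the goroutine
		}
	}()
	return out, nil
}
```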
```go
}

func (r *remoteTargetReceiver) Receive(msg *types.MessageBody) {
	// TODO should the dispatcher be passing in a context?
```
Good question. I thought about improving goroutine management in the Dispatcher; let me think about it.
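One option, sketched under the assumption that the Dispatcher owns a shutdown context it can thread through (this is not the current interface):

```go
// Sketch (assumed interface change): Receive takes the dispatcher's context
// so handlers can stop blocking on shutdown.
func (r *remoteTargetReceiver) Receive(ctx context.Context, msg *types.MessageBody) {
	select {
	case r.msgCh <- msg: // hand off to the processing loop (msgCh is hypothetical)
	case <-ctx.Done(): // dispatcher shutting down; drop the message
	}
}
```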
```go
executeReq.fromPeers[sender] = true
minRequiredRequests := int(callerDon.F + 1)
if len(executeReq.fromPeers) >= minRequiredRequests {
```
You can try leveraging the messageCache object.
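Roughly the behavior the messageCache encapsulates, with method signatures assumed for illustration (see the real MessageCache in core/capabilities/remote):

```go
// Sketch of the counting the messageCache can take over: one message per peer
// under a shared event ID, firing once F+1 distinct peers have delivered.
// Insert/Ready signatures here are assumptions.
func onExecuteRequest(cache *messagecache.MessageCache[string, p2ptypes.PeerID], reqID string, sender p2ptypes.PeerID, ts int64, payload []byte, f uint32) bool {
	cache.Insert(reqID, sender, ts, payload)
	ready, _ := cache.Ready(reqID, f+1, 0, true)
	return ready // true once F+1 distinct callers delivered matching payloads
}
```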
```go
	return fmt.Errorf("failed to get peer ID to transmission delay: %w", err)
}

for peerID, delay := range peerIDToDelay {
```
It's interesting that you put the strategy inside the shim. I initially thought it would live outside of it, but maybe this is better. Let me think about it more.
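For comparison, the "outside the shim" version might look something like this hypothetical interface (nothing in the repo defines it):

```go
// Hypothetical: lift the transmission schedule out of the shim so scheduling
// policies can be swapped without touching transport code.
type TransmissionStrategy interface {
	// Schedule returns the delay to apply before transmitting to each peer.
	Schedule(msgID string, peers []p2ptypes.PeerID) (map[p2ptypes.PeerID]time.Duration, error)
}
```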
```go
}

func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
	return errors.New("not implemented")
```
Could these just be no-ops for targets?
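The suggested no-op would just be (sketch):

```go
// Sketch: registration carries no state for targets, so succeed silently.
func (c *remoteTargetCaller) RegisterToWorkflow(_ context.Context, _ commoncap.RegisterToWorkflowRequest) error {
	return nil // no-op for targets
}
```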
```go
	}
}

func (e *remoteTargetCapabilityRequest) receive(ctx context.Context, msg *types.MessageBody) error {
```
Did you consider using MessageCache for this logic? It would be nice to implement similar behaviors in a consistent way across all remote capabilities.
```go
	return
}

// A request is uniquely identified by the message id and the hash of the payload
```
This sounds risky. A message to a target should be identified by (CallerDonID, WorkflowExecutionID) - available in the metadata. WorkflowExecutionID is something that all nodes in the workflow DON reached consensus on. If we track things only by payload hashes and we have multiple buggy or malicious nodes, it will be very hard for us to make sense of any metrics. And you also need scoping to caller DON.
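A sketch of that keying, with field names assumed from the comment above (CallerDonId on the message, WorkflowExecutionID in the metadata):

```go
// Sketch: scope the receiver's request registry by caller DON and execution ID
// rather than a payload hash. Field names are assumptions.
type requestKey struct {
	callerDonID         uint32
	workflowExecutionID string
}

func keyFor(callerDonID uint32, meta commoncap.RequestMetadata) requestKey {
	return requestKey{callerDonID: callerDonID, workflowExecutionID: meta.WorkflowExecutionID}
}
```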
@@ -0,0 +1,186 @@

```go
package target
```
inconsistent file names - rename to receiver_request.go
```go
	requestIDToExecuteRequest: make(map[string]*callerExecuteRequest),
}

go func() {
```
Can we convert to services.Service style with Start()/Stop() for consistency with other objects that launch their own goroutines?
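A sketch of that conversion using the StartOnce/StopOnce helpers from chainlink-common's services package (the embedding and loop name are assumptions):

```go
// Sketch: move the constructor's `go func()` behind Start()/Close().
type remoteTargetCaller struct {
	services.StateMachine
	stopCh services.StopChan
	wg     sync.WaitGroup
}

func (c *remoteTargetCaller) Start(_ context.Context) error {
	return c.StartOnce("remoteTargetCaller", func() error {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			c.expiryLoop() // hypothetical: the loop currently launched in the constructor
		}()
		return nil
	})
}

func (c *remoteTargetCaller) Close() error {
	return c.StopOnce("remoteTargetCaller", func() error {
		close(c.stopCh)
		c.wg.Wait()
		return nil
	})
}
```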
```go
if msg.Error != types.Error_OK {
	c.lggr.Warnw("received error response for pending request", "requestID", requestID, "sender", sender, "receiver", msg.Receiver, "error", msg.Error)
	return
```
Why not aggregate them like successful responses? If all remote nodes return the same error, shouldn't we also pass it back to the underlying caller?
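Sketched, the error path could mirror the success path: count identical error codes and complete the request once a quorum agrees. The errorCount, requiredCount, and respCh fields below are assumptions, as is CapabilityResponse carrying an Err field.

```go
// Sketch: aggregate identical errors like successful responses and surface
// the error to the caller once enough nodes report it.
func (e *remoteTargetCapabilityRequest) onErrorResponse(msg *types.MessageBody) {
	e.errorCount[msg.Error]++
	if e.errorCount[msg.Error] >= e.requiredCount {
		e.respCh <- commoncap.CapabilityResponse{
			Err: fmt.Errorf("remote nodes returned error: %s", msg.Error),
		}
	}
}
```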
SonarQube: Quality Gate failed. See analysis details on SonarQube.
Very much still a WIP - created just to facilitate an early review meeting with Bolek