Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ks 119/remote transmission protocol #13293

Closed
wants to merge 43 commits into from

Conversation

ettec
Copy link
Collaborator

@ettec ettec commented May 22, 2024

Very much still a WIP - created just to facilitate an early review meeting with Bolek

Copy link
Contributor

I see you updated files related to core. Please run pnpm changeset in the root directory to add a changeset as well as in the text include at least one of the following tags:

  • #added For any new functionality added.
  • #breaking_change For any functionality that requires manual action for the node to boot.
  • #bugfix For bug fixes.
  • #changed For any change to the existing functionality.
  • #db_update For any feature that introduces updates to database schema.
  • #deprecation_notice For any upcoming deprecation functionality.
  • #internal For changesets that need to be excluded from the final changelog.
  • #nops For any feature that is NOP facing and needs to be in the official Release Notes for the release.
  • #removed For any functionality/config that is removed.
  • #updated For any functionality that is updated.
  • #wip For any change that is not ready yet and external communication about it should be held off till it is feature complete.

return nil, fmt.Errorf("failed to marshal capability request: %w", err)
}

deterministicMessageID := sha256.Sum256(rawRequest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your unique identified should be (workflowID, executionID). That is available inside req.Metadata - see engine.go:executeStep().

}

select {
case <-responseReceived:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execute shouldn't block on that.

}

func (r *remoteTargetReceiver) Receive(msg *types.MessageBody) {
// TODO should the dispatcher be passing in a context?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question, I though about improving goroutine management in the Dispatcher, let me think about it


executeReq.fromPeers[sender] = true
minRequiredRequests := int(callerDon.F + 1)
if len(executeReq.fromPeers) >= minRequiredRequests {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can try leveraging the messageCache object.

return fmt.Errorf("failed to get peer ID to transmission delay: %w", err)
}

for peerID, delay := range peerIDToDelay {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's interesting that you put the strategy inside the shim. I initially though that it will exist outside of it but maybe this is better. Let me think about it more.

}

func (c *remoteTargetCaller) RegisterToWorkflow(ctx context.Context, request commoncap.RegisterToWorkflowRequest) error {
return errors.New("not implemented")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could these just be no-ops for targets?

}
}

func (e *remoteTargetCapabilityRequest) receive(ctx context.Context, msg *types.MessageBody) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider using MessageCache for this logic? It would be nice to implement similar behaviors in a consistent way across all remote capabilities.

return
}

// A request is uniquely identified by the message id and the hash of the payload
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds risky. A message to a target should be identified by (CallerDonID, WorkflowExecutionID) - available in the metadata. WorkflowExecutionID is something that all nodes in the workflow DON reached consensus on. If we track things only by payload hashes and we have multiple buggy or malicious nodes, it will be very hard for us to make sense of any metrics. And you also need scoping to caller DON.

@@ -0,0 +1,186 @@
package target
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inconsistent file names - rename to receiver_request.go

requestIDToExecuteRequest: make(map[string]*callerExecuteRequest),
}

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we convert to services.Service style with Start()/Stop() for consistency with other objects that launch their own coroutines?


if msg.Error != types.Error_OK {
c.lggr.Warnw("received error response for pending request", "requestID", requestID, "sender", sender, "receiver", msg.Receiver, "error", msg.Error)
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not aggregate like successful responses? If all remote nodes return the same error, shouldn't we also pass it back to the underlying caller?

@cl-sonarqube-production
Copy link

Quality Gate failed Quality Gate failed

Failed conditions
C Reliability Rating on New Code (required ≥ A)
21 New Major Issues (required ≤ 5)

See analysis details on SonarQube

Catch issues before they fail your Quality Gate with our IDE extension SonarLint SonarLint

@ettec ettec closed this May 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants