Skip to content

Commit

Permalink
[KS-285] Update DON struct; pass through DON Config Version (#13739)
Browse files Browse the repository at this point in the history
* Convert donID string -> uint32 to match capabilities registry

* Fix launcher tests

* Pass through ConfigVersion

* Update tests

* bump common
  • Loading branch information
cedric-cordenier authored Jul 2, 2024
1 parent 2ecf45d commit 0efeed6
Show file tree
Hide file tree
Showing 32 changed files with 192 additions and 185 deletions.
19 changes: 10 additions & 9 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (w *launcher) LocalNode(ctx context.Context) (capabilities.Node, error) {
return w.localNode, errors.New("unable to get local node: peerWrapper hasn't started yet")
}

if w.localNode.WorkflowDON.ID == "" {
if w.localNode.WorkflowDON.ID == 0 {
return w.localNode, errors.New("unable to get local node: waiting for initial call from syncer")
}

Expand All @@ -113,7 +113,7 @@ func (w *launcher) updateLocalNode(state registrysyncer.State) {
for _, p := range d.NodeP2PIds {
if p == pid {
if d.AcceptsWorkflows {
if workflowDON.ID == "" {
if workflowDON.ID == 0 {
workflowDON = *toDONInfo(d)
w.lggr.Debug("Workflow DON identified: %+v", workflowDON)
} else {
Expand Down Expand Up @@ -353,7 +353,7 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability

err = w.dispatcher.SetReceiver(
fullCapID,
fmt.Sprint(don.Id),
don.Id,
capability,
)
if err != nil {
Expand All @@ -373,9 +373,9 @@ var (
)

func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don kcr.CapabilitiesRegistryDONInfo, state registrysyncer.State, remoteWorkflowDONs []kcr.CapabilitiesRegistryDONInfo) error {
idsToDONs := map[string]capabilities.DON{}
idsToDONs := map[uint32]capabilities.DON{}
for _, d := range remoteWorkflowDONs {
idsToDONs[fmt.Sprint(d.Id)] = *toDONInfo(d)
idsToDONs[d.Id] = *toDONInfo(d)
}

for _, c := range don.CapabilityConfigurations {
Expand Down Expand Up @@ -465,7 +465,7 @@ func (w *launcher) addReceiver(ctx context.Context, capability kcr.CapabilitiesR
}

w.lggr.Debugw("Enabling external access for capability", "id", fullCapID, "donID", don.Id)
err = w.dispatcher.SetReceiver(fullCapID, fmt.Sprint(don.Id), receiver)
err = w.dispatcher.SetReceiver(fullCapID, don.Id, receiver)
if err != nil {
return fmt.Errorf("failed to set receiver: %w", err)
}
Expand Down Expand Up @@ -502,9 +502,10 @@ func toDONInfo(don kcr.CapabilitiesRegistryDONInfo) *capabilities.DON {
}

return &capabilities.DON{
ID: fmt.Sprint(don.Id),
Members: peerIDs,
F: don.F,
ID: don.Id,
ConfigVersion: don.ConfigCount,
Members: peerIDs,
F: don.F,
}
}

Expand Down
20 changes: 10 additions & 10 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package capabilities
import (
"context"
"crypto/rand"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -178,8 +177,8 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {
registry,
)

dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(dID), mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, fmt.Sprint(dID), mock.AnythingOfType("*target.server")).Return(nil)
dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*target.server")).Return(nil)

err = launcher.Launch(ctx, state)
require.NoError(t, err)
Expand Down Expand Up @@ -428,8 +427,8 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) {
registry,
)

dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(capDonID), mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, fmt.Sprint(capDonID), mock.AnythingOfType("*target.client")).Return(nil)
dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*target.client")).Return(nil)

err = launcher.Launch(ctx, state)
require.NoError(t, err)
Expand Down Expand Up @@ -586,7 +585,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie
registry,
)

dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(triggerCapDonID), mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiver", fullTriggerCapID, triggerCapDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)

err = launcher.Launch(ctx, state)
require.NoError(t, err)
Expand Down Expand Up @@ -634,7 +633,7 @@ func TestLauncher_LocalNode(t *testing.T) {
IDsToDONs: map[registrysyncer.DonID]kcr.CapabilitiesRegistryDONInfo{
registrysyncer.DonID(dID): {
Id: dID,
ConfigCount: uint32(0),
ConfigCount: uint32(2),
F: uint8(1),
IsPublic: true,
AcceptsWorkflows: true,
Expand Down Expand Up @@ -680,9 +679,10 @@ func TestLauncher_LocalNode(t *testing.T) {
require.NoError(t, err)

don := capabilities.DON{
ID: fmt.Sprintf("%d", dID),
Members: toPeerIDs(workflowDonNodes),
F: 1,
ID: dID,
ConfigVersion: 2,
Members: toPeerIDs(workflowDonNodes),
F: 1,
}
expectedNode := capabilities.Node{
PeerID: &pid,
Expand Down
10 changes: 5 additions & 5 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type dispatcher struct {

type key struct {
capId string
donId string
donId uint32
}

var _ services.Service = &dispatcher{}
Expand Down Expand Up @@ -88,13 +88,13 @@ type receiver struct {
ch chan *remotetypes.MessageBody
}

func (d *dispatcher) SetReceiver(capabilityId string, donId string, rec remotetypes.Receiver) error {
func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotetypes.Receiver) error {
d.mu.Lock()
defer d.mu.Unlock()
k := key{capabilityId, donId}
_, ok := d.receivers[k]
if ok {
return fmt.Errorf("receiver already exists for capability %s and don %s", capabilityId, donId)
return fmt.Errorf("receiver already exists for capability %s and don %d", capabilityId, donId)
}

receiverCh := make(chan *remotetypes.MessageBody, receiverBufferSize)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId string, rec remotety
return nil
}

func (d *dispatcher) RemoveReceiver(capabilityId string, donId string) {
func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) {
d.mu.Lock()
defer d.mu.Unlock()

Expand Down Expand Up @@ -181,7 +181,7 @@ func (d *dispatcher) receive() {
}

receiverQueueUsage := float64(len(receiver.ch)) / receiverBufferSize
capReceiveChannelUsage.WithLabelValues(k.capId, k.donId).Set(receiverQueueUsage)
capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage)
select {
case receiver.ch <- body:
default:
Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
}

capDonInfo := commoncap.DON{
ID: "capability-don",
ID: 1,
Members: capabilityPeers,
F: capabilityDonF,
}
Expand All @@ -149,7 +149,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo

workflowDonInfo := commoncap.DON{
Members: workflowPeers,
ID: "workflow-don",
ID: 2,
}

broker := newTestAsyncMessageBroker(t, 100)
Expand Down Expand Up @@ -259,7 +259,7 @@ func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBo
for receiver := range t.messageIDToSenders[messageID] {
var responseMsg = &remotetypes.MessageBody{
CapabilityId: "[email protected]",
CapabilityDonId: "capability-don",
CapabilityDonId: 1,
CallerDonId: t.workflowDonInfo.ID,
Method: remotetypes.MethodExecute,
MessageId: []byte(messageID),
Expand Down
10 changes: 5 additions & 5 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
require.NoError(t, capabilityPeerID.UnmarshalText([]byte(NewPeerID())))

capDonInfo := commoncap.DON{
ID: "capability-don",
ID: 2,
Members: capabilityPeers,
F: capabilityDonF,
}
Expand All @@ -212,13 +212,13 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta

workflowDonInfo := commoncap.DON{
Members: workflowPeers,
ID: "workflow-don",
ID: 1,
F: workflowDonF,
}

broker := newTestAsyncMessageBroker(t, 1000)

workflowDONs := map[string]commoncap.DON{
workflowDONs := map[uint32]commoncap.DON{
workflowDonInfo.ID: workflowDonInfo,
}

Expand Down Expand Up @@ -380,10 +380,10 @@ func (t *nodeDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.Messa
return nil
}

func (t *nodeDispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error {
func (t *nodeDispatcher) SetReceiver(capabilityId string, donId uint32, receiver remotetypes.Receiver) error {
return nil
}
func (t *nodeDispatcher) RemoveReceiver(capabilityId string, donId string) {}
func (t *nodeDispatcher) RemoveReceiver(capabilityId string, donId uint32) {}

type abstractTestCapability struct {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
}

capDonInfo := commoncap.DON{
ID: "capability-don",
ID: 1,
Members: capabilityPeers,
F: 1,
}
Expand All @@ -50,7 +50,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

workflowDonInfo := commoncap.DON{
Members: workflowPeers,
ID: "workflow-don",
ID: 2,
}

executeInputs, err := values.NewMap(
Expand Down Expand Up @@ -311,11 +311,11 @@ type clientRequestTestDispatcher struct {
msgs chan *types.MessageBody
}

func (t *clientRequestTestDispatcher) SetReceiver(capabilityId string, donId string, receiver types.Receiver) error {
func (t *clientRequestTestDispatcher) SetReceiver(capabilityId string, donId uint32, receiver types.Receiver) error {
return nil
}

func (t *clientRequestTestDispatcher) RemoveReceiver(capabilityId string, donId string) {}
func (t *clientRequestTestDispatcher) RemoveReceiver(capabilityId string, donId uint32) {}

func (t *clientRequestTestDispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error {
t.msgs <- msgBody
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/target/request/server_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ServerRequest struct {

capabilityPeerId p2ptypes.PeerID
capabilityID string
capabilityDonID string
capabilityDonID uint32

dispatcher types.Dispatcher

Expand All @@ -47,7 +47,7 @@ type ServerRequest struct {
lggr logger.Logger
}

func NewServerRequest(capability capabilities.TargetCapability, capabilityID string, capabilityDonID string, capabilityPeerId p2ptypes.PeerID,
func NewServerRequest(capability capabilities.TargetCapability, capabilityID string, capabilityDonID uint32, capabilityPeerId p2ptypes.PeerID,
callingDon commoncap.DON, requestMessageID string,
dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *ServerRequest {
return &ServerRequest{
Expand Down
Loading

0 comments on commit 0efeed6

Please sign in to comment.