Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec committed May 24, 2024
1 parent 6f86dd0 commit efe3e38
Show file tree
Hide file tree
Showing 4 changed files with 476 additions and 114 deletions.
20 changes: 10 additions & 10 deletions core/capabilities/remote/target_caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,22 @@ func (c *remoteTargetCaller) transmitRequestWithMessageID(ctx context.Context, r
return fmt.Errorf("failed to extract transmission config from request config: %w", err)
}

message := &types.MessageBody{
CapabilityId: c.remoteCapabilityInfo.ID,
CapabilityDonId: c.remoteCapabilityDonInfo.ID,
CallerDonId: c.localDONInfo.ID,
Method: types.MethodExecute,
Payload: rawRequest,
MessageId: []byte(messageID),
}

peerIDToDelay, err := transmission.GetPeerIDToTransmissionDelay(c.remoteCapabilityDonInfo.Members, c.localDONInfo.Config.SharedSecret, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID, tc)
if err != nil {
return fmt.Errorf("failed to get peer ID to transmission delay: %w", err)
}

for peerID, delay := range peerIDToDelay {
go func(peerID ragep2ptypes.PeerID, delay time.Duration) {
message := &types.MessageBody{
CapabilityId: c.remoteCapabilityInfo.ID,
CapabilityDonId: c.remoteCapabilityDonInfo.ID,
CallerDonId: c.localDONInfo.ID,
Method: types.MethodExecute,
Payload: rawRequest,
MessageId: []byte(messageID),
}

select {
case <-ctx.Done():
return
Expand All @@ -172,7 +172,7 @@ func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()

messageID := getMessageID(msg)
messageID := GetMessageID(msg)

req := c.messageIDToExecuteRequest[messageID]
if req == nil {
Expand Down
250 changes: 250 additions & 0 deletions core/capabilities/remote/target_caller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package remote_test

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -225,6 +227,254 @@ func Test_TargetCallerExecuteWithErrorTimesOut(t *testing.T) {
require.NotNil(t, response.Err)
}

func Test_RemoteTargetCaller_DonTopologies(t *testing.T) {

transmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_OneAtATime,
"deltaStage": "10ms",
})
require.NoError(t, err)

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
}

capability := &testCapability{}

responseTimeOut := 10 * time.Minute

testRemoteTargetCaller(t, 1, responseTimeOut, 1, 0,
capability, transmissionSchedule, responseTest)

testRemoteTargetCaller(t, 10, responseTimeOut, 1, 0,
capability, transmissionSchedule, responseTest)

testRemoteTargetCaller(t, 1, responseTimeOut, 10, 3,
capability, transmissionSchedule, responseTest)

testRemoteTargetCaller(t, 10, responseTimeOut, 10, 3,
capability, transmissionSchedule, responseTest)

testRemoteTargetCaller(t, 10, responseTimeOut, 10, 9,
capability, transmissionSchedule, responseTest)

}

func Test_RemoteTargetCaller_TransmissionSchedules(t *testing.T) {

responseTest := func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error) {
require.NoError(t, responseError)
response := <-responseCh
responseValue, err := response.Value.Unwrap()
require.NoError(t, err)
assert.Equal(t, "aValue1", responseValue.(string))
}

capability := &testCapability{}

responseTimeOut := 10 * time.Minute

transmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_OneAtATime,
"deltaStage": "10ms",
})
require.NoError(t, err)

testRemoteTargetCaller(t, 1, responseTimeOut, 1, 0,
capability, transmissionSchedule, responseTest)
testRemoteTargetCaller(t, 10, responseTimeOut, 10, 3,
capability, transmissionSchedule, responseTest)

transmissionSchedule, err = values.NewMap(map[string]any{
"schedule": transmission.Schedule_AllAtOnce,
"deltaStage": "10ms",
})
require.NoError(t, err)

testRemoteTargetCaller(t, 1, responseTimeOut, 1, 0,
capability, transmissionSchedule, responseTest)
testRemoteTargetCaller(t, 10, responseTimeOut, 10, 3,
capability, transmissionSchedule, responseTest)

}

func testRemoteTargetCaller(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout time.Duration,
numCapabilityPeers int, capabilityDonF uint8, underlying commoncap.TargetCapability, transmissionSchedule *values.Map,
responseTest func(t *testing.T, responseCh <-chan commoncap.CapabilityResponse, responseError error)) {
lggr := logger.TestLogger(t)
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()

capInfo := commoncap.CapabilityInfo{
ID: "cap_id",
CapabilityType: commoncap.CapabilityTypeTarget,
Description: "Remote Target",
Version: "0.0.1",
}

capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers)
for i := 0; i < numCapabilityPeers; i++ {
capabilityPeerID := p2ptypes.PeerID{}
require.NoError(t, capabilityPeerID.UnmarshalText([]byte(newPeerID())))
capabilityPeers[i] = capabilityPeerID
}

capabilityPeerID := p2ptypes.PeerID{}
require.NoError(t, capabilityPeerID.UnmarshalText([]byte(newPeerID())))

capDonInfo := commoncap.DON{
ID: "capability-don",
Members: capabilityPeers,
F: capabilityDonF,
}

workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers)
for i := 0; i < numWorkflowPeers; i++ {
workflowPeerID := p2ptypes.PeerID{}
require.NoError(t, workflowPeerID.UnmarshalText([]byte(newPeerID())))
workflowPeers[i] = workflowPeerID
}

workflowDonInfo := commoncap.DON{
Members: workflowPeers,
ID: "workflow-don",
}

broker := newTestMessageBroker()

receivers := make([]remotetypes.Receiver, numCapabilityPeers)
for i := 0; i < numCapabilityPeers; i++ {
capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeers[i])
receiver := newTestReceiver(capabilityPeers[i], capabilityDispatcher, workflowDonInfo, underlying)
broker.RegisterReceiverNode(capabilityPeers[i], receiver)
receivers[i] = receiver
}

callers := make([]commoncap.TargetCapability, numWorkflowPeers)
for i := 0; i < numWorkflowPeers; i++ {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
caller := remote.NewRemoteTargetCaller(ctx, lggr, capInfo, capDonInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout)
broker.RegisterReceiverNode(workflowPeers[i], caller)
callers[i] = caller
}

executeInputs, err := values.NewMap(
map[string]any{
"executeValue1": "aValue1",
},
)

require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(len(callers))

// Fire off all the requests
for _, caller := range callers {
go func(caller commoncap.TargetCapability) {
responseCh, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
},
Config: transmissionSchedule,
Inputs: executeInputs,
})

responseTest(t, responseCh, err)
wg.Done()
}(caller)
}

wg.Wait()
}

// Simple receiver that only responds once it has received a message from each workflow peer
type callerTestReceiver struct {
peerID p2ptypes.PeerID
dispatcher remotetypes.Dispatcher
workflowDonInfo commoncap.DON
messageIDToSenders map[string]map[p2ptypes.PeerID]bool

targetCapability commoncap.TargetCapability

mux sync.Mutex
}

func newTestReceiver(peerID p2ptypes.PeerID, dispatcher remotetypes.Dispatcher, workflowDonInfo commoncap.DON,
targetCapability commoncap.TargetCapability) *callerTestReceiver {

return &callerTestReceiver{
dispatcher: dispatcher,
workflowDonInfo: workflowDonInfo,
peerID: peerID,
messageIDToSenders: make(map[string]map[p2ptypes.PeerID]bool),
targetCapability: targetCapability,
}
}

func (t *callerTestReceiver) Receive(msg *remotetypes.MessageBody) {
t.mux.Lock()
defer t.mux.Unlock()

sender := toPeerID(msg.Sender)
messageID := remote.GetMessageID(msg)

if t.messageIDToSenders[messageID] == nil {
t.messageIDToSenders[messageID] = make(map[p2ptypes.PeerID]bool)
}

sendersOfMessageID := t.messageIDToSenders[messageID]
if sendersOfMessageID[sender] {
panic("received duplicate message")
}

sendersOfMessageID[sender] = true

if len(t.messageIDToSenders[messageID]) == len(t.workflowDonInfo.Members) {

capabilityRequest, err := pb.UnmarshalCapabilityRequest(msg.Payload)
if err != nil {
panic(err)
}

respCh, responseErr := t.targetCapability.Execute(context.Background(), capabilityRequest)
resp := <-respCh

for receiver := range t.messageIDToSenders[messageID] {
var responseMsg = &remotetypes.MessageBody{
CapabilityId: "cap_id",
CapabilityDonId: "capability-don",
CallerDonId: t.workflowDonInfo.ID,
Method: remotetypes.MethodExecute,
MessageId: []byte(messageID),
Sender: t.peerID[:],
Receiver: receiver[:],
}

if responseErr != nil {
responseMsg.Error = remotetypes.Error_INTERNAL_ERROR
} else {
payload, err := pb.MarshalCapabilityResponse(resp)

Check failure on line 463 in core/capabilities/remote/target_caller_test.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 441 (govet)
if err != nil {
panic(err)
}
responseMsg.Payload = payload
}

err = t.dispatcher.Send(receiver, responseMsg)
if err != nil {
panic(err)
}
}
}
}

type TestDispatcher struct {
sentMessagesCh chan *remotetypes.MessageBody
receiver remotetypes.Receiver
Expand Down
Loading

0 comments on commit efe3e38

Please sign in to comment.