Skip to content

Commit

Permalink
[Keystone] Add remote target to syncer (#13456)
Browse files Browse the repository at this point in the history
* [Keystone] Add remote target to syncer

1. Refactor remote target client/server to implement services.Service
2. Use service-wide context when launching NewClientRequest
3. Handle early termination in Engine
4. Logging improvements

* Use updated error value; handle stop execution

* Fresh common + test fixes

---------

Co-authored-by: Cedric Cordenier <[email protected]>
  • Loading branch information
bolekk and cedric-cordenier authored Jun 7, 2024
1 parent d3b41d2 commit b09c14d
Show file tree
Hide file tree
Showing 22 changed files with 322 additions and 90 deletions.
5 changes: 5 additions & 0 deletions .changeset/healthy-shoes-lie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal [Keystone] Add remote target to syncer
2 changes: 2 additions & 0 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver rem
return fmt.Errorf("receiver already exists for capability %s and don %s", capabilityId, donId)
}
d.receivers[k] = receiver
d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId)
return nil
}

func (d *dispatcher) RemoveReceiver(capabilityId string, donId string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.receivers, key{capabilityId, donId})
d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId)
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
Expand Down
78 changes: 55 additions & 23 deletions core/capabilities/remote/target/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand All @@ -22,44 +22,34 @@ import (
//
// client communicates with corresponding server on remote nodes.
type client struct {
services.StateMachine
lggr logger.Logger
remoteCapabilityInfo commoncap.CapabilityInfo
localDONInfo capabilities.DON
localDONInfo commoncap.DON
dispatcher types.Dispatcher
requestTimeout time.Duration

messageIDToCallerRequest map[string]*request.ClientRequest
mutex sync.Mutex
stopCh services.StopChan
wg sync.WaitGroup
}

var _ commoncap.TargetCapability = &client{}
var _ types.Receiver = &client{}
var _ services.Service = &client{}

func NewClient(ctx context.Context, lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) *client {
c := &client{
func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration, lggr logger.Logger) *client {
return &client{
lggr: lggr,
remoteCapabilityInfo: remoteCapabilityInfo,
localDONInfo: localDonInfo,
dispatcher: dispatcher,
requestTimeout: requestTimeout,
messageIDToCallerRequest: make(map[string]*request.ClientRequest),
stopCh: make(services.StopChan),
}

go func() {
ticker := time.NewTicker(requestTimeout)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.expireRequests()
}
}
}()

return c
}

func (c *client) expireRequests() {
Expand All @@ -74,6 +64,36 @@ func (c *client) expireRequests() {
}
}

func (c *client) Start(ctx context.Context) error {
return c.StartOnce(c.Name(), func() error {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ticker := time.NewTicker(c.requestTimeout)
defer ticker.Stop()
c.lggr.Info("TargetClient started")
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.expireRequests()
}
}
}()
return nil
})
}

func (c *client) Close() error {
return c.StopOnce(c.Name(), func() error {
close(c.stopCh)
c.wg.Wait()
c.lggr.Info("TargetClient closed")
return nil
})
}

func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
}
Expand Down Expand Up @@ -101,7 +121,8 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
}

req, err := request.NewClientRequest(ctx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
cCtx, _ := c.stopCh.NewCtx()
req, err := request.NewClientRequest(cCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
if err != nil {
return nil, fmt.Errorf("failed to create client request: %w", err)
Expand All @@ -115,8 +136,7 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
func (c *client) Receive(msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()
// TODO should the dispatcher be passing in a context?
ctx := context.Background()
ctx, _ := c.stopCh.NewCtx()

messageID := GetMessageID(msg)

Expand All @@ -140,3 +160,15 @@ func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {

return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil
}

func (c *client) Ready() error {
return nil
}

func (c *client) HealthReport() map[string]error {
return nil
}

func (c *client) Name() string {
return "TargetClient"
}
9 changes: 8 additions & 1 deletion core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand Down Expand Up @@ -165,11 +166,14 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
}

callers := make([]commoncap.TargetCapability, numWorkflowPeers)
srvcs := make([]services.Service, numWorkflowPeers)
for i := 0; i < numWorkflowPeers; i++ {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
caller := target.NewClient(ctx, lggr, capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout)
caller := target.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout, lggr)
require.NoError(t, caller.Start(ctx))
broker.RegisterReceiverNode(workflowPeers[i], caller)
callers[i] = caller
srvcs[i] = caller
}

executeInputs, err := values.NewMap(
Expand Down Expand Up @@ -202,6 +206,9 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
}

wg.Wait()
for i := 0; i < numWorkflowPeers; i++ {
require.NoError(t, srvcs[i].Close())
}
}

// Simple client that only responds once it has received a message from each workflow peer
Expand Down
15 changes: 12 additions & 3 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand Down Expand Up @@ -225,22 +226,27 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
workflowDonInfo.ID: workflowDonInfo,
}

srvcs := []services.Service{}
capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers)
for i := 0; i < numCapabilityPeers; i++ {
capabilityPeer := capabilityPeers[i]
capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer)
capabilityNode := target.NewReceiver(ctx, lggr, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher,
capabilityNodeResponseTimeout)
capabilityNode := target.NewServer(capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher,
capabilityNodeResponseTimeout, lggr)
require.NoError(t, capabilityNode.Start(ctx))
broker.RegisterReceiverNode(capabilityPeer, capabilityNode)
capabilityNodes[i] = capabilityNode
srvcs = append(srvcs, capabilityNode)
}

workflowNodes := make([]commoncap.TargetCapability, numWorkflowPeers)
for i := 0; i < numWorkflowPeers; i++ {
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i])
workflowNode := target.NewClient(ctx, lggr, capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout)
workflowNode := target.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout, lggr)
require.NoError(t, workflowNode.Start(ctx))
broker.RegisterReceiverNode(workflowPeers[i], workflowNode)
workflowNodes[i] = workflowNode
srvcs = append(srvcs, workflowNode)
}

executeInputs, err := values.NewMap(
Expand Down Expand Up @@ -272,6 +278,9 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
}

wg.Wait()
for _, srv := range srvcs {
require.NoError(t, srv.Close())
}
}

type testMessageBroker struct {
Expand Down
4 changes: 4 additions & 0 deletions core/capabilities/remote/target/request/client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap
return nil, fmt.Errorf("failed to get peer ID to transmission delay: %w", err)
}

lggr.Debugw("sending request to peers", "execID", req.Metadata.WorkflowExecutionID, "schedule", peerIDToTransmissionDelay)

responseReceived := make(map[p2ptypes.PeerID]bool)
for peerID, delay := range peerIDToTransmissionDelay {
responseReceived[peerID] = false
Expand All @@ -74,8 +76,10 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap

select {
case <-ctx.Done():
lggr.Debugw("context done, not sending request to peer", "execID", req.Metadata.WorkflowExecutionID, "peerID", peerID)
return
case <-time.After(delay):
lggr.Debugw("sending request to peer", "execID", req.Metadata.WorkflowExecutionID, "peerID", peerID)
err := dispatcher.Send(peerID, message)
if err != nil {
lggr.Errorw("failed to send message", "peerID", peerID, "err", err)
Expand Down
13 changes: 9 additions & 4 deletions core/capabilities/remote/target/request/server_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
Expand Down Expand Up @@ -42,12 +43,13 @@ type ServerRequest struct {
requestMessageID string
requestTimeout time.Duration

mux sync.Mutex
mux sync.Mutex
lggr logger.Logger
}

func NewServerRequest(capability capabilities.TargetCapability, capabilityID string, capabilityDonID string, capabilityPeerId p2ptypes.PeerID,
callingDon commoncap.DON, requestMessageID string,
dispatcher types.Dispatcher, requestTimeout time.Duration) *ServerRequest {
dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *ServerRequest {
return &ServerRequest{
capability: capability,
createdTime: time.Now(),
Expand All @@ -60,6 +62,7 @@ func NewServerRequest(capability capabilities.TargetCapability, capabilityID str
callingDon: callingDon,
requestMessageID: requestMessageID,
requestTimeout: requestTimeout,
lggr: lggr,
}
}

Expand Down Expand Up @@ -118,21 +121,22 @@ func (e *ServerRequest) executeRequest(ctx context.Context, payload []byte) erro
return fmt.Errorf("failed to unmarshal capability request: %w", err)
}

e.lggr.Debugw("executing capability", "metadata", capabilityRequest.Metadata)
capResponseCh, err := e.capability.Execute(ctxWithTimeout, capabilityRequest)

if err != nil {
return fmt.Errorf("failed to execute capability: %w", err)
}

// TODO working on the assumption that the capability will only ever return one response from its channel (for now at least)
// NOTE working on the assumption that the capability will only ever return one response from its channel
capResponse := <-capResponseCh
responsePayload, err := pb.MarshalCapabilityResponse(capResponse)
if err != nil {
return fmt.Errorf("failed to marshal capability response: %w", err)
}

e.lggr.Debugw("received execution results", "metadata", capabilityRequest.Metadata, "error", capResponse.Err)
e.setResult(responsePayload)

return nil
}

Expand Down Expand Up @@ -212,6 +216,7 @@ func (e *ServerRequest) sendResponse(requester p2ptypes.PeerID) error {
responseMsg.Payload = e.response.response
}

e.lggr.Debugw("Sending response", "receiver", requester)
if err := e.dispatcher.Send(requester, &responseMsg); err != nil {
return fmt.Errorf("failed to send response to dispatcher: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions core/capabilities/remote/target/request/server_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

func Test_ServerRequest_MessageValidation(t *testing.T) {
lggr := logger.TestLogger(t)
capability := TestCapability{}
capabilityPeerID := NewP2PPeerID(t)

Expand Down Expand Up @@ -57,7 +59,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) {

t.Run("Send duplicate message", func(t *testing.T) {
req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID",
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute)
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr)

err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest)
require.NoError(t, err)
Expand All @@ -67,7 +69,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) {

t.Run("Send message with non calling don peer", func(t *testing.T) {
req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID",
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute)
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr)

err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest)
require.NoError(t, err)
Expand All @@ -90,7 +92,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) {

t.Run("Send message invalid payload", func(t *testing.T) {
req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID",
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute)
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr)

err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest)
require.NoError(t, err)
Expand All @@ -115,7 +117,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) {
t.Run("Send second valid request when capability errors", func(t *testing.T) {
dispatcher := &testDispatcher{}
req := request.NewServerRequest(TestErrorCapability{}, "capabilityID", "capabilityDonID",
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute)
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr)

err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest)
require.NoError(t, err)
Expand All @@ -142,7 +144,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) {
t.Run("Send second valid request", func(t *testing.T) {
dispatcher := &testDispatcher{}
request := request.NewServerRequest(capability, "capabilityID", "capabilityDonID",
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute)
capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr)

err := sendValidRequest(request, workflowPeers, capabilityPeerID, rawRequest)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit b09c14d

Please sign in to comment.