diff --git a/.changeset/healthy-shoes-lie.md b/.changeset/healthy-shoes-lie.md new file mode 100644 index 00000000000..85c0cc0beed --- /dev/null +++ b/.changeset/healthy-shoes-lie.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal [Keystone] Add remote target to syncer diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index 44b00c035f1..63b0fad7e98 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -72,6 +72,7 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver rem return fmt.Errorf("receiver already exists for capability %s and don %s", capabilityId, donId) } d.receivers[k] = receiver + d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId) return nil } @@ -79,6 +80,7 @@ func (d *dispatcher) RemoveReceiver(capabilityId string, donId string) { d.mu.Lock() defer d.mu.Unlock() delete(d.receivers, key{capabilityId, donId}) + d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId) } func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error { diff --git a/core/capabilities/remote/target/client.go b/core/capabilities/remote/target/client.go index ceab11dfcb0..2fb11930164 100644 --- a/core/capabilities/remote/target/client.go +++ b/core/capabilities/remote/target/client.go @@ -7,8 +7,8 @@ import ( "sync" "time" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -22,44 +22,34 @@ import ( // // client communicates with corresponding server on remote nodes. type client struct { + services.StateMachine lggr logger.Logger remoteCapabilityInfo commoncap.CapabilityInfo - localDONInfo capabilities.DON + localDONInfo commoncap.DON dispatcher types.Dispatcher requestTimeout time.Duration messageIDToCallerRequest map[string]*request.ClientRequest mutex sync.Mutex + stopCh services.StopChan + wg sync.WaitGroup } var _ commoncap.TargetCapability = &client{} var _ types.Receiver = &client{} +var _ services.Service = &client{} -func NewClient(ctx context.Context, lggr logger.Logger, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, - requestTimeout time.Duration) *client { - c := &client{ +func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher, + requestTimeout time.Duration, lggr logger.Logger) *client { + return &client{ lggr: lggr, remoteCapabilityInfo: remoteCapabilityInfo, localDONInfo: localDonInfo, dispatcher: dispatcher, requestTimeout: requestTimeout, messageIDToCallerRequest: make(map[string]*request.ClientRequest), + stopCh: make(services.StopChan), } - - go func() { - ticker := time.NewTicker(requestTimeout) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - c.expireRequests() - } - } - }() - - return c } func (c *client) expireRequests() { @@ -74,6 +64,36 @@ func (c *client) expireRequests() { } } +func (c *client) Start(ctx context.Context) error { + return c.StartOnce(c.Name(), func() error { + c.wg.Add(1) + go func() { + defer c.wg.Done() + ticker := time.NewTicker(c.requestTimeout) + defer ticker.Stop() + c.lggr.Info("TargetClient started") + for { + select { + case <-c.stopCh: + return + case <-ticker.C: + c.expireRequests() + } + } + }() + return nil + }) +} + +func (c *client) Close() error { + return c.StopOnce(c.Name(), func() error { + close(c.stopCh) + c.wg.Wait() + c.lggr.Info("TargetClient closed") + return nil + }) +} + func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { return c.remoteCapabilityInfo, nil } @@ -101,7 +121,8 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest return nil, fmt.Errorf("request for message ID %s already exists", messageID) } - req, err := request.NewClientRequest(ctx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, + cCtx, _ := c.stopCh.NewCtx() + req, err := request.NewClientRequest(cCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, c.requestTimeout) if err != nil { return nil, fmt.Errorf("failed to create client request: %w", err) @@ -115,8 +136,7 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest func (c *client) Receive(msg *types.MessageBody) { c.mutex.Lock() defer c.mutex.Unlock() - // TODO should the dispatcher be passing in a context? - ctx := context.Background() + ctx, _ := c.stopCh.NewCtx() messageID := GetMessageID(msg) @@ -140,3 +160,15 @@ func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) { return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil } + +func (c *client) Ready() error { + return nil +} + +func (c *client) HealthReport() map[string]error { + return nil +} + +func (c *client) Name() string { + return "TargetClient" +} diff --git a/core/capabilities/remote/target/client_test.go b/core/capabilities/remote/target/client_test.go index f5c6c19ef93..8665ffe7544 100644 --- a/core/capabilities/remote/target/client_test.go +++ b/core/capabilities/remote/target/client_test.go @@ -11,6 +11,7 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -165,11 +166,14 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo } callers := make([]commoncap.TargetCapability, numWorkflowPeers) + srvcs := make([]services.Service, numWorkflowPeers) for i := 0; i < numWorkflowPeers; i++ { workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) - caller := target.NewClient(ctx, lggr, capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout) + caller := target.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout, lggr) + require.NoError(t, caller.Start(ctx)) broker.RegisterReceiverNode(workflowPeers[i], caller) callers[i] = caller + srvcs[i] = caller } executeInputs, err := values.NewMap( @@ -202,6 +206,9 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo } wg.Wait() + for i := 0; i < numWorkflowPeers; i++ { + require.NoError(t, srvcs[i].Close()) + } } // Simple client that only responds once it has received a message from each workflow peer diff --git a/core/capabilities/remote/target/endtoend_test.go b/core/capabilities/remote/target/endtoend_test.go index 24776960562..a5379250e5e 100644 --- a/core/capabilities/remote/target/endtoend_test.go +++ b/core/capabilities/remote/target/endtoend_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -225,22 +226,27 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta workflowDonInfo.ID: workflowDonInfo, } + srvcs := []services.Service{} capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers) for i := 0; i < numCapabilityPeers; i++ { capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) - capabilityNode := target.NewReceiver(ctx, lggr, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, - capabilityNodeResponseTimeout) + capabilityNode := target.NewServer(capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, + capabilityNodeResponseTimeout, lggr) + require.NoError(t, capabilityNode.Start(ctx)) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode + srvcs = append(srvcs, capabilityNode) } workflowNodes := make([]commoncap.TargetCapability, numWorkflowPeers) for i := 0; i < numWorkflowPeers; i++ { workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) - workflowNode := target.NewClient(ctx, lggr, capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout) + workflowNode := target.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout, lggr) + require.NoError(t, workflowNode.Start(ctx)) broker.RegisterReceiverNode(workflowPeers[i], workflowNode) workflowNodes[i] = workflowNode + srvcs = append(srvcs, workflowNode) } executeInputs, err := values.NewMap( @@ -272,6 +278,9 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta } wg.Wait() + for _, srv := range srvcs { + require.NoError(t, srv.Close()) + } } type testMessageBroker struct { diff --git a/core/capabilities/remote/target/request/client_request.go b/core/capabilities/remote/target/request/client_request.go index 1355932c833..5f49040e0fb 100644 --- a/core/capabilities/remote/target/request/client_request.go +++ b/core/capabilities/remote/target/request/client_request.go @@ -59,6 +59,8 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap return nil, fmt.Errorf("failed to get peer ID to transmission delay: %w", err) } + lggr.Debugw("sending request to peers", "execID", req.Metadata.WorkflowExecutionID, "schedule", peerIDToTransmissionDelay) + responseReceived := make(map[p2ptypes.PeerID]bool) for peerID, delay := range peerIDToTransmissionDelay { responseReceived[peerID] = false @@ -74,8 +76,10 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap select { case <-ctx.Done(): + lggr.Debugw("context done, not sending request to peer", "execID", req.Metadata.WorkflowExecutionID, "peerID", peerID) return case <-time.After(delay): + lggr.Debugw("sending request to peer", "execID", req.Metadata.WorkflowExecutionID, "peerID", peerID) err := dispatcher.Send(peerID, message) if err != nil { lggr.Errorw("failed to send message", "peerID", peerID, "err", err) diff --git a/core/capabilities/remote/target/request/server_request.go b/core/capabilities/remote/target/request/server_request.go index 84968de9f11..bb84fda4ac0 100644 --- a/core/capabilities/remote/target/request/server_request.go +++ b/core/capabilities/remote/target/request/server_request.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -42,12 +43,13 @@ type ServerRequest struct { requestMessageID string requestTimeout time.Duration - mux sync.Mutex + mux sync.Mutex + lggr logger.Logger } func NewServerRequest(capability capabilities.TargetCapability, capabilityID string, capabilityDonID string, capabilityPeerId p2ptypes.PeerID, callingDon commoncap.DON, requestMessageID string, - dispatcher types.Dispatcher, requestTimeout time.Duration) *ServerRequest { + dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *ServerRequest { return &ServerRequest{ capability: capability, createdTime: time.Now(), @@ -60,6 +62,7 @@ func NewServerRequest(capability capabilities.TargetCapability, capabilityID str callingDon: callingDon, requestMessageID: requestMessageID, requestTimeout: requestTimeout, + lggr: lggr, } } @@ -118,21 +121,22 @@ func (e *ServerRequest) executeRequest(ctx context.Context, payload []byte) erro return fmt.Errorf("failed to unmarshal capability request: %w", err) } + e.lggr.Debugw("executing capability", "metadata", capabilityRequest.Metadata) capResponseCh, err := e.capability.Execute(ctxWithTimeout, capabilityRequest) if err != nil { return fmt.Errorf("failed to execute capability: %w", err) } - // TODO working on the assumption that the capability will only ever return one response from its channel (for now at least) + // NOTE working on the assumption that the capability will only ever return one response from its channel capResponse := <-capResponseCh responsePayload, err := pb.MarshalCapabilityResponse(capResponse) if err != nil { return fmt.Errorf("failed to marshal capability response: %w", err) } + e.lggr.Debugw("received execution results", "metadata", capabilityRequest.Metadata, "error", capResponse.Err) e.setResult(responsePayload) - return nil } @@ -212,6 +216,7 @@ func (e *ServerRequest) sendResponse(requester p2ptypes.PeerID) error { responseMsg.Payload = e.response.response } + e.lggr.Debugw("Sending response", "receiver", requester) if err := e.dispatcher.Send(requester, &responseMsg); err != nil { return fmt.Errorf("failed to send response to dispatcher: %w", err) } diff --git a/core/capabilities/remote/target/request/server_request_test.go b/core/capabilities/remote/target/request/server_request_test.go index 0529a1e9004..fe3fdd713b5 100644 --- a/core/capabilities/remote/target/request/server_request_test.go +++ b/core/capabilities/remote/target/request/server_request_test.go @@ -16,10 +16,12 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) func Test_ServerRequest_MessageValidation(t *testing.T) { + lggr := logger.TestLogger(t) capability := TestCapability{} capabilityPeerID := NewP2PPeerID(t) @@ -57,7 +59,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send duplicate message", func(t *testing.T) { req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID", - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) require.NoError(t, err) @@ -67,7 +69,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send message with non calling don peer", func(t *testing.T) { req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID", - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) require.NoError(t, err) @@ -90,7 +92,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send message invalid payload", func(t *testing.T) { req := request.NewServerRequest(capability, "capabilityID", "capabilityDonID", - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) require.NoError(t, err) @@ -115,7 +117,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send second valid request when capability errors", func(t *testing.T) { dispatcher := &testDispatcher{} req := request.NewServerRequest(TestErrorCapability{}, "capabilityID", "capabilityDonID", - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) require.NoError(t, err) @@ -142,7 +144,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send second valid request", func(t *testing.T) { dispatcher := &testDispatcher{} request := request.NewServerRequest(capability, "capabilityID", "capabilityDonID", - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) err := sendValidRequest(request, workflowPeers, capabilityPeerID, rawRequest) require.NoError(t, err) diff --git a/core/capabilities/remote/target/server.go b/core/capabilities/remote/target/server.go index bb3bc6e4edc..8ccbecf530e 100644 --- a/core/capabilities/remote/target/server.go +++ b/core/capabilities/remote/target/server.go @@ -8,11 +8,11 @@ import ( "time" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink/v2/core/logger" ) @@ -23,11 +23,12 @@ import ( // // server communicates with corresponding client on remote nodes. type server struct { + services.StateMachine lggr logger.Logger peerID p2ptypes.PeerID underlying commoncap.TargetCapability capInfo commoncap.CapabilityInfo - localDonInfo capabilities.DON + localDonInfo commoncap.DON workflowDONs map[string]commoncap.DON dispatcher types.Dispatcher @@ -35,13 +36,16 @@ type server struct { requestTimeout time.Duration receiveLock sync.Mutex + stopCh services.StopChan + wg sync.WaitGroup } var _ types.Receiver = &server{} +var _ services.Service = &server{} -func NewReceiver(ctx context.Context, lggr logger.Logger, peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, - workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration) *server { - r := &server{ +func NewServer(peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, + workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *server { + return &server{ underlying: underlying, peerID: peerID, capInfo: capInfo, @@ -52,23 +56,39 @@ func NewReceiver(ctx context.Context, lggr logger.Logger, peerID p2ptypes.PeerID requestIDToRequest: map[string]*request.ServerRequest{}, requestTimeout: requestTimeout, - lggr: lggr, + lggr: lggr, + stopCh: make(services.StopChan), } +} - go func() { - ticker := time.NewTicker(requestTimeout) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - r.expireRequests() +func (r *server) Start(ctx context.Context) error { + return r.StartOnce(r.Name(), func() error { + r.wg.Add(1) + go func() { + defer r.wg.Done() + ticker := time.NewTicker(r.requestTimeout) + defer ticker.Stop() + r.lggr.Info("TargetServer started") + for { + select { + case <-r.stopCh: + return + case <-ticker.C: + r.expireRequests() + } } - } - }() + }() + return nil + }) +} - return r +func (r *server) Close() error { + return r.StopOnce(r.Name(), func() error { + close(r.stopCh) + r.wg.Wait() + r.lggr.Info("TargetServer closed") + return nil + }) } func (r *server) expireRequests() { @@ -91,8 +111,7 @@ func (r *server) expireRequests() { func (r *server) Receive(msg *types.MessageBody) { r.receiveLock.Lock() defer r.receiveLock.Unlock() - // TODO should the dispatcher be passing in a context? - ctx := context.Background() + ctx, _ := r.stopCh.NewCtx() if msg.Method != types.MethodExecute { r.lggr.Errorw("received request for unsupported method type", "method", msg.Method) @@ -113,7 +132,7 @@ func (r *server) Receive(msg *types.MessageBody) { } r.requestIDToRequest[requestID] = request.NewServerRequest(r.underlying, r.capInfo.ID, r.localDonInfo.ID, r.peerID, - callingDon, messageId, r.dispatcher, r.requestTimeout) + callingDon, messageId, r.dispatcher, r.requestTimeout, r.lggr) } req := r.requestIDToRequest[requestID] @@ -129,3 +148,15 @@ func (r *server) Receive(msg *types.MessageBody) { func GetMessageID(msg *types.MessageBody) string { return string(msg.MessageId) } + +func (r *server) Ready() error { + return nil +} + +func (r *server) HealthReport() map[string]error { + return nil +} + +func (r *server) Name() string { + return "TargetServer" +} diff --git a/core/capabilities/remote/target/server_test.go b/core/capabilities/remote/target/server_test.go index fd7cbe0dfd1..e6d85ebff25 100644 --- a/core/capabilities/remote/target/server_test.go +++ b/core/capabilities/remote/target/server_test.go @@ -10,6 +10,7 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -23,7 +24,7 @@ func Test_Server_RespondsAfterSufficientRequests(t *testing.T) { numCapabilityPeers := 4 - callers := testRemoteTargetServer(ctx, t, &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) + callers, srvcs := testRemoteTargetServer(ctx, t, &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) for _, caller := range callers { _, err := caller.Execute(context.Background(), @@ -42,6 +43,7 @@ func Test_Server_RespondsAfterSufficientRequests(t *testing.T) { assert.Equal(t, remotetypes.Error_OK, msg.Error) } } + closeServices(t, srvcs) } func Test_Server_InsufficientCallers(t *testing.T) { @@ -50,7 +52,7 @@ func Test_Server_InsufficientCallers(t *testing.T) { numCapabilityPeers := 4 - callers := testRemoteTargetServer(ctx, t, &TestCapability{}, 10, 10, numCapabilityPeers, 3, 100*time.Millisecond) + callers, srvcs := testRemoteTargetServer(ctx, t, &TestCapability{}, 10, 10, numCapabilityPeers, 3, 100*time.Millisecond) for _, caller := range callers { _, err := caller.Execute(context.Background(), @@ -69,6 +71,7 @@ func Test_Server_InsufficientCallers(t *testing.T) { assert.Equal(t, remotetypes.Error_TIMEOUT, msg.Error) } } + closeServices(t, srvcs) } func Test_Server_CapabilityError(t *testing.T) { @@ -77,7 +80,7 @@ func Test_Server_CapabilityError(t *testing.T) { numCapabilityPeers := 4 - callers := testRemoteTargetServer(ctx, t, &TestErrorCapability{}, 10, 9, numCapabilityPeers, 3, 100*time.Millisecond) + callers, srvcs := testRemoteTargetServer(ctx, t, &TestErrorCapability{}, 10, 9, numCapabilityPeers, 3, 100*time.Millisecond) for _, caller := range callers { _, err := caller.Execute(context.Background(), @@ -96,12 +99,13 @@ func Test_Server_CapabilityError(t *testing.T) { assert.Equal(t, remotetypes.Error_INTERNAL_ERROR, msg.Error) } } + closeServices(t, srvcs) } func testRemoteTargetServer(ctx context.Context, t *testing.T, underlying commoncap.TargetCapability, numWorkflowPeers int, workflowDonF uint8, - numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration) []*serverTestClient { + numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration) ([]*serverTestClient, []services.Service) { lggr := logger.TestLogger(t) capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) @@ -141,13 +145,16 @@ func testRemoteTargetServer(ctx context.Context, t *testing.T, } capabilityNodes := make([]remotetypes.Receiver, numCapabilityPeers) + srvcs := make([]services.Service, numCapabilityPeers) for i := 0; i < numCapabilityPeers; i++ { capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) - capabilityNode := target.NewReceiver(ctx, lggr, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, - capabilityNodeResponseTimeout) + capabilityNode := target.NewServer(capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, + capabilityNodeResponseTimeout, lggr) + require.NoError(t, capabilityNode.Start(ctx)) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode + srvcs[i] = capabilityNode } workflowNodes := make([]*serverTestClient, numWorkflowPeers) @@ -158,7 +165,13 @@ func testRemoteTargetServer(ctx context.Context, t *testing.T, workflowNodes[i] = workflowNode } - return workflowNodes + return workflowNodes, srvcs +} + +func closeServices(t *testing.T, srvcs []services.Service) { + for _, srv := range srvcs { + require.NoError(t, srv.Close()) + } } type serverTestClient struct { diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go index e80ea9a13ff..17782edf601 100644 --- a/core/capabilities/syncer.go +++ b/core/capabilities/syncer.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" "github.com/smartcontractkit/libocr/ragep2p" ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" @@ -68,7 +69,7 @@ func NewRegistrySyncer(peerWrapper p2ptypes.PeerWrapper, registry core.Capabilit func (s *registrySyncer) Start(ctx context.Context) error { s.wg.Add(1) - go s.launch(ctx) + go s.launch(context.Background()) return nil } @@ -88,6 +89,18 @@ func (s *registrySyncer) launch(ctx context.Context) { return } + targetCapId := "write_ethereum-testnet-sepolia@0.0.1" + targetInfo, err := capabilities.NewRemoteCapabilityInfo( + targetCapId, + capabilities.CapabilityTypeTarget, + "Remote Target", + &s.networkSetup.TargetCapabilityDonInfo, + ) + if err != nil { + s.lggr.Errorw("failed to create capability info for write_ethereum-testnet-sepolia", "error", err) + return + } + myId := s.peerWrapper.GetPeer().ID() config := remotetypes.RemoteTriggerConfig{ RegistrationRefreshMs: 20000, @@ -106,15 +119,29 @@ func (s *registrySyncer) launch(ctx context.Context) { triggerCap := remote.NewTriggerSubscriber(config, triggerInfo, s.networkSetup.TriggerCapabilityDonInfo, s.networkSetup.WorkflowsDonInfo, s.dispatcher, aggregator, s.lggr) err = s.registry.Add(ctx, triggerCap) if err != nil { - s.lggr.Errorw("failed to add remote target capability to registry", "error", err) + s.lggr.Errorw("failed to add remote trigger capability to registry", "error", err) return } err = s.dispatcher.SetReceiver(capId, s.networkSetup.TriggerCapabilityDonInfo.ID, triggerCap) if err != nil { - s.lggr.Errorw("workflow DON failed to set receiver", "capabilityId", capId, "donId", s.networkSetup.TriggerCapabilityDonInfo.ID, "error", err) + s.lggr.Errorw("workflow DON failed to set receiver for trigger", "capabilityId", capId, "donId", s.networkSetup.TriggerCapabilityDonInfo.ID, "error", err) return } s.subServices = append(s.subServices, triggerCap) + + s.lggr.Info("member of a workflow DON - starting remote targets") + targetCap := target.NewClient(targetInfo, s.networkSetup.WorkflowsDonInfo, s.dispatcher, 60*time.Second, s.lggr) + err = s.registry.Add(ctx, targetCap) + if err != nil { + s.lggr.Errorw("failed to add remote target capability to registry", "error", err) + return + } + err = s.dispatcher.SetReceiver(targetCapId, s.networkSetup.TargetCapabilityDonInfo.ID, targetCap) + if err != nil { + s.lggr.Errorw("workflow DON failed to set receiver for target", "capabilityId", capId, "donId", s.networkSetup.TargetCapabilityDonInfo.ID, "error", err) + return + } + s.subServices = append(s.subServices, targetCap) } if s.networkSetup.IsTriggerDon(myId) { s.lggr.Info("member of a capability DON - starting remote publishers") @@ -162,6 +189,24 @@ func (s *registrySyncer) launch(ctx context.Context) { break } } + if s.networkSetup.IsTargetDon(myId) { + s.lggr.Info("member of a target DON - starting remote shims") + underlying, err2 := s.registry.GetTarget(ctx, targetCapId) + if err2 != nil { + s.lggr.Errorw("target not found yet", "capabilityId", targetCapId, "error", err2) + return + } + workflowDONs := map[string]capabilities.DON{ + s.networkSetup.WorkflowsDonInfo.ID: s.networkSetup.WorkflowsDonInfo, + } + targetCap := target.NewServer(myId, underlying, targetInfo, *targetInfo.DON, workflowDONs, s.dispatcher, 60*time.Second, s.lggr) + err = s.dispatcher.SetReceiver(targetCapId, s.networkSetup.TargetCapabilityDonInfo.ID, targetCap) + if err != nil { + s.lggr.Errorw("capability DON failed to set receiver", "capabilityId", capId, "donId", s.networkSetup.TargetCapabilityDonInfo.ID, "error", err) + return + } + s.subServices = append(s.subServices, targetCap) + } // NOTE: temporary service start - should be managed by capability creation for _, srv := range s.subServices { err = srv.Start(ctx) @@ -200,11 +245,13 @@ func (s *registrySyncer) Name() string { type HardcodedDonNetworkSetup struct { workflowDonPeers []string triggerDonPeers []string + targetDonPeers []string triggerDonSigners []string allPeers map[ragetypes.PeerID]p2ptypes.StreamConfig WorkflowsDonInfo capabilities.DON TriggerCapabilityDonInfo capabilities.DON + TargetCapabilityDonInfo capabilities.DON } func NewHardcodedDonNetworkSetup() (HardcodedDonNetworkSetup, error) { @@ -234,6 +281,12 @@ func NewHardcodedDonNetworkSetup() (HardcodedDonNetworkSetup, error) { "0x5d1e87d87bF2e0cD4Ea64F381a2dbF45e5f0a553", "0x91d9b0062265514f012Eb8fABA59372fD9520f56", } + result.targetDonPeers = []string{ // "cap-one" + "12D3KooWJrthXtnPHw7xyHFAxo6NxifYTvc8igKYaA6wRRRqtsMb", + "12D3KooWFQekP9sGex4XhqEJav5EScjTpDVtDqJFg1JvrePBCEGJ", + "12D3KooWFLEq4hYtdyKWwe47dXGEbSiHMZhmr5xLSJNhpfiEz8NF", + "12D3KooWN2hztiXNNS1jMQTTvvPRYcarK1C7T3Mdqk4x4gwyo5WS", + } result.allPeers = make(map[ragetypes.PeerID]p2ptypes.StreamConfig) addPeersToDONInfo := func(peers []string, donInfo *capabilities.DON) error { @@ -257,6 +310,11 @@ func NewHardcodedDonNetworkSetup() (HardcodedDonNetworkSetup, error) { return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to trigger DON info: %w", err) } + result.TargetCapabilityDonInfo = capabilities.DON{ID: "targetDon1", F: 1} + if err := addPeersToDONInfo(result.targetDonPeers, &result.TargetCapabilityDonInfo); err != nil { + return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to target DON info: %w", err) + } + return result, nil } @@ -268,6 +326,10 @@ func (h HardcodedDonNetworkSetup) IsTriggerDon(id p2ptypes.PeerID) bool { return slices.Contains(h.triggerDonPeers, id.String()) } +func (h HardcodedDonNetworkSetup) IsTargetDon(id p2ptypes.PeerID) bool { + return slices.Contains(h.targetDonPeers, id.String()) +} + type mockMercuryDataProducer struct { trigger *triggers.MercuryTriggerService wg sync.WaitGroup diff --git a/core/scripts/go.mod b/core/scripts/go.mod index e87d9ada680..028c1733889 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -24,7 +24,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 3f4a18d7e21..830d599a6c5 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1212,8 +1212,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a h1:sQB8v5zuWMVEhWjTMeMRIcgazfhFz43glfaLgVsToMM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce h1:/CjY8L4lVJh9E8NKg3bdAgsxj+zKg9XYtXR71ZWWMXo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240606130021-a4b7359e1580 h1:nsnLzpBTDAQWkfsOz/qd8BTlb1hUpaow1KmA1tPwTf4= diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 9fb2a6beacd..47638f71434 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "errors" "fmt" "sync" "time" @@ -475,7 +476,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow } switch step.Status { - case store.StatusCompleted, store.StatusErrored: + case store.StatusCompleted, store.StatusErrored, store.StatusCompletedEarlyExit: default: workflowCompleted = false } @@ -493,6 +494,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow // We haven't completed the workflow, but should we continue? // If we've been executing for too long, let's time the workflow out and stop here. if state.CreatedAt != nil && e.clock.Since(*state.CreatedAt) > e.maxExecutionDuration { + e.logger.Infow("execution timed out", "executionID", state.ExecutionID) return e.finishExecution(ctx, state.ExecutionID, store.StatusTimeout) } @@ -501,7 +503,18 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow for _, sd := range stepDependents { e.queueIfReady(state, sd) } + case store.StatusCompletedEarlyExit: + e.logger.Infow("execution terminated early", "executionID", state.ExecutionID) + // NOTE: even though this marks the workflow as completed, any branches of the DAG + // that don't depend on the step that signaled for an early exit will still complete. + // This is to ensure that any side effects are executed consistently, since otherwise + // the async nature of the workflow engine would provide no guarantees. + err := e.finishExecution(ctx, state.ExecutionID, store.StatusCompletedEarlyExit) + if err != nil { + return err + } case store.StatusErrored: + e.logger.Infow("execution errored", "executionID", state.ExecutionID) err := e.finishExecution(ctx, state.ExecutionID, store.StatusErrored) if err != nil { return err @@ -568,16 +581,22 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { } inputs, outputs, err := e.executeStep(ctx, l, msg) - if err != nil { + var stepStatus string + switch { + case errors.Is(err, capabilities.ErrStopExecution): + l.Infow("step executed successfully with a termination") + stepStatus = store.StatusCompletedEarlyExit + case err != nil: l.Errorf("error executing step request: %s", err) - stepState.Outputs.Err = err - stepState.Status = store.StatusErrored - } else { + stepStatus = store.StatusErrored + default: l.Infow("step executed successfully", "outputs", outputs) - stepState.Outputs.Value = outputs - stepState.Status = store.StatusCompleted + stepStatus = store.StatusCompleted } + stepState.Status = stepStatus + stepState.Outputs.Value = outputs + stepState.Outputs.Err = err stepState.Inputs = inputs // Let's try and emit the stepUpdate. diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 7f76db137e9..ddb08c24316 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -336,6 +336,21 @@ func mockFailingConsensus() *mockCapability { ) } +func mockConsensusWithEarlyTermination() *mockCapability { + return newMockCapability( + capabilities.MustNewCapabilityInfo( + "offchain_reporting@1.0.0", + capabilities.CapabilityTypeConsensus, + "an ocr3 consensus capability", + ), + func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + return capabilities.CapabilityResponse{ + Err: capabilities.ErrStopExecution, + }, nil + }, + ) +} + func mockConsensus() *mockCapability { return newMockCapability( capabilities.MustNewCapabilityInfo( @@ -403,6 +418,31 @@ func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) { assert.Equal(t, state.Steps["evm_median"].Status, store.StatusErrored) } +func TestEngine_GracefulEarlyTermination(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + reg := coreCap.NewRegistry(logger.TestLogger(t)) + + trigger, _ := mockTrigger(t) + + require.NoError(t, reg.Add(ctx, trigger)) + require.NoError(t, reg.Add(ctx, mockConsensusWithEarlyTermination())) + require.NoError(t, reg.Add(ctx, mockTarget())) + + eng, hooks := newTestEngine(t, reg, simpleWorkflow) + + err := eng.Start(ctx) + require.NoError(t, err) + defer eng.Close() + + eid := getExecutionId(t, eng, hooks) + state, err := eng.executionStates.Get(ctx, eid) + require.NoError(t, err) + + assert.Equal(t, state.Status, store.StatusCompletedEarlyExit) + assert.Nil(t, state.Steps["write_polygon-testnet-mumbai"]) +} + const ( multiStepWorkflow = ` triggers: diff --git a/core/services/workflows/store/models.go b/core/services/workflows/store/models.go index 27604543ede..8b4632b1f83 100644 --- a/core/services/workflows/store/models.go +++ b/core/services/workflows/store/models.go @@ -7,10 +7,11 @@ import ( ) const ( - StatusStarted = "started" - StatusErrored = "errored" - StatusTimeout = "timeout" - StatusCompleted = "completed" + StatusStarted = "started" + StatusErrored = "errored" + StatusTimeout = "timeout" + StatusCompleted = "completed" + StatusCompletedEarlyExit = "completed_early_exit" ) type StepOutput struct { diff --git a/go.mod b/go.mod index 631e8283ebf..56d50c21cb2 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240606130021-a4b7359e1580 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917 diff --git a/go.sum b/go.sum index cda013bc631..08b79b92c04 100644 --- a/go.sum +++ b/go.sum @@ -1171,8 +1171,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a h1:sQB8v5zuWMVEhWjTMeMRIcgazfhFz43glfaLgVsToMM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce h1:/CjY8L4lVJh9E8NKg3bdAgsxj+zKg9XYtXR71ZWWMXo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240606130021-a4b7359e1580 h1:nsnLzpBTDAQWkfsOz/qd8BTlb1hUpaow1KmA1tPwTf4= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 23e7bcf7169..9866bd174dc 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -27,7 +27,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce github.com/smartcontractkit/chainlink-testing-framework v1.30.1 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index d3ece17f8ba..eb24d16f7bb 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1512,8 +1512,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a h1:sQB8v5zuWMVEhWjTMeMRIcgazfhFz43glfaLgVsToMM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce h1:/CjY8L4lVJh9E8NKg3bdAgsxj+zKg9XYtXR71ZWWMXo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240606130021-a4b7359e1580 h1:nsnLzpBTDAQWkfsOz/qd8BTlb1hUpaow1KmA1tPwTf4= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index d99df951102..9405cb73a41 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,7 +16,7 @@ require ( github.com/rs/zerolog v1.30.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce github.com/smartcontractkit/chainlink-testing-framework v1.30.1 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 2ea9c2e1406..789f790b820 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1502,8 +1502,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a h1:sQB8v5zuWMVEhWjTMeMRIcgazfhFz43glfaLgVsToMM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607130706-b99591fa7e4a/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce h1:/CjY8L4lVJh9E8NKg3bdAgsxj+zKg9XYtXR71ZWWMXo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240607135320-c9bc0a2ac0ce/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240606130021-a4b7359e1580 h1:nsnLzpBTDAQWkfsOz/qd8BTlb1hUpaow1KmA1tPwTf4=