diff --git a/core/capabilities/remote/target_receiver.go b/core/capabilities/remote/target_receiver.go index 5cff3c00e82..f8e00b89d79 100644 --- a/core/capabilities/remote/target_receiver.go +++ b/core/capabilities/remote/target_receiver.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/google/uuid" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -24,8 +26,8 @@ type remoteTargetReceiver struct { dispatcher types.Dispatcher lggr logger.Logger - requestMsgIDToResponse map[string]remoteTargetCapabilityRequest - requestTimeout time.Duration + requestMsgIDToRequest map[string]*remoteTargetCapabilityRequest + requestTimeout time.Duration receiveLock sync.Mutex } @@ -43,8 +45,8 @@ func NewRemoteTargetReceiver(ctx context.Context, lggr logger.Logger, peerID p2p workflowDONs: workflowDONs, dispatcher: dispatcher, - requestMsgIDToResponse: map[string]remoteTargetCapabilityRequest{}, - requestTimeout: requestTimeout, + requestMsgIDToRequest: map[string]*remoteTargetCapabilityRequest{}, + requestTimeout: requestTimeout, lggr: lggr, } @@ -69,7 +71,7 @@ func (r *remoteTargetReceiver) ExpireRequests(ctx context.Context) { r.receiveLock.Lock() defer r.receiveLock.Unlock() - for messageId, executeReq := range r.requestMsgIDToResponse { + for messageId, executeReq := range r.requestMsgIDToRequest { if time.Since(executeReq.createdTime) > r.requestTimeout { if !executeReq.hasResponse() { @@ -79,7 +81,7 @@ func (r *remoteTargetReceiver) ExpireRequests(ctx context.Context) { } } - delete(r.requestMsgIDToResponse, messageId) + delete(r.requestMsgIDToRequest, messageId) } } @@ -111,12 +113,12 @@ func (r *remoteTargetReceiver) Receive(msg *types.MessageBody) { requester := ToPeerID(msg.Sender) messageId := GetMessageID(msg) - if _, ok := r.requestMsgIDToResponse[messageId]; !ok { - r.requestMsgIDToResponse[messageId] = newTargetCapabilityRequest(r.capInfo.ID, r.localDonInfo.ID, r.peerID, + if _, ok := r.requestMsgIDToRequest[messageId]; !ok { + r.requestMsgIDToRequest[messageId] = newTargetCapabilityRequest(r.capInfo.ID, r.localDonInfo.ID, r.peerID, msg.CallerDonId, messageId, r.dispatcher) } - request, ok := r.requestMsgIDToResponse[messageId] + request, ok := r.requestMsgIDToRequest[messageId] if err := request.addRequester(requester, msg.CallerDonId, messageId); err != nil { r.lggr.Errorw("failed to add request to response", "capabilityId", r.capInfo.ID, "sender", @@ -143,7 +145,6 @@ func (r *remoteTargetReceiver) Receive(msg *types.MessageBody) { request.setResult(responsePayload) } } else { - r.lggr.Errorw("failed to execute capability", "capabilityId", r.capInfo.ID, "err", err) request.setError(types.Error_INTERNAL_ERROR) } @@ -165,6 +166,8 @@ func (r *remoteTargetReceiver) Receive(msg *types.MessageBody) { } type remoteTargetCapabilityRequest struct { + id string + capabilityPeerId p2ptypes.PeerID capabilityID string capabilityDonID string @@ -185,8 +188,9 @@ type remoteTargetCapabilityRequest struct { func newTargetCapabilityRequest(capabilityID string, capabilityDonID string, capabilityPeerId p2ptypes.PeerID, callingDonID string, requestMessageID string, - dispatcher types.Dispatcher) remoteTargetCapabilityRequest { - return remoteTargetCapabilityRequest{ + dispatcher types.Dispatcher) *remoteTargetCapabilityRequest { + return &remoteTargetCapabilityRequest{ + id: uuid.New().String(), capabilityID: capabilityID, capabilityDonID: capabilityDonID, capabilityPeerId: capabilityPeerId, @@ -234,9 +238,9 @@ func (e *remoteTargetCapabilityRequest) hasResponse() bool { } func (e *remoteTargetCapabilityRequest) sendResponseToAllRequesters() error { - for peer := range e.requesters { - if err := e.sendResponse(peer); err != nil { - return fmt.Errorf("failed to send response to peer %s: %w", peer, err) + for requester := range e.requesters { + if err := e.sendResponse(requester); err != nil { + return fmt.Errorf("failed to send response to requester %s: %w", requester, err) } }