Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec committed May 24, 2024
1 parent efe3e38 commit ed31fae
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions core/capabilities/remote/target_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"github.com/google/uuid"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand All @@ -24,8 +26,8 @@ type remoteTargetReceiver struct {
dispatcher types.Dispatcher
lggr logger.Logger

requestMsgIDToResponse map[string]remoteTargetCapabilityRequest
requestTimeout time.Duration
requestMsgIDToRequest map[string]*remoteTargetCapabilityRequest
requestTimeout time.Duration

receiveLock sync.Mutex
}
Expand All @@ -43,8 +45,8 @@ func NewRemoteTargetReceiver(ctx context.Context, lggr logger.Logger, peerID p2p
workflowDONs: workflowDONs,
dispatcher: dispatcher,

requestMsgIDToResponse: map[string]remoteTargetCapabilityRequest{},
requestTimeout: requestTimeout,
requestMsgIDToRequest: map[string]*remoteTargetCapabilityRequest{},
requestTimeout: requestTimeout,

lggr: lggr,
}
Expand All @@ -69,7 +71,7 @@ func (r *remoteTargetReceiver) ExpireRequests(ctx context.Context) {
r.receiveLock.Lock()
defer r.receiveLock.Unlock()

for messageId, executeReq := range r.requestMsgIDToResponse {
for messageId, executeReq := range r.requestMsgIDToRequest {
if time.Since(executeReq.createdTime) > r.requestTimeout {

if !executeReq.hasResponse() {
Expand All @@ -79,7 +81,7 @@ func (r *remoteTargetReceiver) ExpireRequests(ctx context.Context) {
}
}

delete(r.requestMsgIDToResponse, messageId)
delete(r.requestMsgIDToRequest, messageId)
}

}
Expand Down Expand Up @@ -111,12 +113,12 @@ func (r *remoteTargetReceiver) Receive(msg *types.MessageBody) {
requester := ToPeerID(msg.Sender)
messageId := GetMessageID(msg)

if _, ok := r.requestMsgIDToResponse[messageId]; !ok {
r.requestMsgIDToResponse[messageId] = newTargetCapabilityRequest(r.capInfo.ID, r.localDonInfo.ID, r.peerID,
if _, ok := r.requestMsgIDToRequest[messageId]; !ok {

Check failure on line 116 in core/capabilities/remote/target_receiver.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "ok" shadows declaration at line 107 (govet)
r.requestMsgIDToRequest[messageId] = newTargetCapabilityRequest(r.capInfo.ID, r.localDonInfo.ID, r.peerID,
msg.CallerDonId, messageId, r.dispatcher)
}

request, ok := r.requestMsgIDToResponse[messageId]
request, ok := r.requestMsgIDToRequest[messageId]

Check failure on line 121 in core/capabilities/remote/target_receiver.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to ok (ineffassign)

if err := request.addRequester(requester, msg.CallerDonId, messageId); err != nil {
r.lggr.Errorw("failed to add request to response", "capabilityId", r.capInfo.ID, "sender",
Expand All @@ -143,7 +145,6 @@ func (r *remoteTargetReceiver) Receive(msg *types.MessageBody) {
request.setResult(responsePayload)
}
} else {

r.lggr.Errorw("failed to execute capability", "capabilityId", r.capInfo.ID, "err", err)
request.setError(types.Error_INTERNAL_ERROR)
}
Expand All @@ -165,6 +166,8 @@ func (r *remoteTargetReceiver) Receive(msg *types.MessageBody) {
}

type remoteTargetCapabilityRequest struct {
id string

capabilityPeerId p2ptypes.PeerID
capabilityID string
capabilityDonID string
Expand All @@ -185,8 +188,9 @@ type remoteTargetCapabilityRequest struct {

func newTargetCapabilityRequest(capabilityID string, capabilityDonID string, capabilityPeerId p2ptypes.PeerID,
callingDonID string, requestMessageID string,
dispatcher types.Dispatcher) remoteTargetCapabilityRequest {
return remoteTargetCapabilityRequest{
dispatcher types.Dispatcher) *remoteTargetCapabilityRequest {
return &remoteTargetCapabilityRequest{
id: uuid.New().String(),
capabilityID: capabilityID,
capabilityDonID: capabilityDonID,
capabilityPeerId: capabilityPeerId,
Expand Down Expand Up @@ -234,9 +238,9 @@ func (e *remoteTargetCapabilityRequest) hasResponse() bool {
}

func (e *remoteTargetCapabilityRequest) sendResponseToAllRequesters() error {
for peer := range e.requesters {
if err := e.sendResponse(peer); err != nil {
return fmt.Errorf("failed to send response to peer %s: %w", peer, err)
for requester := range e.requesters {
if err := e.sendResponse(requester); err != nil {
return fmt.Errorf("failed to send response to requester %s: %w", requester, err)
}
}

Expand Down

0 comments on commit ed31fae

Please sign in to comment.