diff --git a/core/services/vrf/v2/listener_v2_log_processor.go b/core/services/vrf/v2/listener_v2_log_processor.go index e6a4220cc12..7f93479902b 100644 --- a/core/services/vrf/v2/listener_v2_log_processor.go +++ b/core/services/vrf/v2/listener_v2_log_processor.go @@ -19,6 +19,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/pkg/errors" "go.uber.org/multierr" + "golang.org/x/sync/semaphore" txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -109,6 +110,7 @@ func (lsn *listenerV2) pruneConfirmedRequestCounts() { // we simply retry TODO: follow up where if we see a fulfillment revert, return log to the queue. func (lsn *listenerV2) processPendingVRFRequests(ctx context.Context, pendingRequests []pendingRequest) { confirmed := lsn.getConfirmedLogsBySub(lsn.getLatestHead(), pendingRequests) + var processedMu sync.Mutex processed := make(map[string]struct{}) start := time.Now() @@ -140,64 +142,81 @@ func (lsn *listenerV2) processPendingVRFRequests(ctx context.Context, pendingReq lsn.l.Infow("No pending requests ready for processing") return } + var ( + numWorkers int64 = 4 + sem = semaphore.NewWeighted(numWorkers) + ) for subID, reqs := range confirmed { - l := lsn.l.With("subID", subID, "startTime", time.Now(), "numReqsForSub", len(reqs)) - // Get the balance of the subscription and also it's active status. - // The reason we need both is that we cannot determine if a subscription - // is active solely by it's balance, since an active subscription could legitimately - // have a zero balance. - var ( - startLinkBalance *big.Int - startEthBalance *big.Int - subIsActive bool - ) - sID, ok := new(big.Int).SetString(subID, 10) - if !ok { - l.Criticalw("Unable to convert %s to Int", subID) - continue + if err := sem.Acquire(ctx, 1); err != nil { + lsn.l.Errorw("Failed to acquire semaphore", "err", err) + return } - sub, err := lsn.coordinator.GetSubscription(&bind.CallOpts{ - Context: ctx}, sID) - if err != nil { - if strings.Contains(err.Error(), "execution reverted") { - // "execution reverted" indicates that the subscription no longer exists. - // We can no longer just mark these as processed and continue, - // since it could be that the subscription was canceled while there - // were still unfulfilled requests. - // The simplest approach to handle this is to enter the processRequestsPerSub - // loop rather than create a bunch of largely duplicated code - // to handle this specific situation, since we need to run the pipeline to get - // the VRF proof, abi-encode it, etc. - l.Warnw("Subscription not found - setting start balance to zero", "subID", subID, "err", err) - startLinkBalance = big.NewInt(0) - } else { - // Most likely this is an RPC error, so we re-try later. - l.Errorw("Unable to read subscription balance", "err", err) - continue + go func(subID string, reqs []pendingRequest) { + defer sem.Release(1) + l := lsn.l.With("subID", subID, "startTime", time.Now(), "numReqsForSub", len(reqs)) + // Get the balance of the subscription and also it's active status. + // The reason we need both is that we cannot determine if a subscription + // is active solely by it's balance, since an active subscription could legitimately + // have a zero balance. + var ( + startLinkBalance *big.Int + startEthBalance *big.Int + subIsActive bool + ) + sID, ok := new(big.Int).SetString(subID, 10) + if !ok { + l.Criticalw("Unable to convert %s to Int", subID) + return } - } else { - // Happy path - sub is active. - startLinkBalance = sub.Balance() - if sub.Version() == vrfcommon.V2Plus { - startEthBalance = sub.NativeBalance() + sub, err := lsn.coordinator.GetSubscription(&bind.CallOpts{ + Context: ctx}, sID) + + if err != nil { + if strings.Contains(err.Error(), "execution reverted") { + // "execution reverted" indicates that the subscription no longer exists. + // We can no longer just mark these as processed and continue, + // since it could be that the subscription was canceled while there + // were still unfulfilled requests. + // The simplest approach to handle this is to enter the processRequestsPerSub + // loop rather than create a bunch of largely duplicated code + // to handle this specific situation, since we need to run the pipeline to get + // the VRF proof, abi-encode it, etc. + l.Warnw("Subscription not found - setting start balance to zero", "subID", subID, "err", err) + startLinkBalance = big.NewInt(0) + } else { + // Most likely this is an RPC error, so we re-try later. + l.Errorw("Unable to read subscription balance", "err", err) + return + } + } else { + // Happy path - sub is active. + startLinkBalance = sub.Balance() + if sub.Version() == vrfcommon.V2Plus { + startEthBalance = sub.NativeBalance() + } + subIsActive = true } - subIsActive = true - } - // Sort requests in ascending order by CallbackGasLimit - // so that we process the "cheapest" requests for each subscription - // first. This allows us to break out of the processing loop as early as possible - // in the event that a subscription is too underfunded to have it's - // requests processed. - slices.SortFunc(reqs, func(a, b pendingRequest) int { - return cmp.Compare(a.req.CallbackGasLimit(), b.req.CallbackGasLimit()) - }) + // Sort requests in ascending order by CallbackGasLimit + // so that we process the "cheapest" requests for each subscription + // first. This allows us to break out of the processing loop as early as possible + // in the event that a subscription is too underfunded to have it's + // requests processed. + slices.SortFunc(reqs, func(a, b pendingRequest) int { + return cmp.Compare(a.req.CallbackGasLimit(), b.req.CallbackGasLimit()) + }) - p := lsn.processRequestsPerSub(ctx, sID, startLinkBalance, startEthBalance, reqs, subIsActive) - for reqID := range p { - processed[reqID] = struct{}{} - } + p := lsn.processRequestsPerSub(ctx, sID, startLinkBalance, startEthBalance, reqs, subIsActive) + processedMu.Lock() + for reqID := range p { + processed[reqID] = struct{}{} + } + processedMu.Unlock() + }(subID, reqs) + } + if err := sem.Acquire(ctx, numWorkers); err != nil { + lsn.l.Errorw("Failed to acquire semaphore at the end", "err", err) } lsn.pruneConfirmedRequestCounts() }