Skip to content

Commit

Permalink
idea: process multiple subs concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
makramkd committed Nov 13, 2023
1 parent e647e04 commit 498d5d3
Showing 1 changed file with 70 additions and 51 deletions.
121 changes: 70 additions & 51 deletions core/services/vrf/v2/listener_v2_log_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"go.uber.org/multierr"
"golang.org/x/sync/semaphore"

txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
Expand Down Expand Up @@ -109,6 +110,7 @@ func (lsn *listenerV2) pruneConfirmedRequestCounts() {
// we simply retry TODO: follow up where if we see a fulfillment revert, return log to the queue.
func (lsn *listenerV2) processPendingVRFRequests(ctx context.Context, pendingRequests []pendingRequest) {
confirmed := lsn.getConfirmedLogsBySub(lsn.getLatestHead(), pendingRequests)
var processedMu sync.Mutex
processed := make(map[string]struct{})
start := time.Now()

Expand Down Expand Up @@ -140,64 +142,81 @@ func (lsn *listenerV2) processPendingVRFRequests(ctx context.Context, pendingReq
lsn.l.Infow("No pending requests ready for processing")
return
}
var (
numWorkers int64 = 4
sem = semaphore.NewWeighted(numWorkers)
)
for subID, reqs := range confirmed {
l := lsn.l.With("subID", subID, "startTime", time.Now(), "numReqsForSub", len(reqs))
// Get the balance of the subscription and also it's active status.
// The reason we need both is that we cannot determine if a subscription
// is active solely by it's balance, since an active subscription could legitimately
// have a zero balance.
var (
startLinkBalance *big.Int
startEthBalance *big.Int
subIsActive bool
)
sID, ok := new(big.Int).SetString(subID, 10)
if !ok {
l.Criticalw("Unable to convert %s to Int", subID)
continue
if err := sem.Acquire(ctx, 1); err != nil {
lsn.l.Errorw("Failed to acquire semaphore", "err", err)
return
}
sub, err := lsn.coordinator.GetSubscription(&bind.CallOpts{
Context: ctx}, sID)

if err != nil {
if strings.Contains(err.Error(), "execution reverted") {
// "execution reverted" indicates that the subscription no longer exists.
// We can no longer just mark these as processed and continue,
// since it could be that the subscription was canceled while there
// were still unfulfilled requests.
// The simplest approach to handle this is to enter the processRequestsPerSub
// loop rather than create a bunch of largely duplicated code
// to handle this specific situation, since we need to run the pipeline to get
// the VRF proof, abi-encode it, etc.
l.Warnw("Subscription not found - setting start balance to zero", "subID", subID, "err", err)
startLinkBalance = big.NewInt(0)
} else {
// Most likely this is an RPC error, so we re-try later.
l.Errorw("Unable to read subscription balance", "err", err)
continue
go func(subID string, reqs []pendingRequest) {
defer sem.Release(1)
l := lsn.l.With("subID", subID, "startTime", time.Now(), "numReqsForSub", len(reqs))
// Get the balance of the subscription and also it's active status.
// The reason we need both is that we cannot determine if a subscription
// is active solely by it's balance, since an active subscription could legitimately
// have a zero balance.
var (
startLinkBalance *big.Int
startEthBalance *big.Int
subIsActive bool
)
sID, ok := new(big.Int).SetString(subID, 10)
if !ok {
l.Criticalw("Unable to convert %s to Int", subID)
return
}
} else {
// Happy path - sub is active.
startLinkBalance = sub.Balance()
if sub.Version() == vrfcommon.V2Plus {
startEthBalance = sub.NativeBalance()
sub, err := lsn.coordinator.GetSubscription(&bind.CallOpts{
Context: ctx}, sID)

if err != nil {
if strings.Contains(err.Error(), "execution reverted") {
// "execution reverted" indicates that the subscription no longer exists.
// We can no longer just mark these as processed and continue,
// since it could be that the subscription was canceled while there
// were still unfulfilled requests.
// The simplest approach to handle this is to enter the processRequestsPerSub
// loop rather than create a bunch of largely duplicated code
// to handle this specific situation, since we need to run the pipeline to get
// the VRF proof, abi-encode it, etc.
l.Warnw("Subscription not found - setting start balance to zero", "subID", subID, "err", err)
startLinkBalance = big.NewInt(0)
} else {
// Most likely this is an RPC error, so we re-try later.
l.Errorw("Unable to read subscription balance", "err", err)
return
}
} else {
// Happy path - sub is active.
startLinkBalance = sub.Balance()
if sub.Version() == vrfcommon.V2Plus {
startEthBalance = sub.NativeBalance()
}
subIsActive = true
}
subIsActive = true
}

// Sort requests in ascending order by CallbackGasLimit
// so that we process the "cheapest" requests for each subscription
// first. This allows us to break out of the processing loop as early as possible
// in the event that a subscription is too underfunded to have it's
// requests processed.
slices.SortFunc(reqs, func(a, b pendingRequest) int {
return cmp.Compare(a.req.CallbackGasLimit(), b.req.CallbackGasLimit())
})
// Sort requests in ascending order by CallbackGasLimit
// so that we process the "cheapest" requests for each subscription
// first. This allows us to break out of the processing loop as early as possible
// in the event that a subscription is too underfunded to have it's
// requests processed.
slices.SortFunc(reqs, func(a, b pendingRequest) int {
return cmp.Compare(a.req.CallbackGasLimit(), b.req.CallbackGasLimit())
})

p := lsn.processRequestsPerSub(ctx, sID, startLinkBalance, startEthBalance, reqs, subIsActive)
for reqID := range p {
processed[reqID] = struct{}{}
}
p := lsn.processRequestsPerSub(ctx, sID, startLinkBalance, startEthBalance, reqs, subIsActive)
processedMu.Lock()
for reqID := range p {
processed[reqID] = struct{}{}
}
processedMu.Unlock()
}(subID, reqs)
}
if err := sem.Acquire(ctx, numWorkers); err != nil {
lsn.l.Errorw("Failed to acquire semaphore at the end", "err", err)
}
lsn.pruneConfirmedRequestCounts()
}
Expand Down

0 comments on commit 498d5d3

Please sign in to comment.