Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FUN-1332] Allowlist optimisation #12588

Merged
merged 10 commits into from
May 23, 2024
129 changes: 95 additions & 34 deletions core/services/gateway/handlers/functions/allowlist/allowlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,33 +210,23 @@ func (a *onchainAllowlist) updateFromContractV1(ctx context.Context, blockNum *b
return errors.Wrap(err, "unexpected error during functions_allow_list.NewTermsOfServiceAllowList")
}

var allowedSenderList []common.Address
typeAndVersion, err := tosContract.TypeAndVersion(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
})
if err != nil {
return errors.Wrap(err, "failed to fetch the tos contract type and version")
}

currentVersion, err := ExtractContractVersion(typeAndVersion)
currentVersion, err := fetchTosCurrentVersion(ctx, tosContract, blockNum)
if err != nil {
return fmt.Errorf("failed to extract version: %w", err)
return fmt.Errorf("failed to fetch tos current version: %w", err)
}

if semver.Compare(tosContractMinBatchProcessingVersion, currentVersion) <= 0 {
err = a.syncBlockedSenders(ctx, tosContract, blockNum)
err = a.updateAllowedSendersInBatches(ctx, tosContract, blockNum)
if err != nil {
return errors.Wrap(err, "failed to sync the stored allowed and blocked senders")
return errors.Wrap(err, "failed to get allowed senders in rage")
}

allowedSenderList, err = a.getAllowedSendersBatched(ctx, tosContract, blockNum)
err := a.syncBlockedSenders(ctx, tosContract, blockNum)
if err != nil {
return errors.Wrap(err, "failed to get allowed senders in rage")
return errors.Wrap(err, "failed to sync the stored allowed and blocked senders")
}
} else {
allowedSenderList, err = tosContract.GetAllAllowedSenders(&bind.CallOpts{
allowedSenderList, err := tosContract.GetAllAllowedSenders(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
Expand All @@ -254,50 +244,108 @@ func (a *onchainAllowlist) updateFromContractV1(ctx context.Context, blockNum *b
if err != nil {
a.lggr.Errorf("failed to update stored allowedSenderList: %w", err)
}

a.update(allowedSenderList)
}

a.update(allowedSenderList)
return nil
}

func (a *onchainAllowlist) getAllowedSendersBatched(ctx context.Context, tosContract *functions_allow_list.TermsOfServiceAllowList, blockNum *big.Int) ([]common.Address, error) {
allowedSenderList := make([]common.Address, 0)
count, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{
// updateAllowedSendersInBatches will update the node's inmemory state and the orm layer representing the allowlist.
// it will get the current node's in memory allowlist and start fetching and adding from the tos contract in batches.
// the iteration order will give priority to new allowed senders, if new addresses are added while iterating over the batches
// an extra step will be executed to keep this up to date.
func (a *onchainAllowlist) updateAllowedSendersInBatches(ctx context.Context, tosContract functions_allow_list.TermsOfServiceAllowListInterface, blockNum *big.Int) error {
// currentAllowedSenderList will be the starting point from which we will be adding the new allowed senders
currentAllowedSenderList := make(map[common.Address]struct{}, 0)
if cal := a.allowlist.Load(); cal != nil {
for k := range *cal {
currentAllowedSenderList[k] = struct{}{}
}
}

currentAllowedSenderCount, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
})
if err != nil {
return nil, errors.Wrap(err, "unexpected error during functions_allow_list.GetAllowedSendersCount")
return errors.Wrap(err, "unexpected error during functions_allow_list.GetAllowedSendersCount")
}

throttleTicker := time.NewTicker(time.Duration(a.config.FetchingDelayInRangeSec) * time.Second)
for idxStart := uint64(0); idxStart < count; idxStart += uint64(a.config.OnchainAllowlistBatchSize) {
<-throttleTicker.C

idxEnd := idxStart + uint64(a.config.OnchainAllowlistBatchSize)
if idxEnd >= count {
idxEnd = count - 1
for i := int64(currentAllowedSenderCount); i > 0; i -= int64(a.config.OnchainAllowlistBatchSize) {
<-throttleTicker.C
var idxStart uint64
if uint64(i) > uint64(a.config.OnchainAllowlistBatchSize) {
idxStart = uint64(i) - uint64(a.config.OnchainAllowlistBatchSize)
}

allowedSendersBatch, err := tosContract.GetAllowedSendersInRange(&bind.CallOpts{
idxEnd := uint64(i) - 1

// before continuing we evaluate if the size of the list changed, if that happens we trigger an extra step
// getting the latest added addresses from the list
updatedAllowedSenderCount, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
}, idxStart, idxEnd)
})
if err != nil {
return nil, errors.Wrap(err, "error calling GetAllowedSendersInRange")
return errors.Wrap(err, "unexpected error while fetching the updated functions_allow_list.GetAllowedSendersCount")
}

if updatedAllowedSenderCount > currentAllowedSenderCount {
lastBatchIdxStart := currentAllowedSenderCount
lastBatchIdxEnd := updatedAllowedSenderCount - 1
currentAllowedSenderCount = updatedAllowedSenderCount

err = a.updateAllowedSendersBatch(ctx, tosContract, blockNum, lastBatchIdxStart, lastBatchIdxEnd, currentAllowedSenderList)
if err != nil {
return err
}
}

allowedSenderList = append(allowedSenderList, allowedSendersBatch...)
err = a.orm.CreateAllowedSenders(ctx, allowedSendersBatch)
err = a.updateAllowedSendersBatch(ctx, tosContract, blockNum, idxStart, idxEnd, currentAllowedSenderList)
if err != nil {
a.lggr.Errorf("failed to update stored allowedSenderList: %w", err)
return err
}
}
throttleTicker.Stop()

return allowedSenderList, nil
return nil
}

func (a *onchainAllowlist) updateAllowedSendersBatch(
ctx context.Context,
tosContract functions_allow_list.TermsOfServiceAllowListInterface,
blockNum *big.Int,
idxStart uint64,
idxEnd uint64,
currentAllowedSenderList map[common.Address]struct{},
) error {
allowedSendersBatch, err := tosContract.GetAllowedSendersInRange(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
}, idxStart, idxEnd)
if err != nil {
return errors.Wrap(err, "error calling GetAllowedSendersInRange")
}

// add the fetched batch to the currentAllowedSenderList and replace the existing allowlist
for _, addr := range allowedSendersBatch {
currentAllowedSenderList[addr] = struct{}{}
}
a.allowlist.Store(&currentAllowedSenderList)
a.lggr.Infow("allowlist updated in batches successfully", "len", len(currentAllowedSenderList))

// persist each batch to the underalying orm layer
err = a.orm.CreateAllowedSenders(ctx, allowedSendersBatch)
if err != nil {
a.lggr.Errorf("failed to update stored allowedSenderList: %w", err)
}
return nil
}

// syncBlockedSenders fetches the list of blocked addresses from the contract in batches
Expand Down Expand Up @@ -370,6 +418,19 @@ func (a *onchainAllowlist) loadStoredAllowedSenderList(ctx context.Context) {
a.update(allowedList)
}

func fetchTosCurrentVersion(ctx context.Context, tosContract *functions_allow_list.TermsOfServiceAllowList, blockNum *big.Int) (string, error) {
typeAndVersion, err := tosContract.TypeAndVersion(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
})
if err != nil {
return "", errors.Wrap(err, "failed to fetch the tos contract type and version")
}

return ExtractContractVersion(typeAndVersion)
}

func ExtractContractVersion(str string) (string, error) {
pattern := `v(\d+).(\d+).(\d+)`
re := regexp.MustCompile(pattern)
Expand Down
Loading
Loading