Skip to content

Commit

Permalink
[FUN-1332] Allowlist optimisation (#12588)
Browse files Browse the repository at this point in the history
* feat: update allowlist in batches giving priority to latest allowed addresses

* fix: adjust iteration and add tests on updateAllowedSendersInBatches

* fix: make a deep copy of the map to avoid race conditions

* feat: extra step to fetch latest added addresses while batching

* fix: check allowlist size is bigger than the batchsize

* chore: remove leftover and add modify tests to be closer to a real scenario

* chore: simplify lastBatchIdxStart

* chore: remove newlines to pass sonarqube check
  • Loading branch information
agparadiso authored and cedric-cordenier committed May 29, 2024
1 parent 5c50371 commit 91fb9db
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 34 deletions.
129 changes: 95 additions & 34 deletions core/services/gateway/handlers/functions/allowlist/allowlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,33 +210,23 @@ func (a *onchainAllowlist) updateFromContractV1(ctx context.Context, blockNum *b
return errors.Wrap(err, "unexpected error during functions_allow_list.NewTermsOfServiceAllowList")
}

var allowedSenderList []common.Address
typeAndVersion, err := tosContract.TypeAndVersion(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
})
if err != nil {
return errors.Wrap(err, "failed to fetch the tos contract type and version")
}

currentVersion, err := ExtractContractVersion(typeAndVersion)
currentVersion, err := fetchTosCurrentVersion(ctx, tosContract, blockNum)
if err != nil {
return fmt.Errorf("failed to extract version: %w", err)
return fmt.Errorf("failed to fetch tos current version: %w", err)
}

if semver.Compare(tosContractMinBatchProcessingVersion, currentVersion) <= 0 {
err = a.syncBlockedSenders(ctx, tosContract, blockNum)
err = a.updateAllowedSendersInBatches(ctx, tosContract, blockNum)
if err != nil {
return errors.Wrap(err, "failed to sync the stored allowed and blocked senders")
return errors.Wrap(err, "failed to get allowed senders in rage")
}

allowedSenderList, err = a.getAllowedSendersBatched(ctx, tosContract, blockNum)
err := a.syncBlockedSenders(ctx, tosContract, blockNum)
if err != nil {
return errors.Wrap(err, "failed to get allowed senders in rage")
return errors.Wrap(err, "failed to sync the stored allowed and blocked senders")
}
} else {
allowedSenderList, err = tosContract.GetAllAllowedSenders(&bind.CallOpts{
allowedSenderList, err := tosContract.GetAllAllowedSenders(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
Expand All @@ -254,50 +244,108 @@ func (a *onchainAllowlist) updateFromContractV1(ctx context.Context, blockNum *b
if err != nil {
a.lggr.Errorf("failed to update stored allowedSenderList: %w", err)
}

a.update(allowedSenderList)
}

a.update(allowedSenderList)
return nil
}

func (a *onchainAllowlist) getAllowedSendersBatched(ctx context.Context, tosContract *functions_allow_list.TermsOfServiceAllowList, blockNum *big.Int) ([]common.Address, error) {
allowedSenderList := make([]common.Address, 0)
count, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{
// updateAllowedSendersInBatches will update the node's inmemory state and the orm layer representing the allowlist.
// it will get the current node's in memory allowlist and start fetching and adding from the tos contract in batches.
// the iteration order will give priority to new allowed senders, if new addresses are added while iterating over the batches
// an extra step will be executed to keep this up to date.
func (a *onchainAllowlist) updateAllowedSendersInBatches(ctx context.Context, tosContract functions_allow_list.TermsOfServiceAllowListInterface, blockNum *big.Int) error {
// currentAllowedSenderList will be the starting point from which we will be adding the new allowed senders
currentAllowedSenderList := make(map[common.Address]struct{}, 0)
if cal := a.allowlist.Load(); cal != nil {
for k := range *cal {
currentAllowedSenderList[k] = struct{}{}
}
}

currentAllowedSenderCount, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
})
if err != nil {
return nil, errors.Wrap(err, "unexpected error during functions_allow_list.GetAllowedSendersCount")
return errors.Wrap(err, "unexpected error during functions_allow_list.GetAllowedSendersCount")
}

throttleTicker := time.NewTicker(time.Duration(a.config.FetchingDelayInRangeSec) * time.Second)
for idxStart := uint64(0); idxStart < count; idxStart += uint64(a.config.OnchainAllowlistBatchSize) {
<-throttleTicker.C

idxEnd := idxStart + uint64(a.config.OnchainAllowlistBatchSize)
if idxEnd >= count {
idxEnd = count - 1
for i := int64(currentAllowedSenderCount); i > 0; i -= int64(a.config.OnchainAllowlistBatchSize) {
<-throttleTicker.C
var idxStart uint64
if uint64(i) > uint64(a.config.OnchainAllowlistBatchSize) {
idxStart = uint64(i) - uint64(a.config.OnchainAllowlistBatchSize)
}

allowedSendersBatch, err := tosContract.GetAllowedSendersInRange(&bind.CallOpts{
idxEnd := uint64(i) - 1

// before continuing we evaluate if the size of the list changed, if that happens we trigger an extra step
// getting the latest added addresses from the list
updatedAllowedSenderCount, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
}, idxStart, idxEnd)
})
if err != nil {
return nil, errors.Wrap(err, "error calling GetAllowedSendersInRange")
return errors.Wrap(err, "unexpected error while fetching the updated functions_allow_list.GetAllowedSendersCount")
}

if updatedAllowedSenderCount > currentAllowedSenderCount {
lastBatchIdxStart := currentAllowedSenderCount
lastBatchIdxEnd := updatedAllowedSenderCount - 1
currentAllowedSenderCount = updatedAllowedSenderCount

err = a.updateAllowedSendersBatch(ctx, tosContract, blockNum, lastBatchIdxStart, lastBatchIdxEnd, currentAllowedSenderList)
if err != nil {
return err
}
}

allowedSenderList = append(allowedSenderList, allowedSendersBatch...)
err = a.orm.CreateAllowedSenders(ctx, allowedSendersBatch)
err = a.updateAllowedSendersBatch(ctx, tosContract, blockNum, idxStart, idxEnd, currentAllowedSenderList)
if err != nil {
a.lggr.Errorf("failed to update stored allowedSenderList: %w", err)
return err
}
}
throttleTicker.Stop()

return allowedSenderList, nil
return nil
}

func (a *onchainAllowlist) updateAllowedSendersBatch(
ctx context.Context,
tosContract functions_allow_list.TermsOfServiceAllowListInterface,
blockNum *big.Int,
idxStart uint64,
idxEnd uint64,
currentAllowedSenderList map[common.Address]struct{},
) error {
allowedSendersBatch, err := tosContract.GetAllowedSendersInRange(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
}, idxStart, idxEnd)
if err != nil {
return errors.Wrap(err, "error calling GetAllowedSendersInRange")
}

// add the fetched batch to the currentAllowedSenderList and replace the existing allowlist
for _, addr := range allowedSendersBatch {
currentAllowedSenderList[addr] = struct{}{}
}
a.allowlist.Store(&currentAllowedSenderList)
a.lggr.Infow("allowlist updated in batches successfully", "len", len(currentAllowedSenderList))

// persist each batch to the underalying orm layer
err = a.orm.CreateAllowedSenders(ctx, allowedSendersBatch)
if err != nil {
a.lggr.Errorf("failed to update stored allowedSenderList: %w", err)
}
return nil
}

// syncBlockedSenders fetches the list of blocked addresses from the contract in batches
Expand Down Expand Up @@ -370,6 +418,19 @@ func (a *onchainAllowlist) loadStoredAllowedSenderList(ctx context.Context) {
a.update(allowedList)
}

func fetchTosCurrentVersion(ctx context.Context, tosContract *functions_allow_list.TermsOfServiceAllowList, blockNum *big.Int) (string, error) {
typeAndVersion, err := tosContract.TypeAndVersion(&bind.CallOpts{
Pending: false,
BlockNumber: blockNum,
Context: ctx,
})
if err != nil {
return "", errors.Wrap(err, "failed to fetch the tos contract type and version")
}

return ExtractContractVersion(typeAndVersion)
}

func ExtractContractVersion(str string) (string, error) {
pattern := `v(\d+).(\d+).(\d+)`
re := regexp.MustCompile(pattern)
Expand Down
Loading

0 comments on commit 91fb9db

Please sign in to comment.