diff --git a/core/services/gateway/handlers/functions/allowlist/allowlist.go b/core/services/gateway/handlers/functions/allowlist/allowlist.go index f0fe5c8c829..2a27f51471a 100644 --- a/core/services/gateway/handlers/functions/allowlist/allowlist.go +++ b/core/services/gateway/handlers/functions/allowlist/allowlist.go @@ -210,33 +210,23 @@ func (a *onchainAllowlist) updateFromContractV1(ctx context.Context, blockNum *b return errors.Wrap(err, "unexpected error during functions_allow_list.NewTermsOfServiceAllowList") } - var allowedSenderList []common.Address - typeAndVersion, err := tosContract.TypeAndVersion(&bind.CallOpts{ - Pending: false, - BlockNumber: blockNum, - Context: ctx, - }) - if err != nil { - return errors.Wrap(err, "failed to fetch the tos contract type and version") - } - - currentVersion, err := ExtractContractVersion(typeAndVersion) + currentVersion, err := fetchTosCurrentVersion(ctx, tosContract, blockNum) if err != nil { - return fmt.Errorf("failed to extract version: %w", err) + return fmt.Errorf("failed to fetch tos current version: %w", err) } if semver.Compare(tosContractMinBatchProcessingVersion, currentVersion) <= 0 { - err = a.syncBlockedSenders(ctx, tosContract, blockNum) + err = a.updateAllowedSendersInBatches(ctx, tosContract, blockNum) if err != nil { - return errors.Wrap(err, "failed to sync the stored allowed and blocked senders") + return errors.Wrap(err, "failed to get allowed senders in rage") } - allowedSenderList, err = a.getAllowedSendersBatched(ctx, tosContract, blockNum) + err := a.syncBlockedSenders(ctx, tosContract, blockNum) if err != nil { - return errors.Wrap(err, "failed to get allowed senders in rage") + return errors.Wrap(err, "failed to sync the stored allowed and blocked senders") } } else { - allowedSenderList, err = tosContract.GetAllAllowedSenders(&bind.CallOpts{ + allowedSenderList, err := tosContract.GetAllAllowedSenders(&bind.CallOpts{ Pending: false, BlockNumber: blockNum, Context: ctx, @@ -254,50 +244,108 @@ func (a *onchainAllowlist) updateFromContractV1(ctx context.Context, blockNum *b if err != nil { a.lggr.Errorf("failed to update stored allowedSenderList: %w", err) } + + a.update(allowedSenderList) } - a.update(allowedSenderList) return nil } -func (a *onchainAllowlist) getAllowedSendersBatched(ctx context.Context, tosContract *functions_allow_list.TermsOfServiceAllowList, blockNum *big.Int) ([]common.Address, error) { - allowedSenderList := make([]common.Address, 0) - count, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{ +// updateAllowedSendersInBatches will update the node's inmemory state and the orm layer representing the allowlist. +// it will get the current node's in memory allowlist and start fetching and adding from the tos contract in batches. +// the iteration order will give priority to new allowed senders, if new addresses are added while iterating over the batches +// an extra step will be executed to keep this up to date. +func (a *onchainAllowlist) updateAllowedSendersInBatches(ctx context.Context, tosContract functions_allow_list.TermsOfServiceAllowListInterface, blockNum *big.Int) error { + // currentAllowedSenderList will be the starting point from which we will be adding the new allowed senders + currentAllowedSenderList := make(map[common.Address]struct{}, 0) + if cal := a.allowlist.Load(); cal != nil { + for k := range *cal { + currentAllowedSenderList[k] = struct{}{} + } + } + + currentAllowedSenderCount, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{ Pending: false, BlockNumber: blockNum, Context: ctx, }) if err != nil { - return nil, errors.Wrap(err, "unexpected error during functions_allow_list.GetAllowedSendersCount") + return errors.Wrap(err, "unexpected error during functions_allow_list.GetAllowedSendersCount") } throttleTicker := time.NewTicker(time.Duration(a.config.FetchingDelayInRangeSec) * time.Second) - for idxStart := uint64(0); idxStart < count; idxStart += uint64(a.config.OnchainAllowlistBatchSize) { - <-throttleTicker.C - idxEnd := idxStart + uint64(a.config.OnchainAllowlistBatchSize) - if idxEnd >= count { - idxEnd = count - 1 + for i := int64(currentAllowedSenderCount); i > 0; i -= int64(a.config.OnchainAllowlistBatchSize) { + <-throttleTicker.C + var idxStart uint64 + if uint64(i) > uint64(a.config.OnchainAllowlistBatchSize) { + idxStart = uint64(i) - uint64(a.config.OnchainAllowlistBatchSize) } - allowedSendersBatch, err := tosContract.GetAllowedSendersInRange(&bind.CallOpts{ + idxEnd := uint64(i) - 1 + + // before continuing we evaluate if the size of the list changed, if that happens we trigger an extra step + // getting the latest added addresses from the list + updatedAllowedSenderCount, err := tosContract.GetAllowedSendersCount(&bind.CallOpts{ Pending: false, BlockNumber: blockNum, Context: ctx, - }, idxStart, idxEnd) + }) if err != nil { - return nil, errors.Wrap(err, "error calling GetAllowedSendersInRange") + return errors.Wrap(err, "unexpected error while fetching the updated functions_allow_list.GetAllowedSendersCount") + } + + if updatedAllowedSenderCount > currentAllowedSenderCount { + lastBatchIdxStart := currentAllowedSenderCount + lastBatchIdxEnd := updatedAllowedSenderCount - 1 + currentAllowedSenderCount = updatedAllowedSenderCount + + err = a.updateAllowedSendersBatch(ctx, tosContract, blockNum, lastBatchIdxStart, lastBatchIdxEnd, currentAllowedSenderList) + if err != nil { + return err + } } - allowedSenderList = append(allowedSenderList, allowedSendersBatch...) - err = a.orm.CreateAllowedSenders(ctx, allowedSendersBatch) + err = a.updateAllowedSendersBatch(ctx, tosContract, blockNum, idxStart, idxEnd, currentAllowedSenderList) if err != nil { - a.lggr.Errorf("failed to update stored allowedSenderList: %w", err) + return err } } throttleTicker.Stop() - return allowedSenderList, nil + return nil +} + +func (a *onchainAllowlist) updateAllowedSendersBatch( + ctx context.Context, + tosContract functions_allow_list.TermsOfServiceAllowListInterface, + blockNum *big.Int, + idxStart uint64, + idxEnd uint64, + currentAllowedSenderList map[common.Address]struct{}, +) error { + allowedSendersBatch, err := tosContract.GetAllowedSendersInRange(&bind.CallOpts{ + Pending: false, + BlockNumber: blockNum, + Context: ctx, + }, idxStart, idxEnd) + if err != nil { + return errors.Wrap(err, "error calling GetAllowedSendersInRange") + } + + // add the fetched batch to the currentAllowedSenderList and replace the existing allowlist + for _, addr := range allowedSendersBatch { + currentAllowedSenderList[addr] = struct{}{} + } + a.allowlist.Store(¤tAllowedSenderList) + a.lggr.Infow("allowlist updated in batches successfully", "len", len(currentAllowedSenderList)) + + // persist each batch to the underalying orm layer + err = a.orm.CreateAllowedSenders(ctx, allowedSendersBatch) + if err != nil { + a.lggr.Errorf("failed to update stored allowedSenderList: %w", err) + } + return nil } // syncBlockedSenders fetches the list of blocked addresses from the contract in batches @@ -370,6 +418,19 @@ func (a *onchainAllowlist) loadStoredAllowedSenderList(ctx context.Context) { a.update(allowedList) } +func fetchTosCurrentVersion(ctx context.Context, tosContract *functions_allow_list.TermsOfServiceAllowList, blockNum *big.Int) (string, error) { + typeAndVersion, err := tosContract.TypeAndVersion(&bind.CallOpts{ + Pending: false, + BlockNumber: blockNum, + Context: ctx, + }) + if err != nil { + return "", errors.Wrap(err, "failed to fetch the tos contract type and version") + } + + return ExtractContractVersion(typeAndVersion) +} + func ExtractContractVersion(str string) (string, error) { pattern := `v(\d+).(\d+).(\d+)` re := regexp.MustCompile(pattern) diff --git a/core/services/gateway/handlers/functions/allowlist/allowlist_internal_test.go b/core/services/gateway/handlers/functions/allowlist/allowlist_internal_test.go new file mode 100644 index 00000000000..966db032636 --- /dev/null +++ b/core/services/gateway/handlers/functions/allowlist/allowlist_internal_test.go @@ -0,0 +1,216 @@ +package allowlist + +import ( + "context" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_allow_list" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + amocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions/allowlist/mocks" +) + +func TestUpdateAllowedSendersInBatches(t *testing.T) { + t.Run("OK-simple_update_in_batches", func(t *testing.T) { + ctx := context.Background() + config := OnchainAllowlistConfig{ + ContractAddress: testutils.NewAddress(), + ContractVersion: 1, + BlockConfirmations: 1, + UpdateFrequencySec: 2, + UpdateTimeoutSec: 1, + StoredAllowlistBatchSize: 2, + OnchainAllowlistBatchSize: 10, + FetchingDelayInRangeSec: 1, + } + + // allowlistSize defines how big the mocked allowlist will be + allowlistSize := 53 + // allowlist represents the actual allowlist the tos contract will return + allowlist := make([]common.Address, 0, allowlistSize) + // expectedAllowlist will be used to compare the actual status with what we actually want + expectedAllowlist := make(map[common.Address]struct{}, 0) + + // we load both the expectedAllowlist and the allowlist the contract will return with some new addresses + for i := 0; i < allowlistSize; i++ { + addr := testutils.NewAddress() + allowlist = append(allowlist, addr) + expectedAllowlist[addr] = struct{}{} + } + + tosContract := NewTosContractMock(allowlist) + + // with the orm mock we can validate the actual order in which the allowlist is fetched giving priority to newest addresses + orm := amocks.NewORM(t) + firstCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[43:53]).Times(1).Return(nil) + secondCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[33:43]).Times(1).Return(nil).NotBefore(firstCall) + thirdCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[23:33]).Times(1).Return(nil).NotBefore(secondCall) + forthCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[13:23]).Times(1).Return(nil).NotBefore(thirdCall) + fifthCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[3:13]).Times(1).Return(nil).NotBefore(forthCall) + orm.On("CreateAllowedSenders", context.Background(), allowlist[0:3]).Times(1).Return(nil).NotBefore(fifthCall) + + onchainAllowlist := &onchainAllowlist{ + config: config, + orm: orm, + blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), + lggr: logger.TestLogger(t).Named("OnchainAllowlist"), + stopCh: make(services.StopChan), + } + + // we set the onchain allowlist to an empty state before updating it in batches + emptyMap := make(map[common.Address]struct{}) + onchainAllowlist.allowlist.Store(&emptyMap) + + err := onchainAllowlist.updateAllowedSendersInBatches(ctx, tosContract, big.NewInt(0)) + require.NoError(t, err) + + currentAllowlist := onchainAllowlist.allowlist.Load() + require.Equal(t, &expectedAllowlist, currentAllowlist) + }) + + t.Run("OK-new_address_added_while_updating_in_batches", func(t *testing.T) { + ctx := context.Background() + config := OnchainAllowlistConfig{ + ContractAddress: testutils.NewAddress(), + ContractVersion: 1, + BlockConfirmations: 1, + UpdateFrequencySec: 2, + UpdateTimeoutSec: 1, + StoredAllowlistBatchSize: 2, + OnchainAllowlistBatchSize: 10, + FetchingDelayInRangeSec: 1, + } + + // allowlistSize defines how big the initial mocked allowlist will be + allowlistSize := 50 + // allowlist represents the actual allowlist the tos contract will return + allowlist := make([]common.Address, 0) + // expectedAllowlist will be used to compare the actual status with what we actually want + expectedAllowlist := make(map[common.Address]struct{}, 0) + + // we load both the expectedAllowlist and the allowlist the contract will return with some new addresses + for i := 0; i < allowlistSize; i++ { + addr := testutils.NewAddress() + allowlist = append(allowlist, addr) + expectedAllowlist[addr] = struct{}{} + } + + tosContract := NewTosContractMock(allowlist) + + // with the orm mock we can validate the actual order in which the allowlist is fetched giving priority to newest addresses + orm := amocks.NewORM(t) + firstCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[40:50]).Times(1).Run(func(args mock.Arguments) { + // after the first call we update the tosContract by adding a new address + addr := testutils.NewAddress() + allowlist = append(allowlist, addr) + expectedAllowlist[addr] = struct{}{} + *tosContract = *NewTosContractMock(allowlist) + }).Return(nil) + + // this is the extra step that will fetch the new address we want to validate + extraStepCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[50:51]).Times(1).Return(nil).NotBefore(firstCall) + + secondCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[30:40]).Times(1).Return(nil).NotBefore(extraStepCall) + thirdCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[20:30]).Times(1).Return(nil).NotBefore(secondCall) + forthCall := orm.On("CreateAllowedSenders", context.Background(), allowlist[10:20]).Times(1).Return(nil).NotBefore(thirdCall) + orm.On("CreateAllowedSenders", context.Background(), allowlist[0:10]).Times(1).Return(nil).NotBefore(forthCall) + + onchainAllowlist := &onchainAllowlist{ + config: config, + orm: orm, + blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), + lggr: logger.TestLogger(t).Named("OnchainAllowlist"), + stopCh: make(services.StopChan), + } + + // we set the onchain allowlist to an empty state before updating it in batches + emptyMap := make(map[common.Address]struct{}) + onchainAllowlist.allowlist.Store(&emptyMap) + + err := onchainAllowlist.updateAllowedSendersInBatches(ctx, tosContract, big.NewInt(0)) + require.NoError(t, err) + + currentAllowlist := onchainAllowlist.allowlist.Load() + require.Equal(t, &expectedAllowlist, currentAllowlist) + }) + + t.Run("OK-allowlist_size_smaller_than_batchsize", func(t *testing.T) { + ctx := context.Background() + config := OnchainAllowlistConfig{ + ContractAddress: testutils.NewAddress(), + ContractVersion: 1, + BlockConfirmations: 1, + UpdateFrequencySec: 2, + UpdateTimeoutSec: 1, + StoredAllowlistBatchSize: 2, + OnchainAllowlistBatchSize: 100, + FetchingDelayInRangeSec: 1, + } + + // allowlistSize defines how big the mocked allowlist will be + allowlistSize := 50 + // allowlist represents the actual allowlist the tos contract will return + allowlist := make([]common.Address, 0, allowlistSize) + // expectedAllowlist will be used to compare the actual status with what we actually want + expectedAllowlist := make(map[common.Address]struct{}, 0) + + // we load both the expectedAllowlist and the allowlist the contract will return with some new addresses + for i := 0; i < allowlistSize; i++ { + addr := testutils.NewAddress() + allowlist = append(allowlist, addr) + expectedAllowlist[addr] = struct{}{} + } + + tosContract := NewTosContractMock(allowlist) + + // with the orm mock we can validate the actual order in which the allowlist is fetched giving priority to newest addresses + orm := amocks.NewORM(t) + orm.On("CreateAllowedSenders", context.Background(), allowlist[0:50]).Times(1).Return(nil) + + onchainAllowlist := &onchainAllowlist{ + config: config, + orm: orm, + blockConfirmations: big.NewInt(int64(config.BlockConfirmations)), + lggr: logger.TestLogger(t).Named("OnchainAllowlist"), + stopCh: make(services.StopChan), + } + + // we set the onchain allowlist to an empty state before updating it in batches + emptyMap := make(map[common.Address]struct{}) + onchainAllowlist.allowlist.Store(&emptyMap) + + err := onchainAllowlist.updateAllowedSendersInBatches(ctx, tosContract, big.NewInt(0)) + require.NoError(t, err) + + currentAllowlist := onchainAllowlist.allowlist.Load() + require.Equal(t, &expectedAllowlist, currentAllowlist) + }) +} + +type tosContractMock struct { + functions_allow_list.TermsOfServiceAllowListInterface + + onchainAllowlist []common.Address +} + +func NewTosContractMock(onchainAllowlist []common.Address) *tosContractMock { + return &tosContractMock{ + onchainAllowlist: onchainAllowlist, + } +} + +func (t *tosContractMock) GetAllowedSendersCount(opts *bind.CallOpts) (uint64, error) { + return uint64(len(t.onchainAllowlist)), nil +} + +func (t *tosContractMock) GetAllowedSendersInRange(opts *bind.CallOpts, allowedSenderIdxStart uint64, allowedSenderIdxEnd uint64) ([]common.Address, error) { + // we replicate the onchain behaviour of including start and end indexes + return t.onchainAllowlist[allowedSenderIdxStart : allowedSenderIdxEnd+1], nil +}