Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-449] Automation Benchmark RPC Fault Tolerance #10676

Merged
merged 36 commits into from
Oct 11, 2023
Merged
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ff28ccb
Fix imports
kalverra Sep 18, 2023
670c8bd
More imports
kalverra Sep 18, 2023
187a028
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Sep 18, 2023
796281b
Consolidate logger and tester
kalverra Sep 18, 2023
4bd216d
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Sep 18, 2023
2b95285
Update
kalverra Sep 18, 2023
46c4ace
Cleanup
kalverra Sep 18, 2023
f6f35d9
Format
kalverra Sep 18, 2023
4caba46
Merge
kalverra Sep 18, 2023
09bf393
RPC fault tolerance step 1
kalverra Sep 18, 2023
00a7de0
Update vers
kalverra Sep 19, 2023
bfab7d1
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Sep 19, 2023
f5a8b88
Roughly working
kalverra Sep 19, 2023
c820d1f
Do final report
kalverra Sep 19, 2023
07bcdcf
Merge
kalverra Sep 19, 2023
a8bdd21
Avoid loop reference
kalverra Sep 20, 2023
ff2eefc
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Sep 20, 2023
98b7818
Tighter test loop
kalverra Sep 20, 2023
03dd539
Adds checks for interrupt
kalverra Sep 20, 2023
f5a38f2
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Sep 20, 2023
6e9df81
Merge
kalverra Sep 22, 2023
2024848
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Sep 22, 2023
5328ef1
Merge
kalverra Sep 28, 2023
5b3b443
Cleanup
kalverra Sep 28, 2023
6805955
Merge
kalverra Oct 2, 2023
1918cb4
Fix imports
kalverra Oct 2, 2023
005fa3e
Fix build triggers
kalverra Oct 2, 2023
5d27746
Merge'
kalverra Oct 3, 2023
3fbdf75
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Oct 5, 2023
29aef0f
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Oct 5, 2023
08592fa
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
kalverra Oct 6, 2023
df5af30
Simplify subscriptions
kalverra Oct 6, 2023
2fb00f4
Merge
kalverra Oct 10, 2023
2d4af00
Add backoff
kalverra Oct 11, 2023
4551cfe
Remove debug
kalverra Oct 11, 2023
232c9e0
Merge branch 'develop' into keeperRPC
kalverra Oct 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Do final report
  • Loading branch information
kalverra committed Sep 19, 2023
commit c820d1f4558b078bb185ff7d10981f0ac222adbe
95 changes: 56 additions & 39 deletions integration-tests/testsetups/keeper_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type KeeperBenchmarkTest struct {
keeperRegistrars []contracts.KeeperRegistrar
keeperConsumerContracts []contracts.AutomationConsumerBenchmark
upkeepIDs [][]*big.Int
filterQuery geth.FilterQuery

env *environment.Environment
namespace string
Expand Down Expand Up @@ -284,39 +283,60 @@ func (k *KeeperBenchmarkTest) Run() {
}()

// Main test loop
k.setFilterQuery()
require.NoError(k.t, err, "Setting filter query shouldn't fail")
for rIndex := range k.keeperRegistries {
k.observeUpkeepEvents(rIndex)
}
err = k.chainClient.WaitForEvents()
require.NoError(k.t, err, "Error waiting for keeper subscriptions")

// Collect test metrics
var (
logs []types.Log
timeout = 5 * time.Second
)

// This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats.
for {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
logs, err = k.chainClient.FilterLogs(ctx, k.filterQuery)
cancel()
if err != nil {
k.log.Error().Err(err).
Interface("Filter Query", k.filterQuery).
Str("Timeout", timeout.String()).
Msg("Error getting logs from chain, trying again")
continue
// Collect test metrics for each registry
registryLogs := make([][]types.Log, len(k.keeperRegistries))
for rIndex := range k.keeperRegistries {
var (
logs []types.Log
timeout = 5 * time.Second
contractABI = k.contractABI(rIndex)
addr = k.keeperRegistries[rIndex].Address()
filterQuery = geth.FilterQuery{
Addresses: []common.Address{common.HexToAddress(addr)},
FromBlock: k.startingBlock,
Topics: [][]common.Hash{{contractABI.Events["UpkeepPerformed"].ID}, {contractABI.Events["StaleUpkeepReport"].ID}},
}
err = fmt.Errorf("initial error") // to ensure our for loop runs at least once
)
for err != nil { // This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
logs, err = k.chainClient.FilterLogs(ctx, filterQuery)
cancel()
if err != nil {
k.log.Error().Err(err).
Interface("Filter Query", filterQuery).
Str("Timeout", timeout.String()).
Msg("Error getting logs from chain, trying again")
} else {
k.log.Info().Int("Log Count", len(logs)).Str("Registry Address", addr).Msg("Collected logs")
}
}
k.log.Info().Int("Log Count", len(logs)).Msg("Collected logs from chain")
break
registryLogs[rIndex] = logs
}

contractABI := k.contractABI(0)
for _, log := range logs {
eventDetails := contractABI.EventByID(log.Topics[0])
// Count reverts and stale upkeeps
for rIndex := range k.keeperRegistries {
contractABI := k.contractABI(rIndex)
for _, log := range registryLogs[rIndex] {
eventDetails, err := contractABI.EventByID(log.Topics[0])
require.NoError(k.t, err, "Getting event details for subscribed log shouldn't fail")
if eventDetails.Name == "UpkeepPerformed" {
parsedLog, err := k.keeperRegistries[rIndex].ParseUpkeepPerformedLog(&log)
require.NoError(k.t, err, "Parsing upkeep performed log shouldn't fail")
if !parsedLog.Success {
k.TestReporter.NumRevertedUpkeeps++
}
} else if eventDetails.Name == "StaleUpkeepReport" {
k.TestReporter.NumStaleUpkeepReports++
}
}
}

for _, chainlinkNode := range k.chainlinkNodes {
Expand Down Expand Up @@ -370,8 +390,17 @@ func (k *KeeperBenchmarkTest) TearDownVals(t *testing.T) (
// due to how fragile subscriptions can be
func (k *KeeperBenchmarkTest) observeUpkeepEvents(rIndex int) {
eventLogs := make(chan types.Log)
registryAddresses := make([]common.Address, len(k.keeperRegistries))
for index, registry := range k.keeperRegistries {
registryAddresses[index] = common.HexToAddress(registry.Address())
}
filterQuery := geth.FilterQuery{
Addresses: registryAddresses,
FromBlock: k.startingBlock,
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
sub, err := k.chainClient.SubscribeFilterLogs(ctx, k.filterQuery, eventLogs)
sub, err := k.chainClient.SubscribeFilterLogs(ctx, filterQuery, eventLogs)
cancel()
require.NoError(k.t, err, "Subscribing to upkeep performed events log shouldn't fail")
contractABI := k.contractABI(rIndex)
Expand All @@ -381,10 +410,10 @@ func (k *KeeperBenchmarkTest) observeUpkeepEvents(rIndex int) {
select {
case err := <-sub.Err():
for err != nil {
k.log.Error().Err(err).Interface("Query", k.filterQuery).Msg("Error while subscribing to Keeper Event Logs. Resubscribing...")
k.log.Error().Err(err).Interface("Query", filterQuery).Msg("Error while subscribing to Keeper Event Logs. Resubscribing...")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
sub, err = k.chainClient.SubscribeFilterLogs(ctx, k.filterQuery, eventLogs)
sub, err = k.chainClient.SubscribeFilterLogs(ctx, filterQuery, eventLogs)
cancel()
}
log.Info().Msg("Resubscribed to Keeper Event Logs")
Expand Down Expand Up @@ -461,18 +490,6 @@ func (k *KeeperBenchmarkTest) contractABI(rIndex int) *abi.ABI {
return contractABI
}

// setFilterQuery sets the filter query for the test to watch for events on contracts
func (k *KeeperBenchmarkTest) setFilterQuery() {
registryAddresses := make([]common.Address, len(k.keeperRegistries))
for index, registry := range k.keeperRegistries {
registryAddresses[index] = common.HexToAddress(registry.Address())
}
k.filterQuery = geth.FilterQuery{
Addresses: registryAddresses,
FromBlock: k.startingBlock,
}
}

// ensureValues ensures that all values needed to run the test are present
func (k *KeeperBenchmarkTest) ensureInputValues() {
inputs := k.Inputs
Expand Down