Skip to content

Commit

Permalink
Fix log poller test early exit (#13719)
Browse files Browse the repository at this point in the history
* WIP#1 - doesn't hang on error

* WIP#3 - better early exit

* use concurrent executor in log poller tests
  • Loading branch information
Tofel authored Jun 28, 2024
1 parent 336e419 commit 8ba2504
Showing 1 changed file with 68 additions and 131 deletions.
199 changes: 68 additions & 131 deletions integration-tests/universal/log_poller/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"testing"
"time"

seth_utils "github.com/smartcontractkit/chainlink-testing-framework/utils/seth"

geth "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -28,9 +26,11 @@ import (
"github.com/smartcontractkit/wasp"

"github.com/smartcontractkit/chainlink-testing-framework/blockchain"
ctf_concurrency "github.com/smartcontractkit/chainlink-testing-framework/concurrency"
ctf_test_env "github.com/smartcontractkit/chainlink-testing-framework/docker/test_env"
"github.com/smartcontractkit/chainlink-testing-framework/logging"
"github.com/smartcontractkit/chainlink-testing-framework/networks"
seth_utils "github.com/smartcontractkit/chainlink-testing-framework/utils/seth"

"github.com/smartcontractkit/chainlink/integration-tests/actions"
"github.com/smartcontractkit/chainlink/integration-tests/client"
Expand Down Expand Up @@ -204,11 +204,6 @@ func randomWait(minMilliseconds, maxMilliseconds int) {
time.Sleep(time.Duration(randomMilliseconds) * time.Millisecond)
}

type LogEmitterChannel struct {
logsEmitted int
err error
}

// getIntSlice returns a slice of ints of the provided length
func getIntSlice(length int) []int {
result := make([]int, length)
Expand All @@ -229,95 +224,6 @@ func getStringSlice(length int) []string {
return result
}

// emitEvents emits events from the provided log emitter concurrently according to the provided config
func emitEvents(ctx context.Context, l zerolog.Logger, client *seth.Client, logEmitter *contracts.LogEmitter, cfg *lp_config.Config, wg *sync.WaitGroup, results chan LogEmitterChannel) {
address := (*logEmitter).Address().String()
defer wg.Done()

var executionGroup sync.WaitGroup

// Atomic counter is used to keep track of the number of logs emitted
var atomicCounter = atomic.Int32{}

for i := 0; i < *cfg.LoopedConfig.ExecutionCount; i++ {
executionGroup.Add(1)
}

var emitAllEvents = func() {
defer executionGroup.Done()
current := atomicCounter.Add(1)

for _, event := range cfg.General.EventsToEmit {
select {
case <-ctx.Done():
l.Warn().Str("Emitter address", address).Msg("Context cancelled, not emitting events")
return
default:
l.Debug().Str("Emitter address", address).Str("Event type", event.Name).Str("index", fmt.Sprintf("%d/%d", current, *cfg.LoopedConfig.ExecutionCount)).Msg("Emitting log from emitter")
var err error
switch event.Name {
case "Log1":
_, err = client.Decode((*logEmitter).EmitLogIntsFromKey(getIntSlice(*cfg.General.EventsPerTx), client.AnySyncedKey()))
case "Log2":
_, err = client.Decode((*logEmitter).EmitLogIntsIndexedFromKey(getIntSlice(*cfg.General.EventsPerTx), client.AnySyncedKey()))
case "Log3":
_, err = client.Decode((*logEmitter).EmitLogStringsFromKey(getStringSlice(*cfg.General.EventsPerTx), client.AnySyncedKey()))
case "Log4":
_, err = client.Decode((*logEmitter).EmitLogIntMultiIndexedFromKey(1, 1, *cfg.General.EventsPerTx, client.AnySyncedKey()))
default:
err = fmt.Errorf("unknown event name: %s", event.Name)
}

if err != nil {
results <- LogEmitterChannel{
err: err,
}
return
}
randomWait(*cfg.LoopedConfig.MinEmitWaitTimeMs, *cfg.LoopedConfig.MaxEmitWaitTimeMs)
}

if (current)%10 == 0 {
l.Info().Str("Emitter address", address).Str("Index", fmt.Sprintf("%d/%d", current, *cfg.LoopedConfig.ExecutionCount)).Msgf("Emitted all %d events", len(cfg.General.EventsToEmit))
}
}
}

clientNumber := int(*client.Cfg.EphemeralAddrs)
emissionsPerClient := *cfg.LoopedConfig.ExecutionCount / clientNumber
extraEmissions := *cfg.LoopedConfig.ExecutionCount % clientNumber

l.Debug().Str("Emitter address", address).
Int("Total logs to emit", *cfg.LoopedConfig.ExecutionCount*len(cfg.General.EventsToEmit)*(*cfg.General.EventsPerTx)).
Int("Total clients", clientNumber).
Int("Emissions per client", emissionsPerClient).
Int("Extra emissions", extraEmissions).
Msg("Starting to emit events")

for i := 0; i < clientNumber; i++ {
go func(key int) {
numTasks := emissionsPerClient
if key < extraEmissions {
numTasks++
}

for idx := 0; idx < numTasks; idx++ {
emitAllEvents()
}
}(i)
}

executionGroup.Wait()

localCounter := int(atomicCounter.Load()) * *cfg.General.EventsPerTx * len(cfg.General.EventsToEmit)
l.Info().Str("Emitter address", address).Int("Total logs emitted", localCounter).Msg("Finished emitting events")

results <- LogEmitterChannel{
logsEmitted: localCounter,
err: nil,
}
}

// LogPollerHasFinalisedEndBlock returns true if all CL nodes have finalised processing the provided end block
func LogPollerHasFinalisedEndBlock(endBlock int64, chainID *big.Int, l zerolog.Logger, coreLogger core_logger.SugaredLogger, nodes *test_env.ClCluster) (bool, error) {
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -699,7 +605,7 @@ func GetMissingLogs(startBlock, endBlock int64, logEmitters []*contracts.LogEmit
if int64(len(allLogsInEVMNode)) != expectedTotalLogsEmitted {
l.Warn().
Str("Actual/Expected", fmt.Sprintf("%d/%d", expectedTotalLogsEmitted, len(allLogsInEVMNode))).
Msg("Some of the test logs were not found in EVM node. This is a bug in the test")
Msg("Actual number of logs found on EVM nodes differs from expected ones. Most probably this is a bug in the test")
}

return missingLogs, nil
Expand Down Expand Up @@ -854,58 +760,89 @@ func runWaspGenerator(t *testing.T, cfg *lp_config.Config, logEmitters []*contra
return counter.value, nil
}

type logEmissionTask struct {
emitter *contracts.LogEmitter
eventsToEmit []abi.Event
eventsPerTx int
}

type emittedLogsData struct {
count int
}

func (d emittedLogsData) GetResult() emittedLogsData {
return d
}

// runLoopedGenerator runs the looped generator and returns the total number of logs emitted
func runLoopedGenerator(t *testing.T, cfg *lp_config.Config, client *seth.Client, logEmitters []*contracts.LogEmitter) (int, error) {
l := logging.GetTestLogger(t)

// Start emitting events in parallel, each contract is emitting events in a separate goroutine
// We will stop as soon as we encounter an error
wg := &sync.WaitGroup{}
emitterCh := make(chan LogEmitterChannel, len(logEmitters))
tasks := make([]logEmissionTask, 0)
for i := 0; i < *cfg.LoopedConfig.ExecutionCount; i++ {
for _, logEmitter := range logEmitters {
tasks = append(tasks, logEmissionTask{
emitter: logEmitter,
eventsToEmit: cfg.General.EventsToEmit,
eventsPerTx: *cfg.General.EventsPerTx,
})
}
}

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
l.Info().Int("Total tasks", len(tasks)).Msg("Starting to emit events")

for i := 0; i < len(logEmitters); i++ {
wg.Add(1)
go func(idx int) {
emitEvents(ctx, l, client, logEmitters[idx], cfg, wg, emitterCh)
}(i)
}
var atomicCounter = atomic.Int32{}

var emitErr error
total := 0
var emitAllEventsFn = func(resultCh chan emittedLogsData, errorCh chan error, _ int, task logEmissionTask) {
current := atomicCounter.Add(1)

aggrChan := make(chan int, len(logEmitters))
address := (*task.emitter).Address().String()

go func() {
for {
select {
case <-ctx.Done():
for _, event := range cfg.General.EventsToEmit {
l.Debug().Str("Emitter address", address).Str("Event type", event.Name).Str("index", fmt.Sprintf("%d/%d", current, *cfg.LoopedConfig.ExecutionCount)).Msg("Emitting log from emitter")
var err error
switch event.Name {
case "Log1":
_, err = client.Decode((*task.emitter).EmitLogIntsFromKey(getIntSlice(*cfg.General.EventsPerTx), client.AnySyncedKey()))
case "Log2":
_, err = client.Decode((*task.emitter).EmitLogIntsIndexedFromKey(getIntSlice(*cfg.General.EventsPerTx), client.AnySyncedKey()))
case "Log3":
_, err = client.Decode((*task.emitter).EmitLogStringsFromKey(getStringSlice(*cfg.General.EventsPerTx), client.AnySyncedKey()))
case "Log4":
_, err = client.Decode((*task.emitter).EmitLogIntMultiIndexedFromKey(1, 1, *cfg.General.EventsPerTx, client.AnySyncedKey()))
default:
err = fmt.Errorf("unknown event name: %s", event.Name)
}

if err != nil {
errorCh <- err
return
case emitter := <-emitterCh:
if emitter.err != nil {
emitErr = emitter.err
cancelFn()
return
}
aggrChan <- emitter.logsEmitted
}
randomWait(*cfg.LoopedConfig.MinEmitWaitTimeMs, *cfg.LoopedConfig.MaxEmitWaitTimeMs)

if (current)%10 == 0 {
l.Info().Str("Emitter address", address).Str("Index", fmt.Sprintf("%d/%d", current, *cfg.LoopedConfig.ExecutionCount)).Msgf("Emitted all %d events", len(cfg.General.EventsToEmit))
}
}
}()

wg.Wait()
close(emitterCh)
resultCh <- emittedLogsData{
*cfg.General.EventsPerTx * len(cfg.General.EventsToEmit),
}
}

executor := ctf_concurrency.NewConcurrentExecutor[emittedLogsData, emittedLogsData, logEmissionTask](l)
r, err := executor.Execute(int(*client.Cfg.EphemeralAddrs), tasks, emitAllEventsFn)

if emitErr != nil {
return 0, emitErr
if err != nil {
return 0, err
}

for i := 0; i < len(logEmitters); i++ {
total += <-aggrChan
var total int
for _, result := range r {
total += result.count
}

return int(total), nil
return total, nil
}

// GetExpectedLogCount returns the expected number of logs to be emitted based on the provided config
Expand Down

0 comments on commit 8ba2504

Please sign in to comment.