Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Feb 25, 2024
1 parent 0bcbe19 commit 360a14b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL
}

func (s *streams) CheckErrorHandler(ctx context.Context, errCode encoding.ErrCode, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) error {
s.lggr.Debugf("at block %d upkeep %s requested time %s CheckErrorHandler error code: %d", lookup.Block, lookup.UpkeepId, lookup.Time, errCode)

userPayload, err := s.packer.PackUserCheckErrorHandler(errCode, lookup.ExtraData)
if err != nil {
checkResults[i].Retryable = false
Expand Down
94 changes: 24 additions & 70 deletions core/services/ocr2/plugins/ocr2keeper/integration_21_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ import (
"encoding/json"
"fmt"
"math/big"
"math/rand"
"net/http"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -421,35 +419,28 @@ func TestIntegration_KeeperPluginLogUpkeep_ErrHandler(t *testing.T) {

respTimeout := -1
errResponses := []int{
respTimeout,
// respTimeout, // TODO: uncomment once mercuryServer is fixed
http.StatusUnauthorized,
http.StatusBadRequest,
http.StatusInternalServerError,
}
// inline retries (3) * retry rounds (6) + timeout round (1)
mercuryFailCountPerUpkeep := 3*6 + 1
// mercuryFailCount is the number of times the mercury server will return an
// error before returning a success
mercuryFailCount := 2 + (mercuryFailCountPerUpkeep * 3)
for i := 3; i < mercuryFailCount; i++ {
errResponses = append(errResponses, http.StatusNotFound)
}
startMercuryServer(t, mercuryServer, func(i int) (int, []byte) {
var resp int
if i < len(errResponses) {
resp := errResponses[i]
switch resp {
case http.StatusNotFound, http.StatusInternalServerError:
// in case we got a 404 or 500, we wait a bit to simulate real behavior
time.Sleep(time.Duration(rand.Intn(2500)) * time.Millisecond)
case respTimeout: // mercury server timeout
time.Sleep(60 * time.Second) // 60s is the default timeout
resp = http.StatusNotFound
default:
}
return resp, nil
resp = errResponses[i]
}
output := `{"chainlinkBlob":"0x0001c38d71fed6c320b90e84b6f559459814d068e2a1700adc931ca9717d4fe70000000000000000000000000000000000000000000000000000000001a80b52b4bf1233f9cb71144a253a1791b202113c4ab4a92fa1b176d684b4959666ff8200000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001004254432d5553442d415242495452554d2d544553544e4554000000000000000000000000000000000000000000000000000000000000000000000000645570be000000000000000000000000000000000000000000000000000002af2b818dc5000000000000000000000000000000000000000000000000000002af2426faf3000000000000000000000000000000000000000000000000000002af32dc209700000000000000000000000000000000000000000000000000000000012130f8df0a9745bb6ad5e2df605e158ba8ad8a33ef8a0acf9851f0f01668a3a3f2b68600000000000000000000000000000000000000000000000000000000012130f60000000000000000000000000000000000000000000000000000000000000002c4a7958dce105089cf5edb68dad7dcfe8618d7784eb397f97d5a5fade78c11a58275aebda478968e545f7e3657aba9dcbe8d44605e4c6fde3e24edd5e22c94270000000000000000000000000000000000000000000000000000000000000002459c12d33986018a8959566d145225f0c4a4e61a9a3f50361ccff397899314f0018162cf10cd89897635a0bb62a822355bd199d09f4abe76e4d05261bb44733d"}`
return http.StatusOK, []byte(output)
switch resp {
case http.StatusNotFound, http.StatusInternalServerError:
// TODO: uncomment once mercuryServer is fixed
// in case we got a 404 or 500, wait a bit to simulate real world
// time.Sleep(time.Duration(rand.Intn(2500)) * time.Millisecond)
case respTimeout: // mercury server timeout
time.Sleep(30 * time.Second)
resp = http.StatusNotFound
default:
resp = http.StatusNotFound
}
return resp, nil
})
defer mercuryServer.Stop()

Expand Down Expand Up @@ -483,18 +474,14 @@ func TestIntegration_KeeperPluginLogUpkeep_ErrHandler(t *testing.T) {
})
}()

topic := log_triggered_streams_lookup_wrapper.LogTriggeredStreamsLookupIgnoringErrorHandlerData{}.Topic()
addrsToCheck := make([]common.Address, 0)
go makeDummyBlocks(t, backend, 3*time.Second, 1000)

idsToCheck := make([]*big.Int, 0)
for i, uid := range feeds.UpkeepsIds() {
if checkResultsProvider(i) {
addrsToCheck = append(addrsToCheck, feeds.addresses[i])
idsToCheck = append(idsToCheck, uid)
}
}
errHandlerListener, errHandlerDone := listenEvents(t, backend, addrsToCheck, topic, startBlock, 1)
g.Eventually(errHandlerListener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue())
errHandlerDone()

listener, done := listenPerformed(t, backend, registry, idsToCheck, startBlock)
g.Eventually(listener, testutils.WaitTimeout(t)-(5*time.Second), cltest.DBPollingInterval).Should(gomega.BeTrue())
Expand All @@ -520,49 +507,16 @@ func startMercuryServer(t *testing.T, mercuryServer *SimulatedMercuryServer, res
})
}

func listenEvents(t *testing.T, backend *backends.SimulatedBackend, addrs []common.Address, topic common.Hash, startBlock int64, count int) (func() bool, func()) {
ctx, cancel := context.WithCancel(testutils.Context(t))
visited := make(map[string]bool)
cache := &sync.Map{}
start := startBlock

func makeDummyBlocks(t *testing.T, backend *backends.SimulatedBackend, interval time.Duration, count int) {
go func() {
for ctx.Err() == nil {
currentBlock := backend.Blockchain().CurrentBlock().Number
logs, err := backend.FilterLogs(ctx, ethereum.FilterQuery{
Addresses: addrs,
Topics: [][]common.Hash{{topic}},
FromBlock: currentBlock,
ToBlock: big.NewInt(start),
})
if err != nil {
if ctx.Err() != nil {
return
}
t.Logf("Error fetching logs: %v", err)
continue
}
for _, log := range logs {
visitedID := fmt.Sprintf("%s:%s:%d", log.BlockHash.Hex(), log.TxHash.Hex(), log.Index)
if visited[visitedID] {
continue
}
visited[visitedID] = true
t.Logf("found err handler log in block %d [%s]", log.BlockNumber, log.BlockHash.Hex())
cacheID := log.Address.Hex()
count, ok := cache.Load(cacheID)
if !ok {
cache.Store(cacheID, 1)
continue
}
countI := count.(int)
cache.Store(cacheID, countI+1)
}
time.Sleep(time.Second)
ctx, cancel := context.WithCancel(testutils.Context(t))
defer cancel()

for i := 0; i < count && ctx.Err() == nil; i++ {
backend.Commit()
time.Sleep(interval)
}
}()

return mapListener(cache, count), cancel
}

func emitEvents(ctx context.Context, t *testing.T, n int, contracts []*log_upkeep_counter_wrapper.LogUpkeepCounter, carrol *bind.TransactOpts, afterEmit func()) {
Expand Down

0 comments on commit 360a14b

Please sign in to comment.