Skip to content

Commit

Permalink
Move into v2 specific package
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Apr 5, 2024
1 parent ae7093f commit 3b2d65b
Show file tree
Hide file tree
Showing 26 changed files with 83 additions and 85 deletions.
2 changes: 1 addition & 1 deletion internal/util/array.go → internal/util/v2/array.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import "sync"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"sort"
Expand Down
2 changes: 1 addition & 1 deletion internal/util/closer.go → internal/util/v2/closer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"testing"
Expand Down
2 changes: 1 addition & 1 deletion internal/util/rand.go → internal/util/v2/rand.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"crypto/aes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"crypto/cipher"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"log"
Expand Down
2 changes: 1 addition & 1 deletion internal/util/result.go → internal/util/v2/result.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

type Results struct {
Successes int
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"testing"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/cache.go → pkg/util/v2/cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"sync"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/cache_test.go → pkg/util/v2/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/cleaner.go → pkg/util/v2/cleaner.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"sync"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/context.go → pkg/util/v2/context.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/context_test.go → pkg/util/v2/context_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/worker.go → pkg/util/v2/worker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/worker_test.go → pkg/util/v2/worker_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package v2

import (
"context"
Expand Down
26 changes: 13 additions & 13 deletions pkg/v2/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ package coordinator
import (
"context"
"fmt"
"github.com/smartcontractkit/chainlink-automation/pkg/util/v2"
"log"
"sync/atomic"
"time"

"github.com/smartcontractkit/chainlink-automation/pkg/util"
ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2"
)

Expand Down Expand Up @@ -64,10 +64,10 @@ type reportCoordinator struct {
encoder Encoder

// initialised by the constructor
idBlocks *util.Cache[idBlocker] // should clear out when the next perform with this id occurs
activeKeys *util.Cache[bool]
cacheCleaner *util.IntervalCacheCleaner[bool]
idCacheCleaner *util.IntervalCacheCleaner[idBlocker]
idBlocks *v2.Cache[idBlocker] // should clear out when the next perform with this id occurs
activeKeys *v2.Cache[bool]
cacheCleaner *v2.IntervalCacheCleaner[bool]
idCacheCleaner *v2.IntervalCacheCleaner[idBlocker]

// configurations
minConfs int
Expand Down Expand Up @@ -99,10 +99,10 @@ func NewReportCoordinator(
logger: logger,
logs: logs,
minConfs: minConfs,
idBlocks: util.NewCache[idBlocker](lockoutWindow),
activeKeys: util.NewCache[bool](time.Hour), // 1 hour allows the cleanup routine to clear stale data
idCacheCleaner: util.NewIntervalCacheCleaner[idBlocker](cacheClean),
cacheCleaner: util.NewIntervalCacheCleaner[bool](cacheClean),
idBlocks: v2.NewCache[idBlocker](lockoutWindow),
activeKeys: v2.NewCache[bool](time.Hour), // 1 hour allows the cleanup routine to clear stale data
idCacheCleaner: v2.NewIntervalCacheCleaner[idBlocker](cacheClean),
cacheCleaner: v2.NewIntervalCacheCleaner[bool](cacheClean),
chStop: make(chan struct{}, 1),
encoder: encoder,
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (rc *reportCoordinator) Accept(key ocr2keepers.UpkeepKey) error {
// there might be other keys in the same report which can get accepted
if _, ok := rc.activeKeys.Get(string(key)); !ok {
// Set the key as accepted within activeKeys
rc.activeKeys.Set(string(key), false, util.DefaultCacheExpiration)
rc.activeKeys.Set(string(key), false, v2.DefaultCacheExpiration)

// Set idBlocks with the key as checkBlockNumber and IndefiniteBlockingKey as TransmitBlockNumber
rc.updateIdBlock(string(id), idBlocker{
Expand Down Expand Up @@ -202,7 +202,7 @@ func (rc *reportCoordinator) checkLogs(ctx context.Context) error {
rc.logger.Printf("Perform log found for key %s in transaction %s at block %s, with confirmations %d", l.Key, l.TransactionHash, l.TransmitBlock, l.Confirmations)

// set state of key to indicate that the report was transmitted
rc.activeKeys.Set(string(l.Key), true, util.DefaultCacheExpiration)
rc.activeKeys.Set(string(l.Key), true, v2.DefaultCacheExpiration)

rc.updateIdBlock(string(id), idBlocker{
CheckBlockNumber: logCheckBlockKey,
Expand Down Expand Up @@ -262,7 +262,7 @@ func (rc *reportCoordinator) checkLogs(ctx context.Context) error {
// Process log if the key hasn't been confirmed yet
rc.logger.Printf("Stale report log found for key %s in transaction %s at block %s, with confirmations %d", l.Key, l.TransactionHash, l.TransmitBlock, l.Confirmations)
// set state of key to indicate that the report was transmitted
rc.activeKeys.Set(string(l.Key), true, util.DefaultCacheExpiration)
rc.activeKeys.Set(string(l.Key), true, v2.DefaultCacheExpiration)

rc.updateIdBlock(string(id), idBlocker{
CheckBlockNumber: logCheckBlockKey,
Expand Down Expand Up @@ -357,7 +357,7 @@ func (rc *reportCoordinator) updateIdBlock(key string, val idBlocker) {
}

rc.logger.Printf("updateIdBlock for key %s: value updated to %+v", key, val)
rc.idBlocks.Set(key, val, util.DefaultCacheExpiration)
rc.idBlocks.Set(key, val, v2.DefaultCacheExpiration)
}

// Start starts all subprocesses
Expand Down
12 changes: 6 additions & 6 deletions pkg/v2/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package coordinator
import (
"context"
"fmt"
"github.com/smartcontractkit/chainlink-automation/pkg/util/v2"
"io"
"log"
"testing"
Expand All @@ -11,7 +12,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/chainlink-automation/pkg/util"
ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2"
"github.com/smartcontractkit/chainlink-automation/pkg/v2/coordinator/mocks"
)
Expand All @@ -25,8 +25,8 @@ func TestReportCoordinator(t *testing.T) {
logger: l,
logs: mLogs,
encoder: mEnc,
idBlocks: util.NewCache[idBlocker](time.Second),
activeKeys: util.NewCache[bool](time.Minute),
idBlocks: v2.NewCache[idBlocker](time.Second),
activeKeys: v2.NewCache[bool](time.Minute),
minConfs: 1,
chStop: make(chan struct{}),
}, mEnc, mLogs
Expand Down Expand Up @@ -875,7 +875,7 @@ func TestReportCoordinator(t *testing.T) {

rc.idBlocks.Set(string(id1), idBlocker{
TransmitBlockNumber: bk15,
}, util.DefaultCacheExpiration)
}, v2.DefaultCacheExpiration)

mr.On("SplitUpkeepKey", mock.Anything).Return(bk4, id1, nil).Once()
mr.On("After", bk4, bk15).Return(false, nil).Once()
Expand All @@ -892,7 +892,7 @@ func TestReportCoordinator(t *testing.T) {

rc.idBlocks.Set(string(id1), idBlocker{
TransmitBlockNumber: bk15,
}, util.DefaultCacheExpiration)
}, v2.DefaultCacheExpiration)

key := ocr2keepers.UpkeepKey("invalid")
expected := fmt.Errorf("test")
Expand All @@ -916,7 +916,7 @@ func TestReportCoordinator(t *testing.T) {

rc.idBlocks.Set("1234", idBlocker{
TransmitBlockNumber: invalid,
}, util.DefaultCacheExpiration)
}, v2.DefaultCacheExpiration)

mr.On("SplitUpkeepKey", key).Return(bk1, id, nil).Once()
mr.On("After", bk1, invalid).Return(false, expected).Once()
Expand Down
6 changes: 3 additions & 3 deletions pkg/v2/observer/polling/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ package polling
import (
"context"
"fmt"
"github.com/smartcontractkit/chainlink-automation/internal/util/v2"
"log"
"sync"
"time"

"github.com/smartcontractkit/chainlink-automation/internal/util"
ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2"
"github.com/smartcontractkit/chainlink-automation/pkg/v2/observer"
)
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewPollingObserver(
cancel: cancel,
logger: logger,
samplingDuration: maxSamplingDuration,
shuffler: util.Shuffler[ocr2keepers.UpkeepKey]{Source: util.NewCryptoRandSource()}, // use crypto/rand shuffling for true random
shuffler: v2.Shuffler[ocr2keepers.UpkeepKey]{Source: v2.NewCryptoRandSource()}, // use crypto/rand shuffling for true random
ratio: ratio,
stager: &stager{},
coordinator: coord,
Expand All @@ -101,7 +101,7 @@ func NewPollingObserver(
// make all go-routines started by this entity automatically recoverable
// on panics
ob.services = []Service{
util.NewRecoverableService(&observer.SimpleService{F: ob.runHeadTasks, C: cancel}, logger),
v2.NewRecoverableService(&observer.SimpleService{F: ob.runHeadTasks, C: cancel}, logger),
}

return ob
Expand Down
22 changes: 11 additions & 11 deletions pkg/v2/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package runner
import (
"context"
"fmt"
v22 "github.com/smartcontractkit/chainlink-automation/internal/util/v2"
"github.com/smartcontractkit/chainlink-automation/pkg/util/v2"
"log"
"sync/atomic"
"time"

"github.com/smartcontractkit/chainlink-automation/internal/util"
pkgutil "github.com/smartcontractkit/chainlink-automation/pkg/util"
ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2"
)

Expand Down Expand Up @@ -38,9 +38,9 @@ type Runner struct {
encoder Encoder

// initialized by the constructor
workers *pkgutil.WorkerGroup[[]ocr2keepers.UpkeepResult] // parallelizer for RPC calls
cache *pkgutil.Cache[ocr2keepers.UpkeepResult]
cacheCleaner *pkgutil.IntervalCacheCleaner[ocr2keepers.UpkeepResult]
workers *v2.WorkerGroup[[]ocr2keepers.UpkeepResult] // parallelizer for RPC calls
cache *v2.Cache[ocr2keepers.UpkeepResult]
cacheCleaner *v2.IntervalCacheCleaner[ocr2keepers.UpkeepResult]

// configurations
workerBatchLimit int // the maximum number of items in RPC batch call
Expand All @@ -63,9 +63,9 @@ func NewRunner(
logger: logger,
registry: registry,
encoder: encoder,
workers: pkgutil.NewWorkerGroup[[]ocr2keepers.UpkeepResult](workers, workerQueueLength),
cache: pkgutil.NewCache[ocr2keepers.UpkeepResult](cacheExpire),
cacheCleaner: pkgutil.NewIntervalCacheCleaner[ocr2keepers.UpkeepResult](cacheClean),
workers: v2.NewWorkerGroup[[]ocr2keepers.UpkeepResult](workers, workerQueueLength),
cache: v2.NewCache[ocr2keepers.UpkeepResult](cacheExpire),
cacheCleaner: v2.NewIntervalCacheCleaner[ocr2keepers.UpkeepResult](cacheClean),
workerBatchLimit: 10,
}, nil
}
Expand Down Expand Up @@ -126,10 +126,10 @@ func (o *Runner) parallelCheck(ctx context.Context, mercuryEnabled bool, keys []

// Create batches from the given keys.
// Max keyBatchSize items in the batch.
pkgutil.RunJobs(
v2.RunJobs(
ctx,
o.workers,
util.Unflatten(toRun, o.workerBatchLimit),
v22.Unflatten(toRun, o.workerBatchLimit),
o.wrapWorkerFunc(mercuryEnabled),
o.wrapAggregate(result),
)
Expand Down Expand Up @@ -188,7 +188,7 @@ func (o *Runner) wrapAggregate(r *Result) func([]ocr2keepers.UpkeepResult, error

for _, res := range result {
key, _, _ := o.encoder.Detail(res)
o.cache.Set(string(key), res, pkgutil.DefaultCacheExpiration)
o.cache.Set(string(key), res, v2.DefaultCacheExpiration)
r.Add(res)
}
} else {
Expand Down
7 changes: 3 additions & 4 deletions pkg/v2/shuffle.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package ocr2keepers

import (
"github.com/smartcontractkit/chainlink-automation/internal/util/v2"
"math/rand"

"github.com/smartcontractkit/chainlink-automation/internal/util"
)

func filterDedupeShuffleObservations(upkeepKeys [][]UpkeepKey, keyRandSource [16]byte, filters ...func(UpkeepKey) (bool, error)) ([]UpkeepKey, error) {
Expand All @@ -12,7 +11,7 @@ func filterDedupeShuffleObservations(upkeepKeys [][]UpkeepKey, keyRandSource [16
return nil, err
}

rand.New(util.NewKeyedCryptoRandSource(keyRandSource)).Shuffle(len(uniqueKeys), func(i, j int) {
rand.New(v2.NewKeyedCryptoRandSource(keyRandSource)).Shuffle(len(uniqueKeys), func(i, j int) {
uniqueKeys[i], uniqueKeys[j] = uniqueKeys[j], uniqueKeys[i]
})

Expand Down Expand Up @@ -50,7 +49,7 @@ func filterAndDedupe(inputs [][]UpkeepKey, filters ...func(UpkeepKey) (bool, err
}

func shuffleObservations(upkeepIdentifiers []UpkeepIdentifier, source [16]byte) []UpkeepIdentifier {
rand.New(util.NewKeyedCryptoRandSource(source)).Shuffle(len(upkeepIdentifiers), func(i, j int) {
rand.New(v2.NewKeyedCryptoRandSource(source)).Shuffle(len(upkeepIdentifiers), func(i, j int) {
upkeepIdentifiers[i], upkeepIdentifiers[j] = upkeepIdentifiers[j], upkeepIdentifiers[i]
})

Expand Down
Loading

0 comments on commit 3b2d65b

Please sign in to comment.