diff --git a/internal/util/array.go b/internal/util/v2/array.go similarity index 97% rename from internal/util/array.go rename to internal/util/v2/array.go index eb3a8f9f..08b617e2 100644 --- a/internal/util/array.go +++ b/internal/util/v2/array.go @@ -1,4 +1,4 @@ -package util +package v2 import "sync" diff --git a/internal/util/array_test.go b/internal/util/v2/array_test.go similarity index 99% rename from internal/util/array_test.go rename to internal/util/v2/array_test.go index 27da8325..a8d97223 100644 --- a/internal/util/array_test.go +++ b/internal/util/v2/array_test.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "sort" diff --git a/internal/util/closer.go b/internal/util/v2/closer.go similarity index 97% rename from internal/util/closer.go rename to internal/util/v2/closer.go index 8667417c..4eed30dc 100644 --- a/internal/util/closer.go +++ b/internal/util/v2/closer.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "context" diff --git a/internal/util/closer_test.go b/internal/util/v2/closer_test.go similarity index 96% rename from internal/util/closer_test.go rename to internal/util/v2/closer_test.go index 54cba4fe..db9cbe4e 100644 --- a/internal/util/closer_test.go +++ b/internal/util/v2/closer_test.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "testing" diff --git a/internal/util/rand.go b/internal/util/v2/rand.go similarity index 99% rename from internal/util/rand.go rename to internal/util/v2/rand.go index 305ddcbf..63556e70 100644 --- a/internal/util/rand.go +++ b/internal/util/v2/rand.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "crypto/aes" diff --git a/internal/util/rand_test.go b/internal/util/v2/rand_test.go similarity index 99% rename from internal/util/rand_test.go rename to internal/util/v2/rand_test.go index f848310e..11d65a1f 100644 --- a/internal/util/rand_test.go +++ b/internal/util/v2/rand_test.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "crypto/cipher" diff --git a/internal/util/recoverable.go b/internal/util/v2/recoverable.go similarity index 99% rename from internal/util/recoverable.go rename to internal/util/v2/recoverable.go index 3f4f2573..f0a0e589 100644 --- a/internal/util/recoverable.go +++ b/internal/util/v2/recoverable.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "context" diff --git a/internal/util/recoverable_test.go b/internal/util/v2/recoverable_test.go similarity index 99% rename from internal/util/recoverable_test.go rename to internal/util/v2/recoverable_test.go index 64349380..f16099b1 100644 --- a/internal/util/recoverable_test.go +++ b/internal/util/v2/recoverable_test.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "log" diff --git a/internal/util/result.go b/internal/util/v2/result.go similarity index 96% rename from internal/util/result.go rename to internal/util/v2/result.go index b5474fd9..b2b6e371 100644 --- a/internal/util/result.go +++ b/internal/util/v2/result.go @@ -1,4 +1,4 @@ -package util +package v2 type Results struct { Successes int diff --git a/internal/util/result_test.go b/internal/util/v2/result_test.go similarity index 97% rename from internal/util/result_test.go rename to internal/util/v2/result_test.go index 038f36b4..ca582974 100644 --- a/internal/util/result_test.go +++ b/internal/util/v2/result_test.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "testing" diff --git a/pkg/util/cache.go b/pkg/util/v2/cache.go similarity index 99% rename from pkg/util/cache.go rename to pkg/util/v2/cache.go index 31538c8c..41b31463 100644 --- a/pkg/util/cache.go +++ b/pkg/util/v2/cache.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "sync" diff --git a/pkg/util/cache_test.go b/pkg/util/v2/cache_test.go similarity index 99% rename from pkg/util/cache_test.go rename to pkg/util/v2/cache_test.go index 8316b718..567fe361 100644 --- a/pkg/util/cache_test.go +++ b/pkg/util/v2/cache_test.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "fmt" diff --git a/pkg/util/cleaner.go b/pkg/util/v2/cleaner.go similarity index 98% rename from pkg/util/cleaner.go rename to pkg/util/v2/cleaner.go index 738d0274..d83c95be 100644 --- a/pkg/util/cleaner.go +++ b/pkg/util/v2/cleaner.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "sync" diff --git a/pkg/util/context.go b/pkg/util/v2/context.go similarity index 99% rename from pkg/util/context.go rename to pkg/util/v2/context.go index cf7d3782..40ee229e 100644 --- a/pkg/util/context.go +++ b/pkg/util/v2/context.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "context" diff --git a/pkg/util/context_test.go b/pkg/util/v2/context_test.go similarity index 99% rename from pkg/util/context_test.go rename to pkg/util/v2/context_test.go index 3fbb83f8..67bc317d 100644 --- a/pkg/util/context_test.go +++ b/pkg/util/v2/context_test.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "context" diff --git a/pkg/util/worker.go b/pkg/util/v2/worker.go similarity index 99% rename from pkg/util/worker.go rename to pkg/util/v2/worker.go index d598fedb..5d24a15e 100644 --- a/pkg/util/worker.go +++ b/pkg/util/v2/worker.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "context" diff --git a/pkg/util/worker_test.go b/pkg/util/v2/worker_test.go similarity index 99% rename from pkg/util/worker_test.go rename to pkg/util/v2/worker_test.go index b5180dd9..0f36edf7 100644 --- a/pkg/util/worker_test.go +++ b/pkg/util/v2/worker_test.go @@ -1,4 +1,4 @@ -package util +package v2 import ( "context" diff --git a/pkg/v2/coordinator/coordinator.go b/pkg/v2/coordinator/coordinator.go index 0c3c7b79..80e7ac2e 100644 --- a/pkg/v2/coordinator/coordinator.go +++ b/pkg/v2/coordinator/coordinator.go @@ -26,11 +26,11 @@ package coordinator import ( "context" "fmt" + "github.com/smartcontractkit/chainlink-automation/pkg/util/v2" "log" "sync/atomic" "time" - "github.com/smartcontractkit/chainlink-automation/pkg/util" ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2" ) @@ -64,10 +64,10 @@ type reportCoordinator struct { encoder Encoder // initialised by the constructor - idBlocks *util.Cache[idBlocker] // should clear out when the next perform with this id occurs - activeKeys *util.Cache[bool] - cacheCleaner *util.IntervalCacheCleaner[bool] - idCacheCleaner *util.IntervalCacheCleaner[idBlocker] + idBlocks *v2.Cache[idBlocker] // should clear out when the next perform with this id occurs + activeKeys *v2.Cache[bool] + cacheCleaner *v2.IntervalCacheCleaner[bool] + idCacheCleaner *v2.IntervalCacheCleaner[idBlocker] // configurations minConfs int @@ -99,10 +99,10 @@ func NewReportCoordinator( logger: logger, logs: logs, minConfs: minConfs, - idBlocks: util.NewCache[idBlocker](lockoutWindow), - activeKeys: util.NewCache[bool](time.Hour), // 1 hour allows the cleanup routine to clear stale data - idCacheCleaner: util.NewIntervalCacheCleaner[idBlocker](cacheClean), - cacheCleaner: util.NewIntervalCacheCleaner[bool](cacheClean), + idBlocks: v2.NewCache[idBlocker](lockoutWindow), + activeKeys: v2.NewCache[bool](time.Hour), // 1 hour allows the cleanup routine to clear stale data + idCacheCleaner: v2.NewIntervalCacheCleaner[idBlocker](cacheClean), + cacheCleaner: v2.NewIntervalCacheCleaner[bool](cacheClean), chStop: make(chan struct{}, 1), encoder: encoder, } @@ -142,7 +142,7 @@ func (rc *reportCoordinator) Accept(key ocr2keepers.UpkeepKey) error { // there might be other keys in the same report which can get accepted if _, ok := rc.activeKeys.Get(string(key)); !ok { // Set the key as accepted within activeKeys - rc.activeKeys.Set(string(key), false, util.DefaultCacheExpiration) + rc.activeKeys.Set(string(key), false, v2.DefaultCacheExpiration) // Set idBlocks with the key as checkBlockNumber and IndefiniteBlockingKey as TransmitBlockNumber rc.updateIdBlock(string(id), idBlocker{ @@ -202,7 +202,7 @@ func (rc *reportCoordinator) checkLogs(ctx context.Context) error { rc.logger.Printf("Perform log found for key %s in transaction %s at block %s, with confirmations %d", l.Key, l.TransactionHash, l.TransmitBlock, l.Confirmations) // set state of key to indicate that the report was transmitted - rc.activeKeys.Set(string(l.Key), true, util.DefaultCacheExpiration) + rc.activeKeys.Set(string(l.Key), true, v2.DefaultCacheExpiration) rc.updateIdBlock(string(id), idBlocker{ CheckBlockNumber: logCheckBlockKey, @@ -262,7 +262,7 @@ func (rc *reportCoordinator) checkLogs(ctx context.Context) error { // Process log if the key hasn't been confirmed yet rc.logger.Printf("Stale report log found for key %s in transaction %s at block %s, with confirmations %d", l.Key, l.TransactionHash, l.TransmitBlock, l.Confirmations) // set state of key to indicate that the report was transmitted - rc.activeKeys.Set(string(l.Key), true, util.DefaultCacheExpiration) + rc.activeKeys.Set(string(l.Key), true, v2.DefaultCacheExpiration) rc.updateIdBlock(string(id), idBlocker{ CheckBlockNumber: logCheckBlockKey, @@ -357,7 +357,7 @@ func (rc *reportCoordinator) updateIdBlock(key string, val idBlocker) { } rc.logger.Printf("updateIdBlock for key %s: value updated to %+v", key, val) - rc.idBlocks.Set(key, val, util.DefaultCacheExpiration) + rc.idBlocks.Set(key, val, v2.DefaultCacheExpiration) } // Start starts all subprocesses diff --git a/pkg/v2/coordinator/coordinator_test.go b/pkg/v2/coordinator/coordinator_test.go index e81b3105..b7586c3a 100644 --- a/pkg/v2/coordinator/coordinator_test.go +++ b/pkg/v2/coordinator/coordinator_test.go @@ -3,6 +3,7 @@ package coordinator import ( "context" "fmt" + "github.com/smartcontractkit/chainlink-automation/pkg/util/v2" "io" "log" "testing" @@ -11,7 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/smartcontractkit/chainlink-automation/pkg/util" ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2" "github.com/smartcontractkit/chainlink-automation/pkg/v2/coordinator/mocks" ) @@ -25,8 +25,8 @@ func TestReportCoordinator(t *testing.T) { logger: l, logs: mLogs, encoder: mEnc, - idBlocks: util.NewCache[idBlocker](time.Second), - activeKeys: util.NewCache[bool](time.Minute), + idBlocks: v2.NewCache[idBlocker](time.Second), + activeKeys: v2.NewCache[bool](time.Minute), minConfs: 1, chStop: make(chan struct{}), }, mEnc, mLogs @@ -875,7 +875,7 @@ func TestReportCoordinator(t *testing.T) { rc.idBlocks.Set(string(id1), idBlocker{ TransmitBlockNumber: bk15, - }, util.DefaultCacheExpiration) + }, v2.DefaultCacheExpiration) mr.On("SplitUpkeepKey", mock.Anything).Return(bk4, id1, nil).Once() mr.On("After", bk4, bk15).Return(false, nil).Once() @@ -892,7 +892,7 @@ func TestReportCoordinator(t *testing.T) { rc.idBlocks.Set(string(id1), idBlocker{ TransmitBlockNumber: bk15, - }, util.DefaultCacheExpiration) + }, v2.DefaultCacheExpiration) key := ocr2keepers.UpkeepKey("invalid") expected := fmt.Errorf("test") @@ -916,7 +916,7 @@ func TestReportCoordinator(t *testing.T) { rc.idBlocks.Set("1234", idBlocker{ TransmitBlockNumber: invalid, - }, util.DefaultCacheExpiration) + }, v2.DefaultCacheExpiration) mr.On("SplitUpkeepKey", key).Return(bk1, id, nil).Once() mr.On("After", bk1, invalid).Return(false, expected).Once() diff --git a/pkg/v2/observer/polling/observer.go b/pkg/v2/observer/polling/observer.go index bfcc3381..5f5e74a4 100644 --- a/pkg/v2/observer/polling/observer.go +++ b/pkg/v2/observer/polling/observer.go @@ -7,11 +7,11 @@ package polling import ( "context" "fmt" + "github.com/smartcontractkit/chainlink-automation/internal/util/v2" "log" "sync" "time" - "github.com/smartcontractkit/chainlink-automation/internal/util" ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2" "github.com/smartcontractkit/chainlink-automation/pkg/v2/observer" ) @@ -87,7 +87,7 @@ func NewPollingObserver( cancel: cancel, logger: logger, samplingDuration: maxSamplingDuration, - shuffler: util.Shuffler[ocr2keepers.UpkeepKey]{Source: util.NewCryptoRandSource()}, // use crypto/rand shuffling for true random + shuffler: v2.Shuffler[ocr2keepers.UpkeepKey]{Source: v2.NewCryptoRandSource()}, // use crypto/rand shuffling for true random ratio: ratio, stager: &stager{}, coordinator: coord, @@ -101,7 +101,7 @@ func NewPollingObserver( // make all go-routines started by this entity automatically recoverable // on panics ob.services = []Service{ - util.NewRecoverableService(&observer.SimpleService{F: ob.runHeadTasks, C: cancel}, logger), + v2.NewRecoverableService(&observer.SimpleService{F: ob.runHeadTasks, C: cancel}, logger), } return ob diff --git a/pkg/v2/runner/runner.go b/pkg/v2/runner/runner.go index fef40ba1..c959d1b3 100644 --- a/pkg/v2/runner/runner.go +++ b/pkg/v2/runner/runner.go @@ -3,12 +3,12 @@ package runner import ( "context" "fmt" + v22 "github.com/smartcontractkit/chainlink-automation/internal/util/v2" + "github.com/smartcontractkit/chainlink-automation/pkg/util/v2" "log" "sync/atomic" "time" - "github.com/smartcontractkit/chainlink-automation/internal/util" - pkgutil "github.com/smartcontractkit/chainlink-automation/pkg/util" ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2" ) @@ -38,9 +38,9 @@ type Runner struct { encoder Encoder // initialized by the constructor - workers *pkgutil.WorkerGroup[[]ocr2keepers.UpkeepResult] // parallelizer for RPC calls - cache *pkgutil.Cache[ocr2keepers.UpkeepResult] - cacheCleaner *pkgutil.IntervalCacheCleaner[ocr2keepers.UpkeepResult] + workers *v2.WorkerGroup[[]ocr2keepers.UpkeepResult] // parallelizer for RPC calls + cache *v2.Cache[ocr2keepers.UpkeepResult] + cacheCleaner *v2.IntervalCacheCleaner[ocr2keepers.UpkeepResult] // configurations workerBatchLimit int // the maximum number of items in RPC batch call @@ -63,9 +63,9 @@ func NewRunner( logger: logger, registry: registry, encoder: encoder, - workers: pkgutil.NewWorkerGroup[[]ocr2keepers.UpkeepResult](workers, workerQueueLength), - cache: pkgutil.NewCache[ocr2keepers.UpkeepResult](cacheExpire), - cacheCleaner: pkgutil.NewIntervalCacheCleaner[ocr2keepers.UpkeepResult](cacheClean), + workers: v2.NewWorkerGroup[[]ocr2keepers.UpkeepResult](workers, workerQueueLength), + cache: v2.NewCache[ocr2keepers.UpkeepResult](cacheExpire), + cacheCleaner: v2.NewIntervalCacheCleaner[ocr2keepers.UpkeepResult](cacheClean), workerBatchLimit: 10, }, nil } @@ -126,10 +126,10 @@ func (o *Runner) parallelCheck(ctx context.Context, mercuryEnabled bool, keys [] // Create batches from the given keys. // Max keyBatchSize items in the batch. - pkgutil.RunJobs( + v2.RunJobs( ctx, o.workers, - util.Unflatten(toRun, o.workerBatchLimit), + v22.Unflatten(toRun, o.workerBatchLimit), o.wrapWorkerFunc(mercuryEnabled), o.wrapAggregate(result), ) @@ -188,7 +188,7 @@ func (o *Runner) wrapAggregate(r *Result) func([]ocr2keepers.UpkeepResult, error for _, res := range result { key, _, _ := o.encoder.Detail(res) - o.cache.Set(string(key), res, pkgutil.DefaultCacheExpiration) + o.cache.Set(string(key), res, v2.DefaultCacheExpiration) r.Add(res) } } else { diff --git a/pkg/v2/shuffle.go b/pkg/v2/shuffle.go index 29587a6e..162b9e00 100644 --- a/pkg/v2/shuffle.go +++ b/pkg/v2/shuffle.go @@ -1,9 +1,8 @@ package ocr2keepers import ( + "github.com/smartcontractkit/chainlink-automation/internal/util/v2" "math/rand" - - "github.com/smartcontractkit/chainlink-automation/internal/util" ) func filterDedupeShuffleObservations(upkeepKeys [][]UpkeepKey, keyRandSource [16]byte, filters ...func(UpkeepKey) (bool, error)) ([]UpkeepKey, error) { @@ -12,7 +11,7 @@ func filterDedupeShuffleObservations(upkeepKeys [][]UpkeepKey, keyRandSource [16 return nil, err } - rand.New(util.NewKeyedCryptoRandSource(keyRandSource)).Shuffle(len(uniqueKeys), func(i, j int) { + rand.New(v2.NewKeyedCryptoRandSource(keyRandSource)).Shuffle(len(uniqueKeys), func(i, j int) { uniqueKeys[i], uniqueKeys[j] = uniqueKeys[j], uniqueKeys[i] }) @@ -50,7 +49,7 @@ func filterAndDedupe(inputs [][]UpkeepKey, filters ...func(UpkeepKey) (bool, err } func shuffleObservations(upkeepIdentifiers []UpkeepIdentifier, source [16]byte) []UpkeepIdentifier { - rand.New(util.NewKeyedCryptoRandSource(source)).Shuffle(len(upkeepIdentifiers), func(i, j int) { + rand.New(v2.NewKeyedCryptoRandSource(source)).Shuffle(len(upkeepIdentifiers), func(i, j int) { upkeepIdentifiers[i], upkeepIdentifiers[j] = upkeepIdentifiers[j], upkeepIdentifiers[i] }) diff --git a/pkg/v3/coordinator/coordinator.go b/pkg/v3/coordinator/coordinator.go index dfc859f7..fd18724d 100644 --- a/pkg/v3/coordinator/coordinator.go +++ b/pkg/v3/coordinator/coordinator.go @@ -4,13 +4,13 @@ import ( "context" "encoding/hex" "fmt" + internalutil "github.com/smartcontractkit/chainlink-automation/internal/util/v2" + "github.com/smartcontractkit/chainlink-automation/pkg/util/v2" "log" "time" common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" - internalutil "github.com/smartcontractkit/chainlink-automation/internal/util" - "github.com/smartcontractkit/chainlink-automation/pkg/util" "github.com/smartcontractkit/chainlink-automation/pkg/v3/config" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" ) @@ -27,8 +27,8 @@ type coordinator struct { eventsProvider types.TransmitEventProvider upkeepTypeGetter types.UpkeepTypeGetter - cache *util.Cache[record] - visited *util.Cache[bool] + cache *v2.Cache[record] + visited *v2.Cache[bool] minimumConfirmations int performLockoutWindow time.Duration @@ -49,8 +49,8 @@ func NewCoordinator(transmitEventProvider types.TransmitEventProvider, upkeepTyp logger: logger, eventsProvider: transmitEventProvider, upkeepTypeGetter: upkeepTypeGetter, - cache: util.NewCache[record](performLockoutWindow), - visited: util.NewCache[bool](performLockoutWindow), + cache: v2.NewCache[record](performLockoutWindow), + visited: v2.NewCache[bool](performLockoutWindow), minimumConfirmations: conf.MinConfirmations, performLockoutWindow: performLockoutWindow, } @@ -61,13 +61,13 @@ func (c *coordinator) Accept(reportedUpkeep common.ReportedUpkeep) bool { c.cache.Set(reportedUpkeep.WorkID, record{ checkBlockNumber: reportedUpkeep.Trigger.BlockNumber, isTransmissionPending: true, - }, util.DefaultCacheExpiration) + }, v2.DefaultCacheExpiration) return true } else if v.checkBlockNumber < reportedUpkeep.Trigger.BlockNumber { c.cache.Set(reportedUpkeep.WorkID, record{ checkBlockNumber: reportedUpkeep.Trigger.BlockNumber, isTransmissionPending: true, - }, util.DefaultCacheExpiration) + }, v2.DefaultCacheExpiration) return true } // We are already waiting on a higher checkBlockNumber so no need to accept this report @@ -195,11 +195,11 @@ func (c *coordinator) checkEvents(ctx context.Context) error { if event.CheckBlock == v.checkBlockNumber { c.logger.Printf("Got event in transaction %s of type %d for upkeepID %s, workID %s and check block %v", hex.EncodeToString(event.TransactionHash[:]), event.Type, event.UpkeepID.String(), event.WorkID, event.CheckBlock) r.checkBlockNumber = v.checkBlockNumber - c.cache.Set(event.WorkID, r, util.DefaultCacheExpiration) + c.cache.Set(event.WorkID, r, v2.DefaultCacheExpiration) } else if event.CheckBlock > v.checkBlockNumber { c.logger.Printf("Got event in transaction %s of type %d for upkeepID %s, workID %s from newer report (block %v) while waiting for (block %v)", hex.EncodeToString(event.TransactionHash[:]), event.Type, event.UpkeepID.String(), event.WorkID, event.CheckBlock, v.checkBlockNumber) r.checkBlockNumber = event.CheckBlock - c.cache.Set(event.WorkID, r, util.DefaultCacheExpiration) + c.cache.Set(event.WorkID, r, v2.DefaultCacheExpiration) } // otherwise this is an old event, ignore it } diff --git a/pkg/v3/coordinator/coordinator_test.go b/pkg/v3/coordinator/coordinator_test.go index 56e05378..91a657f7 100644 --- a/pkg/v3/coordinator/coordinator_test.go +++ b/pkg/v3/coordinator/coordinator_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "github.com/smartcontractkit/chainlink-automation/pkg/util/v2" "io" "log" "reflect" @@ -16,7 +17,6 @@ import ( common "github.com/smartcontractkit/chainlink-common/pkg/types/automation" - "github.com/smartcontractkit/chainlink-automation/pkg/util" "github.com/smartcontractkit/chainlink-automation/pkg/v3/config" "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" ) @@ -319,10 +319,10 @@ func TestNewCoordinator_checkEvents(t *testing.T) { c := NewCoordinator(tc.eventProvider, tc.upkeepTypeGetter, config.OffchainConfig{PerformLockoutWindow: 3600 * 1000, MinConfirmations: 2}, logger) // initialise the cache if needed for k, v := range tc.cacheInit { - c.cache.Set(k, v, util.DefaultCacheExpiration) + c.cache.Set(k, v, v2.DefaultCacheExpiration) } for k := range tc.visitedInit { - c.visited.Set(k, true, util.DefaultCacheExpiration) + c.visited.Set(k, true, v2.DefaultCacheExpiration) } err := c.checkEvents(context.Background()) @@ -452,7 +452,7 @@ func TestCoordinator_ShouldAccept(t *testing.T) { c := NewCoordinator(nil, nil, config.OffchainConfig{}, nil) // initialise the cache for k, v := range tc.cacheInit { - c.cache.Set(k, v, util.DefaultCacheExpiration) + c.cache.Set(k, v, v2.DefaultCacheExpiration) } shouldAccept := c.Accept(tc.reportedUpkeep) @@ -557,7 +557,7 @@ func TestCoordinator_ShouldTransmit(t *testing.T) { c := NewCoordinator(nil, nil, config.OffchainConfig{}, logger) // initialise the cache for k, v := range tc.cacheInit { - c.cache.Set(k, v, util.DefaultCacheExpiration) + c.cache.Set(k, v, v2.DefaultCacheExpiration) } shouldTransmit := c.ShouldTransmit(tc.reportedUpkeep) assert.Equal(t, tc.shouldTransmit, shouldTransmit) @@ -688,7 +688,7 @@ func TestCoordinator_ShouldProcess(t *testing.T) { c := NewCoordinator(nil, tc.upkeepTypeGetter, config.OffchainConfig{}, nil) // initialise the cache for k, v := range tc.cacheInit { - c.cache.Set(k, v, util.DefaultCacheExpiration) + c.cache.Set(k, v, v2.DefaultCacheExpiration) } shouldProcess := c.ShouldProcess(tc.payload.WorkID, tc.payload.UpkeepID, tc.payload.Trigger) assert.Equal(t, tc.shouldProcess, shouldProcess) @@ -840,7 +840,7 @@ func TestNewCoordinator_Preprocess(t *testing.T) { c := NewCoordinator(nil, tc.upkeepTypeGetter, config.OffchainConfig{}, nil) // initialise the cache for k, v := range tc.cacheInit { - c.cache.Set(k, v, util.DefaultCacheExpiration) + c.cache.Set(k, v, v2.DefaultCacheExpiration) } payloads, err := c.PreProcess(context.Background(), tc.payloads) assert.NoError(t, err) @@ -1002,7 +1002,7 @@ func TestCoordinator_FilterResults(t *testing.T) { c := NewCoordinator(nil, tc.upkeepTypeGetter, config.OffchainConfig{}, nil) // initialise the cache for k, v := range tc.cacheInit { - c.cache.Set(k, v, util.DefaultCacheExpiration) + c.cache.Set(k, v, v2.DefaultCacheExpiration) } results, err := c.FilterResults(tc.results) assert.NoError(t, err) @@ -1140,7 +1140,7 @@ func TestCoordinator_FilterProposals(t *testing.T) { c := NewCoordinator(nil, tc.upkeepTypeGetter, config.OffchainConfig{}, nil) // initialise the cache for k, v := range tc.cacheInit { - c.cache.Set(k, v, util.DefaultCacheExpiration) + c.cache.Set(k, v, v2.DefaultCacheExpiration) } results, err := c.FilterProposals(tc.results) assert.NoError(t, err) diff --git a/pkg/v3/runner/runner.go b/pkg/v3/runner/runner.go index ba5a1a16..fc62d8b0 100644 --- a/pkg/v3/runner/runner.go +++ b/pkg/v3/runner/runner.go @@ -3,6 +3,8 @@ package runner import ( "context" "fmt" + v22 "github.com/smartcontractkit/chainlink-automation/internal/util/v2" + "github.com/smartcontractkit/chainlink-automation/pkg/util/v2" "log" "strings" "sync/atomic" @@ -12,8 +14,6 @@ import ( ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" - "github.com/smartcontractkit/chainlink-automation/internal/util" - pkgutil "github.com/smartcontractkit/chainlink-automation/pkg/util" "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry" ) @@ -38,8 +38,8 @@ type Runner struct { logger *log.Logger runnable types.Runnable // initialized by the constructor - workers *pkgutil.WorkerGroup[[]ocr2keepers.CheckResult] // parallelizer - cache *pkgutil.Cache[ocr2keepers.CheckResult] // result cache + workers *v2.WorkerGroup[[]ocr2keepers.CheckResult] // parallelizer + cache *v2.Cache[ocr2keepers.CheckResult] // result cache // configurations workerBatchLimit int // the maximum number of items in RPC batch call cacheGcInterval time.Duration @@ -66,8 +66,8 @@ func NewRunner( return &Runner{ logger: log.New(logger.Writer(), fmt.Sprintf("[%s | check-pipeline-runner]", telemetry.ServiceName), telemetry.LogPkgStdFlags), runnable: runnable, - workers: pkgutil.NewWorkerGroup[[]ocr2keepers.CheckResult](conf.Workers, conf.WorkerQueueLength), - cache: pkgutil.NewCache[ocr2keepers.CheckResult](conf.CacheExpire), + workers: v2.NewWorkerGroup[[]ocr2keepers.CheckResult](conf.Workers, conf.WorkerQueueLength), + cache: v2.NewCache[ocr2keepers.CheckResult](conf.CacheExpire), cacheGcInterval: conf.CacheClean, workerBatchLimit: WorkerBatchLimit, chClose: make(chan struct{}, 1), @@ -147,10 +147,10 @@ func (o *Runner) parallelCheck(ctx context.Context, payloads []ocr2keepers.Upkee // Create batches from the given keys. // Max keyBatchSize items in the batch. - pkgutil.RunJobs( + v2.RunJobs( ctx, o.workers, - util.Unflatten(toRun, o.workerBatchLimit), + v22.Unflatten(toRun, o.workerBatchLimit), o.wrapWorkerFunc(), o.wrapAggregate(result), ) @@ -203,7 +203,7 @@ func (o *Runner) wrapAggregate(r *result) func([]ocr2keepers.CheckResult, error) c, ok := o.cache.Get(result.WorkID) if !ok || result.Trigger.BlockNumber > c.Trigger.BlockNumber { // Add to cache if the workID didn't exist before or if we got a result on a higher checkBlockNumber - o.cache.Set(result.WorkID, result, pkgutil.DefaultCacheExpiration) + o.cache.Set(result.WorkID, result, v2.DefaultCacheExpiration) } } diff --git a/pkg/v3/tickers/time.go b/pkg/v3/tickers/time.go index a48fa8ad..4492f18b 100644 --- a/pkg/v3/tickers/time.go +++ b/pkg/v3/tickers/time.go @@ -3,10 +3,9 @@ package tickers import ( "context" "fmt" + "github.com/smartcontractkit/chainlink-automation/internal/util/v2" "log" "time" - - "github.com/smartcontractkit/chainlink-automation/internal/util" ) type observer[T any] interface { @@ -16,7 +15,7 @@ type observer[T any] interface { type getterFunc[T any] func(context.Context, time.Time) (Tick[T], error) type timeTicker[T any] struct { - closer util.Closer + closer v2.Closer interval time.Duration observer observer[T]