Skip to content

Commit

Permalink
Simplify background worker and implement ServiceCtx
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Mar 14, 2024
1 parent 2b0e3a5 commit 5b13060
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 176 deletions.
8 changes: 5 additions & 3 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
ocr2keepers20runner "github.com/smartcontractkit/chainlink-automation/pkg/v2/runner"
ocr2keepers21config "github.com/smartcontractkit/chainlink-automation/pkg/v3/config"
ocr2keepers21 "github.com/smartcontractkit/chainlink-automation/pkg/v3/plugin"

"github.com/smartcontractkit/chainlink/v2/core/config/env"

"github.com/smartcontractkit/chainlink-vrf/altbn_128"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/ccipcommit"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/ccipexec"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/rebalancer"
Expand Down Expand Up @@ -316,13 +318,13 @@ func (d *Delegate) cleanupEVM(jb job.Job, q pg.Queryer, relayID relay.ID) error
d.lggr.Errorw("failed to derive ocr2keeper filter names from spec", "err", err, "spec", spec)
}
case types.CCIPCommit:
err = ccipcommit.UnregisterCommitPluginLpFilters(context.Background(), d.lggr, jb, d.legacyChains, pg.WithQueryer(q))
err = ccipcommit.UnregisterCommitPluginLpFilters(d.lggr, jb, d.legacyChains, pg.WithQueryer(q))
if err != nil {
d.lggr.Errorw("failed to unregister ccip commit plugin filters", "err", err, "spec", spec)
}
return nil
case types.CCIPExecution:
err = ccipexec.UnregisterExecPluginLpFilters(context.Background(), d.lggr, jb, d.legacyChains, pg.WithQueryer(q))
err = ccipexec.UnregisterExecPluginLpFilters(d.lggr, jb, d.legacyChains, pg.WithQueryer(q))
if err != nil {
d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err, "spec", spec)
}
Expand Down Expand Up @@ -1604,7 +1606,7 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(jb.ID, msg), "unable to record error")
}
return ccipexec.NewExecutionServices(ctx, lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError, qopts...)
return ccipexec.NewExecutionServices(lggr, jb, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin, logError, qopts...)
}

func (d *Delegate) newServicesRebalancer(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func CommitReportToEthTxMeta(typ ccipconfig.ContractType, ver semver.Version) (f
// https://github.com/smartcontractkit/ccip/blob/68e2197472fb017dd4e5630d21e7878d58bc2a44/core/services/feeds/service.go#L716
// TODO once that transaction is broken up, we should be able to simply rely on oracle.Close() to cleanup the filters.
// Until then we have to deterministically reload the readers from the spec (and thus their filters) and close them.
func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) error {
func UnregisterCommitPluginLpFilters(lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) error {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ccipcommit

import (
"context"
"fmt"
"strconv"
"testing"
Expand Down Expand Up @@ -66,7 +65,7 @@ func TestGetCommitPluginFilterNamesFromSpec(t *testing.T) {
}
}

err := UnregisterCommitPluginLpFilters(context.Background(), lggr, job.Job{OCR2OracleSpec: tc.spec}, chainSet)
err := UnregisterCommitPluginLpFilters(lggr, job.Job{OCR2OracleSpec: tc.spec}, chainSet)
if tc.expectingErr {
assert.Error(t, err)
} else {
Expand Down
53 changes: 28 additions & 25 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ import (

const numTokenDataWorkers = 5

func NewExecutionServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
execPluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToExecPluginConfig(ctx, lggr, jb, chainSet, qopts...)
func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
execPluginConfig, backfillArgs, chainHealthcheck, tokenWorker, err := jobSpecToExecPluginConfig(lggr, jb, chainSet, qopts...)
if err != nil {
return nil, err
}
Expand All @@ -74,18 +74,20 @@ func NewExecutionServices(ctx context.Context, lggr logger.Logger, jb job.Job, c
job.NewServiceAdapter(oracle),
),
chainHealthcheck,
tokenWorker,
}, nil
}
return []job.ServiceCtx{
job.NewServiceAdapter(oracle),
chainHealthcheck,
tokenWorker,
}, nil
}

// UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains.
// See comment in UnregisterCommitPluginLpFilters
// It MUST mirror the filters registered in NewExecutionServices.
func UnregisterExecPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) error {
func UnregisterExecPluginLpFilters(lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) error {
params, err := extractJobSpecParams(lggr, jb, chainSet, false, qopts...)
if err != nil {
return err
Expand Down Expand Up @@ -158,10 +160,10 @@ func initTokenDataProviders(lggr logger.Logger, jobID string, pluginConfig ccipc
return tokenDataProviders, nil
}

func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
func jobSpecToExecPluginConfig(lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, *tokendata.BackgroundWorker, error) {
params, err := extractJobSpecParams(lggr, jb, chainSet, true, qopts...)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

lggr.Infow("Initializing exec plugin",
Expand All @@ -177,39 +179,39 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J

sourceChainName, destChainName, err := ccipconfig.ResolveChainNames(sourceChainID, destChainID)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
execLggr := lggr.Named("CCIPExecution").With("sourceChain", sourceChainName, "destChain", destChainName)
onRampReader, err := factory.NewOnRampReader(execLggr, versionFinder, params.offRampConfig.SourceChainSelector, params.offRampConfig.ChainSelector, params.offRampConfig.OnRamp, params.sourceChain.LogPoller(), params.sourceChain.Client(), qopts...)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "create onramp reader")
return nil, nil, nil, nil, errors.Wrap(err, "create onramp reader")
}
dynamicOnRampConfig, err := onRampReader.GetDynamicConfig()
if err != nil {
return nil, nil, nil, errors.Wrap(err, "get onramp dynamic config")
return nil, nil, nil, nil, errors.Wrap(err, "get onramp dynamic config")
}

routerAddr, err := ccipcalc.GenericAddrToEvm(dynamicOnRampConfig.Router)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
sourceRouter, err := router.NewRouter(routerAddr, params.sourceChain.Client())
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed loading source router")
return nil, nil, nil, nil, errors.Wrap(err, "failed loading source router")
}
sourceWrappedNative, err := sourceRouter.GetWrappedNative(&bind.CallOpts{})
if err != nil {
return nil, nil, nil, errors.Wrap(err, "could not get source native token")
return nil, nil, nil, nil, errors.Wrap(err, "could not get source native token")
}

commitStoreReader, err := factory.NewCommitStoreReader(lggr, versionFinder, params.offRampConfig.CommitStore, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), params.sourceChain.Config().EVM().GasEstimator().PriceMax().ToInt(), qopts...)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "could not load commitStoreReader reader")
return nil, nil, nil, nil, errors.Wrap(err, "could not load commitStoreReader reader")
}

tokenDataProviders, err := initTokenDataProviders(lggr, jobIDToString(jb.ID), params.pluginConfig, params.sourceChain.LogPoller(), qopts...)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "could not get token data providers")
return nil, nil, nil, nil, errors.Wrap(err, "could not get token data providers")
}

// Prom wrappers
Expand All @@ -220,11 +222,11 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J

destChainSelector, err := chainselectors.SelectorFromChainId(uint64(destChainID))
if err != nil {
return nil, nil, nil, fmt.Errorf("get chain %d selector: %w", destChainID, err)
return nil, nil, nil, nil, fmt.Errorf("get chain %d selector: %w", destChainID, err)
}
sourceChainSelector, err := chainselectors.SelectorFromChainId(uint64(sourceChainID))
if err != nil {
return nil, nil, nil, fmt.Errorf("get chain %d selector: %w", sourceChainID, err)
return nil, nil, nil, nil, fmt.Errorf("get chain %d selector: %w", sourceChainID, err)
}

execLggr.Infow("Initialized exec plugin",
Expand All @@ -238,7 +240,7 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J

tokenPoolBatchedReader, err := batchreader.NewEVMTokenPoolBatchedReader(execLggr, sourceChainSelector, offRampReader.Address(), batchCaller)
if err != nil {
return nil, nil, nil, fmt.Errorf("new token pool batched reader: %w", err)
return nil, nil, nil, nil, fmt.Errorf("new token pool batched reader: %w", err)
}

chainHealthcheck := cache.NewObservedChainHealthCheck(
Expand All @@ -259,6 +261,12 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J
params.offRampConfig.OnRamp,
)

tokenBackgroundWorker := tokendata.NewBackgroundWorker(
tokenDataProviders,
numTokenDataWorkers,
5*time.Second,
offRampReader.OnchainConfig().PermissionLessExecutionThresholdSeconds,
)
return &ExecutionPluginStaticConfig{
lggr: execLggr,
onRampReader: onRampReader,
Expand All @@ -269,22 +277,17 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J
destChainSelector: destChainSelector,
priceRegistryProvider: ccipdataprovider.NewEvmPriceRegistry(params.destChain.LogPoller(), params.destChain.Client(), execLggr, ccip.ExecPluginLabel),
tokenPoolBatchedReader: tokenPoolBatchedReader,
tokenDataWorker: tokendata.NewBackgroundWorker(
ctx,
tokenDataProviders,
numTokenDataWorkers,
5*time.Second,
offRampReader.OnchainConfig().PermissionLessExecutionThresholdSeconds,
),
metricsCollector: metricsCollector,
chainHealthcheck: chainHealthcheck,
tokenDataWorker: tokenBackgroundWorker,
metricsCollector: metricsCollector,
chainHealthcheck: chainHealthcheck,
}, &ccipcommon.BackfillArgs{
SourceLP: params.sourceChain.LogPoller(),
DestLP: params.destChain.LogPoller(),
SourceStartBlock: params.pluginConfig.SourceStartBlock,
DestStartBlock: params.pluginConfig.DestStartBlock,
},
chainHealthcheck,
tokenBackgroundWorker,
nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ccipexec

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -48,7 +47,7 @@ func TestGetExecutionPluginFilterNamesFromSpec(t *testing.T) {
for _, tc := range testCases {
chainSet := &legacyEvmORMMocks.LegacyChainContainer{}
t.Run(tc.description, func(t *testing.T) {
err := UnregisterExecPluginLpFilters(context.Background(), logger.TestLogger(t), job.Job{OCR2OracleSpec: tc.spec}, chainSet)
err := UnregisterExecPluginLpFilters(logger.TestLogger(t), job.Job{OCR2OracleSpec: tc.spec}, chainSet)
if tc.expectingErr {
assert.Error(t, err)
} else {
Expand Down
6 changes: 2 additions & 4 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
p.inflightReports.reports = tc.inflightReports
p.lggr = logger.TestLogger(t)
p.tokenDataWorker = tokendata.NewBackgroundWorker(
ctx, make(map[cciptypes.Address]tokendata.Reader), 10, 5*time.Second, time.Hour)
make(map[cciptypes.Address]tokendata.Reader), 10, 5*time.Second, time.Hour)
p.metricsCollector = ccip.NoopMetricsCollector

commitStoreReader := ccipdatamocks.NewCommitStoreReader(t)
Expand Down Expand Up @@ -668,8 +668,6 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
},
}

ctx := testutils.Context(t)

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -685,7 +683,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) {
mockOffRampReader.On("GetSenderNonce", mock.Anything, sender1).Return(uint64(0), nil).Maybe()

plugin := ExecutionReportingPlugin{
tokenDataWorker: tokendata.NewBackgroundWorker(ctx, map[cciptypes.Address]tokendata.Reader{}, 10, 5*time.Second, time.Hour),
tokenDataWorker: tokendata.NewBackgroundWorker(map[cciptypes.Address]tokendata.Reader{}, 10, 5*time.Second, time.Hour),
offRampReader: mockOffRampReader,
destWrappedNative: destNative,
offchainConfig: cciptypes.ExecOffchainConfig{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewChainHealthcheck(lggr logger.Logger, onRamp ccipdata.OnRampReader, commi
func newChainHealthcheckWithCustomEviction(lggr logger.Logger, onRamp ccipdata.OnRampReader, commitStore ccipdata.CommitStoreReader, globalStatusDuration time.Duration, rmnStatusRefreshInterval time.Duration) *chainHealthcheck {
ctx, cancel := context.WithCancel(context.Background())

ch := &chainHealthcheck{
return &chainHealthcheck{
cache: cache.New(rmnStatusRefreshInterval, 0),
rmnStatusKey: rmnStatusKey,
globalStatusKey: globalStatusKey,
Expand All @@ -105,7 +105,6 @@ func newChainHealthcheckWithCustomEviction(lggr logger.Logger, onRamp ccipdata.O
backgroundCtx: ctx,
backgroundCancel: cancel,
}
return ch
}

type rmnResponse struct {
Expand Down Expand Up @@ -164,9 +163,9 @@ func (c *chainHealthcheck) Close() error {
}

func (c *chainHealthcheck) run() {
defer c.wg.Done()
ticker := time.NewTicker(c.rmnStatusRefreshInterval)
go func() {
defer c.wg.Done()
// Refresh the RMN state immediately after starting the background refresher
_, _ = c.refresh(c.backgroundCtx)

Expand Down
Loading

0 comments on commit 5b13060

Please sign in to comment.