Skip to content

Commit

Permalink
Using Service interface for wrapping background worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Mar 12, 2024
1 parent 7cfe6c9 commit d632c01
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 43 deletions.
56 changes: 32 additions & 24 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
pluginConfig, backfillArgs, err := jobSpecToCommitPluginConfig(lggr, jb, pr, chainSet, qopts...)
pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(lggr, jb, pr, chainSet, qopts...)
if err != nil {
return nil, err
}
Expand All @@ -60,16 +60,22 @@ func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chai
}
// If this is a brand-new job, then we make use of the start blocks. If not then we're rebooting and log poller will pick up where we left off.
if new {
return []job.ServiceCtx{oraclelib.NewBackfilledOracle(
pluginConfig.lggr,
backfillArgs.SourceLP,
backfillArgs.DestLP,
backfillArgs.SourceStartBlock,
backfillArgs.DestStartBlock,
job.NewServiceAdapter(oracle)),
return []job.ServiceCtx{
oraclelib.NewBackfilledOracle(
pluginConfig.lggr,
backfillArgs.SourceLP,
backfillArgs.DestLP,
backfillArgs.SourceStartBlock,
backfillArgs.DestStartBlock,
job.NewServiceAdapter(oracle),
),
chainHealthcheck,
}, nil
}
return []job.ServiceCtx{job.NewServiceAdapter(oracle)}, nil
return []job.ServiceCtx{
job.NewServiceAdapter(oracle),
chainHealthcheck,
}, nil
}

func CommitReportToEthTxMeta(typ ccipconfig.ContractType, ver semver.Version) (func(report []byte) (*txmgr.TxMeta, error), error) {
Expand Down Expand Up @@ -108,10 +114,10 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb
return multiErr
}

func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, error) {
func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

lggr.Infow("Initializing commit plugin",
Expand All @@ -124,11 +130,11 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
versionFinder := factory.NewEvmVersionFinder()
commitStoreReader, err := factory.NewCommitStoreReader(lggr, versionFinder, params.commitStoreAddress, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), qopts...)
if err != nil {
return nil, nil, errors.Wrap(err, "could not create commitStore reader")
return nil, nil, nil, errors.Wrap(err, "could not create commitStore reader")
}
sourceChainName, destChainName, err := ccipconfig.ResolveChainNames(params.sourceChain.ID().Int64(), params.destChain.ID().Int64())
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
commitLggr := lggr.Named("CCIPCommit").With("sourceChain", sourceChainName, "destChain", destChainName)

Expand All @@ -137,12 +143,12 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
if withPipeline {
priceGetter, err = pricegetter.NewPipelineGetter(params.pluginConfig.TokenPricesUSDPipeline, pr, jb.ID, jb.ExternalJobID, jb.Name.ValueOrZero(), lggr)
if err != nil {
return nil, nil, fmt.Errorf("creating pipeline price getter: %w", err)
return nil, nil, nil, fmt.Errorf("creating pipeline price getter: %w", err)
}
} else {
// Use dynamic price getter.
if params.pluginConfig.PriceGetterConfig == nil {
return nil, nil, fmt.Errorf("priceGetterConfig is nil")
return nil, nil, nil, fmt.Errorf("priceGetterConfig is nil")
}

// Build price getter clients for all chains specified in the aggregator configurations.
Expand All @@ -153,7 +159,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
// Retrieve the chain.
chain, _, err2 := ccipconfig.GetChainByChainID(chainSet, chainID)
if err2 != nil {
return nil, nil, fmt.Errorf("retrieving chain for chainID %d: %w", chainID, err2)
return nil, nil, nil, fmt.Errorf("retrieving chain for chainID %d: %w", chainID, err2)
}
caller := rpclib.NewDynamicLimitedBatchCaller(
lggr,
Expand All @@ -166,35 +172,35 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run

priceGetter, err = pricegetter.NewDynamicPriceGetter(*params.pluginConfig.PriceGetterConfig, priceGetterClients)
if err != nil {
return nil, nil, fmt.Errorf("creating dynamic price getter: %w", err)
return nil, nil, nil, fmt.Errorf("creating dynamic price getter: %w", err)
}
}

// Load all the readers relevant for this plugin.
onrampAddress := cciptypes.Address(params.commitStoreStaticCfg.OnRamp.String())
onRampReader, err := factory.NewOnRampReader(commitLggr, versionFinder, params.commitStoreStaticCfg.SourceChainSelector, params.commitStoreStaticCfg.ChainSelector, onrampAddress, params.sourceChain.LogPoller(), params.sourceChain.Client(), qopts...)
if err != nil {
return nil, nil, errors.Wrap(err, "failed onramp reader")
return nil, nil, nil, errors.Wrap(err, "failed onramp reader")
}
offRampReader, err := factory.NewOffRampReader(commitLggr, versionFinder, params.pluginConfig.OffRamp, params.destChain.Client(), params.destChain.LogPoller(), params.destChain.GasEstimator(), true, qopts...)
if err != nil {
return nil, nil, errors.Wrap(err, "failed offramp reader")
return nil, nil, nil, errors.Wrap(err, "failed offramp reader")
}
onRampRouterAddr, err := onRampReader.RouterAddress()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
routerAddr, err := ccipcalc.GenericAddrToEvm(onRampRouterAddr)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
sourceRouter, err := router.NewRouter(routerAddr, params.sourceChain.Client())
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
sourceNative, err := sourceRouter.GetWrappedNative(nil)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

// Prom wrappers
Expand Down Expand Up @@ -245,7 +251,9 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run
DestLP: params.destChain.LogPoller(),
SourceStartBlock: params.pluginConfig.SourceStartBlock,
DestStartBlock: params.pluginConfig.DestStartBlock,
}, nil
},
chainHealthcheck,
nil
}

type jobSpecParams struct {
Expand Down
41 changes: 24 additions & 17 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
const numTokenDataWorkers = 5

func NewExecutionServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(ctx, lggr, jb, chainSet, qopts...)
execPluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToExecPluginConfig(ctx, lggr, jb, chainSet, qopts...)
if err != nil {
return nil, err
}
Expand All @@ -71,10 +71,15 @@ func NewExecutionServices(ctx context.Context, lggr logger.Logger, jb job.Job, c
backfillArgs.DestLP,
backfillArgs.SourceStartBlock,
backfillArgs.DestStartBlock,
job.NewServiceAdapter(oracle)),
job.NewServiceAdapter(oracle),
),
chainHealthcheck,
}, nil
}
return []job.ServiceCtx{job.NewServiceAdapter(oracle)}, nil
return []job.ServiceCtx{
job.NewServiceAdapter(oracle),
chainHealthcheck,
}, nil
}

// UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains.
Expand Down Expand Up @@ -153,10 +158,10 @@ func initTokenDataProviders(lggr logger.Logger, jobID string, pluginConfig ccipc
return tokenDataProviders, nil
}

func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *ccipcommon.BackfillArgs, error) {
func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) {
params, err := extractJobSpecParams(lggr, jb, chainSet, true, qopts...)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

lggr.Infow("Initializing exec plugin",
Expand All @@ -172,39 +177,39 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J

sourceChainName, destChainName, err := ccipconfig.ResolveChainNames(sourceChainID, destChainID)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
execLggr := lggr.Named("CCIPExecution").With("sourceChain", sourceChainName, "destChain", destChainName)
onRampReader, err := factory.NewOnRampReader(execLggr, versionFinder, params.offRampConfig.SourceChainSelector, params.offRampConfig.ChainSelector, params.offRampConfig.OnRamp, params.sourceChain.LogPoller(), params.sourceChain.Client(), qopts...)
if err != nil {
return nil, nil, errors.Wrap(err, "create onramp reader")
return nil, nil, nil, errors.Wrap(err, "create onramp reader")
}
dynamicOnRampConfig, err := onRampReader.GetDynamicConfig()
if err != nil {
return nil, nil, errors.Wrap(err, "get onramp dynamic config")
return nil, nil, nil, errors.Wrap(err, "get onramp dynamic config")
}

routerAddr, err := ccipcalc.GenericAddrToEvm(dynamicOnRampConfig.Router)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
sourceRouter, err := router.NewRouter(routerAddr, params.sourceChain.Client())
if err != nil {
return nil, nil, errors.Wrap(err, "failed loading source router")
return nil, nil, nil, errors.Wrap(err, "failed loading source router")
}
sourceWrappedNative, err := sourceRouter.GetWrappedNative(&bind.CallOpts{})
if err != nil {
return nil, nil, errors.Wrap(err, "could not get source native token")
return nil, nil, nil, errors.Wrap(err, "could not get source native token")
}

commitStoreReader, err := factory.NewCommitStoreReader(lggr, versionFinder, params.offRampConfig.CommitStore, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), qopts...)
if err != nil {
return nil, nil, errors.Wrap(err, "could not load commitStoreReader reader")
return nil, nil, nil, errors.Wrap(err, "could not load commitStoreReader reader")
}

tokenDataProviders, err := initTokenDataProviders(lggr, jobIDToString(jb.ID), params.pluginConfig, params.sourceChain.LogPoller(), qopts...)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get token data providers")
return nil, nil, nil, errors.Wrap(err, "could not get token data providers")
}

// Prom wrappers
Expand All @@ -215,11 +220,11 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J

destChainSelector, err := chainselectors.SelectorFromChainId(uint64(destChainID))
if err != nil {
return nil, nil, fmt.Errorf("get chain %d selector: %w", destChainID, err)
return nil, nil, nil, fmt.Errorf("get chain %d selector: %w", destChainID, err)
}
sourceChainSelector, err := chainselectors.SelectorFromChainId(uint64(sourceChainID))
if err != nil {
return nil, nil, fmt.Errorf("get chain %d selector: %w", sourceChainID, err)
return nil, nil, nil, fmt.Errorf("get chain %d selector: %w", sourceChainID, err)
}

execLggr.Infow("Initialized exec plugin",
Expand All @@ -233,7 +238,7 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J

tokenPoolBatchedReader, err := batchreader.NewEVMTokenPoolBatchedReader(execLggr, sourceChainSelector, offRampReader.Address(), batchCaller)
if err != nil {
return nil, nil, fmt.Errorf("new token pool batched reader: %w", err)
return nil, nil, nil, fmt.Errorf("new token pool batched reader: %w", err)
}

chainHealthcheck := cache.NewObservedChainHealthCheck(
Expand Down Expand Up @@ -278,7 +283,9 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J
DestLP: params.destChain.LogPoller(),
SourceStartBlock: params.pluginConfig.SourceStartBlock,
DestStartBlock: params.pluginConfig.DestStartBlock,
}, nil
},
chainHealthcheck,
nil
}

type jobSpecParams struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
)

Expand All @@ -32,6 +33,7 @@ import (
//
//go:generate mockery --quiet --name ChainHealthcheck --filename chain_health_mock.go --case=underscore
type ChainHealthcheck interface {
job.ServiceCtx
// IsHealthy checks if the chain is healthy and returns true if it is, false otherwise.
IsHealthy(ctx context.Context) (bool, error)
}
Expand Down Expand Up @@ -143,7 +145,7 @@ func (c *chainHealthcheck) IsHealthy(ctx context.Context) (bool, error) {
return true, nil
}

func (c *chainHealthcheck) Start() error {
func (c *chainHealthcheck) Start(context.Context) error {
return c.StateMachine.StartOnce("ChainHealthcheck", func() error {
c.lggr.Info("Starting ChainHealthcheck")
c.wg.Add(1)
Expand Down Expand Up @@ -222,6 +224,7 @@ func (c *chainHealthcheck) checkIfRMNsAreHealthy(ctx context.Context) (bool, err
}

// If the value is not found in the cache, fetch the RMN curse state in a sync manner for the first time
c.lggr.Warnw("Refreshing RMN state from the plugin routine, this should happen only once per lane during boot")
return c.refresh(ctx)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func Test_RefreshingInBackground(t *testing.T) {
10*time.Microsecond,
)
defer chainState.backgroundCancel()
require.NoError(t, chainState.Start())
require.NoError(t, chainState.Start(tests.Context(t)))

// All healthy
assertHealthy(t, chainState, true)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d632c01

Please sign in to comment.