Skip to content

Commit

Permalink
Log NewReportingPlugin errors for Commit and Exec plugins
Browse files Browse the repository at this point in the history
Also output metrics
  • Loading branch information
rstout committed Jul 12, 2024
1 parent 2bb582b commit 3614f1c
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 19 deletions.
29 changes: 21 additions & 8 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ type reportingPluginAndInfo struct {
func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay
maxRetries := rf.config.newReportingPluginRetryConfig.MaxRetries

pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(
rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay, maxRetries,
)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
Expand All @@ -86,35 +89,35 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to
// function, hence why we can only keep retrying it until it succeeds.
func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
newReportingPluginFn := func() (reportingPluginAndInfo, error) {
ctx := context.Background() // todo: consider adding some timeout

destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.ChangeConfig error: %w", err)
}

priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("GenericAddrToEvm error: %w", err)
}
if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("UpdateDynamicReaders error: %w", err)
}

pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.OffchainConfig error: %w", err)
}

gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("commitStore.GasPriceEstimator error: %w", err)
}

err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader)
if err != nil {
return reportingPluginAndInfo{}, err
return reportingPluginAndInfo{}, fmt.Errorf("priceService.UpdateDynamicConfig error: %w", err)
}

lggr := rf.config.lggr.Named("CommitReportingPlugin")
Expand Down Expand Up @@ -147,4 +150,14 @@ func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.Report

return reportingPluginAndInfo{plugin, pluginInfo}, nil
}

return func() (reportingPluginAndInfo, error) {
result, err := newReportingPluginFn()
if err != nil {
rf.config.lggr.Errorw("NewReportingPlugin failed", "err", err)
rf.config.metricsCollector.NewReportingPluginError()
}

return result, err
}
}
3 changes: 3 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
ccip2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
Expand All @@ -24,6 +25,8 @@ import (
// retries a sufficient number of times to get through the transient errors and eventually succeed.
func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
commitConfig := CommitPluginStaticConfig{}
commitConfig.lggr = logger.TestLogger(t)
commitConfig.metricsCollector = ccip2.NoopMetricsCollector

// For this unit test, ensure that there is no delay between retries
commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{
Expand Down
6 changes: 5 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}
var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{
InitialDelay: time.Second,
MaxDelay: 10 * time.Minute,
MaxRetries: 6 * 24, // Retry for approximately 24hrs (MaxDelay of 10m = 6 times per hour, times 24 hours)
}

func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec
Expand Down
17 changes: 15 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ type reportingPluginAndInfo struct {
func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay
maxRetries := rf.config.newReportingPluginRetryConfig.MaxRetries

pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(
rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay, maxRetries,
)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
Expand All @@ -82,7 +85,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Exec plugin to function,
// hence why we can only keep retrying it until it succeeds.
func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
newReportingPluginFn := func() (reportingPluginAndInfo, error) {
ctx := context.Background() // todo: consider setting a timeout

destPriceRegistry, destWrappedNative, err := rf.config.offRampReader.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
Expand Down Expand Up @@ -154,4 +157,14 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep

return reportingPluginAndInfo{plugin, pluginInfo}, nil
}

return func() (reportingPluginAndInfo, error) {
result, err := newReportingPluginFn()
if err != nil {
rf.config.lggr.Errorw("NewReportingPlugin failed", "err", err)
rf.config.metricsCollector.NewReportingPluginError()
}

return result, err
}
}
3 changes: 3 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipexec/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
ccip2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
Expand All @@ -23,6 +24,8 @@ import (
// retries a sufficient number of times to get through the transient errors and eventually succeed.
func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
execConfig := ExecutionPluginStaticConfig{}
execConfig.lggr = logger.TestLogger(t)
execConfig.metricsCollector = ccip2.NoopMetricsCollector

// For this unit test, ensure that there is no delay between retries
execConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{
Expand Down
6 changes: 5 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ var (
tokenDataWorkerNumWorkers = 5
)

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}
var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{
InitialDelay: time.Second,
MaxDelay: 10 * time.Minute,
MaxRetries: 6 * 24, // Retry for approximately 24hrs (MaxDelay of 10m = 6 times per hour, times 24 hours)
}

func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
if jb.OCR2OracleSpec == nil {
Expand Down
13 changes: 9 additions & 4 deletions core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,19 @@ func SelectorToBytes(chainSelector uint64) [16]byte {
return b
}

// RetryUntilSuccess repeatedly calls fn until it returns a nil error. After each failed call there is an exponential
// backoff applied, between initialDelay and maxDelay.
func RetryUntilSuccess[T any](fn func() (T, error), initialDelay time.Duration, maxDelay time.Duration) (T, error) {
// RetryUntilSuccess repeatedly calls fn until it returns a nil error or retries have been exhausted. After each failed
// call there is an exponential backoff applied, between initialDelay and maxDelay.
func RetryUntilSuccess[T any](
fn func() (T, error),
initialDelay time.Duration,
maxDelay time.Duration,
maxRetries uint,
) (T, error) {
return retry.DoWithData(
fn,
retry.Delay(initialDelay),
retry.MaxDelay(maxDelay),
retry.DelayType(retry.BackOffDelay),
retry.UntilSucceeded(),
retry.Attempts(maxRetries),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,20 @@ func TestRetryUntilSuccess(t *testing.T) {
}

// Assert that RetryUntilSuccess returns the expected value when fn returns success on the 5th attempt
numCalls, err := RetryUntilSuccess(fn, initialDelay, maxDelay)
numCalls, err := RetryUntilSuccess(fn, initialDelay, maxDelay, 10)
assert.Nil(t, err)
assert.Equal(t, 5, numCalls)

// Assert that RetryUntilSuccess returns the expected value when fn returns success on the 8th attempt
numAttempts = 8
numCalls = 0
numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay)
numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay, 10)
assert.Nil(t, err)
assert.Equal(t, 8, numCalls)

// Assert that RetryUntilSuccess exhausts retries
numAttempts = 8
numCalls = 0
numCalls, err = RetryUntilSuccess(fn, initialDelay, maxDelay, 2)
assert.NotNil(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package ccipdata

import "time"

// RetryConfig configures an initial delay between retries and a max delay between retries
// RetryConfig configures an initial delay between retries, a max delay between retries, and a maximum number of
// times to retry
type RetryConfig struct {
InitialDelay time.Duration
MaxDelay time.Duration
MaxRetries uint
}
14 changes: 14 additions & 0 deletions core/services/ocr2/plugins/ccip/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var (
Name: "ccip_sequence_number_counter",
Help: "Sequence number of the last message processed by the plugin",
}, []string{"plugin", "source", "dest", "ocrPhase"})
newReportingPluginErrorCounter = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "ccip_new_reporting_plugin_error_counter",
Help: "The count of the number of errors when calling NewReportingPlugin",
}, []string{"plugin"})
)

type ocrPhase string
Expand All @@ -35,6 +39,7 @@ type PluginMetricsCollector interface {
NumberOfMessagesBasedOnInterval(phase ocrPhase, seqNrMin, seqNrMax uint64)
UnexpiredCommitRoots(count int)
SequenceNumber(phase ocrPhase, seqNr uint64)
NewReportingPluginError()
}

type pluginMetricsCollector struct {
Expand Down Expand Up @@ -79,6 +84,12 @@ func (p *pluginMetricsCollector) SequenceNumber(phase ocrPhase, seqNr uint64) {
Set(float64(seqNr))
}

func (p *pluginMetricsCollector) NewReportingPluginError() {
newReportingPluginErrorCounter.
WithLabelValues(p.pluginName).
Inc()
}

var (
// NoopMetricsCollector is a no-op implementation of PluginMetricsCollector
NoopMetricsCollector PluginMetricsCollector = noop{}
Expand All @@ -97,3 +108,6 @@ func (d noop) UnexpiredCommitRoots(int) {

func (d noop) SequenceNumber(ocrPhase, uint64) {
}

func (d noop) NewReportingPluginError() {
}

0 comments on commit 3614f1c

Please sign in to comment.