Skip to content

Commit

Permalink
Commit NewReportingPlugin retries on error
Browse files Browse the repository at this point in the history
Previously, if there was an error when starting the Commit Plugin
(i.e. calling NewReportingPlugin), the Commit Plugin would remain in
a non-started state. Now, NewReportingPlugin will retry until the
Commit Plugin successfully starts.
  • Loading branch information
rstout committed Jul 8, 2024
1 parent d5ab3e6 commit 8947b21
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 39 deletions.
79 changes: 52 additions & 27 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
Expand Down Expand Up @@ -64,40 +65,60 @@ func (rf *CommitReportingPluginFactory) UpdateDynamicReaders(ctx context.Context
return nil
}

// NewReportingPlugin returns the ccip CommitReportingPlugin and satisfies the ReportingPluginFactory interface.
// reportingPluginAndInfo bundles the two non-error results of NewReportingPlugin into a single value, so that they
// can be returned together from the retryable closure built by NewReportingPluginFn.
type reportingPluginAndInfo struct {
plugin types.ReportingPlugin
pluginInfo types.ReportingPluginInfo
}

// NewReportingPlugin registers a new ReportingPlugin. It runs the closure built by NewReportingPluginFn through
// ccipcommon.RetryUntilSuccess, using the initial and maximum retry delays from
// rf.config.newReportingPluginRetryConfig, and returns the resulting plugin and plugin info.
func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.ReportingPluginConfig) (types.ReportingPlugin, types.ReportingPluginInfo, error) {
ctx := context.Background() // todo: consider adding some timeout
initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay
maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay

destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
return pluginAndInfo.plugin, pluginAndInfo.pluginInfo, err
}

priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil {
return nil, types.ReportingPluginInfo{}, err
}
// NewReportingPluginFn implements the NewReportingPlugin logic. It is defined as a function so that it can easily be
// retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Commit plugin to
// function, so on failure the only option is to keep retrying until it succeeds.
func (rf *CommitReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) {
return func() (reportingPluginAndInfo, error) {
ctx := context.Background() // todo: consider adding some timeout

pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
destPriceReg, err := rf.config.commitStore.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig)
if err != nil {
return reportingPluginAndInfo{}, err
}

gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
priceRegEvmAddr, err := ccipcalc.GenericAddrToEvm(destPriceReg)
if err != nil {
return reportingPluginAndInfo{}, err
}
if err = rf.UpdateDynamicReaders(ctx, priceRegEvmAddr); err != nil {
return reportingPluginAndInfo{}, err
}

err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader)
if err != nil {
return nil, types.ReportingPluginInfo{}, err
}
pluginOffChainConfig, err := rf.config.commitStore.OffchainConfig(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
}

gasPriceEstimator, err := rf.config.commitStore.GasPriceEstimator(ctx)
if err != nil {
return reportingPluginAndInfo{}, err
}

lggr := rf.config.lggr.Named("CommitReportingPlugin")
return &CommitReportingPlugin{
err = rf.config.priceService.UpdateDynamicConfig(ctx, gasPriceEstimator, rf.destPriceRegReader)
if err != nil {
return reportingPluginAndInfo{}, err
}

lggr := rf.config.lggr.Named("CommitReportingPlugin")
plugin := &CommitReportingPlugin{
sourceChainSelector: rf.config.sourceChainSelector,
sourceNative: rf.config.sourceNative,
onRampReader: rf.config.onRampReader,
Expand All @@ -112,14 +133,18 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: rf.config.chainHealthcheck,
priceService: rf.config.priceService,
},
types.ReportingPluginInfo{
}

pluginInfo := types.ReportingPluginInfo{
Name: "CCIPCommit",
UniqueReports: false, // See comment in CommitStore constructor.
Limits: types.ReportingPluginLimits{
MaxQueryLength: ccip.MaxQueryLength,
MaxObservationLength: ccip.MaxObservationLength,
MaxReportLength: MaxCommitReportLength,
},
}, nil
}

return reportingPluginAndInfo{plugin, pluginInfo}, nil
}
}
100 changes: 100 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipcommit/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package ccipcommit

import (
"errors"
"testing"
"time"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
ccipdataprovidermocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/ccipdataprovider/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
dbMocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdb/mocks"
)

// TestNewReportingPluginRetriesUntilSuccess asserts that NewReportingPlugin keeps retrying until it succeeds.
//
// NewReportingPlugin makes several calls (e.g. CommitStoreReader.ChangeConfig) that can fail. We use mocks to cause the
// first call to each of these functions to fail, then all subsequent calls succeed. We assert that NewReportingPlugin
// retries a sufficient number of times to get through the transient errors and eventually succeed.
func TestNewReportingPluginRetriesUntilSuccess(t *testing.T) {
	commitConfig := CommitPluginStaticConfig{}

	// For this unit test, ensure that there is no delay between retries.
	commitConfig.newReportingPluginRetryConfig = ccipdata.RetryConfig{
		InitialDelay: 0 * time.Nanosecond,
		MaxDelay:     0 * time.Nanosecond,
	}

	// Set up the CommitStoreReader mock.
	mockCommitStore := new(mocks.CommitStoreReader)

	// For each mocked method, the first call is set to return an error and the following calls return a nil error.
	mockCommitStore.
		On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything).
		Return(ccip.Address(""), errors.New("")).
		Once()
	mockCommitStore.
		On("ChangeConfig", mock.Anything, mock.Anything, mock.Anything).
		Return(ccip.Address("0x7c6e4F0BDe29f83BC394B75a7f313B7E5DbD2d77"), nil).
		Times(5)

	mockCommitStore.
		On("OffchainConfig", mock.Anything).
		Return(ccip.CommitOffchainConfig{}, errors.New("")).
		Once()
	mockCommitStore.
		On("OffchainConfig", mock.Anything).
		Return(ccip.CommitOffchainConfig{}, nil).
		Times(3)

	mockCommitStore.
		On("GasPriceEstimator", mock.Anything).
		Return(nil, errors.New("")).
		Once()
	mockCommitStore.
		On("GasPriceEstimator", mock.Anything).
		Return(nil, nil).
		Times(2)

	commitConfig.commitStore = mockCommitStore

	// Set up the PriceService mock: fail once, then succeed on every later call.
	mockPriceService := new(dbMocks.PriceService)

	mockPriceService.
		On("UpdateDynamicConfig", mock.Anything, mock.Anything, mock.Anything).
		Return(errors.New("")).
		Once()
	mockPriceService.
		On("UpdateDynamicConfig", mock.Anything, mock.Anything, mock.Anything).
		Return(nil)

	commitConfig.priceService = mockPriceService

	// Set up the PriceRegistry provider mock: fail once, then succeed once.
	priceRegistryProvider := new(ccipdataprovidermocks.PriceRegistry)
	priceRegistryProvider.
		On("NewPriceRegistryReader", mock.Anything, mock.Anything).
		Return(nil, errors.New("")).
		Once()
	priceRegistryProvider.
		On("NewPriceRegistryReader", mock.Anything, mock.Anything).
		Return(nil, nil).
		Once()
	commitConfig.priceRegistryProvider = priceRegistryProvider

	// NOTE(review): the second return value of logger.NewLogger is discarded here; confirm nothing needs cleanup.
	commitConfig.lggr, _ = logger.NewLogger()

	factory := NewCommitReportingPluginFactory(commitConfig)
	reportingConfig := types.ReportingPluginConfig{}
	reportingConfig.OnchainConfig = []byte{1, 2, 3}
	reportingConfig.OffchainConfig = []byte{1, 2, 3}

	// Assert that NewReportingPlugin succeeds despite many transient internal failures (mocked out above).
	_, _, err := factory.NewReportingPlugin(reportingConfig)
	assert.NoError(t, err)
}
26 changes: 15 additions & 11 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"strings"
"time"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib"
Expand Down Expand Up @@ -43,6 +44,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

// defaultNewReportingPluginRetryConfig is the retry configuration handed to the commit reporting plugin factory by
// NewCommitServices: an initial delay of one second and a maximum delay of five minutes.
var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}

func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider commontypes.CCIPCommitProvider, dstProvider commontypes.CCIPCommitProvider, chainSet legacyevm.LegacyChainContainer, jb job.Job, lggr logger.Logger, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, new bool, sourceChainID int64, destChainID int64, logError func(string)) ([]job.ServiceCtx, error) {
spec := jb.OCR2OracleSpec

Expand Down Expand Up @@ -171,17 +174,18 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c
)

wrappedPluginFactory := NewCommitReportingPluginFactory(CommitPluginStaticConfig{
lggr: lggr,
onRampReader: onRampReader,
sourceChainSelector: staticConfig.SourceChainSelector,
sourceNative: sourceNative,
offRamp: offRampReader,
commitStore: commitStoreReader,
destChainSelector: staticConfig.ChainSelector,
priceRegistryProvider: ccip.NewChainAgnosticPriceRegistry(dstProvider),
metricsCollector: metricsCollector,
chainHealthcheck: chainHealthCheck,
priceService: priceService,
lggr: lggr,
newReportingPluginRetryConfig: defaultNewReportingPluginRetryConfig,
onRampReader: onRampReader,
sourceChainSelector: staticConfig.SourceChainSelector,
sourceNative: sourceNative,
offRamp: offRampReader,
commitStore: commitStoreReader,
destChainSelector: staticConfig.ChainSelector,
priceRegistryProvider: ccip.NewChainAgnosticPriceRegistry(dstProvider),
metricsCollector: metricsCollector,
chainHealthcheck: chainHealthCheck,
priceService: priceService,
})
argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPCommit", jb.OCR2OracleSpec.Relay, big.NewInt(0).SetInt64(destChainID))
argsNoPlugin.Logger = commonlogger.NewOCRWrapper(commitLggr, true, logError)
Expand Down
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type update struct {
}

type CommitPluginStaticConfig struct {
lggr logger.Logger
lggr logger.Logger
newReportingPluginRetryConfig ccipdata.RetryConfig
// Source
onRampReader ccipdata.OnRampReader
sourceChainSelector uint64
Expand Down

0 comments on commit 8947b21

Please sign in to comment.