Skip to content

Commit

Permalink
using provider based commitStoreReader for exec (#1150)
Browse files Browse the repository at this point in the history
## Motivation
Follow on to #1080, using a provider based commitStoreReader in the exec
plugin

---------

Co-authored-by: ilija42 <[email protected]>
Co-authored-by: Aaron Lu <[email protected]>
  • Loading branch information
3 people authored Jul 9, 2024
1 parent 93ce593 commit a8b921c
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 28 deletions.
12 changes: 1 addition & 11 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1976,10 +1976,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}
dstChain, err := d.legacyChains.Get(dstRid.ChainID)
if err != nil {
return nil, fmt.Errorf("ccip services; failed to get chain %s: %w", dstRid.ChainID, err)
}

logError := func(msg string) {
lggr.ErrorIf(d.jobORM.RecordError(context.Background(), jb.ID, msg), "unable to record error")
Expand Down Expand Up @@ -2008,12 +2004,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
return nil, err
}

srcChainIDstr := strconv.FormatUint(srcChainID, 10)
srcChain, err := d.legacyChains.Get(srcChainIDstr)
if err != nil {
return nil, fmt.Errorf("open source chain: %w", err)
}

oracleArgsNoPlugin2 := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
Expand All @@ -2033,7 +2023,7 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": jb.Name.ValueOrZero()}, prometheus.DefaultRegisterer),
}

return ccipexec.NewExecServices(ctx, lggr, jb, srcProvider, dstProvider, srcChain, dstChain, int64(srcChainID), dstChainID, d.legacyChains, d.isNewlyCreatedJob, oracleArgsNoPlugin2, logError)
return ccipexec.NewExecServices(ctx, lggr, jb, srcProvider, dstProvider, int64(srcChainID), dstChainID, d.isNewlyCreatedJob, oracleArgsNoPlugin2, logError)
}

func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig, transmitterID string) (types.CCIPExecProvider, error) {
Expand Down
5 changes: 4 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c
}

commitStoreAddress := common.HexToAddress(spec.ContractID)

// commit store contract doesn't exist on the source chain, but we have an implementation of it
// to get access to a gas estimator on the source chain
srcCommitStore, err := srcProvider.NewCommitStoreReader(ctx, ccipcalc.EvmAddrToGeneric(commitStoreAddress))
if err != nil {
return nil, err
Expand Down Expand Up @@ -222,7 +225,7 @@ func CommitReportToEthTxMeta(typ ccipconfig.ContractType, ver semver.Version) (f
// https://github.com/smartcontractkit/ccip/blob/68e2197472fb017dd4e5630d21e7878d58bc2a44/core/services/feeds/service.go#L716
// TODO once that transaction is broken up, we should be able to simply rely on oracle.Close() to cleanup the filters.
// Until then we have to deterministically reload the readers from the spec (and thus their filters) and close them.
func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer) error {
func UnregisterCommitPluginLpFilters(_ context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer) error {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return err
Expand Down
17 changes: 7 additions & 10 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}

func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChain legacyevm.Chain, dstChain legacyevm.Chain, srcChainID int64, dstChainID int64, chainSet legacyevm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
if jb.OCR2OracleSpec == nil {
return nil, fmt.Errorf("spec is nil")
}
Expand Down Expand Up @@ -89,21 +89,18 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro
return nil, fmt.Errorf("get source wrapped native token: %w", err)
}

versionFinder := ccip.NewEvmVersionFinder()
commitStoreReader, err := factory.NewCommitStoreReader(lggr, versionFinder, offRampConfig.CommitStore, dstChain.Client(), dstChain.LogPoller())
srcCommitStore, err := srcProvider.NewCommitStoreReader(ctx, offRampConfig.CommitStore)
if err != nil {
return nil, fmt.Errorf("could not load commitStoreReader reader: %w", err)
return nil, fmt.Errorf("could not create src commitStoreReader reader: %w", err)
}

err = commitStoreReader.SetGasEstimator(ctx, srcChain.GasEstimator())
dstCommitStore, err := dstProvider.NewCommitStoreReader(ctx, offRampConfig.CommitStore)
if err != nil {
return nil, fmt.Errorf("could not set gas estimator: %w", err)
return nil, fmt.Errorf("could not create dst commitStoreReader reader: %w", err)
}

err = commitStoreReader.SetSourceMaxGasPrice(ctx, srcChain.Config().EVM().GasEstimator().PriceMax().ToInt())
if err != nil {
return nil, fmt.Errorf("could not set source max gas price: %w", err)
}
var commitStoreReader ccipdata.CommitStoreReader
commitStoreReader = ccip.NewProviderProxyCommitStoreReader(srcCommitStore, dstCommitStore)

tokenDataProviders := make(map[cciptypes.Address]tokendata.Reader)
// init usdc token data provider
Expand Down
2 changes: 2 additions & 0 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont
r.lggr,
versionFinder,
r.chain.Client(),
r.chain.GasEstimator(),
r.chain.Config().EVM().GasEstimator().PriceMax().ToInt(),
r.chain.LogPoller(),
execPluginConfig.SourceStartBlock,
execPluginConfig.JobID,
Expand Down
19 changes: 13 additions & 6 deletions core/services/relay/evm/exec_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type SrcExecProvider struct {
client client.Client
lp logpoller.LogPoller
startBlock uint64
estimator gas.EvmFeeEstimator
maxGasPrice *big.Int
usdcReader *ccip.USDCReaderImpl
usdcAttestationAPI string
usdcAttestationAPITimeoutSeconds int
Expand All @@ -39,6 +41,8 @@ func NewSrcExecProvider(
lggr logger.Logger,
versionFinder ccip.VersionFinder,
client client.Client,
estimator gas.EvmFeeEstimator,
maxGasPrice *big.Int,
lp logpoller.LogPoller,
startBlock uint64,
jobID string,
Expand All @@ -60,6 +64,8 @@ func NewSrcExecProvider(
lggr: lggr,
versionFinder: versionFinder,
client: client,
estimator: estimator,
maxGasPrice: maxGasPrice,
lp: lp,
startBlock: startBlock,
usdcReader: usdcReader,
Expand Down Expand Up @@ -120,9 +126,9 @@ func (s SrcExecProvider) Codec() commontypes.Codec {
return nil
}

func (s SrcExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptypes.Address) (cciptypes.CommitStoreReader, error) {
// TODO CCIP-2493
return nil, fmt.Errorf("invalid: NewCommitStoreReader not implemented")
func (s SrcExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptypes.Address) (commitStoreReader cciptypes.CommitStoreReader, err error) {
commitStoreReader = NewIncompleteSourceCommitStoreReader(s.estimator, s.maxGasPrice)
return
}

func (s SrcExecProvider) NewOffRampReader(ctx context.Context, addr cciptypes.Address) (cciptypes.OffRampReader, error) {
Expand Down Expand Up @@ -262,9 +268,10 @@ func (d DstExecProvider) Codec() commontypes.Codec {
return nil
}

func (d DstExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptypes.Address) (cciptypes.CommitStoreReader, error) {
// TODO CCIP-2493
return nil, fmt.Errorf("invalid: NewCommitStoreReader not yet implemented")
func (d DstExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptypes.Address) (commitStoreReader cciptypes.CommitStoreReader, err error) {
versionFinder := ccip.NewEvmVersionFinder()
commitStoreReader, err = NewIncompleteDestCommitStoreReader(d.lggr, versionFinder, addr, d.client, d.lp)
return
}

func (d DstExecProvider) NewOffRampReader(ctx context.Context, offRampAddress cciptypes.Address) (offRampReader cciptypes.OffRampReader, err error) {
Expand Down

0 comments on commit a8b921c

Please sign in to comment.