From 3f199a78597a5540d180d94e115225d5f64890a5 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Mon, 8 Jul 2024 13:37:09 -0400 Subject: [PATCH] Updating execution factory to take sourceTokenAddress in constructor --- core/services/ocr2/delegate.go | 26 +++++++++++++------ .../ocr2/plugins/ccip/ccipexec/factory.go | 21 +++++---------- .../plugins/ccip/ccipexec/initializers.go | 4 +-- core/services/relay/evm/evm.go | 1 + core/services/relay/evm/exec_provider.go | 11 +++++--- plugins/cmd/chainlink-ccip-exec/plugin.go | 4 +-- 6 files changed, 36 insertions(+), 31 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 55c16e5e7b..97c099dc65 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -43,6 +43,7 @@ import ( "github.com/smartcontractkit/chainlink-vrf/altbn_128" dkgpkg "github.com/smartcontractkit/chainlink-vrf/dkg" "github.com/smartcontractkit/chainlink-vrf/ocr2vrf" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -1970,12 +1971,27 @@ func (d *Delegate) ccipCommitGetSrcProvider(ctx context.Context, jb job.Job, plu } func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string) ([]job.ServiceCtx, error) { + if jb.OCR2OracleSpec == nil { + return nil, fmt.Errorf("spec is nil") + } spec := jb.OCR2OracleSpec if spec.Relay != relay.NetworkEVM { return nil, fmt.Errorf("non evm chains are not supported for CCIP execution") } - dstRid, err := spec.RelayID() + var pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig + err := json.Unmarshal(spec.PluginConfig.Bytes(), &pluginJobSpecConfig) + if err != nil { + return nil, err + } + + err = pluginJobSpecConfig.USDCConfig.ValidateUSDCConfig() + if err != nil { + return nil, err + } + + sourceTokenAddress := ccip.EvmAddrToGeneric(pluginJobSpecConfig.USDCConfig.SourceTokenAddress) + dstRid, err := spec.RelayID() if err != nil { return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)} } @@ -1986,12 +2002,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug // PROVIDER BASED ARG CONSTRUCTION // Write PluginConfig bytes to send source/dest relayer provider + info outside of top level rargs/pargs over the wire - var pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig - err = json.Unmarshal(spec.PluginConfig.Bytes(), &pluginJobSpecConfig) - if err != nil { - return nil, err - } - dstChainID, err := strconv.ParseInt(dstRid.ChainID, 10, 64) if err != nil { return nil, err @@ -2026,7 +2036,7 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": jb.Name.ValueOrZero()}, prometheus.DefaultRegisterer), } - return ccipexec.NewExecServices(ctx, lggr, jb, srcProvider, dstProvider, int64(srcChainID), dstChainID, d.isNewlyCreatedJob, oracleArgsNoPlugin2, logError) + return ccipexec.NewExecServices(ctx, lggr, jb, string(sourceTokenAddress), srcProvider, dstProvider, int64(srcChainID), dstChainID, d.isNewlyCreatedJob, oracleArgsNoPlugin2, logError) } func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig, transmitterID string) (types.CCIPExecProvider, error) { diff --git a/core/services/ocr2/plugins/ccip/ccipexec/factory.go b/core/services/ocr2/plugins/ccip/ccipexec/factory.go index 940d87f881..e24185472d 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/factory.go @@ -54,7 +54,7 @@ func (rf *ExecutionReportingPluginFactory) HealthReport() map[string]error { panic("implement me") } -func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logger, srcChainID int64, dstChainID int64, srcProvider commontypes.CCIPExecProvider, dstProvider commontypes.CCIPExecProvider) (*ExecutionReportingPluginFactory, error) { +func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logger, sourceTokenAddress string, srcChainID int64, dstChainID int64, srcProvider commontypes.CCIPExecProvider, dstProvider commontypes.CCIPExecProvider) (*ExecutionReportingPluginFactory, error) { // TODO: common logger is a subset of core logger. // what's the golden path for passing a logger through from the plugin to the LOOP reporting plugin factory? if lggr == nil { @@ -104,19 +104,11 @@ func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logge tokenDataProviders := make(map[cciptypes.Address]tokendata.Reader) // init usdc token data provider - if pluginConfig.USDCConfig.AttestationAPI != "" { - lggr.Infof("USDC token data provider enabled") - err2 := pluginConfig.USDCConfig.ValidateUSDCConfig() - if err2 != nil { - return nil, err2 - } - - usdcReader, err2 := srcProvider.NewTokenDataReader(ctx, ccip.EvmAddrToGeneric(pluginConfig.USDCConfig.SourceTokenAddress)) - if err2 != nil { - return nil, fmt.Errorf("new usdc reader: %w", err2) - } - tokenDataProviders[cciptypes.Address(pluginConfig.USDCConfig.SourceTokenAddress.String())] = usdcReader + usdcReader, err2 := srcProvider.NewTokenDataReader(ctx, "") + if err2 != nil { + return nil, fmt.Errorf("new usdc reader: %w", err2) } + tokenDataProviders[cciptypes.Address(sourceTokenAddress)] = usdcReader // Prom wrappers onRampReader = observability.NewObservedOnRampReader(onRampReader, srcChainID, ccip.ExecPluginLabel) @@ -124,7 +116,7 @@ func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logge offRampReader = observability.NewObservedOffRampReader(offRampReader, dstChainID, ccip.ExecPluginLabel) metricsCollector := ccip.NewPluginMetricsCollector(ccip.ExecPluginLabel, srcChainID, dstChainID) - tokenPoolBatchedReader, err := dstProvider.NewTokenPoolBatchedReader(ctx, offRampAddress, srcChainSelector) + tokenPoolBatchedReader, err := dstProvider.NewTokenPoolBatchedReader(ctx, "", srcChainSelector) if err != nil { return nil, fmt.Errorf("new token pool batched reader: %w", err) } @@ -136,7 +128,6 @@ func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logge lggr.With( "onramp", offRampConfig.OnRamp, "commitStore", offRampConfig.CommitStore, - "offramp", offRampAddress, ), onRampReader, commitStoreReader, diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index 022fafe5ea..f63e55a4b7 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -46,8 +46,8 @@ var ( var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute} -func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { - wrappedPluginFactory, err := NewExecutionReportingPluginFactoryV2(ctx, lggr, srcChainID, dstChainID, srcProvider, dstProvider) +func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, sourceTokenAddress string, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) { + wrappedPluginFactory, err := NewExecutionReportingPluginFactoryV2(ctx, lggr, sourceTokenAddress, srcChainID, dstChainID, srcProvider, dstProvider) if err != nil { return nil, err } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index a700a4f6d6..b4cac9025f 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -486,6 +486,7 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont execPluginConfig.SourceStartBlock, execPluginConfig.JobID, usdcConfig.AttestationAPI, + string(ccip.EvmAddrToGeneric(usdcConfig.SourceTokenAddress)), int(usdcConfig.AttestationAPITimeoutSeconds), usdcConfig.AttestationAPIIntervalMilliseconds, usdcConfig.SourceMessageTransmitterAddress, diff --git a/core/services/relay/evm/exec_provider.go b/core/services/relay/evm/exec_provider.go index 3a29ca4bac..3313386036 100644 --- a/core/services/relay/evm/exec_provider.go +++ b/core/services/relay/evm/exec_provider.go @@ -32,6 +32,7 @@ type SrcExecProvider struct { maxGasPrice *big.Int usdcReader *ccip.USDCReaderImpl usdcAttestationAPI string + usdcSourceTokenAddress string usdcAttestationAPITimeoutSeconds int usdcAttestationAPIIntervalMilliseconds int usdcSrcMsgTransmitterAddr common.Address @@ -47,6 +48,7 @@ func NewSrcExecProvider( startBlock uint64, jobID string, usdcAttestationAPI string, + usdcSourceTokenAddress string, usdcAttestationAPITimeoutSeconds int, usdcAttestationAPIIntervalMilliseconds int, usdcSrcMsgTransmitterAddr common.Address, @@ -70,6 +72,7 @@ func NewSrcExecProvider( startBlock: startBlock, usdcReader: usdcReader, usdcAttestationAPI: usdcAttestationAPI, + usdcSourceTokenAddress: usdcSourceTokenAddress, usdcAttestationAPITimeoutSeconds: usdcAttestationAPITimeoutSeconds, usdcAttestationAPIIntervalMilliseconds: usdcAttestationAPIIntervalMilliseconds, usdcSrcMsgTransmitterAddr: usdcSrcMsgTransmitterAddr, @@ -147,12 +150,12 @@ func (s SrcExecProvider) NewPriceRegistryReader(ctx context.Context, addr ccipty return } -func (s SrcExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cciptypes.Address) (tokenDataReader cciptypes.TokenDataReader, err error) { +func (s SrcExecProvider) NewTokenDataReader(ctx context.Context, _ cciptypes.Address) (tokenDataReader cciptypes.TokenDataReader, err error) { attestationURI, err2 := url.ParseRequestURI(s.usdcAttestationAPI) if err2 != nil { return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err2) } - tokenAddr, err2 := ccip.GenericAddrToEvm(tokenAddress) + tokenAddr, err2 := ccip.GenericAddrToEvm(cciptypes.Address(s.usdcSourceTokenAddress)) if err2 != nil { return nil, fmt.Errorf("failed to parse token address: %w", err2) } @@ -296,7 +299,7 @@ func (d DstExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cc return nil, fmt.Errorf("invalid: NewTokenDataReader called on DstExecProvider. It should only be called on SrcExecProvider") } -func (d DstExecProvider) NewTokenPoolBatchedReader(ctx context.Context, offRampAddress cciptypes.Address, sourceChainSelector uint64) (tokenPoolBatchedReader cciptypes.TokenPoolBatchedReader, err error) { +func (d DstExecProvider) NewTokenPoolBatchedReader(ctx context.Context, _ cciptypes.Address, sourceChainSelector uint64) (tokenPoolBatchedReader cciptypes.TokenPoolBatchedReader, err error) { batchCaller := ccip.NewDynamicLimitedBatchCaller( d.lggr, d.client, @@ -305,7 +308,7 @@ func (d DstExecProvider) NewTokenPoolBatchedReader(ctx context.Context, offRampA uint(ccip.DefaultMaxParallelRpcCalls), ) - tokenPoolBatchedReader, err = ccip.NewEVMTokenPoolBatchedReader(d.lggr, sourceChainSelector, offRampAddress, batchCaller) + tokenPoolBatchedReader, err = ccip.NewEVMTokenPoolBatchedReader(d.lggr, sourceChainSelector, d.offRampAddress, batchCaller) if err != nil { return nil, fmt.Errorf("new token pool batched reader: %w", err) } diff --git a/plugins/cmd/chainlink-ccip-exec/plugin.go b/plugins/cmd/chainlink-ccip-exec/plugin.go index 205506de9e..5ee9fc06a9 100644 --- a/plugins/cmd/chainlink-ccip-exec/plugin.go +++ b/plugins/cmd/chainlink-ccip-exec/plugin.go @@ -20,6 +20,6 @@ func NewPlugin(lggr logger.Logger) *Plugin { return &Plugin{Plugin: loop.Plugin{Logger: lggr}, stop: make(services.StopChan)} } -func (p *Plugin) NewExecutionFactory(ctx context.Context, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64) (types.ReportingPluginFactory, error) { - return ccipexec.NewExecutionReportingPluginFactoryV2(ctx, nil, srcChainID, dstChainID, srcProvider, dstProvider) +func (p *Plugin) NewExecutionFactory(ctx context.Context, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, sourceTokenAddress string) (types.ReportingPluginFactory, error) { + return ccipexec.NewExecutionReportingPluginFactoryV2(ctx, nil, sourceTokenAddress, srcChainID, dstChainID, srcProvider, dstProvider) }