Skip to content

Commit

Permalink
Updating execution factory to take sourceTokenAddress in constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickhuie19 committed Jul 8, 2024
1 parent 34e360b commit 3f199a7
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 31 deletions.
26 changes: 18 additions & 8 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/smartcontractkit/chainlink-vrf/altbn_128"
dkgpkg "github.com/smartcontractkit/chainlink-vrf/dkg"
"github.com/smartcontractkit/chainlink-vrf/ocr2vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm"
Expand Down Expand Up @@ -1970,12 +1971,27 @@ func (d *Delegate) ccipCommitGetSrcProvider(ctx context.Context, jb job.Job, plu
}

func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.SugaredLogger, jb job.Job, bootstrapPeers []commontypes.BootstrapperLocator, kb ocr2key.KeyBundle, ocrDB *db, lc ocrtypes.LocalConfig, transmitterID string) ([]job.ServiceCtx, error) {
if jb.OCR2OracleSpec == nil {
return nil, fmt.Errorf("spec is nil")
}
spec := jb.OCR2OracleSpec
if spec.Relay != relay.NetworkEVM {
return nil, fmt.Errorf("non evm chains are not supported for CCIP execution")
}
dstRid, err := spec.RelayID()
var pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig
err := json.Unmarshal(spec.PluginConfig.Bytes(), &pluginJobSpecConfig)
if err != nil {
return nil, err
}

err = pluginJobSpecConfig.USDCConfig.ValidateUSDCConfig()
if err != nil {
return nil, err
}

sourceTokenAddress := ccip.EvmAddrToGeneric(pluginJobSpecConfig.USDCConfig.SourceTokenAddress)

dstRid, err := spec.RelayID()
if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
}
Expand All @@ -1986,12 +2002,6 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug

// PROVIDER BASED ARG CONSTRUCTION
// Write PluginConfig bytes to send source/dest relayer provider + info outside of top level rargs/pargs over the wire
var pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig
err = json.Unmarshal(spec.PluginConfig.Bytes(), &pluginJobSpecConfig)
if err != nil {
return nil, err
}

dstChainID, err := strconv.ParseInt(dstRid.ChainID, 10, 64)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2026,7 +2036,7 @@ func (d *Delegate) newServicesCCIPExecution(ctx context.Context, lggr logger.Sug
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": jb.Name.ValueOrZero()}, prometheus.DefaultRegisterer),
}

return ccipexec.NewExecServices(ctx, lggr, jb, srcProvider, dstProvider, int64(srcChainID), dstChainID, d.isNewlyCreatedJob, oracleArgsNoPlugin2, logError)
return ccipexec.NewExecServices(ctx, lggr, jb, string(sourceTokenAddress), srcProvider, dstProvider, int64(srcChainID), dstChainID, d.isNewlyCreatedJob, oracleArgsNoPlugin2, logError)
}

func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig, transmitterID string) (types.CCIPExecProvider, error) {
Expand Down
21 changes: 6 additions & 15 deletions core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (rf *ExecutionReportingPluginFactory) HealthReport() map[string]error {
panic("implement me")
}

func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logger, srcChainID int64, dstChainID int64, srcProvider commontypes.CCIPExecProvider, dstProvider commontypes.CCIPExecProvider) (*ExecutionReportingPluginFactory, error) {
func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logger, sourceTokenAddress string, srcChainID int64, dstChainID int64, srcProvider commontypes.CCIPExecProvider, dstProvider commontypes.CCIPExecProvider) (*ExecutionReportingPluginFactory, error) {
// TODO: common logger is a subset of core logger.
// what's the golden path for passing a logger through from the plugin to the LOOP reporting plugin factory?
if lggr == nil {
Expand Down Expand Up @@ -104,27 +104,19 @@ func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logge

tokenDataProviders := make(map[cciptypes.Address]tokendata.Reader)
// init usdc token data provider
if pluginConfig.USDCConfig.AttestationAPI != "" {
lggr.Infof("USDC token data provider enabled")
err2 := pluginConfig.USDCConfig.ValidateUSDCConfig()
if err2 != nil {
return nil, err2
}

usdcReader, err2 := srcProvider.NewTokenDataReader(ctx, ccip.EvmAddrToGeneric(pluginConfig.USDCConfig.SourceTokenAddress))
if err2 != nil {
return nil, fmt.Errorf("new usdc reader: %w", err2)
}
tokenDataProviders[cciptypes.Address(pluginConfig.USDCConfig.SourceTokenAddress.String())] = usdcReader
usdcReader, err2 := srcProvider.NewTokenDataReader(ctx, "")
if err2 != nil {
return nil, fmt.Errorf("new usdc reader: %w", err2)
}
tokenDataProviders[cciptypes.Address(sourceTokenAddress)] = usdcReader

// Prom wrappers
onRampReader = observability.NewObservedOnRampReader(onRampReader, srcChainID, ccip.ExecPluginLabel)
commitStoreReader = observability.NewObservedCommitStoreReader(commitStoreReader, dstChainID, ccip.ExecPluginLabel)
offRampReader = observability.NewObservedOffRampReader(offRampReader, dstChainID, ccip.ExecPluginLabel)
metricsCollector := ccip.NewPluginMetricsCollector(ccip.ExecPluginLabel, srcChainID, dstChainID)

tokenPoolBatchedReader, err := dstProvider.NewTokenPoolBatchedReader(ctx, offRampAddress, srcChainSelector)
tokenPoolBatchedReader, err := dstProvider.NewTokenPoolBatchedReader(ctx, "", srcChainSelector)
if err != nil {
return nil, fmt.Errorf("new token pool batched reader: %w", err)
}
Expand All @@ -136,7 +128,6 @@ func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logge
lggr.With(
"onramp", offRampConfig.OnRamp,
"commitStore", offRampConfig.CommitStore,
"offramp", offRampAddress,
),
onRampReader,
commitStoreReader,
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ var (

var defaultNewReportingPluginRetryConfig = ccipdata.RetryConfig{InitialDelay: time.Second, MaxDelay: 5 * time.Minute}

func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
wrappedPluginFactory, err := NewExecutionReportingPluginFactoryV2(ctx, lggr, srcChainID, dstChainID, srcProvider, dstProvider)
func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, sourceTokenAddress string, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string)) ([]job.ServiceCtx, error) {
wrappedPluginFactory, err := NewExecutionReportingPluginFactoryV2(ctx, lggr, sourceTokenAddress, srcChainID, dstChainID, srcProvider, dstProvider)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont
execPluginConfig.SourceStartBlock,
execPluginConfig.JobID,
usdcConfig.AttestationAPI,
string(ccip.EvmAddrToGeneric(usdcConfig.SourceTokenAddress)),
int(usdcConfig.AttestationAPITimeoutSeconds),
usdcConfig.AttestationAPIIntervalMilliseconds,
usdcConfig.SourceMessageTransmitterAddress,
Expand Down
11 changes: 7 additions & 4 deletions core/services/relay/evm/exec_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type SrcExecProvider struct {
maxGasPrice *big.Int
usdcReader *ccip.USDCReaderImpl
usdcAttestationAPI string
usdcSourceTokenAddress string
usdcAttestationAPITimeoutSeconds int
usdcAttestationAPIIntervalMilliseconds int
usdcSrcMsgTransmitterAddr common.Address
Expand All @@ -47,6 +48,7 @@ func NewSrcExecProvider(
startBlock uint64,
jobID string,
usdcAttestationAPI string,
usdcSourceTokenAddress string,
usdcAttestationAPITimeoutSeconds int,
usdcAttestationAPIIntervalMilliseconds int,
usdcSrcMsgTransmitterAddr common.Address,
Expand All @@ -70,6 +72,7 @@ func NewSrcExecProvider(
startBlock: startBlock,
usdcReader: usdcReader,
usdcAttestationAPI: usdcAttestationAPI,
usdcSourceTokenAddress: usdcSourceTokenAddress,
usdcAttestationAPITimeoutSeconds: usdcAttestationAPITimeoutSeconds,
usdcAttestationAPIIntervalMilliseconds: usdcAttestationAPIIntervalMilliseconds,
usdcSrcMsgTransmitterAddr: usdcSrcMsgTransmitterAddr,
Expand Down Expand Up @@ -147,12 +150,12 @@ func (s SrcExecProvider) NewPriceRegistryReader(ctx context.Context, addr ccipty
return
}

func (s SrcExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cciptypes.Address) (tokenDataReader cciptypes.TokenDataReader, err error) {
func (s SrcExecProvider) NewTokenDataReader(ctx context.Context, _ cciptypes.Address) (tokenDataReader cciptypes.TokenDataReader, err error) {
attestationURI, err2 := url.ParseRequestURI(s.usdcAttestationAPI)
if err2 != nil {
return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err2)
}
tokenAddr, err2 := ccip.GenericAddrToEvm(tokenAddress)
tokenAddr, err2 := ccip.GenericAddrToEvm(cciptypes.Address(s.usdcSourceTokenAddress))
if err2 != nil {
return nil, fmt.Errorf("failed to parse token address: %w", err2)
}
Expand Down Expand Up @@ -296,7 +299,7 @@ func (d DstExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cc
return nil, fmt.Errorf("invalid: NewTokenDataReader called on DstExecProvider. It should only be called on SrcExecProvider")
}

func (d DstExecProvider) NewTokenPoolBatchedReader(ctx context.Context, offRampAddress cciptypes.Address, sourceChainSelector uint64) (tokenPoolBatchedReader cciptypes.TokenPoolBatchedReader, err error) {
func (d DstExecProvider) NewTokenPoolBatchedReader(ctx context.Context, _ cciptypes.Address, sourceChainSelector uint64) (tokenPoolBatchedReader cciptypes.TokenPoolBatchedReader, err error) {
batchCaller := ccip.NewDynamicLimitedBatchCaller(
d.lggr,
d.client,
Expand All @@ -305,7 +308,7 @@ func (d DstExecProvider) NewTokenPoolBatchedReader(ctx context.Context, offRampA
uint(ccip.DefaultMaxParallelRpcCalls),
)

tokenPoolBatchedReader, err = ccip.NewEVMTokenPoolBatchedReader(d.lggr, sourceChainSelector, offRampAddress, batchCaller)
tokenPoolBatchedReader, err = ccip.NewEVMTokenPoolBatchedReader(d.lggr, sourceChainSelector, d.offRampAddress, batchCaller)
if err != nil {
return nil, fmt.Errorf("new token pool batched reader: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/cmd/chainlink-ccip-exec/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ func NewPlugin(lggr logger.Logger) *Plugin {
return &Plugin{Plugin: loop.Plugin{Logger: lggr}, stop: make(services.StopChan)}
}

func (p *Plugin) NewExecutionFactory(ctx context.Context, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64) (types.ReportingPluginFactory, error) {
return ccipexec.NewExecutionReportingPluginFactoryV2(ctx, nil, srcChainID, dstChainID, srcProvider, dstProvider)
func (p *Plugin) NewExecutionFactory(ctx context.Context, srcProvider types.CCIPExecProvider, dstProvider types.CCIPExecProvider, srcChainID int64, dstChainID int64, sourceTokenAddress string) (types.ReportingPluginFactory, error) {
return ccipexec.NewExecutionReportingPluginFactoryV2(ctx, nil, sourceTokenAddress, srcChainID, dstChainID, srcProvider, dstProvider)
}

0 comments on commit 3f199a7

Please sign in to comment.