diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 615bcbc67b..966014b239 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -293,7 +293,7 @@ func (d *Delegate) cleanupEVM(jb job.Job, q pg.Queryer, relayID relay.ID) error } return nil case types.CCIPExecution: - err = ccip.UnregisterExecPluginLpFilters(context.Background(), spec, d.legacyChains, pg.WithQueryer(q)) + err = ccip.UnregisterExecPluginLpFilters(context.Background(), d.lggr, spec, d.legacyChains, pg.WithQueryer(q)) if err != nil { d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err, "spec", spec) } diff --git a/core/services/ocr2/plugins/ccip/abihelpers/abi_helpers.go b/core/services/ocr2/plugins/ccip/abihelpers/abi_helpers.go index 52b8d983f5..1ed681f271 100644 --- a/core/services/ocr2/plugins/ccip/abihelpers/abi_helpers.go +++ b/core/services/ocr2/plugins/ccip/abihelpers/abi_helpers.go @@ -45,6 +45,8 @@ var EventSignatures struct { FeeTokenAdded common.Hash FeeTokenRemoved common.Hash + USDCMessageSent common.Hash + // offset || sourceChainID || seqNum || ... SendRequestedSequenceNumberWord int // offset || priceUpdatesOffset || minSeqNum || maxSeqNum || merkleRoot @@ -128,6 +130,8 @@ func init() { panic("missing event 'manuallyExecute'") } ExecutionReportArgs = manuallyExecuteMethod.Inputs[:1] + + EventSignatures.USDCMessageSent = utils.Keccak256Fixed([]byte("MessageSent(bytes)")) } func MessagesFromExecutionReport(report types.Report) ([]evm_2_evm_offramp.InternalEVM2EVMMessage, error) { diff --git a/core/services/ocr2/plugins/ccip/config/config.go b/core/services/ocr2/plugins/ccip/config/config.go index 12e31dbf56..441ecf21e4 100644 --- a/core/services/ocr2/plugins/ccip/config/config.go +++ b/core/services/ocr2/plugins/ccip/config/config.go @@ -1,5 +1,12 @@ package config +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + // CommitPluginJobSpecConfig contains the plugin specific variables for the ccip.CCIPCommit plugin. // We use ID here to keep it as general as possible, e.g. abstracting for chains which don't have an address concept. type CommitPluginJobSpecConfig struct { @@ -14,4 +21,29 @@ type CommitPluginJobSpecConfig struct { // ExecutionPluginJobSpecConfig contains the plugin specific variables for the ccip.CCIPExecution plugin. type ExecutionPluginJobSpecConfig struct { SourceStartBlock, DestStartBlock int64 // Only for first time job add. + USDCConfig USDCConfig +} + +type USDCConfig struct { + SourceTokenAddress common.Address + SourceMessageTransmitterAddress common.Address + AttestationAPI string +} + +func (uc *USDCConfig) ValidateUSDCConfig() error { + if uc.AttestationAPI == "" && uc.SourceTokenAddress == utils.ZeroAddress && uc.SourceMessageTransmitterAddress == utils.ZeroAddress { + return nil + } + + if uc.AttestationAPI == "" { + return errors.New("AttestationAPI is required") + } + if uc.SourceTokenAddress == utils.ZeroAddress { + return errors.New("SourceTokenAddress is required") + } + if uc.SourceMessageTransmitterAddress == utils.ZeroAddress { + return errors.New("SourceMessageTransmitterAddress is required") + } + + return nil } diff --git a/core/services/ocr2/plugins/ccip/execution_plugin.go b/core/services/ocr2/plugins/ccip/execution_plugin.go index 366432cddf..2d00333d93 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net/url" "strconv" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -15,6 +16,7 @@ import ( libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" relaylogger "github.com/smartcontractkit/chainlink-relay/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/contractutil" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/hashlib" @@ -34,6 +36,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/observability" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/promwrapper" "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) @@ -110,12 +114,19 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha "sourceChain", ChainName(int64(chainId)), "destChain", ChainName(destChainID)) + sourceChainEventClient := ccipdata.NewLogPollerReader(sourceChain.LogPoller(), execLggr, sourceChain.Client()) + + tokenDataProviders, err := getTokenDataProviders(lggr, pluginConfig, offRampConfig.OnRamp, sourceChainEventClient) + if err != nil { + return nil, errors.Wrap(err, "could not get token data providers") + } + wrappedPluginFactory := NewExecutionReportingPluginFactory( ExecutionPluginConfig{ lggr: execLggr, sourceLP: sourceChain.LogPoller(), destLP: destChain.LogPoller(), - sourceReader: ccipdata.NewLogPollerReader(sourceChain.LogPoller(), execLggr, sourceChain.Client()), + sourceReader: sourceChainEventClient, destReader: ccipdata.NewLogPollerReader(destChain.LogPoller(), execLggr, destChain.Client()), onRamp: onRamp, offRamp: offRamp, @@ -126,6 +137,7 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha sourceClient: sourceChain.Client(), destGasEstimator: destChain.GasEstimator(), leafHasher: hashlib.NewLeafHasher(offRampConfig.SourceChainSelector, offRampConfig.ChainSelector, onRamp.Address(), hashlib.NewKeccakCtx()), + tokenDataProviders: tokenDataProviders, }) err = wrappedPluginFactory.UpdateLogPollerFilters(utils.ZeroAddress, qopts...) @@ -161,8 +173,42 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha return []job.ServiceCtx{job.NewServiceAdapter(oracle)}, nil } -func getExecutionPluginSourceLpChainFilters(onRamp, priceRegistry common.Address) []logpoller.Filter { - return []logpoller.Filter{ +func getTokenDataProviders(lggr logger.Logger, pluginConfig ccipconfig.ExecutionPluginJobSpecConfig, onRampAddress common.Address, sourceChainEventClient *ccipdata.LogPollerReader) (map[common.Address]tokendata.Reader, error) { + tokenDataProviders := make(map[common.Address]tokendata.Reader) + + if pluginConfig.USDCConfig.AttestationAPI != "" { + lggr.Infof("USDC token data provider enabled") + err := pluginConfig.USDCConfig.ValidateUSDCConfig() + if err != nil { + return nil, err + } + + attestationURI, err2 := url.ParseRequestURI(pluginConfig.USDCConfig.AttestationAPI) + if err2 != nil { + return nil, errors.Wrap(err2, "failed to parse USDC attestation API") + } + + tokenDataProviders[pluginConfig.USDCConfig.SourceTokenAddress] = tokendata.NewCachedReader( + usdc.NewUSDCTokenDataReader( + sourceChainEventClient, + pluginConfig.USDCConfig.SourceTokenAddress, + pluginConfig.USDCConfig.SourceMessageTransmitterAddress, + onRampAddress, + attestationURI, + ), + ) + } + + return tokenDataProviders, nil +} + +func getExecutionPluginSourceLpChainFilters(onRamp, priceRegistry common.Address, tokenDataProviders map[common.Address]tokendata.Reader) []logpoller.Filter { + var filters []logpoller.Filter + for _, provider := range tokenDataProviders { + filters = append(filters, provider.GetSourceLogPollerFilters()...) + } + + return append(filters, []logpoller.Filter{ { Name: logpoller.FilterName(EXEC_CCIP_SENDS, onRamp.String()), EventSigs: []common.Hash{abihelpers.EventSignatures.SendRequested}, @@ -178,7 +224,7 @@ func getExecutionPluginSourceLpChainFilters(onRamp, priceRegistry common.Address EventSigs: []common.Hash{abihelpers.EventSignatures.FeeTokenRemoved}, Addresses: []common.Address{priceRegistry}, }, - } + }...) } func getExecutionPluginDestLpChainFilters(commitStore, offRamp, priceRegistry common.Address) []logpoller.Filter { @@ -217,7 +263,7 @@ func getExecutionPluginDestLpChainFilters(commitStore, offRamp, priceRegistry co } // UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains. -func UnregisterExecPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error { +func UnregisterExecPluginLpFilters(ctx context.Context, lggr logger.Logger, spec *job.OCR2OracleSpec, chainSet evm.LegacyChainContainer, qopts ...pg.QOpt) error { if spec == nil { return errors.New("spec is nil") } @@ -264,17 +310,19 @@ func UnregisterExecPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec return errors.Wrap(err, "failed loading onRamp") } - return unregisterExecutionPluginLpFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client(), qopts...) + return unregisterExecutionPluginLpFilters(ctx, lggr, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client(), pluginConfig, qopts...) } func unregisterExecutionPluginLpFilters( ctx context.Context, + lggr logger.Logger, sourceLP logpoller.LogPoller, destLP logpoller.LogPoller, destOffRamp evm_2_evm_offramp.EVM2EVMOffRampInterface, destOffRampConfig evm_2_evm_offramp.EVM2EVMOffRampStaticConfig, sourceOnRamp evm_2_evm_onramp.EVM2EVMOnRampInterface, sourceChainClient client.Client, + pluginConfig ccipconfig.ExecutionPluginJobSpecConfig, qopts ...pg.QOpt) error { destOffRampDynCfg, err := destOffRamp.GetDynamicConfig(&bind.CallOpts{Context: ctx}) if err != nil { @@ -286,9 +334,15 @@ func unregisterExecutionPluginLpFilters( return err } - if err := logpollerutil.UnregisterLpFilters( + // SourceChainEventClient can be nil because it is not used in unregisterExecutionPluginLpFilters + tokenDataProviders, err := getTokenDataProviders(lggr, pluginConfig, destOffRampConfig.OnRamp, nil) + if err != nil { + return err + } + + if err = logpollerutil.UnregisterLpFilters( sourceLP, - getExecutionPluginSourceLpChainFilters(destOffRampConfig.OnRamp, onRampDynCfg.PriceRegistry), + getExecutionPluginSourceLpChainFilters(destOffRampConfig.OnRamp, onRampDynCfg.PriceRegistry, tokenDataProviders), qopts..., ); err != nil { return err diff --git a/core/services/ocr2/plugins/ccip/execution_plugin_test.go b/core/services/ocr2/plugins/ccip/execution_plugin_test.go index 4fc7b2a747..dec6e2f5ad 100644 --- a/core/services/ocr2/plugins/ccip/execution_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/execution_plugin_test.go @@ -12,8 +12,12 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/mocks" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_offramp" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_onramp" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc" + "github.com/smartcontractkit/chainlink/v2/core/utils" ) func TestGetExecutionPluginFilterNamesFromSpec(t *testing.T) { @@ -53,7 +57,7 @@ func TestGetExecutionPluginFilterNamesFromSpec(t *testing.T) { for _, tc := range testCases { chainSet := &mocks.LegacyChainContainer{} t.Run(tc.description, func(t *testing.T) { - err := UnregisterExecPluginLpFilters(context.Background(), tc.spec, chainSet) + err := UnregisterExecPluginLpFilters(context.Background(), logger.TestLogger(t), tc.spec, chainSet) if tc.expectingErr { assert.Error(t, err) } else { @@ -74,11 +78,20 @@ func TestGetExecutionPluginFilterNames(t *testing.T) { mockOnRamp, onRampAddr := testhelpers.NewFakeOnRamp(t) mockOnRamp.SetDynamicCfg(evm_2_evm_onramp.EVM2EVMOnRampDynamicConfig{PriceRegistry: srcPriceRegAddr}) + pluginConfig := config.ExecutionPluginJobSpecConfig{ + USDCConfig: config.USDCConfig{ + SourceTokenAddress: utils.RandomAddress(), + SourceMessageTransmitterAddress: utils.RandomAddress(), + AttestationAPI: "http://localhost:8080", + }, + } + srcLP := mocklp.NewLogPoller(t) srcFilters := []string{ "Exec ccip sends - " + onRampAddr.String(), "Fee token added - 0xdAFea492D9c6733aE3d56B7ed1ADb60692c98bC9", "Fee token removed - 0xdAFea492D9c6733aE3d56B7ed1ADb60692c98bC9", + usdc.MESSAGE_SENT_FILTER_NAME + " - " + pluginConfig.USDCConfig.SourceMessageTransmitterAddress.Hex(), } for _, f := range srcFilters { srcLP.On("UnregisterFilter", f, mock.Anything).Return(nil) @@ -99,15 +112,18 @@ func TestGetExecutionPluginFilterNames(t *testing.T) { err := unregisterExecutionPluginLpFilters( context.Background(), + logger.TestLogger(t), srcLP, dstLP, mockOffRamp, evm_2_evm_offramp.EVM2EVMOffRampStaticConfig{ - CommitStore: commitStoreAddr, - OnRamp: onRampAddr, + CommitStore: commitStoreAddr, + OnRamp: onRampAddr, + SourceChainSelector: 5009297550715157269, }, mockOnRamp, nil, + pluginConfig, ) assert.NoError(t, err) diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go index 1281fbb69e..805366217e 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go @@ -31,12 +31,14 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/contractutil" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/hashlib" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/logpollerutil" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/observability" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) @@ -67,6 +69,7 @@ type ExecutionPluginConfig struct { sourceClient evmclient.Client destGasEstimator gas.EvmFeeEstimator leafHasher hashlib.LeafHasherInterface[[32]byte] + tokenDataProviders map[common.Address]tokendata.Reader } type ExecutionReportingPlugin struct { @@ -205,7 +208,11 @@ func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegis defer rf.filtersMu.Unlock() // source chain filters - sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getExecutionPluginSourceLpChainFilters(rf.config.onRamp.Address(), rf.config.sourcePriceRegistry.Address()) + sourceFiltersBefore, sourceFiltersNow := rf.sourceChainFilters, getExecutionPluginSourceLpChainFilters( + rf.config.onRamp.Address(), + rf.config.sourcePriceRegistry.Address(), + rf.config.tokenDataProviders, + ) created, deleted := logpollerutil.FiltersDiff(sourceFiltersBefore, sourceFiltersNow) if err := logpollerutil.UnregisterLpFilters(rf.config.sourceLP, deleted, qopts...); err != nil { return err @@ -229,14 +236,6 @@ func (rf *ExecutionReportingPluginFactory) UpdateLogPollerFilters(destPriceRegis return nil } -// helper struct to hold the send request and some metadata -type evm2EVMOnRampCCIPSendRequestedWithMeta struct { - evm_2_evm_offramp.InternalEVM2EVMMessage - blockTimestamp time.Time - executed bool - finalized bool -} - func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, timestamp types.ReportTimestamp, inflight []InflightInternalExecutionReport) ([]ObservedMessage, error) { unexpiredReports, err := getUnexpiredCommitReports( ctx, @@ -360,8 +359,17 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context } buildBatchDuration := time.Now() - batch := r.buildBatch(rootLggr, rep, inflight, allowedTokenAmountValue.Tokens, - sourceTokensPricesValue, destTokensPricesValue, getDestGasPrice, sourceToDestTokens, destPoolRateLimits) + batch := r.buildBatch( + ctx, + rootLggr, + rep, + inflight, + allowedTokenAmountValue.Tokens, + sourceTokensPricesValue, + destTokensPricesValue, + getDestGasPrice, + sourceToDestTokens, + destPoolRateLimits) measureBatchBuildDuration(timestamp, time.Since(buildBatchDuration)) if len(batch) != 0 { return batch, nil @@ -465,6 +473,7 @@ func (r *ExecutionReportingPlugin) getExecutedSeqNrsInRange(ctx context.Context, // the available gas, rate limiting, execution state, nonce state, and // profitability of execution. func (r *ExecutionReportingPlugin) buildBatch( + ctx context.Context, lggr logger.Logger, report commitReportWithSendRequests, inflight []InflightInternalExecutionReport, @@ -486,7 +495,7 @@ func (r *ExecutionReportingPlugin) buildBatch( for _, msg := range report.sendRequestsWithMeta { msgLggr := lggr.With("messageID", hexutil.Encode(msg.MessageId[:])) - if msg.executed { + if msg.Executed { msgLggr.Infow("Skipping message already executed", "seqNr", msg.SequenceNumber) continue } @@ -504,7 +513,7 @@ func (r *ExecutionReportingPlugin) buildBatch( // Chain holds existing nonce. nonce, err := r.config.offRamp.GetSenderNonce(nil, msg.Sender) if err != nil { - lggr.Errorw("unable to get sender nonce", "err", err) + lggr.Errorw("unable to get sender nonce", "err", err, "seqNr", msg.SequenceNumber) continue } expectedNonces[msg.Sender] = nonce + 1 @@ -526,11 +535,23 @@ func (r *ExecutionReportingPlugin) buildBatch( msgLggr.Errorw("Skipping message unable to compute aggregate value", "err", err) continue } + // if token limit is smaller than message value skip message if tokensLeft, hasCapacity := hasEnoughTokens(aggregateTokenLimit, msgValue, inflightAggregateValue); !hasCapacity { msgLggr.Warnw("token limit is smaller than message value", "aggregateTokenLimit", tokensLeft.String(), "msgValue", msgValue.String()) continue } + + tokenData, ready, err2 := getTokenData(ctx, msgLggr, msg, r.config.tokenDataProviders) + if err2 != nil { + msgLggr.Errorw("Skipping message unable to check token data", "err", err2) + continue + } + if !ready { + msgLggr.Warnw("Skipping message attestation not ready") + continue + } + // Fee boosting execGasPriceEstimateValue, err := execGasPriceEstimate() if err != nil { @@ -564,10 +585,10 @@ func (r *ExecutionReportingPlugin) buildBatch( availableFee := big.NewInt(0).Mul(msg.FeeTokenAmount, sourceFeeTokenPrice) availableFee = availableFee.Div(availableFee, big.NewInt(1e18)) - availableFeeUsd := waitBoostedFee(time.Since(msg.blockTimestamp), availableFee, r.offchainConfig.RelativeBoostPerWaitHour) + availableFeeUsd := waitBoostedFee(time.Since(msg.BlockTimestamp), availableFee, r.offchainConfig.RelativeBoostPerWaitHour) if availableFeeUsd.Cmp(execCostUsd) < 0 { msgLggr.Infow("Insufficient remaining fee", "availableFeeUsd", availableFeeUsd, "execCostUsd", execCostUsd, - "sourceBlockTimestamp", msg.blockTimestamp, "waitTime", time.Since(msg.blockTimestamp), "boost", r.offchainConfig.RelativeBoostPerWaitHour) + "sourceBlockTimestamp", msg.BlockTimestamp, "waitTime", time.Since(msg.BlockTimestamp), "boost", r.offchainConfig.RelativeBoostPerWaitHour) continue } @@ -600,13 +621,6 @@ func (r *ExecutionReportingPlugin) buildBatch( } } - var tokenData [][]byte - - // TODO add attestation data for USDC here - for range msg.TokenAmounts { - tokenData = append(tokenData, []byte{}) - } - msgLggr.Infow("Adding msg to batch", "seqNum", msg.SequenceNumber, "nonce", msg.Nonce, "value", msgValue, "aggregateTokenLimit", aggregateTokenLimit) executableMessages = append(executableMessages, NewObservedMessage(msg.SequenceNumber, tokenData)) @@ -619,6 +633,29 @@ func (r *ExecutionReportingPlugin) buildBatch( return executableMessages } +func getTokenData(ctx context.Context, lggr logger.Logger, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenDataProviders map[common.Address]tokendata.Reader) (tokenData [][]byte, allReady bool, err error) { + for _, token := range msg.TokenAmounts { + offchainTokenDataProvider, ok := tokenDataProviders[token.Token] + if !ok { + // No token data required + tokenData = append(tokenData, []byte{}) + continue + } + lggr.Infow("Fetching token data", "token", token.Token.Hex()) + tknData, err2 := offchainTokenDataProvider.ReadTokenData(ctx, msg) + if err2 != nil { + if errors.Is(err2, tokendata.ErrNotReady) { + lggr.Infof("Token data not ready yet for token %s", token.Token.Hex()) + return [][]byte{}, false, nil + } + return [][]byte{}, false, err2 + } + + tokenData = append(tokenData, tknData) + } + return tokenData, true, nil +} + func (r *ExecutionReportingPlugin) isRateLimitEnoughForTokenPool( destTokenPoolRateLimits map[common.Address]*big.Int, sourceTokenAmounts []evm_2_evm_offramp.ClientEVMTokenAmount, @@ -686,7 +723,7 @@ func calculateMessageMaxGas(gasLimit *big.Int, numRequests, dataLen, numTokens i // helper struct to hold the commitReport and the related send requests type commitReportWithSendRequests struct { commitReport commit_store.CommitStoreCommitReport - sendRequestsWithMeta []evm2EVMOnRampCCIPSendRequestedWithMeta + sendRequestsWithMeta []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta } func (r *commitReportWithSendRequests) validate() error { @@ -701,7 +738,7 @@ func (r *commitReportWithSendRequests) validate() error { func (r *commitReportWithSendRequests) allRequestsAreExecutedAndFinalized() bool { for _, req := range r.sendRequestsWithMeta { - if !req.executed || !req.finalized { + if !req.Executed || !req.Finalized { return false } } @@ -709,7 +746,7 @@ func (r *commitReportWithSendRequests) allRequestsAreExecutedAndFinalized() bool } // checks if the send request fits the commit report interval -func (r *commitReportWithSendRequests) sendReqFits(sendReq evm2EVMOnRampCCIPSendRequestedWithMeta) bool { +func (r *commitReportWithSendRequests) sendReqFits(sendReq internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) bool { return sendReq.SequenceNumber >= r.commitReport.Interval.Min && sendReq.SequenceNumber <= r.commitReport.Interval.Max } @@ -778,7 +815,7 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests( for i, report := range reports { reportsWithSendReqs[i] = commitReportWithSendRequests{ commitReport: report, - sendRequestsWithMeta: make([]evm2EVMOnRampCCIPSendRequestedWithMeta, 0, report.Interval.Max-report.Interval.Min+1), + sendRequestsWithMeta: make([]internal.EVM2EVMOnRampCCIPSendRequestedWithMeta, 0, report.Interval.Max-report.Interval.Min+1), } } @@ -789,11 +826,13 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests( // if value exists, and it's true then it's considered finalized finalized, executed := executedSeqNums[msg.SequenceNumber] - reqWithMeta := evm2EVMOnRampCCIPSendRequestedWithMeta{ + reqWithMeta := internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ InternalEVM2EVMMessage: msg, - blockTimestamp: sendReq.BlockTimestamp, - executed: executed, - finalized: finalized, + BlockTimestamp: sendReq.BlockTimestamp, + Executed: executed, + Finalized: finalized, + LogIndex: sendReq.Data.Raw.Index, + TxHash: sendReq.Data.Raw.TxHash, } // attach the msg to the appropriate reports diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go index c5b9722b3f..18e1db63fc 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go @@ -36,9 +36,11 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" "github.com/smartcontractkit/chainlink/v2/core/utils" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/commit_store" @@ -433,7 +435,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { lggr: logger.TestLogger(t), } - msg1 := evm2EVMOnRampCCIPSendRequestedWithMeta{ + msg1 := internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{ SequenceNumber: 1, FeeTokenAmount: big.NewInt(1e9), @@ -447,15 +449,15 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { FeeToken: srcNative, MessageId: [32]byte{}, }, - blockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), } msg2 := msg1 - msg2.executed = true + msg2.Executed = true msg3 := msg1 - msg3.executed = true - msg3.finalized = true + msg3.Executed = true + msg3.Finalized = true msg4 := msg1 msg4.TokenAmounts = []evm_2_evm_offramp.ClientEVMTokenAmount{ @@ -468,7 +470,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { var tt = []struct { name string - reqs []evm2EVMOnRampCCIPSendRequestedWithMeta + reqs []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta inflight []InflightInternalExecutionReport tokenLimit, destGasPrice *big.Int srcPrices, dstPrices map[common.Address]*big.Int @@ -479,7 +481,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }{ { name: "single message no tokens", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{msg1}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg1}, inflight: []InflightInternalExecutionReport{}, tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), @@ -490,7 +492,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }, { name: "executed non finalized messages should be skipped", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{msg2}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2}, inflight: []InflightInternalExecutionReport{}, tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), @@ -501,7 +503,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }, { name: "finalized executed log", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{msg3}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg3}, inflight: []InflightInternalExecutionReport{}, tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), @@ -512,7 +514,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }, { name: "dst token price does not exist", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{msg2}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2}, inflight: []InflightInternalExecutionReport{}, tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), @@ -523,7 +525,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }, { name: "src token price does not exist", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{msg2}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg2}, inflight: []InflightInternalExecutionReport{}, tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), @@ -534,7 +536,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }, { name: "rate limit hit", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{msg4}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg4}, tokenLimit: big.NewInt(0), destGasPrice: big.NewInt(10), srcPrices: map[common.Address]*big.Int{srcNative: big.NewInt(1)}, @@ -550,7 +552,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }, { name: "message with tokens is not executed if limit is reached", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{msg4}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg4}, inflight: []InflightInternalExecutionReport{}, tokenLimit: big.NewInt(2), destGasPrice: big.NewInt(10), @@ -564,7 +566,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }, { name: "message with tokens is not executed if limit is reached when inflight is full", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{msg5}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{msg5}, inflight: []InflightInternalExecutionReport{ { createdAt: time.Now(), @@ -583,7 +585,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { }, { name: "some messages skipped after hitting max batch data len", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{ + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ { InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{ SequenceNumber: 10, @@ -595,7 +597,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { FeeToken: srcNative, MessageId: [32]byte{}, }, - blockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), }, { InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{ @@ -608,7 +610,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { FeeToken: srcNative, MessageId: [32]byte{}, }, - blockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), }, { InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{ @@ -621,7 +623,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { FeeToken: srcNative, MessageId: [32]byte{}, }, - blockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), + BlockTimestamp: time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC), }, }, inflight: []InflightInternalExecutionReport{}, @@ -639,6 +641,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { t.Run(tc.name, func(t *testing.T) { offRamp.SetSenderNonces(tc.offRampNoncesBySender) seqNrs := plugin.buildBatch( + context.Background(), lggr, commitReportWithSendRequests{sendRequestsWithMeta: tc.reqs}, tc.inflight, @@ -855,7 +858,7 @@ func TestExecutionReportingPlugin_destPoolRateLimits(t *testing.T) { rateLimits, err := p.destPoolRateLimits(ctx, []commitReportWithSendRequests{ { - sendRequestsWithMeta: []evm2EVMOnRampCCIPSendRequestedWithMeta{ + sendRequestsWithMeta: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ { InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{ TokenAmounts: tc.tokenAmounts, @@ -975,16 +978,16 @@ func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) { Interval: commit_store.CommitStoreInterval{Min: 1, Max: 2}, MerkleRoot: [32]byte{100}, }, - sendRequestsWithMeta: []evm2EVMOnRampCCIPSendRequestedWithMeta{ + sendRequestsWithMeta: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ { InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{SequenceNumber: 1}, - executed: true, - finalized: true, + Executed: true, + Finalized: true, }, { InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{SequenceNumber: 2}, - executed: false, - finalized: false, + Executed: false, + Finalized: false, }, }, }, @@ -993,11 +996,11 @@ func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) { Interval: commit_store.CommitStoreInterval{Min: 3, Max: 3}, MerkleRoot: [32]byte{200}, }, - sendRequestsWithMeta: []evm2EVMOnRampCCIPSendRequestedWithMeta{ + sendRequestsWithMeta: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ { InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{SequenceNumber: 3}, - executed: false, - finalized: false, + Executed: false, + Finalized: false, }, }, }, @@ -1046,8 +1049,8 @@ func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) { for i, expReport := range tc.expReports { assert.Equal(t, len(expReport.sendRequestsWithMeta), len(populatedReports[i].sendRequestsWithMeta)) for j, expReq := range expReport.sendRequestsWithMeta { - assert.Equal(t, expReq.executed, populatedReports[i].sendRequestsWithMeta[j].executed) - assert.Equal(t, expReq.finalized, populatedReports[i].sendRequestsWithMeta[j].finalized) + assert.Equal(t, expReq.Executed, populatedReports[i].sendRequestsWithMeta[j].Executed) + assert.Equal(t, expReq.Finalized, populatedReports[i].sendRequestsWithMeta[j].Finalized) assert.Equal(t, expReq.SequenceNumber, populatedReports[i].sendRequestsWithMeta[j].SequenceNumber) } } @@ -1077,6 +1080,8 @@ func TestExecutionReportingPluginFactory_UpdateLogPollerFilters(t *testing.T) { destPriceRegistryAddr := utils.RandomAddress() + tokenDataProviders := make(map[common.Address]tokendata.Reader) + rf := &ExecutionReportingPluginFactory{ filtersMu: &sync.Mutex{}, sourceChainFilters: filters[:5], @@ -1088,10 +1093,11 @@ func TestExecutionReportingPluginFactory_UpdateLogPollerFilters(t *testing.T) { commitStore: commitStore, offRamp: offRamp, sourcePriceRegistry: sourcePriceRegistry, + tokenDataProviders: tokenDataProviders, }, } - for _, f := range getExecutionPluginSourceLpChainFilters(onRamp.Address(), sourcePriceRegistry.Address()) { + for _, f := range getExecutionPluginSourceLpChainFilters(onRamp.Address(), sourcePriceRegistry.Address(), tokenDataProviders) { sourceLP.On("RegisterFilter", f).Return(nil) } for _, f := range getExecutionPluginDestLpChainFilters(commitStore.Address(), offRamp.Address(), destPriceRegistryAddr) { @@ -1573,7 +1579,7 @@ func Test_commitReportWithSendRequests_validate(t *testing.T) { commitReport: commit_store.CommitStoreCommitReport{ Interval: tc.reportInterval, }, - sendRequestsWithMeta: make([]evm2EVMOnRampCCIPSendRequestedWithMeta, tc.numReqs), + sendRequestsWithMeta: make([]internal.EVM2EVMOnRampCCIPSendRequestedWithMeta, tc.numReqs), } err := rep.validate() isValid := err == nil @@ -1585,38 +1591,38 @@ func Test_commitReportWithSendRequests_validate(t *testing.T) { func Test_commitReportWithSendRequests_allRequestsAreExecutedAndFinalized(t *testing.T) { testCases := []struct { name string - reqs []evm2EVMOnRampCCIPSendRequestedWithMeta + reqs []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta expRes bool }{ { name: "all requests executed and finalized", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{ - {executed: true, finalized: true}, - {executed: true, finalized: true}, - {executed: true, finalized: true}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + {Executed: true, Finalized: true}, + {Executed: true, Finalized: true}, + {Executed: true, Finalized: true}, }, expRes: true, }, { name: "true when there are zero requests", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{}, expRes: true, }, { name: "some request not executed", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{ - {executed: true, finalized: true}, - {executed: true, finalized: true}, - {executed: false, finalized: true}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + {Executed: true, Finalized: true}, + {Executed: true, Finalized: true}, + {Executed: false, Finalized: true}, }, expRes: false, }, { name: "some request not finalized", - reqs: []evm2EVMOnRampCCIPSendRequestedWithMeta{ - {executed: true, finalized: true}, - {executed: true, finalized: true}, - {executed: true, finalized: false}, + reqs: []internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + {Executed: true, Finalized: true}, + {Executed: true, Finalized: true}, + {Executed: true, Finalized: false}, }, expRes: false, }, @@ -1634,13 +1640,13 @@ func Test_commitReportWithSendRequests_allRequestsAreExecutedAndFinalized(t *tes func Test_commitReportWithSendRequests_sendReqFits(t *testing.T) { testCases := []struct { name string - req evm2EVMOnRampCCIPSendRequestedWithMeta + req internal.EVM2EVMOnRampCCIPSendRequestedWithMeta report commit_store.CommitStoreCommitReport expRes bool }{ { name: "all requests executed and finalized", - req: evm2EVMOnRampCCIPSendRequestedWithMeta{ + req: internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{SequenceNumber: 1}, }, report: commit_store.CommitStoreCommitReport{ @@ -1650,7 +1656,7 @@ func Test_commitReportWithSendRequests_sendReqFits(t *testing.T) { }, { name: "all requests executed and finalized", - req: evm2EVMOnRampCCIPSendRequestedWithMeta{ + req: internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{SequenceNumber: 10}, }, report: commit_store.CommitStoreCommitReport{ @@ -1660,7 +1666,7 @@ func Test_commitReportWithSendRequests_sendReqFits(t *testing.T) { }, { name: "all requests executed and finalized", - req: evm2EVMOnRampCCIPSendRequestedWithMeta{ + req: internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{SequenceNumber: 11}, }, report: commit_store.CommitStoreCommitReport{ @@ -1670,7 +1676,7 @@ func Test_commitReportWithSendRequests_sendReqFits(t *testing.T) { }, { name: "all requests executed and finalized", - req: evm2EVMOnRampCCIPSendRequestedWithMeta{ + req: internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{SequenceNumber: 10}, }, report: commit_store.CommitStoreCommitReport{ diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go index 4473e72703..8a14dc450f 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go @@ -217,6 +217,25 @@ func (c *LogPollerReader) GetExecutionStateChangesBetweenSeqNums(ctx context.Con ) } +func (c *LogPollerReader) GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, txHash common.Hash) ([]byte, error) { + logs, err := c.lp.IndexedLogsByTxHash( + abihelpers.EventSignatures.USDCMessageSent, + txHash, + pg.WithParentCtx(ctx), + ) + if err != nil { + return nil, err + } + + for i := range logs { + current := logs[len(logs)-i-1] + if current.LogIndex < logIndex { + return current.Data, nil + } + } + return nil, errors.Errorf("no USDC message found prior to log index %d in tx %s", logIndex, txHash.Hex()) +} + func (c *LogPollerReader) LatestBlock(ctx context.Context) (int64, error) { return c.lp.LatestBlock(pg.WithParentCtx(ctx)) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller_test.go index 42cb107c64..56f5ff1db2 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller_test.go @@ -133,3 +133,47 @@ func TestLogPollerClient_GetSendRequestsGteSeqNum(t *testing.T) { cl.AssertExpectations(t) }) } + +func TestLogPollerClient_GetLastUSDCMessagePriorToLogIndexInTx(t *testing.T) { + txHash := utils.RandomAddress().Hash() + ccipLogIndex := int64(100) + + expectedData := []byte("-1") + + t.Run("multiple found", func(t *testing.T) { + lp := mocks.NewLogPoller(t) + lp.On("IndexedLogsByTxHash", + abihelpers.EventSignatures.USDCMessageSent, + txHash, + mock.Anything, + ).Return([]logpoller.Log{ + {LogIndex: ccipLogIndex - 2, Data: []byte("-2")}, + {LogIndex: ccipLogIndex - 1, Data: expectedData}, + {LogIndex: ccipLogIndex, Data: []byte("0")}, + {LogIndex: ccipLogIndex + 1, Data: []byte("1")}, + }, nil) + + c := &LogPollerReader{lp: lp} + usdcMessageData, err := c.GetLastUSDCMessagePriorToLogIndexInTx(context.Background(), ccipLogIndex, txHash) + assert.NoError(t, err) + assert.Equal(t, expectedData, usdcMessageData) + + lp.AssertExpectations(t) + }) + + t.Run("none found", func(t *testing.T) { + lp := mocks.NewLogPoller(t) + lp.On("IndexedLogsByTxHash", + abihelpers.EventSignatures.USDCMessageSent, + txHash, + mock.Anything, + ).Return([]logpoller.Log{}, nil) + + c := &LogPollerReader{lp: lp} + usdcMessageData, err := c.GetLastUSDCMessagePriorToLogIndexInTx(context.Background(), ccipLogIndex, txHash) + assert.Errorf(t, err, fmt.Sprintf("no USDC message found prior to log index %d in tx %s", ccipLogIndex, txHash.Hex())) + assert.Nil(t, usdcMessageData) + + lp.AssertExpectations(t) + }) +} diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/mock.go index 4389ed2fcb..5790158c37 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/mock.go @@ -128,6 +128,32 @@ func (_m *MockReader) GetGasPriceUpdatesCreatedAfter(ctx context.Context, priceR return r0, r1 } +// GetLastUSDCMessagePriorToLogIndexInTx provides a mock function with given fields: ctx, logIndex, txHash +func (_m *MockReader) GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, txHash common.Hash) ([]byte, error) { + ret := _m.Called(ctx, logIndex, txHash) + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, common.Hash) ([]byte, error)); ok { + return rf(ctx, logIndex, txHash) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, common.Hash) []byte); ok { + r0 = rf(ctx, logIndex, txHash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, common.Hash) error); ok { + r1 = rf(ctx, logIndex, txHash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetSendRequestsBetweenSeqNums provides a mock function with given fields: ctx, onRamp, seqNumMin, seqNumMax, confs func (_m *MockReader) GetSendRequestsBetweenSeqNums(ctx context.Context, onRamp common.Address, seqNumMin uint64, seqNumMax uint64, confs int) ([]Event[evm_2_evm_onramp.EVM2EVMOnRampCCIPSendRequested], error) { ret := _m.Called(ctx, onRamp, seqNumMin, seqNumMax, confs) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go index 4f43187e67..ffae5ec7ca 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go @@ -48,6 +48,9 @@ type Reader interface { // GetAcceptedCommitReportsGteTimestamp returns all the commit reports with timestamp greater than or equal to the provided. GetAcceptedCommitReportsGteTimestamp(ctx context.Context, commitStoreAddress common.Address, ts time.Time, confs int) ([]Event[commit_store.CommitStoreReportAccepted], error) + // GetLastUSDCMessagePriorToLogIndexInTx returns the last USDC message that was sent before the provided log index in the given transaction. + GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, txHash common.Hash) ([]byte, error) + // LatestBlock returns the latest known/parsed block of the underlying implementation. LatestBlock(ctx context.Context) (int64, error) } diff --git a/core/services/ocr2/plugins/ccip/internal/chains/chains.go b/core/services/ocr2/plugins/ccip/internal/chains/chains.go new file mode 100644 index 0000000000..33cf377bf9 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/internal/chains/chains.go @@ -0,0 +1,15 @@ +package chains + +type EVMChain uint64 + +const ( + Ethereum EVMChain = 1 + Optimism EVMChain = 10 + Arbitrum EVMChain = 42161 + Avalanche EVMChain = 43114 + + GoerliTestnet EVMChain = 5 + OptimismGoerliTestnet EVMChain = 420 + AvalancheFujiTestnet EVMChain = 43113 + ArbitrumGoerliTestnet EVMChain = 421613 +) diff --git a/core/services/ocr2/plugins/ccip/internal/models.go b/core/services/ocr2/plugins/ccip/internal/models.go new file mode 100644 index 0000000000..b9eee7bea4 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/internal/models.go @@ -0,0 +1,19 @@ +package internal + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_offramp" +) + +// EVM2EVMOnRampCCIPSendRequestedWithMeta helper struct to hold the send request and some metadata +type EVM2EVMOnRampCCIPSendRequestedWithMeta struct { + evm_2_evm_offramp.InternalEVM2EVMMessage + BlockTimestamp time.Time + Executed bool + Finalized bool + LogIndex uint + TxHash common.Hash +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/cached_reader.go b/core/services/ocr2/plugins/ccip/tokendata/cached_reader.go new file mode 100644 index 0000000000..06781c52f6 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/cached_reader.go @@ -0,0 +1,47 @@ +package tokendata + +import ( + "context" + "sync" + + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" +) + +type CachedReader struct { + Reader + + cache map[uint64][]byte + cacheMutex sync.RWMutex +} + +func NewCachedReader(reader Reader) *CachedReader { + return &CachedReader{ + Reader: reader, + cache: make(map[uint64][]byte), + } +} + +// ReadTokenData tries to get the tokenData from cache, if not found then calls the underlying reader +// and updates the cache. +func (r *CachedReader) ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) ([]byte, error) { + r.cacheMutex.RLock() + data, ok := r.cache[msg.SequenceNumber] + r.cacheMutex.RUnlock() + + if ok { + return data, nil + } + + tokenData, err := r.Reader.ReadTokenData(ctx, msg) + if err != nil { + return []byte{}, err + } + + r.cacheMutex.Lock() + defer r.cacheMutex.Unlock() + + // Update the cache + r.cache[msg.SequenceNumber] = tokenData + + return tokenData, nil +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/chached_reader_test.go b/core/services/ocr2/plugins/ccip/tokendata/chached_reader_test.go new file mode 100644 index 0000000000..ae78688fb4 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/chached_reader_test.go @@ -0,0 +1,39 @@ +package tokendata_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" +) + +// Black box test +func TestCachedReader_ReadTokenData(t *testing.T) { + mockReader := tokendata.MockReader{} + cachedReader := tokendata.NewCachedReader(&mockReader) + + msgData := []byte("msgData") + mockReader.On("ReadTokenData", mock.Anything, mock.Anything).Return(msgData, nil) + + ctx := context.Background() + msg := internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{} + + // Call ReadTokenData twice, expect only one call to underlying reader + data, err := cachedReader.ReadTokenData(ctx, msg) + require.NoError(t, err) + require.Equal(t, msgData, data) + + // First time, calls the underlying reader + mockReader.AssertNumberOfCalls(t, "ReadTokenData", 1) + + data, err = cachedReader.ReadTokenData(ctx, msg) + require.NoError(t, err) + require.Equal(t, msgData, data) + + // Second time, get data from cache + mockReader.AssertNumberOfCalls(t, "ReadTokenData", 1) +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/reader.go b/core/services/ocr2/plugins/ccip/tokendata/reader.go new file mode 100644 index 0000000000..ec3a2d7497 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/reader.go @@ -0,0 +1,24 @@ +package tokendata + +import ( + "context" + "errors" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" +) + +var ( + ErrNotReady = errors.New("token data not ready") +) + +// Reader is an interface for fetching offchain token data +// +//go:generate mockery --quiet --name Reader --output . --filename reader_mock.go --inpackage --case=underscore +type Reader interface { + // ReadTokenData returns the attestation bytes if ready, and throws an error if not ready. + ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (tokenData []byte, err error) + + // GetSourceLogPollerFilters returns the filters that should be used for the source chain log poller + GetSourceLogPollerFilters() []logpoller.Filter +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go b/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go new file mode 100644 index 0000000000..9a87fafb34 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/reader_mock.go @@ -0,0 +1,74 @@ +// Code generated by mockery v2.28.1. DO NOT EDIT. + +package tokendata + +import ( + context "context" + + logpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + internal "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" + + mock "github.com/stretchr/testify/mock" +) + +// MockReader is an autogenerated mock type for the Reader type +type MockReader struct { + mock.Mock +} + +// GetSourceLogPollerFilters provides a mock function with given fields: +func (_m *MockReader) GetSourceLogPollerFilters() []logpoller.Filter { + ret := _m.Called() + + var r0 []logpoller.Filter + if rf, ok := ret.Get(0).(func() []logpoller.Filter); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]logpoller.Filter) + } + } + + return r0 +} + +// ReadTokenData provides a mock function with given fields: ctx, msg +func (_m *MockReader) ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) ([]byte, error) { + ret := _m.Called(ctx, msg) + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) ([]byte, error)); ok { + return rf(ctx, msg) + } + if rf, ok := ret.Get(0).(func(context.Context, internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) []byte); ok { + r0 = rf(ctx, msg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) error); ok { + r1 = rf(ctx, msg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewMockReader interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockReader creates a new instance of MockReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockReader(t mockConstructorTestingTNewMockReader) *MockReader { + mock := &MockReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go new file mode 100644 index 0000000000..3b1f1b6177 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go @@ -0,0 +1,150 @@ +package usdc + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +type TokenDataReader struct { + sourceChainEvents ccipdata.Reader + attestationApi *url.URL + messageTransmitter common.Address + sourceToken common.Address + onRampAddress common.Address + + // Cache of sequence number -> usdc message body + usdcMessageHashCache map[uint64][32]byte + usdcMessageHashCacheMutex sync.Mutex +} + +type attestationResponse struct { + Status attestationStatus `json:"status"` + Attestation string `json:"attestation"` +} + +const ( + version = "v1" + attestationPath = "attestations" + MESSAGE_SENT_FILTER_NAME = "USDC message sent" +) + +type attestationStatus string + +const ( + attestationStatusSuccess attestationStatus = "complete" + attestationStatusPending attestationStatus = "pending_confirmations" +) + +var _ tokendata.Reader = &TokenDataReader{} + +func NewUSDCTokenDataReader(sourceChainEvents ccipdata.Reader, usdcTokenAddress, usdcMessageTransmitterAddress, onRampAddress common.Address, usdcAttestationApi *url.URL) *TokenDataReader { + return &TokenDataReader{ + sourceChainEvents: sourceChainEvents, + attestationApi: usdcAttestationApi, + messageTransmitter: usdcMessageTransmitterAddress, + onRampAddress: onRampAddress, + sourceToken: usdcTokenAddress, + usdcMessageHashCache: make(map[uint64][32]byte), + } +} + +func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (attestation []byte, err error) { + response, err := s.getUpdatedAttestation(ctx, msg) + if err != nil { + return []byte{}, err + } + + if response.Status == attestationStatusSuccess { + attestationBytes, err := hex.DecodeString(response.Attestation) + if err != nil { + return nil, fmt.Errorf("decode response attestation: %w", err) + } + return attestationBytes, nil + } + return []byte{}, tokendata.ErrNotReady +} + +func (s *TokenDataReader) getUpdatedAttestation(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) (attestationResponse, error) { + messageBody, err := s.getUSDCMessageBody(ctx, msg) + if err != nil { + return attestationResponse{}, errors.Wrap(err, "failed getting the USDC message body") + } + + response, err := s.callAttestationApi(ctx, messageBody) + if err != nil { + return attestationResponse{}, errors.Wrap(err, "failed calling usdc attestation API ") + } + + return response, nil +} + +func (s *TokenDataReader) getUSDCMessageBody(ctx context.Context, msg internal.EVM2EVMOnRampCCIPSendRequestedWithMeta) ([32]byte, error) { + s.usdcMessageHashCacheMutex.Lock() + defer s.usdcMessageHashCacheMutex.Unlock() + + if body, ok := s.usdcMessageHashCache[msg.SequenceNumber]; ok { + return body, nil + } + + usdcMessageBody, err := s.sourceChainEvents.GetLastUSDCMessagePriorToLogIndexInTx(ctx, int64(msg.LogIndex), msg.TxHash) + if err != nil { + return [32]byte{}, err + } + + msgBodyHash := utils.Keccak256Fixed(usdcMessageBody) + + // Save the attempt in the cache in case the external call fails + s.usdcMessageHashCache[msg.SequenceNumber] = msgBodyHash + return msgBodyHash, nil +} + +func (s *TokenDataReader) callAttestationApi(ctx context.Context, usdcMessageHash [32]byte) (attestationResponse, error) { + fullAttestationUrl := fmt.Sprintf("%s/%s/%s/0x%x", s.attestationApi, version, attestationPath, usdcMessageHash) + req, err := http.NewRequestWithContext(ctx, "GET", fullAttestationUrl, nil) + if err != nil { + return attestationResponse{}, err + } + req.Header.Add("accept", "application/json") + res, err := http.DefaultClient.Do(req) + if err != nil { + return attestationResponse{}, err + } + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + if err != nil { + return attestationResponse{}, err + } + + var response attestationResponse + err = json.Unmarshal(body, &response) + if err != nil { + return attestationResponse{}, err + } + return response, nil +} + +func (s *TokenDataReader) GetSourceLogPollerFilters() []logpoller.Filter { + return []logpoller.Filter{ + { + Name: logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, s.messageTransmitter.Hex()), + EventSigs: []common.Hash{abihelpers.EventSignatures.USDCMessageSent}, + Addresses: []common.Address{s.messageTransmitter}, + }, + } +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_blackbox_test.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_blackbox_test.go new file mode 100644 index 0000000000..6e61be5ed3 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_blackbox_test.go @@ -0,0 +1,96 @@ +package usdc_test + +import ( + "context" + "encoding/hex" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_offramp" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_onramp" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +var ( + mockOnRampAddress = utils.RandomAddress() + mockUSDCTokenAddress = utils.RandomAddress() + mockMsgTransmitter = utils.RandomAddress() +) + +type attestationResponse struct { + Status string `json:"status"` + Attestation string `json:"attestation"` +} + +func TestUSDCReader_ReadTokenData(t *testing.T) { + response := attestationResponse{ + Status: "complete", + Attestation: "720502893578a89a8a87982982ef781c18b193", + } + + attestationBytes, err := hex.DecodeString(response.Attestation) + require.NoError(t, err) + + responseBytes, err := json.Marshal(response) + require.NoError(t, err) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err = w.Write(responseBytes) + require.NoError(t, err) + })) + + defer ts.Close() + + seqNum := uint64(23825) + txHash := utils.RandomBytes32() + logIndex := int64(4) + + eventsClient := ccipdata.MockReader{} + eventsClient.On("GetSendRequestsBetweenSeqNums", + mock.Anything, + mockOnRampAddress, + seqNum, + seqNum, + 0, + ).Return([]ccipdata.Event[evm_2_evm_onramp.EVM2EVMOnRampCCIPSendRequested]{ + { + Data: evm_2_evm_onramp.EVM2EVMOnRampCCIPSendRequested{ + Raw: types.Log{ + TxHash: txHash, + Index: uint(logIndex), + }, + }, + }, + }, nil) + + eventsClient.On("GetLastUSDCMessagePriorToLogIndexInTx", + mock.Anything, + logIndex, + common.Hash(txHash), + ).Return(attestationBytes, nil) + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + usdcService := usdc.NewUSDCTokenDataReader(&eventsClient, mockUSDCTokenAddress, mockMsgTransmitter, mockOnRampAddress, attestationURI) + attestation, err := usdcService.ReadTokenData(context.Background(), internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + InternalEVM2EVMMessage: evm_2_evm_offramp.InternalEVM2EVMMessage{ + SequenceNumber: seqNum, + }, + TxHash: txHash, + LogIndex: uint(logIndex), + }) + require.NoError(t, err) + + require.Equal(t, attestationBytes, attestation) +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go new file mode 100644 index 0000000000..d61ef530d5 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/usdc/usdc_test.go @@ -0,0 +1,119 @@ +package usdc + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +var ( + mockOnRampAddress = utils.RandomAddress() + mockUSDCTokenAddress = utils.RandomAddress() + mockMsgTransmitter = utils.RandomAddress() +) + +func TestUSDCReader_callAttestationApi(t *testing.T) { + t.Skipf("Skipping test because it uses the real USDC attestation API") + usdcMessageHash := "912f22a13e9ccb979b621500f6952b2afd6e75be7eadaed93fc2625fe11c52a2" + attestationURI, err := url.ParseRequestURI("https://iris-api-sandbox.circle.com") + require.NoError(t, err) + usdcService := NewUSDCTokenDataReader(nil, mockUSDCTokenAddress, mockMsgTransmitter, mockOnRampAddress, attestationURI) + + attestation, err := usdcService.callAttestationApi(context.Background(), [32]byte(common.FromHex(usdcMessageHash))) + require.NoError(t, err) + + require.Equal(t, attestationStatusPending, attestation.Status) + require.Equal(t, "PENDING", attestation.Attestation) +} + +func TestUSDCReader_callAttestationApiMock(t *testing.T) { + response := attestationResponse{ + Status: attestationStatusSuccess, + Attestation: "720502893578a89a8a87982982ef781c18b193", + } + + ts := getMockUSDCEndpoint(t, response) + defer ts.Close() + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + usdcService := NewUSDCTokenDataReader(nil, mockUSDCTokenAddress, mockMsgTransmitter, mockOnRampAddress, attestationURI) + attestation, err := usdcService.callAttestationApi(context.Background(), utils.RandomBytes32()) + require.NoError(t, err) + + require.Equal(t, response.Status, attestation.Status) + require.Equal(t, response.Attestation, attestation.Attestation) +} + +func TestUSDCReader_callAttestationApiMockError(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + attestationURI, err := url.ParseRequestURI(ts.URL) + require.NoError(t, err) + + usdcService := NewUSDCTokenDataReader(nil, mockUSDCTokenAddress, mockMsgTransmitter, mockOnRampAddress, attestationURI) + _, err = usdcService.callAttestationApi(context.Background(), utils.RandomBytes32()) + require.Error(t, err) +} + +func getMockUSDCEndpoint(t *testing.T, response attestationResponse) *httptest.Server { + responseBytes, err := json.Marshal(response) + require.NoError(t, err) + + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write(responseBytes) + require.NoError(t, err) + })) +} + +// Asserts the hard coded event signature matches Keccak256("MessageSent(bytes)") +func TestGetUSDCReaderSourceLPFilters(t *testing.T) { + usdcService := NewUSDCTokenDataReader(nil, mockUSDCTokenAddress, mockMsgTransmitter, mockOnRampAddress, nil) + + filters := usdcService.GetSourceLogPollerFilters() + + require.Equal(t, 1, len(filters)) + filter := filters[0] + require.Equal(t, logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, mockMsgTransmitter.Hex()), filter.Name) + hash, err := utils.Keccak256([]byte("MessageSent(bytes)")) + require.NoError(t, err) + require.Equal(t, hash, filter.EventSigs[0].Bytes()) + require.Equal(t, mockMsgTransmitter, filter.Addresses[0]) +} + +func TestGetUSDCMessageBody(t *testing.T) { + expectedBody := []byte("TestGetUSDCMessageBody") + expectedBodyHash := utils.Keccak256Fixed(expectedBody) + + sourceChainEventsMock := ccipdata.MockReader{} + sourceChainEventsMock.On("GetLastUSDCMessagePriorToLogIndexInTx", mock.Anything, mock.Anything, mock.Anything).Return(expectedBody, nil) + + usdcService := NewUSDCTokenDataReader(&sourceChainEventsMock, mockUSDCTokenAddress, mockMsgTransmitter, mockOnRampAddress, nil) + + // Make the first call and assert the underlying function is called + body, err := usdcService.getUSDCMessageBody(context.Background(), internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{}) + require.NoError(t, err) + require.Equal(t, body, expectedBodyHash) + + sourceChainEventsMock.AssertNumberOfCalls(t, "GetLastUSDCMessagePriorToLogIndexInTx", 1) + + // Make another call and assert that the cache is used + body, err = usdcService.getUSDCMessageBody(context.Background(), internal.EVM2EVMOnRampCCIPSendRequestedWithMeta{}) + require.NoError(t, err) + require.Equal(t, body, expectedBodyHash) + sourceChainEventsMock.AssertNumberOfCalls(t, "GetLastUSDCMessagePriorToLogIndexInTx", 1) +}