From 45ddcacfd8695368b81cfef052bc96367511e0fc Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata <96521086+bukata-sa@users.noreply.github.com> Date: Mon, 2 Dec 2024 10:05:27 +0000 Subject: [PATCH] CCIP-4403 skeleton: LBTC attestation (#1554) https://smartcontract-it.atlassian.net/browse/CCIP-3488 ## Motivation Basic logic needed for LBTC attestation --------- Co-authored-by: Nour Elrashidy --- .../plugins/ccip/ccipexec/initializers.go | 14 ++ .../ocr2/plugins/ccip/config/config.go | 32 ++- .../ocr2/plugins/ccip/exportinternal.go | 9 + .../ccip/internal/ccipdata/lbtc_reader.go | 23 ++ .../ccip/tokendata/http/http_client.go | 10 +- .../ocr2/plugins/ccip/tokendata/lbtc/lbtc.go | 234 ++++++++++++++++++ core/services/relay/evm/evm.go | 8 +- core/services/relay/evm/exec_provider.go | 128 ++++++---- 8 files changed, 397 insertions(+), 61 deletions(-) create mode 100644 core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader.go create mode 100644 core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index aa42ff2828..ccadd45d5b 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -121,6 +121,20 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro } tokenDataProviders[cciptypes.Address(pluginConfig.USDCConfig.SourceTokenAddress.String())] = usdcReader } + // init lbtc token data provider + if pluginConfig.LBTCConfig.AttestationAPI != "" { + lggr.Infof("LBTC token data provider enabled") + err2 := pluginConfig.LBTCConfig.ValidateLBTCConfig() + if err2 != nil { + return nil, err2 + } + + lbtcReader, err2 := srcProvider.NewTokenDataReader(ctx, ccip.EvmAddrToGeneric(pluginConfig.LBTCConfig.SourceTokenAddress)) + if err2 != nil { + return nil, fmt.Errorf("new usdc reader: %w", err2) + } + tokenDataProviders[cciptypes.Address(pluginConfig.LBTCConfig.SourceTokenAddress.String())] = lbtcReader + } // Prom wrappers onRampReader = observability.NewObservedOnRampReader(onRampReader, srcChainID, ccip.ExecPluginLabel) diff --git a/core/services/ocr2/plugins/ccip/config/config.go b/core/services/ocr2/plugins/ccip/config/config.go index a24a6edfd1..16da4d3255 100644 --- a/core/services/ocr2/plugins/ccip/config/config.go +++ b/core/services/ocr2/plugins/ccip/config/config.go @@ -108,6 +108,7 @@ func (c *DynamicPriceGetterConfig) Validate() error { type ExecPluginJobSpecConfig struct { SourceStartBlock, DestStartBlock uint64 // Only for first time job add. USDCConfig USDCConfig + LBTCConfig LBTCConfig } type USDCConfig struct { @@ -119,10 +120,20 @@ type USDCConfig struct { AttestationAPIIntervalMilliseconds int } +type LBTCConfig struct { + SourceTokenAddress common.Address + SourceMessageTransmitterAddress common.Address + AttestationAPI string + AttestationAPITimeoutSeconds uint + // AttestationAPIIntervalMilliseconds can be set to -1 to disable or 0 to use a default interval. + AttestationAPIIntervalMilliseconds int +} + type ExecPluginConfig struct { SourceStartBlock, DestStartBlock uint64 // Only for first time job add. IsSourceProvider bool USDCConfig USDCConfig + LBTCConfig LBTCConfig JobID string } @@ -136,17 +147,30 @@ func (e ExecPluginConfig) Encode() ([]byte, error) { func (uc *USDCConfig) ValidateUSDCConfig() error { if uc.AttestationAPI == "" { - return errors.New("AttestationAPI is required") + return errors.New("USDCConfig: AttestationAPI is required") } if uc.AttestationAPIIntervalMilliseconds < -1 { - return errors.New("AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") + return errors.New("USDCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") } if uc.SourceTokenAddress == utils.ZeroAddress { - return errors.New("SourceTokenAddress is required") + return errors.New("USDCConfig: SourceTokenAddress is required") } if uc.SourceMessageTransmitterAddress == utils.ZeroAddress { - return errors.New("SourceMessageTransmitterAddress is required") + return errors.New("USDCConfig: SourceMessageTransmitterAddress is required") } return nil } + +func (lc *LBTCConfig) ValidateLBTCConfig() error { + if lc.AttestationAPI == "" { + return errors.New("LBTCConfig: AttestationAPI is required") + } + if lc.AttestationAPIIntervalMilliseconds < -1 { + return errors.New("LBTCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") + } + if lc.SourceTokenAddress == utils.ZeroAddress { + return errors.New("LBTCConfig: SourceTokenAddress is required") + } + return nil +} diff --git a/core/services/ocr2/plugins/ccip/exportinternal.go b/core/services/ocr2/plugins/ccip/exportinternal.go index be39346984..3558a8d807 100644 --- a/core/services/ocr2/plugins/ccip/exportinternal.go +++ b/core/services/ocr2/plugins/ccip/exportinternal.go @@ -108,7 +108,16 @@ func CloseUSDCReader(lggr logger.Logger, jobID string, transmitter common.Addres return ccipdata.CloseUSDCReader(lggr, jobID, transmitter, lp) } +func NewLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller, registerFilters bool) (*ccipdata.LBTCReaderImpl, error) { + return ccipdata.NewLBTCReader(lggr, jobID, transmitter, lp, registerFilters) +} + +func CloseLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller) error { + return ccipdata.CloseLBTCReader(lggr, jobID, transmitter, lp) +} + type USDCReaderImpl = ccipdata.USDCReaderImpl +type LBTCReaderImpl = ccipdata.LBTCReaderImpl var DefaultRpcBatchSizeLimit = rpclib.DefaultRpcBatchSizeLimit var DefaultRpcBatchBackOffMultiplier = rpclib.DefaultRpcBatchBackOffMultiplier diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader.go new file mode 100644 index 0000000000..d1c26f7d6c --- /dev/null +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader.go @@ -0,0 +1,23 @@ +package ccipdata + +import ( + "github.com/ethereum/go-ethereum/common" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// TODO: Implement lbtc token reader +type LBTCReader interface { +} + +type LBTCReaderImpl struct { +} + +func NewLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller, registerFilters bool) (*LBTCReaderImpl, error) { + return &LBTCReaderImpl{}, nil +} + +func CloseLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller) error { + return nil +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go index 79ec21b1b8..d8e80b914c 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go +++ b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go @@ -12,8 +12,11 @@ import ( ) type IHttpClient interface { - // Get issue a GET request to the given url and return the response body and status code. + // Get issues a GET request to the given url and returns the response body and status code. Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) + + // Post issues a POST request to the given url with the given request data and returns the response body and status code. + Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) } type HttpClient struct { @@ -46,3 +49,8 @@ func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) body, err := io.ReadAll(res.Body) return body, res.StatusCode, res.Header, err } + +func (s *HttpClient) Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) { + // TODO: Implement + return nil, 0, nil, nil +} diff --git a/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go b/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go new file mode 100644 index 0000000000..0060f99d95 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go @@ -0,0 +1,234 @@ +package lbtc + +import ( + "context" + "crypto/sha256" + "fmt" + "net/url" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/pkg/errors" + "golang.org/x/time/rate" + + cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http" +) + +// TODO: double check the validty of default values for lombard's API after checking docs +const ( + apiVersion = "v1" + attestationPath = "deposits/getByHash" + defaultAttestationTimeout = 5 * time.Second + + // defaultCoolDownDurationSec defines the default time to wait after getting rate limited. + // this value is only used if the 429 response does not contain the Retry-After header + defaultCoolDownDuration = 5 * time.Minute + + // maxCoolDownDuration defines the maximum duration we can wait till firing the next request + maxCoolDownDuration = 10 * time.Minute + + // defaultRequestInterval defines the rate in requests per second that the attestation API can be called. + // this is set according to the APIs documentated 10 requests per second rate limit. + defaultRequestInterval = 100 * time.Millisecond + + // APIIntervalRateLimitDisabled is a special value to disable the rate limiting. + APIIntervalRateLimitDisabled = -1 + // APIIntervalRateLimitDefault is a special value to select the default rate limit interval. + APIIntervalRateLimitDefault = 0 +) + +type attestationStatus string + +const ( + attestationStatusUnspecified attestationStatus = "NOTARIZATION_STATUS_UNSPECIFIED" + attestationStatusPending attestationStatus = "NOTARIZATION_STATUS_PENDING" + attestationStatusSubmitted attestationStatus = "NOTARIZATION_STATUS_SUBMITTED" + attestationStatusSessionApproved attestationStatus = "NOTARIZATION_STATUS_SESSION_APPROVED" + attestationStatusFailed attestationStatus = "NOTARIZATION_STATUS_FAILED" +) + +var ( + ErrUnknownResponse = errors.New("unexpected response from attestation API") +) + +type TokenDataReader struct { + lggr logger.Logger + httpClient http.IHttpClient + attestationApi *url.URL + attestationApiTimeout time.Duration + lbtcTokenAddress common.Address + rate *rate.Limiter + + // coolDownUntil defines whether requests are blocked or not. + coolDownUntil time.Time + coolDownMu *sync.RWMutex +} + +type messageAttestationResponse struct { + MessageHash string `json:"message_hash"` + Status attestationStatus `json:"status"` + Attestation string `json:"attestation"` +} + +// TODO: Adjust after checking API docs +type attestationResponse struct { + Attestations []messageAttestationResponse `json:"attestations"` +} + +// TODO: Implement encoding/decoding + +var _ tokendata.Reader = &TokenDataReader{} + +func NewLBTCTokenDataReader( + lggr logger.Logger, + lbtcAttestationApi *url.URL, + lbtcAttestationApiTimeoutSeconds int, + lbtcTokenAddress common.Address, + requestInterval time.Duration, +) *TokenDataReader { + timeout := time.Duration(lbtcAttestationApiTimeoutSeconds) * time.Second + if lbtcAttestationApiTimeoutSeconds == 0 { + timeout = defaultAttestationTimeout + } + + if requestInterval == APIIntervalRateLimitDisabled { + requestInterval = 0 + } else if requestInterval == APIIntervalRateLimitDefault { + requestInterval = defaultRequestInterval + } + + return &TokenDataReader{ + lggr: lggr, + httpClient: http.NewObservedIHttpClient(&http.HttpClient{}), + attestationApi: lbtcAttestationApi, + attestationApiTimeout: timeout, + lbtcTokenAddress: lbtcTokenAddress, + coolDownMu: &sync.RWMutex{}, + rate: rate.NewLimiter(rate.Every(requestInterval), 1), + } +} + +func NewLBTCTokenDataReaderWithHttpClient( + origin TokenDataReader, + httpClient http.IHttpClient, + lbtcTokenAddress common.Address, + requestInterval time.Duration, +) *TokenDataReader { + return &TokenDataReader{ + lggr: origin.lggr, + httpClient: httpClient, + attestationApi: origin.attestationApi, + attestationApiTimeout: origin.attestationApiTimeout, + coolDownMu: origin.coolDownMu, + lbtcTokenAddress: lbtcTokenAddress, + rate: rate.NewLimiter(rate.Every(requestInterval), 1), + } +} + +// ReadTokenData queries the LBTC attestation API. +func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) { + if tokenIndex < 0 || tokenIndex >= len(msg.TokenAmounts) { + return nil, fmt.Errorf("token index out of bounds") + } + + if s.inCoolDownPeriod() { + // rate limiting cool-down period, we prevent new requests from being sent + return nil, tokendata.ErrRequestsBlocked + } + + if s.rate != nil { + // Wait blocks until it the attestation API can be called or the + // context is Done. + if waitErr := s.rate.Wait(ctx); waitErr != nil { + return nil, fmt.Errorf("lbtc rate limiting error: %w", waitErr) + } + } + + messageBody, err := s.getLBTCMessageBody(ctx, msg, tokenIndex) + if err != nil { + return []byte{}, errors.Wrap(err, "failed getting the LBTC message body") + } + + msgID := hexutil.Encode(msg.MessageID[:]) + messageBodyHash := sha256.Sum256(messageBody) + messageBodyHashHex := hexutil.Encode(messageBodyHash[:]) + s.lggr.Infow("Calling attestation API", "messageBodyHash", messageBodyHashHex, "messageID", msgID) + + attestationResp, err := s.callAttestationApi(ctx, messageBodyHash) + if err != nil { + return nil, err + } + if attestationResp.Attestations == nil || len(attestationResp.Attestations) == 0 { + return nil, errors.New("attestation response is empty") + } + if len(attestationResp.Attestations) > 1 { + s.lggr.Warnw("Multiple attestations received, expected one", "attestations", attestationResp.Attestations) + } + var attestation messageAttestationResponse + for _, attestationCandidate := range attestationResp.Attestations { + if attestationCandidate.MessageHash == messageBodyHashHex { + attestation = attestationCandidate + } + } + s.lggr.Infow("Got response from attestation API", "messageID", msgID, + "attestationStatus", attestation.Status, "attestation", attestation) + switch attestation.Status { + case attestationStatusSessionApproved: + messageAndAttestation, err := encodeMessageAndAttestation(messageBody, attestation.Attestation) + if err != nil { + return nil, fmt.Errorf("failed to encode messageAndAttestation : %w", err) + } + return messageAndAttestation, nil + case attestationStatusPending: + return nil, tokendata.ErrNotReady + case attestationStatusSubmitted: + return nil, tokendata.ErrNotReady + default: + s.lggr.Errorw("Unexpected response from attestation API", "attestation", attestation) + return nil, ErrUnknownResponse + } +} + +func (s *TokenDataReader) getLBTCMessageBody(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) { + return nil, nil +} + +func (s *TokenDataReader) callAttestationApi(ctx context.Context, lbtcMessageHash [32]byte) (attestationResponse, error) { + _, _, _, err := s.httpClient.Get(ctx, "", s.attestationApiTimeout) + switch { + case errors.Is(err, tokendata.ErrRateLimit): + s.setCoolDownPeriod(defaultCoolDownDuration) + return attestationResponse{}, tokendata.ErrRateLimit + case err != nil: + return attestationResponse{}, err + } + return attestationResponse{}, nil +} + +func encodeMessageAndAttestation(messageBody []byte, attestation string) ([]byte, error) { + return nil, nil +} + +func (s *TokenDataReader) setCoolDownPeriod(d time.Duration) { + s.coolDownMu.Lock() + if d > maxCoolDownDuration { + d = maxCoolDownDuration + } + s.coolDownUntil = time.Now().Add(d) + s.coolDownMu.Unlock() +} + +func (s *TokenDataReader) inCoolDownPeriod() bool { + s.coolDownMu.RLock() + defer s.coolDownMu.RUnlock() + return time.Now().Before(s.coolDownUntil) +} + +func (s *TokenDataReader) Close() error { + return nil +} diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 1dff3e7b7a..516b418ac9 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -563,8 +563,6 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont return nil, err } - usdcConfig := execPluginConfig.USDCConfig - feeEstimatorConfig := estimatorconfig.NewFeeEstimatorConfigService() // CCIPExec reads when dest chain is mantle, and uses it to calc boosting in batching @@ -591,10 +589,8 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont r.chain.LogPoller(), execPluginConfig.SourceStartBlock, execPluginConfig.JobID, - usdcConfig.AttestationAPI, - int(usdcConfig.AttestationAPITimeoutSeconds), - usdcConfig.AttestationAPIIntervalMilliseconds, - usdcConfig.SourceMessageTransmitterAddress, + execPluginConfig.USDCConfig, + execPluginConfig.LBTCConfig, feeEstimatorConfig, ) } diff --git a/core/services/relay/evm/exec_provider.go b/core/services/relay/evm/exec_provider.go index 193f5e3b00..2b167ed74f 100644 --- a/core/services/relay/evm/exec_provider.go +++ b/core/services/relay/evm/exec_provider.go @@ -10,12 +10,13 @@ import ( "go.uber.org/multierr" "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/types" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/lbtc" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" @@ -29,21 +30,22 @@ import ( ) type SrcExecProvider struct { - lggr logger.Logger - versionFinder ccip.VersionFinder - client client.Client - lp logpoller.LogPoller - startBlock uint64 - estimator gas.EvmFeeEstimator - maxGasPrice *big.Int - usdcReader *ccip.USDCReaderImpl - usdcAttestationAPI string - usdcAttestationAPITimeoutSeconds int - usdcAttestationAPIIntervalMilliseconds int - usdcSrcMsgTransmitterAddr common.Address + lggr logger.Logger + versionFinder ccip.VersionFinder + client client.Client + lp logpoller.LogPoller + startBlock uint64 + estimator gas.EvmFeeEstimator + maxGasPrice *big.Int + usdcReader *ccip.USDCReaderImpl + usdcConfig config.USDCConfig + lbtcReader *ccip.LBTCReaderImpl + lbtcConfig config.LBTCConfig feeEstimatorConfig estimatorconfig.FeeEstimatorConfigProvider + // TODO: Add lbtc reader & api fields + // these values are nil and are updated for Close() seenOnRampAddress *cciptypes.Address seenSourceChainSelector *uint64 @@ -59,35 +61,39 @@ func NewSrcExecProvider( lp logpoller.LogPoller, startBlock uint64, jobID string, - usdcAttestationAPI string, - usdcAttestationAPITimeoutSeconds int, - usdcAttestationAPIIntervalMilliseconds int, - usdcSrcMsgTransmitterAddr common.Address, + usdcConfig config.USDCConfig, + lbtcConfig config.LBTCConfig, feeEstimatorConfig estimatorconfig.FeeEstimatorConfigProvider, ) (commontypes.CCIPExecProvider, error) { var usdcReader *ccip.USDCReaderImpl var err error - if usdcAttestationAPI != "" { - usdcReader, err = ccip.NewUSDCReader(lggr, jobID, usdcSrcMsgTransmitterAddr, lp, true) + if usdcConfig.AttestationAPI != "" { + usdcReader, err = ccip.NewUSDCReader(lggr, jobID, usdcConfig.SourceMessageTransmitterAddress, lp, true) + if err != nil { + return nil, fmt.Errorf("new usdc reader: %w", err) + } + } + var lbtcReader *ccip.LBTCReaderImpl + if lbtcConfig.AttestationAPI != "" { + lbtcReader, err = ccip.NewLBTCReader(lggr, jobID, lbtcConfig.SourceMessageTransmitterAddress, lp, true) if err != nil { return nil, fmt.Errorf("new usdc reader: %w", err) } } return &SrcExecProvider{ - lggr: lggr, - versionFinder: versionFinder, - client: client, - estimator: estimator, - maxGasPrice: maxGasPrice, - lp: lp, - startBlock: startBlock, - usdcReader: usdcReader, - usdcAttestationAPI: usdcAttestationAPI, - usdcAttestationAPITimeoutSeconds: usdcAttestationAPITimeoutSeconds, - usdcAttestationAPIIntervalMilliseconds: usdcAttestationAPIIntervalMilliseconds, - usdcSrcMsgTransmitterAddr: usdcSrcMsgTransmitterAddr, - feeEstimatorConfig: feeEstimatorConfig, + lggr: lggr, + versionFinder: versionFinder, + client: client, + estimator: estimator, + maxGasPrice: maxGasPrice, + lp: lp, + startBlock: startBlock, + usdcReader: usdcReader, + usdcConfig: usdcConfig, + lbtcReader: lbtcReader, + lbtcConfig: lbtcConfig, + feeEstimatorConfig: feeEstimatorConfig, }, nil } @@ -116,10 +122,16 @@ func (s *SrcExecProvider) Close() error { return ccip.CloseOnRampReader(s.lggr, versionFinder, *s.seenSourceChainSelector, *s.seenDestChainSelector, *s.seenOnRampAddress, s.lp, s.client) }) unregisterFuncs = append(unregisterFuncs, func() error { - if s.usdcAttestationAPI == "" { + if s.usdcConfig.AttestationAPI == "" { + return nil + } + return ccip.CloseUSDCReader(s.lggr, s.lggr.Name(), s.usdcConfig.SourceMessageTransmitterAddress, s.lp) + }) + unregisterFuncs = append(unregisterFuncs, func() error { + if s.lbtcConfig.AttestationAPI == "" { return nil } - return ccip.CloseUSDCReader(s.lggr, s.lggr.Name(), s.usdcSrcMsgTransmitterAddr, s.lp) + return ccip.CloseLBTCReader(s.lggr, s.lggr.Name(), s.lbtcConfig.SourceMessageTransmitterAddress, s.lp) }) var multiErr error for _, fn := range unregisterFuncs { @@ -195,24 +207,40 @@ func (s *SrcExecProvider) NewPriceRegistryReader(ctx context.Context, addr ccipt return } -func (s *SrcExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cciptypes.Address) (tokenDataReader cciptypes.TokenDataReader, err error) { - attestationURI, err2 := url.ParseRequestURI(s.usdcAttestationAPI) - if err2 != nil { - return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err2) +func (s *SrcExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cciptypes.Address) (cciptypes.TokenDataReader, error) { + tokenAddr, err := ccip.GenericAddrToEvm(tokenAddress) + if err != nil { + return nil, fmt.Errorf("failed to parse token address: %w", err) } - tokenAddr, err2 := ccip.GenericAddrToEvm(tokenAddress) - if err2 != nil { - return nil, fmt.Errorf("failed to parse token address: %w", err2) + switch tokenAddr { + case s.usdcConfig.SourceTokenAddress: + attestationURI, err := url.ParseRequestURI(s.usdcConfig.AttestationAPI) + if err != nil { + return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err) + } + return usdc.NewUSDCTokenDataReader( + s.lggr, + s.usdcReader, + attestationURI, + int(s.usdcConfig.AttestationAPITimeoutSeconds), + tokenAddr, + time.Duration(s.usdcConfig.AttestationAPIIntervalMilliseconds)*time.Millisecond, + ), nil + case s.lbtcConfig.SourceTokenAddress: + attestationURI, err := url.ParseRequestURI(s.lbtcConfig.AttestationAPI) + if err != nil { + return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err) + } + return lbtc.NewLBTCTokenDataReader( + s.lggr, + attestationURI, + int(s.lbtcConfig.AttestationAPITimeoutSeconds), + tokenAddr, + time.Duration(s.lbtcConfig.AttestationAPIIntervalMilliseconds)*time.Millisecond, + ), nil + default: + return nil, fmt.Errorf("unsupported token address: %s", tokenAddress) } - tokenDataReader = usdc.NewUSDCTokenDataReader( - s.lggr, - s.usdcReader, - attestationURI, - s.usdcAttestationAPITimeoutSeconds, - tokenAddr, - time.Duration(s.usdcAttestationAPIIntervalMilliseconds)*time.Millisecond, - ) - return } func (s *SrcExecProvider) NewTokenPoolBatchedReader(ctx context.Context, offRampAddr cciptypes.Address, sourceChainSelector uint64) (cciptypes.TokenPoolBatchedReader, error) {