From bf261bb5a6c1d17b6da6a8fd92a13fcb61159066 Mon Sep 17 00:00:00 2001 From: Aleksandr Bukata Date: Mon, 9 Dec 2024 10:54:52 +0000 Subject: [PATCH] Revert "CCIP-4403 LBTC attestation cherry-pick (#1564)" This reverts commit 976f457d3310c1f04c73164af0d2ae35ba82f3dd. --- core/services/ocr2/delegate.go | 7 +- .../plugins/ccip/ccipexec/initializers.go | 14 - .../ocr2/plugins/ccip/config/config.go | 31 +- .../ccip/tokendata/http/http_client.go | 17 +- .../ocr2/plugins/ccip/tokendata/lbtc/lbtc.go | 275 ---------- .../plugins/ccip/tokendata/lbtc/lbtc_test.go | 486 ------------------ core/services/relay/evm/evm.go | 8 +- core/services/relay/evm/exec_provider.go | 113 ++-- 8 files changed, 66 insertions(+), 885 deletions(-) delete mode 100644 core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go delete mode 100644 core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc_test.go diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 16c3440dc5..8bca97c37e 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1894,7 +1894,7 @@ func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, plugi // PROVIDER BASED ARG CONSTRUCTION // Write PluginConfig bytes to send source/dest relayer provider + info outside of top level rargs/pargs over the wire - dstConfigBytes, err := newExecPluginConfig(false, pluginJobSpecConfig.SourceStartBlock, pluginJobSpecConfig.DestStartBlock, pluginJobSpecConfig.USDCConfig, pluginJobSpecConfig.LBTCConfig, string(jb.ID)).Encode() + dstConfigBytes, err := newExecPluginConfig(false, pluginJobSpecConfig.SourceStartBlock, pluginJobSpecConfig.DestStartBlock, pluginJobSpecConfig.USDCConfig, string(jb.ID)).Encode() if err != nil { return nil, err } @@ -1927,7 +1927,7 @@ func (d *Delegate) ccipExecGetDstProvider(ctx context.Context, jb job.Job, plugi func (d *Delegate) ccipExecGetSrcProvider(ctx context.Context, jb job.Job, pluginJobSpecConfig ccipconfig.ExecPluginJobSpecConfig, transmitterID string, dstProvider types.CCIPExecProvider) (srcProvider types.CCIPExecProvider, srcChainID uint64, err error) { spec := jb.OCR2OracleSpec - srcConfigBytes, err := newExecPluginConfig(true, pluginJobSpecConfig.SourceStartBlock, pluginJobSpecConfig.DestStartBlock, pluginJobSpecConfig.USDCConfig, pluginJobSpecConfig.LBTCConfig, string(jb.ID)).Encode() + srcConfigBytes, err := newExecPluginConfig(true, pluginJobSpecConfig.SourceStartBlock, pluginJobSpecConfig.DestStartBlock, pluginJobSpecConfig.USDCConfig, string(jb.ID)).Encode() if err != nil { return nil, 0, err } @@ -1976,13 +1976,12 @@ func (d *Delegate) ccipExecGetSrcProvider(ctx context.Context, jb job.Job, plugi return } -func newExecPluginConfig(isSourceProvider bool, srcStartBlock uint64, dstStartBlock uint64, usdcConfig ccipconfig.USDCConfig, lbtcConfig ccipconfig.LBTCConfig, jobID string) config.ExecPluginConfig { +func newExecPluginConfig(isSourceProvider bool, srcStartBlock uint64, dstStartBlock uint64, usdcConfig ccipconfig.USDCConfig, jobID string) config.ExecPluginConfig { return config.ExecPluginConfig{ IsSourceProvider: isSourceProvider, SourceStartBlock: srcStartBlock, DestStartBlock: dstStartBlock, USDCConfig: usdcConfig, - LBTCConfig: lbtcConfig, JobID: jobID, } } diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index d2d3d32ce9..aa42ff2828 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -121,20 +121,6 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro } tokenDataProviders[cciptypes.Address(pluginConfig.USDCConfig.SourceTokenAddress.String())] = usdcReader } - // init lbtc token data provider - if pluginConfig.LBTCConfig.AttestationAPI != "" { - lggr.Infof("LBTC token data provider enabled") - err2 := pluginConfig.LBTCConfig.ValidateLBTCConfig() - if err2 != nil { - return nil, err2 - } - - lbtcReader, err2 := srcProvider.NewTokenDataReader(ctx, ccip.EvmAddrToGeneric(pluginConfig.LBTCConfig.SourceTokenAddress)) - if err2 != nil { - return nil, fmt.Errorf("new lbtc reader: %w", err2) - } - tokenDataProviders[cciptypes.Address(pluginConfig.LBTCConfig.SourceTokenAddress.String())] = lbtcReader - } // Prom wrappers onRampReader = observability.NewObservedOnRampReader(onRampReader, srcChainID, ccip.ExecPluginLabel) diff --git a/core/services/ocr2/plugins/ccip/config/config.go b/core/services/ocr2/plugins/ccip/config/config.go index fbf8d590cf..a24a6edfd1 100644 --- a/core/services/ocr2/plugins/ccip/config/config.go +++ b/core/services/ocr2/plugins/ccip/config/config.go @@ -108,7 +108,6 @@ func (c *DynamicPriceGetterConfig) Validate() error { type ExecPluginJobSpecConfig struct { SourceStartBlock, DestStartBlock uint64 // Only for first time job add. USDCConfig USDCConfig - LBTCConfig LBTCConfig } type USDCConfig struct { @@ -120,19 +119,10 @@ type USDCConfig struct { AttestationAPIIntervalMilliseconds int } -type LBTCConfig struct { - SourceTokenAddress common.Address - AttestationAPI string - AttestationAPITimeoutSeconds uint - // AttestationAPIIntervalMilliseconds can be set to -1 to disable or 0 to use a default interval. - AttestationAPIIntervalMilliseconds int -} - type ExecPluginConfig struct { SourceStartBlock, DestStartBlock uint64 // Only for first time job add. IsSourceProvider bool USDCConfig USDCConfig - LBTCConfig LBTCConfig JobID string } @@ -146,30 +136,17 @@ func (e ExecPluginConfig) Encode() ([]byte, error) { func (uc *USDCConfig) ValidateUSDCConfig() error { if uc.AttestationAPI == "" { - return errors.New("USDCConfig: AttestationAPI is required") + return errors.New("AttestationAPI is required") } if uc.AttestationAPIIntervalMilliseconds < -1 { - return errors.New("USDCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") + return errors.New("AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") } if uc.SourceTokenAddress == utils.ZeroAddress { - return errors.New("USDCConfig: SourceTokenAddress is required") + return errors.New("SourceTokenAddress is required") } if uc.SourceMessageTransmitterAddress == utils.ZeroAddress { - return errors.New("USDCConfig: SourceMessageTransmitterAddress is required") + return errors.New("SourceMessageTransmitterAddress is required") } return nil } - -func (lc *LBTCConfig) ValidateLBTCConfig() error { - if lc.AttestationAPI == "" { - return errors.New("LBTCConfig: AttestationAPI is required") - } - if lc.AttestationAPIIntervalMilliseconds < -1 { - return errors.New("LBTCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval") - } - if lc.SourceTokenAddress == utils.ZeroAddress { - return errors.New("LBTCConfig: SourceTokenAddress is required") - } - return nil -} diff --git a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go index dd303822c7..79ec21b1b8 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go +++ b/core/services/ocr2/plugins/ccip/tokendata/http/http_client.go @@ -12,21 +12,18 @@ import ( ) type IHttpClient interface { - // Get issues a GET request to the given url and returns the response body and status code. + // Get issue a GET request to the given url and return the response body and status code. Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) - - // Post issues a POST request to the given url with the given request data and returns the response body and status code. - Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) } type HttpClient struct { } -func doRequest(ctx context.Context, url string, requestType string, requestBody io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) { +func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) { // Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress. timeoutCtx, cancel := context.WithTimeoutCause(ctx, timeout, tokendata.ErrTimeout) defer cancel() - req, err := http.NewRequestWithContext(timeoutCtx, requestType, url, requestBody) + req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, url, nil) if err != nil { return nil, http.StatusBadRequest, nil, err } @@ -49,11 +46,3 @@ func doRequest(ctx context.Context, url string, requestType string, requestBody body, err := io.ReadAll(res.Body) return body, res.StatusCode, res.Header, err } - -func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) { - return doRequest(ctx, url, http.MethodGet, nil, timeout) -} - -func (s *HttpClient) Post(ctx context.Context, url string, requestBody io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) { - return doRequest(ctx, url, http.MethodPost, requestBody, timeout) -} diff --git a/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go b/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go deleted file mode 100644 index 94df0c76ad..0000000000 --- a/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go +++ /dev/null @@ -1,275 +0,0 @@ -package lbtc - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net/url" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/pkg/errors" - "golang.org/x/time/rate" - - cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http" -) - -const ( - apiVersion = "v1" - attestationPath = "deposits/getByHash" - defaultAttestationTimeout = 5 * time.Second - - // defaultCoolDownDurationSec defines the default time to wait after getting rate limited. - // this value is only used if the 429 response does not contain the Retry-After header - defaultCoolDownDuration = 30 * time.Second - - // defaultRequestInterval defines the rate in requests per second that the attestation API can be called. - // this is set according to the APIs recommended 5 requests per second rate limit. - defaultRequestInterval = 200 * time.Millisecond - - // APIIntervalRateLimitDisabled is a special value to disable the rate limiting. - APIIntervalRateLimitDisabled = -1 - // APIIntervalRateLimitDefault is a special value to select the default rate limit interval. - APIIntervalRateLimitDefault = 0 -) - -type attestationStatus string - -const ( - attestationStatusUnspecified attestationStatus = "NOTARIZATION_STATUS_UNSPECIFIED" - attestationStatusPending attestationStatus = "NOTARIZATION_STATUS_PENDING" - attestationStatusSubmitted attestationStatus = "NOTARIZATION_STATUS_SUBMITTED" - attestationStatusSessionApproved attestationStatus = "NOTARIZATION_STATUS_SESSION_APPROVED" - attestationStatusFailed attestationStatus = "NOTARIZATION_STATUS_FAILED" -) - -var ( - ErrUnknownResponse = errors.New("unexpected response from attestation API") -) - -type TokenDataReader struct { - lggr logger.Logger - httpClient http.IHttpClient - attestationApi *url.URL - attestationApiTimeout time.Duration - lbtcTokenAddress common.Address - rate *rate.Limiter - - // coolDownUntil defines whether requests are blocked or not. - coolDownUntil time.Time - coolDownMu *sync.RWMutex -} - -type messageAttestationResponse struct { - MessageHash string `json:"message_hash"` - Status attestationStatus `json:"status"` - Attestation string `json:"attestation,omitempty"` // Attestation represented by abi.encode(payload, proof) -} - -type attestationRequest struct { - PayloadHashes []string `json:"messageHash"` -} - -type attestationResponse struct { - Attestations []messageAttestationResponse `json:"attestations"` -} - -type sourceTokenData struct { - SourcePoolAddress []byte - DestTokenAddress []byte - ExtraData []byte - DestGasAmount uint32 -} - -func (m sourceTokenData) AbiString() string { - return `[{ - "components": [ - {"name": "sourcePoolAddress", "type": "bytes"}, - {"name": "destTokenAddress", "type": "bytes"}, - {"name": "extraData", "type": "bytes"}, - {"name": "destGasAmount", "type": "uint32"} - ], - "type": "tuple" - }]` -} - -func (m sourceTokenData) Validate() error { - if len(m.SourcePoolAddress) == 0 { - return errors.New("sourcePoolAddress must be non-empty") - } - if len(m.DestTokenAddress) == 0 { - return errors.New("destTokenAddress must be non-empty") - } - if len(m.ExtraData) == 0 { - return errors.New("extraData must be non-empty") - } - return nil -} - -var _ tokendata.Reader = &TokenDataReader{} - -func NewLBTCTokenDataReader( - lggr logger.Logger, - lbtcAttestationApi *url.URL, - lbtcAttestationApiTimeoutSeconds int, - lbtcTokenAddress common.Address, - requestInterval time.Duration, -) *TokenDataReader { - timeout := time.Duration(lbtcAttestationApiTimeoutSeconds) * time.Second - if lbtcAttestationApiTimeoutSeconds == 0 { - timeout = defaultAttestationTimeout - } - - if requestInterval == APIIntervalRateLimitDisabled { - requestInterval = 0 - } else if requestInterval == APIIntervalRateLimitDefault { - requestInterval = defaultRequestInterval - } - - return &TokenDataReader{ - lggr: lggr, - httpClient: http.NewObservedIHttpClient(&http.HttpClient{}), - attestationApi: lbtcAttestationApi, - attestationApiTimeout: timeout, - lbtcTokenAddress: lbtcTokenAddress, - coolDownMu: &sync.RWMutex{}, - rate: rate.NewLimiter(rate.Every(requestInterval), 1), - } -} - -func NewLBTCTokenDataReaderWithHttpClient( - origin TokenDataReader, - httpClient http.IHttpClient, - lbtcTokenAddress common.Address, - requestInterval time.Duration, -) *TokenDataReader { - return &TokenDataReader{ - lggr: origin.lggr, - httpClient: httpClient, - attestationApi: origin.attestationApi, - attestationApiTimeout: origin.attestationApiTimeout, - coolDownMu: origin.coolDownMu, - lbtcTokenAddress: lbtcTokenAddress, - rate: rate.NewLimiter(rate.Every(requestInterval), 1), - } -} - -// ReadTokenData queries the LBTC attestation API. -func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) { - if tokenIndex < 0 || tokenIndex >= len(msg.TokenAmounts) { - return nil, fmt.Errorf("token index out of bounds") - } - - if s.inCoolDownPeriod() { - // rate limiting cool-down period, we prevent new requests from being sent - return nil, tokendata.ErrRequestsBlocked - } - - if s.rate != nil { - // Wait blocks until it the attestation API can be called or the - // context is Done. - if waitErr := s.rate.Wait(ctx); waitErr != nil { - return nil, fmt.Errorf("lbtc rate limiting error: %w", waitErr) - } - } - - decodedSourceTokenData, err := abihelpers.DecodeAbiStruct[sourceTokenData](msg.SourceTokenData[tokenIndex]) - if err != nil { - return []byte{}, err - } - destTokenData := decodedSourceTokenData.ExtraData - // We don't have better way to determine if the extraData is a payload or sha256(payload) - // Last parameter of the payload struct is 32-bytes nonce (see Lombard's Bridge._deposit(...) method), - // so we can assume that payload always exceeds 32 bytes - if len(destTokenData) != 32 { - s.lggr.Infow("SourceTokenData.extraData size is not 32. This is deposit payload, not sha256(payload). Attestation is disabled onchain", - "destTokenData", hexutil.Encode(destTokenData)) - return destTokenData, nil - } - payloadHash := [32]byte(destTokenData) - - msgID := hexutil.Encode(msg.MessageID[:]) - payloadHashHex := hexutil.Encode(payloadHash[:]) - s.lggr.Infow("Calling attestation API", "messageBodyHash", payloadHashHex, "messageID", msgID) - - attestationResp, err := s.callAttestationApi(ctx, payloadHash) - if err != nil { - return nil, errors.Wrap(err, "failed calling lbtc attestation API") - } - if attestationResp.Attestations == nil || len(attestationResp.Attestations) == 0 { - return nil, errors.New("attestation response is empty") - } - if len(attestationResp.Attestations) > 1 { - s.lggr.Warnw("Multiple attestations received, expected one", "attestations", attestationResp.Attestations) - } - var attestation messageAttestationResponse - for _, attestationCandidate := range attestationResp.Attestations { - if attestationCandidate.MessageHash == payloadHashHex { - attestation = attestationCandidate - } - } - if attestation == (messageAttestationResponse{}) { - return nil, fmt.Errorf("requested attestation %s not found in response", payloadHashHex) - } - s.lggr.Infow("Got response from attestation API", "messageID", msgID, - "attestationStatus", attestation.Status, "attestation", attestation) - switch attestation.Status { - case attestationStatusSessionApproved: - payloadAndProof, err := hexutil.Decode(attestation.Attestation) - if err != nil { - return nil, err - } - return payloadAndProof, nil - case attestationStatusPending: - return nil, tokendata.ErrNotReady - case attestationStatusSubmitted: - return nil, tokendata.ErrNotReady - default: - s.lggr.Errorw("Unexpected response from attestation API", "attestation", attestation) - return nil, ErrUnknownResponse - } -} - -func (s *TokenDataReader) callAttestationApi(ctx context.Context, lbtcMessageHash [32]byte) (attestationResponse, error) { - attestationUrl := fmt.Sprintf("%s/bridge/%s/%s", s.attestationApi.String(), apiVersion, attestationPath) - request := attestationRequest{PayloadHashes: []string{hexutil.Encode(lbtcMessageHash[:])}} - encodedRequest, err := json.Marshal(request) - requestBuffer := bytes.NewBuffer(encodedRequest) - if err != nil { - return attestationResponse{}, err - } - respRaw, _, _, err := s.httpClient.Post(ctx, attestationUrl, requestBuffer, s.attestationApiTimeout) - switch { - case errors.Is(err, tokendata.ErrRateLimit): - s.setCoolDownPeriod(defaultCoolDownDuration) - return attestationResponse{}, tokendata.ErrRateLimit - case err != nil: - return attestationResponse{}, err - } - var attestationResp attestationResponse - err = json.Unmarshal(respRaw, &attestationResp) - return attestationResp, err -} - -func (s *TokenDataReader) setCoolDownPeriod(d time.Duration) { - s.coolDownMu.Lock() - s.coolDownUntil = time.Now().Add(d) - s.coolDownMu.Unlock() -} - -func (s *TokenDataReader) inCoolDownPeriod() bool { - s.coolDownMu.RLock() - defer s.coolDownMu.RUnlock() - return time.Now().Before(s.coolDownUntil) -} - -func (s *TokenDataReader) Close() error { - return nil -} diff --git a/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc_test.go b/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc_test.go deleted file mode 100644 index 8b65685faa..0000000000 --- a/core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc_test.go +++ /dev/null @@ -1,486 +0,0 @@ -package lbtc - -import ( - "context" - "encoding/json" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" - - cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" -) - -var ( - lbtcMessageHash = "0xbc427abf571a5cfcf7c98799d1f0055f4db25f203f657d30026728a19d16f092" - lbtcMessageAttestation = "0x0000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000016000000000000000000000000000000000000000000000000000000000000000e45c70a5050000000000000000000000000000000000000000000000000000000000aa36a7000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc0000000000000000000000000000000000000000000000000000000000014a34000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc00000000000000000000000062f10ce5b727edf787ea45776bd050308a61150800000000000000000000000000000000000000000000000000000000000003e60000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000001200000000000000000000000000000000000000000000000000000000000000040277eeafba008d767c2636d9428f2ebb13ab29ac70337f4fc34b0f5606767cae546f9be3f12160de6d142e5b3c1c3ebd0bf4298662b32b597d0cc5970c7742fc10000000000000000000000000000000000000000000000000000000000000040bbcd60ecc9e06f2effe7c94161219498a1eb435b419387adadb86ec9a52dfb066ce027532517df7216404049d193a25b85c35edfa3e7c5aa4757bfe84887a3980000000000000000000000000000000000000000000000000000000000000040da4a6dc619b5ca2349783cabecc4efdbc910090d3e234d7b8d0430165f8fae532f9a965ceb85c18bb92e059adefa7ce5835850a705761ab9e026d2db4a13ef9a" - payloadAndProof, _ = hexutil.Decode(lbtcMessageAttestation) -) - -func getMockLBTCEndpoint(t *testing.T, response attestationResponse) *httptest.Server { - responseBytes, err := json.Marshal(response) - require.NoError(t, err) - - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, err := w.Write(responseBytes) - require.NoError(t, err) - })) -} - -func TestLBTCReader_callAttestationApi(t *testing.T) { - t.Skipf("Skipping test because it uses the real LBTC attestation API") - attestationURI, err := url.ParseRequestURI("https://bridge-manager.staging.lombard.finance") - require.NoError(t, err) - lggr := logger.TestLogger(t) - lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, common.Address{}, APIIntervalRateLimitDisabled) - - attestation, err := lbtcService.callAttestationApi(context.Background(), [32]byte(common.FromHex(lbtcMessageHash))) - require.NoError(t, err) - - require.Equal(t, lbtcMessageHash, attestation.Attestations[0].MessageHash) - require.Equal(t, attestationStatusSessionApproved, attestation.Attestations[0].Status) - require.Equal(t, lbtcMessageAttestation, attestation.Attestations[0].Attestation) -} - -func TestLBTCReader_callAttestationApiMock(t *testing.T) { - response := attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusSessionApproved, - Attestation: lbtcMessageAttestation, - }, - }, - } - - ts := getMockLBTCEndpoint(t, response) - defer ts.Close() - attestationURI, err := url.ParseRequestURI(ts.URL) - require.NoError(t, err) - - lggr := logger.TestLogger(t) - lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, common.Address{}, APIIntervalRateLimitDisabled) - attestation, err := lbtcService.callAttestationApi(context.Background(), [32]byte(common.FromHex(lbtcMessageHash))) - require.NoError(t, err) - - require.Equal(t, response.Attestations[0].Status, attestation.Attestations[0].Status) - require.Equal(t, response.Attestations[0].Attestation, attestation.Attestations[0].Attestation) -} - -func TestLBTCReader_callAttestationApiMockError(t *testing.T) { - t.Parallel() - - sessionApprovedResponse := attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusSessionApproved, - Attestation: lbtcMessageAttestation, - }, - }, - } - - tests := []struct { - name string - getTs func() *httptest.Server - parentTimeoutSeconds int - customTimeoutSeconds int - expectedError error - }{ - { - name: "server error", - getTs: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - })) - }, - parentTimeoutSeconds: 60, - expectedError: nil, - }, - { - name: "default timeout", - getTs: func() *httptest.Server { - responseBytes, _ := json.Marshal(sessionApprovedResponse) - - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - time.Sleep(defaultAttestationTimeout + time.Second) - _, err := w.Write(responseBytes) - require.NoError(t, err) - })) - }, - parentTimeoutSeconds: 60, - expectedError: tokendata.ErrTimeout, - }, - { - name: "custom timeout", - getTs: func() *httptest.Server { - responseBytes, _ := json.Marshal(sessionApprovedResponse) - - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - time.Sleep(2*time.Second + time.Second) - _, err := w.Write(responseBytes) - require.NoError(t, err) - })) - }, - parentTimeoutSeconds: 60, - customTimeoutSeconds: 2, - expectedError: tokendata.ErrTimeout, - }, - { - name: "rate limit", - getTs: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusTooManyRequests) - })) - }, - parentTimeoutSeconds: 60, - expectedError: tokendata.ErrRateLimit, - }, - { - name: "parent context timeout", - getTs: func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - time.Sleep(defaultAttestationTimeout + time.Second) - })) - }, - parentTimeoutSeconds: 1, - expectedError: nil, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - ts := test.getTs() - defer ts.Close() - - attestationURI, err := url.ParseRequestURI(ts.URL) - require.NoError(t, err) - - lggr := logger.TestLogger(t) - lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, test.customTimeoutSeconds, common.Address{}, APIIntervalRateLimitDisabled) - - parentCtx, cancel := context.WithTimeout(context.Background(), time.Duration(test.parentTimeoutSeconds)*time.Second) - defer cancel() - - _, err = lbtcService.callAttestationApi(parentCtx, [32]byte(common.FromHex(lbtcMessageHash))) - require.Error(t, err) - - if test.expectedError != nil { - require.True(t, errors.Is(err, test.expectedError)) - } - }) - } -} - -func TestLBTCReader_rateLimiting(t *testing.T) { - sessionApprovedResponse := attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusSessionApproved, - Attestation: lbtcMessageAttestation, - }, - }, - } - - testCases := []struct { - name string - requests uint64 - rateConfig time.Duration - testDuration time.Duration - timeout time.Duration - err string - additionalErr string - }{ - { - name: "no rate limit when disabled", - requests: 10, - rateConfig: APIIntervalRateLimitDisabled, - testDuration: 1 * time.Millisecond, - }, - { - name: "yes rate limited with default config", - requests: 5, - rateConfig: APIIntervalRateLimitDefault, - testDuration: 4 * defaultRequestInterval, - }, - { - name: "yes rate limited with config", - requests: 10, - rateConfig: 50 * time.Millisecond, - testDuration: 9 * 50 * time.Millisecond, - }, - { - name: "request timeout", - requests: 5, - rateConfig: 100 * time.Millisecond, - testDuration: 1 * time.Millisecond, - timeout: 1 * time.Millisecond, - err: "lbtc rate limiting error:", - additionalErr: "token data API timed out", - }, - } - - extraData, err := hexutil.Decode(lbtcMessageHash) - require.NoError(t, err) - - srcTokenData, err := abihelpers.EncodeAbiStruct[sourceTokenData](sourceTokenData{ - SourcePoolAddress: utils.RandomAddress().Bytes(), - DestTokenAddress: utils.RandomAddress().Bytes(), - ExtraData: extraData, - }) - require.NoError(t, err) - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - ts := getMockLBTCEndpoint(t, sessionApprovedResponse) - defer ts.Close() - attestationURI, err := url.ParseRequestURI(ts.URL) - require.NoError(t, err) - - lggr := logger.TestLogger(t) - lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, utils.RandomAddress(), tc.rateConfig) - - ctx := context.Background() - if tc.timeout > 0 { - var cf context.CancelFunc - ctx, cf = context.WithTimeout(ctx, tc.timeout) - defer cf() - } - - trigger := make(chan struct{}) - errorChan := make(chan error, tc.requests) - wg := sync.WaitGroup{} - for i := 0; i < int(tc.requests); i++ { - wg.Add(1) - go func() { - defer wg.Done() - - <-trigger - _, err := lbtcService.ReadTokenData(ctx, cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SourceTokenData: [][]byte{srcTokenData}, - TokenAmounts: []cciptypes.TokenAmount{{Token: ccipcalc.EvmAddrToGeneric(utils.ZeroAddress), Amount: nil}}, // trigger failure due to wrong address - }, - }, 0) - - errorChan <- err - }() - } - - // Start the test - start := time.Now() - close(trigger) - - // Wait for requests to complete - wg.Wait() - finish := time.Now() - close(errorChan) - - // Collect errors - errorFound := false - for err := range errorChan { - if tc.err != "" && strings.Contains(err.Error(), tc.err) { - errorFound = true - } else if tc.additionalErr != "" && strings.Contains(err.Error(), tc.additionalErr) { - errorFound = true - } else if err != nil { - require.Fail(t, "unexpected error", err) - } - } - - if tc.err != "" { - assert.True(t, errorFound) - } - assert.WithinDuration(t, start.Add(tc.testDuration), finish, 50*time.Millisecond) - }) - } -} - -func TestLBTCReader_skipApiOnFullPayload(t *testing.T) { - sessionApprovedResponse := attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusSessionApproved, - Attestation: lbtcMessageAttestation, - }, - }, - } - - srcTokenData, err := abihelpers.EncodeAbiStruct[sourceTokenData](sourceTokenData{ - SourcePoolAddress: utils.RandomAddress().Bytes(), - DestTokenAddress: utils.RandomAddress().Bytes(), - ExtraData: []byte(lbtcMessageHash), // more than 32 bytes - }) - require.NoError(t, err) - - ts := getMockLBTCEndpoint(t, sessionApprovedResponse) - defer ts.Close() - attestationURI, err := url.ParseRequestURI(ts.URL) - require.NoError(t, err) - - lggr, logs := logger.TestLoggerObserved(t, zapcore.InfoLevel) - lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, utils.RandomAddress(), APIIntervalRateLimitDefault) - - ctx := context.Background() - - destTokenData, err := lbtcService.ReadTokenData(ctx, cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SourceTokenData: [][]byte{srcTokenData}, - TokenAmounts: []cciptypes.TokenAmount{{Token: ccipcalc.EvmAddrToGeneric(utils.ZeroAddress), Amount: nil}}, // trigger failure due to wrong address - }, - }, 0) - require.NoError(t, err) - require.EqualValues(t, []byte(lbtcMessageHash), destTokenData) - - require.Equal(t, 1, logs.Len()) - require.Contains(t, logs.All()[0].Message, "SourceTokenData.extraData size is not 32. This is deposit payload, not sha256(payload). Attestation is disabled onchain") -} - -func TestLBTCReader_expectedOutput(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - response attestationResponse - expectedReturn []byte - expectedError error - }{ - { - name: "expected payloadAndProof when status SESSION_APPROVED", - response: attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusSessionApproved, - Attestation: lbtcMessageAttestation, - }, - }, - }, - expectedReturn: payloadAndProof, - expectedError: nil, - }, - { - name: "expected ErrNotReady on status PENDING", - response: attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusPending, - Attestation: lbtcMessageAttestation, - }, - }, - }, - expectedReturn: nil, - expectedError: tokendata.ErrNotReady, - }, - { - name: "expected ErrNotReady on status SUBMITTED", - response: attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusSubmitted, - Attestation: lbtcMessageAttestation, - }, - }, - }, - expectedReturn: nil, - expectedError: tokendata.ErrNotReady, - }, - { - name: "expected ErrUnknownResponse on status UNSPECIFIED", - response: attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusUnspecified, - Attestation: lbtcMessageAttestation, - }, - }, - }, - expectedReturn: nil, - expectedError: ErrUnknownResponse, - }, - { - name: "expected ErrUnknownResponse on status FAILED", - response: attestationResponse{ - Attestations: []messageAttestationResponse{ - { - MessageHash: lbtcMessageHash, - Status: attestationStatusFailed, - Attestation: lbtcMessageAttestation, - }, - }, - }, - expectedReturn: nil, - expectedError: ErrUnknownResponse, - }, - } - - extraData, err := hexutil.Decode(lbtcMessageHash) - require.NoError(t, err) - - srcTokenData, err := abihelpers.EncodeAbiStruct[sourceTokenData](sourceTokenData{ - SourcePoolAddress: utils.RandomAddress().Bytes(), - DestTokenAddress: utils.RandomAddress().Bytes(), - ExtraData: extraData, - }) - require.NoError(t, err) - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ts := getMockLBTCEndpoint(t, tc.response) - defer ts.Close() - attestationURI, err := url.ParseRequestURI(ts.URL) - require.NoError(t, err) - - lggr := logger.TestLogger(t) - lbtcService := NewLBTCTokenDataReader(lggr, attestationURI, 0, utils.RandomAddress(), APIIntervalRateLimitDefault) - - ctx := context.Background() - - payloadAndProof, err := lbtcService.ReadTokenData(ctx, cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ - EVM2EVMMessage: cciptypes.EVM2EVMMessage{ - SourceTokenData: [][]byte{srcTokenData}, - TokenAmounts: []cciptypes.TokenAmount{{Token: ccipcalc.EvmAddrToGeneric(utils.ZeroAddress), Amount: nil}}, // trigger failure due to wrong address - }, - }, 0) - - if tc.expectedReturn != nil { - require.EqualValues(t, tc.expectedReturn, payloadAndProof) - } else if tc.expectedError != nil { - require.True(t, strings.Contains(err.Error(), tc.expectedError.Error())) - } - }) - } -} - -func Test_DecodeSourceTokenData(t *testing.T) { - input, err := hexutil.Decode("0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000249f00000000000000000000000000000000000000000000000000000000000000020000000000000000000000000267d40f64ecc4d95f3e8b2237df5f37b10812c250000000000000000000000000000000000000000000000000000000000000020000000000000000000000000c47e4b3124597fdf8dd07843d4a7052f2ee80c3000000000000000000000000000000000000000000000000000000000000000e45c70a5050000000000000000000000000000000000000000000000000000000000aa36a7000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc0000000000000000000000000000000000000000000000000000000000014a34000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc00000000000000000000000062f10ce5b727edf787ea45776bd050308a61150800000000000000000000000000000000000000000000000000000000000003e6000000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000") - require.NoError(t, err) - decoded, err := abihelpers.DecodeAbiStruct[sourceTokenData](input) - require.NoError(t, err) - expected, err := hexutil.Decode("0x5c70a5050000000000000000000000000000000000000000000000000000000000aa36a7000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc0000000000000000000000000000000000000000000000000000000000014a34000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc00000000000000000000000062f10ce5b727edf787ea45776bd050308a61150800000000000000000000000000000000000000000000000000000000000003e60000000000000000000000000000000000000000000000000000000000000006") - require.NoError(t, err) - require.Equal(t, expected, decoded.ExtraData) -} diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 516b418ac9..1dff3e7b7a 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -563,6 +563,8 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont return nil, err } + usdcConfig := execPluginConfig.USDCConfig + feeEstimatorConfig := estimatorconfig.NewFeeEstimatorConfigService() // CCIPExec reads when dest chain is mantle, and uses it to calc boosting in batching @@ -589,8 +591,10 @@ func (r *Relayer) NewCCIPExecProvider(rargs commontypes.RelayArgs, pargs commont r.chain.LogPoller(), execPluginConfig.SourceStartBlock, execPluginConfig.JobID, - execPluginConfig.USDCConfig, - execPluginConfig.LBTCConfig, + usdcConfig.AttestationAPI, + int(usdcConfig.AttestationAPITimeoutSeconds), + usdcConfig.AttestationAPIIntervalMilliseconds, + usdcConfig.SourceMessageTransmitterAddress, feeEstimatorConfig, ) } diff --git a/core/services/relay/evm/exec_provider.go b/core/services/relay/evm/exec_provider.go index 91d5b090fa..193f5e3b00 100644 --- a/core/services/relay/evm/exec_provider.go +++ b/core/services/relay/evm/exec_provider.go @@ -10,13 +10,12 @@ import ( "go.uber.org/multierr" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/types" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/lbtc" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" @@ -30,21 +29,21 @@ import ( ) type SrcExecProvider struct { - lggr logger.Logger - versionFinder ccip.VersionFinder - client client.Client - lp logpoller.LogPoller - startBlock uint64 - estimator gas.EvmFeeEstimator - maxGasPrice *big.Int - usdcReader *ccip.USDCReaderImpl - usdcConfig config.USDCConfig - lbtcConfig config.LBTCConfig + lggr logger.Logger + versionFinder ccip.VersionFinder + client client.Client + lp logpoller.LogPoller + startBlock uint64 + estimator gas.EvmFeeEstimator + maxGasPrice *big.Int + usdcReader *ccip.USDCReaderImpl + usdcAttestationAPI string + usdcAttestationAPITimeoutSeconds int + usdcAttestationAPIIntervalMilliseconds int + usdcSrcMsgTransmitterAddr common.Address feeEstimatorConfig estimatorconfig.FeeEstimatorConfigProvider - // TODO: Add lbtc reader & api fields - // these values are nil and are updated for Close() seenOnRampAddress *cciptypes.Address seenSourceChainSelector *uint64 @@ -60,31 +59,35 @@ func NewSrcExecProvider( lp logpoller.LogPoller, startBlock uint64, jobID string, - usdcConfig config.USDCConfig, - lbtcConfig config.LBTCConfig, + usdcAttestationAPI string, + usdcAttestationAPITimeoutSeconds int, + usdcAttestationAPIIntervalMilliseconds int, + usdcSrcMsgTransmitterAddr common.Address, feeEstimatorConfig estimatorconfig.FeeEstimatorConfigProvider, ) (commontypes.CCIPExecProvider, error) { var usdcReader *ccip.USDCReaderImpl var err error - if usdcConfig.AttestationAPI != "" { - usdcReader, err = ccip.NewUSDCReader(lggr, jobID, usdcConfig.SourceMessageTransmitterAddress, lp, true) + if usdcAttestationAPI != "" { + usdcReader, err = ccip.NewUSDCReader(lggr, jobID, usdcSrcMsgTransmitterAddr, lp, true) if err != nil { return nil, fmt.Errorf("new usdc reader: %w", err) } } return &SrcExecProvider{ - lggr: lggr, - versionFinder: versionFinder, - client: client, - estimator: estimator, - maxGasPrice: maxGasPrice, - lp: lp, - startBlock: startBlock, - usdcReader: usdcReader, - usdcConfig: usdcConfig, - lbtcConfig: lbtcConfig, - feeEstimatorConfig: feeEstimatorConfig, + lggr: lggr, + versionFinder: versionFinder, + client: client, + estimator: estimator, + maxGasPrice: maxGasPrice, + lp: lp, + startBlock: startBlock, + usdcReader: usdcReader, + usdcAttestationAPI: usdcAttestationAPI, + usdcAttestationAPITimeoutSeconds: usdcAttestationAPITimeoutSeconds, + usdcAttestationAPIIntervalMilliseconds: usdcAttestationAPIIntervalMilliseconds, + usdcSrcMsgTransmitterAddr: usdcSrcMsgTransmitterAddr, + feeEstimatorConfig: feeEstimatorConfig, }, nil } @@ -113,10 +116,10 @@ func (s *SrcExecProvider) Close() error { return ccip.CloseOnRampReader(s.lggr, versionFinder, *s.seenSourceChainSelector, *s.seenDestChainSelector, *s.seenOnRampAddress, s.lp, s.client) }) unregisterFuncs = append(unregisterFuncs, func() error { - if s.usdcConfig.AttestationAPI == "" { + if s.usdcAttestationAPI == "" { return nil } - return ccip.CloseUSDCReader(s.lggr, s.lggr.Name(), s.usdcConfig.SourceMessageTransmitterAddress, s.lp) + return ccip.CloseUSDCReader(s.lggr, s.lggr.Name(), s.usdcSrcMsgTransmitterAddr, s.lp) }) var multiErr error for _, fn := range unregisterFuncs { @@ -192,40 +195,24 @@ func (s *SrcExecProvider) NewPriceRegistryReader(ctx context.Context, addr ccipt return } -func (s *SrcExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cciptypes.Address) (cciptypes.TokenDataReader, error) { - tokenAddr, err := ccip.GenericAddrToEvm(tokenAddress) - if err != nil { - return nil, fmt.Errorf("failed to parse token address: %w", err) +func (s *SrcExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cciptypes.Address) (tokenDataReader cciptypes.TokenDataReader, err error) { + attestationURI, err2 := url.ParseRequestURI(s.usdcAttestationAPI) + if err2 != nil { + return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err2) } - switch tokenAddr { - case s.usdcConfig.SourceTokenAddress: - attestationURI, err := url.ParseRequestURI(s.usdcConfig.AttestationAPI) - if err != nil { - return nil, fmt.Errorf("failed to parse USDC attestation API: %w", err) - } - return usdc.NewUSDCTokenDataReader( - s.lggr, - s.usdcReader, - attestationURI, - int(s.usdcConfig.AttestationAPITimeoutSeconds), - tokenAddr, - time.Duration(s.usdcConfig.AttestationAPIIntervalMilliseconds)*time.Millisecond, - ), nil - case s.lbtcConfig.SourceTokenAddress: - attestationURI, err := url.ParseRequestURI(s.lbtcConfig.AttestationAPI) - if err != nil { - return nil, fmt.Errorf("failed to parse LBTC attestation API: %w", err) - } - return lbtc.NewLBTCTokenDataReader( - s.lggr, - attestationURI, - int(s.lbtcConfig.AttestationAPITimeoutSeconds), - tokenAddr, - time.Duration(s.lbtcConfig.AttestationAPIIntervalMilliseconds)*time.Millisecond, - ), nil - default: - return nil, fmt.Errorf("unsupported token address: %s", tokenAddress) + tokenAddr, err2 := ccip.GenericAddrToEvm(tokenAddress) + if err2 != nil { + return nil, fmt.Errorf("failed to parse token address: %w", err2) } + tokenDataReader = usdc.NewUSDCTokenDataReader( + s.lggr, + s.usdcReader, + attestationURI, + s.usdcAttestationAPITimeoutSeconds, + tokenAddr, + time.Duration(s.usdcAttestationAPIIntervalMilliseconds)*time.Millisecond, + ) + return } func (s *SrcExecProvider) NewTokenPoolBatchedReader(ctx context.Context, offRampAddr cciptypes.Address, sourceChainSelector uint64) (cciptypes.TokenPoolBatchedReader, error) {