Skip to content

Commit

Permalink
CCIP-4403 LBTC attestation cherry-pick (#1564)
Browse files Browse the repository at this point in the history
Cherry-picking LBTC attestation commits for core 2.17
commits and its order:
45ddcac
484940a
9628956
b925245
7f2ba83
d354759

---------

Co-authored-by: Nour Elrashidy <[email protected]>
Co-authored-by: Nour Elrashidy <[email protected]>
  • Loading branch information
3 people authored Dec 6, 2024
1 parent fcc388c commit 976f457
Show file tree
Hide file tree
Showing 7 changed files with 881 additions and 63 deletions.
14 changes: 14 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro
}
tokenDataProviders[cciptypes.Address(pluginConfig.USDCConfig.SourceTokenAddress.String())] = usdcReader
}
// init lbtc token data provider
if pluginConfig.LBTCConfig.AttestationAPI != "" {
lggr.Infof("LBTC token data provider enabled")
err2 := pluginConfig.LBTCConfig.ValidateLBTCConfig()
if err2 != nil {
return nil, err2
}

lbtcReader, err2 := srcProvider.NewTokenDataReader(ctx, ccip.EvmAddrToGeneric(pluginConfig.LBTCConfig.SourceTokenAddress))
if err2 != nil {
return nil, fmt.Errorf("new lbtc reader: %w", err2)
}
tokenDataProviders[cciptypes.Address(pluginConfig.LBTCConfig.SourceTokenAddress.String())] = lbtcReader
}

// Prom wrappers
onRampReader = observability.NewObservedOnRampReader(onRampReader, srcChainID, ccip.ExecPluginLabel)
Expand Down
31 changes: 27 additions & 4 deletions core/services/ocr2/plugins/ccip/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (c *DynamicPriceGetterConfig) Validate() error {
type ExecPluginJobSpecConfig struct {
SourceStartBlock, DestStartBlock uint64 // Only for first time job add.
USDCConfig USDCConfig
LBTCConfig LBTCConfig
}

type USDCConfig struct {
Expand All @@ -119,10 +120,19 @@ type USDCConfig struct {
AttestationAPIIntervalMilliseconds int
}

type LBTCConfig struct {
SourceTokenAddress common.Address
AttestationAPI string
AttestationAPITimeoutSeconds uint
// AttestationAPIIntervalMilliseconds can be set to -1 to disable or 0 to use a default interval.
AttestationAPIIntervalMilliseconds int
}

type ExecPluginConfig struct {
SourceStartBlock, DestStartBlock uint64 // Only for first time job add.
IsSourceProvider bool
USDCConfig USDCConfig
LBTCConfig LBTCConfig
JobID string
}

Expand All @@ -136,17 +146,30 @@ func (e ExecPluginConfig) Encode() ([]byte, error) {

func (uc *USDCConfig) ValidateUSDCConfig() error {
if uc.AttestationAPI == "" {
return errors.New("AttestationAPI is required")
return errors.New("USDCConfig: AttestationAPI is required")
}
if uc.AttestationAPIIntervalMilliseconds < -1 {
return errors.New("AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
return errors.New("USDCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
}
if uc.SourceTokenAddress == utils.ZeroAddress {
return errors.New("SourceTokenAddress is required")
return errors.New("USDCConfig: SourceTokenAddress is required")
}
if uc.SourceMessageTransmitterAddress == utils.ZeroAddress {
return errors.New("SourceMessageTransmitterAddress is required")
return errors.New("USDCConfig: SourceMessageTransmitterAddress is required")
}

return nil
}

func (lc *LBTCConfig) ValidateLBTCConfig() error {
if lc.AttestationAPI == "" {
return errors.New("LBTCConfig: AttestationAPI is required")
}
if lc.AttestationAPIIntervalMilliseconds < -1 {
return errors.New("LBTCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
}
if lc.SourceTokenAddress == utils.ZeroAddress {
return errors.New("LBTCConfig: SourceTokenAddress is required")
}
return nil
}
17 changes: 14 additions & 3 deletions core/services/ocr2/plugins/ccip/tokendata/http/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@ import (
)

type IHttpClient interface {
// Get issue a GET request to the given url and return the response body and status code.
// Get issues a GET request to the given url and returns the response body and status code.
Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error)

// Post issues a POST request to the given url with the given request data and returns the response body and status code.
Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error)
}

type HttpClient struct {
}

func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) {
func doRequest(ctx context.Context, url string, requestType string, requestBody io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) {
// Use a timeout to guard against attestation API hanging, causing observation timeout and failing to make any progress.
timeoutCtx, cancel := context.WithTimeoutCause(ctx, timeout, tokendata.ErrTimeout)
defer cancel()
req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, url, nil)
req, err := http.NewRequestWithContext(timeoutCtx, requestType, url, requestBody)
if err != nil {
return nil, http.StatusBadRequest, nil, err
}
Expand All @@ -46,3 +49,11 @@ func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration)
body, err := io.ReadAll(res.Body)
return body, res.StatusCode, res.Header, err
}

func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error) {
return doRequest(ctx, url, http.MethodGet, nil, timeout)
}

func (s *HttpClient) Post(ctx context.Context, url string, requestBody io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) {
return doRequest(ctx, url, http.MethodPost, requestBody, timeout)
}
275 changes: 275 additions & 0 deletions core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
package lbtc

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/url"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"golang.org/x/time/rate"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http"
)

const (
apiVersion = "v1"
attestationPath = "deposits/getByHash"
defaultAttestationTimeout = 5 * time.Second

// defaultCoolDownDurationSec defines the default time to wait after getting rate limited.
// this value is only used if the 429 response does not contain the Retry-After header
defaultCoolDownDuration = 30 * time.Second

// defaultRequestInterval defines the rate in requests per second that the attestation API can be called.
// this is set according to the APIs recommended 5 requests per second rate limit.
defaultRequestInterval = 200 * time.Millisecond

// APIIntervalRateLimitDisabled is a special value to disable the rate limiting.
APIIntervalRateLimitDisabled = -1
// APIIntervalRateLimitDefault is a special value to select the default rate limit interval.
APIIntervalRateLimitDefault = 0
)

type attestationStatus string

const (
attestationStatusUnspecified attestationStatus = "NOTARIZATION_STATUS_UNSPECIFIED"
attestationStatusPending attestationStatus = "NOTARIZATION_STATUS_PENDING"
attestationStatusSubmitted attestationStatus = "NOTARIZATION_STATUS_SUBMITTED"
attestationStatusSessionApproved attestationStatus = "NOTARIZATION_STATUS_SESSION_APPROVED"
attestationStatusFailed attestationStatus = "NOTARIZATION_STATUS_FAILED"
)

var (
ErrUnknownResponse = errors.New("unexpected response from attestation API")
)

type TokenDataReader struct {
lggr logger.Logger
httpClient http.IHttpClient
attestationApi *url.URL
attestationApiTimeout time.Duration
lbtcTokenAddress common.Address
rate *rate.Limiter

// coolDownUntil defines whether requests are blocked or not.
coolDownUntil time.Time
coolDownMu *sync.RWMutex
}

type messageAttestationResponse struct {
MessageHash string `json:"message_hash"`
Status attestationStatus `json:"status"`
Attestation string `json:"attestation,omitempty"` // Attestation represented by abi.encode(payload, proof)
}

type attestationRequest struct {
PayloadHashes []string `json:"messageHash"`
}

type attestationResponse struct {
Attestations []messageAttestationResponse `json:"attestations"`
}

type sourceTokenData struct {
SourcePoolAddress []byte
DestTokenAddress []byte
ExtraData []byte
DestGasAmount uint32
}

func (m sourceTokenData) AbiString() string {
return `[{
"components": [
{"name": "sourcePoolAddress", "type": "bytes"},
{"name": "destTokenAddress", "type": "bytes"},
{"name": "extraData", "type": "bytes"},
{"name": "destGasAmount", "type": "uint32"}
],
"type": "tuple"
}]`
}

func (m sourceTokenData) Validate() error {
if len(m.SourcePoolAddress) == 0 {
return errors.New("sourcePoolAddress must be non-empty")
}
if len(m.DestTokenAddress) == 0 {
return errors.New("destTokenAddress must be non-empty")
}
if len(m.ExtraData) == 0 {
return errors.New("extraData must be non-empty")
}
return nil
}

var _ tokendata.Reader = &TokenDataReader{}

func NewLBTCTokenDataReader(
lggr logger.Logger,
lbtcAttestationApi *url.URL,
lbtcAttestationApiTimeoutSeconds int,
lbtcTokenAddress common.Address,
requestInterval time.Duration,
) *TokenDataReader {
timeout := time.Duration(lbtcAttestationApiTimeoutSeconds) * time.Second
if lbtcAttestationApiTimeoutSeconds == 0 {
timeout = defaultAttestationTimeout
}

if requestInterval == APIIntervalRateLimitDisabled {
requestInterval = 0
} else if requestInterval == APIIntervalRateLimitDefault {
requestInterval = defaultRequestInterval
}

return &TokenDataReader{
lggr: lggr,
httpClient: http.NewObservedIHttpClient(&http.HttpClient{}),
attestationApi: lbtcAttestationApi,
attestationApiTimeout: timeout,
lbtcTokenAddress: lbtcTokenAddress,
coolDownMu: &sync.RWMutex{},
rate: rate.NewLimiter(rate.Every(requestInterval), 1),
}
}

func NewLBTCTokenDataReaderWithHttpClient(
origin TokenDataReader,
httpClient http.IHttpClient,
lbtcTokenAddress common.Address,
requestInterval time.Duration,
) *TokenDataReader {
return &TokenDataReader{
lggr: origin.lggr,
httpClient: httpClient,
attestationApi: origin.attestationApi,
attestationApiTimeout: origin.attestationApiTimeout,
coolDownMu: origin.coolDownMu,
lbtcTokenAddress: lbtcTokenAddress,
rate: rate.NewLimiter(rate.Every(requestInterval), 1),
}
}

// ReadTokenData queries the LBTC attestation API.
func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) {
if tokenIndex < 0 || tokenIndex >= len(msg.TokenAmounts) {
return nil, fmt.Errorf("token index out of bounds")
}

if s.inCoolDownPeriod() {
// rate limiting cool-down period, we prevent new requests from being sent
return nil, tokendata.ErrRequestsBlocked
}

if s.rate != nil {
// Wait blocks until it the attestation API can be called or the
// context is Done.
if waitErr := s.rate.Wait(ctx); waitErr != nil {
return nil, fmt.Errorf("lbtc rate limiting error: %w", waitErr)
}
}

decodedSourceTokenData, err := abihelpers.DecodeAbiStruct[sourceTokenData](msg.SourceTokenData[tokenIndex])
if err != nil {
return []byte{}, err
}
destTokenData := decodedSourceTokenData.ExtraData
// We don't have better way to determine if the extraData is a payload or sha256(payload)
// Last parameter of the payload struct is 32-bytes nonce (see Lombard's Bridge._deposit(...) method),
// so we can assume that payload always exceeds 32 bytes
if len(destTokenData) != 32 {
s.lggr.Infow("SourceTokenData.extraData size is not 32. This is deposit payload, not sha256(payload). Attestation is disabled onchain",
"destTokenData", hexutil.Encode(destTokenData))
return destTokenData, nil
}
payloadHash := [32]byte(destTokenData)

msgID := hexutil.Encode(msg.MessageID[:])
payloadHashHex := hexutil.Encode(payloadHash[:])
s.lggr.Infow("Calling attestation API", "messageBodyHash", payloadHashHex, "messageID", msgID)

attestationResp, err := s.callAttestationApi(ctx, payloadHash)
if err != nil {
return nil, errors.Wrap(err, "failed calling lbtc attestation API")
}
if attestationResp.Attestations == nil || len(attestationResp.Attestations) == 0 {
return nil, errors.New("attestation response is empty")
}
if len(attestationResp.Attestations) > 1 {
s.lggr.Warnw("Multiple attestations received, expected one", "attestations", attestationResp.Attestations)
}
var attestation messageAttestationResponse
for _, attestationCandidate := range attestationResp.Attestations {
if attestationCandidate.MessageHash == payloadHashHex {
attestation = attestationCandidate
}
}
if attestation == (messageAttestationResponse{}) {
return nil, fmt.Errorf("requested attestation %s not found in response", payloadHashHex)
}
s.lggr.Infow("Got response from attestation API", "messageID", msgID,
"attestationStatus", attestation.Status, "attestation", attestation)
switch attestation.Status {
case attestationStatusSessionApproved:
payloadAndProof, err := hexutil.Decode(attestation.Attestation)
if err != nil {
return nil, err
}
return payloadAndProof, nil
case attestationStatusPending:
return nil, tokendata.ErrNotReady
case attestationStatusSubmitted:
return nil, tokendata.ErrNotReady
default:
s.lggr.Errorw("Unexpected response from attestation API", "attestation", attestation)
return nil, ErrUnknownResponse
}
}

func (s *TokenDataReader) callAttestationApi(ctx context.Context, lbtcMessageHash [32]byte) (attestationResponse, error) {
attestationUrl := fmt.Sprintf("%s/bridge/%s/%s", s.attestationApi.String(), apiVersion, attestationPath)
request := attestationRequest{PayloadHashes: []string{hexutil.Encode(lbtcMessageHash[:])}}
encodedRequest, err := json.Marshal(request)
requestBuffer := bytes.NewBuffer(encodedRequest)
if err != nil {
return attestationResponse{}, err
}
respRaw, _, _, err := s.httpClient.Post(ctx, attestationUrl, requestBuffer, s.attestationApiTimeout)
switch {
case errors.Is(err, tokendata.ErrRateLimit):
s.setCoolDownPeriod(defaultCoolDownDuration)
return attestationResponse{}, tokendata.ErrRateLimit
case err != nil:
return attestationResponse{}, err
}
var attestationResp attestationResponse
err = json.Unmarshal(respRaw, &attestationResp)
return attestationResp, err
}

func (s *TokenDataReader) setCoolDownPeriod(d time.Duration) {
s.coolDownMu.Lock()
s.coolDownUntil = time.Now().Add(d)
s.coolDownMu.Unlock()
}

func (s *TokenDataReader) inCoolDownPeriod() bool {
s.coolDownMu.RLock()
defer s.coolDownMu.RUnlock()
return time.Now().Before(s.coolDownUntil)
}

func (s *TokenDataReader) Close() error {
return nil
}
Loading

0 comments on commit 976f457

Please sign in to comment.