Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-4403 skeleton: LBTC attestation #1554

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro
}
tokenDataProviders[cciptypes.Address(pluginConfig.USDCConfig.SourceTokenAddress.String())] = usdcReader
}
// init lbtc token data provider
if pluginConfig.LBTCConfig.AttestationAPI != "" {
lggr.Infof("LBTC token data provider enabled")
err2 := pluginConfig.LBTCConfig.ValidateLBTCConfig()
if err2 != nil {
return nil, err2
}

lbtcReader, err2 := srcProvider.NewTokenDataReader(ctx, ccip.EvmAddrToGeneric(pluginConfig.LBTCConfig.SourceTokenAddress))
if err2 != nil {
return nil, fmt.Errorf("new usdc reader: %w", err2)
}
tokenDataProviders[cciptypes.Address(pluginConfig.LBTCConfig.SourceTokenAddress.String())] = lbtcReader
}

// Prom wrappers
onRampReader = observability.NewObservedOnRampReader(onRampReader, srcChainID, ccip.ExecPluginLabel)
Expand Down
32 changes: 28 additions & 4 deletions core/services/ocr2/plugins/ccip/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (c *DynamicPriceGetterConfig) Validate() error {
type ExecPluginJobSpecConfig struct {
SourceStartBlock, DestStartBlock uint64 // Only for first time job add.
USDCConfig USDCConfig
LBTCConfig LBTCConfig
}

type USDCConfig struct {
Expand All @@ -119,10 +120,20 @@ type USDCConfig struct {
AttestationAPIIntervalMilliseconds int
}

type LBTCConfig struct {
SourceTokenAddress common.Address
SourceMessageTransmitterAddress common.Address
AttestationAPI string
AttestationAPITimeoutSeconds uint
// AttestationAPIIntervalMilliseconds can be set to -1 to disable or 0 to use a default interval.
AttestationAPIIntervalMilliseconds int
}

type ExecPluginConfig struct {
SourceStartBlock, DestStartBlock uint64 // Only for first time job add.
IsSourceProvider bool
USDCConfig USDCConfig
LBTCConfig LBTCConfig
JobID string
}

Expand All @@ -136,17 +147,30 @@ func (e ExecPluginConfig) Encode() ([]byte, error) {

func (uc *USDCConfig) ValidateUSDCConfig() error {
if uc.AttestationAPI == "" {
return errors.New("AttestationAPI is required")
return errors.New("USDCConfig: AttestationAPI is required")
}
if uc.AttestationAPIIntervalMilliseconds < -1 {
return errors.New("AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
return errors.New("USDCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
}
if uc.SourceTokenAddress == utils.ZeroAddress {
return errors.New("SourceTokenAddress is required")
return errors.New("USDCConfig: SourceTokenAddress is required")
}
if uc.SourceMessageTransmitterAddress == utils.ZeroAddress {
return errors.New("SourceMessageTransmitterAddress is required")
return errors.New("USDCConfig: SourceMessageTransmitterAddress is required")
}

return nil
}

func (lc *LBTCConfig) ValidateLBTCConfig() error {
if lc.AttestationAPI == "" {
return errors.New("LBTCConfig: AttestationAPI is required")
}
if lc.AttestationAPIIntervalMilliseconds < -1 {
return errors.New("LBTCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
}
if lc.SourceTokenAddress == utils.ZeroAddress {
return errors.New("LBTCConfig: SourceTokenAddress is required")
}
return nil
}
9 changes: 9 additions & 0 deletions core/services/ocr2/plugins/ccip/exportinternal.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,16 @@ func CloseUSDCReader(lggr logger.Logger, jobID string, transmitter common.Addres
return ccipdata.CloseUSDCReader(lggr, jobID, transmitter, lp)
}

func NewLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller, registerFilters bool) (*ccipdata.LBTCReaderImpl, error) {
return ccipdata.NewLBTCReader(lggr, jobID, transmitter, lp, registerFilters)
}

func CloseLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller) error {
return ccipdata.CloseLBTCReader(lggr, jobID, transmitter, lp)
}

type USDCReaderImpl = ccipdata.USDCReaderImpl
type LBTCReaderImpl = ccipdata.LBTCReaderImpl

var DefaultRpcBatchSizeLimit = rpclib.DefaultRpcBatchSizeLimit
var DefaultRpcBatchBackOffMultiplier = rpclib.DefaultRpcBatchBackOffMultiplier
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ccipdata

import (
"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// TODO: Implement lbtc token reader
type LBTCReader interface {
}

type LBTCReaderImpl struct {
}

func NewLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller, registerFilters bool) (*LBTCReaderImpl, error) {
return &LBTCReaderImpl{}, nil
}

func CloseLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller) error {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
)

type IHttpClient interface {
// Get issue a GET request to the given url and return the response body and status code.
// Get issues a GET request to the given url and returns the response body and status code.
Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error)

// Post issues a POST request to the given url with the given request data and returns the response body and status code.
Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error)
}

type HttpClient struct {
Expand Down Expand Up @@ -46,3 +49,8 @@ func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration)
body, err := io.ReadAll(res.Body)
return body, res.StatusCode, res.Header, err
}

func (s *HttpClient) Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) {
// TODO: Implement
return nil, 0, nil, nil
}
234 changes: 234 additions & 0 deletions core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package lbtc

import (
"context"
"crypto/sha256"
"fmt"
"net/url"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"golang.org/x/time/rate"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http"
)

// TODO: double check the validty of default values for lombard's API after checking docs
const (
apiVersion = "v1"
attestationPath = "deposits/getByHash"
defaultAttestationTimeout = 5 * time.Second

// defaultCoolDownDurationSec defines the default time to wait after getting rate limited.
// this value is only used if the 429 response does not contain the Retry-After header
defaultCoolDownDuration = 5 * time.Minute

// maxCoolDownDuration defines the maximum duration we can wait till firing the next request
maxCoolDownDuration = 10 * time.Minute

// defaultRequestInterval defines the rate in requests per second that the attestation API can be called.
// this is set according to the APIs documentated 10 requests per second rate limit.
defaultRequestInterval = 100 * time.Millisecond

// APIIntervalRateLimitDisabled is a special value to disable the rate limiting.
APIIntervalRateLimitDisabled = -1
// APIIntervalRateLimitDefault is a special value to select the default rate limit interval.
APIIntervalRateLimitDefault = 0
)

type attestationStatus string

const (
attestationStatusUnspecified attestationStatus = "NOTARIZATION_STATUS_UNSPECIFIED"
attestationStatusPending attestationStatus = "NOTARIZATION_STATUS_PENDING"
attestationStatusSubmitted attestationStatus = "NOTARIZATION_STATUS_SUBMITTED"
attestationStatusSessionApproved attestationStatus = "NOTARIZATION_STATUS_SESSION_APPROVED"
attestationStatusFailed attestationStatus = "NOTARIZATION_STATUS_FAILED"
)

var (
ErrUnknownResponse = errors.New("unexpected response from attestation API")
)

type TokenDataReader struct {
lggr logger.Logger
httpClient http.IHttpClient
attestationApi *url.URL
attestationApiTimeout time.Duration
lbtcTokenAddress common.Address
rate *rate.Limiter

// coolDownUntil defines whether requests are blocked or not.
coolDownUntil time.Time
coolDownMu *sync.RWMutex
}

type messageAttestationResponse struct {
MessageHash string `json:"message_hash"`
Status attestationStatus `json:"status"`
Attestation string `json:"attestation"`
}

// TODO: Adjust after checking API docs
type attestationResponse struct {
Attestations []messageAttestationResponse `json:"attestations"`
}

// TODO: Implement encoding/decoding

var _ tokendata.Reader = &TokenDataReader{}

func NewLBTCTokenDataReader(
lggr logger.Logger,
lbtcAttestationApi *url.URL,
lbtcAttestationApiTimeoutSeconds int,
lbtcTokenAddress common.Address,
requestInterval time.Duration,
) *TokenDataReader {
timeout := time.Duration(lbtcAttestationApiTimeoutSeconds) * time.Second
if lbtcAttestationApiTimeoutSeconds == 0 {
timeout = defaultAttestationTimeout
}

if requestInterval == APIIntervalRateLimitDisabled {
requestInterval = 0
} else if requestInterval == APIIntervalRateLimitDefault {
requestInterval = defaultRequestInterval
}

return &TokenDataReader{
lggr: lggr,
httpClient: http.NewObservedIHttpClient(&http.HttpClient{}),
attestationApi: lbtcAttestationApi,
attestationApiTimeout: timeout,
lbtcTokenAddress: lbtcTokenAddress,
coolDownMu: &sync.RWMutex{},
rate: rate.NewLimiter(rate.Every(requestInterval), 1),
}
}

func NewLBTCTokenDataReaderWithHttpClient(
origin TokenDataReader,
httpClient http.IHttpClient,
lbtcTokenAddress common.Address,
requestInterval time.Duration,
) *TokenDataReader {
return &TokenDataReader{
lggr: origin.lggr,
httpClient: httpClient,
attestationApi: origin.attestationApi,
attestationApiTimeout: origin.attestationApiTimeout,
coolDownMu: origin.coolDownMu,
lbtcTokenAddress: lbtcTokenAddress,
rate: rate.NewLimiter(rate.Every(requestInterval), 1),
}
}

// ReadTokenData queries the LBTC attestation API.
func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) {
if tokenIndex < 0 || tokenIndex >= len(msg.TokenAmounts) {
return nil, fmt.Errorf("token index out of bounds")
}

if s.inCoolDownPeriod() {
// rate limiting cool-down period, we prevent new requests from being sent
return nil, tokendata.ErrRequestsBlocked
}

if s.rate != nil {
// Wait blocks until it the attestation API can be called or the
// context is Done.
if waitErr := s.rate.Wait(ctx); waitErr != nil {
return nil, fmt.Errorf("lbtc rate limiting error: %w", waitErr)
}
}

messageBody, err := s.getLBTCMessageBody(ctx, msg, tokenIndex)
if err != nil {
return []byte{}, errors.Wrap(err, "failed getting the LBTC message body")
}

msgID := hexutil.Encode(msg.MessageID[:])
messageBodyHash := sha256.Sum256(messageBody)
messageBodyHashHex := hexutil.Encode(messageBodyHash[:])
s.lggr.Infow("Calling attestation API", "messageBodyHash", messageBodyHashHex, "messageID", msgID)

attestationResp, err := s.callAttestationApi(ctx, messageBodyHash)
if err != nil {
return nil, err
}
if attestationResp.Attestations == nil || len(attestationResp.Attestations) == 0 {
return nil, errors.New("attestation response is empty")
}
if len(attestationResp.Attestations) > 1 {
s.lggr.Warnw("Multiple attestations received, expected one", "attestations", attestationResp.Attestations)
}
var attestation messageAttestationResponse
for _, attestationCandidate := range attestationResp.Attestations {
if attestationCandidate.MessageHash == messageBodyHashHex {
attestation = attestationCandidate
}
}
s.lggr.Infow("Got response from attestation API", "messageID", msgID,
"attestationStatus", attestation.Status, "attestation", attestation)
switch attestation.Status {
case attestationStatusSessionApproved:
messageAndAttestation, err := encodeMessageAndAttestation(messageBody, attestation.Attestation)
if err != nil {
return nil, fmt.Errorf("failed to encode messageAndAttestation : %w", err)
}
return messageAndAttestation, nil
case attestationStatusPending:
return nil, tokendata.ErrNotReady
case attestationStatusSubmitted:
return nil, tokendata.ErrNotReady
default:
s.lggr.Errorw("Unexpected response from attestation API", "attestation", attestation)
return nil, ErrUnknownResponse
}
}

func (s *TokenDataReader) getLBTCMessageBody(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) {
return nil, nil
}

func (s *TokenDataReader) callAttestationApi(ctx context.Context, lbtcMessageHash [32]byte) (attestationResponse, error) {
_, _, _, err := s.httpClient.Get(ctx, "", s.attestationApiTimeout)
switch {
case errors.Is(err, tokendata.ErrRateLimit):
s.setCoolDownPeriod(defaultCoolDownDuration)
return attestationResponse{}, tokendata.ErrRateLimit
case err != nil:
return attestationResponse{}, err
}
return attestationResponse{}, nil
}

func encodeMessageAndAttestation(messageBody []byte, attestation string) ([]byte, error) {
return nil, nil
}

func (s *TokenDataReader) setCoolDownPeriod(d time.Duration) {
s.coolDownMu.Lock()
if d > maxCoolDownDuration {
d = maxCoolDownDuration
}
s.coolDownUntil = time.Now().Add(d)
s.coolDownMu.Unlock()
}

func (s *TokenDataReader) inCoolDownPeriod() bool {
s.coolDownMu.RLock()
defer s.coolDownMu.RUnlock()
return time.Now().Before(s.coolDownUntil)
}

func (s *TokenDataReader) Close() error {
return nil
}
Loading
Loading