Skip to content

Commit

Permalink
add short-lived in-mem logs and improve usdc attestation api response…
Browse files Browse the repository at this point in the history
… logging
  • Loading branch information
dimkouv committed Jul 10, 2024
1 parent a8b921c commit 4766bf7
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 11 deletions.
45 changes: 35 additions & 10 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/usdc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package ccipdata
import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -36,6 +38,10 @@ type USDCReaderImpl struct {
filter logpoller.Filter
lggr logger.Logger
transmitterAddress common.Address

// shortLivedInMemLogs is a short-lived cache (items expire every few seconds)
// used to prevent frequent log fetching from the log poller
shortLivedInMemLogs *cache.Cache
}

func (u *USDCReaderImpl) Close() error {
Expand Down Expand Up @@ -71,20 +77,38 @@ func parseUSDCMessageSent(logData []byte) ([]byte, error) {
}

func (u *USDCReaderImpl) GetUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, usdcTokenIndexOffset int, txHash string) ([]byte, error) {
var lpLogs []logpoller.Log

// fetch all the usdc logs for the provided tx hash
logs, err := u.lp.IndexedLogsByTxHash(
ctx,
u.usdcMessageSent,
u.transmitterAddress,
common.HexToHash(txHash),
)
if err != nil {
return nil, err
k := fmt.Sprintf("getUsdcMsgPriorToLogIdx-%s", txHash)
if rawLogs, foundInMem := u.shortLivedInMemLogs.Get(k); foundInMem {
inMemLogs, ok := rawLogs.([]logpoller.Log)
if !ok {
return nil, errors.Errorf("unexpected in-mem logs type %T", rawLogs)
}
u.lggr.Debugw("found logs in memory", "k", k, "len", len(inMemLogs))
lpLogs = inMemLogs
}

if len(lpLogs) == 0 {
u.lggr.Debugw("fetching logs from lp", "k", k)
logs, err := u.lp.IndexedLogsByTxHash(
ctx,
u.usdcMessageSent,
u.transmitterAddress,
common.HexToHash(txHash),
)
if err != nil {
return nil, err
}
lpLogs = logs
u.shortLivedInMemLogs.Set(k, logs, cache.DefaultExpiration)
u.lggr.Debugw("fetched logs from lp", "logs", len(lpLogs))
}

// collect the logs with log index less than the provided log index
allUsdcTokensData := make([][]byte, 0)
for _, current := range logs {
for _, current := range lpLogs {
if current.LogIndex < logIndex {
u.lggr.Infow("Found USDC message", "logIndex", current.LogIndex, "txHash", current.TxHash.Hex(), "data", hexutil.Encode(current.Data))
allUsdcTokensData = append(allUsdcTokensData, current.Data)
Expand Down Expand Up @@ -118,7 +142,8 @@ func NewUSDCReader(lggr logger.Logger, jobID string, transmitter common.Address,
Addresses: []common.Address{transmitter},
Retention: CommitExecLogsRetention,
},
transmitterAddress: transmitter,
transmitterAddress: transmitter,
shortLivedInMemLogs: cache.New(20*time.Second, time.Minute),
}

if registerFilters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,35 @@ func TestLogPollerClient_GetUSDCMessagePriorToLogIndexInTx(t *testing.T) {
lp.AssertExpectations(t)
})

t.Run("logs fetched from memory in subsequent calls", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
u, _ := NewUSDCReader(lggr, "job_123", utils.RandomAddress(), lp, false)

lp.On("IndexedLogsByTxHash",
mock.Anything,
u.usdcMessageSent,
u.transmitterAddress,
txHash,
).Return([]logpoller.Log{
{LogIndex: ccipLogIndex - 2, Data: hexutil.MustDecode(expectedData)},
{LogIndex: ccipLogIndex - 1, Data: []byte("-2")},
{LogIndex: ccipLogIndex, Data: []byte("0")},
{LogIndex: ccipLogIndex + 1, Data: []byte("1")},
}, nil).Once()

// first call logs must be fetched from lp
usdcMessageData, err := u.GetUSDCMessagePriorToLogIndexInTx(context.Background(), ccipLogIndex, 1, txHash.String())
assert.NoError(t, err)
assert.Equal(t, expectedPostParse, hexutil.Encode(usdcMessageData))

// subsequent call, logs must be fetched from memory
usdcMessageData, err = u.GetUSDCMessagePriorToLogIndexInTx(context.Background(), ccipLogIndex, 1, txHash.String())
assert.NoError(t, err)
assert.Equal(t, expectedPostParse, hexutil.Encode(usdcMessageData))

lp.AssertExpectations(t)
})

t.Run("none found", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
u, _ := NewUSDCReader(lggr, "job_123", utils.RandomAddress(), lp, false)
Expand Down
8 changes: 7 additions & 1 deletion core/services/ocr2/plugins/ccip/tokendata/usdc/usdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,19 @@ func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2E
return []byte{}, errors.Wrap(err, "failed getting the USDC message body")
}

s.lggr.Infow("Calling attestation API", "messageBodyHash", hexutil.Encode(messageBody[:]), "messageID", hexutil.Encode(msg.MessageID[:]))
msgID := hexutil.Encode(msg.MessageID[:])
msgBody := hexutil.Encode(messageBody)
s.lggr.Infow("Calling attestation API", "messageBodyHash", msgBody, "messageID", msgID)

// The attestation API expects the hash of the message body
attestationResp, err := s.callAttestationApi(ctx, utils.Keccak256Fixed(messageBody))
if err != nil {
return []byte{}, errors.Wrap(err, "failed calling usdc attestation API ")
}

s.lggr.Infow("Got response from attestation API", "messageBodyHash", msgBody, "messageID", msgID,
"ar.status", attestationResp.Status, "ar.attestation", attestationResp.Attestation, "ar.error", attestationResp.Error)

switch attestationResp.Status {
case attestationStatusSuccess:
// The USDC pool needs a combination of the message body and the attestation
Expand Down

0 comments on commit 4766bf7

Please sign in to comment.