Skip to content

Commit

Permalink
CCIP-5109 Refactor tokendata
Browse files Browse the repository at this point in the history
  • Loading branch information
bukata-sa committed Mar 7, 2025
1 parent 7912a5e commit 0b19a85
Show file tree
Hide file tree
Showing 24 changed files with 1,205 additions and 925 deletions.
4 changes: 2 additions & 2 deletions execute/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/chainlink-ccip/execute/metrics"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata/observer"
"github.com/smartcontractkit/chainlink-ccip/internal/plugintypes"
"github.com/smartcontractkit/chainlink-ccip/internal/reader"
"github.com/smartcontractkit/chainlink-ccip/pkg/contractreader"
Expand Down Expand Up @@ -149,7 +149,7 @@ func (p PluginFactory) NewReportingPlugin(
p.addrCodec,
)

tokenDataObserver, err := tokendata.NewConfigBasedCompositeObservers(
tokenDataObserver, err := observer.NewConfigBasedCompositeObservers(
ctx,
logutil.WithComponent(lggr, "TokenDataObserver"),
p.ocrConfig.Config.ChainSelector,
Expand Down
4 changes: 2 additions & 2 deletions execute/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/execute/internal/cache"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata/observer"
"github.com/smartcontractkit/chainlink-ccip/internal/mocks"
"github.com/smartcontractkit/chainlink-ccip/mocks/internal_/reader"
readerpkg_mock "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/reader"
Expand Down Expand Up @@ -108,7 +108,7 @@ func Test_getMessagesObservation(t *testing.T) {
// Create mock objects
ccipReader := readerpkg_mock.NewMockCCIPReader(t)
msgHasher := mocks.NewMessageHasher()
tokenDataObserver := tokendata.NoopTokenDataObserver{}
tokenDataObserver := observer.NoopTokenDataObserver{}

//emptyMsgHash, err := msgHasher.Hash(ctx, cciptypes.Message{})
//require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/smartcontractkit/chainlink-ccip/execute/internal/cache"
"github.com/smartcontractkit/chainlink-ccip/execute/metrics"
"github.com/smartcontractkit/chainlink-ccip/execute/report"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata/observer"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/slicelib"
"github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
"github.com/smartcontractkit/chainlink-ccip/internal/plugincommon/discovery"
Expand Down Expand Up @@ -72,7 +72,7 @@ type Plugin struct {
observer metrics.Reporter

oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID
tokenDataObserver tokendata.TokenDataObserver
tokenDataObserver observer.TokenDataObserver
estimateProvider cciptypes.EstimateProvider
lggr logger.Logger
ocrTypeCodec ocrtypecodec.ExecCodec
Expand All @@ -97,7 +97,7 @@ func NewPlugin(
reportCodec cciptypes.ExecutePluginCodec,
msgHasher cciptypes.MessageHasher,
homeChain reader.HomeChain,
tokenDataObserver tokendata.TokenDataObserver,
tokenDataObserver observer.TokenDataObserver,
estimateProvider cciptypes.EstimateProvider,
lggr logger.Logger,
metricsReporter metrics.Reporter,
Expand Down
5 changes: 3 additions & 2 deletions execute/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-ccip/execute/tokendata/observer"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers/rand"
"github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
dt "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon/discovery/discoverytypes"
Expand Down Expand Up @@ -613,7 +614,7 @@ func Test_getPendingReportsForExecution(t *testing.T) {
}

func TestPlugin_Close(t *testing.T) {
p := &Plugin{tokenDataObserver: &tokendata.NoopTokenDataObserver{}}
p := &Plugin{tokenDataObserver: &observer.NoopTokenDataObserver{}}
require.NoError(t, p.Close())
}

Expand Down
52 changes: 33 additions & 19 deletions execute/plugin_usdc_test.go → execute/plugin_tokendata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

sel "github.com/smartcontractkit/chain-selectors"

logger2 "github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/internal"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers/rand"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/internal/mocks/inmem"
ocrtypecodec "github.com/smartcontractkit/chainlink-ccip/pkg/ocrtypecodec/v1"
readerpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
Expand All @@ -29,28 +32,34 @@ func runRoundAndGetOutcome(ctx context.Context, ocrTypeCodec ocrtypecodec.ExecCo
return outcome
}

func Test_USDC_Transfer(t *testing.T) {
func Test_LBTC_USDC_Transfer(t *testing.T) {
ocrTypeCodec := ocrtypecodec.DefaultExecCodec
ctx := tests.Context(t)
randomEthAddress := string(rand.RandomAddress())

sourceChain := cciptypes.ChainSelector(sel.ETHEREUM_TESTNET_SEPOLIA.Selector)
destChain := cciptypes.ChainSelector(sel.ETHEREUM_MAINNET_BASE_1.Selector)

addressBytes, err := cciptypes.NewUnknownAddressFromHex(randomEthAddress)
usdcAddress := "0x3765b189a8fe4a0bc34457835f01c9d178dbea60"
usdcAddressBytes, err := cciptypes.NewUnknownAddressFromHex(usdcAddress)
require.NoError(t, err)

messages := []inmem.MessagesWithMetadata{
makeMsgWithMetadata(102, sourceChain, destChain, false),
makeMsgWithMetadata(103, sourceChain, destChain, false),
makeMsgWithMetadata(104, sourceChain, destChain, false, withTokens(cciptypes.RampTokenAmount{
SourcePoolAddress: addressBytes,
SourcePoolAddress: usdcAddressBytes,
ExtraData: readerpkg.NewSourceTokenDataPayload(1, 0).ToBytes(),
})),
makeMsgWithMetadata(105, sourceChain, destChain, false, withTokens(cciptypes.RampTokenAmount{
SourcePoolAddress: addressBytes,
SourcePoolAddress: usdcAddressBytes,
ExtraData: readerpkg.NewSourceTokenDataPayload(2, 0).ToBytes(),
})),
makeMsgWithMetadata(106, sourceChain, destChain, false,
withTokens(cciptypes.RampTokenAmount{
SourcePoolAddress: usdcAddressBytes,
ExtraData: readerpkg.NewSourceTokenDataPayload(3, 0).ToBytes(),
}),
),
}

events := []*readerpkg.MessageSentEvent{
Expand All @@ -59,16 +68,20 @@ func Test_USDC_Transfer(t *testing.T) {
newMessageSentEvent(0, 6, 3, []byte{3}),
}

attestation104 := map[string]string{
usdcAttestation104_106 := map[string]string{
"0x0f43587da5355551d234a2ba24dde8edfe0e385346465d6d53653b6aa642992e": `{
"status": "complete",
"attestation": "0x720502893578a89a8a87982982ef781c18b193"
"attestation": "0x100001"
}`,
"0x2b235443d276ec7dd517dcf34cca9dcd34f33542ccb6f305828d98e777404b63": `{
"status": "complete",
"attestation": "0x100003"
}`,
}

intTest := SetupSimpleTest(t, logger2.Test(t), sourceChain, destChain)
intTest.WithMessages(messages, 1000, time.Now().Add(-4*time.Hour), 1)
intTest.WithUSDC(randomEthAddress, attestation104, events)
intTest.WithUSDC(usdcAddress, usdcAttestation104_106, events)
runner := intTest.Start()
defer intTest.Close()

Expand All @@ -87,31 +100,32 @@ func Test_USDC_Transfer(t *testing.T) {
require.Len(t, outcome.CommitReports, 1)

// Round 3 - Filter
// Messages 102-104 are executed, 105 doesn't have token data ready
// Messages 102-104,106,108 are executed, 105 and 107 don't have token data ready
outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner)
require.NoError(t, err)
require.Len(t, outcome.Report.ChainReports, 1)
assert.Len(t, outcome.Report.ChainReports, 1)
sequenceNumbers := extractSequenceNumbers(outcome.Report.ChainReports[0].Messages)
require.ElementsMatch(t, sequenceNumbers, []cciptypes.SeqNum{102, 103, 104})
assert.ElementsMatch(t, sequenceNumbers, []cciptypes.SeqNum{102, 103, 104, 106})
//Attestation data added to the USDC
require.NotEmpty(t, outcome.Report.ChainReports[0].OffchainTokenData[2])
assert.Equal(t, internal.MustDecodeRaw("0x100001"), outcome.Report.ChainReports[0].OffchainTokenData[2][0])
assert.Equal(t, internal.MustDecodeRaw("0x100003"), outcome.Report.ChainReports[0].OffchainTokenData[3][0])

intTest.server.AddResponse(
intTest.usdcServer.AddResponse(
"0x70ef528624085241badbff913575c0ab50241e7cb6db183a5614922ab0bcba5d",
`{
"status": "complete",
"attestation": "0x720502893578a89a8a87982982ef781c18b194"
"attestation": "0x100002"
}`)

// Run 3 more rounds to get all attestations
for i := 0; i < 3; i++ {
outcome = runRoundAndGetOutcome(ctx, ocrTypeCodec, t, runner)
}

require.Len(t, outcome.Report.ChainReports, 1)
assert.Len(t, outcome.Report.ChainReports, 1)
sequenceNumbers = extractSequenceNumbers(outcome.Report.ChainReports[0].Messages)
// 102, 103 and 104 are in the inflight message cache.
require.ElementsMatch(t, sequenceNumbers, []cciptypes.SeqNum{105})
assert.ElementsMatch(t, sequenceNumbers, []cciptypes.SeqNum{105})
//Attestation data added to the remaining USDC messages
require.NotEmpty(t, outcome.Report.ChainReports[0].OffchainTokenData[0])
assert.Equal(t, internal.MustDecodeRaw("0x100002"), outcome.Report.ChainReports[0].OffchainTokenData[0][0])
}
107 changes: 79 additions & 28 deletions execute/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package execute
import (
"context"
"encoding/binary"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"slices"
"strings"
"testing"
"time"
Expand All @@ -27,7 +30,7 @@ import (
"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/execute/metrics"
"github.com/smartcontractkit/chainlink-ccip/execute/report"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata"
"github.com/smartcontractkit/chainlink-ccip/execute/tokendata/observer"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/slicelib"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers"
"github.com/smartcontractkit/chainlink-ccip/internal/libs/testhelpers/rand"
Expand Down Expand Up @@ -56,7 +59,7 @@ type IntTest struct {

msgHasher cciptypes.MessageHasher
ccipReader *inmem.InMemoryCCIPReader
server *ConfigurableAttestationServer
usdcServer *ConfigurableAttestationServer
tokenObserverConfig []pluginconfig.TokenDataObserverConfig
tokenChainReader map[cciptypes.ChainSelector]contractreader.ContractReaderFacade
}
Expand Down Expand Up @@ -153,24 +156,25 @@ func (it *IntTest) WithUSDC(
attestations map[string]string,
events []*readerpkg.MessageSentEvent,
) {
it.server = newConfigurableAttestationServer(attestations)
it.tokenObserverConfig = []pluginconfig.TokenDataObserverConfig{
{
Type: "usdc-cctp",
Version: "1",
USDCCCTPObserverConfig: &pluginconfig.USDCCCTPObserverConfig{
Tokens: map[cciptypes.ChainSelector]pluginconfig.USDCCCTPTokenConfig{
it.srcSelector: {
SourcePoolAddress: sourcePoolAddress,
SourceMessageTransmitterAddr: sourcePoolAddress,
},
},
AttestationAPI: it.server.server.URL,
it.usdcServer = newConfigurableAttestationServer(attestations)
it.tokenObserverConfig = append(it.tokenObserverConfig, pluginconfig.TokenDataObserverConfig{
Type: "usdc-cctp",
Version: "1",
USDCCCTPObserverConfig: &pluginconfig.USDCCCTPObserverConfig{
AttestationConfig: pluginconfig.AttestationConfig{
AttestationAPI: it.usdcServer.server.URL,
AttestationAPIInterval: commonconfig.MustNewDuration(1 * time.Millisecond),
AttestationAPITimeout: commonconfig.MustNewDuration(1 * time.Second),
AttestationAPICooldown: commonconfig.MustNewDuration(5 * time.Minute),
},
Tokens: map[cciptypes.ChainSelector]pluginconfig.USDCCCTPTokenConfig{
it.srcSelector: {
SourcePoolAddress: sourcePoolAddress,
SourceMessageTransmitterAddr: sourcePoolAddress,
},
},
},
}
})

usdcEvents := make([]types.Sequence, len(events))
for i, e := range events {
Expand Down Expand Up @@ -225,7 +229,7 @@ func (it *IntTest) Start() *testhelpers.OCR3Runner[[]byte] {
err := homeChain.Start(ctx)
require.NoError(it.t, err, "failed to start home chain poller")
mockAddrCodec := internal.NewMockAddressCodecHex(it.t)
tkObs, err := tokendata.NewConfigBasedCompositeObservers(
tkObs, err := observer.NewConfigBasedCompositeObservers(
ctx,
it.lggr,
it.dstSelector,
Expand Down Expand Up @@ -263,16 +267,16 @@ func (it *IntTest) Start() *testhelpers.OCR3Runner[[]byte] {
}

func (it *IntTest) Close() {
if it.server != nil {
it.server.Close()
if it.usdcServer != nil {
it.usdcServer.Close()
}
}

func (it *IntTest) newNode(
cfg pluginconfig.ExecuteOffchainConfig,
homeChain reader.HomeChain,
ep cciptypes.EstimateProvider,
tokenDataObserver tokendata.TokenDataObserver,
tokenDataObserver observer.TokenDataObserver,
oracleIDToP2pID map[commontypes.OracleID]libocrtypes.PeerID,
id int,
N int,
Expand Down Expand Up @@ -335,13 +339,43 @@ func newConfigurableAttestationServer(responses map[string]string) *Configurable
}

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for url, response := range c.responses {
if strings.Contains(r.RequestURI, url) {
_, err := w.Write([]byte(response))
if err != nil {
panic(err)
if r.Method == http.MethodGet {
for url, response := range c.responses {
if strings.Contains(r.RequestURI, url) {
_, err := w.Write([]byte(response))
if err != nil {
panic(err)
}
return
}
return
}
}
if r.Method == http.MethodPost {
var request map[string]interface{}
bodyRaw, err := io.ReadAll(r.Body)
if err != nil {
panic(err)
}
err = json.Unmarshal(bodyRaw, &request)
if err != nil {
panic(err)
}
payloadHashesUntyped := request["messageHash"].([]interface{})
if len(payloadHashesUntyped) == 0 {
w.WriteHeader(http.StatusBadRequest)
}
payloadHashes := make([]string, len(payloadHashesUntyped))
for i, hash := range payloadHashesUntyped {
payloadHashes[i] = hash.(string)
}
attestationResponse := attestationBatchByMessageHashes(payloadHashes, c.responses)
responseBytes, err := json.Marshal(attestationResponse)
if err != nil {
panic(err)
}
_, err = w.Write(responseBytes)
if err != nil {
panic(err)
}
}
w.WriteHeader(http.StatusNotFound)
Expand All @@ -350,8 +384,25 @@ func newConfigurableAttestationServer(responses map[string]string) *Configurable
return c
}

func (c *ConfigurableAttestationServer) AddResponse(url, response string) {
c.responses[url] = response
func attestationBatchByMessageHashes(payloadHashes []string, responses map[string]string) map[string]interface{} {
attestations := make([]interface{}, 0, len(responses))
for payloadHash, attestationRaw := range responses {
if slices.Contains(payloadHashes, payloadHash) {
var attestation map[string]interface{}
err := json.Unmarshal([]byte(attestationRaw), &attestation)
if err != nil {
panic(err)
}
attestations = append(attestations, attestation)
}
}
attestationResponse := make(map[string]interface{})
attestationResponse["attestations"] = attestations
return attestationResponse
}

func (c *ConfigurableAttestationServer) AddResponse(key, response string) {
c.responses[key] = response
}

func (c *ConfigurableAttestationServer) Close() {
Expand Down
Loading

0 comments on commit 0b19a85

Please sign in to comment.