Skip to content

Commit

Permalink
callers set lastValidBlockheight + get blockhash on expiration + inte…
Browse files Browse the repository at this point in the history
…gration tests
  • Loading branch information
Farber98 committed Dec 17, 2024
1 parent e7d7680 commit 409fd1c
Show file tree
Hide file tree
Showing 10 changed files with 355 additions and 192 deletions.
2 changes: 1 addition & 1 deletion pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba
}

chainTxm := c.TxManager()
err = chainTxm.Enqueue(ctx, "", tx, nil,
err = chainTxm.Enqueue(ctx, "", tx, nil, blockhash.Value.LastValidBlockHeight,
txm.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units
// no fee bumping and no additional fee - makes validating balance accurate
txm.SetComputeUnitPriceMax(0),
Expand Down
13 changes: 8 additions & 5 deletions pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(0), receiverBal)

createTx := func(signer solana.PublicKey, sender solana.PublicKey, receiver solana.PublicKey, amt uint64) *solana.Transaction {
createTx := func(signer solana.PublicKey, sender solana.PublicKey, receiver solana.PublicKey, amt uint64) (*solana.Transaction, uint64) {
selectedClient, err = testChain.getClient()
assert.NoError(t, err)
hash, hashErr := selectedClient.LatestBlockhash(tests.Context(t))
Expand All @@ -553,11 +553,12 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) {
solana.TransactionPayer(signer),
)
require.NoError(t, txErr)
return tx
return tx, hash.Value.LastValidBlockHeight
}

// Send funds twice, along with an invalid transaction
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
tx, lastValidBlockHeight := createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", tx, nil, lastValidBlockHeight))

// Wait for new block hash
currentBh, err := selectedClient.LatestBlockhash(tests.Context(t))
Expand All @@ -578,8 +579,10 @@ NewBlockHash:
}
}

require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) // cannot sign tx before enqueuing
tx2, lastValidBlockHeight2 := createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", tx2, nil, lastValidBlockHeight2))
tx3, lastValidBlockHeight3 := createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)
require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", tx3, nil, lastValidBlockHeight3)) // cannot sign tx before enqueuing

// wait for all txes to finish
ctx, cancel := context.WithCancel(tests.Context(t))
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
var _ TxManager = (*txm.Txm)(nil)

type TxManager interface {
Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txCfgs ...txm.SetTxConfig) error
Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, lastValidBlockHeight uint64, txCfgs ...txm.SetTxConfig) error
}

var _ relaytypes.Relayer = &Relayer{} //nolint:staticcheck
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *Transmitter) Transmit(

// pass transmit payload to tx manager queue
c.lggr.Debugf("Queuing transmit tx: state (%s) + transmissions (%s)", c.stateID.String(), c.transmissionsID.String())
if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx, nil); err != nil {
if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx, nil, blockhash.Value.LastValidBlockHeight); err != nil {
return fmt.Errorf("error on Transmit.txManager.Enqueue: %w", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type verifyTxSize struct {
s *solana.PrivateKey
}

func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ ...txm.SetTxConfig) error {
func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ uint64, _ ...txm.SetTxConfig) error {
// additional components that transaction manager adds to the transaction
require.NoError(txm.t, fees.SetComputeUnitPrice(tx, 0))
require.NoError(txm.t, fees.SetComputeUnitLimit(tx, 0))
Expand Down
59 changes: 32 additions & 27 deletions pkg/solana/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,26 +180,9 @@ func (txm *Txm) run() {
}

// sendWithRetry attempts to send a transaction with exponential backoff retry logic.
// It builds, signs and sends the initial tx with a new valid blockhash, and starts a retry routine with fee bumping if needed.
// It builds, signs, sends the initial tx, and starts a retry routine with fee bumping if needed.
// The function returns the signed transaction, its ID, and the initial signature for use in simulation.
func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Transaction, string, solanaGo.Signature, error) {
// Assign new blockhash and lastValidBlockHeight (last valid block number) to the transaction
// This is essential for tracking transaction rebroadcast
// Only the initial transaction should be sent with the updated blockhash
client, err := txm.client.Get()
if err != nil {
return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to get client: %w", err)
}
blockhash, err := client.LatestBlockhash(ctx)
if err != nil {
return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to get latest blockhash: %w", err)
}
if blockhash == nil || blockhash.Value == nil {
return solanaGo.Transaction{}, "", solanaGo.Signature{}, errors.New("nil pointer returned from LatestBlockhash")
}
msg.tx.Message.RecentBlockhash = blockhash.Value.Blockhash
msg.lastValidBlockHeight = blockhash.Value.LastValidBlockHeight

// Build and sign initial transaction setting compute unit price and limit
initTx, err := txm.buildTx(ctx, msg, 0)
if err != nil {
Expand Down Expand Up @@ -568,27 +551,48 @@ func (txm *Txm) handleFinalizedSignatureStatus(sig solanaGo.Signature) {

// rebroadcastExpiredTxs attempts to rebroadcast all transactions that are in broadcasted state and have expired.
// An expired tx is one where it's blockhash lastValidBlockHeight (last valid block number) is smaller than the current block height (block number).
// The function loops through all expired txes, rebroadcasts them with a new blockhash, and updates the lastValidBlockHeight.
// If any error occurs during rebroadcast attempt, they are discarded, and the function continues with the next transaction.
func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderWriter) {
currBlock, err := client.GetLatestBlock(ctx)
if err != nil || currBlock == nil || currBlock.BlockHeight == nil {
txm.lggr.Errorw("failed to get current block height", "error", err)
return
}
// Rebroadcast all expired txes using currBlockHeight (current block number)
for _, tx := range txm.txs.ListAllExpiredBroadcastedTxs(*currBlock.BlockHeight) {

// Get all expired broadcasted transactions at current block number. Safe to quit if no txes are found.
expiredBroadcastedTxes := txm.txs.ListAllExpiredBroadcastedTxs(*currBlock.BlockHeight)
if len(expiredBroadcastedTxes) == 0 {
return
}

// Request new blockhash and loop through all expired txes overwriting with new blockhash and rebroadcasting
for _, tx := range expiredBroadcastedTxes {
txm.lggr.Debugw("transaction expired, rebroadcasting", "id", tx.id, "signature", tx.signatures, "lastValidBlockHeight", tx.lastValidBlockHeight, "currentBlockHeight", *currBlock.BlockHeight)
// Removes all signatures associated to tx and cancels context.
_, err := txm.txs.Remove(tx.id)
if err != nil {
txm.lggr.Errorw("failed to remove expired transaction", "id", tx.id, "error", err)
continue
}

blockhash, err := client.LatestBlockhash(ctx)
if err != nil {
txm.lggr.Errorw("failed to get latest blockhash for rebroadcast", "error", err)
return
}
if blockhash == nil || blockhash.Value == nil {
txm.lggr.Errorw("nil pointer returned from LatestBlockhash for rebroadcast")
return
}

tx.tx.Message.RecentBlockhash = blockhash.Value.Blockhash
tx.cfg.BaseComputeUnitPrice = txm.fee.BaseComputeUnitPrice() // update compute unit price (priority fee) for rebroadcast
rebroadcastTx := pendingTx{
tx: tx.tx,
cfg: tx.cfg,
id: tx.id, // using same id in case it was set by caller and we need to maintain it.
tx: tx.tx,
cfg: tx.cfg,
id: tx.id, // using same id in case it was set by caller and we need to maintain it.
lastValidBlockHeight: blockhash.Value.LastValidBlockHeight,
}
// call sendWithRetry directly to avoid enqueuing
_, _, _, sendErr := txm.sendWithRetry(ctx, rebroadcastTx)
Expand Down Expand Up @@ -669,7 +673,7 @@ func (txm *Txm) reap() {
}

// Enqueue enqueues a msg destined for the solana chain.
func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txCfgs ...SetTxConfig) error {
func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...SetTxConfig) error {
if err := txm.Ready(); err != nil {
return fmt.Errorf("error in soltxm.Enqueue: %w", err)
}
Expand Down Expand Up @@ -717,9 +721,10 @@ func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Tran
}

msg := pendingTx{
id: id,
tx: *tx,
cfg: cfg,
id: id,
tx: *tx,
cfg: cfg,
lastValidBlockHeight: txLastValidBlockHeight,
}

select {
Expand Down
186 changes: 186 additions & 0 deletions pkg/solana/txm/txm_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
//go:build integration

package txm_test

import (
"context"
"testing"
"time"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/programs/system"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

relayconfig "github.com/smartcontractkit/chainlink-common/pkg/config"

solanaClient "github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
keyMocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/mocks"
)

func TestTxm_Integration_ExpirationRebroadcast(t *testing.T) {
t.Parallel()
url := solanaClient.SetupLocalSolNode(t) // live validator

type TestCase struct {
name string
txExpirationRebroadcast bool
useValidBlockHash bool
expectRebroadcast bool
expectTransactionStatus types.TransactionStatus
}

testCases := []TestCase{
{
name: "WithRebroadcast",
txExpirationRebroadcast: true,
useValidBlockHash: false,
expectRebroadcast: true,
expectTransactionStatus: types.Finalized,
},
{
name: "WithoutRebroadcast",
txExpirationRebroadcast: false,
useValidBlockHash: false,
expectRebroadcast: false,
expectTransactionStatus: types.Failed,
},
{
name: "ConfirmedBeforeRebroadcast",
txExpirationRebroadcast: true,
useValidBlockHash: true,
expectRebroadcast: false,
expectTransactionStatus: types.Finalized,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx, client, txmInstance, senderPubKey, receiverPubKey, observer := setup(t, url, tc.txExpirationRebroadcast)

// Record initial balance
initSenderBalance, err := client.Balance(ctx, senderPubKey)
require.NoError(t, err)
const amount = 1 * solana.LAMPORTS_PER_SOL

// Create and enqueue tx
txID := tc.name
tx, lastValidBlockHeight := createTransaction(ctx, t, client, senderPubKey, receiverPubKey, amount, tc.useValidBlockHash)
require.NoError(t, txmInstance.Enqueue(ctx, "", tx, &txID, lastValidBlockHeight))

// Wait for the transaction to reach the expected status
require.Eventually(t, func() bool {
status, statusErr := txmInstance.GetTransactionStatus(ctx, txID)
if statusErr != nil {
return false
}
return status == tc.expectTransactionStatus
}, 60*time.Second, 1*time.Second, "Transaction should eventually reach expected status")

// Verify balances
finalSenderBalance, err := client.Balance(ctx, senderPubKey)
require.NoError(t, err)
finalReceiverBalance, err := client.Balance(ctx, receiverPubKey)
require.NoError(t, err)

if tc.expectTransactionStatus == types.Finalized {
require.Less(t, finalSenderBalance, initSenderBalance, "Sender balance should decrease")
require.Equal(t, amount, finalReceiverBalance, "Receiver should receive the transferred amount")
} else {
require.Equal(t, initSenderBalance, finalSenderBalance, "Sender balance should remain the same")
require.Equal(t, uint64(0), finalReceiverBalance, "Receiver should not receive any funds")
}

// Verify rebroadcast logs
rebroadcastLogs := observer.FilterMessageSnippet("rebroadcast transaction sent").All()
rebroadcastLogs2 := observer.FilterMessageSnippet("transaction expired, rebroadcasting").All()
if tc.expectRebroadcast {
require.NotEmpty(t, rebroadcastLogs, "Expected rebroadcast log message not found")
require.NotEmpty(t, rebroadcastLogs2, "Expected rebroadcast log message not found")
} else {
require.Empty(t, rebroadcastLogs, "Rebroadcast should not occur")
require.Empty(t, rebroadcastLogs2, "Rebroadcast should not occur")
}
})
}
}

func setup(t *testing.T, url string, txExpirationRebroadcast bool) (context.Context, *solanaClient.Client, *txm.Txm, solana.PublicKey, solana.PublicKey, *observer.ObservedLogs) {
ctx := context.Background()

// Generate sender and receiver keys and fund sender account
senderKey, err := solana.NewRandomPrivateKey()
require.NoError(t, err)
senderPubKey := senderKey.PublicKey()
receiverKey, err := solana.NewRandomPrivateKey()
require.NoError(t, err)
receiverPubKey := receiverKey.PublicKey()
solanaClient.FundTestAccounts(t, []solana.PublicKey{senderPubKey}, url)

// Set up mock keystore with sender key
mkey := keyMocks.NewSimpleKeystore(t)
mkey.On("Sign", mock.Anything, senderPubKey.String(), mock.Anything).Return(func(_ context.Context, _ string, data []byte) []byte {
sig, _ := senderKey.Sign(data)
return sig[:]
}, nil)

// Set configs
cfg := config.NewDefault()
cfg.Chain.TxExpirationRebroadcast = &txExpirationRebroadcast
cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(10 * time.Second) // to get the finalized tx status

// Initialize the Solana client and TXM
lggr, obs := logger.TestObserved(t, zapcore.DebugLevel)
client, err := solanaClient.NewClient(url, cfg, 2*time.Second, lggr)
require.NoError(t, err)
loader := utils.NewLazyLoad(func() (solanaClient.ReaderWriter, error) { return client, nil })
txmInstance := txm.NewTxm("localnet", loader, nil, cfg, mkey, lggr)
servicetest.Run(t, txmInstance)

return ctx, client, txmInstance, senderPubKey, receiverPubKey, obs
}

// createTransaction is a helper function to create a transaction based on the test case.
func createTransaction(ctx context.Context, t *testing.T, client *solanaClient.Client, senderPubKey, receiverPubKey solana.PublicKey, amount uint64, useValidBlockHash bool) (*solana.Transaction, uint64) {
var blockhash solana.Hash
var lastValidBlockHeight uint64

if useValidBlockHash {
// Get a valid recent blockhash
recentBlockHashResult, err := client.LatestBlockhash(ctx)
require.NoError(t, err)
blockhash = recentBlockHashResult.Value.Blockhash
lastValidBlockHeight = recentBlockHashResult.Value.LastValidBlockHeight
} else {
// Use empty blockhash to simulate expiration
blockhash = solana.Hash{}
lastValidBlockHeight = 0
}

// Create the transaction
tx, err := solana.NewTransaction(
[]solana.Instruction{
system.NewTransferInstruction(
amount,
senderPubKey,
receiverPubKey,
).Build(),
},
blockhash,
solana.TransactionPayer(senderPubKey),
)
require.NoError(t, err)

return tx, lastValidBlockHeight
}
Loading

0 comments on commit 409fd1c

Please sign in to comment.