From 65ae13752669cfe8786512e180c690815d6d9ec6 Mon Sep 17 00:00:00 2001 From: amit-momin <108959691+amit-momin@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:39:49 -0600 Subject: [PATCH] Updated Solana TXM in-memory storage layer to track transactions states (#909) * Updated the in-memory storage to use state maps to better track transactions across their lifecycle * Removed all tx map from in-memory storage * Moved retention timeout logic into OnFinalized and OnError methods * Updated internal tests and fixed linting * Added check for same state transition calls on transactions * Updated logs and fixed chain test * Added new internal TXM tests and moved tx ID generation to Enqueue * Updated broadcast log and fixed confirm timeout logic * Fixed linting * Updated internal tests to validate reap mechanism * Updated comment * Updated error messages * Fixed tests * Fixed internal tests and linting * Reverted predefined error and updated error logs * Fixed chain test * Updated keystore Accounts mock * Added errors to state change methods and updated logs * Encapsulated in-memory storage locking in separate methods * Fixed tests and linting * Added tests for add signature and get tx state * Fixed linting --- pkg/solana/chain.go | 2 +- pkg/solana/chain_test.go | 11 +- pkg/solana/config/config.go | 8 +- pkg/solana/config/mocks/config.go | 18 + pkg/solana/config/toml.go | 6 + pkg/solana/relay.go | 2 +- pkg/solana/transmitter.go | 2 +- pkg/solana/transmitter_test.go | 2 +- pkg/solana/txm/pendingtx.go | 581 +++++++++++++---- pkg/solana/txm/pendingtx_test.go | 949 +++++++++++++++++++++++++++- pkg/solana/txm/prom.go | 4 + pkg/solana/txm/txm.go | 242 ++++--- pkg/solana/txm/txm_internal_test.go | 656 ++++++++++++++----- pkg/solana/txm/txm_load_test.go | 12 +- pkg/solana/txm/txm_race_test.go | 60 +- pkg/solana/txm/utils.go | 54 +- pkg/solana/txm/utils_test.go | 10 +- 17 files changed, 2170 insertions(+), 449 deletions(-) diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index c47e1cf1b..55b199912 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -574,7 +574,7 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba } chainTxm := c.TxManager() - err = chainTxm.Enqueue(ctx, "", tx, + err = chainTxm.Enqueue(ctx, "", tx, nil, txm.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units // no fee bumping and no additional fee - makes validating balance accurate txm.SetComputeUnitPriceMax(0), diff --git a/pkg/solana/chain_test.go b/pkg/solana/chain_test.go index b705860c9..b5e9adaf8 100644 --- a/pkg/solana/chain_test.go +++ b/pkg/solana/chain_test.go @@ -287,11 +287,11 @@ func TestChain_Transact(t *testing.T) { require.NoError(t, c.txm.Start(ctx)) require.NoError(t, c.Transact(ctx, sender.PublicKey().String(), receiver.PublicKey().String(), amount, true)) - tests.AssertLogEventually(t, logs, "tx state: confirmed") + tests.AssertLogEventually(t, logs, "marking transaction as confirmed") tests.AssertLogEventually(t, logs, "stopped tx retry") require.NoError(t, c.txm.Close()) - filteredLogs := logs.FilterMessage("tx state: confirmed").All() + filteredLogs := logs.FilterMessage("marking transaction as confirmed").All() require.Len(t, filteredLogs, 1) sig, ok := filteredLogs[0].ContextMap()["signature"] require.True(t, ok) @@ -515,6 +515,7 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) { return sig[:] }, nil) mkey.On("Sign", mock.Anything, pubKeyReceiver.String(), mock.Anything).Return([]byte{}, config.KeyNotFoundError{ID: pubKeyReceiver.String(), KeyType: "Solana"}) + mkey.On("Accounts", mock.Anything).Return([]string{pubKey.String()}, nil).Maybe() testChain, err := newChain("localnet", cfg, mkey, logger.Test(t)) require.NoError(t, err) @@ -556,7 +557,7 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) { } // Send funds twice, along with an invalid transaction - require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) + require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) // Wait for new block hash currentBh, err := selectedClient.LatestBlockhash(tests.Context(t)) @@ -577,8 +578,8 @@ NewBlockHash: } } - require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) - require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) // cannot sign tx before enqueuing + require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) + require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) // cannot sign tx before enqueuing // wait for all txes to finish ctx, cancel := context.WithCancel(tests.Context(t)) diff --git a/pkg/solana/config/config.go b/pkg/solana/config/config.go index 08d86d631..28d7ac5fb 100644 --- a/pkg/solana/config/config.go +++ b/pkg/solana/config/config.go @@ -17,7 +17,8 @@ var defaultConfigSet = Chain{ OCR2CacheTTL: config.MustNewDuration(time.Minute), // stale cache deadline TxTimeout: config.MustNewDuration(time.Minute), // timeout for send tx method in client TxRetryTimeout: config.MustNewDuration(10 * time.Second), // duration for tx rebroadcasting to RPC node - TxConfirmTimeout: config.MustNewDuration(30 * time.Second), // duration before discarding tx as unconfirmed + TxConfirmTimeout: config.MustNewDuration(30 * time.Second), // duration before discarding tx as unconfirmed. Set to 0 to disable discarding tx. + TxRetentionTimeout: config.MustNewDuration(0 * time.Second), // duration to retain transactions after being marked as finalized or errored. Set to 0 to immediately drop transactions. SkipPreflight: ptr(true), // to enable or disable preflight checks Commitment: ptr(string(rpc.CommitmentConfirmed)), MaxRetries: ptr(int64(0)), // max number of retries (default = 0). when config.MaxRetries < 0), interpreted as MaxRetries = nil and rpc node will do a reasonable number of retries @@ -43,6 +44,7 @@ type Config interface { TxTimeout() time.Duration TxRetryTimeout() time.Duration TxConfirmTimeout() time.Duration + TxRetentionTimeout() time.Duration SkipPreflight() bool Commitment() rpc.CommitmentType MaxRetries() *uint @@ -67,6 +69,7 @@ type Chain struct { TxTimeout *config.Duration TxRetryTimeout *config.Duration TxConfirmTimeout *config.Duration + TxRetentionTimeout *config.Duration SkipPreflight *bool Commitment *string MaxRetries *int64 @@ -103,6 +106,9 @@ func (c *Chain) SetDefaults() { if c.TxConfirmTimeout == nil { c.TxConfirmTimeout = defaultConfigSet.TxConfirmTimeout } + if c.TxRetentionTimeout == nil { + c.TxRetentionTimeout = defaultConfigSet.TxRetentionTimeout + } if c.SkipPreflight == nil { c.SkipPreflight = defaultConfigSet.SkipPreflight } diff --git a/pkg/solana/config/mocks/config.go b/pkg/solana/config/mocks/config.go index 4d5685b33..feef5c3c6 100644 --- a/pkg/solana/config/mocks/config.go +++ b/pkg/solana/config/mocks/config.go @@ -322,6 +322,24 @@ func (_m *Config) TxConfirmTimeout() time.Duration { return r0 } +// TxRetentionTimeout provides a mock function with given fields: +func (_m *Config) TxRetentionTimeout() time.Duration { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for TxRetentionTimeout") + } + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + // TxRetryTimeout provides a mock function with given fields: func (_m *Config) TxRetryTimeout() time.Duration { ret := _m.Called() diff --git a/pkg/solana/config/toml.go b/pkg/solana/config/toml.go index 5f8f770eb..6e9eadc5d 100644 --- a/pkg/solana/config/toml.go +++ b/pkg/solana/config/toml.go @@ -155,6 +155,9 @@ func setFromChain(c, f *Chain) { if f.TxConfirmTimeout != nil { c.TxConfirmTimeout = f.TxConfirmTimeout } + if f.TxRetentionTimeout != nil { + c.TxRetentionTimeout = f.TxRetentionTimeout + } if f.SkipPreflight != nil { c.SkipPreflight = f.SkipPreflight } @@ -238,6 +241,9 @@ func (c *TOMLConfig) TxConfirmTimeout() time.Duration { return c.Chain.TxConfirmTimeout.Duration() } +func (c *TOMLConfig) TxRetentionTimeout() time.Duration { + return c.Chain.TxRetentionTimeout.Duration() +} func (c *TOMLConfig) SkipPreflight() bool { return *c.Chain.SkipPreflight } diff --git a/pkg/solana/relay.go b/pkg/solana/relay.go index 6edd11b4f..8266293ef 100644 --- a/pkg/solana/relay.go +++ b/pkg/solana/relay.go @@ -24,7 +24,7 @@ import ( var _ TxManager = (*txm.Txm)(nil) type TxManager interface { - Enqueue(ctx context.Context, accountID string, msg *solana.Transaction, txCfgs ...txm.SetTxConfig) error + Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txCfgs ...txm.SetTxConfig) error } var _ relaytypes.Relayer = &Relayer{} //nolint:staticcheck diff --git a/pkg/solana/transmitter.go b/pkg/solana/transmitter.go index 4a3731921..a488730d0 100644 --- a/pkg/solana/transmitter.go +++ b/pkg/solana/transmitter.go @@ -84,7 +84,7 @@ func (c *Transmitter) Transmit( // pass transmit payload to tx manager queue c.lggr.Debugf("Queuing transmit tx: state (%s) + transmissions (%s)", c.stateID.String(), c.transmissionsID.String()) - if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx); err != nil { + if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx, nil); err != nil { return fmt.Errorf("error on Transmit.txManager.Enqueue: %w", err) } return nil diff --git a/pkg/solana/transmitter_test.go b/pkg/solana/transmitter_test.go index 66dd8658c..6aef6c921 100644 --- a/pkg/solana/transmitter_test.go +++ b/pkg/solana/transmitter_test.go @@ -26,7 +26,7 @@ type verifyTxSize struct { s *solana.PrivateKey } -func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, _ ...txm.SetTxConfig) error { +func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ ...txm.SetTxConfig) error { // additional components that transaction manager adds to the transaction require.NoError(txm.t, fees.SetComputeUnitPrice(tx, 0)) require.NoError(txm.t, fees.SetComputeUnitLimit(tx, 0)) diff --git a/pkg/solana/txm/pendingtx.go b/pkg/solana/txm/pendingtx.go index 4bf06c653..b2c3c98ed 100644 --- a/pkg/solana/txm/pendingtx.go +++ b/pkg/solana/txm/pendingtx.go @@ -3,132 +3,204 @@ package txm import ( "context" "errors" + "fmt" "sync" "time" "github.com/gagliardetto/solana-go" - "github.com/google/uuid" "golang.org/x/exp/maps" ) +var ( + ErrAlreadyInExpectedState = errors.New("transaction already in expected state") + ErrSigAlreadyExists = errors.New("signature already exists") + ErrIDAlreadyExists = errors.New("id already exists") + ErrSigDoesNotExist = errors.New("signature does not exist") + ErrTransactionNotFound = errors.New("transaction not found for id") +) + type PendingTxContext interface { - New(sig solana.Signature, cancel context.CancelFunc) (uuid.UUID, error) - Add(id uuid.UUID, sig solana.Signature) error - Remove(sig solana.Signature) uuid.UUID + // New adds a new tranasction in Broadcasted state to the storage + New(msg pendingTx, sig solana.Signature, cancel context.CancelFunc) error + // AddSignature adds a new signature for an existing transaction ID + AddSignature(id string, sig solana.Signature) error + // Remove removes transaction and related signatures from storage if not in finalized or errored state + Remove(sig solana.Signature) (string, error) + // ListAll returns all of the signatures being tracked for all transactions not yet finalized or errored ListAll() []solana.Signature - Expired(sig solana.Signature, lifespan time.Duration) bool - // state change hooks - OnSuccess(sig solana.Signature) uuid.UUID - OnError(sig solana.Signature, errType int) uuid.UUID // match err type using enum + // Expired returns whether or not confirmation timeout amount of time has passed since creation + Expired(sig solana.Signature, confirmationTimeout time.Duration) bool + // OnProcessed marks transactions as Processed + OnProcessed(sig solana.Signature) (string, error) + // OnConfirmed marks transaction as Confirmed and moves it from broadcast map to confirmed map + OnConfirmed(sig solana.Signature) (string, error) + // OnFinalized marks transaction as Finalized, moves it from the broadcasted or confirmed map to finalized map, removes signatures from signature map to stop confirmation checks + OnFinalized(sig solana.Signature, retentionTimeout time.Duration) (string, error) + // OnError marks transaction as errored, matches err type using enum, moves it from the broadcasted or confirmed map to finalized/errored map, removes signatures from signature map to stop confirmation checks + OnError(sig solana.Signature, retentionTimeout time.Duration, errType int) (string, error) + // GetTxState returns the transaction state for the provided ID if it exists + GetTxState(id string) (TxState, error) + // TrimFinalizedErroredTxs removes transactions that have reached their retention time + TrimFinalizedErroredTxs() +} + +type pendingTx struct { + tx solana.Transaction + cfg TxConfig + signatures []solana.Signature + id string + createTs time.Time + retentionTs time.Time + state TxState } var _ PendingTxContext = &pendingTxContext{} type pendingTxContext struct { - cancelBy map[uuid.UUID]context.CancelFunc - timestamp map[uuid.UUID]time.Time - sigToID map[solana.Signature]uuid.UUID - idToSigs map[uuid.UUID][]solana.Signature - lock sync.RWMutex + cancelBy map[string]context.CancelFunc + sigToID map[solana.Signature]string + + broadcastedTxs map[string]pendingTx // transactions that require retry and bumping i.e broadcasted, processed + confirmedTxs map[string]pendingTx // transactions that require monitoring for re-org + finalizedErroredTxs map[string]pendingTx // finalized and errored transactions held onto for status + + lock sync.RWMutex } func newPendingTxContext() *pendingTxContext { return &pendingTxContext{ - cancelBy: map[uuid.UUID]context.CancelFunc{}, - timestamp: map[uuid.UUID]time.Time{}, - sigToID: map[solana.Signature]uuid.UUID{}, - idToSigs: map[uuid.UUID][]solana.Signature{}, + cancelBy: map[string]context.CancelFunc{}, + sigToID: map[solana.Signature]string{}, + + broadcastedTxs: map[string]pendingTx{}, + confirmedTxs: map[string]pendingTx{}, + finalizedErroredTxs: map[string]pendingTx{}, } } -func (c *pendingTxContext) New(sig solana.Signature, cancel context.CancelFunc) (uuid.UUID, error) { - // validate signature does not exist - c.lock.RLock() - if _, exists := c.sigToID[sig]; exists { - c.lock.RUnlock() - return uuid.UUID{}, errors.New("signature already exists") +func (c *pendingTxContext) New(tx pendingTx, sig solana.Signature, cancel context.CancelFunc) error { + err := c.withReadLock(func() error { + // validate signature does not exist + if _, exists := c.sigToID[sig]; exists { + return ErrSigAlreadyExists + } + // validate id does not exist + if _, exists := c.broadcastedTxs[tx.id]; exists { + return ErrIDAlreadyExists + } + return nil + }) + if err != nil { + return err } - c.lock.RUnlock() - // upgrade to write lock if sig does not exist - c.lock.Lock() - defer c.lock.Unlock() - if _, exists := c.sigToID[sig]; exists { - return uuid.UUID{}, errors.New("signature already exists") - } - // save cancel func - id := uuid.New() - c.cancelBy[id] = cancel - c.timestamp[id] = time.Now() - c.sigToID[sig] = id - c.idToSigs[id] = []solana.Signature{sig} - return id, nil + // upgrade to write lock if sig or id do not exist + _, err = c.withWriteLock(func() (string, error) { + if _, exists := c.sigToID[sig]; exists { + return "", ErrSigAlreadyExists + } + if _, exists := c.broadcastedTxs[tx.id]; exists { + return "", ErrIDAlreadyExists + } + // save cancel func + c.cancelBy[tx.id] = cancel + c.sigToID[sig] = tx.id + // add signature to tx + tx.signatures = append(tx.signatures, sig) + tx.createTs = time.Now() + tx.state = Broadcasted + // save to the broadcasted map since transaction was just broadcasted + c.broadcastedTxs[tx.id] = tx + return "", nil + }) + return err } -func (c *pendingTxContext) Add(id uuid.UUID, sig solana.Signature) error { - // already exists - c.lock.RLock() - if _, exists := c.sigToID[sig]; exists { - c.lock.RUnlock() - return errors.New("signature already exists") - } - if _, exists := c.idToSigs[id]; !exists { - c.lock.RUnlock() - return errors.New("id does not exist") +func (c *pendingTxContext) AddSignature(id string, sig solana.Signature) error { + err := c.withReadLock(func() error { + // signature already exists + if _, exists := c.sigToID[sig]; exists { + return ErrSigAlreadyExists + } + // new signatures should only be added for broadcasted transactions + // otherwise, the transaction has transitioned states and no longer needs new signatures to track + if _, exists := c.broadcastedTxs[id]; !exists { + return ErrTransactionNotFound + } + return nil + }) + if err != nil { + return err } - c.lock.RUnlock() // upgrade to write lock if sig does not exist - c.lock.Lock() - defer c.lock.Unlock() - if _, exists := c.sigToID[sig]; exists { - return errors.New("signature already exists") - } - if _, exists := c.idToSigs[id]; !exists { - return errors.New("id does not exist - tx likely confirmed by other signature") - } - // save signature - c.sigToID[sig] = id - c.idToSigs[id] = append(c.idToSigs[id], sig) - return nil + _, err = c.withWriteLock(func() (string, error) { + if _, exists := c.sigToID[sig]; exists { + return "", ErrSigAlreadyExists + } + if _, exists := c.broadcastedTxs[id]; !exists { + return "", ErrTransactionNotFound + } + c.sigToID[sig] = id + tx := c.broadcastedTxs[id] + // save new signature + tx.signatures = append(tx.signatures, sig) + // save updated tx to broadcasted map + c.broadcastedTxs[id] = tx + return "", nil + }) + return err } -// returns the id if removed (otherwise returns 0-id) -func (c *pendingTxContext) Remove(sig solana.Signature) (id uuid.UUID) { - // check if already cancelled - c.lock.RLock() - id, sigExists := c.sigToID[sig] - if !sigExists { - c.lock.RUnlock() - return id - } - if _, idExists := c.idToSigs[id]; !idExists { - c.lock.RUnlock() - return id +// returns the id if removed (otherwise returns empty string) +// removes transactions from any state except finalized and errored +func (c *pendingTxContext) Remove(sig solana.Signature) (id string, err error) { + err = c.withReadLock(func() error { + // check if already removed + id, sigExists := c.sigToID[sig] + if !sigExists { + return ErrSigDoesNotExist + } + _, broadcastedIDExists := c.broadcastedTxs[id] + _, confirmedIDExists := c.confirmedTxs[id] + // transcation does not exist in tx maps + if !broadcastedIDExists && !confirmedIDExists { + return ErrTransactionNotFound + } + return nil + }) + if err != nil { + return "", err } - c.lock.RUnlock() // upgrade to write lock if sig does not exist - c.lock.Lock() - defer c.lock.Unlock() - id, sigExists = c.sigToID[sig] - if !sigExists { - return id - } - sigs, idExists := c.idToSigs[id] - if !idExists { - return id - } + return c.withWriteLock(func() (string, error) { + id, sigExists := c.sigToID[sig] + if !sigExists { + return id, ErrSigDoesNotExist + } + var tx pendingTx + if tempTx, exists := c.broadcastedTxs[id]; exists { + tx = tempTx + delete(c.broadcastedTxs, id) + } + if tempTx, exists := c.confirmedTxs[id]; exists { + tx = tempTx + delete(c.confirmedTxs, id) + } - // call cancel func + remove from map - c.cancelBy[id]() // cancel context - delete(c.cancelBy, id) - delete(c.timestamp, id) - delete(c.idToSigs, id) - for _, s := range sigs { - delete(c.sigToID, s) - } - return id + // call cancel func + remove from map + if cancel, exists := c.cancelBy[id]; exists { + cancel() // cancel context + delete(c.cancelBy, id) + } + + // remove all signatures associated with transaction from sig map + for _, s := range tx.signatures { + delete(c.sigToID, s) + } + return id, nil + }) } func (c *pendingTxContext) ListAll() []solana.Signature { @@ -138,28 +210,283 @@ func (c *pendingTxContext) ListAll() []solana.Signature { } // Expired returns if the timeout for trying to confirm a signature has been reached -func (c *pendingTxContext) Expired(sig solana.Signature, lifespan time.Duration) bool { +func (c *pendingTxContext) Expired(sig solana.Signature, confirmationTimeout time.Duration) bool { c.lock.RLock() defer c.lock.RUnlock() + // confirmationTimeout set to 0 disables the expiration check + if confirmationTimeout == 0 { + return false + } id, exists := c.sigToID[sig] if !exists { return false // return expired = false if timestamp does not exist (likely cleaned up by something else previously) } + if tx, exists := c.broadcastedTxs[id]; exists { + return time.Since(tx.createTs) > confirmationTimeout + } + if tx, exists := c.confirmedTxs[id]; exists { + return time.Since(tx.createTs) > confirmationTimeout + } + return false // return expired = false if tx does not exist (likely cleaned up by something else previously) +} - timestamp, exists := c.timestamp[id] - if !exists { - return false // return expired = false if timestamp does not exist (likely cleaned up by something else previously) +func (c *pendingTxContext) OnProcessed(sig solana.Signature) (string, error) { + err := c.withReadLock(func() error { + // validate if sig exists + id, sigExists := c.sigToID[sig] + if !sigExists { + return ErrSigDoesNotExist + } + // Transactions should only move to processed from broadcasted + tx, exists := c.broadcastedTxs[id] + if !exists { + return ErrTransactionNotFound + } + // Check if tranasction already in processed state + if tx.state == Processed { + return ErrAlreadyInExpectedState + } + return nil + }) + if err != nil { + return "", err } - return time.Since(timestamp) > lifespan + // upgrade to write lock if sig and id exist + return c.withWriteLock(func() (string, error) { + id, sigExists := c.sigToID[sig] + if !sigExists { + return id, ErrSigDoesNotExist + } + tx, exists := c.broadcastedTxs[id] + if !exists { + return id, ErrTransactionNotFound + } + tx = c.broadcastedTxs[id] + // update tx state to Processed + tx.state = Processed + // save updated tx back to the broadcasted map + c.broadcastedTxs[id] = tx + return id, nil + }) } -func (c *pendingTxContext) OnSuccess(sig solana.Signature) uuid.UUID { - return c.Remove(sig) +func (c *pendingTxContext) OnConfirmed(sig solana.Signature) (string, error) { + err := c.withReadLock(func() error { + // validate if sig exists + id, sigExists := c.sigToID[sig] + if !sigExists { + return ErrSigDoesNotExist + } + // Check if transaction already in confirmed state + if tx, exists := c.confirmedTxs[id]; exists && tx.state == Confirmed { + return ErrAlreadyInExpectedState + } + // Transactions should only move to confirmed from broadcasted/processed + if _, exists := c.broadcastedTxs[id]; !exists { + return ErrTransactionNotFound + } + return nil + }) + if err != nil { + return "", err + } + + // upgrade to write lock if id exists + return c.withWriteLock(func() (string, error) { + id, sigExists := c.sigToID[sig] + if !sigExists { + return id, ErrSigDoesNotExist + } + if _, exists := c.broadcastedTxs[id]; !exists { + return id, ErrTransactionNotFound + } + // call cancel func + remove from map to stop the retry/bumping cycle for this transaction + if cancel, exists := c.cancelBy[id]; exists { + cancel() // cancel context + delete(c.cancelBy, id) + } + tx := c.broadcastedTxs[id] + // update tx state to Confirmed + tx.state = Confirmed + // move tx to confirmed map + c.confirmedTxs[id] = tx + // remove tx from broadcasted map + delete(c.broadcastedTxs, id) + return id, nil + }) +} + +func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout time.Duration) (string, error) { + err := c.withReadLock(func() error { + id, sigExists := c.sigToID[sig] + if !sigExists { + return ErrSigDoesNotExist + } + // Allow transactions to transition from broadcasted, processed, or confirmed state in case there are delays between status checks + _, broadcastedExists := c.broadcastedTxs[id] + _, confirmedExists := c.confirmedTxs[id] + if !broadcastedExists && !confirmedExists { + return ErrTransactionNotFound + } + return nil + }) + if err != nil { + return "", err + } + + // upgrade to write lock if id exists + return c.withWriteLock(func() (string, error) { + id, exists := c.sigToID[sig] + if !exists { + return id, ErrSigDoesNotExist + } + var tx, tempTx pendingTx + var broadcastedExists, confirmedExists bool + if tempTx, broadcastedExists = c.broadcastedTxs[id]; broadcastedExists { + tx = tempTx + } + if tempTx, confirmedExists = c.confirmedTxs[id]; confirmedExists { + tx = tempTx + } + if !broadcastedExists && !confirmedExists { + return id, ErrTransactionNotFound + } + // call cancel func + remove from map to stop the retry/bumping cycle for this transaction + // cancel is expected to be called and removed when tx is confirmed but checked here too in case state is skipped + if cancel, exists := c.cancelBy[id]; exists { + cancel() // cancel context + delete(c.cancelBy, id) + } + // delete from broadcasted map, if exists + delete(c.broadcastedTxs, id) + // delete from confirmed map, if exists + delete(c.confirmedTxs, id) + // remove all related signatures from the sigToID map to skip picking up this tx in the confirmation logic + for _, s := range tx.signatures { + delete(c.sigToID, s) + } + // if retention duration is set to 0, delete transaction from storage + // otherwise, move to finalized map + if retentionTimeout == 0 { + return id, nil + } + // set the timestamp till which the tx should be retained in storage + tx.retentionTs = time.Now().Add(retentionTimeout) + // update tx state to Finalized + tx.state = Finalized + // move transaction from confirmed to finalized map + c.finalizedErroredTxs[id] = tx + return id, nil + }) +} + +func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.Duration, _ int) (string, error) { + err := c.withReadLock(func() error { + id, sigExists := c.sigToID[sig] + if !sigExists { + return ErrSigDoesNotExist + } + // transaction can transition from any non-finalized state + var broadcastedExists, confirmedExists bool + _, broadcastedExists = c.broadcastedTxs[id] + _, confirmedExists = c.confirmedTxs[id] + // transcation does not exist in any tx maps + if !broadcastedExists && !confirmedExists { + return ErrTransactionNotFound + } + return nil + }) + if err != nil { + return "", err + } + + // upgrade to write lock if sig exists + return c.withWriteLock(func() (string, error) { + id, exists := c.sigToID[sig] + if !exists { + return "", ErrSigDoesNotExist + } + var tx, tempTx pendingTx + var broadcastedExists, confirmedExists bool + if tempTx, broadcastedExists = c.broadcastedTxs[id]; broadcastedExists { + tx = tempTx + } + if tempTx, confirmedExists = c.confirmedTxs[id]; confirmedExists { + tx = tempTx + } + // transcation does not exist in any non-finalized maps + if !broadcastedExists && !confirmedExists { + return "", ErrTransactionNotFound + } + // call cancel func + remove from map + if cancel, exists := c.cancelBy[id]; exists { + cancel() // cancel context + delete(c.cancelBy, id) + } + // delete from broadcasted map, if exists + delete(c.broadcastedTxs, id) + // delete from confirmed map, if exists + delete(c.confirmedTxs, id) + // remove all related signatures from the sigToID map to skip picking up this tx in the confirmation logic + for _, s := range tx.signatures { + delete(c.sigToID, s) + } + // if retention duration is set to 0, delete transaction from storage + // otherwise, move to finalized map + if retentionTimeout == 0 { + return id, nil + } + // set the timestamp till which the tx should be retained in storage + tx.retentionTs = time.Now().Add(retentionTimeout) + // update tx state to Errored + tx.state = Errored + // move transaction from broadcasted to error map + c.finalizedErroredTxs[id] = tx + return id, nil + }) +} + +func (c *pendingTxContext) GetTxState(id string) (TxState, error) { + c.lock.RLock() + defer c.lock.RUnlock() + if tx, exists := c.broadcastedTxs[id]; exists { + return tx.state, nil + } + if tx, exists := c.confirmedTxs[id]; exists { + return tx.state, nil + } + if tx, exists := c.finalizedErroredTxs[id]; exists { + return tx.state, nil + } + return NotFound, fmt.Errorf("failed to find transaction for id: %s", id) } -func (c *pendingTxContext) OnError(sig solana.Signature, _ int) uuid.UUID { - return c.Remove(sig) +// TrimFinalizedErroredTxs deletes transactions from the finalized/errored map and the allTxs map after the retention period has passed +func (c *pendingTxContext) TrimFinalizedErroredTxs() { + c.lock.Lock() + defer c.lock.Unlock() + expiredIDs := make([]string, 0, len(c.finalizedErroredTxs)) + for id, tx := range c.finalizedErroredTxs { + if time.Now().After(tx.retentionTs) { + expiredIDs = append(expiredIDs, id) + } + } + for _, id := range expiredIDs { + delete(c.finalizedErroredTxs, id) + } +} + +func (c *pendingTxContext) withReadLock(fn func() error) error { + c.lock.RLock() + defer c.lock.RUnlock() + return fn() +} + +func (c *pendingTxContext) withWriteLock(fn func() (string, error)) (string, error) { + c.lock.Lock() + defer c.lock.Unlock() + return fn() } var _ PendingTxContext = &pendingTxContextWithProm{} @@ -184,15 +511,27 @@ func newPendingTxContextWithProm(id string) *pendingTxContextWithProm { } } -func (c *pendingTxContextWithProm) New(sig solana.Signature, cancel context.CancelFunc) (uuid.UUID, error) { - return c.pendingTx.New(sig, cancel) +func (c *pendingTxContextWithProm) New(msg pendingTx, sig solana.Signature, cancel context.CancelFunc) error { + return c.pendingTx.New(msg, sig, cancel) +} + +func (c *pendingTxContextWithProm) AddSignature(id string, sig solana.Signature) error { + return c.pendingTx.AddSignature(id, sig) +} + +func (c *pendingTxContextWithProm) OnProcessed(sig solana.Signature) (string, error) { + return c.pendingTx.OnProcessed(sig) } -func (c *pendingTxContextWithProm) Add(id uuid.UUID, sig solana.Signature) error { - return c.pendingTx.Add(id, sig) +func (c *pendingTxContextWithProm) OnConfirmed(sig solana.Signature) (string, error) { + id, err := c.pendingTx.OnConfirmed(sig) // empty ID indicates already previously removed + if id != "" && err == nil { // increment if tx was not removed + promSolTxmSuccessTxs.WithLabelValues(c.chainID).Add(1) + } + return id, err } -func (c *pendingTxContextWithProm) Remove(sig solana.Signature) uuid.UUID { +func (c *pendingTxContextWithProm) Remove(sig solana.Signature) (string, error) { return c.pendingTx.Remove(sig) } @@ -206,25 +545,25 @@ func (c *pendingTxContextWithProm) Expired(sig solana.Signature, lifespan time.D return c.pendingTx.Expired(sig, lifespan) } -// Success - tx included in block and confirmed -func (c *pendingTxContextWithProm) OnSuccess(sig solana.Signature) uuid.UUID { - id := c.pendingTx.OnSuccess(sig) // empty ID indicates already previously removed - if id != uuid.Nil { // increment if tx was not removed - promSolTxmSuccessTxs.WithLabelValues(c.chainID).Add(1) +// Success - tx finalized +func (c *pendingTxContextWithProm) OnFinalized(sig solana.Signature, retentionTimeout time.Duration) (string, error) { + id, err := c.pendingTx.OnFinalized(sig, retentionTimeout) // empty ID indicates already previously removed + if id != "" && err == nil { // increment if tx was not removed + promSolTxmFinalizedTxs.WithLabelValues(c.chainID).Add(1) } - return id + return id, err } -func (c *pendingTxContextWithProm) OnError(sig solana.Signature, errType int) uuid.UUID { +func (c *pendingTxContextWithProm) OnError(sig solana.Signature, retentionTimeout time.Duration, errType int) (string, error) { // special RPC rejects transaction (signature will not be valid) if errType == TxFailReject { promSolTxmRejectTxs.WithLabelValues(c.chainID).Add(1) promSolTxmErrorTxs.WithLabelValues(c.chainID).Add(1) - return uuid.Nil + return "", nil } - id := c.pendingTx.OnError(sig, errType) // empty ID indicates already removed - if id != uuid.Nil { + id, err := c.pendingTx.OnError(sig, retentionTimeout, errType) // err indicates transaction not found so may already be removed + if err == nil { switch errType { case TxFailRevert: promSolTxmRevertTxs.WithLabelValues(c.chainID).Add(1) @@ -246,5 +585,13 @@ func (c *pendingTxContextWithProm) OnError(sig solana.Signature, errType int) uu promSolTxmErrorTxs.WithLabelValues(c.chainID).Add(1) } - return id + return id, err +} + +func (c *pendingTxContextWithProm) GetTxState(id string) (TxState, error) { + return c.pendingTx.GetTxState(id) +} + +func (c *pendingTxContextWithProm) TrimFinalizedErroredTxs() { + c.pendingTx.TrimFinalizedErroredTxs() } diff --git a/pkg/solana/txm/pendingtx_test.go b/pkg/solana/txm/pendingtx_test.go index 5639bff59..b1212ca21 100644 --- a/pkg/solana/txm/pendingtx_test.go +++ b/pkg/solana/txm/pendingtx_test.go @@ -15,15 +15,13 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) -func TestPendingTxContext(t *testing.T) { +func TestPendingTxContext_add_remove_multiple(t *testing.T) { var wg sync.WaitGroup ctx := tests.Context(t) - newProcess := func(i int) (solana.Signature, context.CancelFunc) { + newProcess := func() (solana.Signature, context.CancelFunc) { // make random signature - sig := make([]byte, 64) - _, err := rand.Read(sig) - require.NoError(t, err) + sig := randomSignature(t) // start subprocess to wait for context processCtx, cancel := context.WithCancel(ctx) @@ -32,22 +30,23 @@ func TestPendingTxContext(t *testing.T) { <-processCtx.Done() wg.Done() }() - return solana.SignatureFromBytes(sig), cancel + return sig, cancel } // init inflight txs map + store some signatures and cancelFunc txs := newPendingTxContext() - ids := map[solana.Signature]uuid.UUID{} + ids := map[solana.Signature]string{} n := 5 for i := 0; i < n; i++ { - sig, cancel := newProcess(i) - id, err := txs.New(sig, cancel) + sig, cancel := newProcess() + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) assert.NoError(t, err) - ids[sig] = id + ids[sig] = msg.id } // cannot add signature for non existent ID - require.Error(t, txs.Add(uuid.New(), solana.Signature{})) + require.Error(t, txs.AddSignature(uuid.New().String(), solana.Signature{})) // return list of signatures list := txs.ListAll() @@ -55,28 +54,845 @@ func TestPendingTxContext(t *testing.T) { // stop all sub processes for i := 0; i < len(list); i++ { - id := txs.Remove(list[i]) + id, err := txs.Remove(list[i]) + assert.NoError(t, err) assert.Equal(t, n-i-1, len(txs.ListAll())) assert.Equal(t, ids[list[i]], id) // second remove should not return valid id - already removed - assert.Equal(t, uuid.Nil, txs.Remove(list[i])) + id, err = txs.Remove(list[i]) + require.Error(t, err) + assert.Equal(t, "", id) } wg.Wait() } +func TestPendingTxContext_new(t *testing.T) { + t.Parallel() + _, cancel := context.WithCancel(tests.Context(t)) + sig := randomSignature(t) + txs := newPendingTxContext() + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Check it exists in signature map + id, exists := txs.sigToID[sig] + require.True(t, exists) + require.Equal(t, msg.id, id) + + // Check it exists in broadcasted map + tx, exists := txs.broadcastedTxs[msg.id] + require.True(t, exists) + require.Len(t, tx.signatures, 1) + require.Equal(t, sig, tx.signatures[0]) + + // Check status is Broadcasted + require.Equal(t, Broadcasted, tx.state) + + // Check it does not exist in confirmed map + tx, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in finalized map + tx, exists = txs.finalizedErroredTxs[msg.id] + require.False(t, exists) +} + +func TestPendingTxContext_add_signature(t *testing.T) { + t.Parallel() + _, cancel := context.WithCancel(tests.Context(t)) + txs := newPendingTxContext() + + t.Run("successfully add signature to transaction", func(t *testing.T) { + sig1 := randomSignature(t) + sig2 := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig1, cancel) + require.NoError(t, err) + + err = txs.AddSignature(msg.id, sig2) + require.NoError(t, err) + + // Check signature map + id, exists := txs.sigToID[sig1] + require.True(t, exists) + require.Equal(t, msg.id, id) + id, exists = txs.sigToID[sig2] + require.True(t, exists) + require.Equal(t, msg.id, id) + + // Check broadcasted map + tx, exists := txs.broadcastedTxs[msg.id] + require.True(t, exists) + require.Len(t, tx.signatures, 2) + require.Equal(t, sig1, tx.signatures[0]) + require.Equal(t, sig2, tx.signatures[1]) + + // Check confirmed map + tx, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check finalized map + tx, exists = txs.finalizedErroredTxs[msg.id] + require.False(t, exists) + }) + + t.Run("fails to add duplicate signature", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + err = txs.AddSignature(msg.id, sig) + require.ErrorIs(t, err, ErrSigAlreadyExists) + }) + + t.Run("fails to add signature for missing transaction", func(t *testing.T) { + sig1 := randomSignature(t) + sig2 := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig1, cancel) + require.NoError(t, err) + + err = txs.AddSignature("bad id", sig2) + require.ErrorIs(t, err, ErrTransactionNotFound) + }) + + t.Run("fails to add signature for confirmed transaction", func(t *testing.T) { + sig1 := randomSignature(t) + sig2 := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig1, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig1) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to confirmed state + id, err = txs.OnConfirmed(sig1) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + err = txs.AddSignature(msg.id, sig2) + require.ErrorIs(t, err, ErrTransactionNotFound) + }) +} + +func TestPendingTxContext_on_broadcasted_processed(t *testing.T) { + t.Parallel() + _, cancel := context.WithCancel(tests.Context(t)) + txs := newPendingTxContext() + retentionTimeout := 5 * time.Second + + t.Run("successfully transition transaction from broadcasted to processed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Check it exists in signature map + id, exists := txs.sigToID[sig] + require.True(t, exists) + require.Equal(t, msg.id, id) + + // Check it exists in broadcasted map + tx, exists := txs.broadcastedTxs[msg.id] + require.True(t, exists) + require.Len(t, tx.signatures, 1) + require.Equal(t, sig, tx.signatures[0]) + + // Check status is Processed + require.Equal(t, Processed, tx.state) + + // Check it does not exist in confirmed map + tx, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in finalized map + tx, exists = txs.finalizedErroredTxs[msg.id] + require.False(t, exists) + }) + + t.Run("fails to transition transaction from confirmed to processed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to confirmed state + id, err = txs.OnConfirmed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition back to processed state + _, err = txs.OnProcessed(sig) + require.Error(t, err) + }) + + t.Run("fails to transition transaction from finalized to processed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to confirmed state + id, err = txs.OnConfirmed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to finalized state + id, err = txs.OnFinalized(sig, retentionTimeout) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition back to processed state + _, err = txs.OnProcessed(sig) + require.Error(t, err) + }) + + t.Run("fails to transition transaction from errored to processed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to errored state + id, err := txs.OnError(sig, retentionTimeout, 0) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition back to processed state + _, err = txs.OnProcessed(sig) + require.Error(t, err) + }) + + t.Run("predefined error if transaction already in processed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // No error if OnProcessed called again + _, err = txs.OnProcessed(sig) + require.ErrorIs(t, err, ErrAlreadyInExpectedState) + }) +} + +func TestPendingTxContext_on_confirmed(t *testing.T) { + t.Parallel() + _, cancel := context.WithCancel(tests.Context(t)) + txs := newPendingTxContext() + retentionTimeout := 5 * time.Second + + t.Run("successfully transition transaction from broadcasted/processed to confirmed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to confirmed state + id, err = txs.OnConfirmed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Check it exists in signature map + id, exists := txs.sigToID[sig] + require.True(t, exists) + require.Equal(t, msg.id, id) + + // Check it does not exist in broadcasted map + _, exists = txs.broadcastedTxs[msg.id] + require.False(t, exists) + + // Check it exists in confirmed map + tx, exists := txs.confirmedTxs[msg.id] + require.True(t, exists) + require.Len(t, tx.signatures, 1) + require.Equal(t, sig, tx.signatures[0]) + + // Check status is Confirmed + require.Equal(t, Confirmed, tx.state) + + // Check it does not exist in finalized map + tx, exists = txs.finalizedErroredTxs[msg.id] + require.False(t, exists) + }) + + t.Run("fails to transition transaction from finalized to confirmed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to confirmed state + id, err = txs.OnConfirmed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to finalized state + id, err = txs.OnFinalized(sig, retentionTimeout) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition back to processed state + _, err = txs.OnConfirmed(sig) + require.Error(t, err) + }) + + t.Run("fails to transition transaction from errored to confirmed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to errored state + id, err := txs.OnError(sig, retentionTimeout, 0) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition back to confirmed state + _, err = txs.OnConfirmed(sig) + require.Error(t, err) + }) + + t.Run("predefined error if transaction already in confirmed state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to confirmed state + id, err = txs.OnConfirmed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // No error if OnConfirmed called again + _, err = txs.OnConfirmed(sig) + require.ErrorIs(t, err, ErrAlreadyInExpectedState) + }) +} + +func TestPendingTxContext_on_finalized(t *testing.T) { + t.Parallel() + _, cancel := context.WithCancel(tests.Context(t)) + txs := newPendingTxContext() + retentionTimeout := 5 * time.Second + + t.Run("successfully transition transaction from broadcasted/processed to finalized state", func(t *testing.T) { + sig1 := randomSignature(t) + sig2 := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig1, cancel) + require.NoError(t, err) + + // Add second signature + err = txs.AddSignature(msg.id, sig2) + require.NoError(t, err) + + // Transition to finalized state + id, err := txs.OnFinalized(sig1, retentionTimeout) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Check it does not exist in broadcasted map + _, exists := txs.broadcastedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in confirmed map + _, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check it exists in finalized map + tx, exists := txs.finalizedErroredTxs[msg.id] + require.True(t, exists) + require.Len(t, tx.signatures, 2) + require.Equal(t, sig1, tx.signatures[0]) + require.Equal(t, sig2, tx.signatures[1]) + + // Check status is Finalized + require.Equal(t, Finalized, tx.state) + + // Check sigs do no exist in signature map + _, exists = txs.sigToID[sig1] + require.False(t, exists) + _, exists = txs.sigToID[sig2] + require.False(t, exists) + }) + + t.Run("successfully transition transaction from confirmed to finalized state", func(t *testing.T) { + sig1 := randomSignature(t) + sig2 := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig1, cancel) + require.NoError(t, err) + + // Add second signature + err = txs.AddSignature(msg.id, sig2) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig1) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to confirmed state + id, err = txs.OnConfirmed(sig1) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to finalized state + id, err = txs.OnFinalized(sig1, retentionTimeout) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Check it does not exist in broadcasted map + _, exists := txs.broadcastedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in confirmed map + _, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check it exists in finalized map + tx, exists := txs.finalizedErroredTxs[msg.id] + require.True(t, exists) + require.Len(t, tx.signatures, 2) + require.Equal(t, sig1, tx.signatures[0]) + require.Equal(t, sig2, tx.signatures[1]) + + // Check status is Finalized + require.Equal(t, Finalized, tx.state) + + // Check sigs do no exist in signature map + _, exists = txs.sigToID[sig1] + require.False(t, exists) + _, exists = txs.sigToID[sig2] + require.False(t, exists) + }) + + t.Run("successfully delete transaction when finalized with 0 retention timeout", func(t *testing.T) { + sig1 := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig1, cancel) + require.NoError(t, err) + + // Transition to processed state + id, err := txs.OnProcessed(sig1) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to confirmed state + id, err = txs.OnConfirmed(sig1) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to finalized state + id, err = txs.OnFinalized(sig1, 0*time.Second) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Check it does not exist in broadcasted map + _, exists := txs.broadcastedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in confirmed map + _, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in finalized map + _, exists = txs.finalizedErroredTxs[msg.id] + require.False(t, exists) + + // Check sigs do no exist in signature map + _, exists = txs.sigToID[sig1] + require.False(t, exists) + }) + + t.Run("fails to transition transaction from errored to finalized state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to errored state + id, err := txs.OnError(sig, retentionTimeout, 0) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition back to confirmed state + _, err = txs.OnFinalized(sig, retentionTimeout) + require.Error(t, err) + }) +} + +func TestPendingTxContext_on_error(t *testing.T) { + t.Parallel() + _, cancel := context.WithCancel(tests.Context(t)) + txs := newPendingTxContext() + retentionTimeout := 5 * time.Second + + t.Run("successfully transition transaction from broadcasted/processed to errored state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to errored state + id, err := txs.OnError(sig, retentionTimeout, 0) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Check it does not exist in broadcasted map + _, exists := txs.broadcastedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in confirmed map + _, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check it exists in errored map + tx, exists := txs.finalizedErroredTxs[msg.id] + require.True(t, exists) + require.Len(t, tx.signatures, 1) + require.Equal(t, sig, tx.signatures[0]) + + // Check status is Finalized + require.Equal(t, Errored, tx.state) + + // Check sigs do no exist in signature map + _, exists = txs.sigToID[sig] + require.False(t, exists) + }) + + t.Run("successfully transitions transaction from confirmed to errored state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to errored state + id, err := txs.OnConfirmed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to errored state + id, err = txs.OnError(sig, retentionTimeout, 0) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Check it does not exist in broadcasted map + _, exists := txs.broadcastedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in confirmed map + _, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check it exists in errored map + tx, exists := txs.finalizedErroredTxs[msg.id] + require.True(t, exists) + require.Len(t, tx.signatures, 1) + require.Equal(t, sig, tx.signatures[0]) + + // Check status is Finalized + require.Equal(t, Errored, tx.state) + + // Check sigs do no exist in signature map + _, exists = txs.sigToID[sig] + require.False(t, exists) + }) + + t.Run("successfully delete transaction when errored with 0 retention timeout", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to errored state + id, err := txs.OnConfirmed(sig) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition to errored state + id, err = txs.OnError(sig, 0*time.Second, 0) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Check it does not exist in broadcasted map + _, exists := txs.broadcastedTxs[msg.id] + require.False(t, exists) + + // Check it does not exist in confirmed map + _, exists = txs.confirmedTxs[msg.id] + require.False(t, exists) + + // Check it exists in errored map + _, exists = txs.finalizedErroredTxs[msg.id] + require.False(t, exists) + + // Check sigs do no exist in signature map + _, exists = txs.sigToID[sig] + require.False(t, exists) + }) + + t.Run("fails to transition transaction from finalized to errored state", func(t *testing.T) { + sig := randomSignature(t) + + // Create new transaction + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) + require.NoError(t, err) + + // Transition to confirmed state + id, err := txs.OnFinalized(sig, retentionTimeout) + require.NoError(t, err) + require.Equal(t, msg.id, id) + + // Transition back to confirmed state + id, err = txs.OnError(sig, retentionTimeout, 0) + require.Error(t, err) + require.Equal(t, "", id) + }) +} + +func TestPendingTxContext_remove(t *testing.T) { + t.Parallel() + _, cancel := context.WithCancel(tests.Context(t)) + + txs := newPendingTxContext() + retentionTimeout := 5 * time.Second + + broadcastedSig1 := randomSignature(t) + broadcastedSig2 := randomSignature(t) + processedSig := randomSignature(t) + confirmedSig := randomSignature(t) + finalizedSig := randomSignature(t) + erroredSig := randomSignature(t) + + // Create new broadcasted transaction with extra sig + broadcastedMsg := pendingTx{id: uuid.NewString()} + err := txs.New(broadcastedMsg, broadcastedSig1, cancel) + require.NoError(t, err) + err = txs.AddSignature(broadcastedMsg.id, broadcastedSig2) + require.NoError(t, err) + + // Create new processed transaction + processedMsg := pendingTx{id: uuid.NewString()} + err = txs.New(processedMsg, processedSig, cancel) + require.NoError(t, err) + id, err := txs.OnProcessed(processedSig) + require.NoError(t, err) + require.Equal(t, processedMsg.id, id) + + // Create new confirmed transaction + confirmedMsg := pendingTx{id: uuid.NewString()} + err = txs.New(confirmedMsg, confirmedSig, cancel) + require.NoError(t, err) + id, err = txs.OnConfirmed(confirmedSig) + require.NoError(t, err) + require.Equal(t, confirmedMsg.id, id) + + // Create new finalized transaction + finalizedMsg := pendingTx{id: uuid.NewString()} + err = txs.New(finalizedMsg, finalizedSig, cancel) + require.NoError(t, err) + id, err = txs.OnFinalized(finalizedSig, retentionTimeout) + require.NoError(t, err) + require.Equal(t, finalizedMsg.id, id) + + // Create new errored transaction + erroredMsg := pendingTx{id: uuid.NewString()} + err = txs.New(erroredMsg, erroredSig, cancel) + require.NoError(t, err) + id, err = txs.OnError(erroredSig, retentionTimeout, 0) + require.NoError(t, err) + require.Equal(t, erroredMsg.id, id) + + // Remove broadcasted transaction + id, err = txs.Remove(broadcastedSig1) + require.NoError(t, err) + require.Equal(t, broadcastedMsg.id, id) + // Check removed from broadcasted map + _, exists := txs.broadcastedTxs[broadcastedMsg.id] + require.False(t, exists) + // Check all signatures removed from sig map + _, exists = txs.sigToID[broadcastedSig1] + require.False(t, exists) + _, exists = txs.sigToID[broadcastedSig2] + require.False(t, exists) + + // Remove processed transaction + id, err = txs.Remove(processedSig) + require.NoError(t, err) + require.Equal(t, processedMsg.id, id) + // Check removed from broadcasted map + _, exists = txs.broadcastedTxs[processedMsg.id] + require.False(t, exists) + // Check all signatures removed from sig map + _, exists = txs.sigToID[processedSig] + require.False(t, exists) + + // Remove confirmed transaction + id, err = txs.Remove(confirmedSig) + require.NoError(t, err) + require.Equal(t, confirmedMsg.id, id) + // Check removed from confirmed map + _, exists = txs.confirmedTxs[confirmedMsg.id] + require.False(t, exists) + // Check all signatures removed from sig map + _, exists = txs.sigToID[confirmedSig] + require.False(t, exists) + + // Check remove cannot be called on finalized transaction + id, err = txs.Remove(finalizedSig) + require.Error(t, err) + require.Equal(t, "", id) + + // Check remove cannot be called on errored transaction + id, err = txs.Remove(erroredSig) + require.Error(t, err) + require.Equal(t, "", id) + + // Check sig list is empty after all removals + require.Empty(t, txs.ListAll()) +} +func TestPendingTxContext_trim_finalized_errored_txs(t *testing.T) { + t.Parallel() + txs := newPendingTxContext() + + // Create new finalized transaction with retention ts in the past and add to map + finalizedMsg1 := pendingTx{id: uuid.NewString(), retentionTs: time.Now().Add(-2 * time.Second)} + txs.finalizedErroredTxs[finalizedMsg1.id] = finalizedMsg1 + + // Create new finalized transaction with retention ts in the future and add to map + finalizedMsg2 := pendingTx{id: uuid.NewString(), retentionTs: time.Now().Add(1 * time.Second)} + txs.finalizedErroredTxs[finalizedMsg2.id] = finalizedMsg2 + + // Create new finalized transaction with retention ts in the past and add to map + erroredMsg := pendingTx{id: uuid.NewString(), retentionTs: time.Now().Add(-2 * time.Second)} + txs.finalizedErroredTxs[erroredMsg.id] = erroredMsg + + // Delete finalized/errored transactions that have passed the retention period + txs.TrimFinalizedErroredTxs() + + // Check finalized message past retention is deleted + _, exists := txs.finalizedErroredTxs[finalizedMsg1.id] + require.False(t, exists) + + // Check errored message past retention is deleted + _, exists = txs.finalizedErroredTxs[erroredMsg.id] + require.False(t, exists) + + // Check finalized message within retention period still exists + msg, exists := txs.finalizedErroredTxs[finalizedMsg2.id] + require.True(t, exists) + require.Equal(t, finalizedMsg2.id, msg.id) +} + func TestPendingTxContext_expired(t *testing.T) { + t.Parallel() _, cancel := context.WithCancel(tests.Context(t)) sig := solana.Signature{} txs := newPendingTxContext() - id, err := txs.New(sig, cancel) + msg := pendingTx{id: uuid.NewString()} + err := txs.New(msg, sig, cancel) assert.NoError(t, err) - assert.True(t, txs.Expired(sig, 0*time.Second)) // expired for 0s lifetime + msg, exists := txs.broadcastedTxs[msg.id] + require.True(t, exists) + + // Set createTs to 10 seconds ago + msg.createTs = time.Now().Add(-10 * time.Second) + txs.broadcastedTxs[msg.id] = msg + + assert.False(t, txs.Expired(sig, 0*time.Second)) // false if timeout 0 + assert.True(t, txs.Expired(sig, 5*time.Second)) // expired for 5s lifetime assert.False(t, txs.Expired(sig, 60*time.Second)) // not expired for 60s lifetime - assert.Equal(t, id, txs.Remove(sig)) + id, err := txs.Remove(sig) + assert.NoError(t, err) + assert.Equal(t, msg.id, id) assert.False(t, txs.Expired(sig, 60*time.Second)) // no longer exists, should return false } @@ -88,11 +904,11 @@ func TestPendingTxContext_race(t *testing.T) { var err [2]error go func() { - _, err[0] = txCtx.New(solana.Signature{}, func() {}) + err[0] = txCtx.New(pendingTx{id: uuid.NewString()}, solana.Signature{}, func() {}) wg.Done() }() go func() { - _, err[1] = txCtx.New(solana.Signature{}, func() {}) + err[1] = txCtx.New(pendingTx{id: uuid.NewString()}, solana.Signature{}, func() {}) wg.Done() }() @@ -100,20 +916,21 @@ func TestPendingTxContext_race(t *testing.T) { assert.True(t, (err[0] != nil && err[1] == nil) || (err[0] == nil && err[1] != nil), "one and only one 'add' should have errored") }) - t.Run("add", func(t *testing.T) { + t.Run("add signature", func(t *testing.T) { txCtx := newPendingTxContext() - id, createErr := txCtx.New(solana.Signature{}, func() {}) + msg := pendingTx{id: uuid.NewString()} + createErr := txCtx.New(msg, solana.Signature{}, func() {}) require.NoError(t, createErr) var wg sync.WaitGroup wg.Add(2) var err [2]error go func() { - err[0] = txCtx.Add(id, solana.Signature{1}) + err[0] = txCtx.AddSignature(msg.id, solana.Signature{1}) wg.Done() }() go func() { - err[1] = txCtx.Add(id, solana.Signature{1}) + err[1] = txCtx.AddSignature(msg.id, solana.Signature{1}) wg.Done() }() @@ -123,20 +940,102 @@ func TestPendingTxContext_race(t *testing.T) { t.Run("remove", func(t *testing.T) { txCtx := newPendingTxContext() - _, err := txCtx.New(solana.Signature{}, func() {}) + msg := pendingTx{id: uuid.NewString()} + err := txCtx.New(msg, solana.Signature{}, func() {}) require.NoError(t, err) var wg sync.WaitGroup wg.Add(2) go func() { - assert.NotPanics(t, func() { txCtx.Remove(solana.Signature{}) }) + assert.NotPanics(t, func() { txCtx.Remove(solana.Signature{}) }) //nolint // no need to check error wg.Done() }() go func() { - assert.NotPanics(t, func() { txCtx.Remove(solana.Signature{}) }) + assert.NotPanics(t, func() { txCtx.Remove(solana.Signature{}) }) //nolint // no need to check error wg.Done() }() wg.Wait() }) } + +func TestGetTxState(t *testing.T) { + t.Parallel() + _, cancel := context.WithCancel(tests.Context(t)) + txs := newPendingTxContext() + retentionTimeout := 5 * time.Second + + broadcastedSig := randomSignature(t) + processedSig := randomSignature(t) + confirmedSig := randomSignature(t) + finalizedSig := randomSignature(t) + erroredSig := randomSignature(t) + + // Create new broadcasted transaction with extra sig + broadcastedMsg := pendingTx{id: uuid.NewString()} + err := txs.New(broadcastedMsg, broadcastedSig, cancel) + require.NoError(t, err) + + var state TxState + // Create new processed transaction + processedMsg := pendingTx{id: uuid.NewString()} + err = txs.New(processedMsg, processedSig, cancel) + require.NoError(t, err) + id, err := txs.OnProcessed(processedSig) + require.NoError(t, err) + require.Equal(t, processedMsg.id, id) + // Check Processed state is returned + state, err = txs.GetTxState(processedMsg.id) + require.NoError(t, err) + require.Equal(t, Processed, state) + + // Create new confirmed transaction + confirmedMsg := pendingTx{id: uuid.NewString()} + err = txs.New(confirmedMsg, confirmedSig, cancel) + require.NoError(t, err) + id, err = txs.OnConfirmed(confirmedSig) + require.NoError(t, err) + require.Equal(t, confirmedMsg.id, id) + // Check Confirmed state is returned + state, err = txs.GetTxState(confirmedMsg.id) + require.NoError(t, err) + require.Equal(t, Confirmed, state) + + // Create new finalized transaction + finalizedMsg := pendingTx{id: uuid.NewString()} + err = txs.New(finalizedMsg, finalizedSig, cancel) + require.NoError(t, err) + id, err = txs.OnFinalized(finalizedSig, retentionTimeout) + require.NoError(t, err) + require.Equal(t, finalizedMsg.id, id) + // Check Finalized state is returned + state, err = txs.GetTxState(finalizedMsg.id) + require.NoError(t, err) + require.Equal(t, Finalized, state) + + // Create new errored transaction + erroredMsg := pendingTx{id: uuid.NewString()} + err = txs.New(erroredMsg, erroredSig, cancel) + require.NoError(t, err) + id, err = txs.OnError(erroredSig, retentionTimeout, 0) + require.NoError(t, err) + require.Equal(t, erroredMsg.id, id) + // Check Errored state is returned + state, err = txs.GetTxState(erroredMsg.id) + require.NoError(t, err) + require.Equal(t, Errored, state) + + // Check NotFound state is returned if unknown id provided + state, err = txs.GetTxState("unknown id") + require.Error(t, err) + require.Equal(t, NotFound, state) +} + +func randomSignature(t *testing.T) solana.Signature { + // make random signature + sig := make([]byte, 64) + _, err := rand.Read(sig) + require.NoError(t, err) + + return solana.SignatureFromBytes(sig) +} diff --git a/pkg/solana/txm/prom.go b/pkg/solana/txm/prom.go index 59c74b687..dcc686be8 100644 --- a/pkg/solana/txm/prom.go +++ b/pkg/solana/txm/prom.go @@ -11,6 +11,10 @@ var ( Name: "solana_txm_tx_success", Help: "Number of transactions that are included and successfully executed on chain", }, []string{"chainID"}) + promSolTxmFinalizedTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_txm_tx_finalized", + Help: "Number of transactions that are finalized on chain", + }, []string{"chainID"}) // inflight transactions promSolTxmPendingTxs = promauto.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index 7cd09cf5e..2a99a6c44 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/services" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils" bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" @@ -28,9 +29,10 @@ import ( const ( MaxQueueLen = 1000 - MaxRetryTimeMs = 250 // max tx retry time (exponential retry will taper to retry every 0.25s) - MaxSigsToConfirm = 256 // max number of signatures in GetSignatureStatus call - EstimateComputeUnitLimitBuffer = 10 // percent buffer added on top of estimated compute unit limits to account for any variance + MaxRetryTimeMs = 250 // max tx retry time (exponential retry will taper to retry every 0.25s) + MaxSigsToConfirm = 256 // max number of signatures in GetSignatureStatus call + EstimateComputeUnitLimitBuffer = 10 // percent buffer added on top of estimated compute unit limits to account for any variance + TxReapInterval = 10 * time.Second // interval of time between reaping transactions that have met the retention threshold ) var _ services.Service = (*Txm)(nil) @@ -75,13 +77,6 @@ type TxConfig struct { ComputeUnitLimit uint32 // compute unit limit } -type pendingTx struct { - tx *solanaGo.Transaction - cfg TxConfig - signature solanaGo.Signature - id uuid.UUID -} - // NewTxm creates a txm. Uses simulation so should only be used to send txes to trusted contracts i.e. OCR. func NewTxm(chainID string, client internal.Loader[client.ReaderWriter], sendTx func(ctx context.Context, tx *solanaGo.Transaction) (solanaGo.Signature, error), @@ -136,6 +131,12 @@ func (txm *Txm) Start(ctx context.Context) error { go txm.run() go txm.confirm() go txm.simulate() + // Start reaping loop only if TxRetentionTimeout > 0 + // Otherwise, transactions are dropped immediately after finalization so the loop is not required + if txm.cfg.TxRetentionTimeout() > 0 { + txm.done.Add(1) // waitgroup: reaper + go txm.reap() + } return nil }) @@ -150,7 +151,7 @@ func (txm *Txm) run() { select { case msg := <-txm.chSend: // process tx (pass tx copy) - tx, id, sig, err := txm.sendWithRetry(ctx, *msg.tx, msg.cfg) + tx, id, sig, err := txm.sendWithRetry(ctx, msg) if err != nil { txm.lggr.Errorw("failed to send transaction", "error", err) txm.client.Reset() // clear client if tx fails immediately (potentially bad RPC) @@ -158,13 +159,13 @@ func (txm *Txm) run() { } // send tx + signature to simulation queue - msg.tx = &tx - msg.signature = sig + msg.tx = tx + msg.signatures = append(msg.signatures, sig) msg.id = id select { case txm.chSim <- msg: default: - txm.lggr.Warnw("failed to enqeue tx for simulation", "queueFull", len(txm.chSend) == MaxQueueLen, "tx", msg) + txm.lggr.Warnw("failed to enqueue tx for simulation", "queueFull", len(txm.chSend) == MaxQueueLen, "tx", msg) } txm.lggr.Debugw("transaction sent", "signature", sig.String(), "id", id) @@ -174,29 +175,31 @@ func (txm *Txm) run() { } } -func (txm *Txm) sendWithRetry(ctx context.Context, baseTx solanaGo.Transaction, txcfg TxConfig) (solanaGo.Transaction, uuid.UUID, solanaGo.Signature, error) { +func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Transaction, string, solanaGo.Signature, error) { // get key // fee payer account is index 0 account // https://github.com/gagliardetto/solana-go/blob/main/transaction.go#L252 - key := baseTx.Message.AccountKeys[0].String() + key := msg.tx.Message.AccountKeys[0].String() // base compute unit price should only be calculated once // prevent underlying base changing when bumping (could occur with RPC based estimation) getFee := func(count int) fees.ComputeUnitPrice { fee := fees.CalculateFee( - txcfg.BaseComputeUnitPrice, - txcfg.ComputeUnitPriceMax, - txcfg.ComputeUnitPriceMin, + msg.cfg.BaseComputeUnitPrice, + msg.cfg.ComputeUnitPriceMax, + msg.cfg.ComputeUnitPriceMin, uint(count), //nolint:gosec // reasonable number of bumps should never cause overflow ) return fees.ComputeUnitPrice(fee) } + baseTx := msg.tx + // add compute unit limit instruction - static for the transaction // skip if compute unit limit = 0 (otherwise would always fail) - if txcfg.ComputeUnitLimit != 0 { - if computeUnitLimitErr := fees.SetComputeUnitLimit(&baseTx, fees.ComputeUnitLimit(txcfg.ComputeUnitLimit)); computeUnitLimitErr != nil { - return solanaGo.Transaction{}, uuid.Nil, solanaGo.Signature{}, fmt.Errorf("failed to add compute unit limit instruction: %w", computeUnitLimitErr) + if msg.cfg.ComputeUnitLimit != 0 { + if computeUnitLimitErr := fees.SetComputeUnitLimit(&baseTx, fees.ComputeUnitLimit(msg.cfg.ComputeUnitLimit)); computeUnitLimitErr != nil { + return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to add compute unit limit instruction: %w", computeUnitLimitErr) } } @@ -227,35 +230,35 @@ func (txm *Txm) sendWithRetry(ctx context.Context, baseTx solanaGo.Transaction, initTx, initBuildErr := buildTx(ctx, baseTx, 0) if initBuildErr != nil { - return solanaGo.Transaction{}, uuid.Nil, solanaGo.Signature{}, initBuildErr + return solanaGo.Transaction{}, "", solanaGo.Signature{}, initBuildErr } // create timeout context - ctx, cancel := context.WithTimeout(ctx, txcfg.Timeout) + ctx, cancel := context.WithTimeout(ctx, msg.cfg.Timeout) // send initial tx (do not retry and exit early if fails) sig, initSendErr := txm.sendTx(ctx, &initTx) if initSendErr != nil { - cancel() // cancel context when exiting early - txm.txs.OnError(sig, TxFailReject) // increment failed metric - return solanaGo.Transaction{}, uuid.Nil, solanaGo.Signature{}, fmt.Errorf("tx failed initial transmit: %w", initSendErr) + cancel() // cancel context when exiting early + txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailReject) //nolint // no need to check error since only incrementing metric here + return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("tx failed initial transmit: %w", initSendErr) } // store tx signature + cancel function - id, initStoreErr := txm.txs.New(sig, cancel) + initStoreErr := txm.txs.New(msg, sig, cancel) if initStoreErr != nil { cancel() // cancel context when exiting early - return solanaGo.Transaction{}, uuid.Nil, solanaGo.Signature{}, fmt.Errorf("failed to save tx signature (%s) to inflight txs: %w", sig, initStoreErr) + return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to save tx signature (%s) to inflight txs: %w", sig, initStoreErr) } // used for tracking rebroadcasting only in SendWithRetry var sigs signatureList sigs.Allocate() if initSetErr := sigs.Set(0, sig); initSetErr != nil { - return solanaGo.Transaction{}, uuid.Nil, solanaGo.Signature{}, fmt.Errorf("failed to save initial signature in signature list: %w", initSetErr) + return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to save initial signature in signature list: %w", initSetErr) } - txm.lggr.Debugw("tx initial broadcast", "id", id, "signature", sig) + txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", getFee(0), "signature", sig) txm.done.Add(1) // retry with exponential backoff @@ -274,12 +277,12 @@ func (txm *Txm) sendWithRetry(ctx context.Context, baseTx solanaGo.Transaction, case <-ctx.Done(): // stop sending tx after retry tx ctx times out (does not stop confirmation polling for tx) wg.Wait() - txm.lggr.Debugw("stopped tx retry", "id", id, "signatures", sigs.List(), "err", context.Cause(ctx)) + txm.lggr.Debugw("stopped tx retry", "id", msg.id, "signatures", sigs.List(), "err", context.Cause(ctx)) return case <-tick: var shouldBump bool // bump if period > 0 and past time - if txcfg.FeeBumpPeriod != 0 && time.Since(bumpTime) > txcfg.FeeBumpPeriod { + if msg.cfg.FeeBumpPeriod != 0 && time.Since(bumpTime) > msg.cfg.FeeBumpPeriod { bumpCount++ bumpTime = time.Now() shouldBump = true @@ -290,7 +293,7 @@ func (txm *Txm) sendWithRetry(ctx context.Context, baseTx solanaGo.Transaction, var retryBuildErr error currentTx, retryBuildErr = buildTx(ctx, baseTx, bumpCount) if retryBuildErr != nil { - txm.lggr.Errorw("failed to build bumped retry tx", "error", retryBuildErr, "id", id) + txm.lggr.Errorw("failed to build bumped retry tx", "error", retryBuildErr, "id", msg.id) return // exit func if cannot build tx for retrying } ind := sigs.Allocate() @@ -309,24 +312,24 @@ func (txm *Txm) sendWithRetry(ctx context.Context, baseTx solanaGo.Transaction, // this could occur if endpoint goes down or if ctx cancelled if retrySendErr != nil { if strings.Contains(retrySendErr.Error(), "context canceled") || strings.Contains(retrySendErr.Error(), "context deadline exceeded") { - txm.lggr.Debugw("ctx error on send retry transaction", "error", retrySendErr, "signatures", sigs.List(), "id", id) + txm.lggr.Debugw("ctx error on send retry transaction", "error", retrySendErr, "signatures", sigs.List(), "id", msg.id) } else { - txm.lggr.Warnw("failed to send retry transaction", "error", retrySendErr, "signatures", sigs.List(), "id", id) + txm.lggr.Warnw("failed to send retry transaction", "error", retrySendErr, "signatures", sigs.List(), "id", msg.id) } return } // save new signature if fee bumped if bump { - if retryStoreErr := txm.txs.Add(id, retrySig); retryStoreErr != nil { - txm.lggr.Warnw("error in adding retry transaction", "error", retryStoreErr, "id", id) + if retryStoreErr := txm.txs.AddSignature(msg.id, retrySig); retryStoreErr != nil { + txm.lggr.Warnw("error in adding retry transaction", "error", retryStoreErr, "id", msg.id) return } if setErr := sigs.Set(count, retrySig); setErr != nil { // this should never happen txm.lggr.Errorw("INVARIANT VIOLATION", "error", setErr) } - txm.lggr.Debugw("tx rebroadcast with bumped fee", "id", id, "fee", getFee(count), "signatures", sigs.List()) + txm.lggr.Debugw("tx rebroadcast with bumped fee", "id", msg.id, "fee", getFee(count), "signatures", sigs.List()) } // prevent locking on waitgroup when ctx is closed @@ -358,7 +361,7 @@ func (txm *Txm) sendWithRetry(ctx context.Context, baseTx solanaGo.Transaction, }(ctx, baseTx, initTx) // return signed tx, id, signature for use in simulation - return initTx, id, sig, nil + return initTx, msg.id, sig, nil } // goroutine that polls to confirm implementation @@ -415,45 +418,67 @@ func (txm *Txm) confirm() { // check confirm timeout exceeded if txm.txs.Expired(s[i], txm.cfg.TxConfirmTimeout()) { - id := txm.txs.OnError(s[i], TxFailDrop) - txm.lggr.Infow("failed to find transaction within confirm timeout", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout()) + id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailDrop) + if err != nil { + txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err) + } else { + txm.lggr.Infow("failed to find transaction within confirm timeout", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout()) + } } continue } // if signature has an error, end polling if res[i].Err != nil { - id := txm.txs.OnError(s[i], TxFailRevert) - txm.lggr.Debugw("tx state: failed", - "id", id, - "signature", s[i], - "error", res[i].Err, - "status", res[i].ConfirmationStatus, - ) + id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailRevert) + if err != nil { + txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "error", err) + } else { + txm.lggr.Debugw("tx state: failed", "id", id, "signature", s[i], "error", res[i].Err, "status", res[i].ConfirmationStatus) + } continue } - // if signature is processed, keep polling + // if signature is processed, keep polling for confirmed or finalized status if res[i].ConfirmationStatus == rpc.ConfirmationStatusProcessed { - txm.lggr.Debugw("tx state: processed", - "signature", s[i], - ) + // update transaction state in local memory + id, err := txm.txs.OnProcessed(s[i]) + if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) { + txm.lggr.Errorw("failed to mark transaction as processed", "signature", s[i], "error", err) + } else if err == nil { + txm.lggr.Debugw("marking transaction as processed", "id", id, "signature", s[i]) + } + // check confirm timeout exceeded if TxConfirmTimeout set + if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(s[i], txm.cfg.TxConfirmTimeout()) { + id, err := txm.txs.OnError(s[i], txm.cfg.TxRetentionTimeout(), TxFailDrop) + if err != nil { + txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err) + } else { + txm.lggr.Debugw("tx failed to move beyond 'processed' within confirm timeout", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout()) + } + } + continue + } - // check confirm timeout exceeded - if txm.txs.Expired(s[i], txm.cfg.TxConfirmTimeout()) { - id := txm.txs.OnError(s[i], TxFailDrop) - txm.lggr.Debugw("tx failed to move beyond 'processed' within confirm timeout", "id", id, "signature", s[i], "timeoutSeconds", txm.cfg.TxConfirmTimeout()) + // if signature is confirmed, keep polling for finalized status + if res[i].ConfirmationStatus == rpc.ConfirmationStatusConfirmed { + id, err := txm.txs.OnConfirmed(s[i]) + if err != nil && !errors.Is(err, ErrAlreadyInExpectedState) { + txm.lggr.Errorw("failed to mark transaction as confirmed", "id", id, "signature", s[i], "error", err) + } else if err == nil { + txm.lggr.Debugw("marking transaction as confirmed", "id", id, "signature", s[i]) } continue } - // if signature is confirmed/finalized, end polling - if res[i].ConfirmationStatus == rpc.ConfirmationStatusConfirmed || res[i].ConfirmationStatus == rpc.ConfirmationStatusFinalized { - id := txm.txs.OnSuccess(s[i]) - txm.lggr.Debugw(fmt.Sprintf("tx state: %s", res[i].ConfirmationStatus), - "id", id, - "signature", s[i], - ) + // if signature is finalized, end polling + if res[i].ConfirmationStatus == rpc.ConfirmationStatusFinalized { + id, err := txm.txs.OnFinalized(s[i], txm.cfg.TxRetentionTimeout()) + if err != nil { + txm.lggr.Errorw("failed to mark transaction as finalized", "id", id, "signature", s[i], "error", err) + } else { + txm.lggr.Debugw("marking transaction as finalized", "id", id, "signature", s[i]) + } continue } } @@ -497,8 +522,11 @@ func (txm *Txm) simulate() { case <-ctx.Done(): return case msg := <-txm.chSim: - res, err := txm.simulateTx(ctx, msg.tx) + res, err := txm.simulateTx(ctx, &msg.tx) if err != nil { + // this error can occur if endpoint goes down or if invalid signature (invalid signature should occur further upstream in sendWithRetry) + // allow retry to continue in case temporary endpoint failure (if still invalid, confirmation or timeout will cleanup) + txm.lggr.Debugw("failed to simulate tx", "id", msg.id, "signatures", msg.signatures, "error", err) continue } @@ -507,13 +535,35 @@ func (txm *Txm) simulate() { continue } - txm.processSimulationError(msg.id, msg.signature, res) + // Transaction has to have a signature if simulation succeeded but added check for belt and braces approach + if len(msg.signatures) > 0 { + txm.processSimulationError(msg.id, msg.signatures[0], res) + } + } + } +} + +// reap is a goroutine that periodically checks whether finalized and errored transactions have reached +// their retention threshold and purges them from the in-memory storage if they have +func (txm *Txm) reap() { + defer txm.done.Done() + ctx, cancel := txm.chStop.NewCtx() + defer cancel() + + tick := time.After(0) + for { + select { + case <-ctx.Done(): + return + case <-tick: + txm.txs.TrimFinalizedErroredTxs() } + tick = time.After(utils.WithJitter(TxReapInterval)) } } -// Enqueue enqueue a msg destined for the solana chain. -func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txCfgs ...SetTxConfig) error { +// Enqueue enqueues a msg destined for the solana chain. +func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txCfgs ...SetTxConfig) error { if err := txm.Ready(); err != nil { return fmt.Errorf("error in soltxm.Enqueue: %w", err) } @@ -552,9 +602,15 @@ func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Tran } } + // Use transaction ID provided by caller if set + id := uuid.New().String() + if txID != nil && *txID != "" { + id = *txID + } msg := pendingTx{ - tx: tx, + tx: *tx, cfg: cfg, + id: id, } select { @@ -566,6 +622,27 @@ func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Tran return nil } +// GetTransactionStatus translates internal TXM transaction statuses to chainlink common statuses +func (txm *Txm) GetTransactionStatus(ctx context.Context, transactionID string) (commontypes.TransactionStatus, error) { + state, err := txm.txs.GetTxState(transactionID) + if err != nil { + return commontypes.Unknown, fmt.Errorf("failed to find transaction with id %s: %w", transactionID, err) + } + + switch state { + case Broadcasted: + return commontypes.Pending, nil + case Processed, Confirmed: + return commontypes.Unconfirmed, nil + case Finalized: + return commontypes.Finalized, nil + case Errored: + return commontypes.Failed, nil + default: + return commontypes.Unknown, fmt.Errorf("found unknown transaction state: %s", state.String()) + } +} + // EstimateComputeUnitLimit estimates the compute unit limit needed for a transaction. // It simulates the provided transaction to determine the used compute and applies a buffer to it. func (txm *Txm) EstimateComputeUnitLimit(ctx context.Context, tx *solanaGo.Transaction) (uint32, error) { @@ -580,7 +657,7 @@ func (txm *Txm) EstimateComputeUnitLimit(ctx context.Context, tx *solanaGo.Trans if len(tx.Signatures) > 0 { sig = tx.Signatures[0] } - txm.processSimulationError(uuid.Nil, sig, res) + txm.processSimulationError("", sig, res) return 0, fmt.Errorf("simulated tx returned error: %v", res.Err) } @@ -623,27 +700,38 @@ func (txm *Txm) simulateTx(ctx context.Context, tx *solanaGo.Transaction) (res * } // processSimulationError parses and handles relevant errors found in simulation results -func (txm *Txm) processSimulationError(id uuid.UUID, sig solanaGo.Signature, res *rpc.SimulateTransactionResult) { +func (txm *Txm) processSimulationError(id string, sig solanaGo.Signature, res *rpc.SimulateTransactionResult) { if res.Err != nil { // handle various errors // https://github.com/solana-labs/solana/blob/master/sdk/src/transaction/error.rs errStr := fmt.Sprintf("%v", res.Err) // convert to string to handle various interfaces + logValues := []interface{}{ + "id", id, + "signature", sig, + "result", res, + } switch { // blockhash not found when simulating, occurs when network bank has not seen the given blockhash or tx is too old // let confirmation process clean up case strings.Contains(errStr, "BlockhashNotFound"): - txm.lggr.Debugw("simulate: BlockhashNotFound", "id", id, "signature", sig, "result", res) + txm.lggr.Debugw("simulate: BlockhashNotFound", logValues...) // transaction will encounter execution error/revert, mark as reverted to remove from confirmation + retry case strings.Contains(errStr, "InstructionError"): - txm.txs.OnError(sig, TxFailSimRevert) // cancel retry - txm.lggr.Debugw("simulate: InstructionError", "id", id, "signature", sig, "result", res) + _, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailSimRevert) // cancel retry + if err != nil { + logValues = append(logValues, "stateTransitionErr", err) + } + txm.lggr.Debugw("simulate: InstructionError", logValues...) // transaction is already processed in the chain, letting txm confirmation handle case strings.Contains(errStr, "AlreadyProcessed"): - txm.lggr.Debugw("simulate: AlreadyProcessed", "id", id, "signature", sig, "result", res) + txm.lggr.Debugw("simulate: AlreadyProcessed", logValues...) // unrecognized errors (indicates more concerning failures) default: - txm.txs.OnError(sig, TxFailSimOther) // cancel retry - txm.lggr.Errorw("simulate: unrecognized error", "id", id, "signature", sig, "result", res) + _, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), TxFailSimOther) // cancel retry + if err != nil { + logValues = append(logValues, "stateTransitionErr", err) + } + txm.lggr.Errorw("simulate: unrecognized error", logValues...) } } } diff --git a/pkg/solana/txm/txm_internal_test.go b/pkg/solana/txm/txm_internal_test.go index 802dc93b2..d246220a7 100644 --- a/pkg/solana/txm/txm_internal_test.go +++ b/pkg/solana/txm/txm_internal_test.go @@ -5,7 +5,7 @@ package txm import ( "context" "errors" - "math/rand" + "math/big" "sync" "testing" "time" @@ -27,23 +27,26 @@ import ( relayconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/utils" + bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" ) type soltxmProm struct { - id string - success, error, revert, reject, drop, simRevert, simOther float64 + id string + confirmed, error, revert, reject, drop, simRevert, simOther, finalized float64 } func (p soltxmProm) assertEqual(t *testing.T) { - assert.Equal(t, p.success, testutil.ToFloat64(promSolTxmSuccessTxs.WithLabelValues(p.id)), "mismatch: success") + assert.Equal(t, p.confirmed, testutil.ToFloat64(promSolTxmSuccessTxs.WithLabelValues(p.id)), "mismatch: confirmed") assert.Equal(t, p.error, testutil.ToFloat64(promSolTxmErrorTxs.WithLabelValues(p.id)), "mismatch: error") assert.Equal(t, p.revert, testutil.ToFloat64(promSolTxmRevertTxs.WithLabelValues(p.id)), "mismatch: revert") assert.Equal(t, p.reject, testutil.ToFloat64(promSolTxmRejectTxs.WithLabelValues(p.id)), "mismatch: reject") assert.Equal(t, p.drop, testutil.ToFloat64(promSolTxmDropTxs.WithLabelValues(p.id)), "mismatch: drop") assert.Equal(t, p.simRevert, testutil.ToFloat64(promSolTxmSimRevertTxs.WithLabelValues(p.id)), "mismatch: simRevert") assert.Equal(t, p.simOther, testutil.ToFloat64(promSolTxmSimOtherTxs.WithLabelValues(p.id)), "mismatch: simOther") + assert.Equal(t, p.finalized, testutil.ToFloat64(promSolTxmFinalizedTxs.WithLabelValues(p.id)), "mismatch: finalized") } func (p soltxmProm) getInflight() float64 { @@ -51,7 +54,7 @@ func (p soltxmProm) getInflight() float64 { } // create placeholder transaction and returns func for signed tx with fee -func getTx(t *testing.T, val uint64, keystore SimpleKeystore, price fees.ComputeUnitPrice) (*solana.Transaction, func(fees.ComputeUnitPrice, bool) *solana.Transaction) { +func getTx(t *testing.T, val uint64, keystore SimpleKeystore) (*solana.Transaction, func(fees.ComputeUnitPrice, bool, fees.ComputeUnitLimit) *solana.Transaction) { pubkey := solana.PublicKey{} // create transfer tx @@ -70,12 +73,12 @@ func getTx(t *testing.T, val uint64, keystore SimpleKeystore, price fees.Compute base := *tx // tx to send to txm, txm will add fee & sign - return &base, func(price fees.ComputeUnitPrice, addLimit bool) *solana.Transaction { + return &base, func(price fees.ComputeUnitPrice, addLimit bool, limit fees.ComputeUnitLimit) *solana.Transaction { tx := base // add fee parameters require.NoError(t, fees.SetComputeUnitPrice(&tx, price)) if addLimit { - require.NoError(t, fees.SetComputeUnitLimit(&tx, 200_000)) // default + require.NoError(t, fees.SetComputeUnitLimit(&tx, limit)) // default } // sign tx @@ -90,6 +93,24 @@ func getTx(t *testing.T, val uint64, keystore SimpleKeystore, price fees.Compute } } +// check if cached transaction is cleared +func empty(t *testing.T, txm *Txm, prom soltxmProm) bool { + count := txm.InflightTxs() + assert.Equal(t, float64(count), prom.getInflight()) // validate prom metric and txs length + return count == 0 +} + +// waits for the provided function to evaluate to true within the provided duration amount of time +func waitFor(t *testing.T, waitDuration time.Duration, txm *Txm, prom soltxmProm, f func(*testing.T, *Txm, soltxmProm) bool) { + for i := 0; i < int(waitDuration.Seconds()*1.5); i++ { + if f(t, txm, prom) { + return + } + time.Sleep(time.Second) + } + assert.NoError(t, errors.New("unable to confirm inflight txs is empty")) +} + func TestTxm(t *testing.T) { for _, eName := range []string{"fixed", "blockhistory"} { estimator := eName @@ -115,35 +136,14 @@ func TestTxm(t *testing.T) { loader := utils.NewLazyLoad(func() (client.ReaderWriter, error) { return mc, nil }) txm := NewTxm(id, loader, nil, cfg, mkey, lggr) require.NoError(t, txm.Start(ctx)) + t.Cleanup(func () { require.NoError(t, txm.Close())}) // tracking prom metrics prom := soltxmProm{id: id} - // create random signature - getSig := func() solana.Signature { - sig := make([]byte, 64) - rand.Read(sig) - return solana.SignatureFromBytes(sig) - } - - // check if cached transaction is cleared - empty := func() bool { - count := txm.InflightTxs() - assert.Equal(t, float64(count), prom.getInflight()) // validate prom metric and txs length - return count == 0 - } - // adjust wait time based on config waitDuration := cfg.TxConfirmTimeout() - waitFor := func(f func() bool) { - for i := 0; i < int(waitDuration.Seconds()*1.5); i++ { - if f() { - return - } - time.Sleep(time.Second) - } - assert.NoError(t, errors.New("unable to confirm inflight txs is empty")) - } + computeUnitLimitDefault := fees.ComputeUnitLimit(cfg.ComputeUnitLimitDefault()) // handle signature statuses calls statuses := map[solana.Signature]func() *rpc.SignatureStatusesResult{} @@ -161,27 +161,26 @@ func TestTxm(t *testing.T) { }, nil, ) - // happy path (send => simulate success => tx: nil => tx: processed => tx: confirmed => done) + // happy path (send => simulate success => tx: nil => tx: processed => tx: confirmed => finalized => done) t.Run("happyPath", func(t *testing.T) { - sig := getSig() - tx, signed := getTx(t, 0, mkey, 0) + sig := randomSignature(t) + tx, signed := getTx(t, 0, mkey) var wg sync.WaitGroup - wg.Add(3) + wg.Add(1) sendCount := 0 var countRW sync.RWMutex - mc.On("SendTx", mock.Anything, signed(0, true)).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Run(func(mock.Arguments) { countRW.Lock() sendCount++ countRW.Unlock() }).After(500*time.Millisecond).Return(sig, nil) - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Once() + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Once() // handle signature status calls count := 0 statuses[sig] = func() (out *rpc.SignatureStatusesResult) { defer func() { count++ }() - defer wg.Done() out = &rpc.SignatureStatusesResult{} if count == 1 { @@ -193,15 +192,22 @@ func TestTxm(t *testing.T) { out.ConfirmationStatus = rpc.ConfirmationStatusConfirmed return } + + if count == 3 { + out.ConfirmationStatus = rpc.ConfirmationStatusFinalized + wg.Done() + return + } return nil } // send tx - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) wg.Wait() // no transactions stored inflight txs list - waitFor(empty) + waitFor(t, waitDuration, txm, prom, empty) // transaction should be sent more than twice countRW.RLock() t.Logf("sendTx received %d calls", sendCount) @@ -212,43 +218,51 @@ func TestTxm(t *testing.T) { mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() // check prom metric - prom.success++ + prom.confirmed++ + prom.finalized++ prom.assertEqual(t) + + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status }) // fail on initial transmit (RPC immediate rejects) t.Run("fail_initialTx", func(t *testing.T) { - tx, signed := getTx(t, 1, mkey, 0) + tx, signed := getTx(t, 1, mkey) var wg sync.WaitGroup wg.Add(1) // should only be called once (tx does not start retry, confirming, or simulation) - mc.On("SendTx", mock.Anything, signed(0, true)).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Run(func(mock.Arguments) { wg.Done() }).Return(solana.Signature{}, errors.New("FAIL")).Once() // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) wg.Wait() // wait to be picked up and processed // no transactions stored inflight txs list - waitFor(empty) + waitFor(t, waitDuration, txm, prom, empty) // check prom metric prom.error++ prom.reject++ prom.assertEqual(t) + + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status }) // tx fails simulation (simulation error) t.Run("fail_simulation", func(t *testing.T) { - tx, signed := getTx(t, 2, mkey, 0) - sig := getSig() + tx, signed := getTx(t, 2, mkey) + sig := randomSignature(t) var wg sync.WaitGroup wg.Add(1) - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{ Err: "FAIL", @@ -256,47 +270,55 @@ func TestTxm(t *testing.T) { // signature status is nil (handled automatically) // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) - wg.Wait() // wait to be picked up and processed - waitFor(empty) // txs cleared quickly + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, waitDuration, txm, prom, empty) // txs cleared quickly // check prom metric prom.error++ prom.simOther++ prom.assertEqual(t) + + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status }) // tx fails simulation (rpc error, timeout should clean up b/c sig status will be nil) t.Run("fail_simulation_confirmNil", func(t *testing.T) { - tx, signed := getTx(t, 3, mkey, 0) - sig := getSig() - retry0 := getSig() - retry1 := getSig() - retry2 := getSig() - retry3 := getSig() + tx, signed := getTx(t, 3, mkey) + sig := randomSignature(t) + retry0 := randomSignature(t) + retry1 := randomSignature(t) + retry2 := randomSignature(t) + retry3 := randomSignature(t) var wg sync.WaitGroup wg.Add(1) - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SendTx", mock.Anything, signed(1, true)).Return(retry0, nil) - mc.On("SendTx", mock.Anything, signed(2, true)).Return(retry1, nil) - mc.On("SendTx", mock.Anything, signed(3, true)).Return(retry2, nil).Maybe() - mc.On("SendTx", mock.Anything, signed(4, true)).Return(retry3, nil).Maybe() - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SendTx", mock.Anything, signed(1, true, computeUnitLimitDefault)).Return(retry0, nil) + mc.On("SendTx", mock.Anything, signed(2, true, computeUnitLimitDefault)).Return(retry1, nil) + mc.On("SendTx", mock.Anything, signed(3, true, computeUnitLimitDefault)).Return(retry2, nil).Maybe() + mc.On("SendTx", mock.Anything, signed(4, true, computeUnitLimitDefault)).Return(retry3, nil).Maybe() + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{}, errors.New("FAIL")).Once() // all signature statuses are nil, handled automatically // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) - wg.Wait() // wait to be picked up and processed - waitFor(empty) // txs cleared after timeout + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, waitDuration, txm, prom, empty) // txs cleared after timeout // check prom metric prom.error++ prom.drop++ prom.assertEqual(t) + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status + // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() }) @@ -304,8 +326,8 @@ func TestTxm(t *testing.T) { // tx fails simulation with an InstructionError (indicates reverted execution) // manager should cancel sending retry immediately + increment reverted prom metric t.Run("fail_simulation_instructionError", func(t *testing.T) { - tx, signed := getTx(t, 4, mkey, 0) - sig := getSig() + tx, signed := getTx(t, 4, mkey) + sig := randomSignature(t) var wg sync.WaitGroup wg.Add(1) @@ -315,8 +337,8 @@ func TestTxm(t *testing.T) { 0, map[string]int{"Custom": 6003}, }, } - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{ Err: tempErr, @@ -324,29 +346,33 @@ func TestTxm(t *testing.T) { // all signature statuses are nil, handled automatically // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) - wg.Wait() // wait to be picked up and processed - waitFor(empty) // txs cleared after timeout + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, waitDuration, txm, prom, empty) // txs cleared after timeout // check prom metric prom.error++ prom.simRevert++ prom.assertEqual(t) + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status + // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() }) // tx fails simulation with BlockHashNotFound error - // txm should continue to confirm tx (in this case it will succeed) + // txm should continue to finalize tx (in this case it will succeed) t.Run("fail_simulation_blockhashNotFound", func(t *testing.T) { - tx, signed := getTx(t, 5, mkey, 0) - sig := getSig() + tx, signed := getTx(t, 5, mkey) + sig := randomSignature(t) var wg sync.WaitGroup - wg.Add(3) + wg.Add(2) - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{ Err: "BlockhashNotFound", @@ -356,25 +382,34 @@ func TestTxm(t *testing.T) { count := 0 statuses[sig] = func() (out *rpc.SignatureStatusesResult) { defer func() { count++ }() - defer wg.Done() out = &rpc.SignatureStatusesResult{} - if count == 1 { + if count == 0 { out.ConfirmationStatus = rpc.ConfirmationStatusConfirmed return } + if count == 1 { + out.ConfirmationStatus = rpc.ConfirmationStatusFinalized + wg.Done() + return + } return nil } // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) - wg.Wait() // wait to be picked up and processed - waitFor(empty) // txs cleared after timeout + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, waitDuration, txm, prom, empty) // txs cleared after timeout // check prom metric - prom.success++ + prom.confirmed++ + prom.finalized++ prom.assertEqual(t) + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status + // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() }) @@ -382,13 +417,13 @@ func TestTxm(t *testing.T) { // tx fails simulation with AlreadyProcessed error // txm should continue to confirm tx (in this case it will revert) t.Run("fail_simulation_alreadyProcessed", func(t *testing.T) { - tx, signed := getTx(t, 6, mkey, 0) - sig := getSig() + tx, signed := getTx(t, 6, mkey) + sig := randomSignature(t) var wg sync.WaitGroup wg.Add(2) - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{ Err: "AlreadyProcessed", @@ -404,36 +439,40 @@ func TestTxm(t *testing.T) { } // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) - wg.Wait() // wait to be picked up and processed - waitFor(empty) // txs cleared after timeout + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, waitDuration, txm, prom, empty) // txs cleared after timeout // check prom metric prom.revert++ prom.error++ prom.assertEqual(t) + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status + // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() }) // tx passes sim, never passes processed (timeout should cleanup) t.Run("fail_confirm_processed", func(t *testing.T) { - tx, signed := getTx(t, 7, mkey, 0) - sig := getSig() - retry0 := getSig() - retry1 := getSig() - retry2 := getSig() - retry3 := getSig() + tx, signed := getTx(t, 7, mkey) + sig := randomSignature(t) + retry0 := randomSignature(t) + retry1 := randomSignature(t) + retry2 := randomSignature(t) + retry3 := randomSignature(t) var wg sync.WaitGroup wg.Add(1) - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SendTx", mock.Anything, signed(1, true)).Return(retry0, nil) - mc.On("SendTx", mock.Anything, signed(2, true)).Return(retry1, nil) - mc.On("SendTx", mock.Anything, signed(3, true)).Return(retry2, nil).Maybe() - mc.On("SendTx", mock.Anything, signed(4, true)).Return(retry3, nil).Maybe() - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SendTx", mock.Anything, signed(1, true, computeUnitLimitDefault)).Return(retry0, nil) + mc.On("SendTx", mock.Anything, signed(2, true, computeUnitLimitDefault)).Return(retry1, nil) + mc.On("SendTx", mock.Anything, signed(3, true, computeUnitLimitDefault)).Return(retry2, nil).Maybe() + mc.On("SendTx", mock.Anything, signed(4, true, computeUnitLimitDefault)).Return(retry3, nil).Maybe() + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{}, nil).Once() @@ -445,36 +484,40 @@ func TestTxm(t *testing.T) { } // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) - wg.Wait() // wait to be picked up and processed - waitFor(empty) // inflight txs cleared after timeout + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, waitDuration, txm, prom, empty) // inflight txs cleared after timeout // check prom metric prom.error++ prom.drop++ prom.assertEqual(t) + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status + // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() }) // tx passes sim, shows processed, moves to nil (timeout should cleanup) t.Run("fail_confirm_processedToNil", func(t *testing.T) { - tx, signed := getTx(t, 8, mkey, 0) - sig := getSig() - retry0 := getSig() - retry1 := getSig() - retry2 := getSig() - retry3 := getSig() + tx, signed := getTx(t, 8, mkey) + sig := randomSignature(t) + retry0 := randomSignature(t) + retry1 := randomSignature(t) + retry2 := randomSignature(t) + retry3 := randomSignature(t) var wg sync.WaitGroup wg.Add(1) - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SendTx", mock.Anything, signed(1, true)).Return(retry0, nil) - mc.On("SendTx", mock.Anything, signed(2, true)).Return(retry1, nil) - mc.On("SendTx", mock.Anything, signed(3, true)).Return(retry2, nil).Maybe() - mc.On("SendTx", mock.Anything, signed(4, true)).Return(retry3, nil).Maybe() - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SendTx", mock.Anything, signed(1, true, computeUnitLimitDefault)).Return(retry0, nil) + mc.On("SendTx", mock.Anything, signed(2, true, computeUnitLimitDefault)).Return(retry1, nil) + mc.On("SendTx", mock.Anything, signed(3, true, computeUnitLimitDefault)).Return(retry2, nil).Maybe() + mc.On("SendTx", mock.Anything, signed(4, true, computeUnitLimitDefault)).Return(retry3, nil).Maybe() + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{}, nil).Once() @@ -493,28 +536,32 @@ func TestTxm(t *testing.T) { } // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) - wg.Wait() // wait to be picked up and processed - waitFor(empty) // inflight txs cleared after timeout + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, waitDuration, txm, prom, empty) // inflight txs cleared after timeout // check prom metric prom.error++ prom.drop++ prom.assertEqual(t) + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status + // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() }) // tx passes sim, errors on confirm t.Run("fail_confirm_revert", func(t *testing.T) { - tx, signed := getTx(t, 9, mkey, 0) - sig := getSig() + tx, signed := getTx(t, 9, mkey) + sig := randomSignature(t) var wg sync.WaitGroup wg.Add(1) - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{}, nil).Once() @@ -527,36 +574,40 @@ func TestTxm(t *testing.T) { } // tx should be able to queue - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) - wg.Wait() // wait to be picked up and processed - waitFor(empty) // inflight txs cleared after timeout + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, waitDuration, txm, prom, empty) // inflight txs cleared after timeout // check prom metric prom.error++ prom.revert++ prom.assertEqual(t) + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status + // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() }) // tx passes sim, first retried TXs get dropped t.Run("success_retryTx", func(t *testing.T) { - tx, signed := getTx(t, 10, mkey, 0) - sig := getSig() - retry0 := getSig() - retry1 := getSig() - retry2 := getSig() - retry3 := getSig() + tx, signed := getTx(t, 10, mkey) + sig := randomSignature(t) + retry0 := randomSignature(t) + retry1 := randomSignature(t) + retry2 := randomSignature(t) + retry3 := randomSignature(t) var wg sync.WaitGroup wg.Add(2) - mc.On("SendTx", mock.Anything, signed(0, true)).Return(sig, nil) - mc.On("SendTx", mock.Anything, signed(1, true)).Return(retry0, nil) - mc.On("SendTx", mock.Anything, signed(2, true)).Return(retry1, nil) - mc.On("SendTx", mock.Anything, signed(3, true)).Return(retry2, nil).Maybe() - mc.On("SendTx", mock.Anything, signed(4, true)).Return(retry3, nil).Maybe() - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SendTx", mock.Anything, signed(1, true, computeUnitLimitDefault)).Return(retry0, nil) + mc.On("SendTx", mock.Anything, signed(2, true, computeUnitLimitDefault)).Return(retry1, nil) + mc.On("SendTx", mock.Anything, signed(3, true, computeUnitLimitDefault)).Return(retry2, nil).Maybe() + mc.On("SendTx", mock.Anything, signed(4, true, computeUnitLimitDefault)).Return(retry3, nil).Maybe() + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { wg.Done() }).Return(&rpc.SimulateTransactionResult{}, nil).Once() @@ -564,52 +615,57 @@ func TestTxm(t *testing.T) { statuses[retry1] = func() (out *rpc.SignatureStatusesResult) { defer wg.Done() return &rpc.SignatureStatusesResult{ - ConfirmationStatus: rpc.ConfirmationStatusConfirmed, + ConfirmationStatus: rpc.ConfirmationStatusFinalized, } } // send tx - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx)) + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) wg.Wait() // no transactions stored inflight txs list - waitFor(empty) + waitFor(t, waitDuration, txm, prom, empty) // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() // check prom metric - prom.success++ + prom.finalized++ prom.assertEqual(t) + + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status }) // fee bumping disabled t.Run("feeBumpingDisabled", func(t *testing.T) { - sig := getSig() - tx, signed := getTx(t, 11, mkey, 0) - - defaultFeeBumpPeriod := cfg.FeeBumpPeriod() + sig := randomSignature(t) + tx, signed := getTx(t, 11, mkey) sendCount := 0 var countRW sync.RWMutex - mc.On("SendTx", mock.Anything, signed(0, true)).Run(func(mock.Arguments) { + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Run(func(mock.Arguments) { countRW.Lock() sendCount++ countRW.Unlock() }).Return(sig, nil) // only sends one transaction type (no bumping) - mc.On("SimulateTx", mock.Anything, signed(0, true), mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Once() + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Once() // handle signature status calls var wg sync.WaitGroup wg.Add(1) count := 0 - start := time.Now() statuses[sig] = func() (out *rpc.SignatureStatusesResult) { defer func() { count++ }() out = &rpc.SignatureStatusesResult{} - if time.Since(start) > 2*defaultFeeBumpPeriod { + if count == 1 { out.ConfirmationStatus = rpc.ConfirmationStatusConfirmed + return + } + if count == 2 { + out.ConfirmationStatus = rpc.ConfirmationStatusFinalized wg.Done() return } @@ -618,11 +674,12 @@ func TestTxm(t *testing.T) { } // send tx - with disabled fee bumping - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, SetFeeBumpPeriod(0))) + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, SetFeeBumpPeriod(0))) wg.Wait() // no transactions stored inflight txs list - waitFor(empty) + waitFor(t, waitDuration, txm, prom, empty) // transaction should be sent more than twice countRW.RLock() t.Logf("sendTx received %d calls", sendCount) @@ -633,48 +690,309 @@ func TestTxm(t *testing.T) { mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() // check prom metric - prom.success++ + prom.confirmed++ + prom.finalized++ prom.assertEqual(t) + + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status }) // compute unit limit disabled t.Run("computeUnitLimitDisabled", func(t *testing.T) { - sig := getSig() - tx, signed := getTx(t, 12, mkey, 0) + sig := randomSignature(t) + tx, signed := getTx(t, 12, mkey) // should only match transaction without compute unit limit - assert.Len(t, signed(0, false).Message.Instructions, 2) - mc.On("SendTx", mock.Anything, signed(0, false)).Return(sig, nil) // only sends one transaction type (no bumping) - mc.On("SimulateTx", mock.Anything, signed(0, false), mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Once() + assert.Len(t, signed(0, false, computeUnitLimitDefault).Message.Instructions, 2) + mc.On("SendTx", mock.Anything, signed(0, false, computeUnitLimitDefault)).Return(sig, nil) // only sends one transaction type (no bumping) + mc.On("SimulateTx", mock.Anything, signed(0, false, computeUnitLimitDefault), mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Once() // handle signature status calls var wg sync.WaitGroup wg.Add(1) + count := 0 statuses[sig] = func() *rpc.SignatureStatusesResult { - defer wg.Done() + defer func() { count++ }() + if count == 0 { + return &rpc.SignatureStatusesResult{ + ConfirmationStatus: rpc.ConfirmationStatusConfirmed, + } + } + wg.Done() return &rpc.SignatureStatusesResult{ - ConfirmationStatus: rpc.ConfirmationStatusConfirmed, + ConfirmationStatus: rpc.ConfirmationStatusFinalized, } } // send tx - with disabled fee bumping and disabled compute unit limit - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, SetFeeBumpPeriod(0), SetComputeUnitLimit(0))) + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, SetFeeBumpPeriod(0), SetComputeUnitLimit(0))) wg.Wait() // no transactions stored inflight txs list - waitFor(empty) + waitFor(t, waitDuration, txm, prom, empty) // panic if sendTx called after context cancelled mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() // check prom metric - prom.success++ + prom.confirmed++ + prom.finalized++ prom.assertEqual(t) + + _, err := txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) // transaction cleared from storage after finalized should not return status }) }) } } +func TestTxm_disabled_confirm_timeout_with_retention(t *testing.T) { + t.Parallel() // run estimator tests in parallel + + // set up configs needed in txm + estimator := "fixed" + id := "mocknet-" + estimator + "-" + uuid.NewString() + t.Logf("Starting new iteration: %s", id) + + ctx := tests.Context(t) + lggr := logger.Test(t) + cfg := config.NewDefault() + cfg.Chain.FeeEstimatorMode = &estimator + // Disable confirm timeout + cfg.Chain.TxConfirmTimeout = relayconfig.MustNewDuration(0 * time.Second) + // Enable retention timeout to keep transactions after finality + cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(5 * time.Second) + mc := mocks.NewReaderWriter(t) + mc.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil).Maybe() + + computeUnitLimitDefault := fees.ComputeUnitLimit(cfg.ComputeUnitLimitDefault()) + + // mock solana keystore + mkey := keyMocks.NewSimpleKeystore(t) + mkey.On("Sign", mock.Anything, mock.Anything, mock.Anything).Return([]byte{}, nil) + + loader := utils.NewLazyLoad(func() (client.ReaderWriter, error) { return mc, nil }) + txm := NewTxm(id, loader, nil, cfg, mkey, lggr) + require.NoError(t, txm.Start(ctx)) + t.Cleanup(func () { require.NoError(t, txm.Close())}) + + // tracking prom metrics + prom := soltxmProm{id: id} + + // handle signature statuses calls + statuses := map[solana.Signature]func() *rpc.SignatureStatusesResult{} + mc.On("SignatureStatuses", mock.Anything, mock.AnythingOfType("[]solana.Signature")).Return( + func(_ context.Context, sigs []solana.Signature) (out []*rpc.SignatureStatusesResult) { + for i := range sigs { + get, exists := statuses[sigs[i]] + if !exists { + out = append(out, nil) + continue + } + out = append(out, get()) + } + return out + }, nil, + ) + + // Test tx is not discarded due to confirm timeout and tracked to finalization + tx, signed := getTx(t, 7, mkey) + sig := randomSignature(t) + retry0 := randomSignature(t) + retry1 := randomSignature(t) + var wg sync.WaitGroup + wg.Add(2) + + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimitDefault)).Return(sig, nil) + mc.On("SendTx", mock.Anything, signed(1, true, computeUnitLimitDefault)).Return(retry0, nil).Maybe() + mc.On("SendTx", mock.Anything, signed(2, true, computeUnitLimitDefault)).Return(retry1, nil).Maybe() + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimitDefault), mock.Anything).Run(func(mock.Arguments) { + wg.Done() + }).Return(&rpc.SimulateTransactionResult{}, nil).Once() + + // handle signature status calls (initial stays processed, others don't exist) + start := time.Now() + statuses[sig] = func() (out *rpc.SignatureStatusesResult) { + out = &rpc.SignatureStatusesResult{} + // return confirmed status after default confirmation timeout + if time.Since(start) > 1*time.Second && time.Since(start) < 2*time.Second { + out.ConfirmationStatus = rpc.ConfirmationStatusConfirmed + return + } + // return finalized status only after the confirmation timeout + if time.Since(start) >= 2*time.Second { + out.ConfirmationStatus = rpc.ConfirmationStatusFinalized + wg.Done() + return + } + out.ConfirmationStatus = rpc.ConfirmationStatusProcessed + return + } + + // tx should be able to queue + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() // wait to be picked up and processed + waitFor(t, 5*time.Second, txm, prom, empty) // inflight txs cleared after timeout + + // panic if sendTx called after context cancelled + mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() + + // check prom metric + prom.confirmed++ + prom.finalized++ + prom.assertEqual(t) + + // check transaction status which should still be stored + status, err := txm.GetTransactionStatus(ctx, testTxID) + require.NoError(t, err) + require.Equal(t, types.Finalized, status) + + // Sleep until retention period has passed for transaction and for another reap cycle to run + time.Sleep(10 *time.Second) + + // check if transaction has been purged from memory + status, err = txm.GetTransactionStatus(ctx, testTxID) + require.Error(t, err) + require.Equal(t, types.Unknown, status) +} + +func TestTxm_compute_unit_limit_estimation(t *testing.T) { + t.Parallel() // run estimator tests in parallel + + // set up configs needed in txm + estimator := "fixed" + id := "mocknet-" + estimator + "-" + uuid.NewString() + t.Logf("Starting new iteration: %s", id) + + ctx := tests.Context(t) + lggr := logger.Test(t) + cfg := config.NewDefault() + cfg.Chain.FeeEstimatorMode = &estimator + // Enable compute unit limit estimation feature + estimateComputeUnitLimit := true + cfg.Chain.EstimateComputeUnitLimit = &estimateComputeUnitLimit + // Enable retention timeout to keep transactions after finality or error + cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(5 * time.Second) + mc := mocks.NewReaderWriter(t) + mc.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil).Maybe() + + // mock solana keystore + mkey := keyMocks.NewSimpleKeystore(t) + mkey.On("Sign", mock.Anything, mock.Anything, mock.Anything).Return([]byte{}, nil) + + loader := utils.NewLazyLoad(func() (client.ReaderWriter, error) { return mc, nil }) + txm := NewTxm(id, loader, nil, cfg, mkey, lggr) + require.NoError(t, txm.Start(ctx)) + t.Cleanup(func () { require.NoError(t, txm.Close())}) + + // tracking prom metrics + prom := soltxmProm{id: id} + + // handle signature statuses calls + statuses := map[solana.Signature]func() *rpc.SignatureStatusesResult{} + mc.On("SignatureStatuses", mock.Anything, mock.AnythingOfType("[]solana.Signature")).Return( + func(_ context.Context, sigs []solana.Signature) (out []*rpc.SignatureStatusesResult) { + for i := range sigs { + get, exists := statuses[sigs[i]] + if !exists { + out = append(out, nil) + continue + } + out = append(out, get()) + } + return out + }, nil, + ) + + t.Run("simulation_succeeds", func(t *testing.T) { + // Test tx is not discarded due to confirm timeout and tracked to finalization + tx, signed := getTx(t, 1, mkey) + sig := randomSignature(t) + var wg sync.WaitGroup + wg.Add(3) + + computeUnitConsumed := uint64(1_000_000) + computeUnitLimit := fees.ComputeUnitLimit(uint32(bigmath.AddPercentage(new(big.Int).SetUint64(computeUnitConsumed), EstimateComputeUnitLimitBuffer).Uint64())) + mc.On("SendTx", mock.Anything, signed(0, true, computeUnitLimit)).Return(sig, nil) + // First simulated before broadcast without signature or compute unit limit set + mc.On("SimulateTx", mock.Anything, tx, mock.Anything).Run(func(mock.Arguments) { + wg.Done() + }).Return(&rpc.SimulateTransactionResult{UnitsConsumed: &computeUnitConsumed}, nil).Once() + // Second simulated after broadcast with signature and compute unit limit set + mc.On("SimulateTx", mock.Anything, signed(0, true, computeUnitLimit), mock.Anything).Run(func(mock.Arguments) { + wg.Done() + }).Return(&rpc.SimulateTransactionResult{UnitsConsumed: &computeUnitConsumed}, nil).Once() + + // handle signature status calls + count := 0 + statuses[sig] = func() (out *rpc.SignatureStatusesResult) { + defer func() { count++ }() + out = &rpc.SignatureStatusesResult{} + if count == 1 { + out.ConfirmationStatus = rpc.ConfirmationStatusProcessed + return + } + if count == 2 { + out.ConfirmationStatus = rpc.ConfirmationStatusConfirmed + return + } + if count == 3 { + out.ConfirmationStatus = rpc.ConfirmationStatusFinalized + wg.Done() + return + } + return nil + } + + // send tx + testTxID := uuid.New().String() + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID)) + wg.Wait() + + // no transactions stored inflight txs list + waitFor(t, txm.cfg.TxConfirmTimeout(), txm, prom, empty) + + // panic if sendTx called after context cancelled + mc.On("SendTx", mock.Anything, tx).Panic("SendTx should not be called anymore").Maybe() + + // check prom metric + prom.confirmed++ + prom.finalized++ + prom.assertEqual(t) + + status, err := txm.GetTransactionStatus(ctx, testTxID) + require.NoError(t, err) + require.Equal(t, types.Finalized, status) + }) + + t.Run("simulation_fails", func(t *testing.T) { + // Test tx is not discarded due to confirm timeout and tracked to finalization + tx, signed := getTx(t, 1, mkey) + sig := randomSignature(t) + + mc.On("SendTx", mock.Anything, signed(0, true, fees.ComputeUnitLimit(0))).Return(sig, nil).Panic("SendTx should never be called").Maybe() + mc.On("SimulateTx", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("simulation failed")).Once() + + // tx should NOT be able to queue + assert.Error(t, txm.Enqueue(ctx, t.Name(), tx, nil)) + }) + + t.Run("simulation_returns_error", func(t *testing.T) { + // Test tx is not discarded due to confirm timeout and tracked to finalization + tx, signed := getTx(t, 1, mkey) + sig := randomSignature(t) + + mc.On("SendTx", mock.Anything, signed(0, true, fees.ComputeUnitLimit(0))).Return(sig, nil).Panic("SendTx should never be called").Maybe() + mc.On("SimulateTx", mock.Anything, tx, mock.Anything).Return(&rpc.SimulateTransactionResult{Err: errors.New("tx err")}, nil).Once() + + // tx should NOT be able to queue + assert.Error(t, txm.Enqueue(ctx, t.Name(), tx, nil)) + }) +} + func TestTxm_Enqueue(t *testing.T) { // set up configs needed in txm lggr := logger.Test(t) @@ -729,7 +1047,7 @@ func TestTxm_Enqueue(t *testing.T) { loader := utils.NewLazyLoad(func() (client.ReaderWriter, error) { return mc, nil }) txm := NewTxm("enqueue_test", loader, nil, cfg, mkey, lggr) - require.ErrorContains(t, txm.Enqueue(ctx, "txmUnstarted", &solana.Transaction{}), "not started") + require.ErrorContains(t, txm.Enqueue(ctx, "txmUnstarted", &solana.Transaction{}, nil), "not started") require.NoError(t, txm.Start(ctx)) t.Cleanup(func() { require.NoError(t, txm.Close()) }) @@ -747,10 +1065,10 @@ func TestTxm_Enqueue(t *testing.T) { for _, run := range txs { t.Run(run.name, func(t *testing.T) { if !run.fail { - assert.NoError(t, txm.Enqueue(ctx, run.name, run.tx)) + assert.NoError(t, txm.Enqueue(ctx, run.name, run.tx, nil)) return } - assert.Error(t, txm.Enqueue(ctx, run.name, run.tx)) + assert.Error(t, txm.Enqueue(ctx, run.name, run.tx, nil)) }) } } diff --git a/pkg/solana/txm/txm_load_test.go b/pkg/solana/txm/txm_load_test.go index 744610e1f..5d5a8061b 100644 --- a/pkg/solana/txm/txm_load_test.go +++ b/pkg/solana/txm/txm_load_test.go @@ -104,16 +104,16 @@ func TestTxm_Integration(t *testing.T) { } // enqueue txs (must pass to move on to load test) - require.NoError(t, txm.Enqueue(ctx, "test_success_0", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) - require.Error(t, txm.Enqueue(ctx, "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) // cannot sign tx before enqueuing - require.NoError(t, txm.Enqueue(ctx, "test_invalidReceiver", createTx(pubKey, pubKey, solana.PublicKey{}, solana.LAMPORTS_PER_SOL))) + require.NoError(t, txm.Enqueue(ctx, "test_success_0", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) + require.Error(t, txm.Enqueue(ctx, "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) // cannot sign tx before enqueuing + require.NoError(t, txm.Enqueue(ctx, "test_invalidReceiver", createTx(pubKey, pubKey, solana.PublicKey{}, solana.LAMPORTS_PER_SOL), nil)) time.Sleep(500 * time.Millisecond) // pause 0.5s for new blockhash - require.NoError(t, txm.Enqueue(ctx, "test_success_1", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) - require.NoError(t, txm.Enqueue(ctx, "test_txFail", createTx(pubKey, pubKey, pubKeyReceiver, 1000*solana.LAMPORTS_PER_SOL))) + require.NoError(t, txm.Enqueue(ctx, "test_success_1", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) + require.NoError(t, txm.Enqueue(ctx, "test_txFail", createTx(pubKey, pubKey, pubKeyReceiver, 1000*solana.LAMPORTS_PER_SOL), nil)) // load test: try to overload txs, confirm, or simulation for i := 0; i < 1000; i++ { - assert.NoError(t, txm.Enqueue(ctx, fmt.Sprintf("load_%d", i), createTx(loadTestKey.PublicKey(), loadTestKey.PublicKey(), loadTestKey.PublicKey(), uint64(i)))) + assert.NoError(t, txm.Enqueue(ctx, fmt.Sprintf("load_%d", i), createTx(loadTestKey.PublicKey(), loadTestKey.PublicKey(), loadTestKey.PublicKey(), uint64(i)), nil)) time.Sleep(10 * time.Millisecond) // ~100 txs per second (note: have run 5ms delays for ~200tx/s succesfully) } diff --git a/pkg/solana/txm/txm_race_test.go b/pkg/solana/txm/txm_race_test.go index 81f2c15f6..42062718f 100644 --- a/pkg/solana/txm/txm_race_test.go +++ b/pkg/solana/txm/txm_race_test.go @@ -27,9 +27,11 @@ import ( "github.com/stretchr/testify/require" ) -func NewTestTx() (tx solanaGo.Transaction) { +func NewTestMsg() (msg pendingTx) { + tx := solanaGo.Transaction{} tx.Message.AccountKeys = append(tx.Message.AccountKeys, solanaGo.PublicKey{}) - return tx + msg.tx = tx + return msg } // Test race condition for saving + reading signatures when bumping fees @@ -59,7 +61,7 @@ func TestTxm_SendWithRetry_Race(t *testing.T) { ks.On("Sign", mock.Anything, mock.Anything, mock.Anything).Return([]byte{}, nil) // assemble minimal tx for testing retry - tx := NewTestTx() + msg := NewTestMsg() testRunner := func(t *testing.T, client solanaClient.ReaderWriter) { // build minimal txm @@ -69,11 +71,9 @@ func TestTxm_SendWithRetry_Race(t *testing.T) { txm := NewTxm("retry_race", loader, nil, cfg, ks, lggr) txm.fee = fee - _, _, _, err := txm.sendWithRetry( - tests.Context(t), - tx, - txm.defaultTxConfig(), - ) + msg.cfg = txm.defaultTxConfig() + + _, _, _, err := txm.sendWithRetry(tests.Context(t), msg) require.NoError(t, err) time.Sleep(txRetryDuration / 4 * 5) // wait 1.25x longer of tx life to capture all logs @@ -206,34 +206,34 @@ func TestTxm_SendWithRetry_Race(t *testing.T) { t.Run("bumping tx errors and ctx cleans up waitgroup blocks", func(t *testing.T) { client := clientmocks.NewReaderWriter(t) // client mock - first tx is always successful - tx0 := NewTestTx() - require.NoError(t, fees.SetComputeUnitPrice(&tx0, 0)) - require.NoError(t, fees.SetComputeUnitLimit(&tx0, 200_000)) - tx0.Signatures = make([]solanaGo.Signature, 1) - client.On("SendTx", mock.Anything, &tx0).Return(solanaGo.Signature{1}, nil) + msg0 := NewTestMsg() + require.NoError(t, fees.SetComputeUnitPrice(&msg0.tx, 0)) + require.NoError(t, fees.SetComputeUnitLimit(&msg0.tx, 200_000)) + msg0.tx.Signatures = make([]solanaGo.Signature, 1) + client.On("SendTx", mock.Anything, &msg0.tx).Return(solanaGo.Signature{1}, nil) // init bump tx fails, rebroadcast is successful - tx1 := NewTestTx() - require.NoError(t, fees.SetComputeUnitPrice(&tx1, 1)) - require.NoError(t, fees.SetComputeUnitLimit(&tx1, 200_000)) - tx1.Signatures = make([]solanaGo.Signature, 1) - client.On("SendTx", mock.Anything, &tx1).Return(solanaGo.Signature{}, fmt.Errorf("BUMP FAILED")).Once() - client.On("SendTx", mock.Anything, &tx1).Return(solanaGo.Signature{2}, nil) + msg1 := NewTestMsg() + require.NoError(t, fees.SetComputeUnitPrice(&msg1.tx, 1)) + require.NoError(t, fees.SetComputeUnitLimit(&msg1.tx, 200_000)) + msg1.tx.Signatures = make([]solanaGo.Signature, 1) + client.On("SendTx", mock.Anything, &msg1.tx).Return(solanaGo.Signature{}, fmt.Errorf("BUMP FAILED")).Once() + client.On("SendTx", mock.Anything, &msg1.tx).Return(solanaGo.Signature{2}, nil) // init bump tx success, rebroadcast fails - tx2 := NewTestTx() - require.NoError(t, fees.SetComputeUnitPrice(&tx2, 2)) - require.NoError(t, fees.SetComputeUnitLimit(&tx2, 200_000)) - tx2.Signatures = make([]solanaGo.Signature, 1) - client.On("SendTx", mock.Anything, &tx2).Return(solanaGo.Signature{3}, nil).Once() - client.On("SendTx", mock.Anything, &tx2).Return(solanaGo.Signature{}, fmt.Errorf("REBROADCAST FAILED")) + msg2 := NewTestMsg() + require.NoError(t, fees.SetComputeUnitPrice(&msg2.tx, 2)) + require.NoError(t, fees.SetComputeUnitLimit(&msg2.tx, 200_000)) + msg2.tx.Signatures = make([]solanaGo.Signature, 1) + client.On("SendTx", mock.Anything, &msg2.tx).Return(solanaGo.Signature{3}, nil).Once() + client.On("SendTx", mock.Anything, &msg2.tx).Return(solanaGo.Signature{}, fmt.Errorf("REBROADCAST FAILED")) // always successful - tx3 := NewTestTx() - require.NoError(t, fees.SetComputeUnitPrice(&tx3, 4)) - require.NoError(t, fees.SetComputeUnitLimit(&tx3, 200_000)) - tx3.Signatures = make([]solanaGo.Signature, 1) - client.On("SendTx", mock.Anything, &tx3).Return(solanaGo.Signature{4}, nil) + msg3 := NewTestMsg() + require.NoError(t, fees.SetComputeUnitPrice(&msg3.tx, 4)) + require.NoError(t, fees.SetComputeUnitLimit(&msg3.tx, 200_000)) + msg3.tx.Signatures = make([]solanaGo.Signature, 1) + client.On("SendTx", mock.Anything, &msg3.tx).Return(solanaGo.Signature{4}, nil) testRunner(t, client) }) diff --git a/pkg/solana/txm/utils.go b/pkg/solana/txm/utils.go index 360a2330e..6b2253818 100644 --- a/pkg/solana/txm/utils.go +++ b/pkg/solana/txm/utils.go @@ -11,17 +11,42 @@ import ( "github.com/gagliardetto/solana-go/rpc" ) +type TxState int + // tx not found +// < tx errored +// < tx broadcasted // < tx processed -// < tx confirmed/finalized + revert -// < tx confirmed/finalized + success +// < tx confirmed +// < tx finalized const ( - NotFound = iota + NotFound TxState = iota + Errored + Broadcasted Processed - ConfirmedRevert - ConfirmedSuccess + Confirmed + Finalized ) +func (s TxState) String() string { + switch s { + case NotFound: + return "NotFound" + case Errored: + return "Errored" + case Broadcasted: + return "Broadcasted" + case Processed: + return "Processed" + case Confirmed: + return "Confirmed" + case Finalized: + return "Finalized" + default: + return fmt.Sprintf("TxState(%d)", s) + } +} + type statuses struct { sigs []solana.Signature res []*rpc.SignatureStatusesResult @@ -53,7 +78,7 @@ func SortSignaturesAndResults(sigs []solana.Signature, res []*rpc.SignatureStatu return s.sigs, s.res, nil } -func convertStatus(res *rpc.SignatureStatusesResult) uint { +func convertStatus(res *rpc.SignatureStatusesResult) TxState { if res == nil { return NotFound } @@ -62,12 +87,21 @@ func convertStatus(res *rpc.SignatureStatusesResult) uint { return Processed } - if res.ConfirmationStatus == rpc.ConfirmationStatusConfirmed || - res.ConfirmationStatus == rpc.ConfirmationStatusFinalized { + if res.ConfirmationStatus == rpc.ConfirmationStatusConfirmed { + // If result contains error, consider the transaction errored to avoid wasted resources on re-org and expiration protection + if res.Err != nil { + return Errored + } + return Confirmed + } + + if res.ConfirmationStatus == rpc.ConfirmationStatusFinalized { + // If result contains error, consider the transaction errored + // Should be caught earlier but checked here in case confirmed is skipped due to delays or slow polling if res.Err != nil { - return ConfirmedRevert + return Errored } - return ConfirmedSuccess + return Finalized } return NotFound diff --git a/pkg/solana/txm/utils_test.go b/pkg/solana/txm/utils_test.go index 0530495d7..f4ac868ff 100644 --- a/pkg/solana/txm/utils_test.go +++ b/pkg/solana/txm/utils_test.go @@ -29,15 +29,15 @@ func TestSortSignaturesAndResults(t *testing.T) { sig, statuses, err = SortSignaturesAndResults(sig, statuses) require.NoError(t, err) - // new expected order [1, 3, 0, 2] + // new expected order [1, 0, 3, 2] assert.Equal(t, rpc.SignatureStatusesResult{ConfirmationStatus: rpc.ConfirmationStatusConfirmed}, *statuses[0]) - assert.Equal(t, rpc.SignatureStatusesResult{ConfirmationStatus: rpc.ConfirmationStatusConfirmed, Err: "ERROR"}, *statuses[1]) - assert.Equal(t, rpc.SignatureStatusesResult{ConfirmationStatus: rpc.ConfirmationStatusProcessed}, *statuses[2]) + assert.Equal(t, rpc.SignatureStatusesResult{ConfirmationStatus: rpc.ConfirmationStatusProcessed}, *statuses[1]) + assert.Equal(t, rpc.SignatureStatusesResult{ConfirmationStatus: rpc.ConfirmationStatusConfirmed, Err: "ERROR"}, *statuses[2]) assert.True(t, nil == statuses[3]) assert.Equal(t, solana.Signature{1}, sig[0]) - assert.Equal(t, solana.Signature{3}, sig[1]) - assert.Equal(t, solana.Signature{0}, sig[2]) + assert.Equal(t, solana.Signature{0}, sig[1]) + assert.Equal(t, solana.Signature{3}, sig[2]) assert.Equal(t, solana.Signature{2}, sig[3]) }