diff --git a/services/horizon/internal/httpx/server.go b/services/horizon/internal/httpx/server.go index 7d9dc5419f..7fcaf379f2 100644 --- a/services/horizon/internal/httpx/server.go +++ b/services/horizon/internal/httpx/server.go @@ -15,7 +15,6 @@ import ( "github.com/stellar/go/services/horizon/internal/ledger" hProblem "github.com/stellar/go/services/horizon/internal/render/problem" "github.com/stellar/go/services/horizon/internal/render/sse" - "github.com/stellar/go/services/horizon/internal/txsub/sequence" "github.com/stellar/go/support/db" "github.com/stellar/go/support/log" "github.com/stellar/go/support/render/problem" @@ -49,7 +48,6 @@ func init() { // register problems problem.SetLogFilter(problem.LogUnknownErrors) problem.RegisterError(sql.ErrNoRows, problem.NotFound) - problem.RegisterError(sequence.ErrNoMoreRoom, hProblem.ServerOverCapacity) problem.RegisterError(db2.ErrInvalidCursor, problem.BadRequest) problem.RegisterError(db2.ErrInvalidLimit, problem.BadRequest) problem.RegisterError(db2.ErrInvalidOrder, problem.BadRequest) diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 22c77cdafc..e735a2d2c8 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -14,7 +14,6 @@ import ( "github.com/stellar/go/services/horizon/internal/paths" "github.com/stellar/go/services/horizon/internal/simplepath" "github.com/stellar/go/services/horizon/internal/txsub" - "github.com/stellar/go/services/horizon/internal/txsub/sequence" "github.com/stellar/go/support/db" "github.com/stellar/go/support/log" ) @@ -239,9 +238,8 @@ func initWebMetrics(app *App) { func initSubmissionSystem(app *App) { app.submitter = &txsub.System{ - Pending: txsub.NewDefaultSubmissionList(), - Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL), - SubmissionQueue: sequence.NewManager(), + Pending: txsub.NewDefaultSubmissionList(), + Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL), DB: func(ctx context.Context) txsub.HorizonDB { return &history.Q{SessionInterface: app.HorizonSession()} }, diff --git a/services/horizon/internal/integration/transaction_preconditions_test.go b/services/horizon/internal/integration/transaction_preconditions_test.go index 79e0cb7271..e3aff9e0da 100644 --- a/services/horizon/internal/integration/transaction_preconditions_test.go +++ b/services/horizon/internal/integration/transaction_preconditions_test.go @@ -218,7 +218,7 @@ func TestTransactionPreconditionsMinSequenceNumberLedgerGap(t *testing.T) { txParams := buildTXParams(master, masterAccount, currentAccountSeq+1) // this txsub will error because the tx preconditions require a min sequence gap - // which has been set 10000 sequnce numbers greater than the current difference between + // which has been set 10000 sequence numbers greater than the current difference between // network ledger sequence and account sequnece numbers txParams.Preconditions.MinSequenceNumberLedgerGap = uint32(int64(networkLedger) - currentAccountSeq + 10000) _, err = itest.SubmitMultiSigTransaction([]*keypair.Full{master}, txParams) diff --git a/services/horizon/internal/txsub/errors.go b/services/horizon/internal/txsub/errors.go index 7821537d7f..5652498327 100644 --- a/services/horizon/internal/txsub/errors.go +++ b/services/horizon/internal/txsub/errors.go @@ -17,9 +17,6 @@ var ( // ErrBadSequence is a canned error response for transactions whose sequence // number is wrong. ErrBadSequence = &FailedTransactionError{"AAAAAAAAAAD////7AAAAAA=="} - // ErrNoAccount is returned when the source account for the transaction - // cannot be found in the database - ErrNoAccount = &FailedTransactionError{"AAAAAAAAAAD////4AAAAAA=="} ) // FailedTransactionError represent an error that occurred because diff --git a/services/horizon/internal/txsub/main.go b/services/horizon/internal/txsub/main.go index ede2df8dd8..b466fc0055 100644 --- a/services/horizon/internal/txsub/main.go +++ b/services/horizon/internal/txsub/main.go @@ -24,18 +24,18 @@ type Listener chan<- Result type OpenSubmissionList interface { // Add registers the provided listener as interested in being notified when a // result is available for the provided transaction hash. - Add(context.Context, string, Listener) error + Add(string, Listener) // Finish forwards the provided result on to any listeners and cleans up any // resources associated with the transaction that this result is for - Finish(context.Context, string, Result) error + Finish(string, Result) // Clean removes any open submissions over the provided age. - Clean(context.Context, time.Duration) (int, error) + Clean(time.Duration) int // Pending return a list of transaction hashes that have at least one // listener registered to them in this list. - Pending(context.Context) []string + Pending() []string } // Submitter represents the low-level "submit a transaction to stellar-core" diff --git a/services/horizon/internal/txsub/open_submission_list.go b/services/horizon/internal/txsub/open_submission_list.go index 9b64915732..8d781b7bb7 100644 --- a/services/horizon/internal/txsub/open_submission_list.go +++ b/services/horizon/internal/txsub/open_submission_list.go @@ -1,12 +1,10 @@ package txsub import ( - "context" "fmt" "sync" "time" - "github.com/go-errors/errors" "github.com/stellar/go/support/log" ) @@ -33,7 +31,7 @@ type submissionList struct { log *log.Entry } -func (s *submissionList) Add(ctx context.Context, hash string, l Listener) error { +func (s *submissionList) Add(hash string, l Listener) { s.Lock() defer s.Unlock() @@ -41,10 +39,6 @@ func (s *submissionList) Add(ctx context.Context, hash string, l Listener) error panic("Unbuffered listener cannot be added to OpenSubmissionList") } - if len(hash) != 64 { - return errors.New("Unexpected transaction hash length: must be 64 hex characters") - } - os, ok := s.submissions[hash] if !ok { @@ -60,17 +54,15 @@ func (s *submissionList) Add(ctx context.Context, hash string, l Listener) error } os.Listeners = append(os.Listeners, l) - - return nil } -func (s *submissionList) Finish(ctx context.Context, hash string, r Result) error { +func (s *submissionList) Finish(hash string, r Result) { s.Lock() defer s.Unlock() os, ok := s.submissions[hash] if !ok { - return nil + return } s.log.WithFields(log.F{ @@ -85,10 +77,9 @@ func (s *submissionList) Finish(ctx context.Context, hash string, r Result) erro } delete(s.submissions, hash) - return nil } -func (s *submissionList) Clean(ctx context.Context, maxAge time.Duration) (int, error) { +func (s *submissionList) Clean(maxAge time.Duration) int { s.Lock() defer s.Unlock() @@ -107,10 +98,10 @@ func (s *submissionList) Clean(ctx context.Context, maxAge time.Duration) (int, } } - return len(s.submissions), nil + return len(s.submissions) } -func (s *submissionList) Pending(ctx context.Context) []string { +func (s *submissionList) Pending() []string { s.Lock() defer s.Unlock() results := make([]string, 0, len(s.submissions)) diff --git a/services/horizon/internal/txsub/open_submission_list_test.go b/services/horizon/internal/txsub/open_submission_list_test.go index 772aca96fe..d34033a0ac 100644 --- a/services/horizon/internal/txsub/open_submission_list_test.go +++ b/services/horizon/internal/txsub/open_submission_list_test.go @@ -38,7 +38,7 @@ func (suite *SubmissionListTestSuite) SetupTest() { func (suite *SubmissionListTestSuite) TestSubmissionList_Add() { // adds an entry to the submission list when a new hash is used - suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0]) + suite.list.Add(suite.hashes[0], suite.listeners[0]) sub := suite.realList.submissions[suite.hashes[0]] assert.Equal(suite.T(), suite.hashes[0], sub.Hash) assert.WithinDuration(suite.T(), sub.SubmittedAt, time.Now(), 1*time.Second) @@ -50,12 +50,12 @@ func (suite *SubmissionListTestSuite) TestSubmissionList_Add() { } func (suite *SubmissionListTestSuite) TestSubmissionList_AddListener() { - // adds an listener to an existing entry when a hash is used with a new listener - suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0]) + // adds a listener to an existing entry when a hash is used with a new listener + suite.list.Add(suite.hashes[0], suite.listeners[0]) sub := suite.realList.submissions[suite.hashes[0]] st := sub.SubmittedAt <-time.After(20 * time.Millisecond) - suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[1]) + suite.list.Add(suite.hashes[0], suite.listeners[1]) // increases the size of the listener assert.Equal(suite.T(), 2, len(sub.Listeners)) @@ -65,20 +65,16 @@ func (suite *SubmissionListTestSuite) TestSubmissionList_AddListener() { // Panics when the listener is not buffered // panics when the listener is not buffered assert.Panics(suite.T(), func() { - suite.list.Add(suite.ctx, suite.hashes[0], make(Listener)) + suite.list.Add(suite.hashes[0], make(Listener)) }) - - // errors when the provided hash is not 64-bytes - err := suite.list.Add(suite.ctx, "123", suite.listeners[0]) - assert.NotNil(suite.T(), err) } func (suite *SubmissionListTestSuite) TestSubmissionList_Finish() { - suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0]) - suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[1]) + suite.list.Add(suite.hashes[0], suite.listeners[0]) + suite.list.Add(suite.hashes[0], suite.listeners[1]) r := Result{Err: errors.New("test error")} - suite.list.Finish(suite.ctx, suite.hashes[0], r) + suite.list.Finish(suite.hashes[0], r) // Wries to every listener r1, ok1 := <-suite.listeners[0] @@ -102,20 +98,15 @@ func (suite *SubmissionListTestSuite) TestSubmissionList_Finish() { _, _ = <-suite.listeners[1] _, more = <-suite.listeners[1] assert.False(suite.T(), more) - - // works when no one is waiting for the result - err := suite.list.Finish(suite.ctx, suite.hashes[0], r) - assert.Nil(suite.T(), err) } func (suite *SubmissionListTestSuite) TestSubmissionList_Clean() { - suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0]) + suite.list.Add(suite.hashes[0], suite.listeners[0]) <-time.After(200 * time.Millisecond) - suite.list.Add(suite.ctx, suite.hashes[1], suite.listeners[1]) - left, err := suite.list.Clean(suite.ctx, 200*time.Millisecond) + suite.list.Add(suite.hashes[1], suite.listeners[1]) + left := suite.list.Clean(200 * time.Millisecond) - assert.Nil(suite.T(), err) assert.Equal(suite.T(), 1, left) // removes submissions older than the maxAge provided @@ -139,11 +130,11 @@ func (suite *SubmissionListTestSuite) TestSubmissionList_Clean() { // Tests that Pending works as expected func (suite *SubmissionListTestSuite) TestSubmissionList_Pending() { - assert.Equal(suite.T(), 0, len(suite.list.Pending(suite.ctx))) - suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0]) - assert.Equal(suite.T(), 1, len(suite.list.Pending(suite.ctx))) - suite.list.Add(suite.ctx, suite.hashes[1], suite.listeners[1]) - assert.Equal(suite.T(), 2, len(suite.list.Pending(suite.ctx))) + assert.Equal(suite.T(), 0, len(suite.list.Pending())) + suite.list.Add(suite.hashes[0], suite.listeners[0]) + assert.Equal(suite.T(), 1, len(suite.list.Pending())) + suite.list.Add(suite.hashes[1], suite.listeners[1]) + assert.Equal(suite.T(), 2, len(suite.list.Pending())) } func TestSubmissionListTestSuite(t *testing.T) { diff --git a/services/horizon/internal/txsub/results.go b/services/horizon/internal/txsub/results.go index e9d91bc35e..4adba767bd 100644 --- a/services/horizon/internal/txsub/results.go +++ b/services/horizon/internal/txsub/results.go @@ -2,8 +2,6 @@ package txsub import ( "context" - "database/sql" - "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" @@ -49,35 +47,3 @@ func txResultFromHistory(tx history.Transaction) (history.Transaction, error) { return tx, err } - -// checkTxAlreadyExists uses a repeatable read transaction to look up both transaction results -// and sequence numbers. Without the repeatable read transaction it is possible that the two database -// queries execute on different ledgers. In this case, txsub can mistakenly respond with a bad_seq error -// because the first query occurs when the tx is not yet ingested and the second query occurs when the tx -// is ingested. -func checkTxAlreadyExists(ctx context.Context, db HorizonDB, hash, sourceAddress string) (history.Transaction, uint64, error) { - err := db.BeginTx(ctx, &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }) - if err != nil { - return history.Transaction{}, 0, errors.Wrap(err, "cannot start repeatable read tx") - } - defer db.Rollback() - - tx, err := txResultByHash(ctx, db, hash) - if err == ErrNoResults { - var sequenceNumbers map[string]uint64 - sequenceNumbers, err = db.GetSequenceNumbers(ctx, []string{sourceAddress}) - if err != nil { - return tx, 0, errors.Wrapf(err, "cannot fetch sequence number for %v", sourceAddress) - } - - num, ok := sequenceNumbers[sourceAddress] - if !ok { - return tx, 0, ErrNoAccount - } - return tx, num, ErrNoResults - } - return tx, 0, err -} diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go deleted file mode 100644 index 9d92595bac..0000000000 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go +++ /dev/null @@ -1,132 +0,0 @@ -package sequence - -import ( - "sort" - "time" -) - -// AccountTxSubmissionQueue manages the submission queue for a single source account. The -// transaction system uses Push to enqueue submissions for given sequence -// numbers. -// -// AccountTxSubmissionQueue maintains a priority queue of pending submissions, and when updated -// (via the NotifyLastAccountSequence() method) with the current sequence number of the account -// being managed, queued submissions that can be acted upon will be unblocked. -type AccountTxSubmissionQueue struct { - lastActiveAt time.Time - timeout time.Duration - lastSeenAccountSequence uint64 - transactions []txToSubmit -} - -// txToSubmit represents a transaction being tracked by the queue -type txToSubmit struct { - minAccSeqNum uint64 // minimum account sequence required to send the transaction - maxAccSeqNum uint64 // maximum account sequence required to send the transaction - notifyBackChan chan error // submission notification channel -} - -// NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue -func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue { - result := &AccountTxSubmissionQueue{ - lastActiveAt: time.Now(), - timeout: 10 * time.Second, - } - return result -} - -// Size returns the count of currently buffered submissions in the queue. -func (q *AccountTxSubmissionQueue) Size() int { - return len(q.transactions) -} - -// Push enqueues the intent to submit a transaction at the provided sequence -// number and returns a channel that will emit when it is safe for the client -// to do so. -// -// Push does not perform any triggering (which -// occurs in NotifyLastAccountSequence(), even if the current sequence number for this queue is -// the same as the provided sequence, to keep internal complexity much lower. -// Given that, the recommended usage pattern is: -// -// 1. Push the submission onto the queue -// 2. Load the current sequence number for the source account from the DB -// 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if -// possible -func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error { - // From CAP 21: If minSeqNum is nil, the txToSubmit is only valid when sourceAccount's sequence number is seqNum - 1. - // Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < txToSubmit.seqNum. - effectiveMinSeqNum := sequence - 1 - if minSeqNum != nil { - effectiveMinSeqNum = *minSeqNum - } - ch := make(chan error, 1) - q.transactions = append(q.transactions, txToSubmit{ - minAccSeqNum: effectiveMinSeqNum, - maxAccSeqNum: sequence - 1, - notifyBackChan: ch, - }) - return ch -} - -// NotifyLastAccountSequence notifies the queue that the provided sequence number is the latest -// seen value for the account that this queue manages submissions for. -// -// This function is monotonic... calling it with a sequence number lower than -// the latest seen sequence number is a noop. -func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { - if q.lastSeenAccountSequence < sequence { - q.lastSeenAccountSequence = sequence - } - - queueWasChanged := false - - txsToSubmit := make([]txToSubmit, 0, len(q.transactions)) - // Extract transactions ready to submit and notify those which are un-submittable. - for i := 0; i < len(q.transactions); { - candidate := q.transactions[i] - removeCandidateFromQueue := false - if q.lastSeenAccountSequence > candidate.maxAccSeqNum { - // this transaction can never be submitted because account sequence numbers only grow - candidate.notifyBackChan <- ErrBadSequence - close(candidate.notifyBackChan) - removeCandidateFromQueue = true - } else if q.lastSeenAccountSequence >= candidate.minAccSeqNum { - txsToSubmit = append(txsToSubmit, candidate) - removeCandidateFromQueue = true - } - if removeCandidateFromQueue { - q.transactions = append(q.transactions[:i], q.transactions[i+1:]...) - queueWasChanged = true - } else { - // only increment the index if there was no removal - i++ - } - } - - // To maximize successful submission opportunity, submit transactions by the account sequence - // which would result from a successful submission (i.e. maxAccSeqNum+1) - sort.Slice(txsToSubmit, func(i, j int) bool { - return txsToSubmit[i].maxAccSeqNum < txsToSubmit[j].maxAccSeqNum - }) - for _, tx := range txsToSubmit { - tx.notifyBackChan <- nil - close(tx.notifyBackChan) - } - - // if we modified the queue, bump the timeout for this queue - if queueWasChanged { - q.lastActiveAt = time.Now() - return - } - - // if the queue wasn't changed, see if it is too old, clear - // it and make room for other submissions - if time.Since(q.lastActiveAt) > q.timeout { - for _, tx := range q.transactions { - tx.notifyBackChan <- ErrBadSequence - close(tx.notifyBackChan) - } - q.transactions = nil - } -} diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go deleted file mode 100644 index 0b4497f2fd..0000000000 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go +++ /dev/null @@ -1,68 +0,0 @@ -//lint:file-ignore U1001 Ignore all unused code, staticcheck doesn't understand testify/suite - -package sequence - -import ( - "context" - "testing" - "time" - - "github.com/stellar/go/services/horizon/internal/test" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/suite" -) - -type QueueTestSuite struct { - suite.Suite - ctx context.Context - queue *AccountTxSubmissionQueue -} - -func (suite *QueueTestSuite) SetupTest() { - suite.ctx = test.Context() - suite.queue = NewAccountTxSubmissionQueue() -} - -// Tests the NotifyLastAccountSequence method -func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() { - // NotifyLastAccountSequence removes sequences that are submittable or in the past - lowMin := uint64(1) - results := []<-chan error{ - suite.queue.Push(1, nil), - suite.queue.Push(2, nil), - suite.queue.Push(3, nil), - suite.queue.Push(4, nil), - suite.queue.Push(4, &lowMin), - } - - suite.queue.NotifyLastAccountSequence(2) - - // the update above signifies that 2 is the accounts current sequence, - // meaning that 3 is submittable, and so only 4 (Min/maxAccSeqNum=3) should remain - assert.Equal(suite.T(), 1, suite.queue.Size()) - entry := suite.queue.transactions[0] - assert.Equal(suite.T(), uint64(3), entry.minAccSeqNum) - assert.Equal(suite.T(), uint64(3), entry.maxAccSeqNum) - - suite.queue.NotifyLastAccountSequence(4) - assert.Equal(suite.T(), 0, suite.queue.Size()) - - assert.Equal(suite.T(), ErrBadSequence, <-results[0]) - assert.Equal(suite.T(), ErrBadSequence, <-results[1]) - assert.Equal(suite.T(), nil, <-results[2]) - assert.Equal(suite.T(), ErrBadSequence, <-results[3]) - assert.Equal(suite.T(), nil, <-results[4]) - - // NotifyLastAccountSequence clears the queue if the head has not been released within the time limit - suite.queue.timeout = 1 * time.Millisecond - result := suite.queue.Push(2, nil) - <-time.After(10 * time.Millisecond) - suite.queue.NotifyLastAccountSequence(0) - - assert.Equal(suite.T(), 0, suite.queue.Size()) - assert.Equal(suite.T(), ErrBadSequence, <-result) -} - -func TestQueueTestSuite(t *testing.T) { - suite.Run(t, new(QueueTestSuite)) -} diff --git a/services/horizon/internal/txsub/sequence/doc.go b/services/horizon/internal/txsub/sequence/doc.go deleted file mode 100644 index 805e571090..0000000000 --- a/services/horizon/internal/txsub/sequence/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package sequence providers helpers to manage sequence numbers on behalf of horizon clients. -// See Manager for more details on the api. -package sequence diff --git a/services/horizon/internal/txsub/sequence/errors.go b/services/horizon/internal/txsub/sequence/errors.go deleted file mode 100644 index 2a7e1258e5..0000000000 --- a/services/horizon/internal/txsub/sequence/errors.go +++ /dev/null @@ -1,10 +0,0 @@ -package sequence - -import ( - "errors" -) - -var ( - ErrNoMoreRoom = errors.New("queue full") - ErrBadSequence = errors.New("bad sequence") -) diff --git a/services/horizon/internal/txsub/sequence/manager.go b/services/horizon/internal/txsub/sequence/manager.go deleted file mode 100644 index 1a776f44d8..0000000000 --- a/services/horizon/internal/txsub/sequence/manager.go +++ /dev/null @@ -1,115 +0,0 @@ -package sequence - -import ( - "fmt" - "strings" - "sync" -) - -// Manager provides a system for tracking the transaction submission queue for -// a set of addresses. Requests to submit at a certain sequence number are -// registered using the Push() method, and as the system is updated with -// account sequence information (through the Update() method) requests are -// notified that they can safely submit to stellar-core. -type Manager struct { - mutex sync.Mutex - MaxSize int - queues map[string]*AccountTxSubmissionQueue -} - -// NewManager returns a new manager -func NewManager() *Manager { - return &Manager{ - MaxSize: 1024, //TODO: make MaxSize configurable - queues: map[string]*AccountTxSubmissionQueue{}, - } -} - -func (m *Manager) String() string { - m.mutex.Lock() - defer m.mutex.Unlock() - var addys []string - - for addy, q := range m.queues { - addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.lastSeenAccountSequence)) - } - - return "[ " + strings.Join(addys, ",") + " ]" -} - -// Size returns the count of submissions buffered within -// this manager. -func (m *Manager) Size() int { - m.mutex.Lock() - defer m.mutex.Unlock() - return m.size() -} - -func (m *Manager) Addresses() []string { - m.mutex.Lock() - defer m.mutex.Unlock() - addys := make([]string, 0, len(m.queues)) - - for addy := range m.queues { - addys = append(addys, addy) - } - - return addys -} - -// Push registers an intent to submit a transaction for the provided address at -// the provided sequence. A channel is returned that will be written to when -// the requester should attempt the submission. -func (m *Manager) Push(address string, sequence uint64, minSeqNum *uint64) <-chan error { - m.mutex.Lock() - defer m.mutex.Unlock() - - if m.size() >= m.MaxSize { - return m.getError(ErrNoMoreRoom) - } - - aq, ok := m.queues[address] - if !ok { - aq = NewAccountTxSubmissionQueue() - m.queues[address] = aq - } - - return aq.Push(sequence, minSeqNum) -} - -// NotifyLastAccountSequences notifies the manager of newly loaded account sequence information. The manager uses this information -// to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic. -func (m *Manager) NotifyLastAccountSequences(updates map[string]uint64) { - m.mutex.Lock() - defer m.mutex.Unlock() - - for address, seq := range updates { - queue, ok := m.queues[address] - if !ok { - continue - } - - queue.NotifyLastAccountSequence(seq) - if queue.Size() == 0 { - delete(m.queues, address) - } - } -} - -// size returns the count of submissions buffered within this manager. This -// internal version assumes you have locked the manager previously. -func (m *Manager) size() int { - var result int - for _, q := range m.queues { - result += q.Size() - } - - return result -} - -func (m *Manager) getError(err error) <-chan error { - ch := make(chan error, 1) - ch <- err - close(ch) - return ch -} diff --git a/services/horizon/internal/txsub/sequence/manager_test.go b/services/horizon/internal/txsub/sequence/manager_test.go deleted file mode 100644 index e69b8be6fc..0000000000 --- a/services/horizon/internal/txsub/sequence/manager_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package sequence - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -// Test the Push method -func TestManager_Push(t *testing.T) { - mgr := NewManager() - - minSeq := uint64(1) - mgr.Push("1", 2, nil) - mgr.Push("1", 2, nil) - mgr.Push("1", 3, &minSeq) - mgr.Push("2", 2, nil) - - assert.Equal(t, 4, mgr.Size()) - assert.Equal(t, 3, mgr.queues["1"].Size()) - assert.Equal(t, 1, mgr.queues["2"].Size()) -} - -// Test the NotifyLastAccountSequences method -func TestManager_NotifyLastAccountSequences(t *testing.T) { - mgr := NewManager() - minSeq := uint64(1) - results := []<-chan error{ - mgr.Push("1", 4, &minSeq), - mgr.Push("1", 3, nil), - mgr.Push("2", 2, nil), - } - - mgr.NotifyLastAccountSequences(map[string]uint64{ - "1": 1, - "2": 1, - }) - - assert.Equal(t, 1, mgr.Size()) - _, ok := mgr.queues["2"] - assert.False(t, ok) - - assert.Equal(t, nil, <-results[0]) - assert.Equal(t, nil, <-results[2]) - assert.Equal(t, 0, len(results[1])) -} - -// Push until maximum queue size is reached and check that another push results in ErrNoMoreRoom -func TestManager_PushNoMoreRoom(t *testing.T) { - mgr := NewManager() - for i := 0; i < mgr.MaxSize; i++ { - mgr.Push("1", 2, nil) - } - - assert.Equal(t, 1024, mgr.Size()) - assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2, nil)) -} diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index fb25e366de..4dd492a9f8 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/services/horizon/internal/db2/history" - "github.com/stellar/go/services/horizon/internal/txsub/sequence" "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -39,7 +38,6 @@ type System struct { DB func(context.Context) HorizonDB Pending OpenSubmissionList Submitter Submitter - SubmissionQueue *sequence.Manager SubmissionTimeout time.Duration Log *log.Entry @@ -48,10 +46,6 @@ type System struct { // submissions to stellar-core SubmissionDuration prometheus.Summary - // BufferedSubmissionGauge tracks the count of submissions buffered - // behind this system's SubmissionQueue - BufferedSubmissionsGauge prometheus.Gauge - // OpenSubmissionsGauge tracks the count of "open" submissions (i.e. // submissions whose transactions haven't been confirmed successful or failed OpenSubmissionsGauge prometheus.Gauge @@ -81,7 +75,6 @@ type System struct { // RegisterMetrics registers the prometheus metrics func (sys *System) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(sys.Metrics.SubmissionDuration) - registry.MustRegister(sys.Metrics.BufferedSubmissionsGauge) registry.MustRegister(sys.Metrics.OpenSubmissionsGauge) registry.MustRegister(sys.Metrics.FailedSubmissionsCounter) registry.MustRegister(sys.Metrics.SuccessfulSubmissionsCounter) @@ -122,7 +115,7 @@ func (sys *System) Submit( return } - tx, sequenceNumber, err := checkTxAlreadyExists(ctx, db, hash, sourceAddress) + tx, err := txResultByHash(ctx, db, hash) if err == nil { sys.Log.Ctx(ctx).WithField("hash", hash).Info("Found submission result in a DB") sys.finish(ctx, hash, resultCh, Result{Transaction: tx}) @@ -134,75 +127,45 @@ func (sys *System) Submit( return } - // queue the submission and get the channel that will emit when - // submission is valid - var pMinSeqNum *uint64 - if minSeqNum != nil { - uMinSeqNum := uint64(*minSeqNum) - pMinSeqNum = &uMinSeqNum - } - submissionWait := sys.SubmissionQueue.Push(sourceAddress, uint64(seqNum), pMinSeqNum) - - // update the submission queue with the source accounts current sequence value - // which will cause the channel returned by Push() to emit if possible. - sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ - sourceAddress: sequenceNumber, - }) - - select { - case err := <-submissionWait: - if err == sequence.ErrBadSequence { - // convert the internal only ErrBadSequence into the FailedTransactionError - err = ErrBadSequence - } + sr := sys.submitOnce(ctx, rawTx) + sys.updateTransactionTypeMetrics(envelope) + if sr.Err != nil { + // any error other than "txBAD_SEQ" is a failure + isBad, err := sr.IsBadSeq() if err != nil { sys.finish(ctx, hash, resultCh, Result{Err: err}) return } + if !isBad { + sys.finish(ctx, hash, resultCh, Result{Err: sr.Err}) + return + } - sr := sys.submitOnce(ctx, rawTx) - sys.updateTransactionTypeMetrics(envelope) - - if sr.Err != nil { - // any error other than "txBAD_SEQ" is a failure - isBad, err := sr.IsBadSeq() - if err != nil { - sys.finish(ctx, hash, resultCh, Result{Err: err}) - return - } - if !isBad { - sys.finish(ctx, hash, resultCh, Result{Err: sr.Err}) - return - } - - if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil { - sys.finish(ctx, hash, resultCh, Result{Err: err}) - return - } - - // If error is txBAD_SEQ, check for the result again - tx, err = txResultByHash(ctx, db, hash) - if err != nil { - // finally, return the bad_seq error if no result was found on 2nd attempt - sys.finish(ctx, hash, resultCh, Result{Err: sr.Err}) - return - } - // If we found the result, use it as the result - sys.finish(ctx, hash, resultCh, Result{Transaction: tx}) + // Even if a transaction is successfully submitted to core, Horizon ingestion might + // be lagging behind leading to txBAD_SEQ. This function will block a txsub request + // until the request times out or account sequence is bumped to txn sequence. + if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil { + sys.finish(ctx, hash, resultCh, Result{Err: err}) return } - // add transactions to open list - sys.Pending.Add(ctx, hash, resultCh) - // update the submission queue, allowing the next submission to proceed - sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ - sourceAddress: uint64(envelope.SeqNum()), - }) - case <-ctx.Done(): - sys.finish(ctx, hash, resultCh, Result{Err: sys.deriveTxSubError(ctx)}) + // If error is txBAD_SEQ, check for the result again + tx, err = txResultByHash(ctx, db, hash) + if err != nil { + // finally, return the bad_seq error if no result was found on 2nd attempt + sys.finish(ctx, hash, resultCh, Result{Err: sr.Err}) + return + } + // If we found the result, use it as the result + sys.finish(ctx, hash, resultCh, Result{Transaction: tx}) + return } + // Add transaction to open list of pending txns: the transaction has been successfully submitted to core + // but that does not mean it is included in the ledger. The txn status remains pending + // until we see an ingestion in the db. + sys.Pending.Add(hash, resultCh) return } @@ -311,9 +274,7 @@ func (sys *System) Tick(ctx context.Context) { defer sys.unsetTickInProgress() - logger. - WithField("queued", sys.SubmissionQueue.String()). - Debug("ticking txsub system") + logger.Debug("ticking txsub system") db := sys.DB(ctx) options := &sql.TxOptions{ @@ -326,18 +287,7 @@ func (sys *System) Tick(ctx context.Context) { } defer db.Rollback() - addys := sys.SubmissionQueue.Addresses() - if len(addys) > 0 { - curSeq, err := db.GetSequenceNumbers(ctx, addys) - if err != nil { - logger.WithStack(err).Error(err) - return - } else { - sys.SubmissionQueue.NotifyLastAccountSequences(curSeq) - } - } - - pending := sys.Pending.Pending(ctx) + pending := sys.Pending.Pending() if len(pending) > 0 { latestLedger, err := db.GetLatestHistoryLedger(ctx) @@ -376,13 +326,13 @@ func (sys *System) Tick(ctx context.Context) { if err == nil { logger.WithField("hash", hash).Debug("finishing open submission") - sys.Pending.Finish(ctx, hash, Result{Transaction: tx}) + sys.Pending.Finish(hash, Result{Transaction: tx}) continue } if _, ok := err.(*FailedTransactionError); ok { logger.WithField("hash", hash).Debug("finishing open submission") - sys.Pending.Finish(ctx, hash, Result{Transaction: tx, Err: err}) + sys.Pending.Finish(hash, Result{Transaction: tx, Err: err}) continue } @@ -392,14 +342,8 @@ func (sys *System) Tick(ctx context.Context) { } } - stillOpen, err := sys.Pending.Clean(ctx, sys.SubmissionTimeout) - if err != nil { - logger.WithStack(err).Error(err) - return - } - + stillOpen := sys.Pending.Clean(sys.SubmissionTimeout) sys.Metrics.OpenSubmissionsGauge.Set(float64(stillOpen)) - sys.Metrics.BufferedSubmissionsGauge.Set(float64(sys.SubmissionQueue.Size())) } // Init initializes `sys` @@ -420,9 +364,6 @@ func (sys *System) Init() { sys.Metrics.OpenSubmissionsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "horizon", Subsystem: "txsub", Name: "open", }) - sys.Metrics.BufferedSubmissionsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "horizon", Subsystem: "txsub", Name: "buffered", - }) sys.Metrics.V0TransactionsCounter = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "horizon", Subsystem: "txsub", Name: "v0", }) diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index 6b152673d3..816cc28e66 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -19,7 +19,6 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/test" - "github.com/stellar/go/services/horizon/internal/txsub/sequence" "github.com/stellar/go/xdr" ) @@ -42,9 +41,8 @@ func (suite *SystemTestSuite) SetupTest() { suite.db = &mockDBQ{} suite.system = &System{ - Pending: NewDefaultSubmissionList(), - Submitter: suite.submitter, - SubmissionQueue: sequence.NewManager(), + Pending: NewDefaultSubmissionList(), + Submitter: suite.submitter, DB: func(ctx context.Context) HorizonDB { return suite.db }, @@ -124,11 +122,6 @@ func (suite *SystemTestSuite) TearDownTest() { // Returns the result provided by the ResultProvider. func (suite *SystemTestSuite) TestSubmit_Basic() { - suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). Run(func(args mock.Arguments) { ptr := args.Get(1).(*history.Transaction) @@ -148,17 +141,12 @@ func (suite *SystemTestSuite) TestSubmit_Basic() { assert.False(suite.T(), suite.submitter.WasSubmittedTo) } -func (suite *SystemTestSuite) TestTimeoutDuringSequnceLoop() { +func (suite *SystemTestSuite) TestTimeoutDuringSequenceLoop() { var cancel context.CancelFunc suite.ctx, cancel = context.WithTimeout(suite.ctx, time.Duration(0)) defer cancel() suite.submitter.R = suite.badSeq - suite.db.On("BeginTx", mock.AnythingOfType("*context.timerCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). Return(sql.ErrNoRows).Once() suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). @@ -178,16 +166,11 @@ func (suite *SystemTestSuite) TestTimeoutDuringSequnceLoop() { assert.Equal(suite.T(), ErrTimeout, r.Err) } -func (suite *SystemTestSuite) TestClientDisconnectedDuringSequnceLoop() { +func (suite *SystemTestSuite) TestClientDisconnectedDuringSequenceLoop() { var cancel context.CancelFunc suite.ctx, cancel = context.WithCancel(suite.ctx) suite.submitter.R = suite.badSeq - suite.db.On("BeginTx", mock.AnythingOfType("*context.cancelCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). Return(sql.ErrNoRows).Once() suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). @@ -196,7 +179,7 @@ func (suite *SystemTestSuite) TestClientDisconnectedDuringSequnceLoop() { suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}). Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). Run(func(args mock.Arguments) { - // simulate client disconnecting while looping on sequnce number check + // simulate client disconnecting while looping on sequence number check cancel() suite.ctx.Deadline() }). @@ -226,19 +209,11 @@ func getMetricValue(metric prometheus.Metric) *dto.Metric { // Returns the error from submission if no result is found by hash and the suite.submitter returns an error. func (suite *SystemTestSuite) TestSubmit_NotFoundError() { - suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). Return(sql.ErrNoRows).Once() suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). Return(sql.ErrNoRows).Once() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice() - suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}). - Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). - Once() suite.submitter.R.Err = errors.New("busted for some reason") r := <-suite.system.Submit( @@ -258,11 +233,6 @@ func (suite *SystemTestSuite) TestSubmit_NotFoundError() { // If the error is bad_seq and the result at the transaction's sequence number is for the same hash, return result. func (suite *SystemTestSuite) TestSubmit_BadSeq() { suite.submitter.R = suite.badSeq - suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}). Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). @@ -298,11 +268,6 @@ func (suite *SystemTestSuite) TestSubmit_BadSeq() { // If error is bad_seq and no result is found, return error. func (suite *SystemTestSuite) TestSubmit_BadSeqNotFound() { suite.submitter.R = suite.badSeq - suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). Return(sql.ErrNoRows).Twice() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice() @@ -333,19 +298,11 @@ func (suite *SystemTestSuite) TestSubmit_BadSeqNotFound() { // If no result found and no error submitting, add to open transaction list. func (suite *SystemTestSuite) TestSubmit_OpenTransactionList() { - suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). Return(sql.ErrNoRows).Once() suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). Return(sql.ErrNoRows).Once() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice() - suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}). - Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). - Once() suite.system.Submit( suite.ctx, @@ -353,7 +310,7 @@ func (suite *SystemTestSuite) TestSubmit_OpenTransactionList() { suite.successXDR, suite.successTx.Transaction.TransactionHash, ) - pending := suite.system.Pending.Pending(suite.ctx) + pending := suite.system.Pending.Pending() assert.Equal(suite.T(), 1, len(pending)) assert.Equal(suite.T(), suite.successTx.Transaction.TransactionHash, pending[0]) assert.Equal(suite.T(), float64(1), getMetricValue(suite.system.Metrics.SuccessfulSubmissionsCounter).GetCounter().GetValue()) @@ -372,35 +329,10 @@ func (suite *SystemTestSuite) TestTick_Noop() { suite.system.Tick(suite.ctx) } -// TestTick_Deadlock is a regression test for Tick() deadlock: if for any reason -// call to Tick() takes more time and another Tick() is called. -// This test starts two go routines: both calling Tick() but the call to -// `sys.Sequences.Get(addys)` is delayed by 1 second. It allows to simulate two -// calls to `Tick()` executed at the same time. -func (suite *SystemTestSuite) TestTick_Deadlock() { - suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() - - // Start first Tick - suite.system.SubmissionQueue.Push("address", 0, nil) - suite.db.On("GetSequenceNumbers", suite.ctx, []string{"address"}). - Return(map[string]uint64{}, nil). - Run(func(args mock.Arguments) { - // Start second tick - suite.system.Tick(suite.ctx) - }). - Once() - - suite.system.Tick(suite.ctx) -} - // Test that Tick finishes any available transactions, func (suite *SystemTestSuite) TestTick_FinishesTransactions() { l := make(chan Result, 1) - suite.system.Pending.Add(suite.ctx, suite.successTx.Transaction.TransactionHash, l) + suite.system.Pending.Add(suite.successTx.Transaction.TransactionHash, l) suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, @@ -414,7 +346,7 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() { suite.system.Tick(suite.ctx) assert.Equal(suite.T(), 0, len(l)) - assert.Equal(suite.T(), 1, len(suite.system.Pending.Pending(suite.ctx))) + assert.Equal(suite.T(), 1, len(suite.system.Pending.Pending())) suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, @@ -427,7 +359,7 @@ func (suite *SystemTestSuite) TestTick_FinishesTransactions() { suite.system.Tick(suite.ctx) assert.Equal(suite.T(), 1, len(l)) - assert.Equal(suite.T(), 0, len(suite.system.Pending.Pending(suite.ctx))) + assert.Equal(suite.T(), 0, len(suite.system.Pending.Pending())) } func (suite *SystemTestSuite) TestTickFinishFeeBumpTransaction() { @@ -450,23 +382,15 @@ func (suite *SystemTestSuite) TestTickFinishFeeBumpTransaction() { }, } - suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, - }).Return(nil).Once() - suite.db.On("Rollback").Return(nil).Once() suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, innerHash). Return(sql.ErrNoRows).Once() suite.db.On("TransactionByHash", suite.ctx, mock.Anything, innerHash). Return(sql.ErrNoRows).Once() suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice() - suite.db.On("GetSequenceNumbers", suite.ctx, []string{"GABQGAYAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB2MX"}). - Return(map[string]uint64{"GABQGAYAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB2MX": 96}, nil). - Once() l := suite.system.Submit(suite.ctx, innerTxEnvelope, parsedInnerTx, innerHash) assert.Equal(suite.T(), 0, len(l)) - assert.Equal(suite.T(), 1, len(suite.system.Pending.Pending(suite.ctx))) + assert.Equal(suite.T(), 1, len(suite.system.Pending.Pending())) suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, @@ -479,7 +403,7 @@ func (suite *SystemTestSuite) TestTickFinishFeeBumpTransaction() { suite.system.Tick(suite.ctx) assert.Equal(suite.T(), 1, len(l)) - assert.Equal(suite.T(), 0, len(suite.system.Pending.Pending(suite.ctx))) + assert.Equal(suite.T(), 0, len(suite.system.Pending.Pending())) r := <-l assert.NoError(suite.T(), r.Err) assert.Equal(suite.T(), feeBumpTx, r) @@ -489,7 +413,7 @@ func (suite *SystemTestSuite) TestTickFinishFeeBumpTransaction() { func (suite *SystemTestSuite) TestTick_RemovesStaleSubmissions() { l := make(chan Result, 1) suite.system.SubmissionTimeout = 100 * time.Millisecond - suite.system.Pending.Add(suite.ctx, suite.successTx.Transaction.TransactionHash, l) + suite.system.Pending.Add(suite.successTx.Transaction.TransactionHash, l) <-time.After(101 * time.Millisecond) suite.db.On("BeginTx", mock.AnythingOfType("*context.valueCtx"), &sql.TxOptions{ @@ -503,7 +427,7 @@ func (suite *SystemTestSuite) TestTick_RemovesStaleSubmissions() { suite.system.Tick(suite.ctx) - assert.Equal(suite.T(), 0, len(suite.system.Pending.Pending(suite.ctx))) + assert.Equal(suite.T(), 0, len(suite.system.Pending.Pending())) assert.Equal(suite.T(), 1, len(l)) <-l select {