Skip to content

Commit

Permalink
services/horizon: Remove Horizon Submission Queue (#5039)
Browse files Browse the repository at this point in the history
* Remove horizon submission queue - 1

* Update system.go

* Update system.go

* remove WaitUntilAccountSequence

* Update system.go

* Remove tests related to WaitUntilAccountSequence - 1

* Update system_test.go

* Remove history_transactions_filtered_tmp - 1

* Fix failing tests in system_test.go

* Revert "Remove history_transactions_filtered_tmp - 1"

This reverts commit d1c6959.

* Revert "Fix failing tests in system_test.go"

This reverts commit 767a9f9.

* Remove sequenceNumber from checkTxAlreadyExists

* Undo removing waitUntilAccountSequence

* Small change - 1

* Small changes - 2

* fix failing unit tests

* Add some comments - 1

* Small changes - 3

* Small changes - 4

* Fix failing tests - 1

* Use defer for sys.finish

* Revert "Use defer for sys.finish"

This reverts commit b2321be.

* Small changes - 5
  • Loading branch information
aditya1702 authored and 2opremio committed Oct 16, 2023
1 parent 4ecb643 commit a49dffa
Show file tree
Hide file tree
Showing 16 changed files with 76 additions and 655 deletions.
2 changes: 0 additions & 2 deletions services/horizon/internal/httpx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stellar/go/services/horizon/internal/ledger"
hProblem "github.com/stellar/go/services/horizon/internal/render/problem"
"github.com/stellar/go/services/horizon/internal/render/sse"
"github.com/stellar/go/services/horizon/internal/txsub/sequence"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/render/problem"
Expand Down Expand Up @@ -49,7 +48,6 @@ func init() {
// register problems
problem.SetLogFilter(problem.LogUnknownErrors)
problem.RegisterError(sql.ErrNoRows, problem.NotFound)
problem.RegisterError(sequence.ErrNoMoreRoom, hProblem.ServerOverCapacity)
problem.RegisterError(db2.ErrInvalidCursor, problem.BadRequest)
problem.RegisterError(db2.ErrInvalidLimit, problem.BadRequest)
problem.RegisterError(db2.ErrInvalidOrder, problem.BadRequest)
Expand Down
6 changes: 2 additions & 4 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/stellar/go/services/horizon/internal/paths"
"github.com/stellar/go/services/horizon/internal/simplepath"
"github.com/stellar/go/services/horizon/internal/txsub"
"github.com/stellar/go/services/horizon/internal/txsub/sequence"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/log"
)
Expand Down Expand Up @@ -239,9 +238,8 @@ func initWebMetrics(app *App) {

func initSubmissionSystem(app *App) {
app.submitter = &txsub.System{
Pending: txsub.NewDefaultSubmissionList(),
Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL),
SubmissionQueue: sequence.NewManager(),
Pending: txsub.NewDefaultSubmissionList(),
Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL),
DB: func(ctx context.Context) txsub.HorizonDB {
return &history.Q{SessionInterface: app.HorizonSession()}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestTransactionPreconditionsMinSequenceNumberLedgerGap(t *testing.T) {
txParams := buildTXParams(master, masterAccount, currentAccountSeq+1)

// this txsub will error because the tx preconditions require a min sequence gap
// which has been set 10000 sequnce numbers greater than the current difference between
// which has been set 10000 sequence numbers greater than the current difference between
// network ledger sequence and account sequnece numbers
txParams.Preconditions.MinSequenceNumberLedgerGap = uint32(int64(networkLedger) - currentAccountSeq + 10000)
_, err = itest.SubmitMultiSigTransaction([]*keypair.Full{master}, txParams)
Expand Down
3 changes: 0 additions & 3 deletions services/horizon/internal/txsub/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ var (
// ErrBadSequence is a canned error response for transactions whose sequence
// number is wrong.
ErrBadSequence = &FailedTransactionError{"AAAAAAAAAAD////7AAAAAA=="}
// ErrNoAccount is returned when the source account for the transaction
// cannot be found in the database
ErrNoAccount = &FailedTransactionError{"AAAAAAAAAAD////4AAAAAA=="}
)

// FailedTransactionError represent an error that occurred because
Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/txsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ type Listener chan<- Result
type OpenSubmissionList interface {
// Add registers the provided listener as interested in being notified when a
// result is available for the provided transaction hash.
Add(context.Context, string, Listener) error
Add(string, Listener)

// Finish forwards the provided result on to any listeners and cleans up any
// resources associated with the transaction that this result is for
Finish(context.Context, string, Result) error
Finish(string, Result)

// Clean removes any open submissions over the provided age.
Clean(context.Context, time.Duration) (int, error)
Clean(time.Duration) int

// Pending return a list of transaction hashes that have at least one
// listener registered to them in this list.
Pending(context.Context) []string
Pending() []string
}

// Submitter represents the low-level "submit a transaction to stellar-core"
Expand Down
21 changes: 6 additions & 15 deletions services/horizon/internal/txsub/open_submission_list.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package txsub

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-errors/errors"
"github.com/stellar/go/support/log"
)

Expand All @@ -33,18 +31,14 @@ type submissionList struct {
log *log.Entry
}

func (s *submissionList) Add(ctx context.Context, hash string, l Listener) error {
func (s *submissionList) Add(hash string, l Listener) {
s.Lock()
defer s.Unlock()

if cap(l) == 0 {
panic("Unbuffered listener cannot be added to OpenSubmissionList")
}

if len(hash) != 64 {
return errors.New("Unexpected transaction hash length: must be 64 hex characters")
}

os, ok := s.submissions[hash]

if !ok {
Expand All @@ -60,17 +54,15 @@ func (s *submissionList) Add(ctx context.Context, hash string, l Listener) error
}

os.Listeners = append(os.Listeners, l)

return nil
}

func (s *submissionList) Finish(ctx context.Context, hash string, r Result) error {
func (s *submissionList) Finish(hash string, r Result) {
s.Lock()
defer s.Unlock()

os, ok := s.submissions[hash]
if !ok {
return nil
return
}

s.log.WithFields(log.F{
Expand All @@ -85,10 +77,9 @@ func (s *submissionList) Finish(ctx context.Context, hash string, r Result) erro
}

delete(s.submissions, hash)
return nil
}

func (s *submissionList) Clean(ctx context.Context, maxAge time.Duration) (int, error) {
func (s *submissionList) Clean(maxAge time.Duration) int {
s.Lock()
defer s.Unlock()

Expand All @@ -107,10 +98,10 @@ func (s *submissionList) Clean(ctx context.Context, maxAge time.Duration) (int,
}
}

return len(s.submissions), nil
return len(s.submissions)
}

func (s *submissionList) Pending(ctx context.Context) []string {
func (s *submissionList) Pending() []string {
s.Lock()
defer s.Unlock()
results := make([]string, 0, len(s.submissions))
Expand Down
41 changes: 16 additions & 25 deletions services/horizon/internal/txsub/open_submission_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (suite *SubmissionListTestSuite) SetupTest() {

func (suite *SubmissionListTestSuite) TestSubmissionList_Add() {
// adds an entry to the submission list when a new hash is used
suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0])
suite.list.Add(suite.hashes[0], suite.listeners[0])
sub := suite.realList.submissions[suite.hashes[0]]
assert.Equal(suite.T(), suite.hashes[0], sub.Hash)
assert.WithinDuration(suite.T(), sub.SubmittedAt, time.Now(), 1*time.Second)
Expand All @@ -50,12 +50,12 @@ func (suite *SubmissionListTestSuite) TestSubmissionList_Add() {
}

func (suite *SubmissionListTestSuite) TestSubmissionList_AddListener() {
// adds an listener to an existing entry when a hash is used with a new listener
suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0])
// adds a listener to an existing entry when a hash is used with a new listener
suite.list.Add(suite.hashes[0], suite.listeners[0])
sub := suite.realList.submissions[suite.hashes[0]]
st := sub.SubmittedAt
<-time.After(20 * time.Millisecond)
suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[1])
suite.list.Add(suite.hashes[0], suite.listeners[1])

// increases the size of the listener
assert.Equal(suite.T(), 2, len(sub.Listeners))
Expand All @@ -65,20 +65,16 @@ func (suite *SubmissionListTestSuite) TestSubmissionList_AddListener() {
// Panics when the listener is not buffered
// panics when the listener is not buffered
assert.Panics(suite.T(), func() {
suite.list.Add(suite.ctx, suite.hashes[0], make(Listener))
suite.list.Add(suite.hashes[0], make(Listener))
})

// errors when the provided hash is not 64-bytes
err := suite.list.Add(suite.ctx, "123", suite.listeners[0])
assert.NotNil(suite.T(), err)
}

func (suite *SubmissionListTestSuite) TestSubmissionList_Finish() {

suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0])
suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[1])
suite.list.Add(suite.hashes[0], suite.listeners[0])
suite.list.Add(suite.hashes[0], suite.listeners[1])
r := Result{Err: errors.New("test error")}
suite.list.Finish(suite.ctx, suite.hashes[0], r)
suite.list.Finish(suite.hashes[0], r)

// Wries to every listener
r1, ok1 := <-suite.listeners[0]
Expand All @@ -102,20 +98,15 @@ func (suite *SubmissionListTestSuite) TestSubmissionList_Finish() {
_, _ = <-suite.listeners[1]
_, more = <-suite.listeners[1]
assert.False(suite.T(), more)

// works when no one is waiting for the result
err := suite.list.Finish(suite.ctx, suite.hashes[0], r)
assert.Nil(suite.T(), err)
}

func (suite *SubmissionListTestSuite) TestSubmissionList_Clean() {

suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0])
suite.list.Add(suite.hashes[0], suite.listeners[0])
<-time.After(200 * time.Millisecond)
suite.list.Add(suite.ctx, suite.hashes[1], suite.listeners[1])
left, err := suite.list.Clean(suite.ctx, 200*time.Millisecond)
suite.list.Add(suite.hashes[1], suite.listeners[1])
left := suite.list.Clean(200 * time.Millisecond)

assert.Nil(suite.T(), err)
assert.Equal(suite.T(), 1, left)

// removes submissions older than the maxAge provided
Expand All @@ -139,11 +130,11 @@ func (suite *SubmissionListTestSuite) TestSubmissionList_Clean() {

// Tests that Pending works as expected
func (suite *SubmissionListTestSuite) TestSubmissionList_Pending() {
assert.Equal(suite.T(), 0, len(suite.list.Pending(suite.ctx)))
suite.list.Add(suite.ctx, suite.hashes[0], suite.listeners[0])
assert.Equal(suite.T(), 1, len(suite.list.Pending(suite.ctx)))
suite.list.Add(suite.ctx, suite.hashes[1], suite.listeners[1])
assert.Equal(suite.T(), 2, len(suite.list.Pending(suite.ctx)))
assert.Equal(suite.T(), 0, len(suite.list.Pending()))
suite.list.Add(suite.hashes[0], suite.listeners[0])
assert.Equal(suite.T(), 1, len(suite.list.Pending()))
suite.list.Add(suite.hashes[1], suite.listeners[1])
assert.Equal(suite.T(), 2, len(suite.list.Pending()))
}

func TestSubmissionListTestSuite(t *testing.T) {
Expand Down
34 changes: 0 additions & 34 deletions services/horizon/internal/txsub/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package txsub

import (
"context"
"database/sql"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -49,35 +47,3 @@ func txResultFromHistory(tx history.Transaction) (history.Transaction, error) {

return tx, err
}

// checkTxAlreadyExists uses a repeatable read transaction to look up both transaction results
// and sequence numbers. Without the repeatable read transaction it is possible that the two database
// queries execute on different ledgers. In this case, txsub can mistakenly respond with a bad_seq error
// because the first query occurs when the tx is not yet ingested and the second query occurs when the tx
// is ingested.
func checkTxAlreadyExists(ctx context.Context, db HorizonDB, hash, sourceAddress string) (history.Transaction, uint64, error) {
err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
return history.Transaction{}, 0, errors.Wrap(err, "cannot start repeatable read tx")
}
defer db.Rollback()

tx, err := txResultByHash(ctx, db, hash)
if err == ErrNoResults {
var sequenceNumbers map[string]uint64
sequenceNumbers, err = db.GetSequenceNumbers(ctx, []string{sourceAddress})
if err != nil {
return tx, 0, errors.Wrapf(err, "cannot fetch sequence number for %v", sourceAddress)
}

num, ok := sequenceNumbers[sourceAddress]
if !ok {
return tx, 0, ErrNoAccount
}
return tx, num, ErrNoResults
}
return tx, 0, err
}
Loading

0 comments on commit a49dffa

Please sign in to comment.