diff --git a/.github/workflows/soroban-rpc.yml b/.github/workflows/soroban-rpc.yml index 7edfe2e3..e2d72a04 100644 --- a/.github/workflows/soroban-rpc.yml +++ b/.github/workflows/soroban-rpc.yml @@ -100,10 +100,10 @@ jobs: SOROBAN_RPC_INTEGRATION_TESTS_ENABLED: true SOROBAN_RPC_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL: ${{ matrix.protocol-version }} SOROBAN_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core - PROTOCOL_20_CORE_DEBIAN_PKG_VERSION: 21.0.1-1897.dfd3dbff1.focal - PROTOCOL_20_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:21.0.1-1897.dfd3dbff1.focal - PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 21.0.1-1897.dfd3dbff1.focal - PROTOCOL_21_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:21.0.1-1897.dfd3dbff1.focal + PROTOCOL_20_CORE_DEBIAN_PKG_VERSION: 21.1.0-1909.rc1.b3aeb14cc.focal + PROTOCOL_20_CORE_DOCKER_IMG: stellar/stellar-core:21.1.0-1909.rc1.b3aeb14cc.focal + PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 21.1.0-1909.rc1.b3aeb14cc.focal + PROTOCOL_21_CORE_DOCKER_IMG: stellar/stellar-core:21.1.0-1909.rc1.b3aeb14cc.focal steps: - uses: actions/checkout@v4 with: diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index ab8ec8a0..c6d287c6 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -296,7 +296,11 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow cfg.NetworkPassphrase, cfg.HistoryRetentionWindow, ) - feewindows := feewindow.NewFeeWindows(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase) + feewindows := feewindow.NewFeeWindows( + cfg.ClassicFeeStatsLedgerRetentionWindow, + cfg.SorobanFeeStatsLedgerRetentionWindow, + cfg.NetworkPassphrase, + ) readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout) defer cancelReadTxMeta() @@ -315,7 +319,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow initialSeq = currentSeq d.logger.WithFields(supportlog.F{ "seq": currentSeq, - }).Info("initializing in-memory store") + }).Info("initializing in-memory store and applying DB data migrations") } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { d.logger.WithFields(supportlog.F{ "seq": currentSeq, @@ -346,7 +350,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow if currentSeq != 0 { d.logger.WithFields(supportlog.F{ "seq": currentSeq, - }).Info("finished initializing in-memory store") + }).Info("finished initializing in-memory store and applying DB data migrations") } return feewindows, eventStore diff --git a/cmd/soroban-rpc/internal/db/ledger.go b/cmd/soroban-rpc/internal/db/ledger.go index 97887281..39e43b83 100644 --- a/cmd/soroban-rpc/internal/db/ledger.go +++ b/cmd/soroban-rpc/internal/db/ledger.go @@ -7,6 +7,8 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/stellar/go/xdr" + + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) const ( @@ -18,6 +20,7 @@ type StreamLedgerFn func(xdr.LedgerCloseMeta) error type LedgerReader interface { GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error + GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) } type LedgerWriter interface { @@ -65,8 +68,40 @@ func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.Ledge case 1: return results[0], true, nil default: - return xdr.LedgerCloseMeta{}, false, fmt.Errorf("multiple lcm entries (%d) for sequence %d in table %q", len(results), sequence, ledgerCloseMetaTableName) + return xdr.LedgerCloseMeta{}, false, fmt.Errorf("multiple lcm entries (%d) for sequence %d in table %q", + len(results), sequence, ledgerCloseMetaTableName) + } +} + +// GetLedgerRange pulls the min/max ledger sequence numbers from the meta table. +func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) { + query := sq.Select("lcm.meta"). + From(ledgerCloseMetaTableName + " as lcm"). + Where(sq.Or{ + sq.Expr("lcm.sequence = (?)", sq.Select("MIN(sequence)").From(ledgerCloseMetaTableName)), + sq.Expr("lcm.sequence = (?)", sq.Select("MAX(sequence)").From(ledgerCloseMetaTableName)), + }).OrderBy("lcm.sequence ASC") + + var lcms []xdr.LedgerCloseMeta + if err := r.db.Select(ctx, &lcms, query); err != nil { + return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err) + } + + // Empty DB + if len(lcms) == 0 { + return ledgerbucketwindow.LedgerRange{}, nil } + + return ledgerbucketwindow.LedgerRange{ + FirstLedger: ledgerbucketwindow.LedgerInfo{ + Sequence: lcms[0].LedgerSequence(), + CloseTime: lcms[0].LedgerCloseTime(), + }, + LastLedger: ledgerbucketwindow.LedgerInfo{ + Sequence: lcms[len(lcms)-1].LedgerSequence(), + CloseTime: lcms[len(lcms)-1].LedgerCloseTime(), + }, + }, nil } type ledgerWriter struct { diff --git a/cmd/soroban-rpc/internal/db/ledger_test.go b/cmd/soroban-rpc/internal/db/ledger_test.go index a40bc255..519a1168 100644 --- a/cmd/soroban-rpc/internal/db/ledger_test.go +++ b/cmd/soroban-rpc/internal/db/ledger_test.go @@ -44,24 +44,24 @@ func assertLedgerRange(t *testing.T, reader LedgerReader, start, end uint32) { allLedgers = append(allLedgers, txmeta) return nil }) - assert.NoError(t, err) + require.NoError(t, err) for i := start - 1; i <= end+1; i++ { ledger, exists, err := reader.GetLedger(context.Background(), i) - assert.NoError(t, err) + require.NoError(t, err) if i < start || i > end { assert.False(t, exists) continue } assert.True(t, exists) ledgerBinary, err := ledger.MarshalBinary() - assert.NoError(t, err) + require.NoError(t, err) expected := createLedger(i) expectedBinary, err := expected.MarshalBinary() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, expectedBinary, ledgerBinary) ledgerBinary, err = allLedgers[0].MarshalBinary() - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, expectedBinary, ledgerBinary) allLedgers = allLedgers[1:] } @@ -74,45 +74,147 @@ func TestLedgers(t *testing.T) { reader := NewLedgerReader(db) _, exists, err := reader.GetLedger(context.Background(), 1) - assert.NoError(t, err) + require.NoError(t, err) assert.False(t, exists) for i := 1; i <= 10; i++ { ledgerSequence := uint32(i) tx, err := NewReadWriter(logger, db, daemon, 150, 15, passphrase).NewTx(context.Background()) - assert.NoError(t, err) - assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence))) - assert.NoError(t, tx.Commit(ledgerSequence)) + require.NoError(t, err) + require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence))) + require.NoError(t, tx.Commit(ledgerSequence)) // rolling back after a commit is a no-op - assert.NoError(t, tx.Rollback()) + require.NoError(t, tx.Rollback()) } assertLedgerRange(t, reader, 1, 10) ledgerSequence := uint32(11) tx, err := NewReadWriter(logger, db, daemon, 150, 15, passphrase).NewTx(context.Background()) - assert.NoError(t, err) - assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence))) - assert.NoError(t, tx.Commit(ledgerSequence)) + require.NoError(t, err) + require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence))) + require.NoError(t, tx.Commit(ledgerSequence)) assertLedgerRange(t, reader, 1, 11) ledgerSequence = uint32(12) tx, err = NewReadWriter(logger, db, daemon, 150, 5, passphrase).NewTx(context.Background()) - assert.NoError(t, err) - assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence))) - assert.NoError(t, tx.Commit(ledgerSequence)) + require.NoError(t, err) + require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence))) + require.NoError(t, tx.Commit(ledgerSequence)) assertLedgerRange(t, reader, 8, 12) } +func TestGetLedgerRange_NonEmptyDB(t *testing.T) { + db := NewTestDB(t) + ctx := context.TODO() + + writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + + lcms := []xdr.LedgerCloseMeta{ + txMeta(1234, true), + txMeta(1235, true), + txMeta(1236, true), + txMeta(1237, true), + } + + ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() + for _, lcm := range lcms { + require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1) + require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1) + } + require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence())) + + reader := NewLedgerReader(db) + ledgerRange, err := reader.GetLedgerRange(ctx) + require.NoError(t, err) + assert.Equal(t, uint32(1334), ledgerRange.FirstLedger.Sequence) + assert.Equal(t, ledgerCloseTime(1334), ledgerRange.FirstLedger.CloseTime) + assert.Equal(t, uint32(1337), ledgerRange.LastLedger.Sequence) + assert.Equal(t, ledgerCloseTime(1337), ledgerRange.LastLedger.CloseTime) +} + +func TestGetLedgerRange_SingleDBRow(t *testing.T) { + db := NewTestDB(t) + ctx := context.TODO() + + writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + + lcms := []xdr.LedgerCloseMeta{ + txMeta(1234, true), + } + + ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() + for _, lcm := range lcms { + require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1) + require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1) + } + require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence())) + + reader := NewLedgerReader(db) + ledgerRange, err := reader.GetLedgerRange(ctx) + require.NoError(t, err) + assert.Equal(t, uint32(1334), ledgerRange.FirstLedger.Sequence) + assert.Equal(t, ledgerCloseTime(1334), ledgerRange.FirstLedger.CloseTime) + assert.Equal(t, uint32(1334), ledgerRange.LastLedger.Sequence) + assert.Equal(t, ledgerCloseTime(1334), ledgerRange.LastLedger.CloseTime) +} + +func TestGetLedgerRange_EmptyDB(t *testing.T) { + db := NewTestDB(t) + ctx := context.TODO() + + reader := NewLedgerReader(db) + ledgerRange, err := reader.GetLedgerRange(ctx) + require.NoError(t, err) + assert.Equal(t, uint32(0), ledgerRange.FirstLedger.Sequence) + assert.Equal(t, int64(0), ledgerRange.FirstLedger.CloseTime) + assert.Equal(t, uint32(0), ledgerRange.LastLedger.Sequence) + assert.Equal(t, int64(0), ledgerRange.LastLedger.CloseTime) +} + +func BenchmarkGetLedgerRange(b *testing.B) { + db := NewTestDB(b) + logger := log.DefaultLogger + writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 100, 1_000_000, passphrase) + write, err := writer.NewTx(context.TODO()) + require.NoError(b, err) + + // create 100k tx rows + lcms := make([]xdr.LedgerCloseMeta, 0, 100_000) + for i := range cap(lcms) { + lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0)) + } + + ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() + for _, lcm := range lcms { + require.NoError(b, ledgerW.InsertLedger(lcm)) + require.NoError(b, txW.InsertTransactions(lcm)) + } + require.NoError(b, write.Commit(lcms[len(lcms)-1].LedgerSequence())) + reader := NewLedgerReader(db) + + b.ResetTimer() + for range b.N { + ledgerRange, err := reader.GetLedgerRange(context.TODO()) + require.NoError(b, err) + assert.Equal(b, lcms[0].LedgerSequence(), ledgerRange.FirstLedger.Sequence) + assert.Equal(b, lcms[len(lcms)-1].LedgerSequence(), ledgerRange.LastLedger.Sequence) + } +} + func NewTestDB(tb testing.TB) *DB { tmp := tb.TempDir() dbPath := path.Join(tmp, "db.sqlite") db, err := OpenSQLiteDB(dbPath) require.NoError(tb, err) tb.Cleanup(func() { - assert.NoError(tb, db.Close()) + require.NoError(tb, db.Close()) }) return db } diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index dbe07ab5..552cc98d 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -115,9 +115,13 @@ type guardedMigration struct { db *DB migration MigrationApplier alreadyMigrated bool + logger *log.Entry + applyLogged bool } -func newGuardedDataMigration(ctx context.Context, uniqueMigrationName string, factory migrationApplierFactory, db *DB) (Migration, error) { +func newGuardedDataMigration( + ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB, +) (Migration, error) { migrationDB := &DB{ cache: db.cache, SessionInterface: db.SessionInterface.Clone(), @@ -132,7 +136,7 @@ func newGuardedDataMigration(ctx context.Context, uniqueMigrationName string, fa return nil, err } latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) - if err != nil && err != ErrEmptyDB { + if err != nil && !errors.Is(err, ErrEmptyDB) { err = errors.Join(err, migrationDB.Rollback()) return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err) } @@ -146,6 +150,7 @@ func newGuardedDataMigration(ctx context.Context, uniqueMigrationName string, fa db: migrationDB, migration: applier, alreadyMigrated: previouslyMigrated, + logger: logger, } return guardedMigration, nil } @@ -156,6 +161,10 @@ func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) // but, just in case. return nil } + if !g.applyLogged { + g.logger.WithField("ledger", meta.LedgerSequence()).Info("applying migration") + g.applyLogged = true + } return g.migration.Apply(ctx, meta) } @@ -177,19 +186,20 @@ func (g *guardedMigration) Commit(ctx context.Context) error { return g.db.Commit() } -func (g *guardedMigration) Rollback(ctx context.Context) error { +func (g *guardedMigration) Rollback(_ context.Context) error { return g.db.Rollback() } func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) { migrationName := "TransactionsTable" + logger = logger.WithField("migration", migrationName) factory := newTransactionTableMigration( ctx, - logger.WithField("migration", migrationName), + logger, cfg.HistoryRetentionWindow, cfg.NetworkPassphrase, ) - m, err := newGuardedDataMigration(ctx, migrationName, factory, db) + m, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db) if err != nil { return nil, fmt.Errorf("creating guarded transaction migration: %w", err) } diff --git a/cmd/soroban-rpc/internal/db/mocks.go b/cmd/soroban-rpc/internal/db/mocks.go index a5e97530..492d64bb 100644 --- a/cmd/soroban-rpc/internal/db/mocks.go +++ b/cmd/soroban-rpc/internal/db/mocks.go @@ -2,6 +2,7 @@ package db import ( "context" + "errors" "io" "github.com/prometheus/client_golang/prometheus" @@ -12,7 +13,7 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) -type mockTransactionHandler struct { +type MockTransactionHandler struct { passphrase string ledgerRange ledgerbucketwindow.LedgerRange @@ -21,8 +22,8 @@ type mockTransactionHandler struct { ledgerSeqToMeta map[uint32]*xdr.LedgerCloseMeta } -func NewMockTransactionStore(passphrase string) *mockTransactionHandler { - return &mockTransactionHandler{ +func NewMockTransactionStore(passphrase string) *MockTransactionHandler { + return &MockTransactionHandler{ passphrase: passphrase, txs: make(map[string]ingest.LedgerTransaction), txHashToMeta: make(map[string]*xdr.LedgerCloseMeta), @@ -30,7 +31,7 @@ func NewMockTransactionStore(passphrase string) *mockTransactionHandler { } } -func (txn *mockTransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error { +func (txn *MockTransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error { txn.ledgerSeqToMeta[lcm.LedgerSequence()] = &lcm reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(txn.passphrase, lcm) @@ -40,7 +41,7 @@ func (txn *mockTransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) e for { tx, err := reader.Read() - if err == io.EOF { + if errors.Is(err, io.EOF) { break } else if err != nil { return err @@ -65,35 +66,30 @@ func (txn *mockTransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) e return nil } -// GetLedgerRange pulls the min/max ledger sequence numbers from the database. -func (txn *mockTransactionHandler) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) { - return txn.ledgerRange, nil -} - -func (txn *mockTransactionHandler) GetTransaction(ctx context.Context, hash xdr.Hash) ( - Transaction, ledgerbucketwindow.LedgerRange, error, +func (txn *MockTransactionHandler) GetTransaction(_ context.Context, hash xdr.Hash) ( + Transaction, error, ) { - if tx, ok := txn.txs[hash.HexString()]; !ok { - return Transaction{}, txn.ledgerRange, ErrNoTransaction - } else { - itx, err := ParseTransaction(*txn.txHashToMeta[hash.HexString()], tx) - return itx, txn.ledgerRange, err + tx, ok := txn.txs[hash.HexString()] + if !ok { + return Transaction{}, ErrNoTransaction } + itx, err := ParseTransaction(*txn.txHashToMeta[hash.HexString()], tx) + return itx, err } -func (txn *mockTransactionHandler) RegisterMetrics(_, _ prometheus.Observer) {} +func (txn *MockTransactionHandler) RegisterMetrics(_, _ prometheus.Observer) {} -type mockLedgerReader struct { - txn mockTransactionHandler +type MockLedgerReader struct { + txn *MockTransactionHandler } -func NewMockLedgerReader(txn *mockTransactionHandler) *mockLedgerReader { - return &mockLedgerReader{ - txn: *txn, +func NewMockLedgerReader(txn *MockTransactionHandler) *MockLedgerReader { + return &MockLedgerReader{ + txn: txn, } } -func (m *mockLedgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) { +func (m *MockLedgerReader) GetLedger(_ context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) { lcm, ok := m.txn.ledgerSeqToMeta[sequence] if !ok { return xdr.LedgerCloseMeta{}, false, nil @@ -101,12 +97,16 @@ func (m *mockLedgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr. return *lcm, true, nil } -func (m *mockLedgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error { +func (m *MockLedgerReader) StreamAllLedgers(_ context.Context, _ StreamLedgerFn) error { return nil } +func (m *MockLedgerReader) GetLedgerRange(_ context.Context) (ledgerbucketwindow.LedgerRange, error) { + return m.txn.ledgerRange, nil +} + var ( - _ TransactionReader = &mockTransactionHandler{} - _ TransactionWriter = &mockTransactionHandler{} - _ LedgerReader = &mockLedgerReader{} + _ TransactionReader = &MockTransactionHandler{} + _ TransactionWriter = &MockTransactionHandler{} + _ LedgerReader = &MockLedgerReader{} ) diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go index fdb8b5b9..8bccc58e 100644 --- a/cmd/soroban-rpc/internal/db/transaction.go +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -18,7 +18,9 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) -const transactionTableName = "transactions" +const ( + transactionTableName = "transactions" +) var ErrNoTransaction = errors.New("no transaction with this hash exists") @@ -41,8 +43,7 @@ type TransactionWriter interface { // TransactionReader provides all the public ways to read from the DB. type TransactionReader interface { - GetTransaction(ctx context.Context, hash xdr.Hash) (Transaction, ledgerbucketwindow.LedgerRange, error) - GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) + GetTransaction(ctx context.Context, hash xdr.Hash) (Transaction, error) } type transactionHandler struct { @@ -88,7 +89,7 @@ func (txn *transactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error } transactions := make(map[xdr.Hash]ingest.LedgerTransaction, txCount) - for i := 0; i < txCount; i++ { + for i := range txCount { tx, err := reader.Read() if err != nil { return fmt.Errorf("failed reading tx %d: %w", i, err) @@ -110,7 +111,7 @@ func (txn *transactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error _, err = query.RunWith(txn.stmtCache).Exec() L.WithField("duration", time.Since(start)). - Infof("Ingested %d transaction lookups", len(transactions)) + Debugf("Ingested %d transaction lookups", len(transactions)) return err } @@ -135,61 +136,6 @@ func (txn *transactionHandler) trimTransactions(latestLedgerSeq uint32, retentio return err } -// GetLedgerRange pulls the min/max ledger sequence numbers from the database. -func (txn *transactionHandler) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) { - var ledgerRange ledgerbucketwindow.LedgerRange - - // - // We use subqueries alongside a UNION ALL stitch in order to select the min - // and max from the ledger table in a single query and get around sqlite's - // limitations with parentheses (see https://stackoverflow.com/a/22609948). - // - newestQ := sq. - Select("m1.meta"). - FromSelect( - sq. - Select("meta"). - From(ledgerCloseMetaTableName). - OrderBy("sequence ASC"). - Limit(1), - "m1", - ) - sql, args, err := sq. - Select("m2.meta"). - FromSelect( - sq. - Select("meta"). - From(ledgerCloseMetaTableName). - OrderBy("sequence DESC"). - Limit(1), - "m2", - ).ToSql() - if err != nil { - return ledgerRange, fmt.Errorf("couldn't build ledger range query: %w", err) - } - - var lcms []xdr.LedgerCloseMeta - if err = txn.db.Select(ctx, &lcms, newestQ.Suffix("UNION ALL "+sql, args...)); err != nil { - return ledgerRange, fmt.Errorf("couldn't query ledger range: %w", err) - } else if len(lcms) < 2 { - // There is almost certainly a row, but we want to avoid a race condition - // with ingestion as well as support test cases from an empty DB, so we need - // to sanity check that there is in fact a result. Note that no ledgers in - // the database isn't an error, it's just an empty range. - return ledgerRange, nil - } - - lcm1, lcm2 := lcms[0], lcms[1] - ledgerRange.FirstLedger.Sequence = lcm1.LedgerSequence() - ledgerRange.FirstLedger.CloseTime = lcm1.LedgerCloseTime() - ledgerRange.LastLedger.Sequence = lcm2.LedgerSequence() - ledgerRange.LastLedger.CloseTime = lcm2.LedgerCloseTime() - - txn.log.Debugf("Database ledger range: [%d, %d]", - ledgerRange.FirstLedger.Sequence, ledgerRange.LastLedger.Sequence) - return ledgerRange, nil -} - // GetTransaction conforms to the interface in // methods/get_transaction.go#NewGetTransactionHandler so that it can be used // directly against the RPC handler. @@ -197,23 +143,18 @@ func (txn *transactionHandler) GetLedgerRange(ctx context.Context) (ledgerbucket // Errors occur if there are issues with the DB connection or the XDR is // corrupted somehow. If the transaction is not found, io.EOF is returned. func (txn *transactionHandler) GetTransaction(ctx context.Context, hash xdr.Hash) ( - Transaction, ledgerbucketwindow.LedgerRange, error, + Transaction, error, ) { start := time.Now() tx := Transaction{} - ledgerRange, err := txn.GetLedgerRange(ctx) - if err != nil && err != ErrEmptyDB { - return tx, ledgerRange, err - } - lcm, ingestTx, err := txn.getTransactionByHash(ctx, hash) if err != nil { - return tx, ledgerRange, err + return tx, err } tx, err = ParseTransaction(lcm, ingestTx) if err != nil { - return tx, ledgerRange, err + return tx, err } txn.log. @@ -221,7 +162,7 @@ func (txn *transactionHandler) GetTransaction(ctx context.Context, hash xdr.Hash WithField("duration", time.Since(start)). Debugf("Fetched and encoded transaction from ledger %d", lcm.LedgerSequence()) - return tx, ledgerRange, nil + return tx, nil } // getTransactionByHash actually performs the DB ops to cross-reference a @@ -321,13 +262,15 @@ func (t *transactionTableMigration) ApplicableRange() *LedgerSeqRange { } } -func (t *transactionTableMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { +func (t *transactionTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta) error { return t.writer.InsertTransactions(meta) } -func newTransactionTableMigration(ctx context.Context, logger *log.Entry, retentionWindow uint32, passphrase string) migrationApplierFactory { +func newTransactionTableMigration(ctx context.Context, logger *log.Entry, + retentionWindow uint32, passphrase string, +) migrationApplierFactory { return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) { - firstLedgerToMigrate := uint32(2) + firstLedgerToMigrate := uint32(2) //nolint:mnd writer := &transactionHandler{ log: logger, db: db, @@ -339,9 +282,6 @@ func newTransactionTableMigration(ctx context.Context, logger *log.Entry, retent } // Truncate the table, since it may contain data, causing insert conflicts later on. // (the migration was shipped after the actual transactions table change) - // FIXME: this can be simply replaced by an upper limit in the ledgers to migrate - // but ... it can't be done until https://github.com/stellar/soroban-rpc/issues/208 - // is addressed _, err := db.Exec(ctx, sq.Delete(transactionTableName)) if err != nil { return nil, fmt.Errorf("couldn't delete table %q: %w", transactionTableName, err) diff --git a/cmd/soroban-rpc/internal/db/transaction_test.go b/cmd/soroban-rpc/internal/db/transaction_test.go index a4bf2342..e05671b9 100644 --- a/cmd/soroban-rpc/internal/db/transaction_test.go +++ b/cmd/soroban-rpc/internal/db/transaction_test.go @@ -23,8 +23,8 @@ func TestTransactionNotFound(t *testing.T) { log.SetLevel(logrus.TraceLevel) reader := NewTransactionReader(log, db, passphrase) - _, _, err := reader.GetTransaction(context.TODO(), xdr.Hash{}) - require.Error(t, err, ErrNoTransaction) + _, err := reader.GetTransaction(context.TODO(), xdr.Hash{}) + require.ErrorIs(t, err, ErrNoTransaction) } func TestTransactionFound(t *testing.T) { @@ -53,16 +53,14 @@ func TestTransactionFound(t *testing.T) { // check 404 case reader := NewTransactionReader(log, db, passphrase) - _, _, err = reader.GetTransaction(ctx, xdr.Hash{}) - require.Error(t, err, ErrNoTransaction) + _, err = reader.GetTransaction(ctx, xdr.Hash{}) + require.ErrorIs(t, err, ErrNoTransaction) // check all 200 cases for _, lcm := range lcms { h := lcm.TransactionHash(0) - tx, lRange, err := reader.GetTransaction(ctx, h) + tx, err := reader.GetTransaction(ctx, h) require.NoError(t, err, "failed to find txhash %s in db", hex.EncodeToString(h[:])) - assert.EqualValues(t, 1234+100, lRange.FirstLedger.Sequence) - assert.EqualValues(t, 1237+100, lRange.LastLedger.Sequence) assert.EqualValues(t, 1, tx.ApplicationOrder) expectedEnvelope, err := lcm.TransactionEnvelopes()[0].MarshalBinary() @@ -82,8 +80,8 @@ func BenchmarkTransactionFetch(b *testing.B) { // ingest 100k tx rows lcms := make([]xdr.LedgerCloseMeta, 0, 100_000) - for i := uint32(0); i < uint32(cap(lcms)); i++ { - lcms = append(lcms, txMeta(1234+i, i%2 == 0)) + for i := range cap(lcms) { + lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0)) } ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() @@ -102,7 +100,7 @@ func BenchmarkTransactionFetch(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { r := randoms[i] - tx, _, err := reader.GetTransaction(ctx, lcms[r].TransactionHash(0)) + tx, err := reader.GetTransaction(ctx, lcms[r].TransactionHash(0)) require.NoError(b, err) assert.Equal(b, r%2 == 0, tx.Successful) } diff --git a/cmd/soroban-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml b/cmd/soroban-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml index 579cf67b..de3bcc49 100644 --- a/cmd/soroban-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml +++ b/cmd/soroban-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml @@ -13,7 +13,7 @@ services: # Note: Please keep the image pinned to an immutable tag matching the Captive Core version. # This avoids implicit updates which break compatibility between # the Core container and captive core. - image: ${CORE_IMAGE:-stellar/unsafe-stellar-core:21.0.1-1897.dfd3dbff1.focal} + image: ${CORE_IMAGE:-stellar/stellar-core:21.1.0-1909.rc1.b3aeb14cc.focal} depends_on: - core-postgres environment: diff --git a/cmd/soroban-rpc/internal/integrationtest/infrastructure/test.go b/cmd/soroban-rpc/internal/integrationtest/infrastructure/test.go index c839a553..25e0fa4b 100644 --- a/cmd/soroban-rpc/internal/integrationtest/infrastructure/test.go +++ b/cmd/soroban-rpc/internal/integrationtest/infrastructure/test.go @@ -398,32 +398,15 @@ func (i *Test) generateRPCConfigFile(rpcConfig rpcConfig) { func newTestLogWriter(t *testing.T, prefix string) *testLogWriter { tw := &testLogWriter{t: t, prefix: prefix} - t.Cleanup(func() { - tw.testDoneMx.Lock() - tw.testDone = true - tw.testDoneMx.Unlock() - }) return tw } type testLogWriter struct { - t *testing.T - prefix string - testDoneMx sync.RWMutex - testDone bool + t *testing.T + prefix string } func (tw *testLogWriter) Write(p []byte) (n int, err error) { - tw.testDoneMx.RLock() - if tw.testDone { - // Workaround for https://github.com/stellar/go/issues/5342 - // and https://github.com/stellar/go/issues/5350, which causes a race condition - // in test logging - // TODO: remove once the tickets are fixed - tw.testDoneMx.RUnlock() - return len(p), nil - } - tw.testDoneMx.RUnlock() all := strings.TrimSpace(string(p)) lines := strings.Split(all, "\n") for _, l := range lines { diff --git a/cmd/soroban-rpc/internal/jsonrpc.go b/cmd/soroban-rpc/internal/jsonrpc.go index 9236a4a4..a6329456 100644 --- a/cmd/soroban-rpc/internal/jsonrpc.go +++ b/cmd/soroban-rpc/internal/jsonrpc.go @@ -3,6 +3,7 @@ package internal import ( "context" "encoding/json" + "errors" "net/http" "strconv" "strings" @@ -26,10 +27,13 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/network" ) -// maxHTTPRequestSize defines the largest request size that the http handler -// would be willing to accept before dropping the request. The implementation -// uses the default MaxBytesHandler to limit the request size. -const maxHTTPRequestSize = 512 * 1024 // half a megabyte +const ( + // maxHTTPRequestSize defines the largest request size that the http handler + // would be willing to accept before dropping the request. The implementation + // uses the default MaxBytesHandler to limit the request size. + maxHTTPRequestSize = 512 * 1024 // half a megabyte + warningThresholdDenominator = 3 +) // Handler is the HTTP handler which serves the Soroban JSON RPC responses type Handler struct { @@ -67,7 +71,7 @@ func decorateHandlers(daemon interfaces.Daemon, logger *log.Entry, m handler.Map }, []string{"endpoint", "status"}) decorated := handler.Map{} for endpoint, h := range m { - // create copy of h so it can be used in closure bleow + // create copy of h, so it can be used in closure below h := h decorated[endpoint] = handler.New(func(ctx context.Context, r *jrpc2.Request) (interface{}, error) { reqID := strconv.FormatUint(middleware.NextRequestID(), 10) @@ -80,7 +84,8 @@ func decorateHandlers(daemon interfaces.Daemon, logger *log.Entry, m handler.Map if ok && simulateTransactionResponse.Error != "" { label["status"] = "error" } else if err != nil { - if jsonRPCErr, ok := err.(*jrpc2.Error); ok { + var jsonRPCErr *jrpc2.Error + if errors.As(err, &jsonRPCErr) { prometheusLabelReplacer := strings.NewReplacer(" ", "_", "-", "_", "(", "", ")", "") status := prometheusLabelReplacer.Replace(jsonRPCErr.Code.String()) label["status"] = status @@ -149,7 +154,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { { methodName: "getHealth", underlyingHandler: methods.NewHealthCheck( - retentionWindow, params.TransactionReader, cfg.MaxHealthyLedgerLatency), + retentionWindow, params.LedgerReader, cfg.MaxHealthyLedgerLatency), longName: "get_health", queueLimit: cfg.RequestBacklogGetHealthQueueLimit, requestDurationLimit: cfg.MaxGetHealthExecutionDuration, @@ -170,8 +175,9 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { requestDurationLimit: cfg.MaxGetNetworkExecutionDuration, }, { - methodName: "getVersionInfo", - underlyingHandler: methods.NewGetVersionInfoHandler(params.Logger, params.LedgerEntryReader, params.LedgerReader, params.Daemon), + methodName: "getVersionInfo", + underlyingHandler: methods.NewGetVersionInfoHandler(params.Logger, params.LedgerEntryReader, + params.LedgerReader, params.Daemon), longName: "get_version_info", queueLimit: cfg.RequestBacklogGetVersionInfoQueueLimit, requestDurationLimit: cfg.MaxGetVersionInfoExecutionDuration, @@ -199,14 +205,15 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { }, { methodName: "getTransaction", - underlyingHandler: methods.NewGetTransactionHandler(params.Logger, params.TransactionReader), + underlyingHandler: methods.NewGetTransactionHandler(params.Logger, params.TransactionReader, params.LedgerReader), longName: "get_transaction", queueLimit: cfg.RequestBacklogGetTransactionQueueLimit, requestDurationLimit: cfg.MaxGetTransactionExecutionDuration, }, { - methodName: "getTransactions", - underlyingHandler: methods.NewGetTransactionsHandler(params.Logger, params.LedgerReader, params.TransactionReader, cfg.MaxTransactionsLimit, cfg.DefaultTransactionsLimit, cfg.NetworkPassphrase), + methodName: "getTransactions", + underlyingHandler: methods.NewGetTransactionsHandler(params.Logger, params.LedgerReader, + cfg.MaxTransactionsLimit, cfg.DefaultTransactionsLimit, cfg.NetworkPassphrase), longName: "get_transactions", queueLimit: cfg.RequestBacklogGetTransactionsQueueLimit, requestDurationLimit: cfg.MaxGetTransactionsExecutionDuration, @@ -214,7 +221,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { { methodName: "sendTransaction", underlyingHandler: methods.NewSendTransactionHandler( - params.Daemon, params.Logger, params.TransactionReader, cfg.NetworkPassphrase), + params.Daemon, params.Logger, params.LedgerReader, cfg.NetworkPassphrase), longName: "send_transaction", queueLimit: cfg.RequestBacklogSendTransactionQueueLimit, requestDurationLimit: cfg.MaxSendTransactionExecutionDuration, @@ -230,7 +237,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { }, { methodName: "getFeeStats", - underlyingHandler: methods.NewGetFeeStatsHandler(params.FeeStatWindows, params.TransactionReader, params.Logger), + underlyingHandler: methods.NewGetFeeStatsHandler(params.FeeStatWindows, params.LedgerReader, params.Logger), longName: "get_fee_stats", queueLimit: cfg.RequestBacklogGetFeeStatsTransactionQueueLimit, requestDurationLimit: cfg.MaxGetFeeStatsExecutionDuration, @@ -254,8 +261,10 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { durationWarnCounterName := handler.longName + "_execution_threshold_warning" durationLimitCounterName := handler.longName + "_execution_threshold_limit" - durationWarnCounterHelp := "The metric measures the count of " + handler.methodName + " requests that surpassed the warning threshold for execution time" - durationLimitCounterHelp := "The metric measures the count of " + handler.methodName + " requests that surpassed the limit threshold for execution time" + durationWarnCounterHelp := "The metric measures the count of " + handler.methodName + + " requests that surpassed the warning threshold for execution time" + durationLimitCounterHelp := "The metric measures the count of " + handler.methodName + + " requests that surpassed the limit threshold for execution time" requestDurationWarnCounter := prometheus.NewCounter(prometheus.CounterOpts{ Namespace: params.Daemon.MetricsNamespace(), Subsystem: "network", @@ -268,7 +277,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { Help: durationLimitCounterHelp, }) // set the warning threshold to be one third of the limit. - requestDurationWarn := handler.requestDurationLimit / 3 + requestDurationWarn := handler.requestDurationLimit / warningThresholdDenominator durationLimiter := network.MakeJrpcRequestDurationLimiter( queueLimiter.Handle, requestDurationWarn, @@ -297,12 +306,16 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { params.Logger) globalQueueRequestExecutionDurationWarningCounter := prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: params.Daemon.MetricsNamespace(), Subsystem: "network", Name: "global_request_execution_duration_threshold_warning", - Help: "The metric measures the count of requests that surpassed the warning threshold for execution time", + Namespace: params.Daemon.MetricsNamespace(), + Subsystem: "network", + Name: "global_request_execution_duration_threshold_warning", + Help: "The metric measures the count of requests that surpassed the warning threshold for execution time", }) globalQueueRequestExecutionDurationLimitCounter := prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: params.Daemon.MetricsNamespace(), Subsystem: "network", Name: "global_request_execution_duration_threshold_limit", - Help: "The metric measures the count of requests that surpassed the limit threshold for execution time", + Namespace: params.Daemon.MetricsNamespace(), + Subsystem: "network", + Name: "global_request_execution_duration_threshold_limit", + Help: "The metric measures the count of requests that surpassed the limit threshold for execution time", }) handler := network.MakeHTTPRequestDurationLimiter( queueLimitedBridge, diff --git a/cmd/soroban-rpc/internal/methods/get_fee_stats.go b/cmd/soroban-rpc/internal/methods/get_fee_stats.go index 73ebfd3a..9fd85d76 100644 --- a/cmd/soroban-rpc/internal/methods/get_fee_stats.go +++ b/cmd/soroban-rpc/internal/methods/get_fee_stats.go @@ -58,9 +58,11 @@ type GetFeeStatsResult struct { } // NewGetFeeStatsHandler returns a handler obtaining fee statistics -func NewGetFeeStatsHandler(windows *feewindow.FeeWindows, reader db.TransactionReader, logger *log.Entry) jrpc2.Handler { +func NewGetFeeStatsHandler(windows *feewindow.FeeWindows, ledgerReader db.LedgerReader, + logger *log.Entry, +) jrpc2.Handler { return NewHandler(func(ctx context.Context) (GetFeeStatsResult, error) { - ledgerInfo, err := reader.GetLedgerRange(ctx) + ledgerRange, err := ledgerReader.GetLedgerRange(ctx) if err != nil { // still not fatal logger.WithError(err). Error("could not fetch ledger range") @@ -69,7 +71,7 @@ func NewGetFeeStatsHandler(windows *feewindow.FeeWindows, reader db.TransactionR result := GetFeeStatsResult{ SorobanInclusionFee: convertFeeDistribution(windows.SorobanInclusionFeeWindow.GetFeeDistribution()), InclusionFee: convertFeeDistribution(windows.ClassicFeeWindow.GetFeeDistribution()), - LatestLedger: ledgerInfo.LastLedger.Sequence, + LatestLedger: ledgerRange.LastLedger.Sequence, } return result, nil }) diff --git a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go index 86a8e48a..affa0536 100644 --- a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go +++ b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go @@ -6,10 +6,12 @@ import ( "github.com/creachadair/jrpc2" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stellar/go/xdr" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) const ( @@ -24,15 +26,19 @@ type ConstantLedgerEntryReaderTx struct{} type ConstantLedgerReader struct{} -func (entryReader *ConstantLedgerEntryReader) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { +func (ledgerReader *ConstantLedgerReader) GetLedgerRange(_ context.Context) (ledgerbucketwindow.LedgerRange, error) { + return ledgerbucketwindow.LedgerRange{}, nil +} + +func (entryReader *ConstantLedgerEntryReader) GetLatestLedgerSequence(_ context.Context) (uint32, error) { return expectedLatestLedgerSequence, nil } -func (entryReader *ConstantLedgerEntryReader) NewTx(ctx context.Context) (db.LedgerEntryReadTx, error) { +func (entryReader *ConstantLedgerEntryReader) NewTx(_ context.Context) (db.LedgerEntryReadTx, error) { return ConstantLedgerEntryReaderTx{}, nil } -func (entryReader *ConstantLedgerEntryReader) NewCachedTx(ctx context.Context) (db.LedgerEntryReadTx, error) { +func (entryReader *ConstantLedgerEntryReader) NewCachedTx(_ context.Context) (db.LedgerEntryReadTx, error) { return ConstantLedgerEntryReaderTx{}, nil } @@ -40,7 +46,7 @@ func (entryReaderTx ConstantLedgerEntryReaderTx) GetLatestLedgerSequence() (uint return expectedLatestLedgerSequence, nil } -func (entryReaderTx ConstantLedgerEntryReaderTx) GetLedgerEntries(keys ...xdr.LedgerKey) ([]db.LedgerKeyAndEntry, error) { +func (entryReaderTx ConstantLedgerEntryReaderTx) GetLedgerEntries(_ ...xdr.LedgerKey) ([]db.LedgerKeyAndEntry, error) { return nil, nil } @@ -48,11 +54,13 @@ func (entryReaderTx ConstantLedgerEntryReaderTx) Done() error { return nil } -func (ledgerReader *ConstantLedgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) { +func (ledgerReader *ConstantLedgerReader) GetLedger(_ context.Context, + sequence uint32, +) (xdr.LedgerCloseMeta, bool, error) { return createLedger(sequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes), true, nil } -func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(ctx context.Context, f db.StreamLedgerFn) error { +func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(_ context.Context, _ db.StreamLedgerFn) error { return nil } @@ -75,7 +83,7 @@ func TestGetLatestLedger(t *testing.T) { getLatestLedgerHandler := NewGetLatestLedgerHandler(&ConstantLedgerEntryReader{}, &ConstantLedgerReader{}) latestLedgerRespI, err := getLatestLedgerHandler(context.Background(), &jrpc2.Request{}) latestLedgerResp := latestLedgerRespI.(GetLatestLedgerResponse) - assert.NoError(t, err) + require.NoError(t, err) expectedLatestLedgerHashStr := xdr.Hash{expectedLatestLedgerHashBytes}.HexString() assert.Equal(t, expectedLatestLedgerHashStr, latestLedgerResp.Hash) diff --git a/cmd/soroban-rpc/internal/methods/get_transaction.go b/cmd/soroban-rpc/internal/methods/get_transaction.go index 37ef5f3a..ac6ac8b2 100644 --- a/cmd/soroban-rpc/internal/methods/get_transaction.go +++ b/cmd/soroban-rpc/internal/methods/get_transaction.go @@ -72,6 +72,7 @@ func GetTransaction( ctx context.Context, log *log.Entry, reader db.TransactionReader, + ledgerReader db.LedgerReader, request GetTransactionRequest, ) (GetTransactionResponse, error) { // parse hash @@ -91,7 +92,15 @@ func GetTransaction( } } - tx, storeRange, err := reader.GetTransaction(ctx, txHash) + storeRange, err := ledgerReader.GetLedgerRange(ctx) + if err != nil { + return GetTransactionResponse{}, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: fmt.Sprintf("unable to get ledger range: %v", err), + } + } + + tx, err := reader.GetTransaction(ctx, txHash) response := GetTransactionResponse{ LatestLedger: storeRange.LastLedger.Sequence, @@ -130,8 +139,10 @@ func GetTransaction( } // NewGetTransactionHandler returns a get transaction json rpc handler -func NewGetTransactionHandler(logger *log.Entry, getter db.TransactionReader) jrpc2.Handler { +func NewGetTransactionHandler(logger *log.Entry, getter db.TransactionReader, + ledgerReader db.LedgerReader, +) jrpc2.Handler { return NewHandler(func(ctx context.Context, request GetTransactionRequest) (GetTransactionResponse, error) { - return GetTransaction(ctx, logger, getter, request) + return GetTransaction(ctx, logger, getter, ledgerReader, request) }) } diff --git a/cmd/soroban-rpc/internal/methods/get_transaction_test.go b/cmd/soroban-rpc/internal/methods/get_transaction_test.go index 65ceea29..1bc6f2b9 100644 --- a/cmd/soroban-rpc/internal/methods/get_transaction_test.go +++ b/cmd/soroban-rpc/internal/methods/get_transaction_test.go @@ -17,19 +17,21 @@ import ( func TestGetTransaction(t *testing.T) { var ( - ctx = context.TODO() - log = log.DefaultLogger - store = db.NewMockTransactionStore("passphrase") + ctx = context.TODO() + log = log.DefaultLogger + store = db.NewMockTransactionStore("passphrase") + ledgerReader = db.NewMockLedgerReader(store) ) log.SetLevel(logrus.DebugLevel) - _, err := GetTransaction(ctx, log, store, GetTransactionRequest{"ab"}) + _, err := GetTransaction(ctx, log, store, ledgerReader, GetTransactionRequest{"ab"}) require.EqualError(t, err, "[-32602] unexpected hash length (2)") - _, err = GetTransaction(ctx, log, store, GetTransactionRequest{"foo "}) + _, err = GetTransaction(ctx, log, store, ledgerReader, + GetTransactionRequest{"foo "}) require.EqualError(t, err, "[-32602] incorrect hash: encoding/hex: invalid byte: U+006F 'o'") hash := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - tx, err := GetTransaction(ctx, log, store, GetTransactionRequest{hash}) + tx, err := GetTransaction(ctx, log, store, ledgerReader, GetTransactionRequest{hash}) require.NoError(t, err) require.Equal(t, GetTransactionResponse{Status: TransactionStatusNotFound}, tx) @@ -38,7 +40,7 @@ func TestGetTransaction(t *testing.T) { xdrHash := txHash(1) hash = hex.EncodeToString(xdrHash[:]) - tx, err = GetTransaction(ctx, log, store, GetTransactionRequest{hash}) + tx, err = GetTransaction(ctx, log, store, ledgerReader, GetTransactionRequest{hash}) require.NoError(t, err) expectedTxResult, err := xdr.MarshalBase64(meta.V1.TxProcessing[0].Result.Result) @@ -68,7 +70,7 @@ func TestGetTransaction(t *testing.T) { require.NoError(t, store.InsertTransactions(meta)) // the first transaction should still be there - tx, err = GetTransaction(ctx, log, store, GetTransactionRequest{hash}) + tx, err = GetTransaction(ctx, log, store, ledgerReader, GetTransactionRequest{hash}) require.NoError(t, err) require.Equal(t, GetTransactionResponse{ Status: TransactionStatusSuccess, @@ -97,7 +99,7 @@ func TestGetTransaction(t *testing.T) { expectedTxMeta, err = xdr.MarshalBase64(meta.V1.TxProcessing[0].TxApplyProcessing) require.NoError(t, err) - tx, err = GetTransaction(ctx, log, store, GetTransactionRequest{hash}) + tx, err = GetTransaction(ctx, log, store, ledgerReader, GetTransactionRequest{hash}) require.NoError(t, err) require.Equal(t, GetTransactionResponse{ Status: TransactionStatusFailed, @@ -134,7 +136,7 @@ func TestGetTransaction(t *testing.T) { expectedEventsMeta, err := xdr.MarshalBase64(diagnosticEvents[0]) require.NoError(t, err) - tx, err = GetTransaction(ctx, log, store, GetTransactionRequest{hash}) + tx, err = GetTransaction(ctx, log, store, ledgerReader, GetTransactionRequest{hash}) require.NoError(t, err) require.Equal(t, GetTransactionResponse{ Status: TransactionStatusSuccess, diff --git a/cmd/soroban-rpc/internal/methods/get_transactions.go b/cmd/soroban-rpc/internal/methods/get_transactions.go index 6fe48187..1d16f259 100644 --- a/cmd/soroban-rpc/internal/methods/get_transactions.go +++ b/cmd/soroban-rpc/internal/methods/get_transactions.go @@ -87,7 +87,6 @@ type GetTransactionsResponse struct { type transactionsRPCHandler struct { ledgerReader db.LedgerReader - dbReader db.TransactionReader maxLimit uint defaultLimit uint logger *log.Entry @@ -97,7 +96,7 @@ type transactionsRPCHandler struct { // getTransactionsByLedgerSequence fetches transactions between the start and end ledgers, inclusive of both. // The number of ledgers returned can be tuned using the pagination options - cursor and limit. func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Context, request GetTransactionsRequest) (GetTransactionsResponse, error) { - ledgerRange, err := h.dbReader.GetLedgerRange(ctx) + ledgerRange, err := h.ledgerReader.GetLedgerRange(ctx) if err != nil { return GetTransactionsResponse{}, &jrpc2.Error{ Code: jrpc2.InternalError, @@ -139,7 +138,7 @@ func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Cont // Iterate through each ledger and its transactions until limit or end range is reached. // The latest ledger acts as the end ledger range for the request. var txns []TransactionInfo - var cursor *toid.ID + cursor := toid.New(0, 0, 0) LedgerLoop: for ledgerSeq := start.LedgerSequence; ledgerSeq <= int32(ledgerRange.LastLedger.Sequence); ledgerSeq++ { // Get ledger close meta from db @@ -172,7 +171,7 @@ LedgerLoop: if ierr := reader.Seek(startTxIdx - 1); ierr != nil && ierr != io.EOF { return GetTransactionsResponse{}, &jrpc2.Error{ Code: jrpc2.InternalError, - Message: err.Error(), + Message: ierr.Error(), } } } @@ -184,7 +183,7 @@ LedgerLoop: ingestTx, err := reader.Read() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { // No more transactions to read. Start from next ledger break } @@ -234,17 +233,16 @@ LedgerLoop: }, nil } -func NewGetTransactionsHandler(logger *log.Entry, ledgerReader db.LedgerReader, dbReader db.TransactionReader, maxLimit, defaultLimit uint, networkPassphrase string) jrpc2.Handler { +func NewGetTransactionsHandler(logger *log.Entry, ledgerReader db.LedgerReader, maxLimit, + defaultLimit uint, networkPassphrase string, +) jrpc2.Handler { transactionsHandler := transactionsRPCHandler{ ledgerReader: ledgerReader, - dbReader: dbReader, maxLimit: maxLimit, defaultLimit: defaultLimit, logger: logger, networkPassphrase: networkPassphrase, } - return handler.New(func(context context.Context, request GetTransactionsRequest) (GetTransactionsResponse, error) { - return transactionsHandler.getTransactionsByLedgerSequence(context, request) - }) + return handler.New(transactionsHandler.getTransactionsByLedgerSequence) } diff --git a/cmd/soroban-rpc/internal/methods/get_transactions_test.go b/cmd/soroban-rpc/internal/methods/get_transactions_test.go index b02cc750..76f833c4 100644 --- a/cmd/soroban-rpc/internal/methods/get_transactions_test.go +++ b/cmd/soroban-rpc/internal/methods/get_transactions_test.go @@ -7,6 +7,7 @@ import ( "github.com/creachadair/jrpc2" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" @@ -20,7 +21,7 @@ const ( // createTestLedger Creates a test ledger with 2 transactions func createTestLedger(sequence uint32) xdr.LedgerCloseMeta { - sequence = sequence - 100 + sequence -= 100 meta := txMeta(sequence, true) meta.V1.TxProcessing = append(meta.V1.TxProcessing, xdr.TransactionResultMeta{ TxApplyProcessing: xdr.TransactionMeta{ @@ -37,17 +38,16 @@ func createTestLedger(sequence uint32) xdr.LedgerCloseMeta { } func TestGetTransactions_DefaultLimit(t *testing.T) { - mockDbReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDbReader) + mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) + mockLedgerReader := db.NewMockLedgerReader(mockDBReader) for i := 1; i <= 10; i++ { meta := createTestLedger(uint32(i)) - err := mockDbReader.InsertTransactions(meta) - assert.NoError(t, err) + err := mockDBReader.InsertTransactions(meta) + require.NoError(t, err) } handler := transactionsRPCHandler{ ledgerReader: mockLedgerReader, - dbReader: mockDbReader, maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -58,7 +58,7 @@ func TestGetTransactions_DefaultLimit(t *testing.T) { } response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) - assert.NoError(t, err) + require.NoError(t, err) // assert latest ledger details assert.Equal(t, uint32(10), response.LatestLedger) @@ -77,12 +77,11 @@ func TestGetTransactions_DefaultLimitExceedsLatestLedger(t *testing.T) { for i := 1; i <= 3; i++ { meta := createTestLedger(uint32(i)) err := mockDBReader.InsertTransactions(meta) - assert.NoError(t, err) + require.NoError(t, err) } handler := transactionsRPCHandler{ ledgerReader: mockLedgerReader, - dbReader: mockDBReader, maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -93,7 +92,7 @@ func TestGetTransactions_DefaultLimitExceedsLatestLedger(t *testing.T) { } response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) - assert.NoError(t, err) + require.NoError(t, err) // assert latest ledger details assert.Equal(t, uint32(3), response.LatestLedger) @@ -112,12 +111,11 @@ func TestGetTransactions_CustomLimit(t *testing.T) { for i := 1; i <= 10; i++ { meta := createTestLedger(uint32(i)) err := mockDBReader.InsertTransactions(meta) - assert.NoError(t, err) + require.NoError(t, err) } handler := transactionsRPCHandler{ ledgerReader: mockLedgerReader, - dbReader: mockDBReader, maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -131,7 +129,7 @@ func TestGetTransactions_CustomLimit(t *testing.T) { } response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) - assert.NoError(t, err) + require.NoError(t, err) // assert latest ledger details assert.Equal(t, uint32(10), response.LatestLedger) @@ -152,12 +150,11 @@ func TestGetTransactions_CustomLimitAndCursor(t *testing.T) { for i := 1; i <= 10; i++ { meta := createTestLedger(uint32(i)) err := mockDBReader.InsertTransactions(meta) - assert.NoError(t, err) + require.NoError(t, err) } handler := transactionsRPCHandler{ ledgerReader: mockLedgerReader, - dbReader: mockDBReader, maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -171,7 +168,7 @@ func TestGetTransactions_CustomLimitAndCursor(t *testing.T) { } response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) - assert.NoError(t, err) + require.NoError(t, err) // assert latest ledger details assert.Equal(t, uint32(10), response.LatestLedger) @@ -188,17 +185,16 @@ func TestGetTransactions_CustomLimitAndCursor(t *testing.T) { } func TestGetTransactions_InvalidStartLedger(t *testing.T) { - mockDbReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDbReader) + mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) + mockLedgerReader := db.NewMockLedgerReader(mockDBReader) for i := 1; i <= 3; i++ { meta := createTestLedger(uint32(i)) - err := mockDbReader.InsertTransactions(meta) - assert.NoError(t, err) + err := mockDBReader.InsertTransactions(meta) + require.NoError(t, err) } handler := transactionsRPCHandler{ ledgerReader: mockLedgerReader, - dbReader: mockDbReader, maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -218,21 +214,20 @@ func TestGetTransactions_InvalidStartLedger(t *testing.T) { } func TestGetTransactions_LedgerNotFound(t *testing.T) { - mockDbReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDbReader) + mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) + mockLedgerReader := db.NewMockLedgerReader(mockDBReader) for i := 1; i <= 3; i++ { // Skip creation of ledger 2 if i == 2 { continue } meta := createTestLedger(uint32(i)) - err := mockDbReader.InsertTransactions(meta) - assert.NoError(t, err) + err := mockDBReader.InsertTransactions(meta) + require.NoError(t, err) } handler := transactionsRPCHandler{ ledgerReader: mockLedgerReader, - dbReader: mockDbReader, maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -249,17 +244,16 @@ func TestGetTransactions_LedgerNotFound(t *testing.T) { } func TestGetTransactions_LimitGreaterThanMaxLimit(t *testing.T) { - mockDbReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDbReader) + mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) + mockLedgerReader := db.NewMockLedgerReader(mockDBReader) for i := 1; i <= 3; i++ { meta := createTestLedger(uint32(i)) - err := mockDbReader.InsertTransactions(meta) - assert.NoError(t, err) + err := mockDBReader.InsertTransactions(meta) + require.NoError(t, err) } handler := transactionsRPCHandler{ ledgerReader: mockLedgerReader, - dbReader: mockDbReader, maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -278,17 +272,16 @@ func TestGetTransactions_LimitGreaterThanMaxLimit(t *testing.T) { } func TestGetTransactions_InvalidCursorString(t *testing.T) { - mockDbReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDbReader) + mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) + mockLedgerReader := db.NewMockLedgerReader(mockDBReader) for i := 1; i <= 3; i++ { meta := createTestLedger(uint32(i)) - err := mockDbReader.InsertTransactions(meta) - assert.NoError(t, err) + err := mockDBReader.InsertTransactions(meta) + require.NoError(t, err) } handler := transactionsRPCHandler{ ledgerReader: mockLedgerReader, - dbReader: mockDbReader, maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, diff --git a/cmd/soroban-rpc/internal/methods/health.go b/cmd/soroban-rpc/internal/methods/health.go index b1db3f75..8de0767c 100644 --- a/cmd/soroban-rpc/internal/methods/health.go +++ b/cmd/soroban-rpc/internal/methods/health.go @@ -20,11 +20,11 @@ type HealthCheckResult struct { // NewHealthCheck returns a health check json rpc handler func NewHealthCheck( retentionWindow uint32, - reader db.TransactionReader, + ledgerReader db.LedgerReader, maxHealthyLedgerLatency time.Duration, ) jrpc2.Handler { return NewHandler(func(ctx context.Context) (HealthCheckResult, error) { - ledgerRange, err := reader.GetLedgerRange(ctx) + ledgerRange, err := ledgerReader.GetLedgerRange(ctx) if err != nil || ledgerRange.LastLedger.Sequence < 1 { extra := "" if err != nil { diff --git a/cmd/soroban-rpc/internal/methods/send_transaction.go b/cmd/soroban-rpc/internal/methods/send_transaction.go index 8bd92b07..9cdd6a20 100644 --- a/cmd/soroban-rpc/internal/methods/send_transaction.go +++ b/cmd/soroban-rpc/internal/methods/send_transaction.go @@ -49,7 +49,7 @@ type SendTransactionRequest struct { func NewSendTransactionHandler( daemon interfaces.Daemon, logger *log.Entry, - reader db.TransactionReader, + ledgerReader db.LedgerReader, passphrase string, ) jrpc2.Handler { submitter := daemon.CoreClient() @@ -73,7 +73,7 @@ func NewSendTransactionHandler( } txHash := hex.EncodeToString(hash[:]) - ledgerInfo, err := reader.GetLedgerRange(ctx) + ledgerInfo, err := ledgerReader.GetLedgerRange(ctx) if err != nil { // still not fatal logger.WithError(err). WithField("tx", request.Transaction). diff --git a/go.mod b/go.mod index 5317d150..c604919d 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 - github.com/stellar/go v0.0.0-20240617183518-100dc4fa6043 + github.com/stellar/go v0.0.0-20240628063057-b589529f102f github.com/stretchr/testify v1.9.0 ) diff --git a/go.sum b/go.sum index 2a6e1022..ac3ba4e5 100644 --- a/go.sum +++ b/go.sum @@ -342,6 +342,8 @@ github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI= github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI= github.com/stellar/go v0.0.0-20240617183518-100dc4fa6043 h1:5UQzsvt9VtD3ijpzPtdW0/lXWCNgDs6GzmLUE8ZuWfk= github.com/stellar/go v0.0.0-20240617183518-100dc4fa6043/go.mod h1:TuXKLL7WViqwrvpWno2I4UYGn2Ny9KZld1jUIN6fnK8= +github.com/stellar/go v0.0.0-20240628063057-b589529f102f h1:3W9JZJ0r87wy2M3wsACuJtKW/cNWXpfw5Jwyt89Am30= +github.com/stellar/go v0.0.0-20240628063057-b589529f102f/go.mod h1:4cVjIVyU8V1iSBEMGd41j22DAyBoz2SVL5TcrJPqePU= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=