diff --git a/cmd/stellar-rpc/internal/db/ledger.go b/cmd/stellar-rpc/internal/db/ledger.go index 762309ca..7dd03611 100644 --- a/cmd/stellar-rpc/internal/db/ledger.go +++ b/cmd/stellar-rpc/internal/db/ledger.go @@ -28,6 +28,7 @@ type LedgerReader interface { } type LedgerReaderTx interface { + GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) BatchGetLedgers(ctx context.Context, sequence uint32, batchSize uint) ([]xdr.LedgerCloseMeta, error) Done() error @@ -77,6 +78,11 @@ func (l ledgerReaderTx) BatchGetLedgers(ctx context.Context, sequence uint32, return results, nil } +// GetLedger fetches a single ledger from the db using a transaction. +func (l ledgerReaderTx) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) { + return getLedgerFromDB(ctx, l.tx, sequence) +} + func (l ledgerReaderTx) Done() error { return l.tx.Rollback() } @@ -151,20 +157,7 @@ func (r ledgerReader) StreamLedgerRange( // GetLedger fetches a single ledger from the db. func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) { - sql := sq.Select("meta").From(ledgerCloseMetaTableName).Where(sq.Eq{"sequence": sequence}) - var results []xdr.LedgerCloseMeta - if err := r.db.Select(ctx, &results, sql); err != nil { - return xdr.LedgerCloseMeta{}, false, err - } - switch len(results) { - case 0: - return xdr.LedgerCloseMeta{}, false, nil - case 1: - return results[0], true, nil - default: - return xdr.LedgerCloseMeta{}, false, fmt.Errorf("multiple lcm entries (%d) for sequence %d in table %q", - len(results), sequence, ledgerCloseMetaTableName) - } + return getLedgerFromDB(ctx, r.db, sequence) } // GetLedgerRange pulls the min/max ledger sequence numbers from the meta table. @@ -260,6 +253,25 @@ func (l ledgerWriter) trimLedgers(latestLedgerSeq uint32, retentionWindow uint32 return err } +// getLedgerFromDB is a helper function that encapsulates the common logic +// for fetching a single ledger from the database +func getLedgerFromDB(ctx context.Context, db readDB, sequence uint32) (xdr.LedgerCloseMeta, bool, error) { + sql := sq.Select("meta").From(ledgerCloseMetaTableName).Where(sq.Eq{"sequence": sequence}) + var results []xdr.LedgerCloseMeta + if err := db.Select(ctx, &results, sql); err != nil { + return xdr.LedgerCloseMeta{}, false, err + } + switch len(results) { + case 0: + return xdr.LedgerCloseMeta{}, false, nil + case 1: + return results[0], true, nil + default: + return xdr.LedgerCloseMeta{}, false, fmt.Errorf("multiple lcm entries (%d) for sequence %d in table %q", + len(results), sequence, ledgerCloseMetaTableName) + } +} + // InsertLedger inserts a ledger in the db. func (l ledgerWriter) InsertLedger(ledger xdr.LedgerCloseMeta) error { _, err := sq.StatementBuilder.RunWith(l.stmtCache). diff --git a/cmd/stellar-rpc/internal/methods/get_transactions.go b/cmd/stellar-rpc/internal/methods/get_transactions.go index 1f3d4b33..20d4c6a2 100644 --- a/cmd/stellar-rpc/internal/methods/get_transactions.go +++ b/cmd/stellar-rpc/internal/methods/get_transactions.go @@ -136,8 +136,10 @@ func (h transactionsRPCHandler) initializePagination(request GetTransactionsRequ } // fetchLedgerData calls the meta table to fetch the corresponding ledger data. -func (h transactionsRPCHandler) fetchLedgerData(ctx context.Context, ledgerSeq uint32) (xdr.LedgerCloseMeta, error) { - ledger, found, err := h.ledgerReader.GetLedger(ctx, ledgerSeq) +func (h transactionsRPCHandler) fetchLedgerData(ctx context.Context, ledgerSeq uint32, + readTx db.LedgerReaderTx, +) (xdr.LedgerCloseMeta, error) { + ledger, found, err := readTx.GetLedger(ctx, ledgerSeq) if err != nil { return ledger, &jrpc2.Error{ Code: jrpc2.InternalError, @@ -262,7 +264,18 @@ func (h transactionsRPCHandler) processTransactionsInLedger( func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Context, request GetTransactionsRequest, ) (GetTransactionsResponse, error) { - ledgerRange, err := h.ledgerReader.GetLedgerRange(ctx) + readTx, err := h.ledgerReader.NewTx(ctx) + if err != nil { + return GetTransactionsResponse{}, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: err.Error(), + } + } + defer func() { + _ = readTx.Done() + }() + + ledgerRange, err := readTx.GetLedgerRange(ctx) if err != nil { return GetTransactionsResponse{}, &jrpc2.Error{ Code: jrpc2.InternalError, @@ -289,7 +302,7 @@ func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Cont var done bool cursor := toid.New(0, 0, 0) for ledgerSeq := start.LedgerSequence; ledgerSeq <= int32(ledgerRange.LastLedger.Sequence); ledgerSeq++ { - ledger, err := h.fetchLedgerData(ctx, uint32(ledgerSeq)) + ledger, err := h.fetchLedgerData(ctx, uint32(ledgerSeq), readTx) if err != nil { return GetTransactionsResponse{}, err } diff --git a/cmd/stellar-rpc/internal/methods/get_transactions_test.go b/cmd/stellar-rpc/internal/methods/get_transactions_test.go index 3ad026ab..f8a91bc5 100644 --- a/cmd/stellar-rpc/internal/methods/get_transactions_test.go +++ b/cmd/stellar-rpc/internal/methods/get_transactions_test.go @@ -10,9 +10,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stellar/go/support/log" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/daemon/interfaces" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/db" ) @@ -53,17 +55,26 @@ func createTestLedger(sequence uint32) xdr.LedgerCloseMeta { return meta } -func TestGetTransactions_DefaultLimit(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 10; i++ { - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) +func setupDB(t *testing.T, numLedgers int, skipLedger int) *db.DB { + testDB := NewTestDB(t) + daemon := interfaces.MakeNoOpDeamon() + for sequence := 1; sequence <= numLedgers; sequence++ { + if sequence == skipLedger { + continue + } + ledgerCloseMeta := createTestLedger(uint32(sequence)) + tx, err := db.NewReadWriter(log.DefaultLogger, testDB, daemon, 150, 100, passphrase).NewTx(context.Background()) require.NoError(t, err) + require.NoError(t, tx.LedgerWriter().InsertLedger(ledgerCloseMeta)) + require.NoError(t, tx.Commit(ledgerCloseMeta)) } + return testDB +} +func TestGetTransactions_DefaultLimit(t *testing.T) { //nolint:dupl + testDB := setupDB(t, 10, 0) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -90,17 +101,10 @@ func TestGetTransactions_DefaultLimit(t *testing.T) { assert.Equal(t, expectedTransactionInfo, response.Transactions[0]) } -func TestGetTransactions_DefaultLimitExceedsLatestLedger(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 3; i++ { - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) - require.NoError(t, err) - } - +func TestGetTransactions_DefaultLimitExceedsLatestLedger(t *testing.T) { //nolint:dupl + testDB := setupDB(t, 3, 0) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -112,32 +116,17 @@ func TestGetTransactions_DefaultLimitExceedsLatestLedger(t *testing.T) { response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) require.NoError(t, err) - - // assert latest ledger details assert.Equal(t, uint32(3), response.LatestLedger) assert.Equal(t, int64(175), response.LatestLedgerCloseTime) - - // assert pagination assert.Equal(t, toid.New(3, 2, 1).String(), response.Cursor) - - // assert transactions result assert.Len(t, response.Transactions, 6) - - // assert the transaction structure. We will match only 1 tx for sanity purposes. assert.Equal(t, expectedTransactionInfo, response.Transactions[0]) } func TestGetTransactions_CustomLimit(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 10; i++ { - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) - require.NoError(t, err) - } - + testDB := setupDB(t, 10, 0) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -152,34 +141,19 @@ func TestGetTransactions_CustomLimit(t *testing.T) { response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) require.NoError(t, err) - - // assert latest ledger details assert.Equal(t, uint32(10), response.LatestLedger) assert.Equal(t, int64(350), response.LatestLedgerCloseTime) - - // assert pagination assert.Equal(t, toid.New(1, 2, 1).String(), response.Cursor) - - // assert transactions result assert.Len(t, response.Transactions, 2) assert.Equal(t, uint32(1), response.Transactions[0].Ledger) assert.Equal(t, uint32(1), response.Transactions[1].Ledger) - - // assert the transaction structure. We will match only 1 tx for sanity purposes. assert.Equal(t, expectedTransactionInfo, response.Transactions[0]) } func TestGetTransactions_CustomLimitAndCursor(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 10; i++ { - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) - require.NoError(t, err) - } - + testDB := setupDB(t, 10, 0) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -194,15 +168,9 @@ func TestGetTransactions_CustomLimitAndCursor(t *testing.T) { response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) require.NoError(t, err) - - // assert latest ledger details assert.Equal(t, uint32(10), response.LatestLedger) assert.Equal(t, int64(350), response.LatestLedgerCloseTime) - - // assert pagination assert.Equal(t, toid.New(3, 1, 1).String(), response.Cursor) - - // assert transactions result assert.Len(t, response.Transactions, 3) assert.Equal(t, uint32(2), response.Transactions[0].Ledger) assert.Equal(t, uint32(2), response.Transactions[1].Ledger) @@ -210,16 +178,9 @@ func TestGetTransactions_CustomLimitAndCursor(t *testing.T) { } func TestGetTransactions_InvalidStartLedger(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 3; i++ { - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) - require.NoError(t, err) - } - + testDB := setupDB(t, 3, 0) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -239,20 +200,9 @@ func TestGetTransactions_InvalidStartLedger(t *testing.T) { } func TestGetTransactions_LedgerNotFound(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 3; i++ { - // Skip creation of ledger 2 - if i == 2 { - continue - } - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) - require.NoError(t, err) - } - + testDB := setupDB(t, 3, 2) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -269,16 +219,9 @@ func TestGetTransactions_LedgerNotFound(t *testing.T) { } func TestGetTransactions_LimitGreaterThanMaxLimit(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 3; i++ { - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) - require.NoError(t, err) - } - + testDB := setupDB(t, 3, 0) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -297,16 +240,9 @@ func TestGetTransactions_LimitGreaterThanMaxLimit(t *testing.T) { } func TestGetTransactions_InvalidCursorString(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 3; i++ { - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) - require.NoError(t, err) - } - + testDB := setupDB(t, 3, 0) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase, @@ -324,16 +260,9 @@ func TestGetTransactions_InvalidCursorString(t *testing.T) { } func TestGetTransactions_JSONFormat(t *testing.T) { - mockDBReader := db.NewMockTransactionStore(NetworkPassphrase) - mockLedgerReader := db.NewMockLedgerReader(mockDBReader) - for i := 1; i <= 3; i++ { - meta := createTestLedger(uint32(i)) - err := mockDBReader.InsertTransactions(meta) - require.NoError(t, err) - } - + testDB := setupDB(t, 3, 0) handler := transactionsRPCHandler{ - ledgerReader: mockLedgerReader, + ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, defaultLimit: 10, networkPassphrase: NetworkPassphrase,