Skip to content

Commit

Permalink
Break getTransactionsByLedgerSequence into smaller sub-functions (#244
Browse files Browse the repository at this point in the history
)

* reduce function length

* nit changes - 1

* nit changes - 2

* Update cmd/soroban-rpc/internal/methods/get_transactions.go

Co-authored-by: George <[email protected]>

* Remove LedgerLoop tag

* Fix failing test

* change function docstring

* Change tx index instead of creating new cursor object

* Return nil instead of cursor object in case of an error

* Fix lint issue

* Return toid.ID instead of a *toid.ID

---------

Co-authored-by: George <[email protected]>
  • Loading branch information
aditya1702 and Shaptic authored Jul 25, 2024
1 parent ee27f46 commit 2d6faac
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 90 deletions.
210 changes: 121 additions & 89 deletions cmd/soroban-rpc/internal/methods/get_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stellar/go/ingest"
"github.com/stellar/go/support/log"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
Expand Down Expand Up @@ -93,38 +94,19 @@ type transactionsRPCHandler struct {
networkPassphrase string
}

// getTransactionsByLedgerSequence fetches transactions between the start and end ledgers, inclusive of both.
// The number of ledgers returned can be tuned using the pagination options - cursor and limit.
func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Context, request GetTransactionsRequest) (GetTransactionsResponse, error) {
ledgerRange, err := h.ledgerReader.GetLedgerRange(ctx)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

err = request.isValid(h.maxLimit, ledgerRange)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidRequest,
Message: err.Error(),
}
}

// Move start to pagination cursor
// initializePagination sets the pagination limit and cursor
func (h transactionsRPCHandler) initializePagination(request GetTransactionsRequest) (toid.ID, uint, error) {
start := toid.New(int32(request.StartLedger), 1, 1)
limit := h.defaultLimit
if request.Pagination != nil {
if request.Pagination.Cursor != "" {
cursorInt, err := strconv.ParseInt(request.Pagination.Cursor, 10, 64)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
return toid.ID{}, 0, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: err.Error(),
}
}

*start = toid.Parse(cursorInt)
// increment tx index because, when paginating,
// we start with the item right after the cursor
Expand All @@ -134,92 +116,142 @@ func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Cont
limit = request.Pagination.Limit
}
}
return *start, limit, nil
}

// Iterate through each ledger and its transactions until limit or end range is reached.
// The latest ledger acts as the end ledger range for the request.
var txns []TransactionInfo
cursor := toid.New(0, 0, 0)
LedgerLoop:
for ledgerSeq := start.LedgerSequence; ledgerSeq <= int32(ledgerRange.LastLedger.Sequence); ledgerSeq++ {
// Get ledger close meta from db
ledger, found, err := h.ledgerReader.GetLedger(ctx, uint32(ledgerSeq))
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
// fetchLedgerData calls the meta table to fetch the corresponding ledger data.
func (h transactionsRPCHandler) fetchLedgerData(ctx context.Context, ledgerSeq uint32) (xdr.LedgerCloseMeta, error) {
ledger, found, err := h.ledgerReader.GetLedger(ctx, ledgerSeq)
if err != nil {
return ledger, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
} else if !found {
return ledger, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: fmt.Sprintf("database does not contain metadata for ledger: %d", ledgerSeq),
}
}
return ledger, nil
}

// processTransactionsInLedger cycles through all the transactions in a ledger, extracts the transaction info
// and builds the list of transactions.
func (h transactionsRPCHandler) processTransactionsInLedger(ledger xdr.LedgerCloseMeta, start toid.ID,
txns *[]TransactionInfo, limit uint,
) (*toid.ID, bool, error) {
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(h.networkPassphrase, ledger)
if err != nil {
return nil, false, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

startTxIdx := 1
ledgerSeq := ledger.LedgerSequence()
if int32(ledgerSeq) == start.LedgerSequence {
startTxIdx = int(start.TransactionOrder)
if ierr := reader.Seek(startTxIdx - 1); ierr != nil && !errors.Is(ierr, io.EOF) {
return nil, false, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
Message: ierr.Error(),
}
}
}

txCount := ledger.CountTransactions()
cursor := toid.New(int32(ledgerSeq), 0, 1)
for i := startTxIdx; i <= txCount; i++ {
cursor.TransactionOrder = int32(i)

ingestTx, err := reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
} else if !found {
return GetTransactionsResponse{}, &jrpc2.Error{
return nil, false, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: fmt.Sprintf("ledger close meta not found: %d", ledgerSeq),
Message: err.Error(),
}
}

// Initialize tx reader.
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(h.networkPassphrase, ledger)
tx, err := db.ParseTransaction(ledger, ingestTx)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
return nil, false, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

// Move the reader to specific tx idx
startTxIdx := 1
if ledgerSeq == start.LedgerSequence {
startTxIdx = int(start.TransactionOrder)
if ierr := reader.Seek(startTxIdx - 1); ierr != nil && ierr != io.EOF {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: ierr.Error(),
}
}
txInfo := TransactionInfo{
ApplicationOrder: tx.ApplicationOrder,
FeeBump: tx.FeeBump,
ResultXdr: base64.StdEncoding.EncodeToString(tx.Result),
ResultMetaXdr: base64.StdEncoding.EncodeToString(tx.Meta),
EnvelopeXdr: base64.StdEncoding.EncodeToString(tx.Envelope),
DiagnosticEventsXDR: base64EncodeSlice(tx.Events),
Ledger: tx.Ledger.Sequence,
LedgerCloseTime: tx.Ledger.CloseTime,
}
txInfo.Status = TransactionStatusFailed
if tx.Successful {
txInfo.Status = TransactionStatusSuccess
}

// Decode transaction info from ledger meta
txCount := ledger.CountTransactions()
for i := startTxIdx; i <= txCount; i++ {
cursor = toid.New(int32(ledger.LedgerSequence()), int32(i), 1)
*txns = append(*txns, txInfo)
if len(*txns) >= int(limit) {
return cursor, true, nil
}
}

ingestTx, err := reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
// No more transactions to read. Start from next ledger
break
}
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: err.Error(),
}
}
return cursor, false, nil
}

tx, err := db.ParseTransaction(ledger, ingestTx)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}
// getTransactionsByLedgerSequence fetches transactions between the start and end ledgers, inclusive of both.
// The number of ledgers returned can be tuned using the pagination options - cursor and limit.
func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Context,
request GetTransactionsRequest,
) (GetTransactionsResponse, error) {
ledgerRange, err := h.ledgerReader.GetLedgerRange(ctx)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

txInfo := TransactionInfo{
ApplicationOrder: tx.ApplicationOrder,
FeeBump: tx.FeeBump,
ResultXdr: base64.StdEncoding.EncodeToString(tx.Result),
ResultMetaXdr: base64.StdEncoding.EncodeToString(tx.Meta),
EnvelopeXdr: base64.StdEncoding.EncodeToString(tx.Envelope),
DiagnosticEventsXDR: base64EncodeSlice(tx.Events),
Ledger: tx.Ledger.Sequence,
LedgerCloseTime: tx.Ledger.CloseTime,
}
txInfo.Status = TransactionStatusFailed
if tx.Successful {
txInfo.Status = TransactionStatusSuccess
}
err = request.isValid(h.maxLimit, ledgerRange)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidRequest,
Message: err.Error(),
}
}

txns = append(txns, txInfo)
if len(txns) >= int(limit) {
break LedgerLoop
}
start, limit, err := h.initializePagination(request)
if err != nil {
return GetTransactionsResponse{}, err
}

// Iterate through each ledger and its transactions until limit or end range is reached.
// The latest ledger acts as the end ledger range for the request.
var txns []TransactionInfo
var done bool
cursor := toid.New(0, 0, 0)
for ledgerSeq := start.LedgerSequence; ledgerSeq <= int32(ledgerRange.LastLedger.Sequence); ledgerSeq++ {
ledger, err := h.fetchLedgerData(ctx, uint32(ledgerSeq))
if err != nil {
return GetTransactionsResponse{}, err
}

cursor, done, err = h.processTransactionsInLedger(ledger, start, &txns, limit)
if err != nil {
return GetTransactionsResponse{}, err
}
if done {
break
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/methods/get_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestGetTransactions_LedgerNotFound(t *testing.T) {
}

response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request)
expectedErr := fmt.Errorf("[%d] ledger close meta not found: 2", jrpc2.InvalidParams)
expectedErr := fmt.Errorf("[%d] database does not contain metadata for ledger: 2", jrpc2.InvalidParams)
assert.Equal(t, expectedErr.Error(), err.Error())
assert.Nil(t, response.Transactions)
}
Expand Down

0 comments on commit 2d6faac

Please sign in to comment.