Skip to content

Commit

Permalink
Add initial implementation of DB-backed tx store
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaptic committed Apr 16, 2024
1 parent 9a98d4f commit dcbe0c0
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 5 deletions.
3 changes: 2 additions & 1 deletion cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func MustNew(cfg *config.Config) *Daemon {
}

if len(cfg.HistoryArchiveURLs) == 0 {
logger.Fatal("no history archives url were provided")
logger.Fatal("no history archives URLs were provided")
}

historyArchive, err := historyarchive.NewArchivePool(
Expand Down Expand Up @@ -273,6 +273,7 @@ func MustNew(cfg *config.Config) *Daemon {
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionGetter: db.NewTransactionHandler(dbConn, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down
9 changes: 9 additions & 0 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ type ReadWriter interface {
}

type WriteTx interface {
TransactionHandler() *TransactionHandler
LedgerEntryWriter() LedgerEntryWriter
LedgerWriter() LedgerWriter

Commit(ledgerSeq uint32) error
Rollback() error
}
Expand Down Expand Up @@ -176,6 +178,7 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
maxBatchSize: rw.maxBatchSize,
},
ledgerRetentionWindow: rw.ledgerRetentionWindow,
txWriter: TransactionHandler{stmtCache: stmtCache},
}, nil
}

Expand All @@ -187,6 +190,8 @@ type writeTx struct {
ledgerEntryWriter ledgerEntryWriter
ledgerWriter ledgerWriter
ledgerRetentionWindow uint32

txWriter TransactionHandler
}

func (w writeTx) LedgerEntryWriter() LedgerEntryWriter {
Expand All @@ -197,6 +202,10 @@ func (w writeTx) LedgerWriter() LedgerWriter {
return w.ledgerWriter
}

func (w writeTx) TransactionHandler() *TransactionHandler {
return &w.txWriter
}

func (w writeTx) Commit(ledgerSeq uint32) error {
if err := w.ledgerEntryWriter.flush(); err != nil {
return err
Expand Down
13 changes: 13 additions & 0 deletions cmd/soroban-rpc/internal/db/migrations/02_transactions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +migrate Up

-- index to find transactions in ledgers by hash
CREATE TABLE transactions (
hash BLOB PRIMARY KEY,
ledger_sequence INTEGER NOT NULL,
application_order INTEGER NOT NULL,
FOREIGN KEY (ledger_sequence)
REFERENCES ledger_close_meta (sequence)
);

-- +migrate Down
drop table transactions cascade;
193 changes: 193 additions & 0 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package db

import (
"encoding/hex"
"fmt"

sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/ingest"
"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/transactions"
)

const (
transactionTableName = "transactions"
)

type TransactionHandler struct {
stmtCache *sq.StmtCache
db db.SessionInterface
passphrase string
}

func NewTransactionHandler(db db.SessionInterface, passphrase string) *TransactionHandler {
return &TransactionHandler{db: db, passphrase: passphrase}
}

func (txn *TransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error {
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(txn.passphrase, lcm)
if err != nil {
return err
}
txCount := lcm.CountTransactions()
transactions := make(map[xdr.Hash]ingest.LedgerTransaction, txCount)

for i := 0; i < txCount; i++ {
tx, err := reader.Read()
if err != nil {
return err
}

if tx.Envelope.IsFeeBump() {
transactions[tx.Result.InnerHash()] = tx
}
transactions[tx.Result.TransactionHash] = tx
}

query := sq.
Insert(transactionTableName).
RunWith(txn.stmtCache).
Columns("hash", "ledger_sequence", "application_order")
for hash, tx := range transactions {
query = query.Values(hash, lcm.LedgerSequence(), tx.Index)
}

_, err = query.Exec()
return err
}

func (txn *TransactionHandler) GetLedgerRange() (
ledgerbucketwindow.LedgerRange,
error,
) {
var ledgerRange ledgerbucketwindow.LedgerRange
newestQ := sq.
Select("meta").
From(ledgerCloseMetaTableName).
OrderBy("sequence DESC").
Limit(1).
RunWith(txn.stmtCache).
QueryRow()
oldestQ := sq.
Select("meta").
From(ledgerCloseMetaTableName).
OrderBy("sequence ASC").
Limit(1).
RunWith(txn.stmtCache).
QueryRow()

var row1, row2 []byte
if err := newestQ.Scan(&row1); err != nil {
return ledgerRange, err
}
if err := oldestQ.Scan(&row2); err != nil {
return ledgerRange, err
}

var lcm1, lcm2 xdr.LedgerCloseMeta
if err := lcm1.UnmarshalBinary(row1); err != nil {
return ledgerRange, err
}
if err := lcm2.UnmarshalBinary(row2); err != nil {
return ledgerRange, err
}

return ledgerbucketwindow.LedgerRange{
FirstLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcm1.LedgerSequence(),
CloseTime: int64(lcm1.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime),
},
LastLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcm2.LedgerSequence(),
CloseTime: int64(lcm2.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime),
},
}, nil
}

func (txn *TransactionHandler) GetTransactionByHash(hash string) (
xdr.LedgerCloseMeta, ingest.LedgerTransaction, error,
) {
rows := sq.
Select("t.application_order", "lcm.meta").
From(fmt.Sprintf("%s t", transactionTableName)).
Join(fmt.Sprintf("%s lcm ON (t.ledger_sequence = lcm.sequence)", ledgerCloseMetaTableName)).
Limit(1).
RunWith(txn.stmtCache).
QueryRow()

var row struct {
txIndex int
meta []byte
}
if err := rows.Scan(&row); err != nil {
return xdr.LedgerCloseMeta{}, ingest.LedgerTransaction{}, err
}

var lcm xdr.LedgerCloseMeta
if err := lcm.UnmarshalBinary(row.meta); err != nil {
return lcm, ingest.LedgerTransaction{}, err
}

reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(txn.passphrase, lcm)
reader.Seek(row.txIndex - 1)
if err != nil {
return lcm, ingest.LedgerTransaction{}, err
}

ledgerTx, err := reader.Read()
return lcm, ledgerTx, err
}

// GetTransaction conforms to the interface in
// methods/get_transaction.go#NewGetTransactionHandler so that it can be used
// directly against the RPC handler.
func (txn *TransactionHandler) GetTransaction(hash xdr.Hash) (
transactions.Transaction, bool, ledgerbucketwindow.LedgerRange,
) {
tx := transactions.Transaction{}

ledgerRange, err := txn.GetLedgerRange()
if err != nil {
return tx, false, ledgerRange
}

lcm, ingestTx, err := txn.GetTransactionByHash(hex.EncodeToString(hash[:]))
if err != nil {
return tx, false, ledgerRange
}

//
// On-the-fly ingestion: extract all of the fields
//

if tx.Result, err = ingestTx.Result.MarshalBinary(); err != nil {
return tx, false, ledgerRange
}
if tx.Meta, err = ingestTx.UnsafeMeta.MarshalBinary(); err != nil {
return tx, false, ledgerRange
}
if tx.Envelope, err = ingestTx.Envelope.MarshalBinary(); err != nil {
return tx, false, ledgerRange
}
if events, errr := ingestTx.GetDiagnosticEvents(); errr != nil {
tx.Events = make([][]byte, 0, len(events))
for _, event := range events {
bytes, ierr := event.MarshalBinary()
if ierr != nil {
return tx, false, ledgerRange
}
tx.Events = append(tx.Events, bytes)
}
}
tx.FeeBump = ingestTx.Envelope.IsFeeBump()
tx.ApplicationOrder = int32(ingestTx.Index)
tx.Successful = ingestTx.Result.Successful()
tx.Ledger = ledgerbucketwindow.LedgerInfo{
Sequence: lcm.LedgerSequence(),
CloseTime: int64(lcm.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime),
}

return tx, true, ledgerRange
}
5 changes: 5 additions & 0 deletions cmd/soroban-rpc/internal/ingest/mock_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (m MockTx) LedgerWriter() db.LedgerWriter {
return args.Get(0).(db.LedgerWriter)
}

func (m MockTx) TransactionHandler() *db.TransactionHandler {
args := m.Called()
return args.Get(0).(*db.TransactionHandler)
}

func (m MockTx) Commit(ledgerSeq uint32) error {
args := m.Called(ledgerSeq)
return args.Error(0)
Expand Down
14 changes: 11 additions & 3 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
if err := s.ingestLedgerEntryChanges(ctx, reader, tx, 0); err != nil {
return err
}

if err := reader.Close(); err != nil {
return err
}
Expand All @@ -291,31 +292,38 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
return err
}

if err := tx.TransactionHandler().InsertTransactions(ledgerCloseMeta); err != nil {
return err
}

if err := tx.Commit(sequence); err != nil {
return err
}
s.logger.Debugf("Ingested ledger %d", sequence)

s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "total"}).Observe(time.Since(startTime).Seconds())
With(prometheus.Labels{"type": "total"}).
Observe(time.Since(startTime).Seconds())
s.metrics.latestLedgerMetric.Set(float64(sequence))
return nil
}

func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta) error {
startTime := time.Now()

if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil {
return err
}
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds())

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
if err := s.transactionStore.IngestTransactions(ledgerCloseMeta); err != nil {
return err
}

if err := s.transactionStore.IngestTransactions(ledgerCloseMeta); err != nil {
if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
return err
}

return nil
}
3 changes: 2 additions & 1 deletion cmd/soroban-rpc/internal/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (h Handler) Close() {
type HandlerParams struct {
EventStore *events.MemoryStore
TransactionStore *transactions.MemoryStore
TransactionGetter *db.TransactionHandler
LedgerEntryReader db.LedgerEntryReader
LedgerReader db.LedgerReader
Logger *log.Entry
Expand Down Expand Up @@ -194,7 +195,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler {
},
{
methodName: "getTransaction",
underlyingHandler: methods.NewGetTransactionHandler(params.TransactionStore),
underlyingHandler: methods.NewGetTransactionHandler(params.TransactionGetter),
longName: "get_transaction",
queueLimit: cfg.RequestBacklogGetTransactionQueueLimit,
requestDurationLimit: cfg.MaxGetTransactionExecutionDuration,
Expand Down

0 comments on commit dcbe0c0

Please sign in to comment.