diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index c2c224de..71619ae3 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -149,7 +149,7 @@ func MustNew(cfg *config.Config) *Daemon { } if len(cfg.HistoryArchiveURLs) == 0 { - logger.Fatal("no history archives url were provided") + logger.Fatal("no history archives URLs were provided") } historyArchive, err := historyarchive.NewArchivePool( @@ -273,6 +273,7 @@ func MustNew(cfg *config.Config) *Daemon { Logger: logger, LedgerReader: db.NewLedgerReader(dbConn), LedgerEntryReader: db.NewLedgerEntryReader(dbConn), + TransactionGetter: db.NewTransactionHandler(dbConn, cfg.NetworkPassphrase), PreflightGetter: preflightWorkerPool, }) diff --git a/cmd/soroban-rpc/internal/db/db.go b/cmd/soroban-rpc/internal/db/db.go index 428f29fe..404a6bd0 100644 --- a/cmd/soroban-rpc/internal/db/db.go +++ b/cmd/soroban-rpc/internal/db/db.go @@ -34,8 +34,10 @@ type ReadWriter interface { } type WriteTx interface { + TransactionHandler() *TransactionHandler LedgerEntryWriter() LedgerEntryWriter LedgerWriter() LedgerWriter + Commit(ledgerSeq uint32) error Rollback() error } @@ -176,6 +178,7 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) { maxBatchSize: rw.maxBatchSize, }, ledgerRetentionWindow: rw.ledgerRetentionWindow, + txWriter: TransactionHandler{stmtCache: stmtCache}, }, nil } @@ -187,6 +190,8 @@ type writeTx struct { ledgerEntryWriter ledgerEntryWriter ledgerWriter ledgerWriter ledgerRetentionWindow uint32 + + txWriter TransactionHandler } func (w writeTx) LedgerEntryWriter() LedgerEntryWriter { @@ -197,6 +202,10 @@ func (w writeTx) LedgerWriter() LedgerWriter { return w.ledgerWriter } +func (w writeTx) TransactionHandler() *TransactionHandler { + return &w.txWriter +} + func (w writeTx) Commit(ledgerSeq uint32) error { if err := w.ledgerEntryWriter.flush(); err != nil { return err diff --git a/cmd/soroban-rpc/internal/db/migrations/02_transactions.sql b/cmd/soroban-rpc/internal/db/migrations/02_transactions.sql new file mode 100644 index 00000000..dfdddd3e --- /dev/null +++ b/cmd/soroban-rpc/internal/db/migrations/02_transactions.sql @@ -0,0 +1,13 @@ +-- +migrate Up + +-- index to find transactions in ledgers by hash +CREATE TABLE transactions ( + hash BLOB PRIMARY KEY, + ledger_sequence INTEGER NOT NULL, + application_order INTEGER NOT NULL, + FOREIGN KEY (ledger_sequence) + REFERENCES ledger_close_meta (sequence) +); + +-- +migrate Down +drop table transactions cascade; diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go new file mode 100644 index 00000000..ba9c1c97 --- /dev/null +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -0,0 +1,193 @@ +package db + +import ( + "encoding/hex" + "fmt" + + sq "github.com/Masterminds/squirrel" + "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" + "github.com/stellar/go/xdr" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/transactions" +) + +const ( + transactionTableName = "transactions" +) + +type TransactionHandler struct { + stmtCache *sq.StmtCache + db db.SessionInterface + passphrase string +} + +func NewTransactionHandler(db db.SessionInterface, passphrase string) *TransactionHandler { + return &TransactionHandler{db: db, passphrase: passphrase} +} + +func (txn *TransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error { + reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(txn.passphrase, lcm) + if err != nil { + return err + } + txCount := lcm.CountTransactions() + transactions := make(map[xdr.Hash]ingest.LedgerTransaction, txCount) + + for i := 0; i < txCount; i++ { + tx, err := reader.Read() + if err != nil { + return err + } + + if tx.Envelope.IsFeeBump() { + transactions[tx.Result.InnerHash()] = tx + } + transactions[tx.Result.TransactionHash] = tx + } + + query := sq. + Insert(transactionTableName). + RunWith(txn.stmtCache). + Columns("hash", "ledger_sequence", "application_order") + for hash, tx := range transactions { + query = query.Values(hash, lcm.LedgerSequence(), tx.Index) + } + + _, err = query.Exec() + return err +} + +func (txn *TransactionHandler) GetLedgerRange() ( + ledgerbucketwindow.LedgerRange, + error, +) { + var ledgerRange ledgerbucketwindow.LedgerRange + newestQ := sq. + Select("meta"). + From(ledgerCloseMetaTableName). + OrderBy("sequence DESC"). + Limit(1). + RunWith(txn.stmtCache). + QueryRow() + oldestQ := sq. + Select("meta"). + From(ledgerCloseMetaTableName). + OrderBy("sequence ASC"). + Limit(1). + RunWith(txn.stmtCache). + QueryRow() + + var row1, row2 []byte + if err := newestQ.Scan(&row1); err != nil { + return ledgerRange, err + } + if err := oldestQ.Scan(&row2); err != nil { + return ledgerRange, err + } + + var lcm1, lcm2 xdr.LedgerCloseMeta + if err := lcm1.UnmarshalBinary(row1); err != nil { + return ledgerRange, err + } + if err := lcm2.UnmarshalBinary(row2); err != nil { + return ledgerRange, err + } + + return ledgerbucketwindow.LedgerRange{ + FirstLedger: ledgerbucketwindow.LedgerInfo{ + Sequence: lcm1.LedgerSequence(), + CloseTime: int64(lcm1.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime), + }, + LastLedger: ledgerbucketwindow.LedgerInfo{ + Sequence: lcm2.LedgerSequence(), + CloseTime: int64(lcm2.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime), + }, + }, nil +} + +func (txn *TransactionHandler) GetTransactionByHash(hash string) ( + xdr.LedgerCloseMeta, ingest.LedgerTransaction, error, +) { + rows := sq. + Select("t.application_order", "lcm.meta"). + From(fmt.Sprintf("%s t", transactionTableName)). + Join(fmt.Sprintf("%s lcm ON (t.ledger_sequence = lcm.sequence)", ledgerCloseMetaTableName)). + Limit(1). + RunWith(txn.stmtCache). + QueryRow() + + var row struct { + txIndex int + meta []byte + } + if err := rows.Scan(&row); err != nil { + return xdr.LedgerCloseMeta{}, ingest.LedgerTransaction{}, err + } + + var lcm xdr.LedgerCloseMeta + if err := lcm.UnmarshalBinary(row.meta); err != nil { + return lcm, ingest.LedgerTransaction{}, err + } + + reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(txn.passphrase, lcm) + reader.Seek(row.txIndex - 1) + if err != nil { + return lcm, ingest.LedgerTransaction{}, err + } + + ledgerTx, err := reader.Read() + return lcm, ledgerTx, err +} + +// GetTransaction conforms to the interface in +// methods/get_transaction.go#NewGetTransactionHandler so that it can be used +// directly against the RPC handler. +func (txn *TransactionHandler) GetTransaction(hash xdr.Hash) ( + transactions.Transaction, bool, ledgerbucketwindow.LedgerRange, +) { + tx := transactions.Transaction{} + + ledgerRange, err := txn.GetLedgerRange() + if err != nil { + return tx, false, ledgerRange + } + + lcm, ingestTx, err := txn.GetTransactionByHash(hex.EncodeToString(hash[:])) + if err != nil { + return tx, false, ledgerRange + } + + // + // On-the-fly ingestion: extract all of the fields + // + + if tx.Result, err = ingestTx.Result.MarshalBinary(); err != nil { + return tx, false, ledgerRange + } + if tx.Meta, err = ingestTx.UnsafeMeta.MarshalBinary(); err != nil { + return tx, false, ledgerRange + } + if tx.Envelope, err = ingestTx.Envelope.MarshalBinary(); err != nil { + return tx, false, ledgerRange + } + if events, errr := ingestTx.GetDiagnosticEvents(); errr != nil { + tx.Events = make([][]byte, 0, len(events)) + for _, event := range events { + bytes, ierr := event.MarshalBinary() + if ierr != nil { + return tx, false, ledgerRange + } + tx.Events = append(tx.Events, bytes) + } + } + tx.FeeBump = ingestTx.Envelope.IsFeeBump() + tx.ApplicationOrder = int32(ingestTx.Index) + tx.Successful = ingestTx.Result.Successful() + tx.Ledger = ledgerbucketwindow.LedgerInfo{ + Sequence: lcm.LedgerSequence(), + CloseTime: int64(lcm.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime), + } + + return tx, true, ledgerRange +} diff --git a/cmd/soroban-rpc/internal/ingest/mock_db_test.go b/cmd/soroban-rpc/internal/ingest/mock_db_test.go index cb883eac..5a6252c0 100644 --- a/cmd/soroban-rpc/internal/ingest/mock_db_test.go +++ b/cmd/soroban-rpc/internal/ingest/mock_db_test.go @@ -44,6 +44,11 @@ func (m MockTx) LedgerWriter() db.LedgerWriter { return args.Get(0).(db.LedgerWriter) } +func (m MockTx) TransactionHandler() *db.TransactionHandler { + args := m.Called() + return args.Get(0).(*db.TransactionHandler) +} + func (m MockTx) Commit(ledgerSeq uint32) error { args := m.Called(ledgerSeq) return args.Error(0) diff --git a/cmd/soroban-rpc/internal/ingest/service.go b/cmd/soroban-rpc/internal/ingest/service.go index 931abfe8..63977c94 100644 --- a/cmd/soroban-rpc/internal/ingest/service.go +++ b/cmd/soroban-rpc/internal/ingest/service.go @@ -273,6 +273,7 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { if err := s.ingestLedgerEntryChanges(ctx, reader, tx, 0); err != nil { return err } + if err := reader.Close(); err != nil { return err } @@ -291,31 +292,38 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { return err } + if err := tx.TransactionHandler().InsertTransactions(ledgerCloseMeta); err != nil { + return err + } + if err := tx.Commit(sequence); err != nil { return err } s.logger.Debugf("Ingested ledger %d", sequence) s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "total"}).Observe(time.Since(startTime).Seconds()) + With(prometheus.Labels{"type": "total"}). + Observe(time.Since(startTime).Seconds()) s.metrics.latestLedgerMetric.Set(float64(sequence)) return nil } func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta) error { startTime := time.Now() + if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil { return err } s.metrics.ingestionDurationMetric. With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds()) - if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil { + if err := s.transactionStore.IngestTransactions(ledgerCloseMeta); err != nil { return err } - if err := s.transactionStore.IngestTransactions(ledgerCloseMeta); err != nil { + if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil { return err } + return nil } diff --git a/cmd/soroban-rpc/internal/jsonrpc.go b/cmd/soroban-rpc/internal/jsonrpc.go index b0ceb372..8d80de1a 100644 --- a/cmd/soroban-rpc/internal/jsonrpc.go +++ b/cmd/soroban-rpc/internal/jsonrpc.go @@ -48,6 +48,7 @@ func (h Handler) Close() { type HandlerParams struct { EventStore *events.MemoryStore TransactionStore *transactions.MemoryStore + TransactionGetter *db.TransactionHandler LedgerEntryReader db.LedgerEntryReader LedgerReader db.LedgerReader Logger *log.Entry @@ -194,7 +195,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { }, { methodName: "getTransaction", - underlyingHandler: methods.NewGetTransactionHandler(params.TransactionStore), + underlyingHandler: methods.NewGetTransactionHandler(params.TransactionGetter), longName: "get_transaction", queueLimit: cfg.RequestBacklogGetTransactionQueueLimit, requestDurationLimit: cfg.MaxGetTransactionExecutionDuration,