diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 71619ae3..8b5a3e77 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -29,7 +29,6 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ingest" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/preflight" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/transactions" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/util" ) @@ -190,11 +189,6 @@ func MustNew(cfg *config.Config) *Daemon { cfg.NetworkPassphrase, cfg.EventLedgerRetentionWindow, ) - transactionStore := transactions.NewMemoryStore( - daemon, - cfg.NetworkPassphrase, - cfg.TransactionLedgerRetentionWindow, - ) // initialize the stores using what was on the DB readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout) @@ -219,9 +213,6 @@ func MustNew(cfg *config.Config) *Daemon { if err := eventStore.IngestEvents(txmeta); err != nil { logger.WithError(err).Fatal("could not initialize event memory store") } - if err := transactionStore.IngestTransactions(txmeta); err != nil { - logger.WithError(err).Fatal("could not initialize transaction memory store") - } return nil }) if currentSeq != 0 { @@ -246,7 +237,6 @@ func MustNew(cfg *config.Config) *Daemon { Logger: logger, DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, maxRetentionWindow), EventStore: eventStore, - TransactionStore: transactionStore, NetworkPassPhrase: cfg.NetworkPassphrase, Archive: historyArchive, LedgerBackend: core, @@ -269,7 +259,6 @@ func MustNew(cfg *config.Config) *Daemon { jsonRPCHandler := internal.NewJSONRPCHandler(cfg, internal.HandlerParams{ Daemon: daemon, EventStore: eventStore, - TransactionStore: transactionStore, Logger: logger, LedgerReader: db.NewLedgerReader(dbConn), LedgerEntryReader: db.NewLedgerEntryReader(dbConn), diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go index ba9c1c97..7b8c9604 100644 --- a/cmd/soroban-rpc/internal/db/transaction.go +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -16,6 +16,17 @@ const ( transactionTableName = "transactions" ) +type Transaction struct { + Result []byte // XDR encoded xdr.TransactionResult + Meta []byte // XDR encoded xdr.TransactionMeta + Envelope []byte // XDR encoded xdr.TransactionEnvelope + Events [][]byte // XDR encoded xdr.DiagnosticEvent + FeeBump bool + ApplicationOrder int32 + Successful bool + Ledger ledgerbucketwindow.LedgerInfo +} + type TransactionHandler struct { stmtCache *sq.StmtCache db db.SessionInterface @@ -58,10 +69,7 @@ func (txn *TransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error return err } -func (txn *TransactionHandler) GetLedgerRange() ( - ledgerbucketwindow.LedgerRange, - error, -) { +func (txn *TransactionHandler) GetLedgerRange() ledgerbucketwindow.LedgerRange { var ledgerRange ledgerbucketwindow.LedgerRange newestQ := sq. Select("meta"). @@ -80,18 +88,18 @@ func (txn *TransactionHandler) GetLedgerRange() ( var row1, row2 []byte if err := newestQ.Scan(&row1); err != nil { - return ledgerRange, err + return ledgerRange } if err := oldestQ.Scan(&row2); err != nil { - return ledgerRange, err + return ledgerRange } var lcm1, lcm2 xdr.LedgerCloseMeta if err := lcm1.UnmarshalBinary(row1); err != nil { - return ledgerRange, err + return ledgerRange } if err := lcm2.UnmarshalBinary(row2); err != nil { - return ledgerRange, err + return ledgerRange } return ledgerbucketwindow.LedgerRange{ @@ -103,7 +111,7 @@ func (txn *TransactionHandler) GetLedgerRange() ( Sequence: lcm2.LedgerSequence(), CloseTime: int64(lcm2.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime), }, - }, nil + } } func (txn *TransactionHandler) GetTransactionByHash(hash string) ( @@ -148,11 +156,7 @@ func (txn *TransactionHandler) GetTransaction(hash xdr.Hash) ( ) { tx := transactions.Transaction{} - ledgerRange, err := txn.GetLedgerRange() - if err != nil { - return tx, false, ledgerRange - } - + ledgerRange := txn.GetLedgerRange() lcm, ingestTx, err := txn.GetTransactionByHash(hex.EncodeToString(hash[:])) if err != nil { return tx, false, ledgerRange diff --git a/cmd/soroban-rpc/internal/ingest/service.go b/cmd/soroban-rpc/internal/ingest/service.go index 63977c94..35134eda 100644 --- a/cmd/soroban-rpc/internal/ingest/service.go +++ b/cmd/soroban-rpc/internal/ingest/service.go @@ -21,7 +21,6 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/util" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/transactions" ) const ( @@ -34,7 +33,6 @@ type Config struct { Logger *log.Entry DB db.ReadWriter EventStore *events.MemoryStore - TransactionStore *transactions.MemoryStore NetworkPassPhrase string Archive historyarchive.ArchiveInterface LedgerBackend backends.LedgerBackend @@ -82,7 +80,6 @@ func newService(cfg Config) *Service { logger: cfg.Logger, db: cfg.DB, eventStore: cfg.EventStore, - transactionStore: cfg.TransactionStore, ledgerBackend: cfg.LedgerBackend, networkPassPhrase: cfg.NetworkPassPhrase, timeout: cfg.Timeout, @@ -134,7 +131,6 @@ type Service struct { logger *log.Entry db db.ReadWriter eventStore *events.MemoryStore - transactionStore *transactions.MemoryStore ledgerBackend backends.LedgerBackend timeout time.Duration networkPassPhrase string @@ -317,10 +313,6 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge s.metrics.ingestionDurationMetric. With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds()) - if err := s.transactionStore.IngestTransactions(ledgerCloseMeta); err != nil { - return err - } - if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil { return err } diff --git a/cmd/soroban-rpc/internal/ingest/service_test.go b/cmd/soroban-rpc/internal/ingest/service_test.go index 1fb233c0..427a476a 100644 --- a/cmd/soroban-rpc/internal/ingest/service_test.go +++ b/cmd/soroban-rpc/internal/ingest/service_test.go @@ -17,7 +17,6 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/transactions" ) type ErrorReadWriter struct { @@ -46,7 +45,6 @@ func TestRetryRunningIngestion(t *testing.T) { Logger: supportlog.New(), DB: &ErrorReadWriter{}, EventStore: nil, - TransactionStore: nil, NetworkPassPhrase: "", Archive: nil, LedgerBackend: nil, @@ -71,7 +69,6 @@ func TestIngestion(t *testing.T) { Logger: supportlog.New(), DB: mockDB, EventStore: events.NewMemoryStore(daemon, network.TestNetworkPassphrase, 1), - TransactionStore: transactions.NewMemoryStore(daemon, network.TestNetworkPassphrase, 1), LedgerBackend: mockLedgerBackend, Daemon: daemon, NetworkPassPhrase: network.TestNetworkPassphrase, diff --git a/cmd/soroban-rpc/internal/jsonrpc.go b/cmd/soroban-rpc/internal/jsonrpc.go index 8d80de1a..6369f7b4 100644 --- a/cmd/soroban-rpc/internal/jsonrpc.go +++ b/cmd/soroban-rpc/internal/jsonrpc.go @@ -22,7 +22,6 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/methods" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/network" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/transactions" ) // maxHTTPRequestSize defines the largest request size that the http handler @@ -47,7 +46,6 @@ func (h Handler) Close() { type HandlerParams struct { EventStore *events.MemoryStore - TransactionStore *transactions.MemoryStore TransactionGetter *db.TransactionHandler LedgerEntryReader db.LedgerEntryReader LedgerReader db.LedgerReader @@ -141,7 +139,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { var retentionWindow = cfg.EventLedgerRetentionWindow if cfg.TransactionLedgerRetentionWindow > cfg.EventLedgerRetentionWindow { retentionWindow = cfg.TransactionLedgerRetentionWindow - ledgerRangeGetter = params.TransactionStore + ledgerRangeGetter = params.TransactionGetter } handlers := []struct { @@ -201,15 +199,19 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { requestDurationLimit: cfg.MaxGetTransactionExecutionDuration, }, { - methodName: "sendTransaction", - underlyingHandler: methods.NewSendTransactionHandler(params.Daemon, params.Logger, params.TransactionStore, cfg.NetworkPassphrase), + methodName: "sendTransaction", + underlyingHandler: methods.NewSendTransactionHandler( + params.Daemon, params.Logger, params.TransactionGetter, cfg.NetworkPassphrase, + ), longName: "send_transaction", queueLimit: cfg.RequestBacklogSendTransactionQueueLimit, requestDurationLimit: cfg.MaxSendTransactionExecutionDuration, }, { - methodName: "simulateTransaction", - underlyingHandler: methods.NewSimulateTransactionHandler(params.Logger, params.LedgerEntryReader, params.LedgerReader, params.PreflightGetter), + methodName: "simulateTransaction", + underlyingHandler: methods.NewSimulateTransactionHandler( + params.Logger, params.LedgerEntryReader, params.LedgerReader, params.PreflightGetter, + ), longName: "simulate_transaction", queueLimit: cfg.RequestBacklogSimulateTransactionQueueLimit, requestDurationLimit: cfg.MaxSimulateTransactionExecutionDuration, diff --git a/cmd/soroban-rpc/internal/transactions/transactions.go b/cmd/soroban-rpc/internal/transactions/transactions.go index e5abe55c..0e187983 100644 --- a/cmd/soroban-rpc/internal/transactions/transactions.go +++ b/cmd/soroban-rpc/internal/transactions/transactions.go @@ -1,145 +1,9 @@ package transactions import ( - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stellar/go/ingest" - "github.com/stellar/go/xdr" - - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) -type transaction struct { - bucket *ledgerbucketwindow.LedgerBucket[[]xdr.Hash] - result []byte // encoded XDR of xdr.TransactionResult - meta []byte // encoded XDR of xdr.TransactionMeta - envelope []byte // encoded XDR of xdr.TransactionEnvelope - feeBump bool - successful bool - applicationOrder int32 -} - -// MemoryStore is an in-memory store of Stellar transactions. -type MemoryStore struct { - // networkPassphrase is an immutable string containing the - // Stellar network passphrase. - // Accessing networkPassphrase does not need to be protected - // by the lock - networkPassphrase string - lock sync.RWMutex - transactions map[xdr.Hash]transaction - transactionsByLedger *ledgerbucketwindow.LedgerBucketWindow[[]xdr.Hash] - transactionDurationMetric *prometheus.SummaryVec - transactionCountMetric prometheus.Summary -} - -// NewMemoryStore creates a new MemoryStore. -// The retention window is in units of ledgers. -// All events occurring in the following ledger range -// [ latestLedger - retentionWindow, latestLedger ] -// will be included in the MemoryStore. If the MemoryStore -// is full, any transactions from new ledgers will evict -// older entries outside the retention window. -func NewMemoryStore(daemon interfaces.Daemon, networkPassphrase string, retentionWindow uint32) *MemoryStore { - window := ledgerbucketwindow.NewLedgerBucketWindow[[]xdr.Hash](retentionWindow) - - // transactionDurationMetric is a metric for measuring latency of transaction store operations - transactionDurationMetric := prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: daemon.MetricsNamespace(), Subsystem: "transactions", Name: "operation_duration_seconds", - Help: "transaction store operation durations, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, - []string{"operation"}, - ) - transactionCountMetric := prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: daemon.MetricsNamespace(), Subsystem: "transactions", Name: "count", - Help: "count of transactions ingested, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }) - daemon.MetricsRegistry().MustRegister(transactionDurationMetric, transactionCountMetric) - - return &MemoryStore{ - networkPassphrase: networkPassphrase, - transactions: make(map[xdr.Hash]transaction), - transactionsByLedger: window, - transactionDurationMetric: transactionDurationMetric, - transactionCountMetric: transactionCountMetric, - } -} - -// IngestTransactions adds new transactions from the given ledger into the store. -// As a side effect, transactions which fall outside the retention window are -// removed from the store. -func (m *MemoryStore) IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) error { - startTime := time.Now() - reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(m.networkPassphrase, ledgerCloseMeta) - if err != nil { - return err - } - - txCount := ledgerCloseMeta.CountTransactions() - transactions := make([]transaction, txCount) - hashes := make([]xdr.Hash, 0, txCount) - hashMap := map[xdr.Hash]transaction{} - var bucket ledgerbucketwindow.LedgerBucket[[]xdr.Hash] - - for i := 0; i < txCount; i++ { - tx, err := reader.Read() - if err != nil { - return err - } - transactions[i] = transaction{ - bucket: &bucket, - feeBump: tx.Envelope.IsFeeBump(), - applicationOrder: int32(tx.Index), - successful: tx.Result.Result.Successful(), - } - if transactions[i].result, err = tx.Result.Result.MarshalBinary(); err != nil { - return err - } - if transactions[i].meta, err = tx.UnsafeMeta.MarshalBinary(); err != nil { - return err - } - if transactions[i].envelope, err = tx.Envelope.MarshalBinary(); err != nil { - return err - } - if transactions[i].feeBump { - innerHash := tx.Result.InnerHash() - hashMap[innerHash] = transactions[i] - hashes = append(hashes, innerHash) - } - hashMap[tx.Result.TransactionHash] = transactions[i] - hashes = append(hashes, tx.Result.TransactionHash) - } - bucket = ledgerbucketwindow.LedgerBucket[[]xdr.Hash]{ - LedgerSeq: ledgerCloseMeta.LedgerSequence(), - LedgerCloseTimestamp: int64(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime), - BucketContent: hashes, - } - - m.lock.Lock() - defer m.lock.Unlock() - evicted, err := m.transactionsByLedger.Append(bucket) - if err != nil { - return err - } - if evicted != nil { - // garbage-collect evicted entries - for _, evictedTxHash := range evicted.BucketContent { - delete(m.transactions, evictedTxHash) - } - } - for hash, tx := range hashMap { - m.transactions[hash] = tx - } - m.transactionDurationMetric.With(prometheus.Labels{"operation": "ingest"}).Observe(time.Since(startTime).Seconds()) - m.transactionCountMetric.Observe(float64(txCount)) - return nil -} - type Transaction struct { Result []byte // XDR encoded xdr.TransactionResult Meta []byte // XDR encoded xdr.TransactionMeta @@ -150,60 +14,3 @@ type Transaction struct { Successful bool Ledger ledgerbucketwindow.LedgerInfo } - -// GetLedgerRange returns the first and latest ledger available in the store. -func (m *MemoryStore) GetLedgerRange() ledgerbucketwindow.LedgerRange { - m.lock.RLock() - defer m.lock.RUnlock() - return m.transactionsByLedger.GetLedgerRange() -} - -// GetTransaction obtains a transaction from the store and whether it's present and the current store range -func (m *MemoryStore) GetTransaction(hash xdr.Hash) (Transaction, bool, ledgerbucketwindow.LedgerRange) { - startTime := time.Now() - m.lock.RLock() - defer m.lock.RUnlock() - storeRange := m.transactionsByLedger.GetLedgerRange() - internalTx, ok := m.transactions[hash] - if !ok { - return Transaction{}, false, storeRange - } - - var txMeta xdr.TransactionMeta - err := txMeta.UnmarshalBinary(internalTx.meta) - if err != nil { - return Transaction{}, false, storeRange - } - - txEvents, err := txMeta.GetDiagnosticEvents() - if err != nil { - return Transaction{}, false, storeRange - } - - events := make([][]byte, 0, len(txEvents)) - - for _, e := range txEvents { - diagnosticEventXDR, err := e.MarshalBinary() - if err != nil { - return Transaction{}, false, storeRange - } - events = append(events, diagnosticEventXDR) - } - - tx := Transaction{ - Result: internalTx.result, - Meta: internalTx.meta, - Envelope: internalTx.envelope, - Events: events, - FeeBump: internalTx.feeBump, - Successful: internalTx.successful, - ApplicationOrder: internalTx.applicationOrder, - Ledger: ledgerbucketwindow.LedgerInfo{ - Sequence: internalTx.bucket.LedgerSeq, - CloseTime: internalTx.bucket.LedgerCloseTimestamp, - }, - } - - m.transactionDurationMetric.With(prometheus.Labels{"operation": "get"}).Observe(time.Since(startTime).Seconds()) - return tx, true, storeRange -} diff --git a/cmd/soroban-rpc/internal/transactions/transactions_test.go b/cmd/soroban-rpc/internal/transactions/transactions_test.go deleted file mode 100644 index 4b801835..00000000 --- a/cmd/soroban-rpc/internal/transactions/transactions_test.go +++ /dev/null @@ -1,436 +0,0 @@ -package transactions - -import ( - "encoding/hex" - "fmt" - "math" - "runtime" - "testing" - "time" - - "github.com/stellar/go/network" - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/require" - - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" -) - -func expectedTransaction(t *testing.T, ledger uint32, feeBump bool) Transaction { - tx := Transaction{ - FeeBump: feeBump, - ApplicationOrder: 1, - Ledger: expectedLedgerInfo(ledger), - Events: [][]byte{}, - } - var err error - tx.Result, err = transactionResult(ledger, feeBump).MarshalBinary() - require.NoError(t, err) - tx.Meta, err = xdr.TransactionMeta{ - V: 3, - Operations: &[]xdr.OperationMeta{}, - V3: &xdr.TransactionMetaV3{}, - }.MarshalBinary() - require.NoError(t, err) - tx.Envelope, err = txEnvelope(ledger, feeBump).MarshalBinary() - require.NoError(t, err) - return tx -} - -func expectedLedgerInfo(ledgerSequence uint32) ledgerbucketwindow.LedgerInfo { - return ledgerbucketwindow.LedgerInfo{ - Sequence: ledgerSequence, - CloseTime: ledgerCloseTime(ledgerSequence), - } - -} - -func expectedStoreRange(startLedger uint32, endLedger uint32) ledgerbucketwindow.LedgerRange { - return ledgerbucketwindow.LedgerRange{ - FirstLedger: expectedLedgerInfo(startLedger), - LastLedger: expectedLedgerInfo(endLedger), - } -} - -func txHash(ledgerSequence uint32, feebump bool) xdr.Hash { - envelope := txEnvelope(ledgerSequence, feebump) - hash, err := network.HashTransactionInEnvelope(envelope, "passphrase") - if err != nil { - panic(err) - } - - return hash -} - -func ledgerCloseTime(ledgerSequence uint32) int64 { - return int64(ledgerSequence)*25 + 100 -} - -func transactionResult(ledgerSequence uint32, feeBump bool) xdr.TransactionResult { - if feeBump { - return xdr.TransactionResult{ - FeeCharged: 100, - Result: xdr.TransactionResultResult{ - Code: xdr.TransactionResultCodeTxFeeBumpInnerFailed, - InnerResultPair: &xdr.InnerTransactionResultPair{ - TransactionHash: txHash(ledgerSequence, false), - Result: xdr.InnerTransactionResult{ - Result: xdr.InnerTransactionResultResult{ - Code: xdr.TransactionResultCodeTxBadSeq, - }, - }, - }, - }, - } - } - return xdr.TransactionResult{ - FeeCharged: 100, - Result: xdr.TransactionResultResult{ - Code: xdr.TransactionResultCodeTxBadSeq, - }, - } -} - -func txMeta(ledgerSequence uint32, feeBump bool) xdr.LedgerCloseMeta { - envelope := txEnvelope(ledgerSequence, feeBump) - persistentKey := xdr.ScSymbol("TEMPVAL") - contractIDBytes, _ := hex.DecodeString("df06d62447fd25da07c0135eed7557e5a5497ee7d15b7fe345bd47e191d8f577") - var contractID xdr.Hash - copy(contractID[:], contractIDBytes) - contractAddress := xdr.ScAddress{ - Type: xdr.ScAddressTypeScAddressTypeContract, - ContractId: &contractID, - } - xdrTrue := true - operationChanges := xdr.LedgerEntryChanges{ - { - Type: xdr.LedgerEntryChangeTypeLedgerEntryState, - State: &xdr.LedgerEntry{ - LastModifiedLedgerSeq: xdr.Uint32(ledgerSequence - 1), - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeContractData, - ContractData: &xdr.ContractDataEntry{ - Contract: contractAddress, - Key: xdr.ScVal{ - Type: xdr.ScValTypeScvSymbol, - Sym: &persistentKey, - }, - Durability: xdr.ContractDataDurabilityPersistent, - Val: xdr.ScVal{ - Type: xdr.ScValTypeScvBool, - B: &xdrTrue, - }, - }, - }, - }, - }, - { - Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, - Updated: &xdr.LedgerEntry{ - LastModifiedLedgerSeq: xdr.Uint32(ledgerSequence - 1), - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeContractData, - ContractData: &xdr.ContractDataEntry{ - Contract: xdr.ScAddress{ - Type: xdr.ScAddressTypeScAddressTypeContract, - ContractId: &contractID, - }, - Key: xdr.ScVal{ - Type: xdr.ScValTypeScvSymbol, - Sym: &persistentKey, - }, - Durability: xdr.ContractDataDurabilityPersistent, - Val: xdr.ScVal{ - Type: xdr.ScValTypeScvBool, - B: &xdrTrue, - }, - }, - }, - }, - }, - } - txProcessing := []xdr.TransactionResultMeta{ - { - TxApplyProcessing: xdr.TransactionMeta{ - V: 3, - Operations: &[]xdr.OperationMeta{ - { - Changes: operationChanges, - }, - }, - V3: &xdr.TransactionMetaV3{}, - }, - Result: xdr.TransactionResultPair{ - TransactionHash: txHash(ledgerSequence, feeBump), - Result: transactionResult(ledgerSequence, feeBump), - }, - }, - } - - components := []xdr.TxSetComponent{ - { - Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, - TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ - BaseFee: nil, - Txs: []xdr.TransactionEnvelope{ - envelope, - }, - }, - }, - } - return xdr.LedgerCloseMeta{ - V: 1, - V1: &xdr.LedgerCloseMetaV1{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: xdr.TimePoint(ledgerCloseTime(ledgerSequence)), - }, - LedgerSeq: xdr.Uint32(ledgerSequence), - }, - }, - TxProcessing: txProcessing, - TxSet: xdr.GeneralizedTransactionSet{ - V: 1, - V1TxSet: &xdr.TransactionSetV1{ - PreviousLedgerHash: xdr.Hash{1}, - Phases: []xdr.TransactionPhase{ - { - V: 0, - V0Components: &components, - }, - }, - }, - }, - }, - } -} - -func txMetaWithEvents(ledgerSequence uint32, feeBump bool) xdr.LedgerCloseMeta { - tx := txMeta(ledgerSequence, feeBump) - contractIDBytes, _ := hex.DecodeString("df06d62447fd25da07c0135eed7557e5a5497ee7d15b7fe345bd47e191d8f577") - var contractID xdr.Hash - copy(contractID[:], contractIDBytes) - counter := xdr.ScSymbol("COUNTER") - - tx.V1.TxProcessing[0].TxApplyProcessing.V3 = &xdr.TransactionMetaV3{ - SorobanMeta: &xdr.SorobanTransactionMeta{ - Events: []xdr.ContractEvent{{ - ContractId: &contractID, - Type: xdr.ContractEventTypeContract, - Body: xdr.ContractEventBody{ - V: 0, - V0: &xdr.ContractEventV0{ - Topics: []xdr.ScVal{{ - Type: xdr.ScValTypeScvSymbol, - Sym: &counter, - }}, - Data: xdr.ScVal{ - Type: xdr.ScValTypeScvSymbol, - Sym: &counter, - }, - }, - }, - }}, - ReturnValue: xdr.ScVal{ - Type: xdr.ScValTypeScvSymbol, - Sym: &counter, - }, - }, - } - - return tx -} - -func txEnvelope(ledgerSequence uint32, feeBump bool) xdr.TransactionEnvelope { - var envelope xdr.TransactionEnvelope - var err error - if feeBump { - envelope, err = xdr.NewTransactionEnvelope(xdr.EnvelopeTypeEnvelopeTypeTxFeeBump, xdr.FeeBumpTransactionEnvelope{ - Tx: xdr.FeeBumpTransaction{ - Fee: 10, - FeeSource: xdr.MustMuxedAddress("MA7QYNF7SOWQ3GLR2BGMZEHXAVIRZA4KVWLTJJFC7MGXUA74P7UJVAAAAAAAAAAAAAJLK"), - InnerTx: xdr.FeeBumpTransactionInnerTx{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - Fee: 1, - SeqNum: xdr.SequenceNumber(ledgerSequence + 90), - SourceAccount: xdr.MustMuxedAddress("MA7QYNF7SOWQ3GLR2BGMZEHXAVIRZA4KVWLTJJFC7MGXUA74P7UJVAAAAAAAAAAAAAJLK"), - }, - }, - }, - }, - }) - } else { - envelope, err = xdr.NewTransactionEnvelope(xdr.EnvelopeTypeEnvelopeTypeTx, xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - Fee: 1, - SeqNum: xdr.SequenceNumber(ledgerSequence + 90), - SourceAccount: xdr.MustMuxedAddress("MA7QYNF7SOWQ3GLR2BGMZEHXAVIRZA4KVWLTJJFC7MGXUA74P7UJVAAAAAAAAAAAAAJLK"), - }, - }) - } - if err != nil { - panic(err) - } - return envelope -} - -func requirePresent(t *testing.T, store *MemoryStore, feeBump bool, ledgerSequence, firstSequence, lastSequence uint32) { - tx, ok, storeRange := store.GetTransaction(txHash(ledgerSequence, false)) - require.True(t, ok) - require.Equal(t, expectedTransaction(t, ledgerSequence, feeBump), tx) - require.Equal(t, expectedStoreRange(firstSequence, lastSequence), storeRange) - if feeBump { - tx, ok, storeRange = store.GetTransaction(txHash(ledgerSequence, true)) - require.True(t, ok) - require.Equal(t, expectedTransaction(t, ledgerSequence, feeBump), tx) - require.Equal(t, expectedStoreRange(firstSequence, lastSequence), storeRange) - } -} - -func TestIngestTransactions(t *testing.T) { - // Use a small retention window to test eviction - store := NewMemoryStore(interfaces.MakeNoOpDeamon(), "passphrase", 3) - - _, ok, storeRange := store.GetTransaction(txHash(1, false)) - require.False(t, ok) - require.Equal(t, ledgerbucketwindow.LedgerRange{}, storeRange) - - // Insert ledger 1 - require.NoError(t, store.IngestTransactions(txMeta(1, false))) - requirePresent(t, store, false, 1, 1, 1) - require.Len(t, store.transactions, 1) - - // Insert ledger 2 - require.NoError(t, store.IngestTransactions(txMeta(2, true))) - requirePresent(t, store, false, 1, 1, 2) - requirePresent(t, store, true, 2, 1, 2) - require.Len(t, store.transactions, 3) - - // Insert ledger 3 - require.NoError(t, store.IngestTransactions(txMeta(3, false))) - requirePresent(t, store, false, 1, 1, 3) - requirePresent(t, store, true, 2, 1, 3) - requirePresent(t, store, false, 3, 1, 3) - require.Len(t, store.transactions, 4) - - // Now we have filled the memory store - - // Insert ledger 4, which will cause the window to move and evict ledger 1 - require.NoError(t, store.IngestTransactions(txMeta(4, false))) - requirePresent(t, store, true, 2, 2, 4) - requirePresent(t, store, false, 3, 2, 4) - requirePresent(t, store, false, 4, 2, 4) - - _, ok, storeRange = store.GetTransaction(txHash(1, false)) - require.False(t, ok) - require.Equal(t, expectedStoreRange(2, 4), storeRange) - require.Equal(t, uint32(3), store.transactionsByLedger.Len()) - require.Len(t, store.transactions, 4) - - // Insert ledger 5, which will cause the window to move and evict ledger 2 - require.NoError(t, store.IngestTransactions(txMeta(5, false))) - requirePresent(t, store, false, 3, 3, 5) - requirePresent(t, store, false, 4, 3, 5) - requirePresent(t, store, false, 5, 3, 5) - - _, ok, storeRange = store.GetTransaction(txHash(2, false)) - require.False(t, ok) - require.Equal(t, expectedStoreRange(3, 5), storeRange) - require.Equal(t, uint32(3), store.transactionsByLedger.Len()) - require.Len(t, store.transactions, 3) - - _, ok, storeRange = store.GetTransaction(txHash(2, true)) - require.False(t, ok) - require.Equal(t, expectedStoreRange(3, 5), storeRange) - require.Equal(t, uint32(3), store.transactionsByLedger.Len()) - require.Len(t, store.transactions, 3) -} - -func TestGetTransactionsWithEventData(t *testing.T) { - store := NewMemoryStore(interfaces.MakeNoOpDeamon(), "passphrase", 100) - - // Insert ledger 1 - metaWithEvents := txMetaWithEvents(1, false) - require.NoError(t, store.IngestTransactions(metaWithEvents)) - require.Len(t, store.transactions, 1) - - // check events data - tx, ok, _ := store.GetTransaction(txHash(1, false)) - require.True(t, ok) - require.NotNil(t, tx.Events) - require.Len(t, tx.Events, 1) - - events, err := metaWithEvents.V1.TxProcessing[0].TxApplyProcessing.GetDiagnosticEvents() - require.NoError(t, err) - eventBytes, err := events[0].MarshalBinary() - require.NoError(t, err) - require.Equal(t, eventBytes, tx.Events[0]) -} - -func stableHeapInUse() int64 { - var ( - m = runtime.MemStats{} - prevInUse uint64 - prevNumGC uint32 - ) - - for { - runtime.GC() - - // Sleeping to allow GC to run a few times and collect all temporary data. - time.Sleep(100 * time.Millisecond) - - runtime.ReadMemStats(&m) - - // Considering heap stable if recent cycle collected less than 10KB. - if prevNumGC != 0 && m.NumGC > prevNumGC && math.Abs(float64(m.HeapInuse-prevInUse)) < 10*1024 { - break - } - - prevInUse = m.HeapInuse - prevNumGC = m.NumGC - } - - return int64(m.HeapInuse) -} - -func byteCountBinary(b int64) string { - const unit = 1024 - if b < unit { - return fmt.Sprintf("%d B", b) - } - div, exp := int64(unit), 0 - for n := b / unit; n >= unit; n /= unit { - div *= unit - exp++ - } - return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) -} - -func BenchmarkIngestTransactionsMemory(b *testing.B) { - roundsNumber := uint32(b.N * 100000) - // Use a small retention window to test eviction - store := NewMemoryStore(interfaces.MakeNoOpDeamon(), "passphrase", roundsNumber) - - heapSizeBefore := stableHeapInUse() - - for i := uint32(0); i < roundsNumber; i++ { - // Insert ledger i - require.NoError(b, store.IngestTransactions(txMeta(i, false))) - } - heapSizeAfter := stableHeapInUse() - b.ReportMetric(float64(heapSizeAfter), "bytes/100k_transactions") - b.Logf("Memory consumption for %d transactions %v", roundsNumber, byteCountBinary(heapSizeAfter-heapSizeBefore)) - - // we want to generate 500*20000 transactions total, to cover the expected daily amount of transactions. - projectedTransactionCount := int64(500 * 20000) - projectedMemoryUtiliztion := (heapSizeAfter - heapSizeBefore) * projectedTransactionCount / int64(roundsNumber) - b.Logf("Projected memory consumption for %d transactions %v", projectedTransactionCount, byteCountBinary(projectedMemoryUtiliztion)) - b.ReportMetric(float64(projectedMemoryUtiliztion), "bytes/10M_transactions") - - // add another call to store to prevent the GC from collecting. - store.GetTransaction(xdr.Hash{}) -}