Skip to content

Commit

Permalink
Merge branch 'main' into fix-bad-simulate
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaptic authored Aug 28, 2024
2 parents 1e583e3 + ac29c08 commit 9846694
Show file tree
Hide file tree
Showing 25 changed files with 1,347 additions and 1,010 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ target/
captive-core/
.soroban/
!test.toml
*.sqlite
*.sqlite*
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
## Unreleased

### Added

- Add `EndLedger` in `GetEventsRequest`. This provides finer control and clarity on the range of ledgers being queried.
- Disk-Based Event Storage: Events are now stored on disk instead of in memory. For context, storing approximately 3 million events will require around 1.5 GB of disk space.
This change enhances the scalability and can now support a larger retention window (~7 days) for events.
- Ledger Scanning Limitation: The getEvents RPC will now scan a maximum of `10,000` ledgers per request. This limits the resource usage and ensures more predictable performance, especially for queries spanning large ledger ranges.
- A migration process has been introduced to transition event storage from in-memory to disk-based storage.

* Add support for unpacked JSON responses of base64-encoded XDR fields via a new, optional parameter. When omitted, the behavior does not change and we encode fields as base64.
```typescript
xdrFormat?: "" | "base64" | "json"
Expand Down
3 changes: 2 additions & 1 deletion cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

const (
// OneDayOfLedgers is (roughly) a 24 hour window of ledgers.
OneDayOfLedgers = 17280
OneDayOfLedgers = 17280
SevenDayOfLedgers = OneDayOfLedgers * 7

defaultHTTPEndpoint = "localhost:8000"
)
Expand Down
114 changes: 58 additions & 56 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ingest"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/preflight"
Expand Down Expand Up @@ -199,7 +198,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}, metricsRegistry),
}

feewindows, eventStore := daemon.mustInitializeStorage(cfg)
feewindows := daemon.mustInitializeStorage(cfg)

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
Expand All @@ -215,7 +214,6 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Archive: historyArchive,
LedgerBackend: core,
Expand All @@ -240,12 +238,12 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

jsonRPCHandler := internal.NewJSONRPCHandler(cfg, internal.HandlerParams{
Daemon: daemon,
EventStore: eventStore,
FeeStatWindows: feewindows,
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase),
EventReader: db.NewEventReader(logger, dbConn, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down Expand Up @@ -290,56 +288,62 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
eventStore := events.NewMemoryStore(
d,
cfg.NetworkPassphrase,
cfg.HistoryRetentionWindow,
)
feewindows := feewindow.NewFeeWindows(
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
d.db,
)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)

var initialSeq, currentSeq uint32
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store and applying DB data migrations")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
// TODO: clean up once we remove the in-memory storage.
// (we should only stream over the required range)
if r := dataMigrations.ApplicableRange(); r.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")

// Apply migration for events, transactions and fee stats
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.First,
ledgerSeqRange.Last,
func(txMeta xdr.LedgerCloseMeta) error {
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithField("seq", currentSeq).
Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithField("seq", currentSeq).
Debug("still initializing in-memory store")
}
}
return nil
})

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
Expand All @@ -348,18 +352,15 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
}

if currentSeq != 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("finished initializing in-memory store and applying DB data migrations")
d.logger.WithField("seq", currentSeq).
Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows, eventStore
return feeWindows
}

func (d *Daemon) Run() {
d.logger.WithFields(supportlog.F{
"addr": d.listener.Addr().String(),
}).Info("starting HTTP server")
d.logger.WithField("addr", d.listener.Addr().String()).Info("starting HTTP server")

panicGroup := util.UnrecoverablePanicGroup.Log(d.logger)
panicGroup.Go(func() {
Expand All @@ -369,19 +370,20 @@ func (d *Daemon) Run() {
})

if d.adminServer != nil {
d.logger.WithFields(supportlog.F{
"addr": d.adminListener.Addr().String(),
}).Info("starting Admin HTTP server")
d.logger.
WithField("addr", d.adminListener.Addr().String()).
Info("starting Admin HTTP server")
panicGroup.Go(func() {
if err := d.adminServer.Serve(d.adminListener); !errors.Is(err, http.ErrServerClosed) {
d.logger.WithError(err).Error("soroban admin server encountered fatal error")
}
})
}

// Shutdown gracefully when we receive an interrupt signal.
// First server.Shutdown closes all open listeners, then closes all idle connections.
// Finally, it waits a grace period (10s here) for connections to return to idle and then shut down.
// Shutdown gracefully when we receive an interrupt signal. First
// server.Shutdown closes all open listeners, then closes all idle
// connections. Finally, it waits a grace period (10s here) for connections
// to return to idle and then shut down.
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down Expand Up @@ -27,6 +27,13 @@ type Cursor struct {
Event uint32
}

type CursorRange struct {
// Start defines the (inclusive) start of the range.
Start Cursor
// End defines the (exclusive) end of the range.
End Cursor
}

// String returns a string representation of this cursor
func (c Cursor) String() string {
return fmt.Sprintf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down
16 changes: 16 additions & 0 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ReadWriter interface {

type WriteTx interface {
TransactionWriter() TransactionWriter
EventWriter() EventWriter
LedgerEntryWriter() LedgerEntryWriter
LedgerWriter() LedgerWriter

Expand Down Expand Up @@ -250,6 +251,12 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
eventWriter: eventHandler{
log: rw.log,
db: txSession,
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
}
writer.txWriter.RegisterMetrics(
rw.metrics.TxIngestDuration,
Expand All @@ -266,6 +273,7 @@ type writeTx struct {
ledgerEntryWriter ledgerEntryWriter
ledgerWriter ledgerWriter
txWriter transactionHandler
eventWriter eventHandler
ledgerRetentionWindow uint32
}

Expand All @@ -281,6 +289,10 @@ func (w writeTx) TransactionWriter() TransactionWriter {
return &w.txWriter
}

func (w writeTx) EventWriter() EventWriter {
return &w.eventWriter
}

func (w writeTx) Commit(ledgerSeq uint32) error {
if err := w.ledgerEntryWriter.flush(); err != nil {
return err
Expand All @@ -293,6 +305,10 @@ func (w writeTx) Commit(ledgerSeq uint32) error {
return err
}

if err := w.eventWriter.trimEvents(ledgerSeq, w.ledgerRetentionWindow); err != nil {
return err
}

_, err := sq.Replace(metaTableName).
Values(latestLedgerSequenceMetaKey, strconv.FormatUint(uint64(ledgerSeq), 10)).
RunWith(w.stmtCache).
Expand Down
Loading

0 comments on commit 9846694

Please sign in to comment.