Skip to content

Commit

Permalink
Merge branch 'bump-latest-core' of github.com:stellar/soroban-rpc int…
Browse files Browse the repository at this point in the history
…o bump-latest-core
  • Loading branch information
Shaptic committed Aug 27, 2024
2 parents c3417d0 + 37ebc32 commit e91ab25
Show file tree
Hide file tree
Showing 24 changed files with 1,376 additions and 995 deletions.
47 changes: 47 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,54 @@

## Unreleased

### Added

- Add `EndLedger` in `GetEventsRequest`. This provides finer control and clarity on the range of ledgers being queried.
- Disk-Based Event Storage: Events are now stored on disk instead of in memory. For context, storing approximately 3 million events will require around 1.5 GB of disk space.
This change enhances the scalability and can now support a larger retention window (~7 days) for events.
- Ledger Scanning Limitation: The getEvents RPC will now scan a maximum of `10,000` ledgers per request. This limits the resource usage and ensures more predictable performance, especially for queries spanning large ledger ranges.
- A migration process has been introduced to transition event storage from in-memory to disk-based storage.

* Add support for unpacked JSON responses of base64-encoded XDR fields via a new, optional parameter. When omitted, the behavior does not change and we encode fields as base64.
```typescript
xdrFormat?: "" | "base64" | "json"
```
- `getTransaction`
- `getTransactions`
- `getLedgerEntry`
- `getLedgerEntries`
- `getEvents`
- `sendTransaction`
- `simulateTransaction`

There are new field names for the JSONified versions of XDR structures. Any field with an `Xdr` suffix (e.g., `resultXdr` in `getTransaction()`) will be replaced with one that has a `Json` suffix (e.g., `resultJson`) that is a JSON object verbosely and completely describing the XDR structure.

Certain XDR-encoded fields do not have an `Xdr` suffix, but those also have a `*Json` equivalent and are listed below:
* _getEvents_: `topic` -> `topicJson`, `value` -> `valueJson`
* _getLedgerEntries_: `key` -> `keyJson`, `xdr` -> `dataJson`
* _getLedgerEntry_: `xdr` -> `entryJson`
* _simulateTransaction_: `transactionData`, `events`, `results.auth`,
`restorePreamble.transactionData`, `stateChanges.key|before|after` all have a
`Json` suffix, and `results.xdr` is now `results.returnValueJson`

### Fixed
* Improve performance of `getVersionInfo` and `getNetwork` ([#198](https://github.com/stellar/soroban-rpc/pull/198)).


## [v21.4.1](https://github.com/stellar/soroban-rpc/compare/v21.4.0...v21.4.1)

### Fixed
* Fix parsing of the `--log-format` parameter ([#252](https://github.com/stellar/soroban-rpc/pull/252))


## [v21.4.0](https://github.com/stellar/soroban-rpc/compare/v21.2.0...v21.4.0)

### Added
* Transactions will now be stored in a database rather than in memory ([#174](https://github.com/stellar/soroban-rpc/pull/174)).

You can opt-in to longer transaction retention by setting `--transaction-retention-window` / `TRANSACTION_RETENTION_WINDOW` to a higher number of ledgers. This will also retain corresponding number of ledgers in the database. Keep in mind, of course, that this will cause an increase in disk usage for the growing database.

* Unify transaction and event retention windows ([#234](https://github.com/stellar/soroban-rpc/pull/234)).
* There is a new `getTransactions` endpoint with the following API ([#136](https://github.com/stellar/soroban-rpc/pull/136)):

```typescript
Expand Down Expand Up @@ -40,6 +83,10 @@ interface Transaction {
}
```

### Fixed
* Logging and typo fixes in ([#238](https://github.com/stellar/soroban-rpc/pull/238)).
* Fix calculation of ledger ranges across endpoints ([#217](https://github.com/stellar/soroban-rpc/pull/217)).


## [v21.2.0](https://github.com/stellar/soroban-rpc/compare/v21.1.0...v21.2.0)

Expand Down
3 changes: 2 additions & 1 deletion cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

const (
// OneDayOfLedgers is (roughly) a 24 hour window of ledgers.
OneDayOfLedgers = 17280
OneDayOfLedgers = 17280
SevenDayOfLedgers = OneDayOfLedgers * 7

defaultHTTPEndpoint = "localhost:8000"
)
Expand Down
90 changes: 48 additions & 42 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ingest"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/preflight"
Expand Down Expand Up @@ -199,7 +198,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}, metricsRegistry),
}

feewindows, eventStore := daemon.mustInitializeStorage(cfg)
feewindows := daemon.mustInitializeStorage(cfg)

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
Expand All @@ -215,7 +214,6 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Archive: historyArchive,
LedgerBackend: core,
Expand All @@ -240,12 +238,12 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

jsonRPCHandler := internal.NewJSONRPCHandler(cfg, internal.HandlerParams{
Daemon: daemon,
EventStore: eventStore,
FeeStatWindows: feewindows,
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase),
EventReader: db.NewEventReader(logger, dbConn, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down Expand Up @@ -290,56 +288,64 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
eventStore := events.NewMemoryStore(
d,
cfg.NetworkPassphrase,
cfg.HistoryRetentionWindow,
)
feewindows := feewindow.NewFeeWindows(
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
d.db,
)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store and applying DB data migrations")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
// TODO: clean up once we remove the in-memory storage.
// (we should only stream over the required range)
if r := dataMigrations.ApplicableRange(); r.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")

// Apply migration for events, transactions and fee stats
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.First,
ledgerSeqRange.Last,
func(txMeta xdr.LedgerCloseMeta) error {
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
}
return nil
})

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
Expand All @@ -353,7 +359,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
}).Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows, eventStore
return feeWindows
}

func (d *Daemon) Run() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down Expand Up @@ -27,6 +27,13 @@ type Cursor struct {
Event uint32
}

type CursorRange struct {
// Start defines the (inclusive) start of the range.
Start Cursor
// End defines the (exclusive) end of the range.
End Cursor
}

// String returns a string representation of this cursor
func (c Cursor) String() string {
return fmt.Sprintf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down
16 changes: 16 additions & 0 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ReadWriter interface {

type WriteTx interface {
TransactionWriter() TransactionWriter
EventWriter() EventWriter
LedgerEntryWriter() LedgerEntryWriter
LedgerWriter() LedgerWriter

Expand Down Expand Up @@ -250,6 +251,12 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
eventWriter: eventHandler{
log: rw.log,
db: txSession,
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
}
writer.txWriter.RegisterMetrics(
rw.metrics.TxIngestDuration,
Expand All @@ -266,6 +273,7 @@ type writeTx struct {
ledgerEntryWriter ledgerEntryWriter
ledgerWriter ledgerWriter
txWriter transactionHandler
eventWriter eventHandler
ledgerRetentionWindow uint32
}

Expand All @@ -281,6 +289,10 @@ func (w writeTx) TransactionWriter() TransactionWriter {
return &w.txWriter
}

func (w writeTx) EventWriter() EventWriter {
return &w.eventWriter
}

func (w writeTx) Commit(ledgerSeq uint32) error {
if err := w.ledgerEntryWriter.flush(); err != nil {
return err
Expand All @@ -293,6 +305,10 @@ func (w writeTx) Commit(ledgerSeq uint32) error {
return err
}

if err := w.eventWriter.trimEvents(ledgerSeq, w.ledgerRetentionWindow); err != nil {
return err
}

_, err := sq.Replace(metaTableName).
Values(latestLedgerSequenceMetaKey, strconv.FormatUint(uint64(ledgerSeq), 10)).
RunWith(w.stmtCache).
Expand Down
Loading

0 comments on commit e91ab25

Please sign in to comment.