From 19036f6e615bbc4f128e6c6cd73a309b772ea695 Mon Sep 17 00:00:00 2001
From: Alfonso Acosta
Date: Wed, 11 Jan 2023 18:59:21 +0100
Subject: [PATCH] WIP: use a sqliteDB to store ledger entries

---
 cmd/soroban-rpc/internal/daemon/daemon.go     |   3 +-
 .../internal/ledgerentry_storage.go           | 210 -------------
 .../internal/ledgerentry_storage/db.go        | 284 ++++++++++++++++++
 .../ledgerentry_storage.go                    | 232 ++++++++++++++
 .../migrations/01_init.sql                    |  20 ++
 cmd/soroban-rpc/main.go                       |  10 +-
 6 files changed, 547 insertions(+), 212 deletions(-)
 delete mode 100644 cmd/soroban-rpc/internal/ledgerentry_storage.go
 create mode 100644 cmd/soroban-rpc/internal/ledgerentry_storage/db.go
 create mode 100644 cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go
 create mode 100644 cmd/soroban-rpc/internal/ledgerentry_storage/migrations/01_init.sql

diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go
index d90d5b3840..bb34e20cf1 100644
--- a/cmd/soroban-rpc/internal/daemon/daemon.go
+++ b/cmd/soroban-rpc/internal/daemon/daemon.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
 	"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/config"
+	"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ledgerentry_storage"
 	"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/methods"
 )
 
@@ -59,7 +60,14 @@ func Start(cfg config.LocalConfig) (exitCode int) {
 		logger.Fatalf("could not connect to history archive: %v", err)
 	}
 
-	storage, err := internal.NewLedgerEntryStorage(cfg.NetworkPassphrase, historyArchive, core)
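+	// Open the SQLite-backed ledger entry store. NOTE: this assumes that
+	// config.LocalConfig carries the new --db-path value as SQLiteDBPath,
+	// which is not wired up in this patch yet.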
+	db, err := ledgerentry_storage.OpenSQLiteDB(cfg.SQLiteDBPath)
+	if err != nil {
+		logger.Fatalf("could not open ledger entry database: %v", err)
+	}
+	storage, err := ledgerentry_storage.NewLedgerEntryStorage(db, cfg.NetworkPassphrase, historyArchive, core)
 	if err != nil {
 		logger.Fatalf("could not initialize ledger entry storage: %v", err)
 	}
diff --git a/cmd/soroban-rpc/internal/ledgerentry_storage.go b/cmd/soroban-rpc/internal/ledgerentry_storage.go
deleted file mode 100644
index 6b21177a1c..0000000000
--- a/cmd/soroban-rpc/internal/ledgerentry_storage.go
+++ /dev/null
@@ -1,210 +0,0 @@
-package internal
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"io"
-	"sync"
-	"time"
-
-	"github.com/stellar/go/historyarchive"
-	"github.com/stellar/go/ingest"
-	backends "github.com/stellar/go/ingest/ledgerbackend"
-	"github.com/stellar/go/xdr"
-)
-
-type LedgerEntryStorage interface {
-	GetLedgerEntry(key xdr.LedgerKey) (xdr.LedgerEntry, bool, uint32, error)
-	io.Closer
-}
-
-func NewLedgerEntryStorage(
-	networkPassPhrase string,
-	archive historyarchive.ArchiveInterface,
-	ledgerBackend backends.LedgerBackend) (LedgerEntryStorage, error) {
-	root, err := archive.GetRootHAS()
-	if err != nil {
-		return nil, err
-	}
-	checkpointLedger := root.CurrentLedger
-	ctx, done := context.WithCancel(context.Background())
-	ls := ledgerEntryStorage{
-		networkPassPhrase: networkPassPhrase,
-		storage:           map[string]xdr.LedgerEntry{},
-		done:              done,
-	}
-	ls.wg.Add(1)
-	go ls.run(ctx, checkpointLedger, archive, ledgerBackend)
-	return &ls, nil
-}
-
-type ledgerEntryStorage struct {
-	encodingBuffer    *xdr.EncodingBuffer
-	networkPassPhrase string
-	// from serialized ledger key to ledger entry
-	storage map[string]xdr.LedgerEntry
-	// What's the latest processed ledger
-	latestLedger uint32
-	sync.RWMutex
-	done context.CancelFunc
-	wg   sync.WaitGroup
-}
-
-func (ls *ledgerEntryStorage) GetLedgerEntry(key xdr.LedgerKey) (xdr.LedgerEntry, bool, uint32, error) {
-	stringKey := getRelevantLedgerKey(ls.encodingBuffer, key)
-	if stringKey == "" {
-		return xdr.LedgerEntry{}, false, 0, nil
-	}
-	ls.RLock()
-	defer ls.RUnlock()
-	if ls.latestLedger == 0 {
-		// we haven't yet processed the first checkpoint
-		return xdr.LedgerEntry{}, false, 0, errors.New("Ledger storage not initialized yet")
-	}
-
-	entry, present := ls.storage[stringKey]
-	if !present {
-		return xdr.LedgerEntry{}, false, 0, nil
-	}
-	return entry, true, ls.latestLedger, nil
-}
-
-func (ls *ledgerEntryStorage) Close() error {
-	ls.done()
-	ls.wg.Wait()
-	return nil
-}
-
-func (ls *ledgerEntryStorage) run(ctx context.Context, startCheckpointLedger uint32, archive historyarchive.ArchiveInterface, ledgerBackend backends.LedgerBackend) {
-	defer ls.wg.Done()
-
-	// First, process the checkpoint
-	// TODO: use a logger
-	fmt.Println("Starting processing of checkpoint", startCheckpointLedger)
-	checkpointCtx, cancelCheckpointCtx := context.WithTimeout(ctx, 30*time.Minute)
-	reader, err := ingest.NewCheckpointChangeReader(checkpointCtx, archive, startCheckpointLedger)
-	if err != nil {
-		// TODO: implement retries instead
-		panic(err)
-	}
-	// We intentionally use this local encoding buffer to avoid race conditions with the main one
-	buffer := xdr.NewEncodingBuffer()
-
-	for {
-		select {
-		case <-ctx.Done():
-			cancelCheckpointCtx()
-			return
-		default:
-		}
-		change, err := reader.Read()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			// TODO: we probably shouldn't panic, at least in case of timeout
-			panic(err)
-		}
-
-		entry := change.Post
-		key := getRelevantLedgerKeyFromData(buffer, entry.Data)
-		if key == "" {
-			// not relevant
-			continue
-		}
-
-		// no need to Write-lock until we process the full checkpoint, since the reader checks latestLedger to be non-zero
-		ls.storage[key] = *entry
-
-		if len(ls.storage)%2000 == 0 {
-			fmt.Printf(" processed %d checkpoint ledger entries\n", len(ls.storage))
-		}
-	}
-
-	cancelCheckpointCtx()
-
-	fmt.Println("Finished checkpoint processing")
-	ls.Lock()
-	ls.latestLedger = startCheckpointLedger
-	ls.Unlock()
-
-	// Now, continuously process txmeta deltas
-
-	// TODO: we can probably do the preparation in parallel with the checkpoint processing
-	prepareRangeCtx, cancelPrepareRange := context.WithTimeout(ctx, 30*time.Minute)
-	if err := ledgerBackend.PrepareRange(prepareRangeCtx, backends.UnboundedRange(startCheckpointLedger)); err != nil {
-		// TODO: we probably shouldn't panic, at least in case of timeout
-		panic(err)
-	}
-	cancelPrepareRange()
-
-	nextLedger := startCheckpointLedger + 1
-	for {
-		fmt.Println("Processing txmeta of ledger", nextLedger)
-		reader, err := ingest.NewLedgerChangeReader(ctx, ledgerBackend, ls.networkPassPhrase, nextLedger)
-		if err != nil {
-			// TODO: we probably shouldn't panic, at least in case of timeout/cancellation
-			panic(err)
-		}
-
-		// TODO: completely blocking reads between ledgers being processed may not be acceptable
-		//       however, we don't want to return ledger entries inbetween ledger updates
-		ls.Lock()
-		for {
-			change, err := reader.Read()
-			if err == io.EOF {
-				break
-			}
-			if err != nil {
-				// TODO: we probably shouldn't panic, at least in case of timeout/cancellation
-				panic(err)
-			}
-			if change.Post == nil {
-				key := getRelevantLedgerKeyFromData(buffer, change.Pre.Data)
-				if key == "" {
-					continue
-				}
-				delete(ls.storage, key)
-			} else {
-				key := getRelevantLedgerKeyFromData(buffer, change.Post.Data)
-				if key == "" {
-					continue
-				}
-				ls.storage[key] = *change.Post
-			}
-		}
-		ls.latestLedger = nextLedger
-		nextLedger++
-		fmt.Println("Ledger entry count", len(ls.storage))
-		ls.Unlock()
-		reader.Close()
-	}
-
-}
-
-func getRelevantLedgerKey(buffer *xdr.EncodingBuffer, key xdr.LedgerKey) string {
-	// this is safe since we are converting to string right away, which causes a copy
-	binKey, err := buffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
-	if err != nil {
-		// TODO: we probably don't want to panic
-		panic(err)
-	}
-	return string(binKey)
-}
-
-func getRelevantLedgerKeyFromData(buffer *xdr.EncodingBuffer, data xdr.LedgerEntryData) string {
-	var key xdr.LedgerKey
-	switch data.Type {
-	case xdr.LedgerEntryTypeAccount:
-		key.SetAccount(data.Account.AccountId)
-	case xdr.LedgerEntryTypeTrustline:
-		key.SetTrustline(data.TrustLine.AccountId, data.TrustLine.Asset)
-	case xdr.LedgerEntryTypeContractData:
-		key.SetContractData(data.ContractData.ContractId, data.ContractData.Val)
-	default:
-		// we don't care about any other entry types for now
-		return ""
-	}
-	return getRelevantLedgerKey(buffer, key)
-}
diff --git a/cmd/soroban-rpc/internal/ledgerentry_storage/db.go b/cmd/soroban-rpc/internal/ledgerentry_storage/db.go
new file mode 100644
index 0000000000..82096c67bf
--- /dev/null
+++ b/cmd/soroban-rpc/internal/ledgerentry_storage/db.go
@@ -0,0 +1,284 @@
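+// Package ledgerentry_storage stores ledger entries in a SQLite database,
+// keyed by the compressed binary encoding of their XDR ledger key, with the
+// XDR entry itself stored as base64.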
+package ledgerentry_storage
+
+import (
+	"context"
+	"database/sql"
+	"embed"
+	"fmt"
+	"strconv"
+
+	sq "github.com/Masterminds/squirrel"
+	"github.com/jmoiron/sqlx"
+	_ "github.com/mattn/go-sqlite3"
+	migrate "github.com/rubenv/sql-migrate"
+	"github.com/stellar/go/support/errors"
+	"github.com/stellar/go/xdr"
+)
+
+//go:embed migrations/*.sql
+var migrations embed.FS
+
+const (
+	LedgerEntriesTableName      = "ledger_entries"
+	LedgerEntriesMetaTableName  = "ledger_entries_meta"
+	LatestLedgerSequenceMetaKey = "LatestLedgerSequence"
+)
+
+type DB interface {
+	LedgerEntryStorage
+	GetLatestLedgerSequence() (uint32, error)
+	NewLedgerEntryUpdaterTx(nextSequence uint32) (LedgerEntryUpdaterTx, error)
+}
+
+type LedgerEntryUpdaterTx interface {
+	UpsertLedgerEntry(key xdr.LedgerKey, entry xdr.LedgerEntry) error
+	DeleteLedgerEntry(key xdr.LedgerKey) error
+	Done() error
+}
+
+type sqlDB struct {
+	db *sqlx.DB
+}
+
+func OpenSQLiteDB(dbFilePath string) (DB, error) {
+	db, err := sqlx.Open("sqlite3", dbFilePath)
+	if err != nil {
+		return nil, errors.Wrap(err, "open failed")
+	}
+
+	ret := &sqlDB{
+		db: db,
+	}
+
+	if err = runMigrations(ret.db.DB, "sqlite3"); err != nil {
+		return nil, errors.Wrap(err, "could not run migrations")
+	}
+
+	return ret, nil
+}
+
+func getLedgerEntry(tx *sqlx.Tx, buffer *xdr.EncodingBuffer, key xdr.LedgerKey) (xdr.LedgerEntry, error) {
+	encodedKey, err := encodeLedgerKey(buffer, key)
+	if err != nil {
+		return xdr.LedgerEntry{}, err
+	}
+
+	sqlStr, args, err := sq.Select("entry").From(LedgerEntriesTableName).Where(sq.Eq{"key": encodedKey}).ToSql()
+	if err != nil {
+		return xdr.LedgerEntry{}, err
+	}
+	var results []string
+	if err := tx.Select(&results, sqlStr, args...); err != nil {
+		return xdr.LedgerEntry{}, err
+	}
+	if len(results) != 1 {
+		return xdr.LedgerEntry{}, sql.ErrNoRows
+	}
+	ledgerEntryBase64 := results[0]
+	var result xdr.LedgerEntry
+	err = xdr.SafeUnmarshalBase64(ledgerEntryBase64, &result)
+	return result, err
+}
+
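+// flushLedgerEntryBatch writes the accumulated batch in two statements: a
+// single REPLACE covering all upserts and a single DELETE covering all
+// deletions. A nil value in the map marks the key as deleted.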
+func flushLedgerEntryBatch(tx *sqlx.Tx, encodedKeyEntries map[string]*string) error {
+	upsertCount := 0
+	upsertSQL := sq.Replace(LedgerEntriesTableName)
+	var deleteKeys []interface{}
+	for key, entry := range encodedKeyEntries {
+		if entry != nil {
+			// the squirrel builder is immutable, so keep the result of Values()
+			upsertSQL = upsertSQL.Values(key, *entry)
+			upsertCount += 1
+		} else {
+			deleteKeys = append(deleteKeys, interface{}(key))
+		}
+	}
+
+	if upsertCount > 0 {
+		sqlStr, args, err := upsertSQL.ToSql()
+		if err != nil {
+			return err
+		}
+		if _, err = tx.Exec(sqlStr, args...); err != nil {
+			return err
+		}
+	}
+
+	if len(deleteKeys) > 0 {
+		sqlStr, args, err := sq.Delete(LedgerEntriesTableName).Where(sq.Eq{"key": deleteKeys}).ToSql()
+		if err != nil {
+			return err
+		}
+		if _, err = tx.Exec(sqlStr, args...); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func getLatestLedgerSequence(tx *sqlx.Tx) (uint32, error) {
+	sqlStr, args, err := sq.Select("value").From(LedgerEntriesMetaTableName).Where(sq.Eq{"key": LatestLedgerSequenceMetaKey}).ToSql()
+	if err != nil {
+		return 0, err
+	}
+	var results []string
+	if err := tx.Select(&results, sqlStr, args...); err != nil {
+		return 0, err
+	}
+	if len(results) != 1 {
+		// the key is not present: the DB is empty
+		return 0, nil
+	}
+	latestLedgerStr := results[0]
+	latestLedger, err := strconv.ParseUint(latestLedgerStr, 10, 32)
+	if err != nil {
+		return 0, err
+	}
+	return uint32(latestLedger), nil
+}
+
+func upsertLatestLedgerSequence(tx *sqlx.Tx, sequence uint32) error {
+	sqlStr, args, err := sq.Replace(LedgerEntriesMetaTableName).Values(LatestLedgerSequenceMetaKey, fmt.Sprintf("%d", sequence)).ToSql()
+	if err != nil {
+		return err
+	}
+	_, err = tx.Exec(sqlStr, args...)
+	return err
+}
+
+func (s *sqlDB) GetLatestLedgerSequence() (uint32, error) {
+	opts := sql.TxOptions{
+		ReadOnly: true,
+	}
+	tx, err := s.db.BeginTxx(context.Background(), &opts)
+	if err != nil {
+		return 0, err
+	}
+	defer tx.Commit()
+	return getLatestLedgerSequence(tx)
+}
+
+func (s *sqlDB) GetLedgerEntry(key xdr.LedgerKey) (xdr.LedgerEntry, bool, uint32, error) {
+	opts := sql.TxOptions{
+		ReadOnly: true,
+	}
+	tx, err := s.db.BeginTxx(context.Background(), &opts)
+	if err != nil {
+		return xdr.LedgerEntry{}, false, 0, err
+	}
+	// committing a read-only transaction is cheap and makes sure it is
+	// released in every return path below
+	defer tx.Commit()
+	seq, err := getLatestLedgerSequence(tx)
+	if err != nil {
+		return xdr.LedgerEntry{}, false, 0, err
+	}
+	buffer := xdr.NewEncodingBuffer()
+	entry, err := getLedgerEntry(tx, buffer, key)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return xdr.LedgerEntry{}, false, seq, nil
+		}
+		return xdr.LedgerEntry{}, false, seq, err
+	}
+	return entry, true, seq, nil
+}
+
+func (s *sqlDB) Close() error {
+	// TODO: What if there is a running transaction?
+	return s.db.Close()
+}
+
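+// ledgerUpdaterTx accumulates the changes of one ledger in memory, flushing
+// them to the database in batches of maxBatchSize. Nothing becomes visible
+// to readers until Done commits the batch together with the updated
+// latest-ledger sequence.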
+type ledgerUpdaterTx struct {
+	tx *sqlx.Tx
+	// Value to set "latestSequence" to once we are done
+	forLedgerSequence uint32
+	maxBatchSize      int
+	buffer            *xdr.EncodingBuffer
+	// nil implies deleted
+	keyToEntryBatch map[string]*string
+}
+
+func (s *sqlDB) NewLedgerEntryUpdaterTx(forLedgerSequence uint32) (LedgerEntryUpdaterTx, error) {
+	tx, err := s.db.BeginTxx(context.Background(), nil)
+	if err != nil {
+		return nil, err
+	}
+	// TODO: Make this configurable?
+	const maxBatchSize = 150
+	return &ledgerUpdaterTx{
+		maxBatchSize:      maxBatchSize,
+		tx:                tx,
+		forLedgerSequence: forLedgerSequence,
+		buffer:            xdr.NewEncodingBuffer(),
+		keyToEntryBatch:   make(map[string]*string, maxBatchSize),
+	}, nil
+}
+
+// maybeFlush writes the current batch out once it reaches maxBatchSize
+func (l *ledgerUpdaterTx) maybeFlush() error {
+	if len(l.keyToEntryBatch) < l.maxBatchSize {
+		return nil
+	}
+	if err := flushLedgerEntryBatch(l.tx, l.keyToEntryBatch); err != nil {
+		l.tx.Rollback()
+		return err
+	}
+	// start a fresh batch, otherwise the same entries would be written again
+	l.keyToEntryBatch = make(map[string]*string, l.maxBatchSize)
+	return nil
+}
+
+func (l *ledgerUpdaterTx) UpsertLedgerEntry(key xdr.LedgerKey, entry xdr.LedgerEntry) error {
+	encodedKey, err := encodeLedgerKey(l.buffer, key)
+	if err != nil {
+		return err
+	}
+	encodedEntry, err := l.buffer.MarshalBase64(&entry)
+	if err != nil {
+		return err
+	}
+	l.keyToEntryBatch[encodedKey] = &encodedEntry
+	return l.maybeFlush()
+}
+
+func (l *ledgerUpdaterTx) DeleteLedgerEntry(key xdr.LedgerKey) error {
+	encodedKey, err := encodeLedgerKey(l.buffer, key)
+	if err != nil {
+		return err
+	}
+	l.keyToEntryBatch[encodedKey] = nil
+	return l.maybeFlush()
+}
+
+func (l *ledgerUpdaterTx) Done() error {
+	if err := flushLedgerEntryBatch(l.tx, l.keyToEntryBatch); err != nil {
+		l.tx.Rollback()
+		return err
+	}
+	// record which ledger sequence the stored entries now correspond to
+	if err := upsertLatestLedgerSequence(l.tx, l.forLedgerSequence); err != nil {
+		l.tx.Rollback()
+		return err
+	}
+	return l.tx.Commit()
+}
+
+func encodeLedgerKey(buffer *xdr.EncodingBuffer, key xdr.LedgerKey) (string, error) {
+	// this is safe since we are converting to string right away, which causes a copy
+	binKey, err := buffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
+	if err != nil {
+		return "", err
+	}
+	return string(binKey), nil
+}
+
+func runMigrations(db *sql.DB, dialect string) error {
+	m := &migrate.AssetMigrationSource{
+		Asset: migrations.ReadFile,
+		AssetDir: func(path string) ([]string, error) {
+			dirEntry, err := migrations.ReadDir(path)
+			if err != nil {
+				return nil, err
+			}
+			entries := make([]string, 0)
+			for _, e := range dirEntry {
+				entries = append(entries, e.Name())
+			}
+			return entries, nil
+		},
+		Dir: "migrations",
+	}
+	_, err := migrate.ExecMax(db, dialect, m, migrate.Up, 0)
+	return err
+}
diff --git a/cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go b/cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go
new file mode 100644
index 0000000000..0d68ace9a6
--- /dev/null
+++ b/cmd/soroban-rpc/internal/ledgerentry_storage/ledgerentry_storage.go
@@ -0,0 +1,232 @@
+package ledgerentry_storage
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/stellar/go/historyarchive"
+	"github.com/stellar/go/ingest"
+	backends "github.com/stellar/go/ingest/ledgerbackend"
+	"github.com/stellar/go/xdr"
+)
+
+type LedgerEntryStorage interface {
+	GetLedgerEntry(key xdr.LedgerKey) (xdr.LedgerEntry, bool, uint32, error)
+	io.Closer
+}
+
+func NewLedgerEntryStorage(
+	db DB,
+	networkPassPhrase string,
+	archive historyarchive.ArchiveInterface,
+	ledgerBackend backends.LedgerBackend) (LedgerEntryStorage, error) {
+	ctx, done := context.WithCancel(context.Background())
+	ls := ledgerEntryStorage{
+		db:                db,
+		networkPassPhrase: networkPassPhrase,
+		done:              done,
+	}
+	ls.wg.Add(1)
+	go ls.run(ctx, archive, ledgerBackend)
+	return &ls, nil
+}
+
+type ledgerEntryStorage struct {
+	db                DB
+	networkPassPhrase string
+	done              context.CancelFunc
+	wg                sync.WaitGroup
+}
+
+func (ls *ledgerEntryStorage) GetLedgerEntry(key xdr.LedgerKey) (xdr.LedgerEntry, bool, uint32, error) {
+	return ls.db.GetLedgerEntry(key)
+}
+
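+// Close cancels the ingestion goroutine, waits for it to exit and then
+// closes the underlying database.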
+func (ls *ledgerEntryStorage) Close() error {
+	ls.done()
+	ls.wg.Wait()
+	return ls.db.Close()
+}
+
+func (ls *ledgerEntryStorage) fillEntriesFromLatestCheckpoint(ctx context.Context, archive historyarchive.ArchiveInterface) (uint32, error) {
+	root, err := archive.GetRootHAS()
+	if err != nil {
+		return 0, err
+	}
+	startCheckpointLedger := root.CurrentLedger
+
+	// TODO: use a logger
+	fmt.Println("Starting processing of checkpoint", startCheckpointLedger)
+	// TODO: should we make this configurable?
+	checkpointCtx, cancelCheckpointCtx := context.WithTimeout(ctx, 30*time.Minute)
+	defer cancelCheckpointCtx()
+	reader, err := ingest.NewCheckpointChangeReader(checkpointCtx, archive, startCheckpointLedger)
+	if err != nil {
+		return 0, err
+	}
+	// TODO: the updater transaction is leaked on the error paths below; the
+	//       interface probably needs an explicit rollback method
+	tx, err := ls.db.NewLedgerEntryUpdaterTx(startCheckpointLedger)
+	if err != nil {
+		return 0, err
+	}
+	entryCount := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		default:
+		}
+		change, err := reader.Read()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return 0, err
+		}
+
+		entry := change.Post
+		key, relevant, err := getRelevantLedgerKeyFromData(entry.Data)
+		if err != nil {
+			return 0, err
+		}
+		if !relevant {
+			continue
+		}
+		if err := tx.UpsertLedgerEntry(key, *entry); err != nil {
+			return 0, err
+		}
+		entryCount++
+		if entryCount%2000 == 0 {
+			// TODO: use a logger
+			fmt.Printf(" processed %d checkpoint ledger entries\n", entryCount)
+		}
+	}
+
+	// Committing also records startCheckpointLedger as the latest ledger
+	// sequence, marking the checkpoint baseline as complete
+	if err := tx.Done(); err != nil {
+		return 0, err
+	}
+
+	// TODO: use a logger
+	fmt.Println("Finished checkpoint processing")
+
+	return startCheckpointLedger, nil
+}
+
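+// run drives ingestion in two phases: if the database is empty it is seeded
+// from the latest history archive checkpoint, and afterwards txmeta changes
+// are applied ledger by ledger, each in its own updater transaction.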
+func (ls *ledgerEntryStorage) run(ctx context.Context, archive historyarchive.ArchiveInterface, ledgerBackend backends.LedgerBackend) {
+	defer ls.wg.Done()
+
+	// First, make sure the DB has a complete ledger entry baseline
+	startCheckpointLedger, err := ls.db.GetLatestLedgerSequence()
+	if err != nil {
+		// TODO: implement retries?
+		panic(err)
+	}
+	if startCheckpointLedger == 0 {
+		// DB is empty, let's fill it in from a checkpoint
+		startCheckpointLedger, err = ls.fillEntriesFromLatestCheckpoint(ctx, archive)
+		if err != nil {
+			// TODO: implement retries?
+			panic(err)
+		}
+	}
+
+	// Secondly, continuously process txmeta deltas
+
+	// TODO: we can probably do the preparation in parallel with the checkpoint processing
+	prepareRangeCtx, cancelPrepareRange := context.WithTimeout(ctx, 30*time.Minute)
+	if err := ledgerBackend.PrepareRange(prepareRangeCtx, backends.UnboundedRange(startCheckpointLedger)); err != nil {
+		// TODO: we probably shouldn't panic, at least in case of timeout
+		panic(err)
+	}
+	cancelPrepareRange()
+
+	nextLedger := startCheckpointLedger + 1
+	for {
+		// TODO: use a logger
+		fmt.Println("Processing txmeta of ledger", nextLedger)
+		reader, err := ingest.NewLedgerChangeReader(ctx, ledgerBackend, ls.networkPassPhrase, nextLedger)
+		if err != nil {
+			// TODO: we probably shouldn't panic, at least in case of timeout/cancellation
+			panic(err)
+		}
+		tx, err := ls.db.NewLedgerEntryUpdaterTx(nextLedger)
+		if err != nil {
+			// TODO: we probably shouldn't panic, at least in case of timeout/cancellation
+			panic(err)
+		}
+
+		for {
+			change, err := reader.Read()
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				// TODO: we probably shouldn't panic, at least in case of timeout/cancellation
+				panic(err)
+			}
+			if change.Post == nil {
+				// the entry was deleted in this ledger
+				key, relevant, err := getRelevantLedgerKeyFromData(change.Pre.Data)
+				if err != nil {
+					panic(err)
+				}
+				if !relevant {
+					continue
+				}
+				if err := tx.DeleteLedgerEntry(key); err != nil {
+					panic(err)
+				}
+			} else {
+				// the entry was created or updated in this ledger
+				key, relevant, err := getRelevantLedgerKeyFromData(change.Post.Data)
+				if err != nil {
+					panic(err)
+				}
+				if !relevant {
+					continue
+				}
+				if err := tx.UpsertLedgerEntry(key, *change.Post); err != nil {
+					panic(err)
+				}
+			}
+		}
+		if err := tx.Done(); err != nil {
+			// TODO: we probably shouldn't panic, at least in case of timeout/cancellation
+			panic(err)
+		}
+		reader.Close()
+		nextLedger++
+	}
+}
+
+func getRelevantLedgerKeyFromData(data xdr.LedgerEntryData) (xdr.LedgerKey, bool, error) {
+	var key xdr.LedgerKey
+	switch data.Type {
+	case xdr.LedgerEntryTypeAccount:
+		if err := key.SetAccount(data.Account.AccountId); err != nil {
+			return xdr.LedgerKey{}, false, err
+		}
+	case xdr.LedgerEntryTypeTrustline:
+		if err := key.SetTrustline(data.TrustLine.AccountId, data.TrustLine.Asset); err != nil {
+			return xdr.LedgerKey{}, false, err
+		}
+	case xdr.LedgerEntryTypeContractData:
+		if err := key.SetContractData(data.ContractData.ContractId, data.ContractData.Val); err != nil {
+			return xdr.LedgerKey{}, false, err
+		}
+	default:
+		// we don't care about any other entry types for now
+		return xdr.LedgerKey{}, false, nil
+	}
+	return key, true, nil
+}
diff --git a/cmd/soroban-rpc/internal/ledgerentry_storage/migrations/01_init.sql b/cmd/soroban-rpc/internal/ledgerentry_storage/migrations/01_init.sql
new file mode 100644
index 0000000000..dfda9677ff
--- /dev/null
+++ b/cmd/soroban-rpc/internal/ledgerentry_storage/migrations/01_init.sql
@@ -0,0 +1,20 @@
+-- +migrate Up
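+-- Keys are XDR ledger keys in compressed binary form (stored as strings);
+-- entries are base64-encoded XDR ledger entries. Lookups go through the
+-- primary key, so no separate index is needed.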
+CREATE TABLE ledger_entries (
+    key TEXT NOT NULL PRIMARY KEY,
+    entry TEXT NOT NULL
+);
+
+-- metadata about the content in the ledger_entries table
+CREATE TABLE ledger_entries_meta (
+    key TEXT PRIMARY KEY,
+    value TEXT
+);
+
+-- +migrate Down
+DROP TABLE ledger_entries;
+DROP TABLE ledger_entries_meta;
diff --git a/cmd/soroban-rpc/main.go b/cmd/soroban-rpc/main.go
index 36c35b8dc6..b09d77adb9 100644
--- a/cmd/soroban-rpc/main.go
+++ b/cmd/soroban-rpc/main.go
@@ -19,7 +19,7 @@ import (
 )
 
 func main() {
-	var endpoint, horizonURL, binaryPath, configPath, networkPassphrase string
+	var endpoint, horizonURL, binaryPath, configPath, networkPassphrase, dbPath string
 	var captiveCoreHTTPPort uint16
 	var historyArchiveURLs []string
 	var txConcurrency, txQueueSize int
@@ -121,6 +121,14 @@ func main() {
 			FlagDefault: 10,
 			Required:    false,
 		},
+		{
+			Name:        "db-path",
+			Usage:       "SQLite DB path",
+			OptType:     types.String,
+			ConfigKey:   &dbPath,
+			FlagDefault: "soroban-rpc.db",
+			Required:    false,
+		},
 	}
 	cmd := &cobra.Command{
 		Use:   "soroban-rpc",