Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jan 20, 2023
1 parent f92e0ef commit b8b86f0
Showing 1 changed file with 33 additions and 11 deletions.
44 changes: 33 additions & 11 deletions cmd/soroban-rpc/internal/ledgerentry_storage/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,28 @@ type LedgerEntryUpdaterTx interface {
}

type sqlDB struct {
db *sqlx.DB
db *sqlx.DB
postWriteCommitHook func() error
}

func OpenSQLiteDB(dbFilePath string) (DB, error) {
db, err := sqlx.Open("sqlite3", fmt.Sprintf("file:%s?_journal_mode=WAL", dbFilePath))
// 1. Use Write-Ahead Logging (WAL).
// 2. Disable WAL auto-checkpointing (we will do the checkpointing ourselves with wal_checkpoint pragmas
// after every write transaction).
// 3. Use synchronous=NORMAL, which is faster and still safe in WAL mode.
db, err := sqlx.Open("sqlite3", fmt.Sprintf("file:%s?_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL", dbFilePath))
if err != nil {
return nil, errors.Wrap(err, "open failed")
}

postWriteCommitHook := func() error {
_, err := db.Exec("PRAGMA wal_checkpoint(TRUNCATE)")
return err
}

ret := &sqlDB{
db: db,
db: db,
postWriteCommitHook: postWriteCommitHook,
}

if err = runMigrations(ret.db.DB, "sqlite3"); err != nil {
Expand Down Expand Up @@ -180,8 +191,9 @@ func (s *sqlDB) Close() error {
}

type ledgerUpdaterTx struct {
tx *sqlx.Tx
stmtCache *sq.StmtCache
tx *sqlx.Tx
stmtCache *sq.StmtCache
postWriteCommitHook func() error
// Value to set "latestSequence" to once we are done
forLedgerSequence uint32
maxBatchSize int
Expand All @@ -196,11 +208,12 @@ func (s *sqlDB) NewLedgerEntryUpdaterTx(forLedgerSequence uint32, maxBatchSize i
return nil, err
}
ret := &ledgerUpdaterTx{
maxBatchSize: maxBatchSize,
tx: tx,
forLedgerSequence: forLedgerSequence,
buffer: xdr.NewEncodingBuffer(),
keyToEntryBatch: make(map[string]*string, maxBatchSize),
tx: tx,
postWriteCommitHook: s.postWriteCommitHook,
forLedgerSequence: forLedgerSequence,
maxBatchSize: maxBatchSize,
buffer: xdr.NewEncodingBuffer(),
keyToEntryBatch: make(map[string]*string, maxBatchSize),
}
ret.stmtCache = sq.NewStmtCache(tx)
return ret, nil
Expand Down Expand Up @@ -284,9 +297,18 @@ func (l *ledgerUpdaterTx) Done() error {
return err
}
if err := upsertLatestLedgerSequence(l.tx, l.forLedgerSequence); err != nil {
_ = l.tx.Rollback()
return err
}
if err := l.tx.Commit(); err != nil {
return err
}
return l.tx.Commit()
if l.postWriteCommitHook() != nil {
if err := l.postWriteCommitHook(); err != nil {
return err
}
}
return nil
}

func encodeLedgerKey(buffer *xdr.EncodingBuffer, key xdr.LedgerKey) (string, error) {
Expand Down

0 comments on commit b8b86f0

Please sign in to comment.