Skip to content

Commit

Permalink
Add prometheus metrics back
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaptic committed Apr 18, 2024
1 parent 9641780 commit 03c58b1
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 11 deletions.
25 changes: 24 additions & 1 deletion cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,24 @@ func MustNew(cfg *config.Config) *Daemon {
logger.WithError(err).Error("could not run ingestion. Retrying")
}

// a metric for measuring latency of transaction store operations
txDurationMetric := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: daemon.MetricsNamespace(), Subsystem: "transactions",
Name: "operation_duration_seconds",
Help: "transaction store operation durations, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"operation"},
)
txCountMetric := prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: daemon.MetricsNamespace(), Subsystem: "transactions",
Name: "count",
Help: "count of transactions ingested, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})

daemon.metricsRegistry.MustRegister(txDurationMetric, txCountMetric)

// Take the larger of (event retention, tx retention) and then the smaller
// of (tx retention, default event retention) if event retention wasn't
// specified, for some reason...?
Expand All @@ -241,11 +259,16 @@ func MustNew(cfg *config.Config) *Daemon {
}
ingestService := ingest.NewService(ingest.Config{
Logger: logger,
DB: db.NewReadWriter(
DB: db.NewReadWriterWithMetrics(
dbConn,
maxLedgerEntryWriteBatchSize,
maxRetentionWindow,
cfg.NetworkPassphrase,
&db.ReadWriterMetrics{
TxIngestDuration: txDurationMetric.With(prometheus.Labels{"operation": "ingest"}),
TxSqlDuration: txDurationMetric.With(prometheus.Labels{"operation": "insert"}),
TxCount: txCountMetric,
},
),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Expand Down
32 changes: 26 additions & 6 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,29 +135,42 @@ func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *
return result, nil
}

type ReadWriterMetrics struct {
TxIngestDuration, TxSqlDuration, TxCount prometheus.Observer
}

type readWriter struct {
db *DB
maxBatchSize int
ledgerRetentionWindow uint32
passphrase string

metrics *ReadWriterMetrics
}

// NewReadWriter constructs a new readWriter instance and configures the size of
// ledger entry batches when writing ledger entries and the retention window for
// how many historical ledgers are recorded in the database.
func NewReadWriter(db *DB,
// NewReadWriterWithMetrics constructs a new readWriter instance and configures
// the size of ledger entry batches when writing ledger entries and the
// retention window for how many historical ledgers are recorded in the
// database, optionally storing metrics for various DB ops.
func NewReadWriterWithMetrics(db *DB,
maxBatchSize int,
ledgerRetentionWindow uint32,
networkPassphrase string,
metrics *ReadWriterMetrics,
) ReadWriter {
return &readWriter{
db: db,
maxBatchSize: maxBatchSize,
ledgerRetentionWindow: ledgerRetentionWindow,
passphrase: networkPassphrase,
metrics: metrics,
}
}

func NewReadWriter(db *DB, maxBatchSize int, ledgerRetentionWindow uint32, networkPassphrase string) ReadWriter {
return NewReadWriterWithMetrics(db, maxBatchSize, ledgerRetentionWindow, networkPassphrase, nil)
}

func (rw *readWriter) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, rw.db, &rw.db.cache)
}
Expand All @@ -170,7 +183,7 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache := sq.NewStmtCache(txSession.GetTx())

db := rw.db
return writeTx{
writer := writeTx{
globalCache: &db.cache,
postCommit: func() error {
_, err := db.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
Expand All @@ -192,7 +205,14 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
}, nil
}
if rw.metrics != nil {
writer.txWriter.RegisterMetrics(
rw.metrics.TxIngestDuration,
rw.metrics.TxSqlDuration,
rw.metrics.TxCount)
}
return writer, nil
}

type writeTx struct {
Expand Down
26 changes: 22 additions & 4 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"time"

sq "github.com/Masterminds/squirrel"
"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/ingest"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

Expand All @@ -35,6 +38,7 @@ type Transaction struct {
// TransactionWriter is used during ingestion
type TransactionWriter interface {
InsertTransactions(lcm xdr.LedgerCloseMeta) error
RegisterMetrics(ingest, insert, count prometheus.Observer)
}

// TransactionReader is used to serve requests and just returns a cloned,
Expand All @@ -56,6 +60,8 @@ type transactionHandler struct {
db db.SessionInterface
stmtCache *sq.StmtCache
passphrase string

ingestMetric, insertMetric, countMetric prometheus.Observer
}

type transactionReaderTx struct {
Expand Down Expand Up @@ -124,12 +130,25 @@ func (txn *transactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error
_, err = query.RunWith(txn.stmtCache).Exec()

L.WithError(err).
WithField("total_duration", time.Since(start)).
WithField("duration", time.Since(start)).
WithField("sql_duration", time.Since(mid)).
Infof("Ingested %d transaction lookups", len(transactions))

if txn.ingestMetric != nil {
txn.ingestMetric.Observe(time.Since(start).Seconds())
txn.insertMetric.Observe(time.Since(mid).Seconds())
txn.countMetric.Observe(float64(txCount))
}

return err
}

func (txn *transactionHandler) RegisterMetrics(ingest, insert, count prometheus.Observer) {
txn.ingestMetric = ingest
txn.insertMetric = insert
txn.countMetric = count
}

// trimTransactions removes all transactions which fall outside the ledger retention window.
func (txn *transactionHandler) trimTransactions(latestLedgerSeq uint32, retentionWindow uint32) error {
if latestLedgerSeq+1 <= retentionWindow {
Expand Down Expand Up @@ -183,10 +202,9 @@ func (txn *transactionReaderTx) GetLedgerRange() ledgerbucketwindow.LedgerRange
OrderBy("sequence DESC").
Limit(1).
RunWith(sqlTx)

rows, err := newestQ.Query()
if err != nil {
log.Errorf("Error when querying database for ledger range: %v", err)
log.Errorf("Error when querying for latest ledger: %v", err)
return ledgerRange
}

Expand All @@ -195,7 +213,7 @@ func (txn *transactionReaderTx) GetLedgerRange() ledgerbucketwindow.LedgerRange
// to sanity check that there is in fact a result.
if !rows.Next() {
if ierr := rows.Err(); ierr != nil {
log.Errorf("Error when querying database for ledger range: %v", ierr)
log.Errorf("Error when querying for latest ledger: %v", ierr)
}
return ledgerRange
}
Expand Down

0 comments on commit 03c58b1

Please sign in to comment.