Skip to content

Commit

Permalink
Track db query durations
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya1702 committed Feb 7, 2025
1 parent 052d249 commit 1a82420
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
12 changes: 11 additions & 1 deletion internal/data/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package data
import (
"context"
"fmt"
"time"

"github.com/stellar/wallet-backend/internal/db"
"github.com/stellar/wallet-backend/internal/metrics"
)

type AccountModel struct {
DB db.ConnectionPool
DB db.ConnectionPool
MetricsService *metrics.MetricsService
}

func (m *AccountModel) Insert(ctx context.Context, address string) error {
const query = `INSERT INTO accounts (stellar_address) VALUES ($1) ON CONFLICT DO NOTHING`
start := time.Now()
_, err := m.DB.ExecContext(ctx, query, address)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("INSERT", "accounts", duration)
if err != nil {
return fmt.Errorf("inserting address %s: %w", address, err)
}
Expand All @@ -26,7 +30,10 @@ func (m *AccountModel) Insert(ctx context.Context, address string) error {

func (m *AccountModel) Delete(ctx context.Context, address string) error {
const query = `DELETE FROM accounts WHERE stellar_address = $1`
start := time.Now()
_, err := m.DB.ExecContext(ctx, query, address)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("DELETE", "accounts", duration)
if err != nil {
return fmt.Errorf("deleting address %s: %w", address, err)
}
Expand All @@ -46,7 +53,10 @@ func (m *AccountModel) IsAccountFeeBumpEligible(ctx context.Context, address str
)
`
var exists bool
start := time.Now()
err := m.DB.GetContext(ctx, &exists, query, address)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("SELECT", "accounts", duration)
if err != nil {
return false, fmt.Errorf("checking if account %s is fee bump eligible: %w", address, err)
}
Expand Down
15 changes: 15 additions & 0 deletions internal/data/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ type Payment struct {

func (m *PaymentModel) GetLatestLedgerSynced(ctx context.Context, cursorName string) (uint32, error) {
var lastSyncedLedger uint32
start := time.Now()
err := m.DB.GetContext(ctx, &lastSyncedLedger, `SELECT value FROM ingest_store WHERE key = $1`, cursorName)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("SELECT", "ingest_store", duration)
// First run, key does not exist yet
if err == sql.ErrNoRows {
return 0, nil
Expand All @@ -55,7 +58,10 @@ func (m *PaymentModel) UpdateLatestLedgerSynced(ctx context.Context, cursorName
INSERT INTO ingest_store (key, value) VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET value = excluded.value
`
start := time.Now()
_, err := m.DB.ExecContext(ctx, query, cursorName, ledger)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("INSERT", "ingest_store", duration)
if err != nil {
return fmt.Errorf("updating last synced ledger to %d: %w", ledger, err)
}
Expand Down Expand Up @@ -93,8 +99,11 @@ func (m *PaymentModel) AddPayment(ctx context.Context, tx db.Transaction, paymen
memo_type = EXCLUDED.memo_type
;
`
start := time.Now()
_, err := tx.ExecContext(ctx, query, payment.OperationID, payment.OperationType, payment.TransactionID, payment.TransactionHash, payment.FromAddress, payment.ToAddress, payment.SrcAssetCode, payment.SrcAssetIssuer, payment.SrcAssetType, payment.SrcAmount,
payment.DestAssetCode, payment.DestAssetIssuer, payment.DestAssetType, payment.DestAmount, payment.CreatedAt, payment.Memo, payment.MemoType)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("INSERT", "ingest_payments", duration)
if err != nil {
return fmt.Errorf("inserting payment: %w", err)
}
Expand Down Expand Up @@ -145,7 +154,10 @@ func (m *PaymentModel) GetPaymentsPaginated(ctx context.Context, address string,
if err != nil {
return nil, false, false, fmt.Errorf("preparing named query: %w", err)
}
start := time.Now()
err = m.DB.SelectContext(ctx, &payments, query, args...)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("SELECT", "ingest_payments", duration)
if err != nil {
return nil, false, false, fmt.Errorf("fetching payments: %w", err)
}
Expand Down Expand Up @@ -190,7 +202,10 @@ func (m *PaymentModel) existsPrevNext(ctx context.Context, filteredSetCTE string
}

var prevExists, nextExists bool
start := time.Now()
err = m.DB.QueryRowxContext(ctx, query, args...).Scan(&prevExists, &nextExists)
duration := time.Since(start).Seconds()
m.MetricsService.ObserveDBQueryDuration("SELECT", "ingest_payments", duration)
if err != nil {
return false, false, fmt.Errorf("fetching prev and next exists: %w", err)
}
Expand Down
21 changes: 21 additions & 0 deletions internal/tss/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ func (s *store) UpsertTransaction(ctx context.Context, webhookURL string, txHash
current_status = EXCLUDED.current_status,
updated_at = NOW();
`
start := time.Now()
_, err := s.DB.ExecContext(ctx, q, txHash, txXDR, webhookURL, status.Status())
duration := time.Since(start).Seconds()
s.MetricsService.ObserveDBQueryDuration("INSERT", "tss_transactions", duration)
if err != nil {
return fmt.Errorf("inserting/updatig tss transaction: %w", err)
}
Expand All @@ -98,7 +101,10 @@ func (s *store) UpsertTry(ctx context.Context, txHash string, feeBumpTxHash stri
result_xdr = EXCLUDED.result_xdr,
updated_at = NOW();
`
start := time.Now()
_, err := s.DB.ExecContext(ctx, q, txHash, feeBumpTxHash, feeBumpTxXDR, status.Status(), code.Code(), resultXDR)
duration := time.Since(start).Seconds()
s.MetricsService.ObserveDBQueryDuration("INSERT", "tss_transaction_submission_tries", duration)
if err != nil {
return fmt.Errorf("inserting/updating tss try: %w", err)
}
Expand All @@ -109,7 +115,10 @@ func (s *store) UpsertTry(ctx context.Context, txHash string, feeBumpTxHash stri
func (s *store) GetTransaction(ctx context.Context, hash string) (Transaction, error) {
q := `SELECT * FROM tss_transactions WHERE transaction_hash = $1`
var transaction Transaction
start := time.Now()
err := s.DB.GetContext(ctx, &transaction, q, hash)
duration := time.Since(start).Seconds()
s.MetricsService.ObserveDBQueryDuration("SELECT", "tss_transactions", duration)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Transaction{}, nil
Expand All @@ -123,7 +132,10 @@ func (s *store) GetTransaction(ctx context.Context, hash string) (Transaction, e
func (s *store) GetTry(ctx context.Context, hash string) (Try, error) {
q := `SELECT * FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`
var try Try
start := time.Now()
err := s.DB.GetContext(ctx, &try, q, hash)
duration := time.Since(start).Seconds()
s.MetricsService.ObserveDBQueryDuration("SELECT", "tss_transaction_submission_tries", duration)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Try{}, nil
Expand All @@ -137,7 +149,10 @@ func (s *store) GetTry(ctx context.Context, hash string) (Try, error) {
func (s *store) GetTryByXDR(ctx context.Context, xdr string) (Try, error) {
q := `SELECT * FROM tss_transaction_submission_tries WHERE try_transaction_xdr = $1`
var try Try
start := time.Now()
err := s.DB.GetContext(ctx, &try, q, xdr)
duration := time.Since(start).Seconds()
s.MetricsService.ObserveDBQueryDuration("SELECT", "tss_transaction_submission_tries", duration)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Try{}, nil
Expand All @@ -151,7 +166,10 @@ func (s *store) GetTryByXDR(ctx context.Context, xdr string) (Try, error) {
func (s *store) GetTransactionsWithStatus(ctx context.Context, status tss.RPCTXStatus) ([]Transaction, error) {
q := `SELECT * FROM tss_transactions WHERE current_status = $1`
var transactions []Transaction
start := time.Now()
err := s.DB.SelectContext(ctx, &transactions, q, status.Status())
duration := time.Since(start).Seconds()
s.MetricsService.ObserveDBQueryDuration("SELECT", "tss_transactions", duration)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []Transaction{}, nil
Expand All @@ -165,7 +183,10 @@ func (s *store) GetTransactionsWithStatus(ctx context.Context, status tss.RPCTXS
func (s *store) GetLatestTry(ctx context.Context, txHash string) (Try, error) {
q := `SELECT * FROM tss_transaction_submission_tries WHERE original_transaction_hash = $1 ORDER BY updated_at DESC LIMIT 1`
var try Try
start := time.Now()
err := s.DB.GetContext(ctx, &try, q, txHash)
duration := time.Since(start).Seconds()
s.MetricsService.ObserveDBQueryDuration("SELECT", "tss_transaction_submission_tries", duration)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Try{}, nil
Expand Down

0 comments on commit 1a82420

Please sign in to comment.