Skip to content

Commit

Permalink
Incorporate errors into ledger range retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaptic committed Apr 22, 2024
1 parent 3779ab8 commit 82ccd3a
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 65 deletions.
58 changes: 21 additions & 37 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Transaction struct {
Ledger ledgerbucketwindow.LedgerInfo
}

// TransactionWriter is used during ingestion
// TransactionWriter is used during ingestion to write LCM.
type TransactionWriter interface {
InsertTransactions(lcm xdr.LedgerCloseMeta) error
RegisterMetrics(ingest, insert, count prometheus.Observer)
Expand All @@ -45,14 +45,13 @@ type TransactionWriter interface {
// read-only DB session to perform actual DB reads on.
type TransactionReader interface {
NewTx(ctx context.Context) (TransactionReaderTx, error)
GetLedgerRange(ctx context.Context) ledgerbucketwindow.LedgerRange
}

// TransactionReaderTx provides all of the public ways to read from the DB.
// Note that `Done()` *MUST* be called to clean things up.
type TransactionReaderTx interface {
GetTransaction(log *log.Entry, hash xdr.Hash) (Transaction, bool, ledgerbucketwindow.LedgerRange)
GetLedgerRange() ledgerbucketwindow.LedgerRange
GetLedgerRange() (ledgerbucketwindow.LedgerRange, error)
Done() error
}

Expand Down Expand Up @@ -168,24 +167,12 @@ func (txn *transactionHandler) NewTx(ctx context.Context) (TransactionReaderTx,
return &transactionReaderTx{ctx: ctx, db: sesh, passphrase: txn.passphrase}, nil
}

// GetLedgerRange will fetch from the database in an isolated tx.
func (txn *transactionHandler) GetLedgerRange(ctx context.Context) ledgerbucketwindow.LedgerRange {
reader, err := txn.NewTx(ctx)
defer reader.Done()
if err != nil {
return ledgerbucketwindow.LedgerRange{}
}

lRange := reader.GetLedgerRange()
return lRange
}

func (txn *transactionReaderTx) Done() error {
return txn.db.Rollback()
}

// GetLedgerRange pulls the min/max ledger sequence numbers from the database.
func (txn *transactionReaderTx) GetLedgerRange() ledgerbucketwindow.LedgerRange {
func (txn *transactionReaderTx) GetLedgerRange() (ledgerbucketwindow.LedgerRange, error) {
var ledgerRange ledgerbucketwindow.LedgerRange
log.Debugf("Retrieving ledger range from database: %+v", txn)

Expand All @@ -197,29 +184,23 @@ func (txn *transactionReaderTx) GetLedgerRange() ledgerbucketwindow.LedgerRange
RunWith(sqlTx)
rows, err := newestQ.Query()
if err != nil {
log.Errorf("Error when querying for latest ledger: %v", err)
return ledgerRange
return ledgerRange, errors.Wrap(err, "couldn't query latest ledger")
}

// There is almost certainly a row, but we want to avoid a race condition
// with ingestion as well as support test cases from an empty DB, so we need
// to sanity check that there is in fact a result.
if !rows.Next() {
retErr := errors.New("no ledgers in database")
if ierr := rows.Err(); ierr != nil {
log.Errorf("Error when querying for latest ledger: %v", ierr)
retErr = errors.Wrap(ierr, "couldn't query latest ledger")
}
return ledgerRange
return ledgerRange, retErr
}

var row1, row2 []byte
var lcm1, lcm2 xdr.LedgerCloseMeta
if err := rows.Scan(&row1); err != nil {
log.Errorf("Error when scanning newest ledger: %v", err)
return ledgerRange
}
if err := lcm1.UnmarshalBinary(row1); err != nil {
log.Errorf("Error when unmarshaling newest ledger: %v", err)
return ledgerRange
if err := rows.Scan(&lcm1); err != nil {
return ledgerRange, errors.Wrap(err, "couldn't read newest ledger")
}
// Best effort: if the second fails later, we at least set the first.
ledgerRange.FirstLedger.Sequence = lcm1.LedgerSequence()
Expand All @@ -234,20 +215,15 @@ func (txn *transactionReaderTx) GetLedgerRange() ledgerbucketwindow.LedgerRange
RunWith(sqlTx).
QueryRow()

if err := oldestQ.Scan(&row2); err != nil {
log.Errorf("Error when scanning oldest ledger: %v", err)
return ledgerRange
}
if err := lcm2.UnmarshalBinary(row2); err != nil {
log.Errorf("Error when unmarshaling oldest ledger: %v", err)
return ledgerRange
if err := oldestQ.Scan(&lcm2); err != nil {
return ledgerRange, errors.Wrap(err, "couldn't read oldest ledger")
}
ledgerRange.FirstLedger.Sequence = lcm2.LedgerSequence()
ledgerRange.FirstLedger.CloseTime = lcm2.LedgerCloseTime()

log.Debugf("Database ledger range: [%d, %d]",
ledgerRange.FirstLedger.Sequence, ledgerRange.LastLedger.Sequence)
return ledgerRange
return ledgerRange, nil
}

// GetTransaction conforms to the interface in
Expand All @@ -261,8 +237,16 @@ func (txn *transactionReaderTx) GetTransaction(log *log.Entry, hash xdr.Hash) (
) {
start := time.Now()
tx := Transaction{}
ledgerRange := txn.GetLedgerRange()
hexHash := hex.EncodeToString(hash[:])

ledgerRange, err := txn.GetLedgerRange()
if err != nil {
log.WithField("error", err).
WithField("txhash", hexHash).
Errorf("Failed to fetch ledger range from database")
return tx, false, ledgerRange
}

lcm, ingestTx, err := txn.getTransactionByHash(hexHash)
if err != nil {
if err != io.EOF { // mark for "not in db"
Expand Down
4 changes: 2 additions & 2 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
}

// GetLedgerRange returns the first and latest ledger available in the store.
func (m *MemoryStore) GetLedgerRange(_ context.Context) ledgerbucketwindow.LedgerRange {
func (m *MemoryStore) GetLedgerRange(_ context.Context) (ledgerbucketwindow.LedgerRange, error) {
m.lock.RLock()
defer m.lock.RUnlock()
return m.eventsByLedger.GetLedgerRange()
return m.eventsByLedger.GetLedgerRange(), nil
}
4 changes: 3 additions & 1 deletion cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
if err := tx.Commit(sequence); err != nil {
return err
}
s.logger.Debugf("Ingested ledger %d", sequence)
s.logger.
WithField("duration", time.Since(startTime).Seconds()).
Debugf("Ingested ledger %d", sequence)

s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "total"}).
Expand Down
24 changes: 11 additions & 13 deletions cmd/soroban-rpc/internal/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,10 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler {
},
}

// Get the largest history window
var ledgerRangeGetter methods.LedgerRangeGetter = params.EventStore
var retentionWindow = cfg.EventLedgerRetentionWindow
if cfg.TransactionLedgerRetentionWindow > cfg.EventLedgerRetentionWindow {
retentionWindow = cfg.TransactionLedgerRetentionWindow
ledgerRangeGetter = params.TransactionReader
}
// While we transition from in-memory to database-oriented history storage,
// the on-disk (transaction) retention window will always be larger than the
// in-memory (events) one.
var retentionWindow = cfg.TransactionLedgerRetentionWindow

handlers := []struct {
methodName string
Expand All @@ -150,15 +147,17 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler {
requestDurationLimit time.Duration
}{
{
methodName: "getHealth",
underlyingHandler: methods.NewHealthCheck(retentionWindow, ledgerRangeGetter, cfg.MaxHealthyLedgerLatency),
methodName: "getHealth",
underlyingHandler: methods.NewHealthCheck(
retentionWindow, params.TransactionReader, cfg.MaxHealthyLedgerLatency),
longName: "get_health",
queueLimit: cfg.RequestBacklogGetHealthQueueLimit,
requestDurationLimit: cfg.MaxGetHealthExecutionDuration,
},
{
methodName: "getEvents",
underlyingHandler: methods.NewGetEventsHandler(params.EventStore, cfg.MaxEventsLimit, cfg.DefaultEventsLimit),
methodName: "getEvents",
underlyingHandler: methods.NewGetEventsHandler(
params.EventStore, cfg.MaxEventsLimit, cfg.DefaultEventsLimit),
longName: "get_events",
queueLimit: cfg.RequestBacklogGetEventsQueueLimit,
requestDurationLimit: cfg.MaxGetEventsExecutionDuration,
Expand Down Expand Up @@ -201,8 +200,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler {
{
methodName: "sendTransaction",
underlyingHandler: methods.NewSendTransactionHandler(
params.Daemon, params.Logger, params.TransactionReader, cfg.NetworkPassphrase,
),
params.Daemon, params.Logger, params.TransactionReader, cfg.NetworkPassphrase),
longName: "send_transaction",
queueLimit: cfg.RequestBacklogSendTransactionQueueLimit,
requestDurationLimit: cfg.MaxSendTransactionExecutionDuration,
Expand Down
5 changes: 0 additions & 5 deletions cmd/soroban-rpc/internal/methods/get_ledger_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

var ErrLedgerTtlEntriesCannotBeQueriedDirectly = "ledger ttl entries cannot be queried directly"
Expand All @@ -36,10 +35,6 @@ type GetLedgerEntriesResponse struct {
LatestLedger uint32 `json:"latestLedger"`
}

type LedgerRangeGetter interface {
GetLedgerRange(ctx context.Context) ledgerbucketwindow.LedgerRange
}

const getLedgerEntriesMaxKeys = 200

// NewGetLedgerEntriesHandler returns a JSON RPC handler to retrieve the specified ledger entries from Stellar Core.
Expand Down
21 changes: 17 additions & 4 deletions cmd/soroban-rpc/internal/methods/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/creachadair/jrpc2"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
)

type HealthCheckResult struct {
Expand All @@ -16,13 +17,25 @@ type HealthCheckResult struct {
}

// NewHealthCheck returns a health check json rpc handler
func NewHealthCheck(retentionWindow uint32, ledgerRangeGetter LedgerRangeGetter, maxHealthyLedgerLatency time.Duration) jrpc2.Handler {
func NewHealthCheck(
retentionWindow uint32,
txReader db.TransactionReader,
maxHealthyLedgerLatency time.Duration,
) jrpc2.Handler {
return NewHandler(func(ctx context.Context) (HealthCheckResult, error) {
ledgerRange := ledgerRangeGetter.GetLedgerRange(ctx)
if ledgerRange.LastLedger.Sequence < 1 {
tx, err := txReader.NewTx(ctx)
if err != nil {
return HealthCheckResult{}, jrpc2.Error{
Code: jrpc2.InternalError,
Message: "data stores are not initialized",
Message: "database read failed: " + err.Error(),
}
}

ledgerRange, err := tx.GetLedgerRange()
if err != nil || ledgerRange.LastLedger.Sequence < 1 {
return HealthCheckResult{}, jrpc2.Error{
Code: jrpc2.InternalError,
Message: "data stores are not initialized " + err.Error(),
}
}

Expand Down
28 changes: 25 additions & 3 deletions cmd/soroban-rpc/internal/methods/send_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
)

// SendTransactionResponse represents the transaction submission response returned Soroban-RPC
Expand Down Expand Up @@ -44,7 +45,12 @@ type SendTransactionRequest struct {
}

// NewSendTransactionHandler returns a submit transaction json rpc handler
func NewSendTransactionHandler(daemon interfaces.Daemon, logger *log.Entry, ledgerRangeGetter LedgerRangeGetter, passphrase string) jrpc2.Handler {
func NewSendTransactionHandler(
daemon interfaces.Daemon,
logger *log.Entry,
txReader db.TransactionReader,
passphrase string,
) jrpc2.Handler {
submitter := daemon.CoreClient()
return NewHandler(func(ctx context.Context, request SendTransactionRequest) (SendTransactionResponse, error) {
var envelope xdr.TransactionEnvelope
Expand All @@ -66,11 +72,27 @@ func NewSendTransactionHandler(daemon interfaces.Daemon, logger *log.Entry, ledg
}
txHash := hex.EncodeToString(hash[:])

latestLedgerInfo := ledgerRangeGetter.GetLedgerRange(ctx).LastLedger
tx, err := txReader.NewTx(ctx)
if err != nil {
// This isn't fatal as we can still do txsub, but it's worth logging.
logger.WithError(err).
WithField("tx", request.Transaction).
Error("could not begin database transaction")
}

ledgerInfo, err := tx.GetLedgerRange()
if err != nil { // still not fatal
logger.WithError(err).
WithField("tx", request.Transaction).
Error("could not fetch ledger range")
}
latestLedgerInfo := ledgerInfo.LastLedger

resp, err := submitter.SubmitTransaction(ctx, request.Transaction)
if err != nil {
logger.WithError(err).
WithField("tx", request.Transaction).Error("could not submit transaction")
WithField("tx", request.Transaction).
Error("could not submit transaction")
return SendTransactionResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: "could not submit transaction to stellar-core",
Expand Down

0 comments on commit 82ccd3a

Please sign in to comment.