Skip to content

Commit

Permalink
#8: added user agent and archive pool metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Feb 8, 2024
1 parent 97ab094 commit 2ed8616
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 53 deletions.
22 changes: 19 additions & 3 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package daemon
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/pprof" //nolint:gosec
"os"
"os/signal"
"path"
runtimePprof "runtime/pprof"
"sync"
"syscall"
Expand All @@ -18,7 +20,9 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
"github.com/stellar/go/support/log"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
Expand Down Expand Up @@ -144,12 +148,24 @@ func MustNew(cfg *config.Config) *Daemon {
if len(cfg.HistoryArchiveURLs) == 0 {
logger.Fatal("no history archives url were provided")
}
historyArchive, err := historyarchive.Connect(
cfg.HistoryArchiveURLs[0],
historyarchive.ConnectOptions{

historyArchive, err := historyarchive.NewArchivePool(
cfg.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
NetworkPassphrase: cfg.NetworkPassphrase,
CheckpointFrequency: cfg.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Context: context.Background(),
UserAgent: fmt.Sprintf("soroban-rpc/%s", config.Version)},
CacheConfig: historyarchive.CacheOptions{
Cache: true,
Path: path.Join(cfg.CaptiveCoreStoragePath, "bucket-cache"),
Log: log.WithField("subservice", "ha-cache"),
MaxFiles: 150,
},
},
)

if err != nil {
logger.WithError(err).Fatal("could not connect to history archive")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type event struct {
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
Expand Down
8 changes: 4 additions & 4 deletions cmd/soroban-rpc/internal/ingest/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func (s *Service) ingestLedgerEntryChanges(ctx context.Context, reader ingest.Ch
results := changeStatsProcessor.GetResults()
for stat, value := range results.Map() {
stat = strings.Replace(stat, "stats_", "change_", 1)
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": stat}).Add(float64(value.(int64)))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand All @@ -66,10 +66,10 @@ func (s *Service) ingestTempLedgerEntryEvictions(
}

for evictionType, count := range counts {
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": evictionType}).Add(float64(count))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "evicted_temp_ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand Down
102 changes: 78 additions & 24 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,41 @@ func newService(cfg Config) *Service {
[]string{"type"},
)

cfg.Daemon.MetricsRegistry().MustRegister(ingestionDurationMetric, latestLedgerMetric, ledgerStatsMetric)
haStatsMetric := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: cfg.Daemon.MetricsNamespace(), Subsystem: "ingest", Name: "history_archive_stats_total",
Help: "Counters of different ingestion archive requests. " +
"'source' label will provide name/address of the physical history archive server from the pool for which a request may be sent. " +
"'type' label will further categorize the potential request into specific requests, " +
"'file_downloads' - the count of files downloaded from an archive server, " +
"'file_uploads' - the count of files uploaded to an archive server, " +
"'requests' - the count of all http requests(includes both queries and file downloads) sent to an archive server, " +
"'cache_hits' - the count of requests for an archive file that were found on local cache instead, no download request sent to archive server.",
},
[]string{"source", "type"},
)

cfg.Daemon.MetricsRegistry().MustRegister(
ingestionDurationMetric,
latestLedgerMetric,
ledgerStatsMetric,
haStatsMetric)

service := &Service{
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
metrics: Metrics{
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
haStatsMetric: haStatsMetric,
},
archive: cfg.Archive,
}

return service
Expand Down Expand Up @@ -119,19 +141,25 @@ func startService(service *Service, cfg Config) {
})
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
type Metrics struct {
ingestionDurationMetric *prometheus.SummaryVec
latestLedgerMetric prometheus.Gauge
ledgerStatsMetric *prometheus.CounterVec
haStatsMetric *prometheus.CounterVec
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
metrics Metrics
archive historyarchive.ArchiveInterface
}

func (s *Service) Close() error {
Expand Down Expand Up @@ -286,18 +314,44 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
}
s.logger.Debugf("Ingested ledger %d", sequence)

s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "total"}).Observe(time.Since(startTime).Seconds())
s.latestLedgerMetric.Set(float64(sequence))
s.metrics.latestLedgerMetric.Set(float64(sequence))
s.addHistoryArchiveStatsMetrics(s.archive.GetStats())
return nil
}

func (s *Service) addHistoryArchiveStatsMetrics(stats []historyarchive.ArchiveStats) {
for _, historyServerStat := range stats {
s.metrics.haStatsMetric.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "file_downloads"}).
Add(float64(historyServerStat.GetDownloads()))
s.metrics.haStatsMetric.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "file_uploads"}).
Add(float64(historyServerStat.GetUploads()))
s.metrics.haStatsMetric.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "requests"}).
Add(float64(historyServerStat.GetRequests()))
s.metrics.haStatsMetric.
With(prometheus.Labels{
"source": historyServerStat.GetBackendName(),
"type": "cache_hits"}).
Add(float64(historyServerStat.GetCacheHits()))
}
}

func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta) error {
startTime := time.Now()
if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil {
return err
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds())

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/simulate_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ func TestSimulateSystemEvent(t *testing.T) {
err = xdr.SafeUnmarshalBase64(response.TransactionData, &transactionData)
require.NoError(t, err)
assert.InDelta(t, 6856, uint32(transactionData.Resources.ReadBytes), 200)

// the resulting fee is derived from compute factors and a default padding is applied to instructions by preflight
// for test purposes, the most deterministic way to assert the resulting fee is expected value in test scope, is to capture
// the resulting fee from current preflight output and re-plug it in here, rather than try to re-implement the cost-model algo
Expand Down
36 changes: 30 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,30 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stellar/go v0.0.0-20240109175136-3ca501f09055
github.com/stellar/go v0.0.0-20240201172825-2df28100a1c5
github.com/stretchr/testify v1.8.4
golang.org/x/mod v0.13.0
gotest.tools/v3 v3.5.0
)

require (
cloud.google.com/go v0.111.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/cloudflare/circl v1.3.5 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
Expand All @@ -37,8 +50,19 @@ require (
github.com/skeema/knownhosts v1.2.1 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/api v0.149.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231211222908-989df2bf70f3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/grpc v1.60.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

Expand Down Expand Up @@ -85,13 +109,13 @@ require (
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 // indirect
github.com/stretchr/objx v0.5.1 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/tylerb/graceful.v1 v1.2.15 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit 2ed8616

Please sign in to comment.