Commit

feat(store)!: metrics for Store (#129)
Includes:
* headHeight gauge
* flushTime histogram
* readTime histogram
* writeQueueBlocked counter

Breaking because it removes the existing `head` metrics, which are now moved into the Store itself.

Also fixes two minor bugs.
Wondertan authored Nov 6, 2023
1 parent a8ce731 commit 28ff21c
Showing 4 changed files with 192 additions and 58 deletions.
39 changes: 0 additions & 39 deletions metrics.go

This file was deleted.

119 changes: 119 additions & 0 deletions store/metrics.go
@@ -0,0 +1,119 @@
package store

import (
	"context"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("header/store")

type metrics struct {
	headHeight     atomic.Int64
	headHeightInst metric.Int64ObservableGauge
	headHeightReg  metric.Registration

	flushTimeInst metric.Float64Histogram
	readTimeInst  metric.Float64Histogram

	writesQueueBlockedInst metric.Int64Counter
}

func newMetrics() (m *metrics, err error) {
	m = new(metrics)
	m.headHeightInst, err = meter.Int64ObservableGauge(
		"hdr_store_head_height_gauge",
		metric.WithDescription("current header store head height(subjective height)"),
	)
	if err != nil {
		return nil, err
	}
	m.headHeightReg, err = meter.RegisterCallback(m.observeHeight, m.headHeightInst)
	if err != nil {
		return nil, err
	}
	m.flushTimeInst, err = meter.Float64Histogram(
		"hdr_store_flush_time_hist",
		metric.WithDescription("header store flush time in seconds"),
	)
	if err != nil {
		return nil, err
	}
	m.readTimeInst, err = meter.Float64Histogram(
		"hdr_store_read_time_hist",
		metric.WithDescription("header store single header read time from datastore in seconds and ignoring cache"),
	)
	if err != nil {
		return nil, err
	}
	m.writesQueueBlockedInst, err = meter.Int64Counter(
		"hdr_store_writes_blocked_counter",
		metric.WithDescription("header store writes blocked counter"),
	)
	if err != nil {
		return nil, err
	}
	return m, nil
}

func (m *metrics) newHead(height uint64) {
	m.observe(context.Background(), func(ctx context.Context) {
		m.headHeight.Store(int64(height))
	})
}

func (m *metrics) observeHeight(_ context.Context, obs metric.Observer) error {
	obs.ObserveInt64(m.headHeightInst, m.headHeight.Load())
	return nil
}

func (m *metrics) flush(ctx context.Context, duration time.Duration, amount int, failed bool) {
	m.observe(ctx, func(ctx context.Context) {
		m.flushTimeInst.Record(ctx,
			duration.Seconds(),
			metric.WithAttributes(
				attribute.Int("amount", amount/100), // divide by 100 to reduce cardinality
				attribute.Bool("failed", failed),
			),
		)
	})
}

func (m *metrics) readSingle(ctx context.Context, duration time.Duration, failed bool) {
	m.observe(ctx, func(ctx context.Context) {
		m.readTimeInst.Record(ctx,
			duration.Seconds(),
			metric.WithAttributes(attribute.Bool("failed", failed)),
		)
	})
}

func (m *metrics) writesQueueBlocked(ctx context.Context) {
	m.observe(ctx, func(ctx context.Context) {
		m.writesQueueBlockedInst.Add(ctx, 1)
	})
}

func (m *metrics) observe(ctx context.Context, f func(context.Context)) {
	if m == nil {
		return
	}

	if ctx.Err() != nil {
		ctx = context.Background()
	}

	f(ctx)
}

func (m *metrics) Close() error {
	if m == nil {
		return nil
	}

	return m.headHeightReg.Unregister()
}
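
Note: the instruments above hang off the global otel.Meter("header/store"), so nothing is exported until the embedding application registers a meter provider. A minimal wiring sketch, assuming the OpenTelemetry Go SDK and its stdout exporter; none of this is part of the commit:

package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	ctx := context.Background()

	// Export collected metrics to stdout every 10s; a Prometheus or OTLP
	// reader would typically replace this in production.
	exp, err := stdoutmetric.New()
	if err != nil {
		log.Fatal(err)
	}

	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp, sdkmetric.WithInterval(10*time.Second))),
	)
	defer func() { _ = provider.Shutdown(ctx) }()

	// Makes otel.Meter("header/store") resolve to this provider, so the
	// store's gauge, histograms, and counter are actually recorded.
	otel.SetMeterProvider(provider)

	// ... construct and run the Store with WithMetrics() here ...
}
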
12 changes: 11 additions & 1 deletion store/options.go
@@ -18,13 +18,16 @@ type Parameters struct {
	// IndexCacheSize defines the maximum amount of entries in the Height to Hash index cache.
	IndexCacheSize int

	// WriteBatchSize defines the size of the batched header write.
	// WriteBatchSize defines the size of the batched header flush.
	// Headers are written in batches not to thrash the underlying Datastore with writes.
	WriteBatchSize int

	// storePrefix defines the prefix used to wrap the store
	// OPTIONAL
	storePrefix datastore.Key

	// metrics is a flag that enables metrics collection
	metrics bool
}

// DefaultParameters returns the default params to configure the store.
@@ -51,6 +54,13 @@ func (p *Parameters) Validate() error {
	return nil
}

// WithMetrics enables metrics on the Store.
func WithMetrics() Option {
	return func(p *Parameters) {
		p.metrics = true
	}
}

// WithStoreCacheSize is a functional option that configures the
// `StoreCacheSize` parameter.
func WithStoreCacheSize(size int) Option {
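
With the new option in place, a call site inside the store package might enable collection roughly like the sketch below; this is illustrative only and not part of the diff (ds is any datastore.Batching, H a concrete header.Header implementation, ctx a context.Context):

// Illustrative sketch, not part of this commit.
s, err := newStore[H](ds, WithMetrics(), WithStoreCacheSize(512))
if err != nil {
	return err
}
// On shutdown, Stop drains pending writes and unregisters the
// head-height gauge callback via metrics.Close.
defer s.Stop(ctx)
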
80 changes: 62 additions & 18 deletions store/store.go
@@ -5,6 +5,7 @@ import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	lru "github.com/hashicorp/golang-lru"
	"github.com/ipfs/go-datastore"
@@ -31,6 +32,8 @@ type Store[H header.Header[H]] struct {
	ds datastore.Batching
	// adaptive replacement cache of headers
	cache *lru.ARCCache
	// metrics collection instance
	metrics *metrics

	// header heights management
	//
Expand Down Expand Up @@ -102,15 +105,24 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
return nil, fmt.Errorf("failed to create height indexer: %w", err)
}

var metrics *metrics
if params.metrics {
metrics, err = newMetrics()
if err != nil {
return nil, err
}
}

return &Store[H]{
Params: params,
ds: wrappedStore,
cache: cache,
metrics: metrics,
heightIndex: index,
heightSub: newHeightSub[H](),
writes: make(chan []H, 16),
writesDn: make(chan struct{}),
cache: cache,
heightIndex: index,
pending: newBatch[H](params.WriteBatchSize),
Params: params,
}, nil
}

@@ -141,17 +153,22 @@ func (s *Store[H]) Stop(ctx context.Context) error {
	default:
	}
	// signal to prevent further writes to Store
	s.writes <- nil
	select {
	case <-s.writesDn: // wait till it is done writing
	case s.writes <- nil:
	case <-ctx.Done():
		return ctx.Err()
	}
	// wait till it is done writing
	select {
	case <-s.writesDn:
	case <-ctx.Done():
		return ctx.Err()
	}

	// cleanup caches
	s.cache.Purge()
	s.heightIndex.cache.Purge()
	return nil
	return s.metrics.Close()
}

func (s *Store[H]) Height() uint64 {
@@ -172,7 +189,7 @@ func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
	case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
		return zero, header.ErrNoHead
	case err == nil:
		s.heightSub.SetHeight(uint64(head.Height()))
		s.heightSub.SetHeight(head.Height())
		log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
		return head, nil
	}
@@ -188,12 +205,8 @@ func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
		return h, nil
	}

	b, err := s.ds.Get(ctx, datastore.NewKey(hash.String()))
	b, err := s.get(ctx, hash)
	if err != nil {
		if errors.Is(err, datastore.ErrNotFound) {
			return zero, header.ErrNotFound
		}

		return zero, err
	}

@@ -356,15 +369,27 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
		verified, head = append(verified, h), h
	}

	onWrite := func() {
		newHead := verified[len(verified)-1]
		s.writeHead.Store(&newHead)
		log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
		s.metrics.newHead(newHead.Height())
	}

	// queue headers to be written on disk
	select {
	case s.writes <- verified:
		ln := len(verified)
		s.writeHead.Store(&verified[ln-1])
		wh := *s.writeHead.Load()
		log.Infow("new head", "height", wh.Height(), "hash", wh.Hash())
		// we return an error here after writing,
		// as there might be an invalid header in between of a given range
		onWrite()
		return err
	default:
		s.metrics.writesQueueBlocked(ctx)
	}
	// if the writes queue is full, we block until it is not
	select {
	case s.writes <- verified:
		onWrite()
		return err
	case <-s.writesDn:
		return errStoppedStore
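
The select pair above is what feeds the new writesQueueBlocked counter: Append first attempts a non-blocking send of the verified batch, and only when the queue is full does it record the event and fall through to a blocking send. A stripped-down, self-contained sketch of that try-then-block pattern (all names here are illustrative, not from the codebase):

package main

import "fmt"

func enqueue(writes chan []int, batch []int, blocked *int) {
	// Fast path: the queue has room, hand the batch off immediately.
	select {
	case writes <- batch:
		return
	default:
		*blocked++ // queue full: record the backpressure event
	}
	// Slow path: block until the consumer drains the queue.
	writes <- batch
}

func main() {
	writes := make(chan []int, 1)
	var blocked int

	enqueue(writes, []int{1}, &blocked) // fits in the buffer
	go func() { <-writes; <-writes }()  // consumer drains both batches
	enqueue(writes, []int{2}, &blocked) // may block briefly until drained

	fmt.Println("blocked sends:", blocked)
}
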
@@ -393,13 +418,17 @@ func (s *Store[H]) flushLoop() {
			continue
		}

		err := s.flush(ctx, s.pending.GetAll()...)
		startTime := time.Now()
		toFlush := s.pending.GetAll()
		err := s.flush(ctx, toFlush...)
		if err != nil {
			from, to := toFlush[0].Height(), toFlush[len(toFlush)-1].Height()
			// TODO(@Wondertan): Should this be a fatal error case with os.Exit?
			from, to := uint64(headers[0].Height()), uint64(headers[len(headers)-1].Height())
			log.Errorw("writing header batch", "from", from, "to", to)
			s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)
			continue
		}
		s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
		// reset pending
		s.pending.Reset()

@@ -472,3 +501,18 @@ func (s *Store[H]) readHead(ctx context.Context) (H, error) {

	return s.Get(ctx, head)
}

func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
	startTime := time.Now()
	data, err := s.ds.Get(ctx, datastore.NewKey(hash.String()))
	if err != nil {
		s.metrics.readSingle(ctx, time.Since(startTime), true)
		if errors.Is(err, datastore.ErrNotFound) {
			return nil, header.ErrNotFound
		}
		return nil, err
	}

	s.metrics.readSingle(ctx, time.Since(startTime), false)
	return data, nil
}
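
The s.metrics.readSingle and s.metrics.flush calls above are safe even when metrics are disabled: s.metrics stays nil in that case, and every method in store/metrics.go short-circuits on a nil receiver (see observe and Close). A minimal standalone illustration of the nil-receiver pattern:

package main

import "fmt"

// recorder mimics the store's metrics struct: a nil *recorder means "disabled".
type recorder struct{ events int }

// record is callable on a nil receiver; Go allows this because the method
// never dereferences r before the nil check.
func (r *recorder) record() {
	if r == nil {
		return // metrics disabled: no-op
	}
	r.events++
}

func main() {
	var disabled *recorder // as when WithMetrics() is not passed
	disabled.record()      // safe no-op

	enabled := &recorder{} // as when WithMetrics() is passed
	enabled.record()
	fmt.Println(disabled, enabled.events) // <nil> 1
}
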
