Skip to content

Commit

Permalink
metric: export quantiles from prometheus-based histogram
Browse files Browse the repository at this point in the history
This change builds on the previous one and adds a function to export
quantiles from the Prometheus-based histogram. This functionality is
used to store histogram data in the internal timeseries database. The
hdr library came with a function to do this, while Prometheus does not
have a public API for exporting quantiles.

The function implemented here is very similar to the one found
internally in Prometheus, using linear interpolation to calculate values
at a given quantile.

This commit also includes some additional testing and general
refactoring of the metrics code.

Release note: None

Release justification: low risk, high benefit changes
  • Loading branch information
aadityasondhi committed Aug 23, 2022
1 parent ba4a84c commit ad8b581
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 41 deletions.
1 change: 0 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ go_library(
"@com_github_gogo_protobuf//proto",
"@com_github_google_btree//:btree",
"@com_github_kr_pretty//:pretty",
"@com_github_prometheus_client_golang//prometheus",
"@io_etcd_go_etcd_raft_v3//:raft",
"@io_etcd_go_etcd_raft_v3//raftpb",
"@io_etcd_go_etcd_raft_v3//tracker",
Expand Down
9 changes: 3 additions & 6 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/slidingwindow"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/raft/v3/raftpb"
)

Expand Down Expand Up @@ -1746,7 +1745,7 @@ type StoreMetrics struct {
RaftCommandCommitLatency *metric.Histogram
RaftHandleReadyLatency *metric.Histogram
RaftApplyCommittedLatency *metric.Histogram
RaftSchedulerLatency *metric.HistogramV2
RaftSchedulerLatency *metric.Histogram
RaftTimeoutCampaign *metric.Counter

// Raft message metrics.
Expand Down Expand Up @@ -2248,10 +2247,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RaftCommandCommitLatency: metric.NewLatency(metaRaftCommandCommitLatency, histogramWindow),
RaftHandleReadyLatency: metric.NewLatency(metaRaftHandleReadyLatency, histogramWindow),
RaftApplyCommittedLatency: metric.NewLatency(metaRaftApplyCommittedLatency, histogramWindow),
RaftSchedulerLatency: metric.NewHistogramV2(metaRaftSchedulerLatency, histogramWindow, prometheus.HistogramOpts{
Buckets: metric.IOLatencyBuckets,
}),
RaftTimeoutCampaign: metric.NewCounter(metaRaftTimeoutCampaign),
RaftSchedulerLatency: metric.NewLatency(metaRaftSchedulerLatency, histogramWindow),
RaftTimeoutCampaign: metric.NewCounter(metaRaftTimeoutCampaign),

// Raft message metrics.
RaftRcvdMessages: [...]*metric.Counter{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ type raftScheduleState struct {
type raftScheduler struct {
ambientContext log.AmbientContext
processor raftProcessor
latency *metric.HistogramV2
latency *metric.Histogram
numWorkers int

mu struct {
Expand Down
3 changes: 0 additions & 3 deletions pkg/server/status/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,8 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_codahale_hdrhistogram//:hdrhistogram",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_elastic_gosigar//:gosigar",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
"@com_github_shirou_gopsutil_v3//net",
] + select({
"@io_bazel_rules_go//go/platform:aix": [
Expand Down
46 changes: 17 additions & 29 deletions pkg/server/status/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/cockroachdb/redact"
"github.com/dustin/go-humanize"
"github.com/elastic/gosigar"
prometheusgo "github.com/prometheus/client_model/go"
)

const (
Expand Down Expand Up @@ -528,23 +527,7 @@ type registryRecorder struct {
}

func extractValue(name string, mtr interface{}, fn func(string, float64)) error {
// TODO(tschottdorf,ajwerner): consider moving this switch to a single
// interface implemented by the individual metric types.
type (
float64Valuer interface{ Value() float64 }
int64Valuer interface{ Value() int64 }
int64Counter interface{ Count() int64 }
prometheusMetricValuer interface{ ToPrometheusMetric() *prometheusgo.Metric }
)
switch mtr := mtr.(type) {
case float64:
fn(name, mtr)
case float64Valuer:
fn(name, mtr.Value())
case int64Valuer:
fn(name, float64(mtr.Value()))
case int64Counter:
fn(name, float64(mtr.Count()))
case *metric.Histogram:
// TODO(mrtracy): Where should this comment go for better
// visibility?
Expand All @@ -566,19 +549,24 @@ func extractValue(name string, mtr interface{}, fn func(string, float64)) error
}
fn(name+"-count", float64(curr.TotalCount()))
case *metric.HistogramV2:
// NB: this branch is intentionally at the bottom since all metrics implement it.
cur := mtr.Windowed()
var m prometheusgo.Metric
_ = cur.Write(&m)
hist := m.Histogram
n := float64(*hist.SampleCount)
n := float64(mtr.TotalCountWindowed())
fn(name+"-count", n)
fn(name+"-avg", *hist.SampleSum/n)
// TODO(obs-inf): add quantiles like for the hdrhistogram.
case prometheusMetricValuer:
// TODO we should be able to handle all non-histogram branches using this, i.e.
// can delete the float, int, etc, cases above.
_ = mtr.ToPrometheusMetric()
avg := mtr.TotalSumWindowed() / n
if math.IsNaN(avg) || math.IsInf(avg, +1) || math.IsInf(avg, -1) {
avg = 0
}
fn(name+"-avg", avg)
for _, pt := range recordHistogramQuantiles {
fn(name+pt.suffix, mtr.ValueAtQuantileWindowed(pt.quantile))
}
case metric.PrometheusExportable:
// NB: this branch is intentionally at the bottom since all metrics implement it.
m := mtr.ToPrometheusMetric()
if m.Gauge != nil {
fn(name, *m.Gauge.Value)
} else if m.Counter != nil {
fn(name, *m.Counter.Value)
}

default:
return errors.Errorf("cannot extract value for type %T", mtr)
Expand Down
1 change: 1 addition & 0 deletions pkg/util/metric/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
deps = [
"//pkg/util/log",
"@com_github_kr_pretty//:pretty",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
],
Expand Down
89 changes: 88 additions & 1 deletion pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package metric
import (
"encoding/json"
"math"
"sort"
"sync/atomic"
"time"

Expand Down Expand Up @@ -231,6 +232,7 @@ func NewLatency(metadata Metadata, histogramWindow time.Duration) *Histogram {
func (h *Histogram) Windowed() (*hdrhistogram.Histogram, time.Duration) {
h.mu.Lock()
defer h.mu.Unlock()
maybeTick(h.mu.tickHelper)
// TODO(obs-inf): not sure we should multiply by histWrapNum here, but it
// has been the behavior for a long time.
return cloneHistogram(h.mu.sliding.Merge()), histWrapNum * h.mu.tickInterval
Expand Down Expand Up @@ -404,6 +406,12 @@ func NewHistogramV2(
var _ periodic = (*HistogramV2)(nil)
var _ PrometheusExportable = (*HistogramV2)(nil)

// HistogramV2 is a prometheus-backed histogram. It collects observed values by
// keeping bucketed counts. For convenience, internally two sets of buckets are
// kept: A cumulative set (i.e. data is never evicted) and a windowed set (which
// keeps only recently collected samples).
//
// New buckets are created using TestHistogramBuckets.
type HistogramV2 struct {
Metadata
cum prometheus.Histogram
Expand Down Expand Up @@ -440,26 +448,29 @@ func (h *HistogramV2) tick() {
h.windowed.tick()
}

// Windowed returns a copy of the current windowed histogram.
func (h *HistogramV2) Windowed() prometheus.Histogram {
h.windowed.RLock()
defer h.windowed.RUnlock()
return h.windowed.cur
}

// RecordValue adds the given value to the histogram.
func (h *HistogramV2) RecordValue(n int64) {
v := float64(n)
h.cum.Observe(v)

h.windowed.RLock()
defer h.windowed.RUnlock()
h.windowed.cur.Observe(v)

}

// GetType returns the prometheus type enum for this metric.
func (h *HistogramV2) GetType() *prometheusgo.MetricType {
return prometheusgo.MetricType_HISTOGRAM.Enum()
}

// ToPrometheusMetric returns a filled-in prometheus metric of the right type.
func (h *HistogramV2) ToPrometheusMetric() *prometheusgo.Metric {
m := &prometheusgo.Metric{}
if err := h.cum.Write(m); err != nil {
Expand All @@ -468,17 +479,93 @@ func (h *HistogramV2) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type.
func (h *HistogramV2) ToPrometheusMetricWindowed() *prometheusgo.Metric {
h.windowed.Lock()
defer h.windowed.Unlock()
m := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(m); err != nil {
panic(err) // TODD
}
return m
}

// GetMetadata returns the metric's metadata including the Prometheus
// MetricType.
func (h *HistogramV2) GetMetadata() Metadata {
return h.Metadata
}

// Inspect calls the closure.
func (h *HistogramV2) Inspect(f func(interface{})) {
h.windowed.Lock()
maybeTick(&h.windowed)
h.windowed.Unlock()
f(h)
}

// TotalCount returns the (cumulative) number of samples.
func (h *HistogramV2) TotalCount() int64 {
return int64(h.ToPrometheusMetric().Histogram.GetSampleCount())
}

// TotalCountWindowed returns the number of samples in the current window.
func (h *HistogramV2) TotalCountWindowed() int64 {
return int64(h.ToPrometheusMetricWindowed().Histogram.GetSampleCount())
}

// TotalSum returns the (cumulative) number of samples.
func (h *HistogramV2) TotalSum() float64 {
return h.ToPrometheusMetric().Histogram.GetSampleSum()
}

// TotalSumWindowed returns the number of samples in the current window.
func (h *HistogramV2) TotalSumWindowed() float64 {
return h.ToPrometheusMetricWindowed().Histogram.GetSampleSum()
}

// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
//
// https://github.com/prometheus/prometheus/blob/d91621890a2ccb3191a6d74812cc1827dd4093bf/promql/quantile.go#L75
// This function is mostly taken from a prometheus internal function that
// does the same thing. There are a few differences for our use case:
// 1. As a user of the prometheus go client library, we don't have access
// to the implicit +Inf bucket, so we don't need special cases to deal
// with the quantiles that include the +Inf bucket.
// 2. Since the prometheus client library ensures buckets are in a strictly
// increasing order at creation, we do not sort them.
func (h *HistogramV2) ValueAtQuantileWindowed(q float64) float64 {
m := h.ToPrometheusMetricWindowed()

buckets := m.Histogram.Bucket
n := float64(*m.Histogram.SampleCount)
if n == 0 {
return 0
}

rank := uint64((q / 100) * n)
b := sort.Search(len(buckets)-1, func(i int) bool { return *buckets[i].CumulativeCount >= rank })

var (
bucketStart float64
bucketEnd = *buckets[b].UpperBound
count = *buckets[b].CumulativeCount
)

// Calculate the linearly interpolated value within the bucket
if b > 0 {
bucketStart = *buckets[b-1].UpperBound
count -= *buckets[b-1].CumulativeCount
rank -= *buckets[b-1].CumulativeCount
}
val := bucketStart + (bucketEnd-bucketStart)*(float64(rank)/float64(count))
if math.IsNaN(val) || math.IsInf(val, +1) || math.IsInf(val, -1) {
return 0
}
return val
}

// A Counter holds a single mutable atomic value.
type Counter struct {
Metadata
Expand Down
74 changes: 74 additions & 0 deletions pkg/util/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,80 @@ func TestHistogramPrometheus(t *testing.T) {
}
}

func TestHistogramV2(t *testing.T) {
u := func(v int) *uint64 {
n := uint64(v)
return &n
}

f := func(v int) *float64 {
n := float64(v)
return &n
}

h := NewHistogramV2(
Metadata{},
time.Hour,
prometheus.HistogramOpts{
Namespace: "",
Subsystem: "",
Name: "",
Help: "",
ConstLabels: nil,
Buckets: []float64{
1.0,
5.0,
10.0,
25.0,
100.0,
},
},
)

// should return 0 if no observations are made
require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))

// 200 is intentionally set us the first value to verify that the function
// does not return NaN or Inf.
measurements := []int64{200, 0, 4, 5, 10, 20, 25, 30, 40, 90}
var expSum float64
for i, m := range measurements {
h.RecordValue(m)
if i == 0 {
require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))
}
expSum += float64(m)
}

act := *h.ToPrometheusMetric().Histogram
exp := prometheusgo.Histogram{
SampleCount: u(len(measurements)),
SampleSum: &expSum,
Bucket: []*prometheusgo.Bucket{
{CumulativeCount: u(1), UpperBound: f(1)},
{CumulativeCount: u(3), UpperBound: f(5)},
{CumulativeCount: u(4), UpperBound: f(10)},
{CumulativeCount: u(6), UpperBound: f(25)},
{CumulativeCount: u(9), UpperBound: f(100)},
// NB: 200 is greater than the largest defined bucket so prometheus
// puts it in an implicit bucket with +Inf as the upper bound.
},
}

if !reflect.DeepEqual(act, exp) {
t.Fatalf("expected differs from actual: %s", pretty.Diff(exp, act))
}

require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))
require.Equal(t, 1.0, h.ValueAtQuantileWindowed(10))
require.Equal(t, 17.5, h.ValueAtQuantileWindowed(50))
require.Equal(t, 75.0, h.ValueAtQuantileWindowed(80))
require.Equal(t, 125.0, h.ValueAtQuantileWindowed(100))
}

// TestHistogramBuckets is used to generate additional prometheus buckets to be
// used with HistogramV2. Please include obs-inf in the review process of new
// buckets.
func TestHistogramBuckets(t *testing.T) {
verifyAndPrint := func(t *testing.T, exp, act []float64) {
t.Helper()
Expand Down

0 comments on commit ad8b581

Please sign in to comment.