Skip to content

Commit fecb92e

Browse files
authored
Add the experimental exemplar feature (open-telemetry#4871)
* Add the experimental exemplar feature * Add exemplars to EXPERIMENTAL.md * Add changelog entry * Fix hist buckets > 1 detection * Collect instead of Flush res about to be deleted * Add e2e test * Do not pre-alloc ResourceMetrics This only has a single use. * Fix grammatical error in comment * Add test cases Default and invalid OTEL_METRICS_EXEMPLAR_FILTER. Test sampled and non-sampled context for trace_based. * Comment nCPU * Doc OTEL_METRICS_EXEMPLAR_FILTER
1 parent d9d9507 commit fecb92e

14 files changed

+522
-59
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1111
### Added
1212

1313
- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)
14+
- Experimental exemplar exporting is added to the metric SDK.
15+
See [metric documentation](./sdk/metric/EXPERIMENTAL.md#exemplars) for more information about this feature and how to enable it. (#4871)
1416

1517
### Fixed
1618

sdk/metric/EXPERIMENTAL.md

+61
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,67 @@ Disable the cardinality limit.
4040
unset OTEL_GO_X_CARDINALITY_LIMIT
4141
```
4242

43+
### Exemplars
44+
45+
A sample of measurements made may be exported directly as a set of exemplars.
46+
47+
This experimental feature can be enabled by setting the `OTEL_GO_X_EXEMPLAR` environment variable.
48+
The value of must be the case-insensitive string of `"true"` to enable the feature.
49+
All other values are ignored.
50+
51+
Exemplar filters are a supported.
52+
The exemplar filter applies to all measurements made.
53+
They filter these measurements, only allowing certain measurements to be passed to the underlying exemplar reservoir.
54+
55+
To change the exemplar filter from the default `"trace_based"` filter set the `OTEL_METRICS_EXEMPLAR_FILTER` environment variable.
56+
The value must be the case-sensitive string defined by the [OpenTelemetry specification].
57+
58+
- `"always_on"`: allows all measurements
59+
- `"always_off"`: denies all measurements
60+
- `"trace_based"`: allows only sampled measurements
61+
62+
All values other than these will result in the default, `"trace_based"`, exemplar filter being used.
63+
64+
[OpenTelemetry specification]: https://github.com/open-telemetry/opentelemetry-specification/blob/a6ca2fd484c9e76fe1d8e1c79c99f08f4745b5ee/specification/configuration/sdk-environment-variables.md#exemplar
65+
66+
#### Examples
67+
68+
Enable exemplars to be exported.
69+
70+
```console
71+
export OTEL_GO_X_EXEMPLAR=true
72+
```
73+
74+
Disable exemplars from being exported.
75+
76+
```console
77+
unset OTEL_GO_X_EXEMPLAR
78+
```
79+
80+
Set the exemplar filter to allow all measurements.
81+
82+
```console
83+
export OTEL_METRICS_EXEMPLAR_FILTER=always_on
84+
```
85+
86+
Set the exemplar filter to deny all measurements.
87+
88+
```console
89+
export OTEL_METRICS_EXEMPLAR_FILTER=always_off
90+
```
91+
92+
Set the exemplar filter to only allow sampled measurements.
93+
94+
```console
95+
export OTEL_METRICS_EXEMPLAR_FILTER=trace_based
96+
```
97+
98+
Revert to the default exemplar filter (`"trace_based"`)
99+
100+
```console
101+
unset OTEL_METRICS_EXEMPLAR_FILTER
102+
```
103+
43104
## Compatibility and Stability
44105

45106
Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md).

sdk/metric/benchmark_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
1616

1717
import (
1818
"context"
19+
"fmt"
20+
"runtime"
1921
"strconv"
2022
"testing"
2123

@@ -24,6 +26,7 @@ import (
2426
"go.opentelemetry.io/otel/attribute"
2527
"go.opentelemetry.io/otel/metric"
2628
"go.opentelemetry.io/otel/sdk/metric/metricdata"
29+
"go.opentelemetry.io/otel/trace"
2730
)
2831

2932
var viewBenchmarks = []struct {
@@ -369,3 +372,89 @@ func benchCollectAttrs(setup func(attribute.Set) Reader) func(*testing.B) {
369372
b.Run("Attributes/10", run(setup(attribute.NewSet(attrs...))))
370373
}
371374
}
375+
376+
func BenchmarkExemplars(b *testing.B) {
377+
sc := trace.NewSpanContext(trace.SpanContextConfig{
378+
SpanID: trace.SpanID{0o1},
379+
TraceID: trace.TraceID{0o1},
380+
TraceFlags: trace.FlagsSampled,
381+
})
382+
ctx := trace.ContextWithSpanContext(context.Background(), sc)
383+
384+
attr := attribute.NewSet(
385+
attribute.String("user", "Alice"),
386+
attribute.Bool("admin", true),
387+
)
388+
389+
setup := func(name string) (metric.Meter, Reader) {
390+
r := NewManualReader()
391+
v := NewView(Instrument{Name: "*"}, Stream{
392+
AttributeFilter: func(kv attribute.KeyValue) bool {
393+
return kv.Key == attribute.Key("user")
394+
},
395+
})
396+
mp := NewMeterProvider(WithReader(r), WithView(v))
397+
return mp.Meter(name), r
398+
}
399+
nCPU := runtime.NumCPU() // Size of the fixed reservoir used.
400+
401+
b.Setenv("OTEL_GO_X_EXEMPLAR", "true")
402+
403+
name := fmt.Sprintf("Int64Counter/%d", nCPU)
404+
b.Run(name, func(b *testing.B) {
405+
m, r := setup("Int64Counter")
406+
i, err := m.Int64Counter("int64-counter")
407+
assert.NoError(b, err)
408+
409+
rm := newRM(metricdata.Sum[int64]{
410+
DataPoints: []metricdata.DataPoint[int64]{
411+
{Exemplars: make([]metricdata.Exemplar[int64], 0, nCPU)},
412+
},
413+
})
414+
e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Exemplars)
415+
416+
b.ReportAllocs()
417+
b.ResetTimer()
418+
for n := 0; n < b.N; n++ {
419+
for j := 0; j < 2*nCPU; j++ {
420+
i.Add(ctx, 1, metric.WithAttributeSet(attr))
421+
}
422+
423+
_ = r.Collect(ctx, rm)
424+
assert.Len(b, *e, nCPU)
425+
}
426+
})
427+
428+
name = fmt.Sprintf("Int64Histogram/%d", nCPU)
429+
b.Run(name, func(b *testing.B) {
430+
m, r := setup("Int64Counter")
431+
i, err := m.Int64Histogram("int64-histogram")
432+
assert.NoError(b, err)
433+
434+
rm := newRM(metricdata.Histogram[int64]{
435+
DataPoints: []metricdata.HistogramDataPoint[int64]{
436+
{Exemplars: make([]metricdata.Exemplar[int64], 0, 1)},
437+
},
438+
})
439+
e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]).DataPoints[0].Exemplars)
440+
441+
b.ReportAllocs()
442+
b.ResetTimer()
443+
for n := 0; n < b.N; n++ {
444+
for j := 0; j < 2*nCPU; j++ {
445+
i.Record(ctx, 1, metric.WithAttributeSet(attr))
446+
}
447+
448+
_ = r.Collect(ctx, rm)
449+
assert.Len(b, *e, 1)
450+
}
451+
})
452+
}
453+
454+
func newRM(a metricdata.Aggregation) *metricdata.ResourceMetrics {
455+
return &metricdata.ResourceMetrics{
456+
ScopeMetrics: []metricdata.ScopeMetrics{
457+
{Metrics: []metricdata.Metrics{{Data: a}}},
458+
},
459+
}
460+
}

sdk/metric/exemplar.go

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package metric // import "go.opentelemetry.io/otel/sdk/metric"
16+
17+
import (
18+
"os"
19+
"runtime"
20+
21+
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
22+
"go.opentelemetry.io/otel/sdk/metric/internal/x"
23+
)
24+
25+
// reservoirFunc returns the appropriately configured exemplar reservoir
26+
// creation func based on the passed InstrumentKind and user defined
27+
// environment variables.
28+
//
29+
// Note: This will only return non-nil values when the experimental exemplar
30+
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
31+
// is not set to always_off.
32+
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
33+
if !x.Exemplars.Enabled() {
34+
return nil
35+
}
36+
37+
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
38+
resF := func() func() exemplar.Reservoir[N] {
39+
// Explicit bucket histogram aggregation with more than 1 bucket will
40+
// use AlignedHistogramBucketExemplarReservoir.
41+
a, ok := agg.(AggregationExplicitBucketHistogram)
42+
if ok && len(a.Boundaries) > 0 {
43+
cp := make([]float64, len(a.Boundaries))
44+
copy(cp, a.Boundaries)
45+
return func() exemplar.Reservoir[N] {
46+
bounds := cp
47+
return exemplar.Histogram[N](bounds)
48+
}
49+
}
50+
51+
var n int
52+
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
53+
// Base2 Exponential Histogram Aggregation SHOULD use a
54+
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
55+
// smaller of the maximum number of buckets configured on the
56+
// aggregation or twenty (e.g. min(20, max_buckets)).
57+
n = int(a.MaxSize)
58+
if n > 20 {
59+
n = 20
60+
}
61+
} else {
62+
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
63+
// This Exemplar reservoir MAY take a configuration parameter for
64+
// the size of the reservoir. If no size configuration is
65+
// provided, the default size MAY be the number of possible
66+
// concurrent threads (e.g. numer of CPUs) to help reduce
67+
// contention. Otherwise, a default size of 1 SHOULD be used.
68+
n = runtime.NumCPU()
69+
if n < 1 {
70+
// Should never be the case, but be defensive.
71+
n = 1
72+
}
73+
}
74+
75+
return func() exemplar.Reservoir[N] {
76+
return exemplar.FixedSize[N](n)
77+
}
78+
}
79+
80+
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
81+
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
82+
83+
switch os.Getenv(filterEnvKey) {
84+
case "always_on":
85+
return resF()
86+
case "always_off":
87+
return exemplar.Drop[N]
88+
case "trace_based":
89+
fallthrough
90+
default:
91+
newR := resF()
92+
return func() exemplar.Reservoir[N] {
93+
return exemplar.SampledFilter(newR())
94+
}
95+
}
96+
}

sdk/metric/internal/aggregate/aggregate.go

+28-9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"go.opentelemetry.io/otel/attribute"
22+
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
2223
"go.opentelemetry.io/otel/sdk/metric/metricdata"
2324
)
2425

@@ -44,6 +45,12 @@ type Builder[N int64 | float64] struct {
4445
// Filter is the attribute filter the aggregate function will use on the
4546
// input of measurements.
4647
Filter attribute.Filter
48+
// ReservoirFunc is the factory function used by aggregate functions to
49+
// create new exemplar reservoirs for a new seen attribute set.
50+
//
51+
// If this is not provided a default factory function that returns an
52+
// exemplar.Drop reservoir will be used.
53+
ReservoirFunc func() exemplar.Reservoir[N]
4754
// AggregationLimit is the cardinality limit of measurement attributes. Any
4855
// measurement for new attributes once the limit has been reached will be
4956
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -54,15 +61,27 @@ type Builder[N int64 | float64] struct {
5461
AggregationLimit int
5562
}
5663

57-
func (b Builder[N]) filter(f Measure[N]) Measure[N] {
64+
func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
65+
if b.ReservoirFunc != nil {
66+
return b.ReservoirFunc
67+
}
68+
69+
return exemplar.Drop[N]
70+
}
71+
72+
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
73+
74+
func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
5875
if b.Filter != nil {
5976
fltr := b.Filter // Copy to make it immutable after assignment.
6077
return func(ctx context.Context, n N, a attribute.Set) {
61-
fAttr, _ := a.Filter(fltr)
62-
f(ctx, n, fAttr)
78+
fAttr, dropped := a.Filter(fltr)
79+
f(ctx, n, fAttr, dropped)
6380
}
6481
}
65-
return f
82+
return func(ctx context.Context, n N, a attribute.Set) {
83+
f(ctx, n, a, nil)
84+
}
6685
}
6786

6887
// LastValue returns a last-value aggregate function input and output.
@@ -71,7 +90,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
7190
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
7291
// Delta temporality is the only temporality that makes semantic sense for
7392
// a last-value aggregate.
74-
lv := newLastValue[N](b.AggregationLimit)
93+
lv := newLastValue[N](b.AggregationLimit, b.resFunc())
7594

7695
return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
7796
// Ignore if dest is not a metricdata.Gauge. The chance for memory
@@ -87,7 +106,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
87106
// PrecomputedSum returns a sum aggregate function input and output. The
88107
// arguments passed to the input are expected to be the precomputed sum values.
89108
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
90-
s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
109+
s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
91110
switch b.Temporality {
92111
case metricdata.DeltaTemporality:
93112
return b.filter(s.measure), s.delta
@@ -98,7 +117,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati
98117

99118
// Sum returns a sum aggregate function input and output.
100119
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
101-
s := newSum[N](monotonic, b.AggregationLimit)
120+
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
102121
switch b.Temporality {
103122
case metricdata.DeltaTemporality:
104123
return b.filter(s.measure), s.delta
@@ -110,7 +129,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
110129
// ExplicitBucketHistogram returns a histogram aggregate function input and
111130
// output.
112131
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
113-
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
132+
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
114133
switch b.Temporality {
115134
case metricdata.DeltaTemporality:
116135
return b.filter(h.measure), h.delta
@@ -122,7 +141,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
122141
// ExponentialBucketHistogram returns a histogram aggregate function input and
123142
// output.
124143
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
125-
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
144+
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
126145
switch b.Temporality {
127146
case metricdata.DeltaTemporality:
128147
return b.filter(h.measure), h.delta

0 commit comments

Comments
 (0)