Skip to content

Commit 9bc3840

Browse files
authored
Merge branch 'main' into semconv-v1.27.0
2 parents 8f4d982 + 81b2a33 commit 9bc3840

21 files changed

+170
-54
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1212

1313
- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850)
1414
- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850)
15+
- Add `ExemplarReservoirProviderSelector` and `DefaultExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric`, which defines the exemplar reservoir to use based on the aggregation of the metric. (#5861)
16+
- Add `ExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric.Stream` to allow using views to configure the exemplar reservoir to use for a metric. (#5861)
17+
- Add `ReservoirProvider`, `HistogramReservoirProvider` and `FixedSizeReservoirProvider` to `go.opentelemetry.io/otel/sdk/metric/exemplar` to make it convenient to use providers of Reservoirs. (#5861)
1518
- The `go.opentelemetry.io/otel/semconv/v1.27.0` package.
1619
The package contains semantic conventions from the `v1.27.0` version of the OpenTelemetry Semantic Conventions. (#5894)
1720

sdk/metric/example_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,31 @@ func ExampleNewView_exponentialHistogram() {
242242
)
243243
}
244244

245+
func ExampleNewView_exemplarreservoirproviderselector() {
246+
// Create a view that makes all metrics use a different exemplar reservoir.
247+
view := metric.NewView(
248+
metric.Instrument{Name: "*"},
249+
metric.Stream{
250+
ExemplarReservoirProviderSelector: func(agg metric.Aggregation) exemplar.ReservoirProvider {
251+
// This example uses a fixed-size reservoir with a size of 10
252+
// for explicit bucket histograms instead of the default
253+
// bucket-aligned reservoir.
254+
if _, ok := agg.(metric.AggregationExplicitBucketHistogram); ok {
255+
return exemplar.FixedSizeReservoirProvider(10)
256+
}
257+
// Fall back to the default reservoir otherwise.
258+
return metric.DefaultExemplarReservoirProviderSelector(agg)
259+
},
260+
},
261+
)
262+
263+
// The created view can then be registered with the OpenTelemetry metric
264+
// SDK using the WithView option.
265+
_ = metric.NewMeterProvider(
266+
metric.WithView(view),
267+
)
268+
}
269+
245270
func ExampleWithExemplarFilter_disabled() {
246271
// Use exemplar.AlwaysOffFilter to disable exemplar collection.
247272
_ = metric.NewMeterProvider(

sdk/metric/exemplar.go

+31-10
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,48 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
55

66
import (
77
"runtime"
8-
"slices"
98

9+
"go.opentelemetry.io/otel/attribute"
1010
"go.opentelemetry.io/otel/sdk/metric/exemplar"
1111
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
1212
)
1313

14+
// ExemplarReservoirProviderSelector selects the
15+
// [exemplar.ReservoirProvider] to use
16+
// based on the [Aggregation] of the metric.
17+
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider
18+
1419
// reservoirFunc returns the appropriately configured exemplar reservoir
1520
// creation func based on the passed InstrumentKind and filter configuration.
16-
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] {
21+
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
22+
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
23+
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
24+
}
25+
}
26+
27+
// DefaultExemplarReservoirProviderSelector returns the default
28+
// [exemplar.ReservoirProvider] for the
29+
// provided [Aggregation].
30+
//
31+
// For explicit bucket histograms with more than 1 bucket, it uses the
32+
// [exemplar.HistogramReservoirProvider].
33+
// For exponential histograms, it uses the
34+
// [exemplar.FixedSizeReservoirProvider]
35+
// with a size of min(20, max_buckets).
36+
// For all other aggregations, it uses the
37+
// [exemplar.FixedSizeReservoirProvider]
38+
// with a size equal to the number of CPUs.
39+
//
40+
// Exemplar default reservoirs MAY change in a minor version bump. No
41+
// guarantees are made on the shape or statistical properties of returned
42+
// exemplars.
43+
func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider {
1744
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
1845
// Explicit bucket histogram aggregation with more than 1 bucket will
1946
// use AlignedHistogramBucketExemplarReservoir.
2047
a, ok := agg.(AggregationExplicitBucketHistogram)
2148
if ok && len(a.Boundaries) > 0 {
22-
cp := slices.Clone(a.Boundaries)
23-
return func() aggregate.FilteredExemplarReservoir[N] {
24-
bounds := cp
25-
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
26-
}
49+
return exemplar.HistogramReservoirProvider(a.Boundaries)
2750
}
2851

2952
var n int
@@ -50,7 +73,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) f
5073
}
5174
}
5275

53-
return func() aggregate.FilteredExemplarReservoir[N] {
54-
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
55-
}
76+
return exemplar.FixedSizeReservoirProvider(n)
5677
}

sdk/metric/exemplar/fixed_size_reservoir.go

+7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ import (
1212
"go.opentelemetry.io/otel/attribute"
1313
)
1414

15+
// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
16+
func FixedSizeReservoirProvider(k int) ReservoirProvider {
17+
return func(_ attribute.Set) Reservoir {
18+
return NewFixedSizeReservoir(k)
19+
}
20+
}
21+
1522
// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
1623
// k exemplars. If there are k or less measurements made, the Reservoir will
1724
// sample each one. If there are more than k, the Reservoir will then randomly

sdk/metric/exemplar/fixed_size_reservoir_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ import (
1515
)
1616

1717
func TestNewFixedSizeReservoir(t *testing.T) {
18-
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) {
19-
return NewFixedSizeReservoir(n), n
18+
t.Run("Int64", ReservoirTest[int64](func(n int) (ReservoirProvider, int) {
19+
return FixedSizeReservoirProvider(n), n
2020
}))
2121

22-
t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) {
23-
return NewFixedSizeReservoir(n), n
22+
t.Run("Float64", ReservoirTest[float64](func(n int) (ReservoirProvider, int) {
23+
return FixedSizeReservoirProvider(n), n
2424
}))
2525
}
2626

sdk/metric/exemplar/histogram_reservoir.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,21 @@ import (
1212
"go.opentelemetry.io/otel/attribute"
1313
)
1414

15+
// HistogramReservoirProvider is a provider of [HistogramReservoir].
16+
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
17+
cp := slices.Clone(bounds)
18+
slices.Sort(cp)
19+
return func(_ attribute.Set) Reservoir {
20+
return NewHistogramReservoir(cp)
21+
}
22+
}
23+
1524
// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
1625
// measurement that falls within a histogram bucket. The histogram bucket
1726
// upper-boundaries are define by bounds.
1827
//
19-
// The passed bounds will be sorted by this function.
28+
// The passed bounds must be sorted before calling this function.
2029
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
21-
slices.Sort(bounds)
2230
return &HistogramReservoir{
2331
bounds: bounds,
2432
storage: newStorage(len(bounds) + 1),

sdk/metric/exemplar/histogram_reservoir_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ import "testing"
77

88
func TestHist(t *testing.T) {
99
bounds := []float64{0, 100}
10-
t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
11-
return NewHistogramReservoir(bounds), len(bounds)
10+
t.Run("Int64", ReservoirTest[int64](func(int) (ReservoirProvider, int) {
11+
return HistogramReservoirProvider(bounds), len(bounds)
1212
}))
1313

14-
t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
15-
return NewHistogramReservoir(bounds), len(bounds)
14+
t.Run("Float64", ReservoirTest[float64](func(int) (ReservoirProvider, int) {
15+
return HistogramReservoirProvider(bounds), len(bounds)
1616
}))
1717
}

sdk/metric/exemplar/reservoir.go

+8
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,11 @@ type Reservoir interface {
3030
// The Reservoir state is preserved after this call.
3131
Collect(dest *[]Exemplar)
3232
}
33+
34+
// ReservoirProvider creates new [Reservoir]s.
35+
//
36+
// The attributes provided are attributes which are kept by the aggregation, and
37+
// are exclusive with attributes passed to Offer. The combination of these
38+
// attributes and the attributes passed to Offer is the complete set of
39+
// attributes a measurement was made with.
40+
type ReservoirProvider func(attr attribute.Set) Reservoir

sdk/metric/exemplar/reservoir_test.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
// Sat Jan 01 2000 00:00:00 GMT+0000.
1919
var staticTime = time.Unix(946684800, 0)
2020

21-
type factory func(requestedCap int) (r Reservoir, actualCap int)
21+
type factory func(requestedCap int) (r ReservoirProvider, actualCap int)
2222

2323
func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
2424
return func(t *testing.T) {
@@ -29,10 +29,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
2929
t.Run("CaptureSpanContext", func(t *testing.T) {
3030
t.Helper()
3131

32-
r, n := f(1)
32+
rp, n := f(1)
3333
if n < 1 {
3434
t.Skip("skipping, reservoir capacity less than 1:", n)
3535
}
36+
r := rp(*attribute.EmptySet())
3637

3738
tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01}
3839
sc := trace.NewSpanContext(trace.SpanContextConfig{
@@ -60,10 +61,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
6061
t.Run("FilterAttributes", func(t *testing.T) {
6162
t.Helper()
6263

63-
r, n := f(1)
64+
rp, n := f(1)
6465
if n < 1 {
6566
t.Skip("skipping, reservoir capacity less than 1:", n)
6667
}
68+
r := rp(*attribute.EmptySet())
6769

6870
adminTrue := attribute.Bool("admin", true)
6971
r.Offer(ctx, staticTime, NewValue(N(10)), []attribute.KeyValue{adminTrue})
@@ -83,10 +85,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
8385
t.Run("CollectLessThanN", func(t *testing.T) {
8486
t.Helper()
8587

86-
r, n := f(2)
88+
rp, n := f(2)
8789
if n < 2 {
8890
t.Skip("skipping, reservoir capacity less than 2:", n)
8991
}
92+
r := rp(*attribute.EmptySet())
9093

9194
r.Offer(ctx, staticTime, NewValue(N(10)), nil)
9295

@@ -99,10 +102,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
99102
t.Run("MultipleOffers", func(t *testing.T) {
100103
t.Helper()
101104

102-
r, n := f(3)
105+
rp, n := f(3)
103106
if n < 1 {
104107
t.Skip("skipping, reservoir capacity less than 1:", n)
105108
}
109+
r := rp(*attribute.EmptySet())
106110

107111
for i := 0; i < n+1; i++ {
108112
v := NewValue(N(i))
@@ -127,10 +131,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
127131
t.Run("DropAll", func(t *testing.T) {
128132
t.Helper()
129133

130-
r, n := f(0)
134+
rp, n := f(0)
131135
if n > 0 {
132136
t.Skip("skipping, reservoir capacity greater than 0:", n)
133137
}
138+
r := rp(*attribute.EmptySet())
134139

135140
r.Offer(context.Background(), staticTime, NewValue(N(10)), nil)
136141

sdk/metric/instrument.go

+6
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ type Stream struct {
144144
// Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to
145145
// provide an allow-list of attribute keys here.
146146
AttributeFilter attribute.Filter
147+
// ExemplarReservoirProvider selects the
148+
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based
149+
// on the [Aggregation].
150+
//
151+
// If unspecified, [DefaultExemplarReservoirProviderSelector] is used.
152+
ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector
147153
}
148154

149155
// instID are the identifying properties of a instrument.

sdk/metric/internal/aggregate/aggregate.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type Builder[N int64 | float64] struct {
3838
//
3939
// If this is not provided a default factory function that returns an
4040
// dropReservoir reservoir will be used.
41-
ReservoirFunc func() FilteredExemplarReservoir[N]
41+
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
4242
// AggregationLimit is the cardinality limit of measurement attributes. Any
4343
// measurement for new attributes once the limit has been reached will be
4444
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -49,7 +49,7 @@ type Builder[N int64 | float64] struct {
4949
AggregationLimit int
5050
}
5151

52-
func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
52+
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
5353
if b.ReservoirFunc != nil {
5454
return b.ReservoirFunc
5555
}

sdk/metric/internal/aggregate/aggregate_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func (c *clock) Register() (unregister func()) {
7272
return func() { now = orig }
7373
}
7474

75-
func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
76-
return dropReservoir[N]()
75+
func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
76+
return dropReservoir[N](attr)
7777
}
7878

7979
func TestBuilderFilter(t *testing.T) {

sdk/metric/internal/aggregate/drop.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import (
1111
)
1212

1313
// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
14-
func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
14+
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
15+
return &dropRes[N]{}
16+
}
1517

1618
type dropRes[N int64 | float64] struct{}
1719

sdk/metric/internal/aggregate/drop_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/stretchr/testify/assert"
1010

11+
"go.opentelemetry.io/otel/attribute"
1112
"go.opentelemetry.io/otel/sdk/metric/exemplar"
1213
)
1314

@@ -17,7 +18,7 @@ func TestDrop(t *testing.T) {
1718
}
1819

1920
func testDropFiltered[N int64 | float64](t *testing.T) {
20-
r := dropReservoir[N]()
21+
r := dropReservoir[N](*attribute.EmptySet())
2122

2223
var dest []exemplar.Exemplar
2324
r.Collect(&dest)

sdk/metric/internal/aggregate/exponential_histogram.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
283283
// newExponentialHistogram returns an Aggregator that summarizes a set of
284284
// measurements as an exponential histogram. Each histogram is scoped by attributes
285285
// and the aggregation cycle the measurements were made in.
286-
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] {
286+
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
287287
return &expoHistogram[N]{
288288
noSum: noSum,
289289
noMinMax: noMinMax,
@@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
306306
maxSize int
307307
maxScale int32
308308

309-
newRes func() FilteredExemplarReservoir[N]
309+
newRes func(attribute.Set) FilteredExemplarReservoir[N]
310310
limit limiter[*expoHistogramDataPoint[N]]
311311
values map[attribute.Distinct]*expoHistogramDataPoint[N]
312312
valuesMu sync.Mutex
@@ -327,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
327327
v, ok := e.values[attr.Equivalent()]
328328
if !ok {
329329
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
330-
v.res = e.newRes()
330+
v.res = e.newRes(attr)
331331

332332
e.values[attr.Equivalent()] = v
333333
}

sdk/metric/internal/aggregate/histogram.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
4747
noSum bool
4848
bounds []float64
4949

50-
newRes func() FilteredExemplarReservoir[N]
50+
newRes func(attribute.Set) FilteredExemplarReservoir[N]
5151
limit limiter[*buckets[N]]
5252
values map[attribute.Distinct]*buckets[N]
5353
valuesMu sync.Mutex
5454
}
5555

56-
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] {
56+
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
5757
// The responsibility of keeping all buckets correctly associated with the
5858
// passed boundaries is ultimately this type's responsibility. Make a copy
5959
// here so we can always guarantee this. Or, in the case of failure, have
@@ -93,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
9393
//
9494
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
9595
b = newBuckets[N](attr, len(s.bounds)+1)
96-
b.res = s.newRes()
96+
b.res = s.newRes(attr)
9797

9898
// Ensure min and max are recorded values (not zero), for new buckets.
9999
b.min, b.max = value, value
@@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
108108

109109
// newHistogram returns an Aggregator that summarizes a set of measurements as
110110
// an histogram.
111-
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] {
111+
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
112112
return &histogram[N]{
113113
histValues: newHistValues[N](boundaries, noSum, limit, r),
114114
noMinMax: noMinMax,

0 commit comments

Comments
 (0)