
Commit 69800ee

Support Delta & Cumulative temporality for LastValue aggregates (open-telemetry#5305)
* Add delta/cumulative/precomputed LastValue agg
* Add cumulative testing
* Add precomputed testing
* Add changelog entry
1 parent 737f885 commit 69800ee

6 files changed (+470 -27 lines)

CHANGELOG.md (+1)

@@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - De-duplicate map attributes added to a `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5230)
 - The `go.opentelemetry.io/otel/exporters/stdout/stdoutlog` exporter won't print `AttributeValueLengthLimit` and `AttributeCountLimit` fields now, instead it prints the `DroppedAttributes` field. (#5272)
 - Improved performance in the `Stringer` implementation of `go.opentelemetry.io/otel/baggage.Member` by reducing the number of allocations. (#5286)
+- Set the start time for last-value aggregates in `go.opentelemetry.io/otel/sdk/metric`. (#5305)
 - The `Span` in `go.opentelemetry.io/otel/sdk/trace` will record links without span context if either non-empty `TraceState` or attributes are provided. (#5315)
 
 ### Fixed

sdk/metric/instrument_test.go (+2 -2)

@@ -25,7 +25,7 @@ func BenchmarkInstrument(b *testing.B) {
 		build := aggregate.Builder[int64]{}
 		var meas []aggregate.Measure[int64]
 
-		in, _ := build.LastValue()
+		in, _ := build.PrecomputedLastValue()
 		meas = append(meas, in)
 
 		build.Temporality = metricdata.CumulativeTemporality
@@ -50,7 +50,7 @@ func BenchmarkInstrument(b *testing.B) {
 		build := aggregate.Builder[int64]{}
 		var meas []aggregate.Measure[int64]
 
-		in, _ := build.LastValue()
+		in, _ := build.PrecomputedLastValue()
 		meas = append(meas, in)
 
 		build.Temporality = metricdata.CumulativeTemporality
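
The benchmarks above exercise the new `PrecomputedLastValue` path with cumulative temporality. A hypothetical companion benchmark (not part of this commit) could exercise the delta path the same way; it assumes it lives inside the `sdk/metric` module so the internal `aggregate` package is importable, and `BenchmarkDeltaLastValue` is an invented name:

```go
// Hypothetical benchmark, not part of this commit. Assumes placement in the
// sdk/metric module (package metric) so the internal aggregate package is
// importable.
package metric

import (
	"context"
	"testing"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func BenchmarkDeltaLastValue(b *testing.B) {
	// Delta temporality selects the ComputeAggregation that forgets values
	// after each collection; any other temporality falls back to cumulative.
	build := aggregate.Builder[int64]{Temporality: metricdata.DeltaTemporality}

	in, out := build.PrecomputedLastValue()
	ctx := context.Background()
	attrs := attribute.NewSet(attribute.String("host", "a"))

	var agg metricdata.Aggregation = metricdata.Gauge[int64]{}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		in(ctx, int64(i), attrs, nil)
		_ = out(&agg)
	}
}
```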

sdk/metric/internal/aggregate/aggregate.go (+17 -12)

@@ -74,21 +74,26 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
 }
 
 // LastValue returns a last-value aggregate function input and output.
-//
-// The Builder.Temporality is ignored and delta is use always.
 func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
-	// Delta temporality is the only temporality that makes semantic sense for
-	// a last-value aggregate.
 	lv := newLastValue[N](b.AggregationLimit, b.resFunc())
+	switch b.Temporality {
+	case metricdata.DeltaTemporality:
+		return b.filter(lv.measure), lv.delta
+	default:
+		return b.filter(lv.measure), lv.cumulative
+	}
+}
 
-	return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
-		// Ignore if dest is not a metricdata.Gauge. The chance for memory
-		// reuse of the DataPoints is missed (better luck next time).
-		gData, _ := (*dest).(metricdata.Gauge[N])
-		lv.computeAggregation(&gData.DataPoints)
-		*dest = gData
-
-		return len(gData.DataPoints)
+// PrecomputedLastValue returns a last-value aggregate function input and
+// output. The aggregation returned from the returned ComputeAggregation
+// function will always only return values from the previous collection cycle.
+func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) {
+	lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc())
+	switch b.Temporality {
+	case metricdata.DeltaTemporality:
+		return b.filter(lv.measure), lv.delta
+	default:
+		return b.filter(lv.measure), lv.cumulative
 	}
 }
 
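
With this change, `Builder.Temporality` is no longer ignored for last-value aggregates: delta temporality selects `lv.delta`, and anything else falls back to `lv.cumulative`. A minimal sketch of how that surfaces through the public SDK, assuming a `ManualReader` configured with a delta temporality selector (the instrument and scope names here are made up):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// Ask for delta temporality for every instrument kind; last-value
	// aggregates now honor the reader's temporality selection.
	reader := sdkmetric.NewManualReader(
		sdkmetric.WithTemporalitySelector(func(sdkmetric.InstrumentKind) metricdata.Temporality {
			return metricdata.DeltaTemporality
		}),
	)
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	defer func() { _ = provider.Shutdown(ctx) }()

	meter := provider.Meter("lastvalue-example")
	_, err := meter.Int64ObservableGauge("queue.depth",
		metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error {
			o.Observe(42)
			return nil
		}),
	)
	if err != nil {
		log.Fatal(err)
	}

	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		log.Fatal(err)
	}
	// Each gauge data point now carries a StartTime alongside Time.
	fmt.Printf("%+v\n", rm.ScopeMetrics)
}
```

Under a cumulative selector the same gauge keeps reporting previously observed attribute sets across collections, with `StartTime` fixed at aggregator creation.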

sdk/metric/internal/aggregate/lastvalue.go (+85 -3)

@@ -26,6 +26,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *la
 		newRes: r,
 		limit:  newLimiter[datapoint[N]](limit),
 		values: make(map[attribute.Distinct]datapoint[N]),
+		start:  now(),
 	}
 }
 
@@ -36,6 +37,7 @@ type lastValue[N int64 | float64] struct {
 	newRes func() exemplar.Reservoir
 	limit  limiter[datapoint[N]]
 	values map[attribute.Distinct]datapoint[N]
+	start  time.Time
 }
 
 func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
@@ -58,23 +60,103 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
 	s.values[attr.Equivalent()] = d
 }
 
-func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
+func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int {
+	// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
+	// the DataPoints is missed (better luck next time).
+	gData, _ := (*dest).(metricdata.Gauge[N])
+
+	s.Lock()
+	defer s.Unlock()
+
+	n := s.copyDpts(&gData.DataPoints)
+	// Do not report stale values.
+	clear(s.values)
+	// Update start time for delta temporality.
+	s.start = now()
+
+	*dest = gData
+
+	return n
+}
+
+func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {
+	// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
+	// the DataPoints is missed (better luck next time).
+	gData, _ := (*dest).(metricdata.Gauge[N])
+
 	s.Lock()
 	defer s.Unlock()
 
+	n := s.copyDpts(&gData.DataPoints)
+	// TODO (#3006): This will use an unbounded amount of memory if there
+	// are unbounded number of attribute sets being aggregated. Attribute
+	// sets that become "stale" need to be forgotten so this will not
+	// overload the system.
+	*dest = gData
+
+	return n
+}
+
+// copyDpts copies the datapoints held by s into dest. The number of datapoints
+// copied is returned.
+func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int {
 	n := len(s.values)
 	*dest = reset(*dest, n, n)
 
 	var i int
 	for _, v := range s.values {
 		(*dest)[i].Attributes = v.attrs
-		// The event time is the only meaningful timestamp, StartTime is
-		// ignored.
+		(*dest)[i].StartTime = s.start
 		(*dest)[i].Time = v.timestamp
 		(*dest)[i].Value = v.value
 		collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
 		i++
 	}
+	return n
+}
+
+// newPrecomputedLastValue returns an aggregator that summarizes a set of
+// observations as the last one made.
+func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *precomputedLastValue[N] {
+	return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
+}
+
+// precomputedLastValue summarizes a set of observations as the last one made.
+type precomputedLastValue[N int64 | float64] struct {
+	*lastValue[N]
+}
+
+func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int {
+	// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
+	// the DataPoints is missed (better luck next time).
+	gData, _ := (*dest).(metricdata.Gauge[N])
+
+	s.Lock()
+	defer s.Unlock()
+
+	n := s.copyDpts(&gData.DataPoints)
 	// Do not report stale values.
 	clear(s.values)
+	// Update start time for delta temporality.
+	s.start = now()
+
+	*dest = gData
+
+	return n
+}
+
+func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int {
+	// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
+	// the DataPoints is missed (better luck next time).
+	gData, _ := (*dest).(metricdata.Gauge[N])
+
	s.Lock()
	defer s.Unlock()
+
+	n := s.copyDpts(&gData.DataPoints)
+	// Do not report stale values.
+	clear(s.values)
+	*dest = gData
+
+	return n
 }

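The behavioral difference between the two collection paths added above can be summarized with a small self-contained sketch (plain Go, not the SDK's internal types; `lastValues`, `point`, and the string attribute key are invented for illustration):

```go
// Minimal sketch of the delta vs. cumulative last-value collection behavior.
package main

import (
	"fmt"
	"time"
)

type point struct {
	start, ts time.Time
	value     int64
}

type lastValues struct {
	start  time.Time
	values map[string]point
}

func newLastValues() *lastValues {
	return &lastValues{start: time.Now(), values: map[string]point{}}
}

// measure records the most recent value seen for an attribute set.
func (l *lastValues) measure(attrs string, v int64) {
	l.values[attrs] = point{ts: time.Now(), value: v}
}

// delta reports the points recorded since the previous collection, then
// forgets them and advances the start time, mirroring lastValue.delta.
func (l *lastValues) delta() []point {
	out := l.snapshot()
	clear(l.values) // do not report stale values on the next cycle
	l.start = time.Now()
	return out
}

// cumulative reports every point seen so far and keeps the original start
// time, mirroring lastValue.cumulative.
func (l *lastValues) cumulative() []point {
	return l.snapshot()
}

// snapshot copies the held points, stamping each with the current start time.
func (l *lastValues) snapshot() []point {
	out := make([]point, 0, len(l.values))
	for _, p := range l.values {
		p.start = l.start
		out = append(out, p)
	}
	return out
}

func main() {
	agg := newLastValues()
	agg.measure(`{"host":"a"}`, 7)
	fmt.Println(len(agg.delta()), len(agg.delta())) // 1 0: delta forgets after a collection

	agg.measure(`{"host":"a"}`, 7)
	fmt.Println(len(agg.cumulative()), len(agg.cumulative())) // 1 1: cumulative retains
}
```

The real implementation additionally reuses the destination `DataPoints` slice, collects exemplars, and caps the number of attribute sets via the limiter, as shown in the diff above.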