Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilize exemplars #5249

Closed
1 of 6 tasks
MrAlias opened this issue Apr 22, 2024 · 34 comments · Fixed by #5861
Closed
1 of 6 tasks

Stabilize exemplars #5249

MrAlias opened this issue Apr 22, 2024 · 34 comments · Fixed by #5861
Assignees
Labels
area:metrics Part of OpenTelemetry Metrics
Milestone

Comments

@MrAlias
Copy link
Contributor

MrAlias commented Apr 22, 2024

Exemplars are now stable in the specification.

  • Audit our implementation for compliance
  • Stabilize

Compliance TODOs

  • Export an ExemplarReservoir type and accept it as stream configuration
  • Enable exemplars by default
  • Allow the ExemplarFilter to be a configured for a MeterProvider with a default value of TraceBased (continue to support environment variable configuration of the ExemplarFilter)

Stabilization procedure and plan

TODO:

  • Handle the existing experimental envar
@MrAlias MrAlias added the area:metrics Part of OpenTelemetry Metrics label Apr 22, 2024
@MrAlias MrAlias added this to the v1.27.0 milestone Apr 22, 2024
@MrAlias MrAlias self-assigned this May 1, 2024
@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

Stream configuration

[...]

The SDK MUST accept the following stream configuration parameters:

[...]

  • exemplar_reservoir: A
    functional type that generates an exemplar reservoir a MeterProvider will
    use when storing exemplars. This functional type needs to be a factory or
    callback similar to aggregation selection functionality which allows
    different reservoirs to be chosen by the aggregation.

    Users can provide an exemplar_reservoir, but it is up to their discretion.
    Therefore, the stream configuration parameter needs to be structured to
    accept an exemplar_reservoir, but MUST NOT obligate a user to provide one.
    If the user does not provide an exemplar_reservoir value, the
    MeterProvider MUST apply a default exemplar
    reservoir
    .

We need to export an ExemplarReservoir type and accept it as stream configuration.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

A Metric SDK MUST provide a mechanism to sample Exemplars from measurements via the ExemplarFilter and ExemplarReservoir hooks.

We do not comply with this.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

Exemplar sampling SHOULD be turned on by default.

We do not comply with this.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

If Exemplar sampling is off, the SDK MUST NOT have overhead related to exemplar sampling.

Our current implementation complies with this.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

A Metric SDK MUST allow exemplar sampling to leverage the configuration of
metric aggregation. For example, Exemplar sampling of histograms should be able
to leverage bucket boundaries.

A Metric SDK SHOULD provide configuration for Exemplar sampling, specifically:

  • ExemplarFilter: filter which measurements can become exemplars.
  • ExemplarReservoir: storage and sampling of exemplars.

We do not comply with this.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

The ExemplarFilter configuration MUST allow users to select between one of the
built-in ExemplarFilters. While ExemplarFilter determines which measurements
are eligible for becoming an Exemplar, the ExemplarReservoir makes the
final decision if a measurement becomes an exemplar and is stored.

The ExemplarFilter SHOULD be a configuration parameter of a MeterProvider for
an SDK. The default value SHOULD be TraceBased. The filter configuration
SHOULD follow the environment variable specification.

We do not comply with this.

This is a clarification of #5249 (comment).

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

An OpenTelemetry SDK MUST support the following filters:

  • AlwaysOn
  • AlwaysOff
  • TraceBased

Our implementation supports these filters.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

The ExemplarReservoir interface MUST provide a method to offer measurements to the reservoir and another to collect accumulated Exemplars.

// Reservoir holds the sampled exemplar of measurements made.
type Reservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The val and attr
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
Offer(ctx context.Context, t time.Time, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
Collect(dest *[]metricdata.Exemplar[N])
}

We comply with this.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

A new ExemplarReservoir MUST be created for every known timeseries data point, as determined by aggregation and view configuration. This data point, and its set of defining attributes, are referred to as the associated timeseries point.

We comply with this.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

The "offer" method SHOULD have the ability to pull associated trace and span
information without needing to record full context. In other words, current
span context and baggage can be inspected at this point.

The "offer" method does not need to store all measurements it is given and
MAY further sample beyond the ExemplarFilter.

The "offer" method MAY accept a filtered subset of Attributes which diverge
from the timeseries the reservoir is associated with. This MUST be clearly
documented in the API interface and the reservoir MUST be given the Attributes
associated with its timeseries point either at construction so that additional
sampling performed by the reservoir has access to all attributes from a
measurement in the "offer" method. SDK authors are encouraged to benchmark
whether this option works best for their implementation.

The "collect" method MUST return accumulated Exemplars. Exemplars are expected
to abide by the AggregationTemporality of any metric point they are recorded
with. In other words, Exemplars reported against a metric data point SHOULD have
occurred within the start/stop timestamps of that point. SDKs are free to
decide whether "collect" should also reset internal storage for delta temporal
aggregation collection, or use a more optimal implementation.

Exemplars MUST retain any attributes available in the measurement that
are not preserved by aggregation or view configuration for the associated
timeseries. Joining together attributes on an Exemplar with
those available on its associated metric data point should result in the
full set of attributes from the original sample measurement.

The ExemplarReservoir SHOULD avoid allocations when sampling exemplars.

We comply with these.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

The SDK MUST include two types of built-in exemplar reservoirs:

  1. SimpleFixedSizeExemplarReservoir
  2. AlignedHistogramBucketExemplarReservoir

By default:

  • Explicit bucket histogram aggregation with more than 1 bucket SHOULD
    use AlignedHistogramBucketExemplarReservoir.
  • Base2 Exponential Histogram Aggregation SHOULD use a
    SimpleFixedSizeExemplarReservoir with a reservoir equal to the
    smaller of the maximum number of buckets configured on the aggregation or
    twenty (e.g. min(20, max_buckets)).
  • All other aggregations SHOULD use SimpleFixedSizeExemplarReservoir.

resF := func() func() exemplar.Reservoir[N] {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.Reservoir[N] {
bounds := cp
return exemplar.Histogram[N](bounds)
}
}
var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. number of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}
}
return func() exemplar.Reservoir[N] {
return exemplar.FixedSize[N](n)
}
}

We comply with this.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

SimpleFixedSizeExemplarReservoir

This reservoir MUST use an uniformly-weighted sampling algorithm based on the
number of samples the reservoir has seen so far to determine if the offered
measurements should be sampled. For example, the simple reservoir sampling
algorithm
can be used:

if num_measurements_seen < num_buckets then
  bucket = num_measurements_seen
else
  bucket = random_integer(0, num_measurements_seen)
end
if bucket < num_buckets then
  reservoir[bucket] = measurement
end

Any stateful portion of sampling computation SHOULD be reset every collection
cycle. For the above example, that would mean that the num_measurements_seen
count is reset every time the reservoir is collected.

This Exemplar reservoir MAY take a configuration parameter for the size of the
reservoir. If no size configuration is provided, the default size MAY be
the number of possible concurrent threads (e.g. numer of CPUs) to help reduce
contention. Otherwise, a default size of 1 SHOULD be used.

We comply with this:

func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) {

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

AlignedHistogramBucketExemplarReservoir

This Exemplar reservoir MUST take a configuration parameter that is the
configuration of a Histogram. This implementation MUST keep the last seen
measurement that falls within a histogram bucket. The reservoir will accept
measurements using the equivalent of the following naive algorithm:

bucket = find_histogram_bucket(measurement)
if bucket < num_buckets then
  reservoir[bucket] = measurement
end

def find_histogram_bucket(measurement):
  for boundary, idx in bucket_boundaries do
    if value <= boundary then
      return idx
    end
  end
  return boundaries.length

This Exemplar reservoir MAY take a configuration parameter for the bucket
boundaries used by the reservoir. The size of the reservoir is always the
number of bucket boundaries plus one. This configuration parameter SHOULD have
the same format as specifying bucket boundaries to
Explicit Bucket Histogram Aggregation.

We comply with this: https://github.com/open-telemetry/opentelemetry-go/blob/7ee6ff19b51eb4bffdd48639ac5698c9ee8932d6/sdk/metric/internal/exemplar/hist.go

@MrAlias
Copy link
Contributor Author

MrAlias commented May 1, 2024

Custom ExemplarReservoir

The SDK MUST provide a mechanism for SDK users to provide their own
ExemplarReservoir implementation. This extension MUST be configurable on
a metric View, although individual reservoirs MUST still be
instantiated per metric-timeseries (see
Exemplar Reservoir - Paragraph 2).

We do not comply with this.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 3, 2024

Stream configuration

[...]
The SDK MUST accept the following stream configuration parameters:
[...]

  • exemplar_reservoir: A
    functional type that generates an exemplar reservoir a MeterProvider will
    use when storing exemplars. This functional type needs to be a factory or
    callback similar to aggregation selection functionality which allows
    different reservoirs to be chosen by the aggregation.
    Users can provide an exemplar_reservoir, but it is up to their discretion.
    Therefore, the stream configuration parameter needs to be structured to
    accept an exemplar_reservoir, but MUST NOT obligate a user to provide one.
    If the user does not provide an exemplar_reservoir value, the
    MeterProvider MUST apply a default exemplar
    reservoir
    .

We need to export an ExemplarReservoir type and accept it as stream configuration.

This is going to be a challenge. The Stream type is not defined generically, but our current ExemplarReservoir is defined over [N int64 | float64] to accommodate both value types.

Not sure how we can restructure the ExemplarReservoir or update the Stream type to include this as a field yet.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 3, 2024

To address the generics, we can create a Value type:

type ValueType uint8

const (
	UnknownValueType ValueType = 0
	Int64ValueType   ValueType = 1
	Float64ValueType ValueType = 2
)

type Value struct {
	t   ValueType
	val uint64
}

func NewValue[N int64 | float64](val N) Value {
	switch v := any(val).(type) {
	case int64:
		return newInt64Value(v)
	case float64:
		return newFloat64Value(v)
	}
	return Value{}
}

func newInt64Value(val int64) Value {
	return Value{t: Int64ValueType, val: uint64(val)}
}

func newFloat64Value(val float64) Value {
	return Value{t: Float64ValueType, val: math.Float64bits(val)}
}

func (v Value) Type() ValueType { return v.t }

func (v Value) Int64() int64 {
	if v.t == Int64ValueType {
		return v.int64()
	}
	return 0
}

func (v Value) int64() int64 { return int64(v.val) }

func (v Value) Float64() float64 {
	if v.t == Float64ValueType {
		return math.Float64frombits(v.val)
	}
	return 0
}

func (v Value) float64() float64 { return math.Float64frombits(v.val) }

func (v Value) Any() any {
	switch v.t {
	case Int64ValueType:
		return v.int64()
	case Float64ValueType:
		return v.float64()
	}
	return nil
}

From there we can define an exemplar:

// Exemplar is a measurement sampled from a timeseries providing a typical
// example.
type Exemplar struct {
	// FilteredAttributes are the attributes recorded with the measurement but
	// filtered out of the timeseries' aggregated data.
	FilteredAttributes []attribute.KeyValue
	// Time is the time when the measurement was recorded.
	Time time.Time
	// Value is the measured value.
	Value Value
	// SpanID is the ID of the span that was active during the measurement. If
	// no span was active or the span was not sampled this will be empty.
	SpanID []byte `json:",omitempty"`
	// TraceID is the ID of the trace the active span belonged to during the
	// measurement. If no span was active or the span was not sampled this will
	// be empty.
	TraceID []byte `json:",omitempty"`
}

and a Reservoir

// Reservoir holds the sampled exemplar of measurements made.
type Reservoir interface {
	// Offer accepts the parameters associated with a measurement. The
	// parameters will be stored as an exemplar if the Reservoir decides to
	// sample the measurement.
	//
	// The passed ctx needs to contain any baggage or span that were active
	// when the measurement was made. This information may be used by the
	// Reservoir in making a sampling decision.
	//
	// The time t is the time when the measurement was made. The val and attr
	// parameters are the value and dropped (filtered) attributes of the
	// measurement respectively.
	Offer(ctx context.Context, t time.Time, val Value, attr []attribute.KeyValue)

	// Collect returns all the held exemplars.
	//
	// The Reservoir state is preserved after this call.
	Collect(dest *[]Exemplar)
}

@MrAlias
Copy link
Contributor Author

MrAlias commented May 3, 2024

I'm working to refactor sdk/metric/internal/exemplar to match this design. That way we can validate it before releasing.

@MrAlias
Copy link
Contributor Author

MrAlias commented May 6, 2024

Moving to the v1.28.0 milestone as #5285 needs to be released and used before that code is exported to resolve this.

@MrAlias MrAlias modified the milestones: v1.27.0, v1.28.0 May 6, 2024
@MrAlias MrAlias modified the milestones: v1.28.0, v1.29.0 Jun 20, 2024
@MrAlias
Copy link
Contributor Author

MrAlias commented Jul 11, 2024

Blocked by #5542

@MrAlias MrAlias modified the milestones: v1.29.0, v1.30.0 Aug 8, 2024
@MrAlias
Copy link
Contributor Author

MrAlias commented Aug 8, 2024

The plan is to try and get a PR merged with the new public interface right after the next release. That way we have a longer time to evaluate the change before it becomes stable (:grimacing:).

@XSAM XSAM modified the milestones: v1.30.0, v1.31.0 Sep 9, 2024
dashpole added a commit that referenced this issue Sep 11, 2024
Part of #5249.

Addresses
#5249 (comment)

This removes handling of the `OTEL_GO_X_EXEMPLAR` environment variable.

Instead of changing the default for the existing environment variable to
enable it by default, i'm just removing it entirely. Users can still
disable the feature by setting the filter to always_off. Since we will
continue to support that configuration, it seems better to direct users
there, rather than give them a temporary equivalent.
@dashpole dashpole assigned dashpole and unassigned MrAlias Sep 19, 2024
MrAlias pushed a commit that referenced this issue Sep 26, 2024
Part of #5249

This makes all existing types designed to implement the public Exemplar
API public by moving most of `internal/exemplar` to `exemplar`. The only
types that are not being made public are `exemplar.Drop`, and
`exemplar.FilteredReservoir`. Those types are moved to
`internal/aggregate`, and are renamed to `DropReservoir` and
`FilteredExemplarReservoir`.

The following types are made public:

* `exemplar.Exemplar`
* `exemplar.Filter`
* `exemplar.SampledFilter`
* `exemplar.AlwaysOnFilter`
* `exemplar.HistogramReservoir`
* `exemplar.FixedSizeReservoir`
* `exemplar.Reservoir`
* `exemplar.Value`
* `exemplar.ValueType`
@dashpole
Copy link
Contributor

Looking into stream configuration, there are a few different ways we could approach it. Our Stream needs to be extended with some kind of "exemplar reservoir provider." The main question is, what should the exemplar Provider look like. Options include:

Option 1:

type ExemplarReservoirProvider func() exemplar.Reservoir

Option 2:

type ExemplarReservoirProvider func(agg Aggregation) exemplar.Reservoir

The text of the specification is:

exemplar_reservoir: A functional type that generates an exemplar reservoir a MeterProvider will use when storing exemplars. This functional type needs to be a factory or callback similar to aggregation selection functionality which allows different reservoirs to be chosen by the aggregation.

Users can provide an exemplar_reservoir, but it is up to their discretion. Therefore, the stream configuration parameter needs to be structured to accept an exemplar_reservoir, but MUST NOT obligate a user to provide one. If the user does not provide an exemplar_reservoir value, the MeterProvider MUST apply a default exemplar reservoir.

If we choose Option 1, we can't implement the "default reservoir provider" as an ExemplarReservoirProvider, which feels strange. We would have a ExemplarReservoirProvider-Provider that provides a different provider based on the aggregation.

With option 1 users can still effectively do the same thing as with option 2. If they plan to override the default aggregation, they can simply provide the appropriate reservoir for that aggregation. If they do not plan to override the default aggregation, they can invoke DefaultAggregationSelector on the instrument kind to get it. But that feels like a lot of work when we already have it.

The fact that we will need to use the Aggregation when choosing our exemplar Reservoir makes me think we should go with Option 2. It is also nice to be able to provide the default reservoir provider to users so they can use it as a fallback.

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

I've prototyped option 2 in #5861

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

More importantly, I've confirmed in #5861 that we can implement stream configuration without requiring any changes to the public types in sdk/metric/exemplar.

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

Starting a new audit of everything except stream+ filter configuration:

An Exemplar is a recorded Measurement that exposes the following pieces of information:

  • The value of the Measurement that was recorded by the API call.
  • The time the API call was made to record a Measurement.
  • The set of Attributes associated with the Measurement not already included in a metric data point.
  • The associated trace id and span id of the active Span within Context of the Measurement at API call time.

// Exemplar is a measurement sampled from a timeseries providing a typical
// example.
type Exemplar struct {
// FilteredAttributes are the attributes recorded with the measurement but
// filtered out of the timeseries' aggregated data.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was recorded.
Time time.Time
// Value is the measured value.
Value Value
// SpanID is the ID of the span that was active during the measurement. If
// no span was active or the span was not sampled this will be empty.
SpanID []byte `json:",omitempty"`
// TraceID is the ID of the trace the active span belonged to during the
// measurement. If no span was active or the span was not sampled this will
// be empty.
TraceID []byte `json:",omitempty"`
}

We comply with this.

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

Exemplar sampling SHOULD be turned on by default. If Exemplar sampling is off, the SDK MUST NOT have overhead related to exemplar sampling.

default:
filter = exemplar.SampledFilter

We comply with this.

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

The ExemplarFilter configuration MUST allow users to select between one of the built-in ExemplarFilters. While ExemplarFilter determines which measurements are eligible for becoming an Exemplar, the ExemplarReservoir makes the final decision if a measurement becomes an exemplar and is stored.

type Filter func(context.Context) bool

Our exemplar.Filter is a function, which the SampledFilter and AlwaysOnFilter implement. In that sense, any configuration which allows users to provide an exemplar.Filter would allow a user to provide either of the SampledFilter or the AlwaysOnFilter.

Note that the AlwaysOffFilter is being added in #5850, and would also be an option.

The only question I have is whether SampledFilter should be renamed to TraceBased to match the specification. I've opened #5862.

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

The ExemplarReservoir interface MUST provide a method to offer measurements to the reservoir and another to collect accumulated Exemplars.

// Reservoir holds the sampled exemplar of measurements made.
type Reservoir interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The val and attr
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
Offer(ctx context.Context, t time.Time, val Value, attr []attribute.KeyValue)
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
Collect(dest *[]Exemplar)
}

The "offer" method SHOULD accept measurements, including:

The value of the measurement.
The complete set of Attributes of the measurement.
The Context of the measurement, which covers the Baggage and the current active Span.
A timestamp that best represents when the measurement was taken.

We provide all of those, EXCEPT we provide the filtered set of attributes, rather than the complete set. This is acceptable given that "The offer method MAY accept a filtered subset of Attributes" below.

The "offer" method SHOULD have the ability to pull associated trace and span information without needing to record full context. In other words, current span context and baggage can be inspected at this point.

This can be done by inspecting the Go context.Context.

The "offer" method does not need to store all measurements it is given and MAY further sample beyond the ExemplarFilter.

The interface does not prevent users from handling measurements in the way they want to.

The "offer" method MAY accept a filtered subset of Attributes which diverge from the timeseries the reservoir is associated with. This MUST be clearly documented in the API and the reservoir MUST be given the Attributes associated with its timeseries point either at construction so that additional sampling performed by the reservoir has access to all attributes from a measurement in the "offer" method. SDK authors are encouraged to benchmark whether this option works best for their implementation.

We do accept a filtered subset of attributes, but DO NOT comply with this:

the reservoir MUST be given the Attributes associated with its timeseries point either at construction so that additional sampling performed by the reservoir has access to all attributes from a measurement in the "offer" method.

We do not currently comply with this, but this only applies to custom reservoir implementations. See #5861 for how this could be implemented without changes to any of the existing functions.

The "collect" method MUST return accumulated Exemplars. Exemplars are expected to abide by the AggregationTemporality of any metric point they are recorded with. In other words, Exemplars reported against a metric data point SHOULD have occurred within the start/stop timestamps of that point. SDKs are free to decide whether "collect" should also reset internal storage for delta temporal aggregation collection, or use a more optimal implementation.

Our collect function does not reset internal storage for delta temporal aggregation that I can tell. @MrAlias this differs from your analysis here: #5249 (comment).

We do not comply with this

Exemplars MUST retain any attributes available in the measurement that are not preserved by aggregation or view configuration for the associated timeseries. Joining together attributes on an Exemplar with those available on its associated metric data point should result in the full set of attributes from the original sample measurement.

We comply with this, since offer provides filtered attributes.

A new ExemplarReservoir MUST be created for every known timeseries data point, as determined by aggregation and view configuration. This data point, and its set of defining attributes, are referred to as the associated timeseries point.

We comply with this. Within a single aggregation, a new reservoir is created (note the b.resFunc() call):

s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())

The ExemplarReservoir SHOULD avoid allocations when sampling exemplars.

We do our best :), but this should not depend on the interface definitions.

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

The SDK MUST include two types of built-in exemplar reservoirs:

  • SimpleFixedSizeExemplarReservoir
  • AlignedHistogramBucketExemplarReservoir

func NewFixedSizeReservoir(k int) *FixedSizeReservoir {

func NewHistogramReservoir(bounds []float64) *HistogramReservoir {

We've chosen to use simpler names than the specification recommends, but for good reason:

  • We have omitted "simple" from the fixed size exemplar reservoir. To me, that seems reasonable, since it doesn't add any additional information (simple compared to what?). We can always use other prefixes for new "fixed size" reservoirs in the future.
  • We have omitted "aligned" and "bucket" from the histogram reservoir. This also seems reasonable, as any reservoirs intended for exponential histograms can use "exponential" in their name. The only logical way to collect exemplars with a histogram is to align them with buckets in some way, so the additional names don't add information.

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

By default:

  • Explicit bucket histogram aggregation with more than 1 bucket SHOULD use AlignedHistogramBucketExemplarReservoir.
  • Base2 Exponential Histogram Aggregation SHOULD use a SimpleFixedSizeExemplarReservoir with a reservoir equal to the smaller of the maximum number of buckets configured on the aggregation or twenty (e.g. min(20, max_buckets)).
  • All other aggregations SHOULD use SimpleFixedSizeExemplarReservoir.

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
}
var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. number of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}
}
return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
}

We are compliant.

@dashpole
Copy link
Contributor

dashpole commented Oct 1, 2024

The SDK MUST provide a mechanism for SDK users to provide their own ExemplarReservoir implementation.

Our current exemplar.Reservoir interface would allow for user-provided implementations:

type Reservoir interface {

@MrAlias
Copy link
Contributor Author

MrAlias commented Oct 1, 2024

The "collect" method MUST return accumulated Exemplars. Exemplars are expected to abide by the AggregationTemporality of any metric point they are recorded with. In other words, Exemplars reported against a metric data point SHOULD have occurred within the start/stop timestamps of that point. SDKs are free to decide whether "collect" should also reset internal storage for delta temporal aggregation collection, or use a more optimal implementation.

Our collect function does not reset internal storage for delta temporal aggregation that I can tell. @MrAlias this differs from your analysis here: #5249 (comment).

Based on: #4873

Which was based on #4871 (comment)

We only ever will return exemplars from the most recent time interval. The reservoir is destroyed each cycle.

Given this is a recommendation and we use the more "optimal implementation" of deleting the reservoir for delta temporality, it seems like a valid case to not comply.

@MrAlias
Copy link
Contributor Author

MrAlias commented Oct 1, 2024

We do accept a filtered subset of attributes, but DO NOT comply with this:

the reservoir MUST be given the Attributes associated with its timeseries point either at construction so that additional sampling performed by the reservoir has access to all attributes from a measurement in the "offer" method.

We do not currently comply with this, but this only applies to custom reservoir implementations. See #5861 for how this could be implemented without changes to any of the existing functions.

That's a good catch.

This was something imposed on us by other implementations that only accept the full attribute sets for Offer.

I'm still not sold it is needed given the concept of an attribute filter exists, and none of these attributes are needed by our built-in reservoirs.

I think your work-around in #5861 is a good solution though. And will ensure we are compliant 👍

@MrAlias MrAlias modified the milestones: v1.31.0, v1.32.0 Oct 10, 2024
dashpole added a commit that referenced this issue Oct 11, 2024
Part of #5249

### Spec

https://opentelemetry.io/docs/specs/otel/metrics/sdk/#exemplarfilter

> The ExemplarFilter configuration MUST allow users to select between
one of the built-in ExemplarFilters. While ExemplarFilter determines
which measurements are eligible for becoming an Exemplar, the
ExemplarReservoir makes the final decision if a measurement becomes an
exemplar and is stored.

> The ExemplarFilter SHOULD be a configuration parameter of a
MeterProvider for an SDK. The default value SHOULD be TraceBased. The
filter configuration SHOULD follow the [environment variable
specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#exemplar).

> An OpenTelemetry SDK MUST support the following filters:

> *
[AlwaysOn](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwayson)
> *
[AlwaysOff](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwaysoff)
> *
[TraceBased](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#tracebased)

### Changes

* adds exemplar.AlwaysOffFilter, which is one of the required filters
from the SDK:
https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwaysoff
* adds `metric.WithExemplarFilter` as an option for the metrics SDK.
* moves handling of `OTEL_METRICS_EXEMPLAR_FILTER` to the same location
as config handling to make code easier to navigate.



dropReservoir can actually be removed, but I plan to do that in a
follow-up refactor, since it will be a large diff.

---------

Co-authored-by: Damien Mathieu <[email protected]>
Co-authored-by: Tyler Yahn <[email protected]>
Co-authored-by: Chester Cheung <[email protected]>
XSAM added a commit that referenced this issue Oct 23, 2024
Topic: #5249

This reverts commit 8041156 (PR: #5899)
due to the performance degradation found by Benchmarks CI
https://github.com/open-telemetry/opentelemetry-go/actions/runs/11447364022/job/31848519243

Here is the benchmark test on my machine:

```
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/sdk/metric
                                       │   old.txt   │                new.txt                 │
                                       │   sec/op    │    sec/op     vs base                  │
Instrument/instrumentImpl/aggregate-10   3.378µ ± 3%   49.366µ ± 1%  +1361.40% (p=0.000 n=10)
Instrument/observable/observe-10         2.288µ ± 2%   37.791µ ± 1%  +1551.73% (p=0.000 n=10)
geomean                                  2.780µ         43.19µ       +1453.65%

                                       │   old.txt    │                 new.txt                 │
                                       │     B/op     │     B/op       vs base                  │
Instrument/instrumentImpl/aggregate-10   1.245Ki ± 1%   22.363Ki ± 0%  +1696.08% (p=0.000 n=10)
Instrument/observable/observe-10           823.0 ± 1%    17432.5 ± 0%  +2018.17% (p=0.000 n=10)
geomean                                  1.000Ki         19.51Ki       +1850.48%

                                       │  old.txt   │                new.txt                │
                                       │ allocs/op  │  allocs/op   vs base                  │
Instrument/instrumentImpl/aggregate-10   1.000 ± 0%   21.000 ± 0%  +2000.00% (p=0.000 n=10)
Instrument/observable/observe-10         1.000 ± 0%   16.000 ± 0%  +1500.00% (p=0.000 n=10)
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:metrics Part of OpenTelemetry Metrics
Projects
None yet
3 participants