Skip to content

Commit

Permalink
[exporter/awsemf] Split exponential histograms larger than 100 bucket…
Browse files Browse the repository at this point in the history
…s. (#242)
  • Loading branch information
zzhlogin authored Nov 20, 2024
1 parent b5f3b0c commit 9c1ddd2
Show file tree
Hide file tree
Showing 5 changed files with 745 additions and 46 deletions.
221 changes: 179 additions & 42 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,31 @@ type summaryMetricEntry struct {
count uint64
}

type dataPointSplit struct {
cWMetricHistogram *cWMetricHistogram
length int
capacity int
}

func (split *dataPointSplit) isNotFull() bool {
return split.length < split.capacity
}

func (split *dataPointSplit) setMax(maxVal float64) {
split.cWMetricHistogram.Max = maxVal
}

func (split *dataPointSplit) setMin(minVal float64) {
split.cWMetricHistogram.Min = minVal
}

func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, _ string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
Expand Down Expand Up @@ -213,85 +238,197 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// As CloudWatch EMF logs allows in maximum of 100 target members, the exponential histogram metric are split into multiple data points as needed,
// each containing a maximum of 100 buckets, to comply with CloudWatch EMF log constraints.
//
// For each split data point:
// - Min and Max values are recalculated based on the bucket boundary within that specific split.
// - Sum is only assigned to the first split to ensure the total sum of the datapoints after aggregation is correct.
// - Count is accumulated based on the bucket counts within each split.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, _ string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

const splitThreshold = 100
currentBucketIndex := 0
currentPositiveIndex := metric.Positive().BucketCounts().Len() - 1
currentZeroIndex := 0
currentNegativeIndex := 0
var datapoints []dataPoint
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
}

for currentBucketIndex < totalBucketLen {
// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
capacity := splitThreshold
if totalBucketLen-currentBucketIndex < splitThreshold {
capacity = totalBucketLen - currentBucketIndex
}

sum := 0.0
// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
if currentBucketIndex == 0 {
sum = metric.Sum()
}

split := dataPointSplit{
cWMetricHistogram: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Max: metric.Max(),
Min: metric.Min(),
Count: 0,
Sum: sum,
},
length: 0,
capacity: capacity,
}

// Set collect values from positive buckets and save into split.
currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex)
// Set collect values from zero buckets and save into split.
currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex)
// Set collect values from negative buckets and save into split.
currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex)

if split.length > 0 {
// Add the current split to the datapoints list
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: split.cWMetricHistogram,
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}
}

if len(datapoints) == 0 {
return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

// Override the min and max values of the first and last splits with the raw data of the metric.
datapoints[0].value.(*cWMetricHistogram).Max = metric.Max()
datapoints[len(datapoints)-1].value.(*cWMetricHistogram).Min = metric.Min()

return datapoints, true
}

func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) {
if !split.isNotFull() || currentPositiveIndex < 0 {
return currentBucketIndex, currentPositiveIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}
var bucketBegin float64
var bucketEnd float64

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
bucketBegin := 0.0
bucketEnd := 0.0

for split.isNotFull() && currentPositiveIndex >= 0 {
index := currentPositiveIndex + int(positiveOffset)
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
} else {
bucketBegin = bucketEnd
bucketEnd = bucketBegin
}
bucketEnd = math.Pow(base, float64(index+1))
bucketBegin = math.Pow(base, float64(index))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(i)
count := positiveBucketCounts.At(currentPositiveIndex)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if !split.isNotFull() {
split.setMin(bucketBegin)
}
}
currentBucketIndex++
currentPositiveIndex--
}

// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
return currentBucketIndex, currentPositiveIndex
}

func collectDatapointsWithZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) {
if metric.ZeroCount() > 0 && split.isNotFull() && currentZeroIndex == 0 {
split.appendMetricData(0, metric.ZeroCount())

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(0)
}
if !split.isNotFull() {
split.setMin(0)
}
currentZeroIndex++
currentBucketIndex++
}

// Set mid-point of negative buckets in values/counts array.
return currentBucketIndex, currentZeroIndex
}

func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) {
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.
if !split.isNotFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
return currentBucketIndex, currentNegativeIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
negativeBuckets := metric.Negative()
negativeOffset := negativeBuckets.Offset()
negativeBucketCounts := negativeBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < negativeBucketCounts.Len(); i++ {
index := i + int(negativeOffset)
bucketBegin := 0.0
bucketEnd := 0.0

for split.isNotFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
index := currentNegativeIndex + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
}
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(i)
count := negativeBucketCounts.At(currentNegativeIndex)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if !split.isNotFull() {
split.setMin(bucketBegin)
}
}
currentBucketIndex++
currentNegativeIndex++
}

return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
return currentBucketIndex, currentNegativeIndex
}

func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
Expand Down
Loading

0 comments on commit 9c1ddd2

Please sign in to comment.