Skip to content

Commit

Permalink
Address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
zzhlogin committed Nov 7, 2024
1 parent 9cf6119 commit c687b51
Showing 1 changed file with 67 additions and 25 deletions.
92 changes: 67 additions & 25 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"

import (
"errors"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -111,23 +112,28 @@ type dataPointSplit struct {
capacity int
}

func (split *dataPointSplit) isFull() bool {
return split.length >= split.capacity
func (split *dataPointSplit) isNotFull() bool {
return split.length < split.capacity
}

func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64, bucketBegin float64, bucketEnd float64) {
func (split *dataPointSplit) setMax(maxVal float64) {
split.cWMetricHistogram.Max = maxVal
}

func (split *dataPointSplit) setMin(minVal float64) {
split.cWMetricHistogram.Min = minVal
}

func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) error {
if split.length >= split.capacity {
// Return error if the split has exceeded capacity
return errors.New("Split is full, cannot append more data")
}
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.cWMetricHistogram.Max = bucketEnd
}
if split.length == split.capacity {
split.cWMetricHistogram.Min = bucketBegin
}
return nil
}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
Expand Down Expand Up @@ -302,11 +308,11 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int,
}

// Set mid-point of positive buckets in values/counts array.
currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex, totalBucketLen)
currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex)
// Set count of zero bucket in values/counts array.
currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBuckets(&split, metric, currentBucketIndex, currentZeroIndex, totalBucketLen)
currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBuckets(&split, metric, currentBucketIndex, currentZeroIndex)
// Set mid-point of negative buckets in values/counts array.
currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex, totalBucketLen)
currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex)

// Add the current split to the datapoints list
datapoints = append(datapoints, dataPoint{
Expand All @@ -324,8 +330,8 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int,
return datapoints, true
}

func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int, totalBucketLen int) (int, int) {
if split.isFull() || currentPositiveIndex < 0 {
func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) {
if !split.isNotFull() || currentPositiveIndex < 0 {
return currentBucketIndex, currentPositiveIndex
}

Expand All @@ -337,7 +343,7 @@ func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.
bucketBegin := 0.0
bucketEnd := 0.0

for !split.isFull() && currentPositiveIndex >= 0 {
for split.isNotFull() && currentPositiveIndex >= 0 {
index := currentPositiveIndex + int(positiveOffset)
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
Expand All @@ -348,7 +354,19 @@ func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(currentPositiveIndex)
if count > 0 {
split.appendMetricData(metricVal, count, bucketBegin, bucketEnd)
err := split.appendMetricData(metricVal, count)
if err != nil {
// Stop the loop if the split has exceeded capacity, and return the current indexes
return currentBucketIndex, currentPositiveIndex
}

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.length == split.capacity {
split.setMin(bucketBegin)
}
}
currentBucketIndex++
currentPositiveIndex--
Expand All @@ -357,23 +375,35 @@ func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.
return currentBucketIndex, currentPositiveIndex
}

func collectDatapointsWithZeroBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int, totalBucketLen int) (int, int) {
if metric.ZeroCount() > 0 && !split.isFull() && currentZeroIndex == 0 {
split.appendMetricData(0, metric.ZeroCount(), 0, 0)
func collectDatapointsWithZeroBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) {
if metric.ZeroCount() > 0 && split.isNotFull() && currentZeroIndex == 0 {
err := split.appendMetricData(0, metric.ZeroCount())
if err != nil {
// Stop the loop if the split has exceeded capacity, and return the current indexes
return currentBucketIndex, currentZeroIndex
}

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(0)
}
if split.length == split.capacity {
split.setMin(0)
}
currentZeroIndex++
currentBucketIndex++
}

return currentBucketIndex, currentZeroIndex
}

func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int, totalBucketLen int) (int, int) {
func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) {
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.
if split.isFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
if !split.isNotFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
return currentBucketIndex, currentNegativeIndex
}

Expand All @@ -385,7 +415,7 @@ func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.
bucketBegin := 0.0
bucketEnd := 0.0

for !split.isFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
for split.isNotFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
index := currentNegativeIndex + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
Expand All @@ -396,7 +426,19 @@ func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(currentNegativeIndex)
if count > 0 {
split.appendMetricData(metricVal, count, bucketBegin, bucketEnd)
err := split.appendMetricData(metricVal, count)
if err != nil {
// Stop the loop if the split has exceeded capacity, and return the current indexes
return currentBucketIndex, currentNegativeIndex
}

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.length == split.capacity {
split.setMin(bucketBegin)
}
}
currentBucketIndex++
currentNegativeIndex++
Expand Down

0 comments on commit c687b51

Please sign in to comment.