From c687b5151059501044c088f50fdcf7e50ae2fac2 Mon Sep 17 00:00:00 2001 From: Zhonghao Zhao Date: Thu, 7 Nov 2024 23:14:59 +0000 Subject: [PATCH] Address comments. --- exporter/awsemfexporter/datapoint.go | 92 ++++++++++++++++++++-------- 1 file changed, 67 insertions(+), 25 deletions(-) diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go index c244287bcdc3..91a7eb364452 100644 --- a/exporter/awsemfexporter/datapoint.go +++ b/exporter/awsemfexporter/datapoint.go @@ -4,6 +4,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" import ( + "errors" "fmt" "math" "strconv" @@ -111,23 +112,28 @@ type dataPointSplit struct { capacity int } -func (split *dataPointSplit) isFull() bool { - return split.length >= split.capacity +func (split *dataPointSplit) isNotFull() bool { + return split.length < split.capacity } -func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64, bucketBegin float64, bucketEnd float64) { +func (split *dataPointSplit) setMax(maxVal float64) { + split.cWMetricHistogram.Max = maxVal +} + +func (split *dataPointSplit) setMin(minVal float64) { + split.cWMetricHistogram.Min = minVal +} + +func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) error { + if split.length >= split.capacity { + // Return error if the split has exceeded capacity + return errors.New("Split is full, cannot append more data") + } split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal) split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count)) split.length++ split.cWMetricHistogram.Count += count - - // The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value) - if split.length == 1 { - split.cWMetricHistogram.Max = bucketEnd - } - if split.length == split.capacity { - split.cWMetricHistogram.Min = bucketBegin - } + return nil } // CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary. @@ -302,11 +308,11 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, } // Set mid-point of positive buckets in values/counts array. - currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex, totalBucketLen) + currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex) // Set count of zero bucket in values/counts array. - currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBuckets(&split, metric, currentBucketIndex, currentZeroIndex, totalBucketLen) + currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBuckets(&split, metric, currentBucketIndex, currentZeroIndex) // Set mid-point of negative buckets in values/counts array. - currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex, totalBucketLen) + currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex) // Add the current split to the datapoints list datapoints = append(datapoints, dataPoint{ @@ -324,8 +330,8 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, return datapoints, true } -func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int, totalBucketLen int) (int, int) { - if split.isFull() || currentPositiveIndex < 0 { +func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) { + if !split.isNotFull() || currentPositiveIndex < 0 { return currentBucketIndex, currentPositiveIndex } @@ -337,7 +343,7 @@ func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric. bucketBegin := 0.0 bucketEnd := 0.0 - for !split.isFull() && currentPositiveIndex >= 0 { + for split.isNotFull() && currentPositiveIndex >= 0 { index := currentPositiveIndex + int(positiveOffset) if bucketEnd == 0 { bucketEnd = math.Pow(base, float64(index+1)) @@ -348,7 +354,19 @@ func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric. metricVal := (bucketBegin + bucketEnd) / 2 count := positiveBucketCounts.At(currentPositiveIndex) if count > 0 { - split.appendMetricData(metricVal, count, bucketBegin, bucketEnd) + err := split.appendMetricData(metricVal, count) + if err != nil { + // Stop the loop if the split has exceeded capacity, and return the current indexes + return currentBucketIndex, currentPositiveIndex + } + + // The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value) + if split.length == 1 { + split.setMax(bucketEnd) + } + if split.length == split.capacity { + split.setMin(bucketBegin) + } } currentBucketIndex++ currentPositiveIndex-- @@ -357,9 +375,21 @@ func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric. return currentBucketIndex, currentPositiveIndex } -func collectDatapointsWithZeroBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int, totalBucketLen int) (int, int) { - if metric.ZeroCount() > 0 && !split.isFull() && currentZeroIndex == 0 { - split.appendMetricData(0, metric.ZeroCount(), 0, 0) +func collectDatapointsWithZeroBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) { + if metric.ZeroCount() > 0 && split.isNotFull() && currentZeroIndex == 0 { + err := split.appendMetricData(0, metric.ZeroCount()) + if err != nil { + // Stop the loop if the split has exceeded capacity, and return the current indexes + return currentBucketIndex, currentZeroIndex + } + + // The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value) + if split.length == 1 { + split.setMax(0) + } + if split.length == split.capacity { + split.setMin(0) + } currentZeroIndex++ currentBucketIndex++ } @@ -367,13 +397,13 @@ func collectDatapointsWithZeroBuckets(split *dataPointSplit, metric pmetric.Expo return currentBucketIndex, currentZeroIndex } -func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int, totalBucketLen int) (int, int) { +func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) { // According to metrics spec, the value in histogram is expected to be non-negative. // https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram // However, the negative support is defined in metrics data model. // https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram // The negative is also supported but only verified with unit test. - if split.isFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() { + if !split.isNotFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() { return currentBucketIndex, currentNegativeIndex } @@ -385,7 +415,7 @@ func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric. bucketBegin := 0.0 bucketEnd := 0.0 - for !split.isFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() { + for split.isNotFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() { index := currentNegativeIndex + int(negativeOffset) if bucketEnd == 0 { bucketEnd = -math.Pow(base, float64(index)) @@ -396,7 +426,19 @@ func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric. metricVal := (bucketBegin + bucketEnd) / 2 count := negativeBucketCounts.At(currentNegativeIndex) if count > 0 { - split.appendMetricData(metricVal, count, bucketBegin, bucketEnd) + err := split.appendMetricData(metricVal, count) + if err != nil { + // Stop the loop if the split has exceeded capacity, and return the current indexes + return currentBucketIndex, currentNegativeIndex + } + + // The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value) + if split.length == 1 { + split.setMax(bucketEnd) + } + if split.length == split.capacity { + split.setMin(bucketBegin) + } } currentBucketIndex++ currentNegativeIndex++