Skip to content

Commit

Permalink
Remove un-used code.
Browse files Browse the repository at this point in the history
  • Loading branch information
zzhlogin committed Nov 12, 2024
1 parent 5eef5bd commit a4e0a1a
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 51 deletions.
133 changes: 82 additions & 51 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,25 @@ type dataPointSplit struct {
capacity int
}

func (split *dataPointSplit) isNotFull() bool {
return split.length < split.capacity
}

func (split *dataPointSplit) setMax(maxVal float64) {
split.cWMetricHistogram.Max = maxVal
}

func (split *dataPointSplit) setMin(minVal float64) {
split.cWMetricHistogram.Min = minVal
}

func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
Expand Down Expand Up @@ -211,11 +230,11 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int,
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

const splitThreshold = 100
var currentBucketIndex = 0
currentBucketIndex := 0
currentPositiveIndex := metric.Positive().BucketCounts().Len() - 1
currentZeroIndex := 0
currentNegativeIndex := 0
var datapoints []dataPoint
var currentPositiveIndex = metric.Positive().BucketCounts().Len() - 1
var currentZeroIndex = 0
var currentNegativeIndex = 0
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
Expand All @@ -239,34 +258,36 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int,

for currentBucketIndex < totalBucketLen {
// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
capacity := splitThreshold
if totalBucketLen-currentBucketIndex < splitThreshold {
capacity = totalBucketLen - currentBucketIndex
}

sum := 0.0
// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
if currentBucketIndex == 0 {
sum = metric.Sum()
}

split := dataPointSplit{
cWMetricHistogram: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Max: metric.Max(),
Min: metric.Min(),
Count: 0,
Sum: 0,
Sum: sum,
},
length: 0,
capacity: splitThreshold,
capacity: capacity,
}

// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
if currentBucketIndex == 0 {
split.cWMetricHistogram.Sum = metric.Sum()
}

if totalBucketLen-currentBucketIndex < splitThreshold {
split.capacity = totalBucketLen - currentBucketIndex
}

// Set mid-point of positive buckets in values/counts array.
currentBucketIndex, currentPositiveIndex = iteratePositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex, totalBucketLen)
// Set count of zero bucket in values/counts array.
currentBucketIndex, currentZeroIndex = iterateZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex, totalBucketLen)
// Set mid-point of negative buckets in values/counts array.
currentBucketIndex, currentNegativeIndex = iterateNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex, totalBucketLen)
// Set collect values from positive buckets and save into split.
currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex)
// Set collect values from zero buckets and save into split.
currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex)
// Set collect values from negative buckets and save into split.
currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex)

// Add the current split to the datapoints list
datapoints = append(datapoints, dataPoint{
Expand All @@ -276,10 +297,19 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int,
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}

// Override the min and max values of the first and last splits with the raw data of the metric.
datapoints[0].value.(*cWMetricHistogram).Max = metric.Max()
datapoints[len(datapoints)-1].value.(*cWMetricHistogram).Min = metric.Min()

return datapoints, true
}

func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int, totalBucketLen int) (int, int) {
func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) {
if !split.isNotFull() || currentPositiveIndex < 0 {
return currentBucketIndex, currentPositiveIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
positiveBuckets := metric.Positive()
Expand All @@ -288,7 +318,7 @@ func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHis
bucketBegin := 0.0
bucketEnd := 0.0

for split.length < split.capacity && currentPositiveIndex >= 0 {
for split.isNotFull() && currentPositiveIndex >= 0 {
index := currentPositiveIndex + int(positiveOffset)
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
Expand All @@ -299,15 +329,14 @@ func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHis
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(currentPositiveIndex)
if count > 0 {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
if split.length == 1 && currentBucketIndex != 0 {
split.cWMetricHistogram.Max = bucketEnd
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
split.cWMetricHistogram.Min = bucketBegin
if !split.isNotFull() {
split.setMin(bucketBegin)
}
}
currentBucketIndex++
Expand All @@ -317,17 +346,16 @@ func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHis
return currentBucketIndex, currentPositiveIndex
}

func iterateZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int, totalBucketLen int) (int, int) {
if metric.ZeroCount() > 0 && split.length < split.capacity && currentZeroIndex == 0 {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, 0)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(metric.ZeroCount()))
split.length++
split.cWMetricHistogram.Count += metric.ZeroCount()
if split.length == 1 && currentBucketIndex != 0 {
split.cWMetricHistogram.Max = 0
func collectDatapointsWithZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) {
if metric.ZeroCount() > 0 && split.isNotFull() && currentZeroIndex == 0 {
split.appendMetricData(0, metric.ZeroCount())

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(0)
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
split.cWMetricHistogram.Min = 0
if !split.isNotFull() {
split.setMin(0)
}
currentZeroIndex++
currentBucketIndex++
Expand All @@ -336,12 +364,16 @@ func iterateZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogra
return currentBucketIndex, currentZeroIndex
}

func iterateNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int, totalBucketLen int) (int, int) {
func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) {
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.
if !split.isNotFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
return currentBucketIndex, currentNegativeIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
negativeBuckets := metric.Negative()
Expand All @@ -350,7 +382,7 @@ func iterateNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHis
bucketBegin := 0.0
bucketEnd := 0.0

for split.length < split.capacity && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
for split.isNotFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
index := currentNegativeIndex + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
Expand All @@ -361,15 +393,14 @@ func iterateNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHis
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(currentNegativeIndex)
if count > 0 {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
if split.length == 1 && currentBucketIndex != 0 {
split.cWMetricHistogram.Max = bucketEnd
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
split.cWMetricHistogram.Min = bucketBegin
if !split.isNotFull() {
split.setMin(bucketBegin)
}
}
currentBucketIndex++
Expand Down
45 changes: 45 additions & 0 deletions exporter/awsemfexporter/datapoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,51 @@ func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSliceWithSplitDat
},
},
},
{
name: "Exponential histogram with more than 100 buckets, including positive, negative and zero buckets with zero counts",
histogramDPS: func() pmetric.ExponentialHistogramDataPointSlice {
histogramDPS := pmetric.NewExponentialHistogramDataPointSlice()
histogramDP := histogramDPS.AppendEmpty()
posBucketCounts := make([]uint64, 60)
for i := range posBucketCounts {
posBucketCounts[i] = uint64(i % 5)
}
histogramDP.Positive().BucketCounts().FromRaw(posBucketCounts)
histogramDP.SetZeroCount(2)
negBucketCounts := make([]uint64, 60)
for i := range negBucketCounts {
negBucketCounts[i] = uint64(i % 5)
}
histogramDP.Negative().BucketCounts().FromRaw(negBucketCounts)
histogramDP.SetSum(1000)
histogramDP.SetMin(-9e+17)
histogramDP.SetMax(9e+17)
histogramDP.SetCount(uint64(3662))
histogramDP.Attributes().PutStr("label1", "value1")
return histogramDPS
}(),
expectedDatapoints: []dataPoint{
{
name: "foo",
value: &cWMetricHistogram{
Values: []float64{8.646911284551352e+17, 4.323455642275676e+17, 2.161727821137838e+17, 1.080863910568919e+17, 2.7021597764222976e+16,
1.3510798882111488e+16, 6.755399441055744e+15, 3.377699720527872e+15, 8.44424930131968e+14, 4.22212465065984e+14, 2.11106232532992e+14,
1.05553116266496e+14, 2.6388279066624e+13, 1.3194139533312e+13, 6.597069766656e+12, 3.298534883328e+12, 8.24633720832e+11, 4.12316860416e+11,
2.06158430208e+11, 1.03079215104e+11, 2.5769803776e+10, 1.2884901888e+10, 6.442450944e+09, 3.221225472e+09, 8.05306368e+08, 4.02653184e+08,
2.01326592e+08, 1.00663296e+08, 2.5165824e+07, 1.2582912e+07, 6.291456e+06, 3.145728e+06, 786432, 393216, 196608, 98304, 24576, 12288, 6144, 3072,
768, 384, 192, 96, 24, 12, 6, 3, 0, -3, -6, -12, -24, -96, -192, -384, -768, -3072, -6144, -12288, -24576, -98304, -196608, -393216, -786432,
-3.145728e+06, -6.291456e+06, -1.2582912e+07, -2.5165824e+07, -1.00663296e+08, -2.01326592e+08, -4.02653184e+08, -8.05306368e+08, -3.221225472e+09,
-6.442450944e+09, -1.2884901888e+10, -2.5769803776e+10, -1.03079215104e+11, -2.06158430208e+11, -4.12316860416e+11, -8.24633720832e+11, -3.298534883328e+12,
-6.597069766656e+12, -1.3194139533312e+13, -2.6388279066624e+13, -1.05553116266496e+14, -2.11106232532992e+14, -4.22212465065984e+14, -8.44424930131968e+14,
-3.377699720527872e+15, -6.755399441055744e+15, -1.3510798882111488e+16, -2.7021597764222976e+16, -1.080863910568919e+17, -2.161727821137838e+17,
-4.323455642275676e+17, -8.646911284551352e+17},
Counts: []float64{4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3,
2, 1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4},
Sum: 1000, Count: 242, Min: -9e+17, Max: 9e+17},
labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
},
},
},
}

for _, tc := range testCases {
Expand Down

0 comments on commit a4e0a1a

Please sign in to comment.