Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split EMF log with larger than 100 buckets. #242

Merged
merged 11 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 106 additions & 22 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,42 +213,94 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// If the total number of buckets exceeds 100, the exponential histogram metric is split into two data points,
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
// the second data point contains the last 100 buckets, while the first data point includes the remaining buckets.
// Min, Max, Sum, and Count are re-calculated for each split:
// 1. First split datapoint:
// - Max: From original metric.
// - Min: Last bucket’s bucketBegin in the first split.
// - Sum: 0.
// - Count: Calculated from the first split buckets.
//
// 2. Second split datapoint:
// - Max: First bucket’s bucketEnd in the second split.
// - Min: From original metric.
// - Sum: From original metric.
// - Count: Overall count - first split count.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, _ string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}
var bucketBegin float64
var bucketEnd float64
firstDataPointCount := 0
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
firstDataPointMin := metric.Min()
firstDataPointArrayValues := []float64{}
firstDataPointArrayCounts := []float64{}
secondDataPointMax := metric.Max()
secondDataPointArrayValues := []float64{}
secondDataPointArrayCounts := []float64{}
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
}
currentLength := 0
firstDataPointLength := totalBucketLen - 100
if firstDataPointLength < 0 {
firstDataPointLength = 0
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
}

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
for i := positiveBucketCounts.Len() - 1; i >= 0; i-- {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
} else {
bucketBegin = bucketEnd
bucketEnd = bucketBegin
}
bucketEnd = math.Pow(base, float64(index+1))
bucketBegin = math.Pow(base, float64(index))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(i)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
if count > 0 && currentLength < firstDataPointLength {
firstDataPointArrayValues = append(firstDataPointArrayValues, metricVal)
firstDataPointArrayCounts = append(firstDataPointArrayCounts, float64(count))
firstDataPointCount += int(count)
currentLength++
if currentLength == firstDataPointLength {
firstDataPointMin = bucketBegin
}
} else if count > 0 {
if currentLength == firstDataPointLength && currentLength != 0 {
secondDataPointMax = bucketEnd
}
secondDataPointArrayValues = append(secondDataPointArrayValues, metricVal)
secondDataPointArrayCounts = append(secondDataPointArrayCounts, float64(count))
currentLength++
}
}

// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
if metric.ZeroCount() > 0 && currentLength < firstDataPointLength {
firstDataPointArrayValues = append(firstDataPointArrayValues, 0)
firstDataPointArrayCounts = append(firstDataPointArrayCounts, float64(metric.ZeroCount()))
firstDataPointCount += int(metric.ZeroCount())
currentLength++
if currentLength == firstDataPointLength {
firstDataPointMin = 0
}
} else if metric.ZeroCount() > 0 {
if currentLength == firstDataPointLength && currentLength != 0 {
secondDataPointMax = 0
}
secondDataPointArrayValues = append(secondDataPointArrayValues, 0)
secondDataPointArrayCounts = append(secondDataPointArrayCounts, float64(metric.ZeroCount()))
currentLength++
}

// Set mid-point of negative buckets in values/counts array.
Expand All @@ -273,25 +325,57 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int,
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(i)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
if count > 0 && currentLength < firstDataPointLength {
firstDataPointArrayValues = append(firstDataPointArrayValues, metricVal)
firstDataPointArrayCounts = append(firstDataPointArrayCounts, float64(count))
firstDataPointCount += int(count)
currentLength++
if currentLength == firstDataPointLength {
firstDataPointMin = bucketEnd
}
} else if count > 0 {
if currentLength == firstDataPointLength && currentLength != 0 {
secondDataPointMax = bucketBegin
}
secondDataPointArrayValues = append(secondDataPointArrayValues, metricVal)
secondDataPointArrayCounts = append(secondDataPointArrayCounts, float64(count))
currentLength++
}
}

return []dataPoint{{
var datapoints []dataPoint
// Add second data point (last 100 elements or fewer)
jefchien marked this conversation as resolved.
Show resolved Hide resolved
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Values: secondDataPointArrayValues,
Counts: secondDataPointArrayCounts,
Count: metric.Count() - uint64(firstDataPointCount),
Sum: metric.Sum(),
Max: metric.Max(),
Max: secondDataPointMax,
Min: metric.Min(),
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
})

// Add first data point with the remaining elements
if firstDataPointCount > 0 {
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: &cWMetricHistogram{
Values: firstDataPointArrayValues,
Counts: firstDataPointArrayCounts,
Count: uint64(firstDataPointCount),
Sum: 0,
Max: metric.Max(),
Min: firstDataPointMin,
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}
return datapoints, true
}

func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
Expand Down
116 changes: 114 additions & 2 deletions exporter/awsemfexporter/datapoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,35 @@ func generateTestExponentialHistogramMetricWithInfs(name string) pmetric.Metrics
return otelMetrics
}

func generateTestExponentialHistogramMetricWithLongBuckets(name string) pmetric.Metrics {
otelMetrics := pmetric.NewMetrics()
rs := otelMetrics.ResourceMetrics().AppendEmpty()
metrics := rs.ScopeMetrics().AppendEmpty().Metrics()
metric := metrics.AppendEmpty()
metric.SetName(name)
metric.SetUnit("Seconds")
exponentialHistogramMetric := metric.SetEmptyExponentialHistogram()

exponentialHistogramDatapoint := exponentialHistogramMetric.DataPoints().AppendEmpty()
exponentialHistogramDatapoint.SetCount(3662)
exponentialHistogramDatapoint.SetSum(1000)
exponentialHistogramDatapoint.SetMin(-9e+17)
exponentialHistogramDatapoint.SetMax(9e+17)
exponentialHistogramDatapoint.SetZeroCount(2)
posBucketCounts := make([]uint64, 60)
for i := range posBucketCounts {
posBucketCounts[i] = uint64(i + 1)
}
exponentialHistogramDatapoint.Positive().BucketCounts().FromRaw(posBucketCounts)
negBucketCounts := make([]uint64, 60)
for i := range negBucketCounts {
negBucketCounts[i] = uint64(i + 1)
}
exponentialHistogramDatapoint.Negative().BucketCounts().FromRaw(negBucketCounts)
exponentialHistogramDatapoint.Attributes().PutStr("label1", "value1")
return otelMetrics
}

func generateTestSummaryMetric(name string) pmetric.Metrics {
otelMetrics := pmetric.NewMetrics()
rs := otelMetrics.ResourceMetrics().AppendEmpty()
Expand Down Expand Up @@ -894,7 +923,7 @@ func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSlice(t *testing.
}(),
expectedDatapoint: dataPoint{
name: "foo",
value: &cWMetricHistogram{Values: []float64{1.5, 3, 6, 0, -1.5, -3, -6}, Counts: []float64{1, 2, 3, 4, 1, 2, 3}},
value: &cWMetricHistogram{Values: []float64{6, 3, 1.5, 0, -1.5, -3, -6}, Counts: []float64{3, 2, 1, 4, 1, 2, 3}},
labels: map[string]string{"label1": "value1"},
},
},
Expand All @@ -915,7 +944,7 @@ func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSlice(t *testing.
}(),
expectedDatapoint: dataPoint{
name: "foo",
value: &cWMetricHistogram{Values: []float64{0.625, 2.5, 10, 0, -0.625, -2.5, -10}, Counts: []float64{1, 2, 3, 4, 1, 2, 3}},
value: &cWMetricHistogram{Values: []float64{10, 2.5, 0.625, 0, -0.625, -2.5, -10}, Counts: []float64{3, 2, 1, 4, 1, 2, 3}},
labels: map[string]string{"label1": "value1", "label2": "value2"},
},
},
Expand All @@ -939,6 +968,89 @@ func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSlice(t *testing.

}

func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSliceWithSplitDataPoints(t *testing.T) {
dmd := generateDeltaMetricMetadata(false, "foo", false)

testCases := []struct {
name string
histogramDPS pmetric.ExponentialHistogramDataPointSlice
expectedDatapoint1 dataPoint
expectedDatapoint2 dataPoint
}{
{
name: "Exponential histogram with more than 100 buckets",
histogramDPS: func() pmetric.ExponentialHistogramDataPointSlice {
histogramDPS := pmetric.NewExponentialHistogramDataPointSlice()
histogramDP := histogramDPS.AppendEmpty()
posBucketCounts := make([]uint64, 60)
for i := range posBucketCounts {
posBucketCounts[i] = uint64(i + 1)
}
histogramDP.Positive().BucketCounts().FromRaw(posBucketCounts)
histogramDP.SetZeroCount(2)
negBucketCounts := make([]uint64, 60)
for i := range negBucketCounts {
negBucketCounts[i] = uint64(i + 1)
}
histogramDP.Negative().BucketCounts().FromRaw(negBucketCounts)
histogramDP.SetSum(1000)
histogramDP.SetMin(-9e+17)
histogramDP.SetMax(9e+17)
histogramDP.SetCount(uint64(3662))
histogramDP.Attributes().PutStr("label1", "value1")
return histogramDPS
}(),
expectedDatapoint1: dataPoint{
name: "foo",
value: &cWMetricHistogram{
Values: []float64{4.12316860416e+11, 2.06158430208e+11, 1.03079215104e+11, 5.1539607552e+10, 2.5769803776e+10,
1.2884901888e+10, 6.442450944e+09, 3.221225472e+09, 1.610612736e+09, 8.05306368e+08, 4.02653184e+08, 2.01326592e+08, 1.00663296e+08,
5.0331648e+07, 2.5165824e+07, 1.2582912e+07, 6.291456e+06, 3.145728e+06, 1.572864e+06, 786432, 393216, 196608, 98304, 49152, 24576,
12288, 6144, 3072, 1536, 768, 384, 192, 96, 48, 24, 12, 6, 3, 1.5, 0, -1.5, -3, -6, -12, -24, -48, -96, -192, -384, -768, -1536, -3072,
-6144, -12288, -24576, -49152, -98304, -196608, -393216, -786432, -1.572864e+06, -3.145728e+06, -6.291456e+06, -1.2582912e+07, -2.5165824e+07,
-5.0331648e+07, -1.00663296e+08, -2.01326592e+08, -4.02653184e+08, -8.05306368e+08, -1.610612736e+09, -3.221225472e+09, -6.442450944e+09,
-1.2884901888e+10, -2.5769803776e+10, -5.1539607552e+10, -1.03079215104e+11, -2.06158430208e+11, -4.12316860416e+11, -8.24633720832e+11,
-1.649267441664e+12, -3.298534883328e+12, -6.597069766656e+12, -1.3194139533312e+13, -2.6388279066624e+13, -5.2776558133248e+13,
-1.05553116266496e+14, -2.11106232532992e+14, -4.22212465065984e+14, -8.44424930131968e+14, -1.688849860263936e+15, -3.377699720527872e+15,
-6.755399441055744e+15, -1.3510798882111488e+16, -2.7021597764222976e+16, -5.404319552844595e+16, -1.080863910568919e+17, -2.161727821137838e+17,
-4.323455642275676e+17, -8.646911284551352e+17},
Counts: []float64{39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7,
6, 5, 4, 3, 2, 1, 2, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60},
Sum: 1000, Count: 2612, Min: -9e+17, Max: 5.49755813888e+11},
labels: map[string]string{"label1": "value1"},
},
expectedDatapoint2: dataPoint{
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
name: "foo",
value: &cWMetricHistogram{
Values: []float64{8.646911284551352e+17, 4.323455642275676e+17, 2.161727821137838e+17, 1.080863910568919e+17, 5.404319552844595e+16, 2.7021597764222976e+16,
1.3510798882111488e+16, 6.755399441055744e+15, 3.377699720527872e+15, 1.688849860263936e+15, 8.44424930131968e+14, 4.22212465065984e+14,
2.11106232532992e+14, 1.05553116266496e+14, 5.2776558133248e+13, 2.6388279066624e+13, 1.3194139533312e+13, 6.597069766656e+12, 3.298534883328e+12,
1.649267441664e+12, 8.24633720832e+11},
Counts: []float64{60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40},
Sum: 0, Count: 1050, Min: 5.49755813888e+11, Max: 9e+17},
labels: map[string]string{"label1": "value1"},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(_ *testing.T) {
exponentialHistogramDatapointSlice := exponentialHistogramDataPointSlice{dmd, tc.histogramDPS}
emfCalcs := setupEmfCalculators()
defer require.NoError(t, shutdownEmfCalculators(emfCalcs))
dps, retained := exponentialHistogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false, emfCalcs)

assert.True(t, retained)
assert.Equal(t, 1, exponentialHistogramDatapointSlice.Len())
assert.Equal(t, 2, len(dps))
assert.Equal(t, tc.expectedDatapoint1, dps[0])
assert.Equal(t, tc.expectedDatapoint2, dps[1])
})
}

}

func TestIsStaleNaNInf_ExponentialHistogramDataPointSlice(t *testing.T) {

testCases := []struct {
Expand Down
3 changes: 2 additions & 1 deletion exporter/awsemfexporter/grouped_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func addToGroupedMetric(
continue
}

for _, dp := range dps {
for i, dp := range dps {
labels := dp.labels

if metricType, ok := labels["Type"]; ok {
Expand Down Expand Up @@ -87,6 +87,7 @@ func addToGroupedMetric(
}

// Extra params to use when grouping metrics
metadata.groupedMetricMetadata.metricIndex = i
groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels)
if _, ok := groupedMetrics[groupKey]; ok {
// if MetricName already exists in metrics map, print warning log
Expand Down
Loading