Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split EMF log with larger than 100 buckets. #242

Merged
merged 11 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 173 additions & 24 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,28 @@ type summaryMetricEntry struct {
count uint64
}

type dataPointSplit struct {
values []float64
counts []float64
count int
length int
bucketMin float64
bucketMax float64
maxBuckets int
}
jefchien marked this conversation as resolved.
Show resolved Hide resolved

//func newDataPointSplit(count int, length int, bucketMin float64, bucketMax float64, maxBuckets int) *dataPointSplit {
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
// return &dataPointSplit{
// values: []float64{},
// counts: []float64{},
// count: count,
// length: length,
// bucketMin: bucketMin,
// bucketMax: bucketMax,
// maxBuckets: maxBuckets,
// }
//}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, _ string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
Expand Down Expand Up @@ -213,42 +235,122 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// As CloudWatch EMF logs allows in maximum of 100 target members, the exponential histogram metric are split into two data points if the total buckets exceed 100,
// the first data point contains the first 100 buckets, while the second data point includes the remaining buckets.
// Re-calculated Min, Max, Sum, Count for each split:
// 1. First split datapoint:
// - Max: From original metric.
// - Min: Last bucket’s bucketBegin in the first split.
// - Sum: From original metric.
// - Count: Calculated from the first split buckets.
//
// 2. Second split datapoint:
// - Max: First bucket’s bucketEnd in the second split.
// - Min: From original metric.
// - Sum: 0.
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
// - Count: Overall count - first split count.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, _ string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}

var bucketBegin float64
var bucketEnd float64
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
}
firstSplit := dataPointSplit{
values: []float64{},
counts: []float64{},
count: int(metric.Count()),
length: 0,
bucketMin: metric.Min(),
bucketMax: metric.Max(),
maxBuckets: totalBucketLen,
}

currentLength := 0
splitThreshold := 100
var secondSplit dataPointSplit
if totalBucketLen > splitThreshold {
firstSplit.maxBuckets = splitThreshold
firstSplit.count = 0
secondSplit = dataPointSplit{
values: []float64{},
counts: []float64{},
count: 0,
length: 0,
bucketMin: metric.Min(),
bucketMax: metric.Max(),
maxBuckets: totalBucketLen - splitThreshold,
}
}

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
for i := positiveBucketCounts.Len() - 1; i >= 0; i-- {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
} else {
bucketBegin = bucketEnd
bucketEnd = bucketBegin
}
bucketEnd = math.Pow(base, float64(index+1))
bucketBegin = math.Pow(base, float64(index))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(i)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
if count > 0 && firstSplit.length < firstSplit.maxBuckets {
firstSplit.values = append(firstSplit.values, metricVal)
firstSplit.counts = append(firstSplit.counts, float64(count))
firstSplit.length++
currentLength++
if firstSplit.maxBuckets < totalBucketLen {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
firstSplit.count += int(count)
if currentLength == firstSplit.maxBuckets {
if bucketBegin < bucketEnd {
firstSplit.bucketMin = bucketBegin
} else {
firstSplit.bucketMin = bucketEnd
}
}
}
} else if count > 0 {
if currentLength == firstSplit.maxBuckets && currentLength != 0 {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
if bucketBegin < bucketEnd {
secondSplit.bucketMax = bucketEnd
} else {
secondSplit.bucketMax = bucketBegin
}
}
secondSplit.values = append(secondSplit.values, metricVal)
secondSplit.counts = append(secondSplit.counts, float64(count))
currentLength++
}
}

// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
if metric.ZeroCount() > 0 && firstSplit.length < firstSplit.maxBuckets {
firstSplit.values = append(firstSplit.values, 0)
firstSplit.counts = append(firstSplit.counts, float64(metric.ZeroCount()))
firstSplit.length++
currentLength++
if firstSplit.maxBuckets < totalBucketLen {
firstSplit.count += int(metric.ZeroCount())
if currentLength == firstSplit.maxBuckets {
firstSplit.bucketMin = bucketBegin
}
}
} else if metric.ZeroCount() > 0 {
if currentLength == firstSplit.maxBuckets && currentLength != 0 {
secondSplit.bucketMax = 0
}
secondSplit.values = append(secondSplit.values, 0)
secondSplit.counts = append(secondSplit.counts, float64(metric.ZeroCount()))
currentLength++
}

// Set mid-point of negative buckets in values/counts array.
Expand All @@ -273,25 +375,72 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int,
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(i)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
if count > 0 && firstSplit.length < firstSplit.maxBuckets {
firstSplit.values = append(firstSplit.values, metricVal)
firstSplit.counts = append(firstSplit.counts, float64(count))
firstSplit.length++
currentLength++
if firstSplit.maxBuckets < totalBucketLen {
firstSplit.count += int(count)
if currentLength == firstSplit.length {
firstSplit.bucketMin = bucketEnd
if bucketBegin < bucketEnd {
firstSplit.bucketMin = bucketBegin
} else {
firstSplit.bucketMin = bucketEnd
}
}
}
} else if count > 0 {
if currentLength == firstSplit.maxBuckets && currentLength != 0 {
if bucketBegin < bucketEnd {
secondSplit.bucketMax = bucketEnd
} else {
secondSplit.bucketMax = bucketBegin
}
}
secondSplit.values = append(secondSplit.values, metricVal)
secondSplit.counts = append(secondSplit.counts, float64(count))
currentLength++
}
}

return []dataPoint{{
//fmt.Println("firstSplit", firstSplit)
//fmt.Println("secondSplit", secondSplit)

var datapoints []dataPoint
// Add second data point (last 100 elements or fewer)
jefchien marked this conversation as resolved.
Show resolved Hide resolved
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Values: firstSplit.values,
Counts: firstSplit.counts,
Count: uint64(firstSplit.count),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
Max: firstSplit.bucketMax,
Min: firstSplit.bucketMin,
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
})

// Add first data point with the remaining elements
if totalBucketLen > 100 {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: &cWMetricHistogram{
Values: secondSplit.values,
Counts: secondSplit.counts,
Count: metric.Count() - uint64(firstSplit.count),
Sum: 0,
Max: secondSplit.bucketMax,
Min: secondSplit.bucketMin,
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}
return datapoints, true
}

func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
Expand Down
Loading