Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split EMF log with larger than 100 buckets. #242

Merged
merged 11 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 169 additions & 64 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ type summaryMetricEntry struct {
count uint64
}

type dataPointSplit struct {
cWMetricHistogram *cWMetricHistogram
length int
capacity int
}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, _ string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
Expand Down Expand Up @@ -213,85 +219,184 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// As CloudWatch EMF logs allows in maximum of 100 target members, the exponential histogram metric are split into multiple data points as needed,
// each containing a maximum of 100 buckets, to comply with CloudWatch EMF log constraints.
//
// For each split data point:
// - Min and Max values are recalculated based on the bucket boundary within that specific split.
// - Sum is only assigned to the first split to ensure the total sum of the datapoints after aggregation is correct.
// - Count is accumulated based on the bucket counts within each split.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, _ string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}

var bucketBegin float64
var bucketEnd float64
const splitThreshold = 100
var currentBucketIndex = 0
var datapoints []dataPoint
var currentPositiveIndex = metric.Positive().BucketCounts().Len() - 1
var currentZeroIndex = 0
var currentNegativeIndex = 0
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
}

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
} else {
bucketBegin = bucketEnd
if totalBucketLen == 0 {
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

for currentBucketIndex < totalBucketLen {
// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
split := dataPointSplit{
cWMetricHistogram: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Max: metric.Max(),
Min: metric.Min(),
Count: 0,
Sum: 0,
},
length: 0,
capacity: splitThreshold,
}
bucketEnd = math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(i)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))

// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
if currentBucketIndex == 0 {
split.cWMetricHistogram.Sum = metric.Sum()
}
}

// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
}
if totalBucketLen-currentBucketIndex < splitThreshold {
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
split.capacity = totalBucketLen - currentBucketIndex
}

// Set mid-point of positive buckets in values/counts array.
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0

for split.length < split.capacity && currentPositiveIndex >= 0 {
index := currentPositiveIndex + int(positiveOffset)
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
} else {
bucketEnd = bucketBegin
}
bucketBegin = math.Pow(base, float64(index))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(currentPositiveIndex)
if count > 0 {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
if split.length == 1 && currentBucketIndex != 0 {
if bucketBegin < bucketEnd {
split.cWMetricHistogram.Max = bucketEnd
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
} else {
split.cWMetricHistogram.Max = bucketBegin
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
}
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
if bucketBegin < bucketEnd {
split.cWMetricHistogram.Min = bucketBegin
} else {
split.cWMetricHistogram.Min = bucketEnd
}
}
}
currentBucketIndex++
currentPositiveIndex--
}

// Set mid-point of negative buckets in values/counts array.
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.

negativeBuckets := metric.Negative()
negativeOffset := negativeBuckets.Offset()
negativeBucketCounts := negativeBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < negativeBucketCounts.Len(); i++ {
index := i + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 && split.length < split.capacity && currentZeroIndex == 0 {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, 0)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(metric.ZeroCount()))
split.length++
split.cWMetricHistogram.Count += metric.ZeroCount()
if split.length == 1 && currentBucketIndex != 0 {
split.cWMetricHistogram.Max = 0
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
split.cWMetricHistogram.Min = 0
}
currentZeroIndex++
currentBucketIndex++
}
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(i)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))

// Set mid-point of negative buckets in values/counts array.
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.
negativeBuckets := metric.Negative()
negativeOffset := negativeBuckets.Offset()
negativeBucketCounts := negativeBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0

for split.length < split.capacity && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
index := currentNegativeIndex + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
}
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(currentNegativeIndex)
if count > 0 {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
if split.length == 1 && currentBucketIndex != 0 {
if bucketBegin < bucketEnd {
split.cWMetricHistogram.Max = bucketEnd
} else {
split.cWMetricHistogram.Max = bucketBegin
}
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
if bucketBegin < bucketEnd {
split.cWMetricHistogram.Min = bucketBegin
} else {
split.cWMetricHistogram.Min = bucketEnd
}
}
}
currentBucketIndex++
currentNegativeIndex++
}

// Add the current split to the datapoints list
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: split.cWMetricHistogram,
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}

return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
return datapoints, true
}

func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
Expand Down
Loading