Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split EMF log with larger than 100 buckets. #242

Merged
merged 11 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 177 additions & 42 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,31 @@ type summaryMetricEntry struct {
count uint64
}

type dataPointSplit struct {
cWMetricHistogram *cWMetricHistogram
length int
capacity int
}

func (split *dataPointSplit) isNotFull() bool {
return split.length < split.capacity
}

func (split *dataPointSplit) setMax(maxVal float64) {
split.cWMetricHistogram.Max = maxVal
}

func (split *dataPointSplit) setMin(minVal float64) {
split.cWMetricHistogram.Min = minVal
}

func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, _ string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
Expand Down Expand Up @@ -213,85 +238,195 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// As CloudWatch EMF logs allows in maximum of 100 target members, the exponential histogram metric are split into multiple data points as needed,
// each containing a maximum of 100 buckets, to comply with CloudWatch EMF log constraints.
//
// For each split data point:
// - Min and Max values are recalculated based on the bucket boundary within that specific split.
// - Sum is only assigned to the first split to ensure the total sum of the datapoints after aggregation is correct.
// - Count is accumulated based on the bucket counts within each split.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, _ string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

const splitThreshold = 100
var currentBucketIndex = 0
var currentPositiveIndex = metric.Positive().BucketCounts().Len() - 1
var currentZeroIndex = 0
var currentNegativeIndex = 0
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
var datapoints []dataPoint
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
}

if totalBucketLen == 0 {
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

for currentBucketIndex < totalBucketLen {
// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
capacity := splitThreshold
if totalBucketLen-currentBucketIndex < splitThreshold {
capacity = totalBucketLen - currentBucketIndex
}

sum := 0.0
// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
if currentBucketIndex == 0 {
sum = metric.Sum()
}

split := dataPointSplit{
cWMetricHistogram: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Max: metric.Max(),
Min: metric.Min(),
Count: 0,
Sum: sum,
},
length: 0,
capacity: capacity,
}

// Set mid-point of positive buckets in values/counts array.
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex)
// Set count of zero bucket in values/counts array.
currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBuckets(&split, metric, currentBucketIndex, currentZeroIndex)
// Set mid-point of negative buckets in values/counts array.
currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex)
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved

// Add the current split to the datapoints list
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: split.cWMetricHistogram,
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}

//Override the min and max values of the first and last splits with the raw data of the metric.
datapoints[0].value.(*cWMetricHistogram).Max = metric.Max()
datapoints[len(datapoints)-1].value.(*cWMetricHistogram).Min = metric.Min()

return datapoints, true
}

func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) {
if !split.isNotFull() || currentPositiveIndex < 0 {
return currentBucketIndex, currentPositiveIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}
var bucketBegin float64
var bucketEnd float64

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
bucketBegin := 0.0
bucketEnd := 0.0

for split.isNotFull() && currentPositiveIndex >= 0 {
index := currentPositiveIndex + int(positiveOffset)
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
} else {
bucketBegin = bucketEnd
bucketEnd = bucketBegin
}
bucketEnd = math.Pow(base, float64(index+1))
bucketBegin = math.Pow(base, float64(index))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(i)
count := positiveBucketCounts.At(currentPositiveIndex)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.length == split.capacity {
split.setMin(bucketBegin)
}
}
currentBucketIndex++
currentPositiveIndex--
}

// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
return currentBucketIndex, currentPositiveIndex
}

func collectDatapointsWithZeroBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) {
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
if metric.ZeroCount() > 0 && split.isNotFull() && currentZeroIndex == 0 {
split.appendMetricData(0, metric.ZeroCount())

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(0)
}
if split.length == split.capacity {
zzhlogin marked this conversation as resolved.
Show resolved Hide resolved
split.setMin(0)
}
currentZeroIndex++
currentBucketIndex++
}

// Set mid-point of negative buckets in values/counts array.
return currentBucketIndex, currentZeroIndex
}

func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) {
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.
if !split.isNotFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
return currentBucketIndex, currentNegativeIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
negativeBuckets := metric.Negative()
negativeOffset := negativeBuckets.Offset()
negativeBucketCounts := negativeBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < negativeBucketCounts.Len(); i++ {
index := i + int(negativeOffset)
bucketBegin := 0.0
bucketEnd := 0.0

for split.isNotFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
index := currentNegativeIndex + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
}
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(i)
count := negativeBucketCounts.At(currentNegativeIndex)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.length == split.capacity {
split.setMin(bucketBegin)
}
}
currentBucketIndex++
currentNegativeIndex++
}

return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes()),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
return currentBucketIndex, currentNegativeIndex
}

func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
Expand Down
Loading
Loading