Skip to content

Commit

Permalink
Adding support for processing and publishing AWS neuron metrics as pa…
Browse files Browse the repository at this point in the history
…rt of CW Container Insights (aws#1084)
  • Loading branch information
aditya-purang authored Mar 27, 2024
1 parent 4bda88a commit 487bfcf
Show file tree
Hide file tree
Showing 11 changed files with 1,481 additions and 12 deletions.
15 changes: 15 additions & 0 deletions internal/containerinsightscommon/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ const (
GpuTotal = "gpu_total"
GpuUniqueId = "UUID"

NeuronCoreUtilization = "neuroncore_utilization"
NeuronCoreMemoryUtilizationTotal = "neuroncore_memory_usage_total"
NeuronCoreMemoryUtilizationConstants = "neuroncore_memory_usage_constants"
NeuronCoreMemoryUtilizationModelCode = "neuroncore_memory_usage_model_code"
NeuronCoreMemoryUtilizationSharedScratchpad = "neuroncore_memory_usage_model_shared_scratchpad"
NeuronCoreMemoryUtilizationRuntimeMemory = "neuroncore_memory_usage_runtime_memory"
NeuronCoreMemoryUtilizationTensors = "neuroncore_memory_usage_tensors"
NeuronDeviceHardwareEccEvents = "neurondevice_hw_ecc_events_total"
NeuronExecutionStatus = "neuron_execution_status"
NeuronExecutionErrors = "neuron_execution_errors"
NeuronRuntimeMemoryUsage = "neurondevice_runtime_memory_used_bytes"
NeuronInstanceInfo = "instance_info"
NeuronHardware = "neuron_hardware"
NeuronExecutionLatency = "neuron_execution_latency"

TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
TypeClusterNamespace = "ClusterNamespace"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package internal

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
)

var memoryMetricsNames = map[string]struct{}{
containerinsightscommon.NeuronCoreMemoryUtilizationConstants: {},
containerinsightscommon.NeuronCoreMemoryUtilizationModelCode: {},
containerinsightscommon.NeuronCoreMemoryUtilizationSharedScratchpad: {},
containerinsightscommon.NeuronCoreMemoryUtilizationRuntimeMemory: {},
containerinsightscommon.NeuronCoreMemoryUtilizationTensors: {},
}

type NeuronCoreInfo struct {
neuronCoreIndex string
neuronDeviceIndex string
runtimeTag string
}

type AwsNeuronMemoryMetricsAggregator struct {
memoryMetricValuesAggregator map[NeuronCoreInfo]float64
aggregatedMemoryMetricAttributes pcommon.Map
metricTimestamp pcommon.Timestamp
MemoryMetricsFound bool
}

func NewMemoryMemoryAggregator() *AwsNeuronMemoryMetricsAggregator {
return &AwsNeuronMemoryMetricsAggregator{memoryMetricValuesAggregator: map[NeuronCoreInfo]float64{}, MemoryMetricsFound: false}
}

func (d *AwsNeuronMemoryMetricsAggregator) AggregateMemoryMetric(originalMetric pmetric.Metric) {
if _, exists := memoryMetricsNames[originalMetric.Name()]; !exists {
return
}

datapoints := originalMetric.Gauge().DataPoints()

if datapoints.Len() <= 0 {
return
}

d.MemoryMetricsFound = true
d.aggregatedMemoryMetricAttributes = datapoints.At(0).Attributes()
d.metricTimestamp = datapoints.At(0).Timestamp()

for i := 0; i < datapoints.Len(); i++ {
datapoint := datapoints.At(i)

neuronCoreIndexValue, neuronCoreIndexValueExists := datapoint.Attributes().Get(NeuronCoreAttributeKey)
neuronDeviceIndexValue, neuronDeviceIndexValueExists := datapoint.Attributes().Get(NeuronDeviceAttributeKey)
runtimeTagValue, runtimeTagExists := datapoint.Attributes().Get(RuntimeTag)

if neuronCoreIndexValueExists && neuronDeviceIndexValueExists && runtimeTagExists {
neuronCoreInfo := NeuronCoreInfo{neuronCoreIndex: neuronCoreIndexValue.AsString(), neuronDeviceIndex: neuronDeviceIndexValue.AsString(), runtimeTag: runtimeTagValue.AsString()}
d.memoryMetricValuesAggregator[neuronCoreInfo] += datapoint.DoubleValue()
}
}

}

func (d *AwsNeuronMemoryMetricsAggregator) FlushAggregatedMemoryMetric() pmetric.Metric {
aggregatedMemoryMetric := pmetric.NewMetric()
aggregatedMemoryMetric.SetName(containerinsightscommon.NeuronCoreMemoryUtilizationTotal)
datapoints := aggregatedMemoryMetric.SetEmptySum().DataPoints()

for neuronCoreInfo, totalMemoryUsed := range d.memoryMetricValuesAggregator {
datapoint := datapoints.AppendEmpty()
datapoint.SetDoubleValue(totalMemoryUsed)
d.aggregatedMemoryMetricAttributes.CopyTo(datapoint.Attributes())

datapoint.Attributes().PutStr(NeuronCoreAttributeKey, neuronCoreInfo.neuronCoreIndex)
datapoint.Attributes().PutStr(NeuronDeviceAttributeKey, neuronCoreInfo.neuronDeviceIndex)
datapoint.Attributes().PutStr(RuntimeTag, neuronCoreInfo.runtimeTag)
datapoint.SetTimestamp(d.metricTimestamp)
}

// Reset the aggregator
d.resetMemoryMetricAggregator()
return aggregatedMemoryMetric
}

func (d *AwsNeuronMemoryMetricsAggregator) resetMemoryMetricAggregator() {
d.memoryMetricValuesAggregator = map[NeuronCoreInfo]float64{}
d.MemoryMetricsFound = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package internal

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
)

const (
dummy = "dummy"
)

var (
memoryUsageMetricValuesMap = map[string]float64{
"0": 20,
"2": 40,
}
)

func TestMemoryMetricAggregator_AggregateMemoryMetric(t *testing.T) {
aggregator := NewMemoryMemoryAggregator()

// Create a sample original metric with gauge data points
tensorsMemoryUsage := createSampleMetric(containerinsightscommon.NeuronCoreMemoryUtilizationTensors)
nonNeuronMetric := createSampleMetric(dummy)

// Call the method being tested
aggregator.AggregateMemoryMetric(tensorsMemoryUsage)
aggregator.AggregateMemoryMetric(nonNeuronMetric)

// Assert that memory metrics were found
assert.True(t, aggregator.MemoryMetricsFound)
}

func TestMemoryMetricAggregator_NonNeuronMetric(t *testing.T) {
aggregator := NewMemoryMemoryAggregator()

// Create a sample original metric with gauge data points
nonNeuronMetric := createSampleMetric("dummy")

// Call the method being tested
aggregator.AggregateMemoryMetric(nonNeuronMetric)

// Assert that memory metrics were found
assert.False(t, aggregator.MemoryMetricsFound)
}

func TestMemoryMetricAggregator_FlushAggregatedMemoryMetric(t *testing.T) {
aggregator := NewMemoryMemoryAggregator()
aggregator.aggregatedMemoryMetricAttributes = pcommon.NewMap()
aggregator.aggregatedMemoryMetricAttributes.FromRaw(map[string]any{
NeuronCoreAttributeKey: "9",
NeuronDeviceAttributeKey: "9",
dummy: dummy,
})

aggregator.metricTimestamp = staticTimestamp

// Add some data to the aggregator
// Create a sample original metric with gauge data points
tensorsMemoryUsage := createSampleMetric(containerinsightscommon.NeuronCoreMemoryUtilizationTensors)
constantsMemoryUsage := createSampleMetric(containerinsightscommon.NeuronCoreMemoryUtilizationConstants)
nonNeuronMetric := createSampleMetric(dummy)

// Call the method being tested
aggregator.AggregateMemoryMetric(tensorsMemoryUsage)
aggregator.AggregateMemoryMetric(constantsMemoryUsage)
aggregator.AggregateMemoryMetric(nonNeuronMetric)

// Call the method being tested
aggregatedMetric := aggregator.FlushAggregatedMemoryMetric()
aggregatedMetricDatapoints := aggregatedMetric.Sum().DataPoints()
// Assert the result
assert.NotNil(t, aggregatedMetric)
assert.Equal(t, containerinsightscommon.NeuronCoreMemoryUtilizationTotal, aggregatedMetric.Name())
assert.Equal(t, 2, aggregatedMetricDatapoints.Len())

for i := 0; i < aggregatedMetricDatapoints.Len(); i++ {
datapoint := aggregatedMetricDatapoints.At(i)
assert.Equal(t, staticTimestamp.String(), datapoint.Timestamp().String())
assert.Equal(t, 4, datapoint.Attributes().Len())

actualNeuronCoreIndex, _ := datapoint.Attributes().Get(NeuronCoreAttributeKey)
actualNeuronDeviceIndex, _ := datapoint.Attributes().Get(NeuronDeviceAttributeKey)
actualRuntimeTag, _ := datapoint.Attributes().Get(RuntimeTag)

assert.Equal(t, memoryUsageMetricValuesMap[actualNeuronCoreIndex.AsString()], datapoint.DoubleValue())
assert.Equal(t, "1", actualRuntimeTag.AsString())
assert.NotEqual(t, "9", actualNeuronCoreIndex.AsString())
assert.NotEqual(t, "9", actualNeuronDeviceIndex.AsString())
}
}

func createSampleMetric(metricName string) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(metricName)

// Add gauge data points
dataPoints := metric.SetEmptyGauge().DataPoints()
dataPoint1 := dataPoints.AppendEmpty()
dataPoint1.SetDoubleValue(10.0)
dataPoint1.SetTimestamp(staticTimestamp)
dataPoint1.Attributes().FromRaw(map[string]any{
NeuronCoreAttributeKey: "0",
NeuronDeviceAttributeKey: "0",
dummy: dummy,
RuntimeTag: "1",
})

dataPoint2 := dataPoints.AppendEmpty()
dataPoint2.SetDoubleValue(20.0)
dataPoint1.SetTimestamp(staticTimestamp)
dataPoint2.Attributes().FromRaw(map[string]any{
NeuronCoreAttributeKey: "2",
NeuronDeviceAttributeKey: "1",
dummy: dummy,
RuntimeTag: "1",
})

return metric
}
Loading

0 comments on commit 487bfcf

Please sign in to comment.