Skip to content

Commit

Permalink
Update metrics for AWS Neuron (aws#1104)
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-purang authored Apr 3, 2024
1 parent 5f676d1 commit c9fbf91
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 280 deletions.
2 changes: 1 addition & 1 deletion internal/containerinsightscommon/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const (
NeuronCoreMemoryUtilizationSharedScratchpad = "neuroncore_memory_usage_model_shared_scratchpad"
NeuronCoreMemoryUtilizationRuntimeMemory = "neuroncore_memory_usage_runtime_memory"
NeuronCoreMemoryUtilizationTensors = "neuroncore_memory_usage_tensors"
NeuronDeviceHardwareEccEvents = "neurondevice_hw_ecc_events_total"
NeuronDeviceHardwareEccEvents = "neurondevice_hw_ecc_events"
NeuronExecutionStatus = "neuron_execution_status"
NeuronExecutionErrors = "neuron_execution_errors"
NeuronRuntimeMemoryUsage = "neurondevice_runtime_memory_used_bytes"
Expand Down
129 changes: 80 additions & 49 deletions plugins/processors/gpuattributes/internal/awsneuron_metric_modifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,40 @@ import (
)

const (
aggregatedMetricSuffix = "_total"
ErrorType = "error_type"
StatusType = "status_type"
EventType = "event_type"
logTypeSuffix = "AWSNeuron"
MemoryLocation = "memory_location"

Core = "Core"
Device = "Device"
Percentile = "percentile"
PodName = "PodName"
Count = "Count"
Bytes = "Bytes"
Seconds = "Seconds"
Percent = "Percent"
NeuronCoreAttributeKey = "NeuronCore"
NeuronDeviceAttributeKey = "NeuronDevice"
RuntimeTag = "runtime_tag"
ClusterName = "ClusterName"
ContainerName = "ContainerName"
FullPodName = "FullPodName"
InstanceId = "InstanceId"
InstanceType = "InstanceType"
K8sPodName = "K8sPodName"
Namespace = "Namespace"
NeuronCore = "NeuronCore"
NeuronDevice = "NeuronDevice"
NodeName = "NodeName"
Service = "Service"
AvailabilityZone = "availability_zone"
Kubernetes = "kubernetes"
Region = "region"
SubnetId = "subnet_id"
ErrorType = "error_type"
StatusType = "status_type"
EventType = "event_type"
logTypeSuffix = "AWSNeuron"
MemoryLocation = "memory_location"

Core = "Core"
Device = "Device"
Percentile = "percentile"
PodName = "PodName"
Count = "Count"
Bytes = "Bytes"
Seconds = "Seconds"
Percent = "Percent"
NeuronCoreAttributeKey = "NeuronCore"
NeuronDeviceAttributeKey = "NeuronDevice"
RuntimeTag = "runtime_tag"
ClusterName = "ClusterName"
ContainerName = "ContainerName"
FullPodName = "FullPodName"
InstanceId = "InstanceId"
InstanceType = "InstanceType"
K8sPodName = "K8sPodName"
Namespace = "Namespace"
NeuronCore = "NeuronCore"
NeuronDevice = "NeuronDevice"
NodeName = "NodeName"
Service = "Service"
AvailabilityZone = "availability_zone"
Kubernetes = "kubernetes"
Region = "region"
SubnetId = "subnet_id"
NeuronExecutionErrorsAggregatedMetric = containerinsightscommon.NeuronExecutionErrors + "_total"
NeuronDeviceHardwareEccEventsAggregatedMetric = containerinsightscommon.NeuronDeviceHardwareEccEvents + "_total"
)

type AwsNeuronMetricModifier struct {
Expand All @@ -60,6 +61,12 @@ type MetricModifications struct {
Unit string
}

// MetricDatapointAggregationKey identifies one bucket of datapoint values that
// are summed together into a single aggregated metric datapoint. It is used as
// a map key, so every field is a plain comparable string. Fields, in order:
//   - runtimeTag: value of the datapoint's runtime_tag attribute.
//   - aggregatedMetricName: name of the aggregated metric the datapoint
//     contributes to.
//   - deviceId: value of the datapoint's NeuronDevice attribute; an empty
//     string is treated as "no device" when the aggregated datapoint is built.
type MetricDatapointAggregationKey struct {
	runtimeTag, aggregatedMetricName, deviceId string
}

var (
metricModificationsMap = map[string]MetricModifications{
containerinsightscommon.NeuronExecutionErrors: {DuplicationTypes: []string{containerinsightscommon.TypeNode}, UniqueAttribute: ErrorType, LogTypeSuffix: "", Unit: Count},
Expand All @@ -79,6 +86,20 @@ var (
}
attributeValuePrefixingMap = map[string]string{NeuronCoreAttributeKey: "core", NeuronDeviceAttributeKey: "device"}

uniquesDatapointsToAggregatedMetricMappings = map[string]map[string]string{
containerinsightscommon.NeuronExecutionErrors: {"generic": NeuronExecutionErrorsAggregatedMetric,
"numerical": NeuronExecutionErrorsAggregatedMetric,
"transient": NeuronExecutionErrorsAggregatedMetric,
"model": NeuronExecutionErrorsAggregatedMetric,
"runtime": NeuronExecutionErrorsAggregatedMetric,
"hardware": NeuronExecutionErrorsAggregatedMetric},
// execution_status metric will be added here incrementally
containerinsightscommon.NeuronDeviceHardwareEccEvents: {"mem_ecc_corrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"mem_ecc_uncorrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"sram_ecc_corrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"sram_ecc_uncorrected": NeuronDeviceHardwareEccEventsAggregatedMetric},
}

MetricAttributesToKeep = map[string]struct{}{
ClusterName: {},
ContainerName: {},
Expand Down Expand Up @@ -171,6 +192,7 @@ func keepSpecificDatapointBasedOnAttribute(originalMetric pmetric.Metric, attrib
// It also creates a new metric for each datapoint based on the unique target attribute.
// example :
// in: unique_target_attribute = error_type
// and error_type values A, B, C need to be aggregated into the neuron_execution_errors_total metric, then
//
// neuron_execution_errors {
// datapoints : [
Expand Down Expand Up @@ -206,34 +228,43 @@ func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(origin
}

originalMetricDatapoints := originalMetric.Sum().DataPoints()
aggregatedValuesPerRuntimeTag := map[string]float64{}

aggregatedValuesPerRuntimeTag := map[MetricDatapointAggregationKey]float64{}
uniqueAttributeToAggregatedMetricMappings, needsAggregation := uniquesDatapointsToAggregatedMetricMappings[originalMetric.Name()]
for i := 0; i < originalMetricDatapoints.Len(); i++ {
originalDatapoint := originalMetricDatapoints.At(i)

runtimeTag, _ := originalDatapoint.Attributes().Get(RuntimeTag)
aggregatedValuesPerRuntimeTag[runtimeTag.AsString()] += originalDatapoint.DoubleValue()
deviceId, _ := originalDatapoint.Attributes().Get(NeuronDeviceAttributeKey)
uniqueAttributeValue, _ := originalDatapoint.Attributes().Get(uniqueAttribute)

// only add to the aggregation map if the datapoint to aggregated metric mappings are defined for the original metric
if needsAggregation {
aggregatedMetricName := uniqueAttributeToAggregatedMetricMappings[uniqueAttributeValue.Str()]
aggregatedValuesPerRuntimeTag[MetricDatapointAggregationKey{runtimeTag: runtimeTag.Str(), aggregatedMetricName: aggregatedMetricName, deviceId: deviceId.Str()}] += originalDatapoint.DoubleValue()
}

// Creating a new metric from the current datapoint and adding it to the new newMetricSlice
subtypeValue, _ := originalDatapoint.Attributes().Get(uniqueAttribute)
newNameMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), originalMetric.Name()+"_"+subtypeValue.Str(), originalMetric.Unit())
newNameMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), originalMetric.Name()+"_"+uniqueAttributeValue.Str(), originalMetric.Unit())
originalDatapoint.CopyTo(newNameMetric.SetEmptySum().DataPoints().AppendEmpty())
// setting value of temporality to cumulative so that agent performs delta conversion on this metric
newNameMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

if originalMetric.Name() != containerinsightscommon.NeuronDeviceHardwareEccEvents {
// Creating body for the aggregated metric and add it to the new newMetricSlice for each runtime
for runtimeTag, value := range aggregatedValuesPerRuntimeTag {
// Aggregated metric for neuron device ecc events is not required
aggregatedMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), originalMetric.Name()+aggregatedMetricSuffix, originalMetric.Unit())
// Creating body for the aggregated metric and add it to the new newMetricSlice for each runtime
for aggregatedMetricMetadata, value := range aggregatedValuesPerRuntimeTag {
// The aggregated metric name comes from the aggregation key; neuron device ecc events are now aggregated as well
aggregatedMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), aggregatedMetricMetadata.aggregatedMetricName, originalMetric.Unit())

originalMetricDatapoints.At(0).CopyTo(aggregatedMetric.SetEmptySum().DataPoints().AppendEmpty())
aggregatedMetric.Sum().DataPoints().At(0).SetDoubleValue(value)
aggregatedMetric.Sum().DataPoints().At(0).Attributes().PutStr(RuntimeTag, runtimeTag)
originalMetricDatapoints.At(0).CopyTo(aggregatedMetric.SetEmptySum().DataPoints().AppendEmpty())
aggregatedMetric.Sum().DataPoints().At(0).SetDoubleValue(value)
aggregatedMetric.Sum().DataPoints().At(0).Attributes().PutStr(RuntimeTag, aggregatedMetricMetadata.runtimeTag)

// setting value of temporality to cumulative so that agent performs delta conversion on this metric
aggregatedMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
if aggregatedMetricMetadata.deviceId != "" {
aggregatedMetric.Sum().DataPoints().At(0).Attributes().PutStr(NeuronDeviceAttributeKey, aggregatedMetricMetadata.deviceId)
}

// setting value of temporality to cumulative so that agent performs delta conversion on this metric
aggregatedMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}

return newMetricSlice
Expand Down Expand Up @@ -271,7 +302,7 @@ func prefixCoreAndDeviceLabels(originalMetric pmetric.Metric) {
dp := dps.At(i)
for attributeKey, attributeValuePrefix := range attributeValuePrefixingMap {
if value, exists := dp.Attributes().Get(attributeKey); exists {
dp.Attributes().PutStr(attributeKey, attributeValuePrefix+value.AsString())
dp.Attributes().PutStr(attributeKey, attributeValuePrefix+value.Str())
}
}
}
Expand Down
Loading

0 comments on commit c9fbf91

Please sign in to comment.