Skip to content

Commit

Permalink
Filter attributes neuron metrics (#1262)
Browse files Browse the repository at this point in the history
This change adds per-type emf attribute filtering to AWSNeuron EMF metrics
  • Loading branch information
aditya-purang authored Aug 7, 2024
1 parent 5a46112 commit 55a3d88
Show file tree
Hide file tree
Showing 8 changed files with 684 additions and 217 deletions.
1 change: 1 addition & 0 deletions internal/containerinsightscommon/k8sconst.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
PodOwnersKey = "PodOwners"
HostKey = "host"
K8sKey = "kubernetes"
K8sLabelsKey = "labels"

RunningPodCount = "number_of_running_pods"
RunningContainerCount = "number_of_running_containers"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package internal

import (
"regexp"
)

const (
PROCESSED_NEURON_METRIC_PATTERN = "^(container|node|pod)_(neuroncore_|neurondevice_).*|^node_neuron_.*"
)

type AwsNeuronMetricChecker struct {
}

func NewAwsNeuronMetricChecker() *AwsNeuronMetricChecker {
return &AwsNeuronMetricChecker{}
}

func (md *AwsNeuronMetricChecker) IsProcessedNeuronMetric(name string) bool {
matched, err := regexp.MatchString(PROCESSED_NEURON_METRIC_PATTERN, name)
if err != nil {
print(err)
return false
}
return matched
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package internal

import (
"testing"
)

func TestAwsNeuronMetricModifier_IsProcessedNeuronMetric(t *testing.T) {
tests := []struct {
name string
input string
expected bool
}{
{
name: "container_neuroncore_prefix",
input: "container_neuroncore_metric",
expected: true,
},
{
name: "pod_neuroncore_prefix",
input: "pod_neuroncore_metric",
expected: true,
},
{
name: "node_neuroncore_prefix",
input: "node_neuroncore_metric",
expected: true,
},
{
name: "container_neurondevice_prefix",
input: "container_neurondevice_metric",
expected: true,
},
{
name: "pod_neurondevice_prefix",
input: "pod_neurondevice_metric",
expected: true,
},
{
name: "node_neurondevice_prefix",
input: "node_neurondevice_metric",
expected: true,
},
{
name: "node_neuron_prefix",
input: "node_neuron_metric",
expected: true,
},
{
name: "container_neuron_prefix",
input: "container_neuron_metric",
expected: false,
},
{
name: "other_prefix",
input: "other_metric",
expected: false,
},
}

md := NewAwsNeuronMetricChecker()

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := md.IsProcessedNeuronMetric(test.input)
if result != test.expected {
t.Errorf("IsProcessedNeuronMetric(%q) = %v, expected %v", test.input, result, test.expected)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package internal
import (
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

Expand Down Expand Up @@ -99,26 +98,6 @@ var (
"sram_ecc_corrected": NeuronDeviceHardwareEccEventsAggregatedMetric,
"sram_ecc_uncorrected": NeuronDeviceHardwareEccEventsAggregatedMetric},
}

MetricAttributesToKeep = map[string]struct{}{
ClusterName: {},
ContainerName: {},
FullPodName: {},
InstanceId: {},
InstanceType: {},
K8sPodName: {},
Namespace: {},
NeuronDevice: {},
NodeName: {},
PodName: {},
Service: {},
AvailabilityZone: {},
Kubernetes: {},
Region: {},
RuntimeTag: {},
SubnetId: {},
NeuronCore: {},
}
)

func NewMetricModifier(logger *zap.Logger) *AwsNeuronMetricModifier {
Expand Down Expand Up @@ -156,7 +135,6 @@ func (md *AwsNeuronMetricModifier) ModifyMetric(originalMetric pmetric.Metric, m
}

modifiedMetricSlice := md.extractDatapointsAsMetricsAndAggregate(originalMetric)
filterLabels(modifiedMetricSlice, originalMetricName)
md.duplicateMetrics(modifiedMetricSlice, originalMetricName, originalMetric.Sum().DataPoints(), metrics)
}

Expand Down Expand Up @@ -251,7 +229,6 @@ func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(origin

// Creating body for the aggregated metric and add it to the new newMetricSlice for each runtime
for aggregatedMetricMetadata, value := range aggregatedValuesPerRuntimeTag {
// Aggregated metric for neuron device ecc events is not required
aggregatedMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), aggregatedMetricMetadata.aggregatedMetricName, originalMetric.Unit())

originalMetricDatapoints.At(0).CopyTo(aggregatedMetric.SetEmptySum().DataPoints().AppendEmpty())
Expand All @@ -269,30 +246,6 @@ func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(origin
return newMetricSlice
}

// This method removes the attribute keys which are not required. The removal is necessary so that the metrics are grouped together
func filterLabels(slice pmetric.MetricSlice, originalMetricName string) {
_, exists := metricModificationsMap[originalMetricName]
if !exists {
return
}

for i := 0; i < slice.Len(); i++ {
m := slice.At(i)

dps := m.Sum().DataPoints()
for j := 0; j < dps.Len(); j++ {
attributes := dps.At(j).Attributes()
attributes.RemoveIf(func(label string, value pcommon.Value) bool {
_, exists := MetricAttributesToKeep[label]
if !exists {
return true
}
return false
})
}
}
}

// This method prefixes NeuronCore and NeuronDevice values with `core` and `device` respectively
// to make the attribute values more verbose
func prefixCoreAndDeviceLabels(originalMetric pmetric.Metric) {
Expand Down
Loading

0 comments on commit 55a3d88

Please sign in to comment.