Skip to content

Commit

Permalink
Add dcgm scraper to collect nvidia GPU metrics (amazon-contributing#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
movence authored Mar 1, 2024
1 parent d6cc539 commit d3bf111
Show file tree
Hide file tree
Showing 40 changed files with 1,416 additions and 447 deletions.
24 changes: 24 additions & 0 deletions .chloggen-aws/nvidia-gpu.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: containerinsightsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Adds DCGM scraper to collect NVIDIA GPU metrics"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [160]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Supports NVIDIA GPU metrics by adding a new prometheus data scraper in a k8s environment. The new scraper |
relabels the default DCGM labels into existing Container Insights labels.

# e.g. '[aws]'
# Include 'aws' if the change is done by cwa
# Default: '[user]'
change_logs: [aws]
3 changes: 2 additions & 1 deletion exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
}

if serviceName, ok := rm.Resource().Attributes().Get("service.name"); ok {
if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") {
if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") {
// the prometheus metrics that come from the container insight receiver need to be clearly tagged as coming from container insights
metricReceiver = containerInsightsReceiver
}
Expand Down
20 changes: 18 additions & 2 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
// need to have 1 more metric than the default because the first is not going to be retained because it is a delta metric
containerInsightMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
containerInsightMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsKubeAPIServerScraper")
gpuMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
gpuMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsDCGMExporterScraper")

counterSumMetrics := map[string]*metricInfo{
"spanCounter": {
Expand Down Expand Up @@ -368,12 +370,26 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
"spanName": "testSpan",
},
map[string]string{
(oTellibDimensionKey): "cloudwatch-lib",
"spanName": "testSpan",
oTellibDimensionKey: "cloudwatch-lib",
"spanName": "testSpan",
},
"myServiceNS/containerInsightsKubeAPIServerScraper",
containerInsightsReceiver,
},
{
"dcgm receiver",
gpuMetric,
map[string]string{
"isItAnError": "false",
"spanName": "testSpan",
},
map[string]string{
oTellibDimensionKey: "cloudwatch-lib",
"spanName": "testSpan",
},
"myServiceNS/containerInsightsDCGMExporterScraper",
containerInsightsReceiver,
},
}

for _, tc := range testCases {
Expand Down
4 changes: 4 additions & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ const (
// Special type for pause container
// because containerd does not set container name pause container name to POD like docker does.
TypeInfraContainer = "InfraContainer"
TypeGpuContainer = "ContainerGPU"
TypeGpuPod = "PodGPU"
TypeGpuNode = "NodeGPU"
TypeGpuCluster = "ClusterGPU"

// unit
UnitBytes = "Bytes"
Expand Down
18 changes: 9 additions & 9 deletions internal/aws/containerinsight/k8sconst.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ const (
EKS = "eks"
KubeSecurePort = "10250"

// attribute names
Kubernetes = "kubernetes"
K8sNamespace = "Namespace"
PodIDKey = "PodId"
PodNameKey = "PodName"
FullPodNameKey = "FullPodName"
K8sPodNameKey = "K8sPodName"
ContainerNamekey = "ContainerName"
ContainerIDkey = "ContainerId"
AttributeKubernetes = "kubernetes"
AttributeK8sNamespace = "Namespace"
AttributePodID = "PodId"
AttributePodName = "PodName"
AttributeFullPodName = "FullPodName"
AttributeK8sPodName = "K8sPodName"
AttributeContainerName = "ContainerName"
AttributeContainerID = "ContainerId"
AttributeGpuDevice = "GpuDevice"

PodStatus = "pod_status"
ContainerStatus = "container_status"
Expand Down
43 changes: 43 additions & 0 deletions internal/aws/containerinsight/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,49 @@ func GetUnitForMetric(metric string) string {
return metricToUnitMap[metric]
}

type FieldsAndTagsPair struct {
Fields map[string]any
Tags map[string]string
}

// ConvertToFieldsAndTags converts OTLP metric to a field containing metric values and a tag containing for decoration
func ConvertToFieldsAndTags(m pmetric.Metric, logger *zap.Logger) []FieldsAndTagsPair {
var converted []FieldsAndTagsPair
if m.Name() == "" {
return converted
}

var dps pmetric.NumberDataPointSlice
switch m.Type() {
case pmetric.MetricTypeGauge:
dps = m.Gauge().DataPoints()
case pmetric.MetricTypeSum:
dps = m.Sum().DataPoints()
default:
logger.Warn("Unsupported metric type", zap.String("metric", m.Name()), zap.String("type", m.Type().String()))
}

if dps.Len() == 0 {
logger.Warn("Metric has no datapoint", zap.String("metric", m.Name()))
}

for i := 0; i < dps.Len(); i++ {
tags := make(map[string]string)
attrs := dps.At(i).Attributes()
attrs.Range(func(k string, v pcommon.Value) bool {
tags[k] = v.AsString()
return true
})
converted = append(converted, FieldsAndTagsPair{
Fields: map[string]any{
m.Name(): nil, // metric value not needed for attribute decoration
},
Tags: tags,
})
}
return converted
}

// ConvertToOTLPMetrics converts a field containing metric values and a tag containing the relevant labels to OTLP metrics
func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger *zap.Logger) pmetric.Metrics {
md := pmetric.NewMetrics()
Expand Down
3 changes: 3 additions & 0 deletions receiver/awscontainerinsightreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,7 @@ type Config struct {
// EnableControlPlaneMetrics enables additional metrics sourced from the Kubernetes API server /metrics prometheus endpoint
// The default value is false.
EnableControlPlaneMetrics bool `mapstructure:"enable_control_plane_metrics"`

// EnableAcceleratedComputeMetrics enabled features with accelerated compute resources where metrics are scraped from vendor specific sources
EnableAcceleratedComputeMetrics bool `mapstructure:"accelerated_compute_metrics"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

// The amount of time for which to keep stats in memory.
Expand Down Expand Up @@ -109,7 +110,7 @@ type EcsInfo interface {
}

type Decorator interface {
Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric
Decorate(stores.CIMetric) stores.CIMetric
Shutdown() error
}

Expand Down Expand Up @@ -197,7 +198,7 @@ func (c *Cadvisor) addEbsVolumeInfo(tags map[string]string, ebsVolumeIdsUsedAsPV
}
}

func (c *Cadvisor) addECSMetrics(cadvisormetrics []*extractors.CAdvisorMetric) {
func (c *Cadvisor) addECSMetrics(cadvisormetrics []*stores.RawContainerInsightsMetric) {

if len(cadvisormetrics) == 0 {
c.logger.Warn("cadvisor can't collect any metrics!")
Expand Down Expand Up @@ -256,9 +257,9 @@ func addECSResources(tags map[string]string) {
}
}

func (c *Cadvisor) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric {
func (c *Cadvisor) decorateMetrics(cadvisormetrics []*stores.RawContainerInsightsMetric) []*stores.RawContainerInsightsMetric {
ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes()
var result []*extractors.CAdvisorMetric
var result []*stores.RawContainerInsightsMetric
for _, m := range cadvisormetrics {
tags := m.GetTags()
c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)
Expand Down Expand Up @@ -307,7 +308,7 @@ func (c *Cadvisor) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric)

out := c.k8sDecorator.Decorate(m)
if out != nil {
result = append(result, out)
result = append(result, out.(*stores.RawContainerInsightsMetric))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

type mockCadvisorManager struct {
Expand Down Expand Up @@ -73,7 +73,7 @@ var mockCreateManagerWithError = func(memoryCache *memory.InMemoryCache, sysfs s
type MockK8sDecorator struct {
}

func (m *MockK8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extractors.CAdvisorMetric {
func (m *MockK8sDecorator) Decorate(metric stores.CIMetric) stores.CIMetric {
return metric
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

// cadvisor doesn't support windows, define the dummy functions
Expand All @@ -26,7 +26,7 @@ type Cadvisor struct {
}

type Decorator interface {
Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric
Decorate(*stores.RawContainerInsightsMetric) *stores.RawContainerInsightsMetric
Shutdown() error
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

const (
Expand All @@ -38,8 +39,8 @@ type podKey struct {
namespace string
}

func processContainers(cInfos []*cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, containerOrchestrator string, logger *zap.Logger) []*extractors.CAdvisorMetric {
var metrics []*extractors.CAdvisorMetric
func processContainers(cInfos []*cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, containerOrchestrator string, logger *zap.Logger) []*stores.RawContainerInsightsMetric {
var metrics []*stores.RawContainerInsightsMetric
podKeys := make(map[string]podKey)

// first iteration of container infos processes individual container info and
Expand Down Expand Up @@ -88,8 +89,8 @@ func processContainers(cInfos []*cInfo.ContainerInfo, mInfo extractors.CPUMemInf
}

// processContainers get metrics for individual container and gather information for pod so we can look it up later.
func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, containerOrchestrator string, logger *zap.Logger) ([]*extractors.CAdvisorMetric, *podKey, error) {
var result []*extractors.CAdvisorMetric
func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, containerOrchestrator string, logger *zap.Logger) ([]*stores.RawContainerInsightsMetric, *podKey, error) {
var result []*stores.RawContainerInsightsMetric
var pKey *podKey

if isContainerInContainer(info.Name) {
Expand Down Expand Up @@ -123,9 +124,9 @@ func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProv
podPath := path.Dir(info.Name)
pKey = &podKey{cgroupPath: podPath, podName: podName, podID: podID, namespace: namespace}

tags[ci.PodIDKey] = podID
tags[ci.K8sPodNameKey] = podName
tags[ci.K8sNamespace] = namespace
tags[ci.AttributePodID] = podID
tags[ci.AttributeK8sPodName] = podName
tags[ci.AttributeK8sNamespace] = namespace
switch containerName {
// For docker, pause container name is set to POD while containerd does not set it.
// See https://github.com/aws/amazon-cloudwatch-agent/issues/188
Expand All @@ -134,9 +135,9 @@ func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProv
// other pod info like CPU, Mem are dealt within in processPod.
containerType = ci.TypeInfraContainer
default:
tags[ci.ContainerNamekey] = containerName
tags[ci.AttributeContainerName] = containerName
containerID := path.Base(info.Name)
tags[ci.ContainerIDkey] = containerID
tags[ci.AttributeContainerID] = containerID
pKey.containerIds = []string{containerID}
containerType = ci.TypeContainer
// TODO(pvasir): wait for upstream fix https://github.com/google/cadvisor/issues/2785
Expand Down Expand Up @@ -165,8 +166,8 @@ func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProv
return result, pKey, nil
}

func processPod(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, podKeys map[string]podKey, logger *zap.Logger) []*extractors.CAdvisorMetric {
var result []*extractors.CAdvisorMetric
func processPod(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, podKeys map[string]podKey, logger *zap.Logger) []*stores.RawContainerInsightsMetric {
var result []*stores.RawContainerInsightsMetric
if isContainerInContainer(info.Name) {
logger.Debug("drop metric because it's nested container", zap.String("name", info.Name))
return result
Expand All @@ -178,9 +179,9 @@ func processPod(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider,
}

tags := map[string]string{}
tags[ci.PodIDKey] = podKey.podID
tags[ci.K8sPodNameKey] = podKey.podName
tags[ci.K8sNamespace] = podKey.namespace
tags[ci.AttributePodID] = podKey.podID
tags[ci.AttributeK8sPodName] = podKey.podName
tags[ci.AttributeK8sNamespace] = podKey.namespace

tags[ci.Timestamp] = strconv.FormatInt(extractors.GetStats(info).Timestamp.UnixNano(), 10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

const (
Expand All @@ -24,29 +25,29 @@ func (c *CPUMetricExtractor) HasValue(info *cInfo.ContainerInfo) bool {
return info.Spec.HasCpu
}

func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInfoProvider, containerType string) []*CAdvisorMetric {
var metrics []*CAdvisorMetric
func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInfoProvider, containerType string) []*stores.RawContainerInsightsMetric {
var metrics []*stores.RawContainerInsightsMetric
// Skip infra container and handle node, pod, other containers in pod
if containerType == ci.TypeInfraContainer {
return metrics
}

// When there is more than one stats point, always use the last one
curStats := GetStats(info)
metric := newCadvisorMetric(containerType, c.logger)
metric.cgroupPath = info.Name
metric := stores.NewRawContainerInsightsMetric(containerType, c.logger)
metric.ContainerName = info.Name
multiplier := float64(decimalToMillicores)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUUser), info.Name, float64(curStats.Cpu.Usage.User), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUSystem), info.Name, float64(curStats.Cpu.Usage.System), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.Fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.Fields, ci.MetricName(containerType, ci.CPUUser), info.Name, float64(curStats.Cpu.Usage.User), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.Fields, ci.MetricName(containerType, ci.CPUSystem), info.Name, float64(curStats.Cpu.Usage.System), curStats.Timestamp, multiplier)

numCores := mInfo.GetNumCores()
if metric.fields[ci.MetricName(containerType, ci.CPUTotal)] != nil && numCores != 0 {
metric.fields[ci.MetricName(containerType, ci.CPUUtilization)] = metric.fields[ci.MetricName(containerType, ci.CPUTotal)].(float64) / float64(numCores*decimalToMillicores) * 100
if metric.Fields[ci.MetricName(containerType, ci.CPUTotal)] != nil && numCores != 0 {
metric.Fields[ci.MetricName(containerType, ci.CPUUtilization)] = metric.Fields[ci.MetricName(containerType, ci.CPUTotal)].(float64) / float64(numCores*decimalToMillicores) * 100
}

if containerType == ci.TypeNode || containerType == ci.TypeInstance {
metric.fields[ci.MetricName(containerType, ci.CPULimit)] = numCores * decimalToMillicores
metric.Fields[ci.MetricName(containerType, ci.CPULimit)] = numCores * decimalToMillicores
}

metrics = append(metrics, metric)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

func TestCPUStats(t *testing.T) {
Expand All @@ -22,7 +23,7 @@ func TestCPUStats(t *testing.T) {
containerType := containerinsight.TypeContainer
extractor := NewCPUMetricExtractor(nil)

var cMetrics []*CAdvisorMetric
var cMetrics []*stores.RawContainerInsightsMetric
if extractor.HasValue(result[0]) {
cMetrics = extractor.GetValue(result[0], MockCPUMemInfo, containerType)
}
Expand Down
Loading

0 comments on commit d3bf111

Please sign in to comment.