Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dcgm scraper to collect nvidia GPU metrics #160

Merged
merged 17 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .chloggen-aws/nvidia-gpu.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: containerinsightsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Adds DCGM scraper to collect NVIDIA GPU metrics"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [160]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Supports NVIDIA GPU metrics by adding a new prometheus data scraper in a k8s environment. The new scraper |
relabels the default DCGM labels into existing Container Insights labels.

# e.g. '[aws]'
# Include 'aws' if the change is done by cwa
# Default: '[user]'
change_logs: [aws]
3 changes: 2 additions & 1 deletion exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
}

if serviceName, ok := rm.Resource().Attributes().Get("service.name"); ok {
if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") {
if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") ||
strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") {
// the prometheus metrics that come from the container insight receiver need to be clearly tagged as coming from container insights
metricReceiver = containerInsightsReceiver
}
Expand Down
20 changes: 18 additions & 2 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
// need to have 1 more metric than the default because the first is not going to be retained because it is a delta metric
containerInsightMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
containerInsightMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsKubeAPIServerScraper")
gpuMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
gpuMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsDCGMExporterScraper")

counterSumMetrics := map[string]*metricInfo{
"spanCounter": {
Expand Down Expand Up @@ -368,12 +370,26 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
"spanName": "testSpan",
},
map[string]string{
(oTellibDimensionKey): "cloudwatch-lib",
"spanName": "testSpan",
oTellibDimensionKey: "cloudwatch-lib",
"spanName": "testSpan",
},
"myServiceNS/containerInsightsKubeAPIServerScraper",
containerInsightsReceiver,
},
{
"dcgm receiver",
gpuMetric,
map[string]string{
"isItAnError": "false",
"spanName": "testSpan",
},
map[string]string{
oTellibDimensionKey: "cloudwatch-lib",
"spanName": "testSpan",
},
"myServiceNS/containerInsightsDCGMExporterScraper",
containerInsightsReceiver,
},
}

for _, tc := range testCases {
Expand Down
4 changes: 4 additions & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ const (
// Special type for pause container
// because containerd does not set container name pause container name to POD like docker does.
TypeInfraContainer = "InfraContainer"
TypeGpuContainer = "ContainerGPU"
TypeGpuPod = "PodGPU"
TypeGpuNode = "NodeGPU"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TypeGpuNode, and TypeGpuCluster are nowhere used, do we need this?

Copy link
Author

@movence movence Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right that they are not currently being used. The intention is to use them for GPU count metrics.

TypeGpuCluster = "ClusterGPU"

// unit
UnitBytes = "Bytes"
Expand Down
18 changes: 9 additions & 9 deletions internal/aws/containerinsight/k8sconst.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ const (
EKS = "eks"
KubeSecurePort = "10250"

// attribute names
Kubernetes = "kubernetes"
K8sNamespace = "Namespace"
PodIDKey = "PodId"
PodNameKey = "PodName"
FullPodNameKey = "FullPodName"
K8sPodNameKey = "K8sPodName"
ContainerNamekey = "ContainerName"
ContainerIDkey = "ContainerId"
AttributeKubernetes = "kubernetes"
AttributeK8sNamespace = "Namespace"
AttributePodID = "PodId"
AttributePodName = "PodName"
AttributeFullPodName = "FullPodName"
AttributeK8sPodName = "K8sPodName"
AttributeContainerName = "ContainerName"
AttributeContainerID = "ContainerId"
AttributeGpuDevice = "GpuDevice"

PodStatus = "pod_status"
ContainerStatus = "container_status"
Expand Down
43 changes: 43 additions & 0 deletions internal/aws/containerinsight/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,49 @@ func GetUnitForMetric(metric string) string {
return metricToUnitMap[metric]
}

type FieldsAndTagsPair struct {
Fields map[string]any
Tags map[string]string
}

// ConvertToFieldsAndTags converts OTLP metric to a field containing metric values and a tag containing for decoration
func ConvertToFieldsAndTags(m pmetric.Metric, logger *zap.Logger) []FieldsAndTagsPair {
var converted []FieldsAndTagsPair
if m.Name() == "" {
return converted
}

var dps pmetric.NumberDataPointSlice
switch m.Type() {
case pmetric.MetricTypeGauge:
dps = m.Gauge().DataPoints()
case pmetric.MetricTypeSum:
dps = m.Sum().DataPoints()
default:
logger.Warn("Unsupported metric type", zap.String("metric", m.Name()), zap.String("type", m.Type().String()))
}

if dps.Len() == 0 {
logger.Warn("Metric has no datapoint", zap.String("metric", m.Name()))
}

for i := 0; i < dps.Len(); i++ {
tags := make(map[string]string)
attrs := dps.At(i).Attributes()
attrs.Range(func(k string, v pcommon.Value) bool {
tags[k] = v.AsString()
return true
})
converted = append(converted, FieldsAndTagsPair{
Fields: map[string]any{
m.Name(): nil, // metric value not needed for attribute decoration
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to have this limitation (plus only supporting gauge and sum), then I think just move this helper method to the gpu decorator (it's only used from there right now anyway). Having it here in this common place implies that it should work for any OTLP metric.

},
Tags: tags,
})
}
return converted
}

// ConvertToOTLPMetrics converts a field containing metric values and a tag containing the relevant labels to OTLP metrics
func ConvertToOTLPMetrics(fields map[string]any, tags map[string]string, logger *zap.Logger) pmetric.Metrics {
md := pmetric.NewMetrics()
Expand Down
3 changes: 3 additions & 0 deletions receiver/awscontainerinsightreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,7 @@ type Config struct {
// EnableControlPlaneMetrics enables additional metrics sourced from the Kubernetes API server /metrics prometheus endpoint
// The default value is false.
EnableControlPlaneMetrics bool `mapstructure:"enable_control_plane_metrics"`

// EnableAcceleratedComputeMetrics enabled features with accelerated compute resources where metrics are scraped from vendor specific sources
EnableAcceleratedComputeMetrics bool `mapstructure:"accelerated_compute_metrics"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

// The amount of time for which to keep stats in memory.
Expand Down Expand Up @@ -109,7 +110,7 @@ type EcsInfo interface {
}

type Decorator interface {
Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric
Decorate(stores.CIMetric) stores.CIMetric
Shutdown() error
}

Expand Down Expand Up @@ -197,7 +198,7 @@ func (c *Cadvisor) addEbsVolumeInfo(tags map[string]string, ebsVolumeIdsUsedAsPV
}
}

func (c *Cadvisor) addECSMetrics(cadvisormetrics []*extractors.CAdvisorMetric) {
func (c *Cadvisor) addECSMetrics(cadvisormetrics []*stores.RawContainerInsightsMetric) {

if len(cadvisormetrics) == 0 {
c.logger.Warn("cadvisor can't collect any metrics!")
Expand Down Expand Up @@ -256,9 +257,9 @@ func addECSResources(tags map[string]string) {
}
}

func (c *Cadvisor) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric {
func (c *Cadvisor) decorateMetrics(cadvisormetrics []*stores.RawContainerInsightsMetric) []*stores.RawContainerInsightsMetric {
ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes()
var result []*extractors.CAdvisorMetric
var result []*stores.RawContainerInsightsMetric
for _, m := range cadvisormetrics {
tags := m.GetTags()
c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)
Expand Down Expand Up @@ -307,7 +308,7 @@ func (c *Cadvisor) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric)

out := c.k8sDecorator.Decorate(m)
if out != nil {
result = append(result, out)
result = append(result, out.(*stores.RawContainerInsightsMetric))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

type mockCadvisorManager struct {
Expand Down Expand Up @@ -73,7 +73,7 @@ var mockCreateManagerWithError = func(memoryCache *memory.InMemoryCache, sysfs s
type MockK8sDecorator struct {
}

func (m *MockK8sDecorator) Decorate(metric *extractors.CAdvisorMetric) *extractors.CAdvisorMetric {
func (m *MockK8sDecorator) Decorate(metric stores.CIMetric) stores.CIMetric {
return metric
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

// cadvisor doesn't support windows, define the dummy functions
Expand All @@ -26,7 +26,7 @@ type Cadvisor struct {
}

type Decorator interface {
Decorate(*extractors.CAdvisorMetric) *extractors.CAdvisorMetric
Decorate(*stores.RawContainerInsightsMetric) *stores.RawContainerInsightsMetric
Shutdown() error
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

const (
Expand All @@ -38,8 +39,8 @@ type podKey struct {
namespace string
}

func processContainers(cInfos []*cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, containerOrchestrator string, logger *zap.Logger) []*extractors.CAdvisorMetric {
var metrics []*extractors.CAdvisorMetric
func processContainers(cInfos []*cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, containerOrchestrator string, logger *zap.Logger) []*stores.RawContainerInsightsMetric {
var metrics []*stores.RawContainerInsightsMetric
podKeys := make(map[string]podKey)

// first iteration of container infos processes individual container info and
Expand Down Expand Up @@ -88,8 +89,8 @@ func processContainers(cInfos []*cInfo.ContainerInfo, mInfo extractors.CPUMemInf
}

// processContainers get metrics for individual container and gather information for pod so we can look it up later.
func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, containerOrchestrator string, logger *zap.Logger) ([]*extractors.CAdvisorMetric, *podKey, error) {
var result []*extractors.CAdvisorMetric
func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, containerOrchestrator string, logger *zap.Logger) ([]*stores.RawContainerInsightsMetric, *podKey, error) {
var result []*stores.RawContainerInsightsMetric
var pKey *podKey

if isContainerInContainer(info.Name) {
Expand Down Expand Up @@ -123,9 +124,9 @@ func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProv
podPath := path.Dir(info.Name)
pKey = &podKey{cgroupPath: podPath, podName: podName, podID: podID, namespace: namespace}

tags[ci.PodIDKey] = podID
tags[ci.K8sPodNameKey] = podName
tags[ci.K8sNamespace] = namespace
tags[ci.AttributePodID] = podID
tags[ci.AttributeK8sPodName] = podName
tags[ci.AttributeK8sNamespace] = namespace
switch containerName {
// For docker, pause container name is set to POD while containerd does not set it.
// See https://github.com/aws/amazon-cloudwatch-agent/issues/188
Expand All @@ -134,9 +135,9 @@ func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProv
// other pod info like CPU, Mem are dealt within in processPod.
containerType = ci.TypeInfraContainer
default:
tags[ci.ContainerNamekey] = containerName
tags[ci.AttributeContainerName] = containerName
containerID := path.Base(info.Name)
tags[ci.ContainerIDkey] = containerID
tags[ci.AttributeContainerID] = containerID
pKey.containerIds = []string{containerID}
containerType = ci.TypeContainer
// TODO(pvasir): wait for upstream fix https://github.com/google/cadvisor/issues/2785
Expand Down Expand Up @@ -165,8 +166,8 @@ func processContainer(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProv
return result, pKey, nil
}

func processPod(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, podKeys map[string]podKey, logger *zap.Logger) []*extractors.CAdvisorMetric {
var result []*extractors.CAdvisorMetric
func processPod(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider, podKeys map[string]podKey, logger *zap.Logger) []*stores.RawContainerInsightsMetric {
var result []*stores.RawContainerInsightsMetric
if isContainerInContainer(info.Name) {
logger.Debug("drop metric because it's nested container", zap.String("name", info.Name))
return result
Expand All @@ -178,9 +179,9 @@ func processPod(info *cInfo.ContainerInfo, mInfo extractors.CPUMemInfoProvider,
}

tags := map[string]string{}
tags[ci.PodIDKey] = podKey.podID
tags[ci.K8sPodNameKey] = podKey.podName
tags[ci.K8sNamespace] = podKey.namespace
tags[ci.AttributePodID] = podKey.podID
tags[ci.AttributeK8sPodName] = podKey.podName
tags[ci.AttributeK8sNamespace] = podKey.namespace

tags[ci.Timestamp] = strconv.FormatInt(extractors.GetStats(info).Timestamp.UnixNano(), 10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

const (
Expand All @@ -24,29 +25,29 @@ func (c *CPUMetricExtractor) HasValue(info *cInfo.ContainerInfo) bool {
return info.Spec.HasCpu
}

func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInfoProvider, containerType string) []*CAdvisorMetric {
var metrics []*CAdvisorMetric
func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInfoProvider, containerType string) []*stores.RawContainerInsightsMetric {
var metrics []*stores.RawContainerInsightsMetric
// Skip infra container and handle node, pod, other containers in pod
if containerType == ci.TypeInfraContainer {
return metrics
}

// When there is more than one stats point, always use the last one
curStats := GetStats(info)
metric := newCadvisorMetric(containerType, c.logger)
metric.cgroupPath = info.Name
metric := stores.NewRawContainerInsightsMetric(containerType, c.logger)
metric.ContainerName = info.Name
multiplier := float64(decimalToMillicores)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUUser), info.Name, float64(curStats.Cpu.Usage.User), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUSystem), info.Name, float64(curStats.Cpu.Usage.System), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.Fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.Fields, ci.MetricName(containerType, ci.CPUUser), info.Name, float64(curStats.Cpu.Usage.User), curStats.Timestamp, multiplier)
assignRateValueToField(&c.rateCalculator, metric.Fields, ci.MetricName(containerType, ci.CPUSystem), info.Name, float64(curStats.Cpu.Usage.System), curStats.Timestamp, multiplier)

numCores := mInfo.GetNumCores()
if metric.fields[ci.MetricName(containerType, ci.CPUTotal)] != nil && numCores != 0 {
metric.fields[ci.MetricName(containerType, ci.CPUUtilization)] = metric.fields[ci.MetricName(containerType, ci.CPUTotal)].(float64) / float64(numCores*decimalToMillicores) * 100
if metric.Fields[ci.MetricName(containerType, ci.CPUTotal)] != nil && numCores != 0 {
metric.Fields[ci.MetricName(containerType, ci.CPUUtilization)] = metric.Fields[ci.MetricName(containerType, ci.CPUTotal)].(float64) / float64(numCores*decimalToMillicores) * 100
}

if containerType == ci.TypeNode || containerType == ci.TypeInstance {
metric.fields[ci.MetricName(containerType, ci.CPULimit)] = numCores * decimalToMillicores
metric.Fields[ci.MetricName(containerType, ci.CPULimit)] = numCores * decimalToMillicores
}

metrics = append(metrics, metric)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/testutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"
)

func TestCPUStats(t *testing.T) {
Expand All @@ -22,7 +23,7 @@ func TestCPUStats(t *testing.T) {
containerType := containerinsight.TypeContainer
extractor := NewCPUMetricExtractor(nil)

var cMetrics []*CAdvisorMetric
var cMetrics []*stores.RawContainerInsightsMetric
if extractor.HasValue(result[0]) {
cMetrics = extractor.GetValue(result[0], MockCPUMemInfo, containerType)
}
Expand Down
Loading
Loading