Skip to content

Commit

Permalink
Add pod level metric collection for Windows
Browse files Browse the repository at this point in the history
This PR defines code structure for metric provider which works on Windows.

### Changelog
1. Changed receiver.go in awscontainerinsights to run for Windows with metric provider.
2. Added summary API in kubeletclient
3. Add kubeletProvider to return metrics at different levels i.e. pod, contianer, node.
4. Updated hostInfo providers to run for Windows.
5. Updated ebsVolume Info provider to run for Windows.

## Todos:
1. Define correct ebsVolume Info provider for Windows
2. Change logic around k8s leader election to run for Windows
  • Loading branch information
KlwntSingh committed Dec 16, 2023
1 parent c00ca1f commit d575b52
Show file tree
Hide file tree
Showing 16 changed files with 346 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInf

// When there is more than one stats point, always use the last one
curStats := GetStats(info)
metric := newCadvisorMetric(containerType, c.logger)
metric := NewCadvisorMetric(containerType, c.logger)
metric.cgroupPath = info.Name
multiplier := float64(decimalToMillicores)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *DiskIOMetricExtractor) extractIoMetrics(curStatsSet []cInfo.PerDiskStat
expectedKey := []string{ci.DiskIOAsync, ci.DiskIOSync, ci.DiskIORead, ci.DiskIOWrite, ci.DiskIOTotal}
for _, cur := range curStatsSet {
curDevName := devName(cur)
metric := newCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger)
metric := NewCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger)
metric.tags[ci.DiskDev] = curDevName
for _, key := range expectedKey {
if curVal, curOk := cur.Stats[key]; curOk {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CAdvisorMetric struct {
logger *zap.Logger
}

func newCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric {
func NewCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric {
metric := &CAdvisorMetric{
fields: make(map[string]any),
tags: make(map[string]string),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (f *FileSystemMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMem
metrics := make([]*CAdvisorMetric, 0, len(stats.Filesystem))

for _, v := range stats.Filesystem {
metric := newCadvisorMetric(containerType, f.logger)
metric := NewCadvisorMetric(containerType, f.logger)
if v.Device == "" {
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (m *MemMetricExtractor) GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInf
return metrics
}

metric := newCadvisorMetric(containerType, m.logger)
metric := NewCadvisorMetric(containerType, m.logger)
metric.cgroupPath = info.Name
curStats := GetStats(info)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro

netIfceMetrics[i] = netIfceMetric

metric := newCadvisorMetric(mType, n.logger)
metric := NewCadvisorMetric(mType, n.logger)
metric.tags[ci.NetIfce] = cur.Name
for k, v := range netIfceMetric {
metric.fields[ci.MetricName(mType, k)] = v
Expand All @@ -81,7 +81,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro

aggregatedFields := ci.SumFields(netIfceMetrics)
if len(aggregatedFields) > 0 {
metric := newCadvisorMetric(containerType, n.logger)
metric := NewCadvisorMetric(containerType, n.logger)
for k, v := range aggregatedFields {
metric.fields[ci.MetricName(containerType, k)] = v
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -141,6 +142,12 @@ func (e *ebsVolume) addEBSVolumeMapping(zone *string, attachement *ec2.VolumeAtt
func (e *ebsVolume) findNvmeBlockNameIfPresent(devName string) string {
// for nvme(ssd), there is a symlink from devName to nvme block name, i.e. /dev/xvda -> /dev/nvme0n1
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html

// Windows does not support file system eg: /rootfs to get Nvme Block name.
// todo: Implement logic to identify Nvme devices on Windows. Refer https://docs.aws.amazon.com/AWSEC2/latest/WindowsGuide/nvme-ebs-volumes.html#identify-nvme-ebs-device
if runtime.GOOS == "windows" {
return ""
}
hasRootFs := true
if _, err := e.osLstat(hostProc); os.IsNotExist(err) {
hasRootFs = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package host // import "github.com/open-telemetry/opentelemetry-collector-contri
import (
"context"
"os"
"runtime"

"github.com/shirou/gopsutil/v3/common"
"github.com/shirou/gopsutil/v3/cpu"
Expand Down Expand Up @@ -46,11 +47,14 @@ func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCap
opt(nc)
}

if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
return nil, err
ctx := context.Background()
if runtime.GOOS != "windows" {
if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
return nil, err
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
ctx = context.WithValue(ctx, common.EnvKey, envMap)
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
ctx := context.WithValue(context.Background(), common.EnvKey, envMap)

nc.parseCPU(ctx)
nc.parseMemory(ctx)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !linux
// +build !linux

package host

// These variables are invalid for Windows
const (
rootfs = ""
hostProc = rootfs + ""
hostMounts = hostProc + ""
)
6 changes: 0 additions & 6 deletions receiver/awscontainerinsightreceiver/internal/host/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ import (
"time"
)

const (
rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container
hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc"
hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host
)

func hostJitter(max time.Duration) time.Duration {
hostName, err := os.Hostname()
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/host/utilsconst.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

package host

const (
rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container
hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc"
hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host
)
111 changes: 111 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package k8swindows

import (
"context"
"errors"
"os"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type K8sWindows struct {
cancel context.CancelFunc
logger *zap.Logger
nodeName string `toml:"node_name"`
k8sDecorator stores.K8sDecorator
summaryProvider *kubeletSummaryProvider
hostInfo host.Info
}

func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) (*K8sWindows, error) {
nodeName := os.Getenv("HOST_NAME")
if nodeName == "" {
return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config")
}
k8sSummaryProvider, err := new(logger, hostInfo)
if err != nil {
logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err))
return nil, err
}
return &K8sWindows{
logger: logger,
nodeName: nodeName,
k8sDecorator: *decorator,
summaryProvider: k8sSummaryProvider,
hostInfo: hostInfo,
}, nil
}

func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
k.logger.Debug("D! called K8sWindows GetMetrics")
var result []pmetric.Metrics

metrics, err := k.summaryProvider.getMetrics()
if err != nil {
k.logger.Error("error getting metrics from kubelet summary provider, ", zap.Error(err))
return result
}
metrics = k.decorateMetrics(metrics)
for _, k8sSummaryMetric := range metrics {
md := ci.ConvertToOTLPMetrics(k8sSummaryMetric.GetFields(), k8sSummaryMetric.GetTags(), k.logger)
result = append(result, md)
}

return result
}

func (c *K8sWindows) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric {
//ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes()
var result []*extractors.CAdvisorMetric
for _, m := range cadvisormetrics {
tags := m.GetTags()
//c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)

// add version
//tags[ci.Version] = c.version

// add nodeName for node, pod and container
metricType := tags[ci.MetricType]
if c.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) ||
ci.IsPod(metricType) || ci.IsContainer(metricType)) {
tags[ci.NodeNameKey] = c.nodeName
}

// add instance id and type
if instanceID := c.hostInfo.GetInstanceID(); instanceID != "" {
tags[ci.InstanceID] = instanceID
}
if instanceType := c.hostInfo.GetInstanceType(); instanceType != "" {
tags[ci.InstanceType] = instanceType
}

// add scaling group name
tags[ci.AutoScalingGroupNameKey] = c.hostInfo.GetAutoScalingGroupName()

// add tags for EKS
tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName()

out := c.k8sDecorator.Decorate(m)
if out != nil {
result = append(result, out)
}
}
return result
}

func (k *K8sWindows) Shutdown() error {
k.logger.Debug("D! called K8sWindows Shutdown")
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

package k8swindows

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type K8sWindows struct {
}

// New is a dummy function to construct a dummy K8sWindows struct for linux
func New(_ *zap.Logger, _ *stores.K8sDecorator, _ host.Info) (*K8sWindows, error) {
return &K8sWindows{}, nil
}

// GetMetrics is a dummy function to always returns empty metrics for linux
func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
return []pmetric.Metrics{}
}

func (k *K8sWindows) Shutdown() error {
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package k8swindows

import (
"fmt"
"os"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"

"go.uber.org/zap"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)

type kubeletSummaryProvider struct {
logger *zap.Logger
hostIP string
hostPort string
client *kubeletutil.KubeletClient
hostInfo host.Info
}

func new(logger *zap.Logger, info host.Info) (*kubeletSummaryProvider, error) {
hostIP := os.Getenv("HOST_IP")
kclient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize kubelet client: %w", err)
}
return &kubeletSummaryProvider{
logger: logger,
client: kclient,
hostInfo: info,
}, nil
}

func (k *kubeletSummaryProvider) getMetrics() ([]*extractors.CAdvisorMetric, error) {
summary, err := k.client.Summary(k.logger)
if err != nil {
k.logger.Error("kubelet summary API failed, ", zap.Error(err))
return nil, err
}

return k.getPodMetrics(summary)
}

func (k *kubeletSummaryProvider) getContainerMetrics(summary *stats.Summary) ([]*extractors.CAdvisorMetric, error) {
var metrics []*extractors.CAdvisorMetric
// todo: implement CPU, memory metrics from containers
return metrics, nil
}

func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*extractors.CAdvisorMetric, error) {
// todo: This is not complete implementation of pod level metric collection since network level metrics are pending
// May need to add some more pod level labels for store decorators to work properly

var metrics []*extractors.CAdvisorMetric

nodeCPUCores := k.hostInfo.GetNumCores()
for _, pod := range summary.Pods {
k.logger.Info(fmt.Sprintf("pod summary %v", pod.PodRef.Name))
metric := extractors.NewCadvisorMetric(ci.TypePod, k.logger)

metric.AddField(ci.PodIDKey, pod.PodRef.UID)
metric.AddField(ci.K8sPodNameKey, pod.PodRef.Name)
metric.AddField(ci.K8sNamespace, pod.PodRef.Namespace)

// CPU metric
metric.AddField(ci.MetricName(ci.TypePod, ci.CPUTotal), *pod.CPU.UsageCoreNanoSeconds)
metric.AddField(ci.MetricName(ci.TypePod, ci.CPUUtilization), float64(*pod.CPU.UsageCoreNanoSeconds)/float64(nodeCPUCores))

// Memory metrics
metric.AddField(ci.MetricName(ci.TypePod, ci.MemUsage), *pod.Memory.UsageBytes)
metric.AddField(ci.MetricName(ci.TypePod, ci.MemRss), *pod.Memory.RSSBytes)
metric.AddField(ci.MetricName(ci.TypePod, ci.MemWorkingset), *pod.Memory.WorkingSetBytes)
metric.AddField(ci.MetricName(ci.TypePod, ci.MemReservedCapacity), k.hostInfo.GetMemoryCapacity())
metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilization), float64(*pod.Memory.WorkingSetBytes)/float64(k.hostInfo.GetMemoryCapacity())*100)
metrics = append(metrics, metric)
}
return metrics, nil
}

func (k *kubeletSummaryProvider) getNodeMetrics() ([]*extractors.CAdvisorMetric, error) {
var metrics []*extractors.CAdvisorMetric
//todo: Implement CPU, memory and network metrics at node
return metrics, nil
}
Loading

0 comments on commit d575b52

Please sign in to comment.