Skip to content

Commit

Permalink
Add kubelet summary API for Windows (amazon-contributing#142)
Browse files Browse the repository at this point in the history
* Add pod level metric collection for Windows

This PR defines code structure for metric provider which works on Windows.

### Changelog
1. Changed receiver.go in awscontainerinsights to run for Windows with metric provider.
2. Added summary API in kubeletclient
3. Add kubeletProvider to return metrics at different levels i.e. pod, contianer, node.
4. Updated hostInfo providers to run for Windows.
5. Updated ebsVolume Info provider to run for Windows.

## Todos:
1. Define correct ebsVolume Info provider for Windows
2. Change logic around k8s leader election to run for Windows
  • Loading branch information
KlwntSingh authored Dec 19, 2023
1 parent c00ca1f commit 4edef80
Show file tree
Hide file tree
Showing 23 changed files with 361 additions and 59 deletions.
2 changes: 1 addition & 1 deletion cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ require (
k8s.io/klog v1.0.0 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubelet v0.28.3 // indirect
k8s.io/kubelet v0.28.4 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/controller-runtime v0.16.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ require (
k8s.io/klog v1.0.0 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubelet v0.28.3 // indirect
k8s.io/kubelet v0.28.4 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/controller-runtime v0.16.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion receiver/awscontainerinsightreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ require (
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.100.1
k8s.io/kubelet v0.28.3
)

require (
Expand Down Expand Up @@ -214,7 +216,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
2 changes: 2 additions & 0 deletions receiver/awscontainerinsightreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInf

// When there is more than one stats point, always use the last one
curStats := GetStats(info)
metric := newCadvisorMetric(containerType, c.logger)
metric := NewCadvisorMetric(containerType, c.logger)
metric.cgroupPath = info.Name
multiplier := float64(decimalToMillicores)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *DiskIOMetricExtractor) extractIoMetrics(curStatsSet []cInfo.PerDiskStat
expectedKey := []string{ci.DiskIOAsync, ci.DiskIOSync, ci.DiskIORead, ci.DiskIOWrite, ci.DiskIOTotal}
for _, cur := range curStatsSet {
curDevName := devName(cur)
metric := newCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger)
metric := NewCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger)
metric.tags[ci.DiskDev] = curDevName
for _, key := range expectedKey {
if curVal, curOk := cur.Stats[key]; curOk {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CAdvisorMetric struct {
logger *zap.Logger
}

func newCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric {
func NewCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric {
metric := &CAdvisorMetric{
fields: make(map[string]any),
tags: make(map[string]string),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (f *FileSystemMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMem
metrics := make([]*CAdvisorMetric, 0, len(stats.Filesystem))

for _, v := range stats.Filesystem {
metric := newCadvisorMetric(containerType, f.logger)
metric := NewCadvisorMetric(containerType, f.logger)
if v.Device == "" {
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (m *MemMetricExtractor) GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInf
return metrics
}

metric := newCadvisorMetric(containerType, m.logger)
metric := NewCadvisorMetric(containerType, m.logger)
metric.cgroupPath = info.Name
curStats := GetStats(info)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro

netIfceMetrics[i] = netIfceMetric

metric := newCadvisorMetric(mType, n.logger)
metric := NewCadvisorMetric(mType, n.logger)
metric.tags[ci.NetIfce] = cur.Name
for k, v := range netIfceMetric {
metric.fields[ci.MetricName(mType, k)] = v
Expand All @@ -81,7 +81,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro

aggregatedFields := ci.SumFields(netIfceMetrics)
if len(aggregatedFields) > 0 {
metric := newCadvisorMetric(containerType, n.logger)
metric := NewCadvisorMetric(containerType, n.logger)
for k, v := range aggregatedFields {
metric.fields[ci.MetricName(containerType, k)] = v
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -141,6 +142,12 @@ func (e *ebsVolume) addEBSVolumeMapping(zone *string, attachement *ec2.VolumeAtt
func (e *ebsVolume) findNvmeBlockNameIfPresent(devName string) string {
// for nvme(ssd), there is a symlink from devName to nvme block name, i.e. /dev/xvda -> /dev/nvme0n1
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html

// Windows does not support file system eg: /rootfs to get Nvme Block name.
// todo: Implement logic to identify Nvme devices on Windows. Refer https://docs.aws.amazon.com/AWSEC2/latest/WindowsGuide/nvme-ebs-volumes.html#identify-nvme-ebs-device
if runtime.GOOS == "windows" {
return ""
}
hasRootFs := true
if _, err := e.osLstat(hostProc); os.IsNotExist(err) {
hasRootFs = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package host // import "github.com/open-telemetry/opentelemetry-collector-contri
import (
"context"
"os"
"runtime"

"github.com/shirou/gopsutil/v3/common"
"github.com/shirou/gopsutil/v3/cpu"
Expand Down Expand Up @@ -46,11 +47,14 @@ func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCap
opt(nc)
}

if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
return nil, err
ctx := context.Background()
if runtime.GOOS != "windows" {
if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
return nil, err
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
ctx = context.WithValue(ctx, common.EnvKey, envMap)
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
ctx := context.WithValue(context.Background(), common.EnvKey, envMap)

nc.parseCPU(ctx)
nc.parseMemory(ctx)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !linux
// +build !linux

package host // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"

// These variables are invalid for Windows
const (
rootfs = ""
hostProc = rootfs + ""
hostMounts = hostProc + ""
)
6 changes: 0 additions & 6 deletions receiver/awscontainerinsightreceiver/internal/host/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ import (
"time"
)

const (
rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container
hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc"
hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host
)

func hostJitter(max time.Duration) time.Duration {
hostName, err := os.Hostname()
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/host/utilsconst.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

package host // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"

const (
rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container
hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc"
hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host
)
111 changes: 111 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows"

import (
"context"
"errors"
"os"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type K8sWindows struct {
cancel context.CancelFunc
logger *zap.Logger
nodeName string `toml:"node_name"`
k8sDecorator stores.K8sDecorator
summaryProvider *kubeletSummaryProvider
hostInfo host.Info
}

func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) (*K8sWindows, error) {
nodeName := os.Getenv("HOST_NAME")
if nodeName == "" {
return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config")
}
k8sSummaryProvider, err := new(logger, hostInfo)
if err != nil {
logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err))
return nil, err
}
return &K8sWindows{
logger: logger,
nodeName: nodeName,
k8sDecorator: *decorator,
summaryProvider: k8sSummaryProvider,
hostInfo: hostInfo,
}, nil
}

func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
k.logger.Debug("D! called K8sWindows GetMetrics")
var result []pmetric.Metrics

metrics, err := k.summaryProvider.getMetrics()
if err != nil {
k.logger.Error("error getting metrics from kubelet summary provider, ", zap.Error(err))
return result
}
metrics = k.decorateMetrics(metrics)
for _, k8sSummaryMetric := range metrics {
md := ci.ConvertToOTLPMetrics(k8sSummaryMetric.GetFields(), k8sSummaryMetric.GetTags(), k.logger)
result = append(result, md)
}

return result
}

func (c *K8sWindows) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric {
//ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes()
var result []*extractors.CAdvisorMetric
for _, m := range cadvisormetrics {
tags := m.GetTags()
//c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)

// add version
//tags[ci.Version] = c.version

// add nodeName for node, pod and container
metricType := tags[ci.MetricType]
if c.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) ||
ci.IsPod(metricType) || ci.IsContainer(metricType)) {
tags[ci.NodeNameKey] = c.nodeName
}

// add instance id and type
if instanceID := c.hostInfo.GetInstanceID(); instanceID != "" {
tags[ci.InstanceID] = instanceID
}
if instanceType := c.hostInfo.GetInstanceType(); instanceType != "" {
tags[ci.InstanceType] = instanceType
}

// add scaling group name
tags[ci.AutoScalingGroupNameKey] = c.hostInfo.GetAutoScalingGroupName()

// add tags for EKS
tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName()

out := c.k8sDecorator.Decorate(m)
if out != nil {
result = append(result, out)
}
}
return result
}

func (k *K8sWindows) Shutdown() error {
k.logger.Debug("D! called K8sWindows Shutdown")
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type K8sWindows struct {
}

// New is a dummy function to construct a dummy K8sWindows struct for linux
func New(_ *zap.Logger, _ *stores.K8sDecorator, _ host.Info) (*K8sWindows, error) {
return &K8sWindows{}, nil
}

// GetMetrics is a dummy function to always returns empty metrics for linux
func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
return []pmetric.Metrics{}
}

func (k *K8sWindows) Shutdown() error {
return nil
}
Loading

0 comments on commit 4edef80

Please sign in to comment.