Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kubelet summary API for Windows #142

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ require (
k8s.io/klog v1.0.0 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubelet v0.28.3 // indirect
k8s.io/kubelet v0.28.4 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/controller-runtime v0.16.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ require (
k8s.io/klog v1.0.0 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubelet v0.28.3 // indirect
k8s.io/kubelet v0.28.4 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/controller-runtime v0.16.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion receiver/awscontainerinsightreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ require (
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.100.1
k8s.io/kubelet v0.28.3
)

require (
Expand Down Expand Up @@ -214,7 +216,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
2 changes: 2 additions & 0 deletions receiver/awscontainerinsightreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInf

// When there is more than one stats point, always use the last one
curStats := GetStats(info)
metric := newCadvisorMetric(containerType, c.logger)
metric := NewCadvisorMetric(containerType, c.logger)
metric.cgroupPath = info.Name
multiplier := float64(decimalToMillicores)
assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *DiskIOMetricExtractor) extractIoMetrics(curStatsSet []cInfo.PerDiskStat
expectedKey := []string{ci.DiskIOAsync, ci.DiskIOSync, ci.DiskIORead, ci.DiskIOWrite, ci.DiskIOTotal}
for _, cur := range curStatsSet {
curDevName := devName(cur)
metric := newCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger)
metric := NewCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger)
metric.tags[ci.DiskDev] = curDevName
for _, key := range expectedKey {
if curVal, curOk := cur.Stats[key]; curOk {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CAdvisorMetric struct {
logger *zap.Logger
}

func newCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric {
func NewCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric {
metric := &CAdvisorMetric{
fields: make(map[string]any),
tags: make(map[string]string),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (f *FileSystemMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMem
metrics := make([]*CAdvisorMetric, 0, len(stats.Filesystem))

for _, v := range stats.Filesystem {
metric := newCadvisorMetric(containerType, f.logger)
metric := NewCadvisorMetric(containerType, f.logger)
if v.Device == "" {
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (m *MemMetricExtractor) GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInf
return metrics
}

metric := newCadvisorMetric(containerType, m.logger)
metric := NewCadvisorMetric(containerType, m.logger)
metric.cgroupPath = info.Name
curStats := GetStats(info)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro

netIfceMetrics[i] = netIfceMetric

metric := newCadvisorMetric(mType, n.logger)
metric := NewCadvisorMetric(mType, n.logger)
metric.tags[ci.NetIfce] = cur.Name
for k, v := range netIfceMetric {
metric.fields[ci.MetricName(mType, k)] = v
Expand All @@ -81,7 +81,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro

aggregatedFields := ci.SumFields(netIfceMetrics)
if len(aggregatedFields) > 0 {
metric := newCadvisorMetric(containerType, n.logger)
metric := NewCadvisorMetric(containerType, n.logger)
for k, v := range aggregatedFields {
metric.fields[ci.MetricName(containerType, k)] = v
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -141,6 +142,12 @@ func (e *ebsVolume) addEBSVolumeMapping(zone *string, attachement *ec2.VolumeAtt
func (e *ebsVolume) findNvmeBlockNameIfPresent(devName string) string {
// for nvme(ssd), there is a symlink from devName to nvme block name, i.e. /dev/xvda -> /dev/nvme0n1
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html

// Windows does not support file system eg: /rootfs to get Nvme Block name.
// todo: Implement logic to identify Nvme devices on Windows. Refer https://docs.aws.amazon.com/AWSEC2/latest/WindowsGuide/nvme-ebs-volumes.html#identify-nvme-ebs-device
if runtime.GOOS == "windows" {
return ""
}
hasRootFs := true
if _, err := e.osLstat(hostProc); os.IsNotExist(err) {
hasRootFs = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package host // import "github.com/open-telemetry/opentelemetry-collector-contri
import (
"context"
"os"
"runtime"

"github.com/shirou/gopsutil/v3/common"
"github.com/shirou/gopsutil/v3/cpu"
Expand Down Expand Up @@ -46,11 +47,14 @@ func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCap
opt(nc)
}

if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
return nil, err
ctx := context.Background()
if runtime.GOOS != "windows" {
if _, err := nc.osLstat(hostProc); os.IsNotExist(err) {
return nil, err
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
ctx = context.WithValue(ctx, common.EnvKey, envMap)
}
envMap := common.EnvMap{common.HostProcEnvKey: hostProc}
ctx := context.WithValue(context.Background(), common.EnvKey, envMap)

nc.parseCPU(ctx)
nc.parseMemory(ctx)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !linux
// +build !linux

package host // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"

// These variables are invalid for Windows
const (
rootfs = ""
hostProc = rootfs + ""
hostMounts = hostProc + ""
)
6 changes: 0 additions & 6 deletions receiver/awscontainerinsightreceiver/internal/host/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ import (
"time"
)

const (
rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container
hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc"
hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host
)

func hostJitter(max time.Duration) time.Duration {
hostName, err := os.Hostname()
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/host/utilsconst.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

package host // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"

const (
rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container
hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc"
hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host
)
111 changes: 111 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows"

import (
"context"
"errors"
"os"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type K8sWindows struct {
cancel context.CancelFunc
logger *zap.Logger
nodeName string `toml:"node_name"`
k8sDecorator stores.K8sDecorator
summaryProvider *kubeletSummaryProvider
hostInfo host.Info
}

func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) (*K8sWindows, error) {
nodeName := os.Getenv("HOST_NAME")
if nodeName == "" {
return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config")
}
k8sSummaryProvider, err := new(logger, hostInfo)
if err != nil {
logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err))
return nil, err
}
return &K8sWindows{
logger: logger,
nodeName: nodeName,
k8sDecorator: *decorator,
summaryProvider: k8sSummaryProvider,
hostInfo: hostInfo,
}, nil
}

func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
k.logger.Debug("D! called K8sWindows GetMetrics")
var result []pmetric.Metrics

metrics, err := k.summaryProvider.getMetrics()
if err != nil {
k.logger.Error("error getting metrics from kubelet summary provider, ", zap.Error(err))
return result
}
metrics = k.decorateMetrics(metrics)
for _, k8sSummaryMetric := range metrics {
md := ci.ConvertToOTLPMetrics(k8sSummaryMetric.GetFields(), k8sSummaryMetric.GetTags(), k.logger)
result = append(result, md)
}

return result
}

func (c *K8sWindows) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric {
//ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes()
var result []*extractors.CAdvisorMetric
for _, m := range cadvisormetrics {
tags := m.GetTags()
//c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV)

// add version
//tags[ci.Version] = c.version

// add nodeName for node, pod and container
metricType := tags[ci.MetricType]
if c.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) ||
ci.IsPod(metricType) || ci.IsContainer(metricType)) {
tags[ci.NodeNameKey] = c.nodeName
}

// add instance id and type
if instanceID := c.hostInfo.GetInstanceID(); instanceID != "" {
tags[ci.InstanceID] = instanceID
}
if instanceType := c.hostInfo.GetInstanceType(); instanceType != "" {
tags[ci.InstanceType] = instanceType
}

// add scaling group name
tags[ci.AutoScalingGroupNameKey] = c.hostInfo.GetAutoScalingGroupName()

// add tags for EKS
tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName()

out := c.k8sDecorator.Decorate(m)
if out != nil {
result = append(result, out)
}
}
return result
}

func (k *K8sWindows) Shutdown() error {
k.logger.Debug("D! called K8sWindows Shutdown")
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !windows
// +build !windows

package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type K8sWindows struct {
}

// New is a dummy function to construct a dummy K8sWindows struct for linux
func New(_ *zap.Logger, _ *stores.K8sDecorator, _ host.Info) (*K8sWindows, error) {
return &K8sWindows{}, nil
}

// GetMetrics is a dummy function to always returns empty metrics for linux
func (k *K8sWindows) GetMetrics() []pmetric.Metrics {
return []pmetric.Metrics{}
}

func (k *K8sWindows) Shutdown() error {
return nil
}
Loading
Loading