diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index 63e20a396680..3e9aa34426be 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -696,7 +696,7 @@ require ( k8s.io/klog v1.0.0 // indirect k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect - k8s.io/kubelet v0.28.3 // indirect + k8s.io/kubelet v0.28.4 // indirect k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect sigs.k8s.io/controller-runtime v0.16.3 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/cmd/configschema/go.sum b/cmd/configschema/go.sum index 6f8b140d0cc8..78d249122f65 100644 --- a/cmd/configschema/go.sum +++ b/cmd/configschema/go.sum @@ -2266,8 +2266,8 @@ k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= -k8s.io/kubelet v0.28.3 h1:bp/uIf1R5F61BlFvFtzc4PDEiK7TtFcw3wFJlc0V0LM= -k8s.io/kubelet v0.28.3/go.mod h1:E3NHYbp/v45Ao6AD0EOZnqO3L0R6Haks6Nm0+bnFwtU= +k8s.io/kubelet v0.28.4 h1:Ypxy1jaFlSXFXbg/yVtFOU2ZxErBVRJfLu8+t4s7Dtw= +k8s.io/kubelet v0.28.4/go.mod h1:w1wPI12liY/aeC70nqKYcNNkr6/nbyvdMB7P7wmww2o= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230711102312-30195339c3c7 h1:ZgnF1KZsYxWIifwSNZFZgNtWE89WI5yiP5WwlfDoIyc= k8s.io/utils v0.0.0-20230711102312-30195339c3c7/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/go.mod b/go.mod index 062030d21475..c6e4d1161312 100644 --- a/go.mod +++ b/go.mod @@ -695,7 +695,7 @@ require ( k8s.io/klog v1.0.0 // indirect k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect - k8s.io/kubelet v0.28.3 // indirect + k8s.io/kubelet v0.28.4 // indirect k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect sigs.k8s.io/controller-runtime v0.16.3 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/go.sum b/go.sum index 5bed4abad2cb..e86f13330a0d 100644 --- a/go.sum +++ b/go.sum @@ -2271,8 +2271,8 @@ k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= -k8s.io/kubelet v0.28.3 h1:bp/uIf1R5F61BlFvFtzc4PDEiK7TtFcw3wFJlc0V0LM= -k8s.io/kubelet v0.28.3/go.mod h1:E3NHYbp/v45Ao6AD0EOZnqO3L0R6Haks6Nm0+bnFwtU= +k8s.io/kubelet v0.28.4 h1:Ypxy1jaFlSXFXbg/yVtFOU2ZxErBVRJfLu8+t4s7Dtw= +k8s.io/kubelet v0.28.4/go.mod h1:w1wPI12liY/aeC70nqKYcNNkr6/nbyvdMB7P7wmww2o= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230711102312-30195339c3c7 h1:ZgnF1KZsYxWIifwSNZFZgNtWE89WI5yiP5WwlfDoIyc= k8s.io/utils v0.0.0-20230711102312-30195339c3c7/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/receiver/awscontainerinsightreceiver/go.mod b/receiver/awscontainerinsightreceiver/go.mod index 8772cf95cd71..8104c541d1e5 100644 --- a/receiver/awscontainerinsightreceiver/go.mod +++ b/receiver/awscontainerinsightreceiver/go.mod @@ -31,6 +31,8 @@ require ( k8s.io/apimachinery v0.28.3 k8s.io/client-go v0.28.3 k8s.io/klog v1.0.0 + k8s.io/klog/v2 v2.100.1 + k8s.io/kubelet v0.28.3 ) require ( @@ -214,7 +216,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/utils v0.0.0-20230711102312-30195339c3c7 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/receiver/awscontainerinsightreceiver/go.sum b/receiver/awscontainerinsightreceiver/go.sum index d586693f9062..9c713f254065 100644 --- a/receiver/awscontainerinsightreceiver/go.sum +++ b/receiver/awscontainerinsightreceiver/go.sum @@ -1115,6 +1115,8 @@ k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= +k8s.io/kubelet v0.28.3 h1:bp/uIf1R5F61BlFvFtzc4PDEiK7TtFcw3wFJlc0V0LM= +k8s.io/kubelet v0.28.3/go.mod h1:E3NHYbp/v45Ao6AD0EOZnqO3L0R6Haks6Nm0+bnFwtU= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230711102312-30195339c3c7 h1:ZgnF1KZsYxWIifwSNZFZgNtWE89WI5yiP5WwlfDoIyc= k8s.io/utils v0.0.0-20230711102312-30195339c3c7/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go index e2b8851d6740..08369aa2be90 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/cpu_extractor.go @@ -33,7 +33,7 @@ func (c *CPUMetricExtractor) GetValue(info *cInfo.ContainerInfo, mInfo CPUMemInf // When there is more than one stats point, always use the last one curStats := GetStats(info) - metric := newCadvisorMetric(containerType, c.logger) + metric := NewCadvisorMetric(containerType, c.logger) metric.cgroupPath = info.Name multiplier := float64(decimalToMillicores) assignRateValueToField(&c.rateCalculator, metric.fields, ci.MetricName(containerType, ci.CPUTotal), info.Name, float64(curStats.Cpu.Usage.Total), curStats.Timestamp, multiplier) diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go index 384acf0e5e4f..f3f3d5888c4c 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/diskio_extractor.go @@ -41,7 +41,7 @@ func (d *DiskIOMetricExtractor) extractIoMetrics(curStatsSet []cInfo.PerDiskStat expectedKey := []string{ci.DiskIOAsync, ci.DiskIOSync, ci.DiskIORead, ci.DiskIOWrite, ci.DiskIOTotal} for _, cur := range curStatsSet { curDevName := devName(cur) - metric := newCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger) + metric := NewCadvisorMetric(getDiskIOMetricType(containerType, d.logger), d.logger) metric.tags[ci.DiskDev] = curDevName for _, key := range expectedKey { if curVal, curOk := cur.Stats[key]; curOk { diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go index 398ad4805a59..457d5a3dbf46 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/extractor.go @@ -44,7 +44,7 @@ type CAdvisorMetric struct { logger *zap.Logger } -func newCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric { +func NewCadvisorMetric(mType string, logger *zap.Logger) *CAdvisorMetric { metric := &CAdvisorMetric{ fields: make(map[string]any), tags: make(map[string]string), diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go index 6e2f888b461b..093ff0d4b1cf 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/fs_extractor.go @@ -33,7 +33,7 @@ func (f *FileSystemMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMem metrics := make([]*CAdvisorMetric, 0, len(stats.Filesystem)) for _, v := range stats.Filesystem { - metric := newCadvisorMetric(containerType, f.logger) + metric := NewCadvisorMetric(containerType, f.logger) if v.Device == "" { continue } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go index fe7750c39093..310061282750 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/mem_extractor.go @@ -28,7 +28,7 @@ func (m *MemMetricExtractor) GetValue(info *cinfo.ContainerInfo, mInfo CPUMemInf return metrics } - metric := newCadvisorMetric(containerType, m.logger) + metric := NewCadvisorMetric(containerType, m.logger) metric.cgroupPath = info.Name curStats := GetStats(info) diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go index 3affa24971fd..d8bdb23fcfe9 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors/net_extractor.go @@ -70,7 +70,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro netIfceMetrics[i] = netIfceMetric - metric := newCadvisorMetric(mType, n.logger) + metric := NewCadvisorMetric(mType, n.logger) metric.tags[ci.NetIfce] = cur.Name for k, v := range netIfceMetric { metric.fields[ci.MetricName(mType, k)] = v @@ -81,7 +81,7 @@ func (n *NetMetricExtractor) GetValue(info *cinfo.ContainerInfo, _ CPUMemInfoPro aggregatedFields := ci.SumFields(netIfceMetrics) if len(aggregatedFields) > 0 { - metric := newCadvisorMetric(containerType, n.logger) + metric := NewCadvisorMetric(containerType, n.logger) for k, v := range aggregatedFields { metric.fields[ci.MetricName(containerType, k)] = v } diff --git a/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go b/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go index 23690675c287..e6eeefdb4c89 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go +++ b/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "regexp" + "runtime" "strings" "sync" "time" @@ -141,6 +142,12 @@ func (e *ebsVolume) addEBSVolumeMapping(zone *string, attachement *ec2.VolumeAtt func (e *ebsVolume) findNvmeBlockNameIfPresent(devName string) string { // for nvme(ssd), there is a symlink from devName to nvme block name, i.e. /dev/xvda -> /dev/nvme0n1 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html + + // Windows does not support file system eg: /rootfs to get Nvme Block name. + // todo: Implement logic to identify Nvme devices on Windows. Refer https://docs.aws.amazon.com/AWSEC2/latest/WindowsGuide/nvme-ebs-volumes.html#identify-nvme-ebs-device + if runtime.GOOS == "windows" { + return "" + } hasRootFs := true if _, err := e.osLstat(hostProc); os.IsNotExist(err) { hasRootFs = false diff --git a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go index 8c77d4634831..522591e95c82 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go +++ b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go @@ -6,6 +6,7 @@ package host // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" "os" + "runtime" "github.com/shirou/gopsutil/v3/common" "github.com/shirou/gopsutil/v3/cpu" @@ -46,11 +47,14 @@ func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCap opt(nc) } - if _, err := nc.osLstat(hostProc); os.IsNotExist(err) { - return nil, err + ctx := context.Background() + if runtime.GOOS != "windows" { + if _, err := nc.osLstat(hostProc); os.IsNotExist(err) { + return nil, err + } + envMap := common.EnvMap{common.HostProcEnvKey: hostProc} + ctx = context.WithValue(ctx, common.EnvKey, envMap) } - envMap := common.EnvMap{common.HostProcEnvKey: hostProc} - ctx := context.WithValue(context.Background(), common.EnvKey, envMap) nc.parseCPU(ctx) nc.parseMemory(ctx) diff --git a/receiver/awscontainerinsightreceiver/internal/host/utilconst_windows.go b/receiver/awscontainerinsightreceiver/internal/host/utilconst_windows.go new file mode 100644 index 000000000000..2bd7285f21ea --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/utilconst_windows.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !linux +// +build !linux + +package host // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + +// These variables are invalid for Windows +const ( + rootfs = "" + hostProc = rootfs + "" + hostMounts = hostProc + "" +) diff --git a/receiver/awscontainerinsightreceiver/internal/host/utils.go b/receiver/awscontainerinsightreceiver/internal/host/utils.go index edd4d00833cb..b9e2fed4d854 100644 --- a/receiver/awscontainerinsightreceiver/internal/host/utils.go +++ b/receiver/awscontainerinsightreceiver/internal/host/utils.go @@ -10,12 +10,6 @@ import ( "time" ) -const ( - rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container - hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc" - hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host -) - func hostJitter(max time.Duration) time.Duration { hostName, err := os.Hostname() if err != nil { diff --git a/receiver/awscontainerinsightreceiver/internal/host/utilsconst.go b/receiver/awscontainerinsightreceiver/internal/host/utilsconst.go new file mode 100644 index 000000000000..adb162d5d1ff --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/utilsconst.go @@ -0,0 +1,13 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows +// +build !windows + +package host // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + +const ( + rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container + hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc" + hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host +) diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go new file mode 100644 index 000000000000..c4622c226098 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" + +import ( + "context" + "errors" + "os" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +type K8sWindows struct { + cancel context.CancelFunc + logger *zap.Logger + nodeName string `toml:"node_name"` + k8sDecorator stores.K8sDecorator + summaryProvider *kubeletSummaryProvider + hostInfo host.Info +} + +func New(logger *zap.Logger, decorator *stores.K8sDecorator, hostInfo host.Info) (*K8sWindows, error) { + nodeName := os.Getenv("HOST_NAME") + if nodeName == "" { + return nil, errors.New("missing environment variable HOST_NAME. Please check your deployment YAML config") + } + k8sSummaryProvider, err := new(logger, hostInfo) + if err != nil { + logger.Error("failed to initialize kubelet summary provider, ", zap.Error(err)) + return nil, err + } + return &K8sWindows{ + logger: logger, + nodeName: nodeName, + k8sDecorator: *decorator, + summaryProvider: k8sSummaryProvider, + hostInfo: hostInfo, + }, nil +} + +func (k *K8sWindows) GetMetrics() []pmetric.Metrics { + k.logger.Debug("D! called K8sWindows GetMetrics") + var result []pmetric.Metrics + + metrics, err := k.summaryProvider.getMetrics() + if err != nil { + k.logger.Error("error getting metrics from kubelet summary provider, ", zap.Error(err)) + return result + } + metrics = k.decorateMetrics(metrics) + for _, k8sSummaryMetric := range metrics { + md := ci.ConvertToOTLPMetrics(k8sSummaryMetric.GetFields(), k8sSummaryMetric.GetTags(), k.logger) + result = append(result, md) + } + + return result +} + +func (c *K8sWindows) decorateMetrics(cadvisormetrics []*extractors.CAdvisorMetric) []*extractors.CAdvisorMetric { + //ebsVolumeIdsUsedAsPV := c.hostInfo.ExtractEbsIDsUsedByKubernetes() + var result []*extractors.CAdvisorMetric + for _, m := range cadvisormetrics { + tags := m.GetTags() + //c.addEbsVolumeInfo(tags, ebsVolumeIdsUsedAsPV) + + // add version + //tags[ci.Version] = c.version + + // add nodeName for node, pod and container + metricType := tags[ci.MetricType] + if c.nodeName != "" && (ci.IsNode(metricType) || ci.IsInstance(metricType) || + ci.IsPod(metricType) || ci.IsContainer(metricType)) { + tags[ci.NodeNameKey] = c.nodeName + } + + // add instance id and type + if instanceID := c.hostInfo.GetInstanceID(); instanceID != "" { + tags[ci.InstanceID] = instanceID + } + if instanceType := c.hostInfo.GetInstanceType(); instanceType != "" { + tags[ci.InstanceType] = instanceType + } + + // add scaling group name + tags[ci.AutoScalingGroupNameKey] = c.hostInfo.GetAutoScalingGroupName() + + // add tags for EKS + tags[ci.ClusterNameKey] = c.hostInfo.GetClusterName() + + out := c.k8sDecorator.Decorate(m) + if out != nil { + result = append(result, out) + } + } + return result +} + +func (k *K8sWindows) Shutdown() error { + k.logger.Debug("D! called K8sWindows Shutdown") + return nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows_nowindows.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows_nowindows.go new file mode 100644 index 000000000000..dbc75d936351 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/k8swindows_nowindows.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows +// +build !windows + +package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" +) + +type K8sWindows struct { +} + +// New is a dummy function to construct a dummy K8sWindows struct for linux +func New(_ *zap.Logger, _ *stores.K8sDecorator, _ host.Info) (*K8sWindows, error) { + return &K8sWindows{}, nil +} + +// GetMetrics is a dummy function to always returns empty metrics for linux +func (k *K8sWindows) GetMetrics() []pmetric.Metrics { + return []pmetric.Metrics{} +} + +func (k *K8sWindows) Shutdown() error { + return nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go new file mode 100644 index 000000000000..cc956e5b208c --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package k8swindows // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" + +import ( + "fmt" + "os" + + ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor/extractors" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" + + "go.uber.org/zap" + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" +) + +type kubeletSummaryProvider struct { + logger *zap.Logger + hostIP string + hostPort string + client *kubeletutil.KubeletClient + hostInfo host.Info +} + +func new(logger *zap.Logger, info host.Info) (*kubeletSummaryProvider, error) { + hostIP := os.Getenv("HOST_IP") + kclient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, logger) + if err != nil { + return nil, fmt.Errorf("failed to initialize kubelet client: %w", err) + } + return &kubeletSummaryProvider{ + logger: logger, + client: kclient, + hostInfo: info, + }, nil +} + +func (k *kubeletSummaryProvider) getMetrics() ([]*extractors.CAdvisorMetric, error) { + summary, err := k.client.Summary(k.logger) + if err != nil { + k.logger.Error("kubelet summary API failed, ", zap.Error(err)) + return nil, err + } + + return k.getPodMetrics(summary) +} + +func (k *kubeletSummaryProvider) getContainerMetrics(summary *stats.Summary) ([]*extractors.CAdvisorMetric, error) { + var metrics []*extractors.CAdvisorMetric + // todo: implement CPU, memory metrics from containers + return metrics, nil +} + +func (k *kubeletSummaryProvider) getPodMetrics(summary *stats.Summary) ([]*extractors.CAdvisorMetric, error) { + // todo: This is not complete implementation of pod level metric collection since network level metrics are pending + // May need to add some more pod level labels for store decorators to work properly + + var metrics []*extractors.CAdvisorMetric + + nodeCPUCores := k.hostInfo.GetNumCores() + for _, pod := range summary.Pods { + k.logger.Info(fmt.Sprintf("pod summary %v", pod.PodRef.Name)) + metric := extractors.NewCadvisorMetric(ci.TypePod, k.logger) + + metric.AddField(ci.PodIDKey, pod.PodRef.UID) + metric.AddField(ci.K8sPodNameKey, pod.PodRef.Name) + metric.AddField(ci.K8sNamespace, pod.PodRef.Namespace) + + // CPU metric + metric.AddField(ci.MetricName(ci.TypePod, ci.CPUTotal), *pod.CPU.UsageCoreNanoSeconds) + metric.AddField(ci.MetricName(ci.TypePod, ci.CPUUtilization), float64(*pod.CPU.UsageCoreNanoSeconds)/float64(nodeCPUCores)) + + // Memory metrics + metric.AddField(ci.MetricName(ci.TypePod, ci.MemUsage), *pod.Memory.UsageBytes) + metric.AddField(ci.MetricName(ci.TypePod, ci.MemRss), *pod.Memory.RSSBytes) + metric.AddField(ci.MetricName(ci.TypePod, ci.MemWorkingset), *pod.Memory.WorkingSetBytes) + metric.AddField(ci.MetricName(ci.TypePod, ci.MemReservedCapacity), k.hostInfo.GetMemoryCapacity()) + metric.AddField(ci.MetricName(ci.TypePod, ci.MemUtilization), float64(*pod.Memory.WorkingSetBytes)/float64(k.hostInfo.GetMemoryCapacity())*100) + metrics = append(metrics, metric) + } + return metrics, nil +} + +func (k *kubeletSummaryProvider) getNodeMetrics() ([]*extractors.CAdvisorMetric, error) { + var metrics []*extractors.CAdvisorMetric + //todo: Implement CPU, memory and network metrics at node + return metrics, nil +} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go index 8a36a35bbf5f..8ec921a04322 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "k8s.io/utils/net" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" @@ -69,3 +70,22 @@ func (k *KubeletClient) ListPods() ([]corev1.Pod, error) { return pods.Items, nil } + +// Summary hits kubelet summary API using service account authentication. +// Summary API returns metrics at container, pod and node level for CPU, memory, networking and file system resources. +func (k *KubeletClient) Summary(logger *zap.Logger) (*stats.Summary, error) { + logger.Debug("Calling kubelet /stats/summary API") + + b, err := k.restClient.Get("/stats/summary") + if err != nil { + return nil, fmt.Errorf("call to kubelet /stats/summary API failed %w", err) + } + var out stats.Summary + err = json.Unmarshal(b, &out) + if err != nil { + return nil, fmt.Errorf("kubelet summary unmarshalling failed %w", err) + } + + logger.Debug("/stats/summary API response unmarshalled successfully") + return &out, nil +} diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 9972acd01718..0db9a3b02820 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -6,6 +6,7 @@ package awscontainerinsightreceiver // import "github.com/open-telemetry/opentel import ( "context" "errors" + "runtime" "time" "go.opentelemetry.io/collector/component" @@ -21,6 +22,7 @@ import ( ecsinfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/ecsInfo" hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" ) @@ -33,13 +35,13 @@ type metricsProvider interface { // awsContainerInsightReceiver implements the receiver.Metrics type awsContainerInsightReceiver struct { - settings component.TelemetrySettings - nextConsumer consumer.Metrics - config *Config - cancel context.CancelFunc - cadvisor metricsProvider - k8sapiserver metricsProvider - prometheusScraper *k8sapiserver.PrometheusScraper + settings component.TelemetrySettings + nextConsumer consumer.Metrics + config *Config + cancel context.CancelFunc + containerMetricsProvider metricsProvider + k8sapiserver metricsProvider + prometheusScraper *k8sapiserver.PrometheusScraper } // newAWSContainerInsightReceiver creates the aws container insight receiver with the given parameters. @@ -74,26 +76,34 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone return err } - decoratorOption := cadvisor.WithDecorator(k8sDecorator) - acir.cadvisor, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, decoratorOption) - if err != nil { - return err - } + // todo: will need to use k8sAPIServer to add stats about enhanced CI. + if runtime.GOOS == "windows" { + acir.containerMetricsProvider, err = k8swindows.New(acir.settings.Logger, k8sDecorator, *hostinfo) + if err != nil { + return err + } + } else { + decoratorOption := cadvisor.WithDecorator(k8sDecorator) + acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, decoratorOption) + if err != nil { + return err + } - leaderElection, err := k8sapiserver.NewLeaderElection(acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), - k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly)) - if err != nil { - return err - } + leaderElection, err := k8sapiserver.NewLeaderElection(acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), + k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly)) + if err != nil { + return err + } - acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics) - if err != nil { - return err - } + acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics) + if err != nil { + return err + } - err = acir.startPrometheusScraper(ctx, host, hostinfo, leaderElection) - if err != nil { - acir.settings.Logger.Debug("Unable to start kube apiserver prometheus scraper", zap.Error(err)) + err = acir.startPrometheusScraper(ctx, host, hostinfo, leaderElection) + if err != nil { + acir.settings.Logger.Debug("Unable to start kube apiserver prometheus scraper", zap.Error(err)) + } } } if acir.config.ContainerOrchestrator == ci.ECS { @@ -105,7 +115,7 @@ func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host compone ecsOption := cadvisor.WithECSInfoCreator(ecsInfo) - acir.cadvisor, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, ecsOption) + acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, acir.settings.Logger, ecsOption) if err != nil { return err } @@ -186,8 +196,8 @@ func (acir *awsContainerInsightReceiver) Shutdown(context.Context) error { if acir.k8sapiserver != nil { errs = errors.Join(errs, acir.k8sapiserver.Shutdown()) } - if acir.cadvisor != nil { - errs = errors.Join(errs, acir.cadvisor.Shutdown()) + if acir.containerMetricsProvider != nil { + errs = errors.Join(errs, acir.containerMetricsProvider.Shutdown()) } return errs @@ -197,14 +207,15 @@ func (acir *awsContainerInsightReceiver) Shutdown(context.Context) error { // collectData collects container stats from Amazon ECS Task Metadata Endpoint func (acir *awsContainerInsightReceiver) collectData(ctx context.Context) error { var mds []pmetric.Metrics - if acir.cadvisor == nil && acir.k8sapiserver == nil { + + if acir.containerMetricsProvider == nil && acir.k8sapiserver == nil { err := errors.New("both cadvisor and k8sapiserver failed to start") acir.settings.Logger.Error("Failed to collect stats", zap.Error(err)) return err } - if acir.cadvisor != nil { - mds = append(mds, acir.cadvisor.GetMetrics()...) + if acir.containerMetricsProvider != nil { + mds = append(mds, acir.containerMetricsProvider.GetMetrics()...) } if acir.k8sapiserver != nil { diff --git a/receiver/awscontainerinsightreceiver/receiver_test.go b/receiver/awscontainerinsightreceiver/receiver_test.go index 27686d194f1b..f3b251023d02 100644 --- a/receiver/awscontainerinsightreceiver/receiver_test.go +++ b/receiver/awscontainerinsightreceiver/receiver_test.go @@ -90,12 +90,12 @@ func TestCollectData(t *testing.T) { _ = r.Start(context.Background(), nil) ctx := context.Background() r.k8sapiserver = &mockK8sAPIServer{} - r.cadvisor = &mockCadvisor{} + r.containerMetricsProvider = &mockCadvisor{} err = r.collectData(ctx) require.Nil(t, err) // test the case when cadvisor and k8sapiserver failed to initialize - r.cadvisor = nil + r.containerMetricsProvider = nil r.k8sapiserver = nil err = r.collectData(ctx) require.NotNil(t, err) @@ -114,7 +114,7 @@ func TestCollectDataWithErrConsumer(t *testing.T) { r := metricsReceiver.(*awsContainerInsightReceiver) _ = r.Start(context.Background(), nil) - r.cadvisor = &mockCadvisor{} + r.containerMetricsProvider = &mockCadvisor{} r.k8sapiserver = &mockK8sAPIServer{} ctx := context.Background() @@ -138,12 +138,12 @@ func TestCollectDataWithECS(t *testing.T) { _ = r.Start(context.Background(), nil) ctx := context.Background() - r.cadvisor = &mockCadvisor{} + r.containerMetricsProvider = &mockCadvisor{} err = r.collectData(ctx) require.Nil(t, err) // test the case when cadvisor and k8sapiserver failed to initialize - r.cadvisor = nil + r.containerMetricsProvider = nil err = r.collectData(ctx) require.NotNil(t, err) }