Emit pod status metrics for pending pods from api server #139

Merged
36 changes: 28 additions & 8 deletions internal/aws/k8s/k8sclient/pod.go
@@ -20,6 +20,7 @@ import (
 type PodClient interface {
 	// Get the mapping between the namespace and the number of belonging pods
 	NamespaceToRunningPodNum() map[string]int
+	PodInfos() []*PodInfo
 }
 
 type podClientOption func(*podClient)
@@ -39,6 +40,7 @@ type podClient struct {
 
 	mu                          sync.RWMutex
 	namespaceToRunningPodNumMap map[string]int
+	podInfos                    []*PodInfo
 }
 
 func (c *podClient) NamespaceToRunningPodNum() map[string]int {
@@ -50,22 +52,35 @@ func (c *podClient) NamespaceToRunningPodNum() map[string]int {
 	return c.namespaceToRunningPodNumMap
 }
 
+func (c *podClient) PodInfos() []*PodInfo {
+	if c.store.GetResetRefreshStatus() {
+		c.refresh()
+	}
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.podInfos
+}
+
 func (c *podClient) refresh() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
 	objsList := c.store.List()
 	namespaceToRunningPodNumMapNew := make(map[string]int)
+	podInfos := make([]*PodInfo, 0)
 	for _, obj := range objsList {
-		pod := obj.(*podInfo)
-		if pod.phase == v1.PodRunning {
-			if podNum, ok := namespaceToRunningPodNumMapNew[pod.namespace]; !ok {
-				namespaceToRunningPodNumMapNew[pod.namespace] = 1
+		pod := obj.(*PodInfo)
+		podInfos = append(podInfos, pod)
+
+		if pod.Phase == v1.PodRunning {
+			if podNum, ok := namespaceToRunningPodNumMapNew[pod.Namespace]; !ok {
+				namespaceToRunningPodNumMapNew[pod.Namespace] = 1
 			} else {
-				namespaceToRunningPodNumMapNew[pod.namespace] = podNum + 1
+				namespaceToRunningPodNumMapNew[pod.Namespace] = podNum + 1
 			}
 		}
 	}
+	c.podInfos = podInfos
 	c.namespaceToRunningPodNumMap = namespaceToRunningPodNumMapNew
 }
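The new PodInfos accessor follows the same read-through pattern as NamespaceToRunningPodNum above: if the store reports that a refresh is due, rebuild the snapshot under the write lock, then serve the cached slice under a read lock. A minimal self-contained sketch of that pattern (snapshotCache and its fields are illustrative stand-ins, not types from this PR):

package main

import (
	"fmt"
	"sync"
)

// snapshotCache is an illustrative stand-in for podClient: it rebuilds a
// cached snapshot when the backing source is marked stale, and serves reads
// from the snapshot under a read lock.
type snapshotCache struct {
	mu       sync.RWMutex
	stale    bool     // stands in for store.GetResetRefreshStatus()
	source   []string // stands in for store.List()
	snapshot []string
}

// Items mirrors PodInfos: refresh if stale, then read the cached slice.
func (c *snapshotCache) Items() []string {
	if c.isStale() {
		c.refresh()
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.snapshot
}

func (c *snapshotCache) isStale() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.stale
}

func (c *snapshotCache) refresh() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.snapshot = append([]string(nil), c.source...) // rebuild, then swap in
	c.stale = false
}

func main() {
	c := &snapshotCache{stale: true, source: []string{"pod-a", "pod-b"}}
	fmt.Println(c.Items()) // [pod-a pod-b]
}

As in the PR, callers receive the cached slice itself rather than a copy, so they are expected to treat it as read-only.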

@@ -105,9 +120,14 @@ func transformFuncPod(obj interface{}) (interface{}, error) {
 	if !ok {
 		return nil, fmt.Errorf("input obj %v is not Pod type", obj)
 	}
-	info := new(podInfo)
-	info.namespace = pod.Namespace
-	info.phase = pod.Status.Phase
+	info := new(PodInfo)
+	info.Name = pod.Name
+	info.Namespace = pod.Namespace
+	info.Uid = string(pod.UID)
+	info.Labels = pod.Labels
+	info.OwnerReferences = pod.OwnerReferences
+	info.Phase = pod.Status.Phase
+	info.Conditions = pod.Status.Conditions
 	return info, nil
 }

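transformFuncPod above projects each *v1.Pod down to the handful of fields the receiver needs before the object is cached, which keeps the watch cache small. The PR registers it with the package's own object store; with plain client-go, the same projection could hypothetically be hooked up through SharedInformer.SetTransform — a sketch assuming a client-go release that exposes SetTransform, where projectPod mirrors transformFuncPod's field list:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

// projectPod keeps only the fields the collector cares about; everything
// else is dropped before the object lands in the informer cache.
func projectPod(obj interface{}) (interface{}, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return nil, fmt.Errorf("input obj %v is not Pod type", obj)
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:            pod.Name,
			Namespace:       pod.Namespace,
			UID:             pod.UID,
			Labels:          pod.Labels,
			OwnerReferences: pod.OwnerReferences,
		},
		Status: v1.PodStatus{
			Phase:      pod.Status.Phase,
			Conditions: pod.Status.Conditions,
		},
	}, nil
}

func main() {
	clientset := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(clientset, 0)
	podInformer := factory.Core().V1().Pods().Informer()
	// The transform must be registered before the informer starts.
	if err := podInformer.SetTransform(projectPod); err != nil {
		panic(err)
	}
	fmt.Println("transform registered")
}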
12 changes: 9 additions & 3 deletions internal/aws/k8s/k8sclient/pod_info.go
@@ -5,9 +5,15 @@ package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"

 import (
 	v1 "k8s.io/api/core/v1"
+	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-type podInfo struct {
-	namespace string
-	phase     v1.PodPhase
+type PodInfo struct {
+	Name            string
+	Namespace       string
+	Uid             string
+	Labels          map[string]string
+	OwnerReferences []metaV1.OwnerReference
+	Phase           v1.PodPhase
+	Conditions      []v1.PodCondition
 }
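For orientation, here is how a pod stuck in Pending might look once transformed into this struct (all values invented for illustration; PodInfo lives in an internal package, so this only compiles from within the module):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
)

func main() {
	// An unschedulable pod: Phase is Pending and PodScheduled is False.
	pending := &k8sclient.PodInfo{
		Name:      "web-7d4b9c6f5-abcde",
		Namespace: "default",
		Uid:       "11111111-2222-3333-4444-555555555555",
		Labels:    map[string]string{"app": "web"},
		OwnerReferences: []metaV1.OwnerReference{
			{Kind: "ReplicaSet", Name: "web-7d4b9c6f5"},
		},
		Phase: v1.PodPending,
		Conditions: []v1.PodCondition{
			{Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: "Unschedulable"},
		},
	}
	fmt.Printf("%s/%s is %s\n", pending.Namespace, pending.Name, pending.Phase)
}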
39 changes: 39 additions & 0 deletions internal/aws/k8s/k8sclient/pod_test.go
@@ -191,3 +191,42 @@ func TestTransformFuncPod(t *testing.T) {
 	assert.Nil(t, info)
 	assert.NotNil(t, err)
 }
+
+func TestPodClient_PodNameToPodMap(t *testing.T) {
+	skip(t, "Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11078")
+	setOption := podSyncCheckerOption(&mockReflectorSyncChecker{})
+
+	samplePodArray := []interface{}{
+		&v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				UID:       "bc5f5839-f62e-44b9-a79e-af250d92dcb1",
+				Name:      "kube-proxy-csm88",
+				Namespace: "kube-system",
+				SelfLink:  "/api/v1/namespaces/kube-system/pods/kube-proxy-csm88",
+			},
+			Status: v1.PodStatus{
+				Phase: "Running",
+			},
+		},
+	}
+
+	fakeClientSet := fake.NewSimpleClientset()
+	client := newPodClient(fakeClientSet, zap.NewNop(), setOption)
+	assert.NoError(t, client.store.Replace(samplePodArray, ""))
+	client.refresh()
+
+	expectedArray := []*PodInfo{
+		{
+			Name:      "kube-proxy-csm88",
+			Namespace: "kube-system",
+			Uid:       "bc5f5839-f62e-44b9-a79e-af250d92dcb1",
+			Labels:    map[string]string{},
+			Phase:     v1.PodRunning,
+		},
+	}
+
+	resultMap := client.PodInfos()
+	assert.Equal(t, expectedArray, resultMap)
+	client.shutdown()
+	assert.True(t, client.stopped)
+}
@@ -5,25 +5,32 @@ package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/…"

 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"os"
 	"strconv"
+	"strings"
 	"time"
 
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
+	corev1 "k8s.io/api/core/v1"
 
 	ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
 )

 // K8sAPIServer is a struct that produces metrics from kubernetes api server
 type K8sAPIServer struct {
-	nodeName            string // get the value from downward API
-	logger              *zap.Logger
-	clusterNameProvider clusterNameProvider
-	cancel              context.CancelFunc
-	leaderElection      *LeaderElection
+	nodeName                  string // get the value from downward API
+	logger                    *zap.Logger
+	clusterNameProvider       clusterNameProvider
+	cancel                    context.CancelFunc
+	leaderElection            *LeaderElection
+	addFullPodNameMetricLabel bool
+	includeEnhancedMetrics    bool
 }
 
 type clusterNameProvider interface {
@@ -33,12 +40,14 @@ type clusterNameProvider interface {
 type Option func(*K8sAPIServer)
 
 // NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics
-func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, options ...Option) (*K8sAPIServer, error) {
+func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, options ...Option) (*K8sAPIServer, error) {

Review comment on the new signature — nit: Can we use the option pattern so the signature doesn't have to change everywhere?

Author reply: This will be taken care of as part of the code cleanup task.
 	k := &K8sAPIServer{
-		logger:              logger,
-		clusterNameProvider: cnp,
-		leaderElection:      leaderElection,
+		logger:                    logger,
+		clusterNameProvider:       cnp,
+		leaderElection:            leaderElection,
+		addFullPodNameMetricLabel: addFullPodNameMetricLabel,
+		includeEnhancedMetrics:    includeEnhancedMetrics,
 	}

 	for _, opt := range options {

@@ -85,6 +94,7 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics {
 	result = append(result, k.getServiceMetrics(clusterName, timestampNs)...)
 	result = append(result, k.getStatefulSetMetrics(clusterName, timestampNs)...)
 	result = append(result, k.getReplicaSetMetrics(clusterName, timestampNs)...)
+	result = append(result, k.getPendingPodStatusMetrics(clusterName, timestampNs)...)
 
 	return result
 }
@@ -279,6 +289,129 @@ func (k *K8sAPIServer) getReplicaSetMetrics(clusterName, timestampNs string) []pmetric.Metrics {
 	return metrics
 }
 
+// Statuses and conditions for all pods assigned to a node are determined in podstore.go. Since pending pods do not have a node allocated to them, we need to fetch their details from the K8s API Server here.
+func (k *K8sAPIServer) getPendingPodStatusMetrics(clusterName, timestampNs string) []pmetric.Metrics {
+	var metrics []pmetric.Metrics
+	podsList := k.leaderElection.podClient.PodInfos()
+	podKeyToServiceNamesMap := k.leaderElection.epClient.PodKeyToServiceNames()
+
+	for _, podInfo := range podsList {
+		if podInfo.Phase == corev1.PodPending {
+			fields := map[string]interface{}{}
+
+			if k.includeEnhancedMetrics {
+				addPodStatusMetrics(fields, podInfo)
+				addPodConditionMetrics(fields, podInfo)
+			}
+
+			attributes := map[string]string{
+				ci.ClusterNameKey: clusterName,
+				ci.MetricType:     ci.TypePod,
+				ci.Timestamp:      timestampNs,
+				ci.PodNameKey:     podInfo.Name,
+				ci.K8sNamespace:   podInfo.Namespace,
+				ci.Version:        "0",
+			}
+
+			podKey := k8sutil.CreatePodKey(podInfo.Namespace, podInfo.Name)
+			if serviceList, ok := podKeyToServiceNamesMap[podKey]; ok {
+				if len(serviceList) > 0 {
+					attributes[ci.TypeService] = serviceList[0]
+				}
+			}
+
+			attributes[ci.PodStatus] = string(corev1.PodPending)
+			attributes["k8s.node.name"] = "pending"
+
+			kubernetesBlob := map[string]interface{}{}
+			k.getKubernetesBlob(podInfo, kubernetesBlob, attributes)
+			if k.nodeName != "" {
+				kubernetesBlob["host"] = k.nodeName
+			}
+			if len(kubernetesBlob) > 0 {
+				kubernetesInfo, err := json.Marshal(kubernetesBlob)
+				if err != nil {
+					k.logger.Warn("Error parsing kubernetes blob for pod metrics")
+				} else {
+					attributes[ci.Kubernetes] = string(kubernetesInfo)
+				}
+			}
+			attributes[ci.SourcesKey] = "[\"apiserver\"]"
+			md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger)
+			metrics = append(metrics, md)
+		}
+	}
+	return metrics
+}
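The k8s.node.name attribute is pinned to the literal string "pending" because an unscheduled pod has no Spec.NodeName yet, and node-keyed dashboards would otherwise drop it entirely. A minimal sketch of that decision (nodeNameForMetrics is an illustrative helper, not part of the PR):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// nodeNameForMetrics returns the node a pod's metrics should be grouped
// under, falling back to the "pending" sentinel for unscheduled pods.
func nodeNameForMetrics(pod *corev1.Pod) string {
	if pod.Spec.NodeName == "" && pod.Status.Phase == corev1.PodPending {
		return "pending" // same sentinel getPendingPodStatusMetrics uses above
	}
	return pod.Spec.NodeName
}

func main() {
	unscheduled := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodPending}}
	fmt.Println(nodeNameForMetrics(unscheduled)) // pending
}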

+// TODO this is duplicated code from podstore.go, move this to a common package to re-use
+func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob map[string]interface{}, attributes map[string]string) {
+	var owners []interface{}
+	podName := ""
+	for _, owner := range pod.OwnerReferences {
+		if owner.Kind != "" && owner.Name != "" {
+			kind := owner.Kind
+			name := owner.Name
+			if owner.Kind == ci.ReplicaSet {
+				rsToDeployment := k.leaderElection.replicaSetClient.ReplicaSetToDeployment()
+				if parent := rsToDeployment[owner.Name]; parent != "" {
+					kind = ci.Deployment
+					name = parent
+				} else if parent := parseDeploymentFromReplicaSet(owner.Name); parent != "" {
+					kind = ci.Deployment
+					name = parent
+				}
+			} else if owner.Kind == ci.Job {
+				if parent := parseCronJobFromJob(owner.Name); parent != "" {
+					kind = ci.CronJob
+					name = parent
+				} else if !k.addFullPodNameMetricLabel {
+					name = getJobNamePrefix(name)
+				}
+			}
+			owners = append(owners, map[string]string{"owner_kind": kind, "owner_name": name})
+
+			if podName == "" {
+				if owner.Kind == ci.StatefulSet {
+					podName = pod.Name
+				} else if owner.Kind == ci.DaemonSet || owner.Kind == ci.Job ||
+					owner.Kind == ci.ReplicaSet || owner.Kind == ci.ReplicationController {
+					podName = name
+				}
+			}
+		}
+	}
+
+	if len(owners) > 0 {
+		kubernetesBlob["pod_owners"] = owners
+	}
+
+	labels := make(map[string]string)
+	for k, v := range pod.Labels {
+		labels[k] = v
+	}
+	if len(labels) > 0 {
+		kubernetesBlob["labels"] = labels
+	}
+	kubernetesBlob["namespace_name"] = pod.Namespace
+	kubernetesBlob["pod_id"] = pod.Uid
+
+	// if podName is not set according to a well-known controller, then set it to the pod's own name
+	if podName == "" {
+		if strings.HasPrefix(pod.Name, KubeProxy) && !k.addFullPodNameMetricLabel {
+			podName = KubeProxy
+		} else {
+			podName = pod.Name
+		}
+	}
+
+	attributes[ci.PodNameKey] = podName
+	if k.addFullPodNameMetricLabel {
+		attributes[ci.FullPodNameKey] = pod.Name
+		kubernetesBlob["pod_name"] = pod.Name
+	}
+}
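To make the blob's shape concrete: for a pending pod owned by ReplicaSet web-7d4b9c6f5 that resolves to Deployment web, the marshalled kubernetes attribute would look roughly like this (all values invented for illustration):

{"host":"ip-10-0-0-1.ec2.internal","labels":{"app":"web"},"namespace_name":"default","pod_id":"11111111-2222-3333-4444-555555555555","pod_owners":[{"owner_kind":"Deployment","owner_name":"web"}]}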

 // Shutdown stops the k8sApiServer
 func (k *K8sAPIServer) Shutdown() error {
 	if k.cancel != nil {