Skip to content

Commit

Permalink
Emit pending pod metrics from api-server
Browse files Browse the repository at this point in the history
  • Loading branch information
mitali-salvi committed Nov 9, 2023
1 parent 7c42456 commit a9478db
Show file tree
Hide file tree
Showing 8 changed files with 466 additions and 23 deletions.
36 changes: 28 additions & 8 deletions internal/aws/k8s/k8sclient/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type PodClient interface {
// Get the mapping between the namespace and the number of belonging pods
NamespaceToRunningPodNum() map[string]int
PodInfos() []*PodInfo
}

type podClientOption func(*podClient)
Expand All @@ -39,6 +40,7 @@ type podClient struct {

mu sync.RWMutex
namespaceToRunningPodNumMap map[string]int
podInfos []*PodInfo
}

func (c *podClient) NamespaceToRunningPodNum() map[string]int {
Expand All @@ -50,22 +52,35 @@ func (c *podClient) NamespaceToRunningPodNum() map[string]int {
return c.namespaceToRunningPodNumMap
}

func (c *podClient) PodInfos() []*PodInfo {
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.podInfos
}

func (c *podClient) refresh() {
c.mu.Lock()
defer c.mu.Unlock()

objsList := c.store.List()
namespaceToRunningPodNumMapNew := make(map[string]int)
podInfos := make([]*PodInfo, 0)
for _, obj := range objsList {
pod := obj.(*podInfo)
if pod.phase == v1.PodRunning {
if podNum, ok := namespaceToRunningPodNumMapNew[pod.namespace]; !ok {
namespaceToRunningPodNumMapNew[pod.namespace] = 1
pod := obj.(*PodInfo)
podInfos = append(podInfos, pod)

if pod.Phase == v1.PodRunning {
if podNum, ok := namespaceToRunningPodNumMapNew[pod.Namespace]; !ok {
namespaceToRunningPodNumMapNew[pod.Namespace] = 1
} else {
namespaceToRunningPodNumMapNew[pod.namespace] = podNum + 1
namespaceToRunningPodNumMapNew[pod.Namespace] = podNum + 1
}
}
}
c.podInfos = podInfos
c.namespaceToRunningPodNumMap = namespaceToRunningPodNumMapNew
}

Expand Down Expand Up @@ -105,9 +120,14 @@ func transformFuncPod(obj interface{}) (interface{}, error) {
if !ok {
return nil, fmt.Errorf("input obj %v is not Pod type", obj)
}
info := new(podInfo)
info.namespace = pod.Namespace
info.phase = pod.Status.Phase
info := new(PodInfo)
info.Name = pod.Name
info.Namespace = pod.Namespace
info.Uid = string(pod.UID)
info.Labels = pod.Labels
info.OwnerReferences = pod.OwnerReferences
info.Phase = pod.Status.Phase
info.Conditions = pod.Status.Conditions
return info, nil
}

Expand Down
12 changes: 9 additions & 3 deletions internal/aws/k8s/k8sclient/pod_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-c

import (
v1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type podInfo struct {
namespace string
phase v1.PodPhase
type PodInfo struct {
Name string
Namespace string
Uid string
Labels map[string]string
OwnerReferences []metaV1.OwnerReference
Phase v1.PodPhase
Conditions []v1.PodCondition
}
39 changes: 39 additions & 0 deletions internal/aws/k8s/k8sclient/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,42 @@ func TestTransformFuncPod(t *testing.T) {
assert.Nil(t, info)
assert.NotNil(t, err)
}

func TestPodClient_PodNameToPodMap(t *testing.T) {
skip(t, "Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11078")
setOption := podSyncCheckerOption(&mockReflectorSyncChecker{})

samplePodArray := []interface{}{
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "bc5f5839-f62e-44b9-a79e-af250d92dcb1",
Name: "kube-proxy-csm88",
Namespace: "kube-system",
SelfLink: "/api/v1/namespaces/kube-system/pods/kube-proxy-csm88",
},
Status: v1.PodStatus{
Phase: "Running",
},
},
}

fakeClientSet := fake.NewSimpleClientset()
client := newPodClient(fakeClientSet, zap.NewNop(), setOption)
assert.NoError(t, client.store.Replace(samplePodArray, ""))
client.refresh()

expectedArray := []*PodInfo{
{
Name: "kube-proxy-csm88",
Namespace: "kube-system",
Uid: "bc5f5839-f62e-44b9-a79e-af250d92dcb1",
Labels: map[string]string{},
Phase: v1.PodRunning,
},
}

resultMap := client.PodInfos()
assert.Equal(t, expectedArray, resultMap)
client.shutdown()
assert.True(t, client.stopped)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,51 @@ package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collecto

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
)

// K8sAPIServer is a struct that produces metrics from kubernetes api server
type K8sAPIServer struct {
nodeName string // get the value from downward API
logger *zap.Logger
clusterNameProvider clusterNameProvider
cancel context.CancelFunc
leaderElection *LeaderElection
nodeName string // get the value from downward API
logger *zap.Logger
clusterNameProvider clusterNameProvider
cancel context.CancelFunc
leaderElection *LeaderElection
addFullPodNameMetricLabel bool
includeEnhancedMetrics bool
}

type clusterNameProvider interface {
GetClusterName() string
GetInstanceID() string
GetInstanceType() string
}

type Option func(*K8sAPIServer)

// NewK8sAPIServer creates a k8sApiServer which can generate cluster-level metrics
func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, options ...Option) (*K8sAPIServer, error) {
func NewK8sAPIServer(cnp clusterNameProvider, logger *zap.Logger, leaderElection *LeaderElection, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, options ...Option) (*K8sAPIServer, error) {

k := &K8sAPIServer{
logger: logger,
clusterNameProvider: cnp,
leaderElection: leaderElection,
logger: logger,
clusterNameProvider: cnp,
leaderElection: leaderElection,
addFullPodNameMetricLabel: addFullPodNameMetricLabel,
includeEnhancedMetrics: includeEnhancedMetrics,
}

for _, opt := range options {
Expand Down Expand Up @@ -85,6 +96,7 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics {
result = append(result, k.getServiceMetrics(clusterName, timestampNs)...)
result = append(result, k.getStatefulSetMetrics(clusterName, timestampNs)...)
result = append(result, k.getReplicaSetMetrics(clusterName, timestampNs)...)
result = append(result, k.getPodMetrics(clusterName, timestampNs)...)

return result
}
Expand Down Expand Up @@ -279,6 +291,137 @@ func (k *K8sAPIServer) getReplicaSetMetrics(clusterName, timestampNs string) []p
return metrics
}

func (k *K8sAPIServer) getPodMetrics(clusterName, timestampNs string) []pmetric.Metrics {
var metrics []pmetric.Metrics
podsList := k.leaderElection.podClient.PodInfos()
podKeyToServiceNamesMap := k.leaderElection.epClient.PodKeyToServiceNames()

for _, podInfo := range podsList {
if podInfo.Phase == corev1.PodPending {
fields := map[string]interface{}{}

if k.includeEnhancedMetrics {
addPodStatusMetrics(fields, podInfo)
addPodConditionMetrics(fields, podInfo)
}

attributes := map[string]string{
ci.ClusterNameKey: clusterName,
ci.MetricType: ci.TypePod,
ci.Timestamp: timestampNs,
ci.PodNameKey: podInfo.Name,
ci.K8sNamespace: podInfo.Namespace,
ci.Version: "0",
}

podKey := k8sutil.CreatePodKey(podInfo.Namespace, podInfo.Name)
if serviceList, ok := podKeyToServiceNamesMap[podKey]; ok {
if len(serviceList) > 0 {
attributes[ci.TypeService] = serviceList[0]
}
}

if k.nodeName != "" {
attributes["NodeName"] = k.nodeName
}
// add instance id and type of the leader node
if instanceID := k.clusterNameProvider.GetInstanceID(); instanceID != "" {
attributes[ci.InstanceID] = instanceID
}
if instanceType := k.clusterNameProvider.GetInstanceType(); instanceType != "" {
attributes[ci.InstanceType] = instanceType
}

attributes[ci.PodStatus] = "Pending"

kubernetesBlob := map[string]interface{}{}
k.getKubernetesBlob(podInfo, kubernetesBlob, attributes)
if k.nodeName != "" {
kubernetesBlob["host"] = k.nodeName
}
if len(kubernetesBlob) > 0 {
kubernetesInfo, err := json.Marshal(kubernetesBlob)
if err != nil {
k.logger.Warn("Error parsing kubernetes blob for pod metrics")
} else {
attributes[ci.Kubernetes] = string(kubernetesInfo)
}
}
attributes[ci.SourcesKey] = "[\"apiserver\"]"
md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger)
metrics = append(metrics, md)
}
}
return metrics
}

func (k *K8sAPIServer) getKubernetesBlob(pod *k8sclient.PodInfo, kubernetesBlob map[string]interface{}, attributes map[string]string) {
var owners []interface{}
podName := ""
for _, owner := range pod.OwnerReferences {
if owner.Kind != "" && owner.Name != "" {
kind := owner.Kind
name := owner.Name
if owner.Kind == ci.ReplicaSet {
rsToDeployment := k.leaderElection.replicaSetClient.ReplicaSetToDeployment()
if parent := rsToDeployment[owner.Name]; parent != "" {
kind = ci.Deployment
name = parent
} else if parent := parseDeploymentFromReplicaSet(owner.Name); parent != "" {
kind = ci.Deployment
name = parent
}
} else if owner.Kind == ci.Job {
if parent := parseCronJobFromJob(owner.Name); parent != "" {
kind = ci.CronJob
name = parent
} else if !k.addFullPodNameMetricLabel {
name = getJobNamePrefix(name)
}
}
owners = append(owners, map[string]string{"owner_kind": kind, "owner_name": name})

if podName == "" {
if owner.Kind == ci.StatefulSet {
podName = pod.Name
} else if owner.Kind == ci.DaemonSet || owner.Kind == ci.Job ||
owner.Kind == ci.ReplicaSet || owner.Kind == ci.ReplicationController {
podName = name
}
}
}
}

if len(owners) > 0 {
kubernetesBlob["pod_owners"] = owners
}

labels := make(map[string]string)
for k, v := range pod.Labels {
labels[k] = v
}
if len(labels) > 0 {
kubernetesBlob["labels"] = labels
}
kubernetesBlob["namespace_name"] = pod.Namespace
kubernetesBlob["pod_id"] = pod.Uid

// if podName is not set according to a well-known controllers, then set it to its own name
if podName == "" {
if strings.HasPrefix(pod.Name, KubeProxy) && !k.addFullPodNameMetricLabel {
podName = KubeProxy
} else {
podName = pod.Name
}
}

attributes[ci.PodNameKey] = podName
if k.addFullPodNameMetricLabel {
attributes[ci.FullPodNameKey] = pod.Name
kubernetesBlob["pod_name"] = pod.Name
}
}

// Shutdown stops the k8sApiServer
func (k *K8sAPIServer) Shutdown() error {
if k.cancel != nil {
Expand Down
Loading

0 comments on commit a9478db

Please sign in to comment.