Skip to content

Commit

Permalink
Add kubelet availability check for systemd CI. (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien authored Jun 4, 2024
1 parent d5d6147 commit 82dc971
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 170 deletions.
1 change: 1 addition & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (

// Environment variables
HostName = "HOST_NAME"
HostIP = "HOST_IP"
RunInContainer = "RUN_IN_CONTAINER"
RunAsHostProcessContainer = "RUN_AS_HOST_PROCESS_CONTAINER"

Expand Down
22 changes: 14 additions & 8 deletions internal/kubelet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,25 @@ type tlsClientProvider struct {
}

func (p *tlsClientProvider) BuildClient() (Client, error) {
rootCAs, err := systemCertPoolPlusPath(p.cfg.CAFile)
if err != nil {
return nil, err
}
clientCert, err := tls.LoadX509KeyPair(p.cfg.CertFile, p.cfg.KeyFile)
if err != nil {
return nil, err
var rootCAs *x509.CertPool
var certificates []tls.Certificate
if !p.cfg.InsecureSkipVerify {
var err error
rootCAs, err = systemCertPoolPlusPath(p.cfg.CAFile)
if err != nil {
return nil, err
}
clientCert, err := tls.LoadX509KeyPair(p.cfg.CertFile, p.cfg.KeyFile)
if err != nil {
return nil, err
}
certificates = []tls.Certificate{clientCert}
}
return defaultTLSClient(
p.endpoint,
p.cfg.InsecureSkipVerify,
rootCAs,
[]tls.Certificate{clientCert},
certificates,
nil,
p.logger,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (kp *kubeletProvider) getClient() (*kubeletutil.KubeletClient, error) {
if kp.client != nil {
return kp.client, nil
}
kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, "", kp.logger)
kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, nil, kp.logger)
if err != nil {
kp.logger.Error("failed to initialize new kubelet client, ", zap.Error(err))
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type KubeletClient struct {
restClient kubelet.Client
}

func NewKubeletClient(kubeIP string, port string, kubeConfigPath string, logger *zap.Logger) (*KubeletClient, error) {
func NewKubeletClient(kubeIP string, port string, clientConfig *kubelet.ClientConfig, logger *zap.Logger) (*KubeletClient, error) {
kubeClient := &KubeletClient{
Port: port,
KubeIP: kubeIP,
Expand All @@ -37,17 +37,10 @@ func NewKubeletClient(kubeIP string, port string, kubeConfigPath string, logger
endpoint = endpoint + ":" + port

// use service account for authentication by default
clientConfig := &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
},
}
if kubeConfigPath != "" {
// use kube-config for authentication
if clientConfig == nil {
clientConfig = &kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeKubeConfig,
KubeConfigPath: kubeConfigPath,
AuthType: k8sconfig.AuthTypeServiceAccount,
},
}
}
Expand Down Expand Up @@ -98,3 +91,29 @@ func (k *KubeletClient) Summary(logger *zap.Logger) (*stats.Summary, error) {
logger.Debug("/stats/summary API response unmarshalled successfully")
return &out, nil
}

// ClientConfig builds the kubelet client configuration for the given
// deployment mode.
//
// The authentication type is chosen as follows:
//   - isSystemd is false: service account authentication; kubeConfigPath is
//     ignored.
//   - isSystemd is true and kubeConfigPath is non-empty: kube-config
//     authentication using kubeConfigPath.
//   - isSystemd is true and kubeConfigPath is empty: TLS authentication with
//     InsecureSkipVerify set to true.
//
// A freshly allocated configuration is returned on every call.
func ClientConfig(kubeConfigPath string, isSystemd bool) *kubelet.ClientConfig {
	var (
		api        k8sconfig.APIConfig
		skipVerify bool
	)

	switch {
	case !isSystemd:
		api = k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount}
	case kubeConfigPath != "":
		// use kube-config for authentication
		api = k8sconfig.APIConfig{
			AuthType:       k8sconfig.AuthTypeKubeConfig,
			KubeConfigPath: kubeConfigPath,
		}
	default:
		// insecure TLS if not provided
		api = k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeTLS}
		skipVerify = true
	}

	return &kubelet.ClientConfig{
		APIConfig:          api,
		InsecureSkipVerify: skipVerify,
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestNewKubeletClient(t *testing.T) {
},
}
for _, tt := range tests {
client, err := NewKubeletClient(tt.kubeIP, tt.port, "", zap.NewNop())
client, err := NewKubeletClient(tt.kubeIP, tt.port, nil, zap.NewNop())
require.NoError(t, err)
assert.Equal(t, client.KubeIP, tt.kubeIP)
fc := (client.restClient).(*fakeClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package stores // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"fmt"
"os"

"go.uber.org/zap"

Expand Down Expand Up @@ -42,18 +41,14 @@ type Decorator interface {
}

func NewLocalNodeDecorator(logger *zap.Logger, containerOrchestrator string, hostInfo hostInfo, hostName string, options ...Option) (*LocalNodeDecorator, error) {
nodeName := os.Getenv(ci.HostName)
if nodeName == "" && containerOrchestrator == ci.EKS {
nodeName = hostName
if nodeName == "" {
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or agent config", ci.HostName)
}
if hostName == "" && containerOrchestrator == ci.EKS {
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or agent config", ci.HostName)
}

d := &LocalNodeDecorator{
hostInfo: hostInfo,
version: "0",
nodeName: nodeName,
nodeName: hostName,
containerOrchestrator: containerOrchestrator,
logger: logger,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,11 @@ func TestNewLocalNodeDecorator(t *testing.T) {
assert.NotNil(t, d)
assert.NoError(t, err)
assert.Equal(t, d.nodeName, "test-hostname")

t.Setenv(ci.HostName, "host")
d, err = NewLocalNodeDecorator(logger, "eks", nil, "")
assert.NotNil(t, d)
assert.NoError(t, err)
assert.Equal(t, d.nodeName, "host")
}

func TestEbsVolumeInfo(t *testing.T) {
t.Setenv(ci.HostName, "host")
hostInfo := testutils.MockHostInfo{}
d, err := NewLocalNodeDecorator(logger, "eks", hostInfo, "")
d, err := NewLocalNodeDecorator(logger, "eks", hostInfo, "host")
assert.NotNil(t, d)
assert.NoError(t, err)

Expand Down Expand Up @@ -83,7 +76,6 @@ func (d mockK8sDecorator) Shutdown() error {
}

func TestExpectedTags(t *testing.T) {
t.Setenv(ci.HostName, "host")
hostInfo := testutils.MockHostInfo{ClusterName: "my-cluster"}
k8sDecorator := mockK8sDecorator{}
ecsInfo := testutils.MockECSInfo{}
Expand Down Expand Up @@ -136,7 +128,7 @@ func TestExpectedTags(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
d, err := NewLocalNodeDecorator(logger, testCase.containerOrchestrator, hostInfo, "", WithK8sDecorator(k8sDecorator), WithECSInfo(&ecsInfo))
d, err := NewLocalNodeDecorator(logger, testCase.containerOrchestrator, hostInfo, "host", WithK8sDecorator(k8sDecorator), WithECSInfo(&ecsInfo))
assert.NotNil(t, d)
assert.NoError(t, err)

Expand Down
34 changes: 9 additions & 25 deletions receiver/awscontainerinsightreceiver/internal/stores/podstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"log"
"os"
"regexp"
"strings"
"sync"
Expand All @@ -21,7 +20,6 @@ import (
ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"
)

const (
Expand Down Expand Up @@ -122,48 +120,34 @@ type PodStore struct {
includeEnhancedMetrics bool
}

func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) {
podClient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, kubeConfigPath, logger)
if err != nil {
return nil, err
func NewPodStore(client podClient, prefFullPodName bool, addFullPodNameMetricLabel bool,
includeEnhancedMetrics bool, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) {
if hostName == "" {
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName)
}

// Try to detect kubelet permission issue here
if _, err := podClient.ListPods(); err != nil {
return nil, fmt.Errorf("cannot get pod from kubelet, err: %w", err)
}

nodeName := os.Getenv(ci.HostName)
if nodeName == "" {
nodeName = hostName
if nodeName == "" {
return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName)
}
}

k8sClient := &k8sclient.K8sClient{}
var k8sClient *k8sclient.K8sClient
nodeInfo := &nodeInfo{
nodeName: nodeName,
nodeName: hostName,
provider: nil,
logger: logger,
}
if !isSystemdEnabled {
k8sClient = k8sclient.Get(logger,
k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", nodeName)),
k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", hostName)),
k8sclient.CaptureNodeLevelInfo(true),
)

if k8sClient == nil {
return nil, errors.New("failed to start pod store because k8sclient is nil")
}
nodeInfo = newNodeInfo(nodeName, k8sClient.GetNodeClient(), logger)
nodeInfo = newNodeInfo(hostName, k8sClient.GetNodeClient(), logger)
}

podStore := &PodStore{
cache: newMapWithExpiry(podsExpiry),
prevMeasurements: sync.Map{},
//prevMeasurements: make(map[string]*mapWithExpiry),
podClient: podClient,
podClient: client,
nodeInfo: nodeInfo,
prefFullPodName: prefFullPodName,
includeEnhancedMetrics: includeEnhancedMetrics,
Expand Down
18 changes: 6 additions & 12 deletions receiver/awscontainerinsightreceiver/internal/stores/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package stores // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"context"
"errors"
"os"
"time"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"
)

// CIMetric represents the raw metric interface for container insights
Expand Down Expand Up @@ -43,21 +43,15 @@ type K8sDecorator struct {
podStore *PodStore
}

func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool, addContainerNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, customHostIP string, customHostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) {
hostIP := os.Getenv("HOST_IP")
if hostIP == "" {
hostIP = customHostIP
if hostIP == "" {
return nil, errors.New("environment variable HOST_IP is not set in k8s deployment config or passed as part of the agent config")
}
}

func NewK8sDecorator(ctx context.Context, kubeletClient *kubeletutil.KubeletClient, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool,
addContainerNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string,
hostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) {
k := &K8sDecorator{
ctx: ctx,
addContainerNameMetricLabel: addContainerNameMetricLabel,
}

podstore, err := NewPodStore(hostIP, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, kubeConfigPath, customHostName, isSystemd, logger)
podstore, err := NewPodStore(kubeletClient, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, hostName, isSystemd, logger)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 82dc971

Please sign in to comment.