diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index 161e5f0244df..8d796bdb36d0 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -14,6 +14,7 @@ const ( // Environment variables HostName = "HOST_NAME" + HostIP = "HOST_IP" RunInContainer = "RUN_IN_CONTAINER" RunAsHostProcessContainer = "RUN_AS_HOST_PROCESS_CONTAINER" diff --git a/internal/kubelet/client.go b/internal/kubelet/client.go index 93f234706109..582cded174b0 100644 --- a/internal/kubelet/client.go +++ b/internal/kubelet/client.go @@ -163,19 +163,25 @@ type tlsClientProvider struct { } func (p *tlsClientProvider) BuildClient() (Client, error) { - rootCAs, err := systemCertPoolPlusPath(p.cfg.CAFile) - if err != nil { - return nil, err - } - clientCert, err := tls.LoadX509KeyPair(p.cfg.CertFile, p.cfg.KeyFile) - if err != nil { - return nil, err + var rootCAs *x509.CertPool + var certificates []tls.Certificate + if !p.cfg.InsecureSkipVerify { + var err error + rootCAs, err = systemCertPoolPlusPath(p.cfg.CAFile) + if err != nil { + return nil, err + } + clientCert, err := tls.LoadX509KeyPair(p.cfg.CertFile, p.cfg.KeyFile) + if err != nil { + return nil, err + } + certificates = []tls.Certificate{clientCert} } return defaultTLSClient( p.endpoint, p.cfg.InsecureSkipVerify, rootCAs, - []tls.Certificate{clientCert}, + certificates, nil, p.logger, ) diff --git a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go index 28194910d3d2..755c2588cb64 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go +++ b/receiver/awscontainerinsightreceiver/internal/k8swindows/kubelet/client.go @@ -32,7 +32,7 @@ func (kp *kubeletProvider) getClient() (*kubeletutil.KubeletClient, error) { if kp.client != nil { return kp.client, nil } - kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, "", kp.logger) + kclient, err := kubeletutil.NewKubeletClient(kp.hostIP, kp.hostPort, nil, kp.logger) if err != nil { kp.logger.Error("failed to initialize new kubelet client, ", zap.Error(err)) return nil, err diff --git a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go index 1873d2480911..ef5eee59bc97 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient.go @@ -24,7 +24,7 @@ type KubeletClient struct { restClient kubelet.Client } -func NewKubeletClient(kubeIP string, port string, kubeConfigPath string, logger *zap.Logger) (*KubeletClient, error) { +func NewKubeletClient(kubeIP string, port string, clientConfig *kubelet.ClientConfig, logger *zap.Logger) (*KubeletClient, error) { kubeClient := &KubeletClient{ Port: port, KubeIP: kubeIP, @@ -37,17 +37,10 @@ func NewKubeletClient(kubeIP string, port string, kubeConfigPath string, logger endpoint = endpoint + ":" + port // use service account for authentication by default - clientConfig := &kubelet.ClientConfig{ - APIConfig: k8sconfig.APIConfig{ - AuthType: k8sconfig.AuthTypeServiceAccount, - }, - } - if kubeConfigPath != "" { - // use kube-config for authentication + if clientConfig == nil { clientConfig = &kubelet.ClientConfig{ APIConfig: k8sconfig.APIConfig{ - AuthType: k8sconfig.AuthTypeKubeConfig, - KubeConfigPath: kubeConfigPath, + AuthType: k8sconfig.AuthTypeServiceAccount, }, } } @@ -98,3 +91,29 @@ func (k *KubeletClient) Summary(logger *zap.Logger) (*stats.Summary, error) { logger.Debug("/stats/summary API response unmarshalled successfully") return &out, nil } + +func ClientConfig(kubeConfigPath string, isSystemd bool) *kubelet.ClientConfig { + if !isSystemd { + return &kubelet.ClientConfig{ + APIConfig: k8sconfig.APIConfig{ + AuthType: k8sconfig.AuthTypeServiceAccount, + }, + } + } + if kubeConfigPath != "" { + // use kube-config for authentication + return &kubelet.ClientConfig{ + APIConfig: k8sconfig.APIConfig{ + AuthType: k8sconfig.AuthTypeKubeConfig, + KubeConfigPath: kubeConfigPath, + }, + } + } + // insecure TLS if not provided + return &kubelet.ClientConfig{ + APIConfig: k8sconfig.APIConfig{ + AuthType: k8sconfig.AuthTypeTLS, + }, + InsecureSkipVerify: true, + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient_test.go b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient_test.go index c766b66457fb..2c62f09d6a5b 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/kubeletclient_test.go @@ -62,7 +62,7 @@ func TestNewKubeletClient(t *testing.T) { }, } for _, tt := range tests { - client, err := NewKubeletClient(tt.kubeIP, tt.port, "", zap.NewNop()) + client, err := NewKubeletClient(tt.kubeIP, tt.port, nil, zap.NewNop()) require.NoError(t, err) assert.Equal(t, client.KubeIP, tt.kubeIP) fc := (client.restClient).(*fakeClient) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/localnode.go b/receiver/awscontainerinsightreceiver/internal/stores/localnode.go index eddfec048273..9390d1697ff8 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/localnode.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/localnode.go @@ -5,7 +5,6 @@ package stores // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "fmt" - "os" "go.uber.org/zap" @@ -42,18 +41,14 @@ type Decorator interface { } func NewLocalNodeDecorator(logger *zap.Logger, containerOrchestrator string, hostInfo hostInfo, hostName string, options ...Option) (*LocalNodeDecorator, error) { - nodeName := os.Getenv(ci.HostName) - if nodeName == "" && containerOrchestrator == ci.EKS { - nodeName = hostName - if nodeName == "" { - return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or agent config", ci.HostName) - } + if hostName == "" && containerOrchestrator == ci.EKS { + return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or agent config", ci.HostName) } d := &LocalNodeDecorator{ hostInfo: hostInfo, version: "0", - nodeName: nodeName, + nodeName: hostName, containerOrchestrator: containerOrchestrator, logger: logger, } diff --git a/receiver/awscontainerinsightreceiver/internal/stores/localnode_test.go b/receiver/awscontainerinsightreceiver/internal/stores/localnode_test.go index 50ae5d43b53b..2fefaeccb1e4 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/localnode_test.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/localnode_test.go @@ -34,18 +34,11 @@ func TestNewLocalNodeDecorator(t *testing.T) { assert.NotNil(t, d) assert.NoError(t, err) assert.Equal(t, d.nodeName, "test-hostname") - - t.Setenv(ci.HostName, "host") - d, err = NewLocalNodeDecorator(logger, "eks", nil, "") - assert.NotNil(t, d) - assert.NoError(t, err) - assert.Equal(t, d.nodeName, "host") } func TestEbsVolumeInfo(t *testing.T) { - t.Setenv(ci.HostName, "host") hostInfo := testutils.MockHostInfo{} - d, err := NewLocalNodeDecorator(logger, "eks", hostInfo, "") + d, err := NewLocalNodeDecorator(logger, "eks", hostInfo, "host") assert.NotNil(t, d) assert.NoError(t, err) @@ -83,7 +76,6 @@ func (d mockK8sDecorator) Shutdown() error { } func TestExpectedTags(t *testing.T) { - t.Setenv(ci.HostName, "host") hostInfo := testutils.MockHostInfo{ClusterName: "my-cluster"} k8sDecorator := mockK8sDecorator{} ecsInfo := testutils.MockECSInfo{} @@ -136,7 +128,7 @@ func TestExpectedTags(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - d, err := NewLocalNodeDecorator(logger, testCase.containerOrchestrator, hostInfo, "", WithK8sDecorator(k8sDecorator), WithECSInfo(&ecsInfo)) + d, err := NewLocalNodeDecorator(logger, testCase.containerOrchestrator, hostInfo, "host", WithK8sDecorator(k8sDecorator), WithECSInfo(&ecsInfo)) assert.NotNil(t, d) assert.NoError(t, err) diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go index 2239733d6302..8247601cfc6d 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/podstore.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/podstore.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "log" - "os" "regexp" "strings" "sync" @@ -21,7 +20,6 @@ import ( ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" ) const ( @@ -122,48 +120,34 @@ type PodStore struct { includeEnhancedMetrics bool } -func NewPodStore(hostIP string, prefFullPodName bool, addFullPodNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) { - podClient, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, kubeConfigPath, logger) - if err != nil { - return nil, err +func NewPodStore(client podClient, prefFullPodName bool, addFullPodNameMetricLabel bool, + includeEnhancedMetrics bool, hostName string, isSystemdEnabled bool, logger *zap.Logger) (*PodStore, error) { + if hostName == "" { + return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName) } - - // Try to detect kubelet permission issue here - if _, err := podClient.ListPods(); err != nil { - return nil, fmt.Errorf("cannot get pod from kubelet, err: %w", err) - } - - nodeName := os.Getenv(ci.HostName) - if nodeName == "" { - nodeName = hostName - if nodeName == "" { - return nil, fmt.Errorf("missing environment variable %s. Please check your deployment YAML config or passed as part of the agent config", ci.HostName) - } - } - - k8sClient := &k8sclient.K8sClient{} + var k8sClient *k8sclient.K8sClient nodeInfo := &nodeInfo{ - nodeName: nodeName, + nodeName: hostName, provider: nil, logger: logger, } if !isSystemdEnabled { k8sClient = k8sclient.Get(logger, - k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", nodeName)), + k8sclient.NodeSelector(fields.OneTermEqualSelector("metadata.name", hostName)), k8sclient.CaptureNodeLevelInfo(true), ) if k8sClient == nil { return nil, errors.New("failed to start pod store because k8sclient is nil") } - nodeInfo = newNodeInfo(nodeName, k8sClient.GetNodeClient(), logger) + nodeInfo = newNodeInfo(hostName, k8sClient.GetNodeClient(), logger) } podStore := &PodStore{ cache: newMapWithExpiry(podsExpiry), prevMeasurements: sync.Map{}, //prevMeasurements: make(map[string]*mapWithExpiry), - podClient: podClient, + podClient: client, nodeInfo: nodeInfo, prefFullPodName: prefFullPodName, includeEnhancedMetrics: includeEnhancedMetrics, diff --git a/receiver/awscontainerinsightreceiver/internal/stores/store.go b/receiver/awscontainerinsightreceiver/internal/stores/store.go index b5f6d5ba7c34..457a08ddbf72 100644 --- a/receiver/awscontainerinsightreceiver/internal/stores/store.go +++ b/receiver/awscontainerinsightreceiver/internal/stores/store.go @@ -5,11 +5,11 @@ package stores // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "context" - "errors" - "os" "time" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" ) // CIMetric represents the raw metric interface for container insights @@ -43,21 +43,15 @@ type K8sDecorator struct { podStore *PodStore } -func NewK8sDecorator(ctx context.Context, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool, addContainerNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, customHostIP string, customHostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) { - hostIP := os.Getenv("HOST_IP") - if hostIP == "" { - hostIP = customHostIP - if hostIP == "" { - return nil, errors.New("environment variable HOST_IP is not set in k8s deployment config or passed as part of the agent config") - } - } - +func NewK8sDecorator(ctx context.Context, kubeletClient *kubeletutil.KubeletClient, tagService bool, prefFullPodName bool, addFullPodNameMetricLabel bool, + addContainerNameMetricLabel bool, includeEnhancedMetrics bool, kubeConfigPath string, + hostName string, isSystemd bool, logger *zap.Logger) (*K8sDecorator, error) { k := &K8sDecorator{ ctx: ctx, addContainerNameMetricLabel: addContainerNameMetricLabel, } - podstore, err := NewPodStore(hostIP, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, kubeConfigPath, customHostName, isSystemd, logger) + podstore, err := NewPodStore(kubeletClient, prefFullPodName, addFullPodNameMetricLabel, includeEnhancedMetrics, hostName, isSystemd, logger) if err != nil { return nil, err } diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 7758ccf55f15..f90ba404899d 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -6,6 +6,8 @@ package awscontainerinsightreceiver // import "github.com/open-telemetry/opentel import ( "context" "errors" + "fmt" + "os" "runtime" "time" @@ -22,13 +24,18 @@ import ( ecsinfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/ecsInfo" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/efa" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/gpu" - hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + hostinfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8swindows" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/neuron" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper/decoratorconsumer" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" +) + +const ( + waitForKubeletInterval = 10 * time.Second ) var _ receiver.Metrics = (*awsContainerInsightReceiver)(nil) @@ -71,123 +78,181 @@ func newAWSContainerInsightReceiver( // Start collecting metrics from cadvisor and k8s api server (if it is an elected leader) func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host component.Host) error { ctx, acir.cancel = context.WithCancel(ctx) + hostInfo, hostInfoErr := hostinfo.NewInfo(acir.config.AWSSessionSettings, acir.config.ContainerOrchestrator, + acir.config.CollectionInterval, acir.settings.Logger, hostinfo.WithClusterName(acir.config.ClusterName), + hostinfo.WithSystemdEnabled(acir.config.RunOnSystemd)) + if hostInfoErr != nil { + return hostInfoErr + } - hostinfo, err := hostInfo.NewInfo(acir.config.AWSSessionSettings, acir.config.ContainerOrchestrator, acir.config.CollectionInterval, acir.settings.Logger, hostInfo.WithClusterName(acir.config.ClusterName), hostInfo.WithSystemdEnabled(acir.config.RunOnSystemd)) - if err != nil { - return err + hostName := os.Getenv(ci.HostName) + if hostName == "" { + hostName = acir.config.HostName } - if acir.config.ContainerOrchestrator == ci.EKS { - k8sDecorator, err := stores.NewK8sDecorator(ctx, acir.config.TagService, acir.config.PrefFullPodName, acir.config.AddFullPodNameMetricLabel, acir.config.AddContainerNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.config.KubeConfigPath, acir.config.HostIP, acir.config.HostName, acir.config.RunOnSystemd, acir.settings.Logger) + switch acir.config.ContainerOrchestrator { + case ci.EKS: + hostIP := os.Getenv(ci.HostIP) + if hostIP == "" { + hostIP = acir.config.HostIP + if hostIP == "" { + return errors.New("environment variable HOST_IP is not set in k8s deployment config or passed as part of the agent config") + } + } + client, err := kubeletutil.NewKubeletClient(hostIP, ci.KubeSecurePort, kubeletutil.ClientConfig(acir.config.KubeConfigPath, acir.config.RunOnSystemd), acir.settings.Logger) if err != nil { - acir.settings.Logger.Warn("Unable to start K8s decorator", zap.Error(err)) - } else { - acir.decorators = append(acir.decorators, k8sDecorator) + return fmt.Errorf("cannot initialize kubelet client: %w", err) } - - if runtime.GOOS == ci.OperatingSystemWindows { - acir.containerMetricsProvider, err = k8swindows.New(acir.settings.Logger, k8sDecorator, *hostinfo) - if err != nil { - return err - } + // wait for kubelet availability, but don't block on it + if acir.config.RunOnSystemd { + go func() { + if err = waitForKubelet(ctx, client, acir.settings.Logger); err != nil { + acir.settings.Logger.Error("Unable to connect to kubelet", zap.Error(err)) + return + } + acir.settings.Logger.Debug("Kubelet is available. Initializing the receiver") + if err = acir.initEKS(ctx, host, hostInfo, hostName, client); err != nil { + acir.settings.Logger.Error("Unable to initialize receiver", zap.Error(err)) + return + } + acir.start(ctx) + }() } else { - localnodeDecorator, err := stores.NewLocalNodeDecorator(acir.settings.Logger, acir.config.ContainerOrchestrator, - hostinfo, acir.config.HostName, stores.WithK8sDecorator(k8sDecorator)) - if err != nil { - acir.settings.Logger.Warn("Unable to start local node decorator", zap.Error(err)) - } else { - acir.decorators = append(acir.decorators, localnodeDecorator) - } - - acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, - acir.settings.Logger, cadvisor.WithDecorator(localnodeDecorator)) - if err != nil { + if err = checkKubelet(client); err != nil { return err } - - var leaderElection *k8sapiserver.LeaderElection - leaderElection, err = k8sapiserver.NewLeaderElection(acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), - k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly)) - if err != nil { - acir.settings.Logger.Warn("Unable to elect leader node", zap.Error(err)) - } - - acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostinfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics) - if err != nil { - acir.k8sapiserver = nil - acir.settings.Logger.Warn("Unable to connect to api-server", zap.Error(err)) + if err = acir.initEKS(ctx, host, hostInfo, hostName, client); err != nil { + return err } + go acir.start(ctx) + } + case ci.ECS: + if err := acir.initECS(host, hostInfo, hostName); err != nil { + return err + } + go acir.start(ctx) + default: + return fmt.Errorf("unsupported container_orchestrator: %s", acir.config.ContainerOrchestrator) + } - if acir.k8sapiserver != nil { - err = acir.initPrometheusScraper(ctx, host, hostinfo, leaderElection) - if err != nil { - acir.settings.Logger.Warn("Unable to start kube apiserver prometheus scraper", zap.Error(err)) - } - } + return nil +} - err = acir.initDcgmScraper(ctx, host, hostinfo, k8sDecorator) - if err != nil { - acir.settings.Logger.Debug("Unable to start dcgm scraper", zap.Error(err)) - } - err = acir.initPodResourcesStore() - if err != nil { - acir.settings.Logger.Debug("Unable to start pod resources store", zap.Error(err)) - } - err = acir.initNeuronScraper(ctx, host, hostinfo, k8sDecorator) - if err != nil { - acir.settings.Logger.Debug("Unable to start neuron scraper", zap.Error(err)) - } - err = acir.initEfaSysfsScraper(localnodeDecorator) - if err != nil { - acir.settings.Logger.Debug("Unable to start EFA scraper", zap.Error(err)) - } - } +func (acir *awsContainerInsightReceiver) initEKS(ctx context.Context, host component.Host, hostInfo *hostinfo.Info, + hostName string, kubeletClient *kubeletutil.KubeletClient) error { + k8sDecorator, err := stores.NewK8sDecorator(ctx, kubeletClient, acir.config.TagService, acir.config.PrefFullPodName, + acir.config.AddFullPodNameMetricLabel, acir.config.AddContainerNameMetricLabel, + acir.config.EnableControlPlaneMetrics, acir.config.KubeConfigPath, hostName, + acir.config.RunOnSystemd, acir.settings.Logger) + if err != nil { + acir.settings.Logger.Warn("Unable to start K8s decorator", zap.Error(err)) + } else { + acir.decorators = append(acir.decorators, k8sDecorator) } - if acir.config.ContainerOrchestrator == ci.ECS { - ecsInfo, err := ecsinfo.NewECSInfo(acir.config.CollectionInterval, hostinfo, host, acir.settings, ecsinfo.WithClusterName(acir.config.ClusterName)) + + if runtime.GOOS == ci.OperatingSystemWindows { + acir.containerMetricsProvider, err = k8swindows.New(acir.settings.Logger, k8sDecorator, *hostInfo) if err != nil { return err } + } else { + localNodeDecorator, err := stores.NewLocalNodeDecorator(acir.settings.Logger, acir.config.ContainerOrchestrator, + hostInfo, hostName, stores.WithK8sDecorator(k8sDecorator)) + if err != nil { + acir.settings.Logger.Warn("Unable to start local node decorator", zap.Error(err)) + } else { + acir.decorators = append(acir.decorators, localNodeDecorator) + } - localnodeDecorator, err := stores.NewLocalNodeDecorator(acir.settings.Logger, acir.config.ContainerOrchestrator, - hostinfo, acir.config.HostName, stores.WithECSInfo(ecsInfo)) + acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostInfo, + acir.settings.Logger, cadvisor.WithDecorator(localNodeDecorator)) if err != nil { return err } - acir.decorators = append(acir.decorators, localnodeDecorator) - acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostinfo, - acir.settings.Logger, cadvisor.WithECSInfoCreator(ecsInfo), cadvisor.WithDecorator(localnodeDecorator)) + var leaderElection *k8sapiserver.LeaderElection + leaderElection, err = k8sapiserver.NewLeaderElection(acir.settings.Logger, k8sapiserver.WithLeaderLockName(acir.config.LeaderLockName), + k8sapiserver.WithLeaderLockUsingConfigMapOnly(acir.config.LeaderLockUsingConfigMapOnly)) if err != nil { - return err + acir.settings.Logger.Warn("Unable to elect leader node", zap.Error(err)) } - } - go func() { - // cadvisor collects data at dynamical intervals (from 1 to 15 seconds). If the ticker happens - // at beginning of a minute, it might read the data collected at end of last minute. To avoid this, - // we want to wait until at least two cadvisor collection intervals happens before collecting the metrics - secondsInMin := time.Now().Second() - if secondsInMin < 30 { - time.Sleep(time.Duration(30-secondsInMin) * time.Second) + acir.k8sapiserver, err = k8sapiserver.NewK8sAPIServer(hostInfo, acir.settings.Logger, leaderElection, acir.config.AddFullPodNameMetricLabel, acir.config.EnableControlPlaneMetrics, acir.config.EnableAcceleratedComputeMetrics) + if err != nil { + acir.k8sapiserver = nil + acir.settings.Logger.Warn("Unable to connect to api-server", zap.Error(err)) } - ticker := time.NewTicker(acir.config.CollectionInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - _ = acir.collectData(ctx) - case <-ctx.Done(): - return + + if acir.k8sapiserver != nil { + err = acir.initPrometheusScraper(ctx, host, hostInfo, leaderElection) + if err != nil { + acir.settings.Logger.Warn("Unable to start kube apiserver prometheus scraper", zap.Error(err)) } } - }() + err = acir.initDcgmScraper(ctx, host, hostInfo, k8sDecorator) + if err != nil { + acir.settings.Logger.Debug("Unable to start dcgm scraper", zap.Error(err)) + } + err = acir.initPodResourcesStore() + if err != nil { + acir.settings.Logger.Debug("Unable to start pod resources store", zap.Error(err)) + } + err = acir.initNeuronScraper(ctx, host, hostInfo, k8sDecorator) + if err != nil { + acir.settings.Logger.Debug("Unable to start neuron scraper", zap.Error(err)) + } + err = acir.initEfaSysfsScraper(localNodeDecorator) + if err != nil { + acir.settings.Logger.Debug("Unable to start EFA scraper", zap.Error(err)) + } + } return nil } -func (acir *awsContainerInsightReceiver) initPrometheusScraper(ctx context.Context, host component.Host, hostinfo *hostInfo.Info, leaderElection *k8sapiserver.LeaderElection) error { +func (acir *awsContainerInsightReceiver) initECS(host component.Host, hostInfo *hostinfo.Info, hostName string) error { + ecsInfo, err := ecsinfo.NewECSInfo(acir.config.CollectionInterval, hostInfo, host, acir.settings, ecsinfo.WithClusterName(acir.config.ClusterName)) + if err != nil { + return err + } + + localNodeDecorator, err := stores.NewLocalNodeDecorator(acir.settings.Logger, acir.config.ContainerOrchestrator, + hostInfo, hostName, stores.WithECSInfo(ecsInfo)) + if err != nil { + return err + } + acir.decorators = append(acir.decorators, localNodeDecorator) + + acir.containerMetricsProvider, err = cadvisor.New(acir.config.ContainerOrchestrator, hostInfo, + acir.settings.Logger, cadvisor.WithECSInfoCreator(ecsInfo), cadvisor.WithDecorator(localNodeDecorator)) + if err != nil { + return err + } + return nil +} + +func (acir *awsContainerInsightReceiver) start(ctx context.Context) { + // cadvisor collects data at dynamical intervals (from 1 to 15 seconds). If the ticker happens + // at beginning of a minute, it might read the data collected at end of last minute. To avoid this, + // we want to wait until at least two cadvisor collection intervals happens before collecting the metrics + secondsInMin := time.Now().Second() + if secondsInMin < 30 { + time.Sleep(time.Duration(30-secondsInMin) * time.Second) + } + ticker := time.NewTicker(acir.config.CollectionInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + _ = acir.collectData(ctx) + case <-ctx.Done(): + return + } + } +} + +func (acir *awsContainerInsightReceiver) initPrometheusScraper(ctx context.Context, host component.Host, hostInfo *hostinfo.Info, leaderElection *k8sapiserver.LeaderElection) error { if !acir.config.EnableControlPlaneMetrics { return nil } @@ -215,13 +280,13 @@ func (acir *awsContainerInsightReceiver) initPrometheusScraper(ctx context.Conte Endpoint: endpoint, Consumer: acir.nextConsumer, Host: host, - ClusterNameProvider: hostinfo, + ClusterNameProvider: hostInfo, LeaderElection: leaderElection, BearerToken: bearerToken, }) return err } -func (acir *awsContainerInsightReceiver) initDcgmScraper(ctx context.Context, host component.Host, hostinfo *hostInfo.Info, decorator *stores.K8sDecorator) error { +func (acir *awsContainerInsightReceiver) initDcgmScraper(ctx context.Context, host component.Host, hostInfo *hostinfo.Info, decorator *stores.K8sDecorator) error { if !acir.config.EnableAcceleratedComputeMetrics { return nil } @@ -240,8 +305,8 @@ func (acir *awsContainerInsightReceiver) initDcgmScraper(ctx context.Context, ho TelemetrySettings: acir.settings, Consumer: &decoConsumer, Host: host, - ScraperConfigs: gpu.GetScraperConfig(hostinfo), - HostInfoProvider: hostinfo, + ScraperConfigs: gpu.GetScraperConfig(hostInfo), + HostInfoProvider: hostInfo, Logger: acir.settings.Logger, } @@ -256,7 +321,7 @@ func (acir *awsContainerInsightReceiver) initPodResourcesStore() error { return err } -func (acir *awsContainerInsightReceiver) initNeuronScraper(ctx context.Context, host component.Host, hostinfo *hostInfo.Info, decorator *stores.K8sDecorator) error { +func (acir *awsContainerInsightReceiver) initNeuronScraper(ctx context.Context, host component.Host, hostInfo *hostinfo.Info, decorator *stores.K8sDecorator) error { if !acir.config.EnableAcceleratedComputeMetrics { return nil } @@ -294,8 +359,8 @@ func (acir *awsContainerInsightReceiver) initNeuronScraper(ctx context.Context, TelemetrySettings: acir.settings, Consumer: &podAttributesDecoratorConsumer, Host: host, - ScraperConfigs: neuron.GetNeuronScrapeConfig(hostinfo), - HostInfoProvider: hostinfo, + ScraperConfigs: neuron.GetNeuronScrapeConfig(hostInfo), + HostInfoProvider: hostInfo, Logger: acir.settings.Logger, } @@ -411,3 +476,27 @@ func (acir *awsContainerInsightReceiver) getK8sAPIServerEndpoint() (string, erro return endpoint, nil } + +func waitForKubelet(ctx context.Context, client *kubeletutil.KubeletClient, logger *zap.Logger) error { + for { + err := checkKubelet(client) + if err == nil { + return nil + } + logger.Debug("Kubelet unavailable. Waiting for next interval", zap.Error(err), zap.Stringer("interval", waitForKubeletInterval)) + select { + case <-time.After(waitForKubeletInterval): + continue + case <-ctx.Done(): + return fmt.Errorf("context closed without getting kubelet client: %w", ctx.Err()) + } + } +} + +func checkKubelet(client *kubeletutil.KubeletClient) error { + // Try to detect kubelet permission issue here + if _, err := client.ListPods(); err != nil { + return fmt.Errorf("cannot get pods from kubelet: %w", err) + } + return nil +}