diff --git a/test/metric_value_benchmark/eks_daemonset_test.go b/test/metric_value_benchmark/eks_daemonset_test.go index 35e4298a7..97aff8411 100644 --- a/test/metric_value_benchmark/eks_daemonset_test.go +++ b/test/metric_value_benchmark/eks_daemonset_test.go @@ -6,48 +6,120 @@ package metric_value_benchmark import ( + "context" "encoding/json" - "errors" "fmt" "log" + "strings" "time" - "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" - "golang.org/x/exp/slices" + "github.com/qri-io/jsonschema" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "github.com/aws/amazon-cloudwatch-agent-test/environment" + "github.com/aws/amazon-cloudwatch-agent-test/internal/awsservice" "github.com/aws/amazon-cloudwatch-agent-test/test/metric" "github.com/aws/amazon-cloudwatch-agent-test/test/metric/dimension" "github.com/aws/amazon-cloudwatch-agent-test/test/metric_value_benchmark/eks_resources" "github.com/aws/amazon-cloudwatch-agent-test/test/status" "github.com/aws/amazon-cloudwatch-agent-test/test/test_runner" - "github.com/aws/amazon-cloudwatch-agent-test/util/awsservice" ) -const containerInsightsNamespace = "ContainerInsights" - -// list of metrics with more dimensions e.g. PodName and Namespace -var metricsWithMoreDimensions = []string{"pod_number_of_container_restarts"} - type EKSDaemonTestRunner struct { test_runner.BaseTestRunner env *environment.MetaData } func (e *EKSDaemonTestRunner) Validate() status.TestGroupResult { - metrics := e.GetMeasuredMetrics() testResults := make([]status.TestResult, 0) - for _, name := range metrics { - testResults = append(testResults, e.validateInstanceMetrics(name)) + + // Fetching metrics by cluster dimension + testMap := e.getMetricsInClusterDimension() + + // Validate metrics availability and data + for dim, metrics := range eks_resources.DimensionStringToMetricsMap { + testResults = append(testResults, e.validateMetricsAvailability(dim, metrics, testMap)) + for _, m := range metrics { + testResults = append(testResults, e.validateMetricData(m, e.translateDimensionStringToDimType(dim))) + } } + // Validate EMF logs testResults = append(testResults, e.validateLogs(e.env)) + + // Additional validation for sample count of metrics + testResults = append(testResults, e.validateSampleCountMetrics()) + return status.TestGroupResult{ Name: e.GetTestName(), TestResults: testResults, } } +// validateSampleCountMetrics checks if the sample count of specific metrics matches the expected values +func (e *EKSDaemonTestRunner) validateSampleCountMetrics() status.TestResult { + log.Println("Validating sample count of metrics") + + podCount, err := e.getRunningPodsCount() + if err != nil { + log.Println("Error getting running pods count:", err) + return status.TestResult{Name: "SampleCountValidation", Status: status.FAILED} + } + + metricName := "pod_cpu_utilization" + dims := e.translateDimensionStringToDimType("ClusterName") + + valueFetcher := metric.MetricValueFetcher{} + values, err := valueFetcher.Fetch(containerInsightsNamespace, metricName, dims, metric.SAMPLE_COUNT, metric.MinuteStatPeriod) + if err != nil { + log.Println("Error fetching metric data for", metricName, ":", err) + return status.TestResult{Name: "SampleCountValidation", Status: status.FAILED} + } + + for _, value := range values { + if int(value) != podCount { + log.Printf("Sample count for %s does not match expected value. Expected: %d, Found: %f\n", metricName, podCount, value) + return status.TestResult{Name: "SampleCountValidation", Status: status.FAILED} + } + } + + return status.TestResult{Name: "SampleCountValidation", Status: status.SUCCESSFUL} +} +func (e *EKSDaemonTestRunner) getRunningPodsCount() (int, error) { + // Create an in-cluster config + config, err := rest.InClusterConfig() + if err != nil { + log.Printf("Error creating in-cluster config: %v\n", err) + return 0, err + } + + // Create a clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Printf("Error creating clientset: %v\n", err) + return 0, err + } + + // Get the list of pods + pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{}) + if err != nil { + log.Printf("Error listing pods: %v\n", err) + return 0, err + } + + // Count running pods + runningPodsCount := 0 + for _, pod := range pods.Items { + if pod.Status.Phase == "Running" { + runningPodsCount++ + } + } + + return runningPodsCount, nil +} + func (e *EKSDaemonTestRunner) validateInstanceMetrics(name string) status.TestResult { testResult := status.TestResult{ Name: name, @@ -60,42 +132,14 @@ func (e *EKSDaemonTestRunner) validateInstanceMetrics(name string) status.TestRe Value: dimension.UnknownDimensionValue(), }, }) + if len(failed) > 0 { log.Println("failed to get dimensions") return testResult } - // get list of metrics that has more dimensions for container insights - // this is to avoid adding more dimension provider for non-trivial dimensions e.g. PodName - listFetcher := metric.MetricListFetcher{} - if slices.Contains(metricsWithMoreDimensions, name) { - metrics, err := listFetcher.Fetch(containerInsightsNamespace, name, dims) - if err != nil { - log.Println("failed to fetch metric list", err) - return testResult - } - - if len(metrics) < 1 { - log.Println("metric list is empty") - return testResult - } - - // just verify 1 of returned metrics for values - for _, dim := range metrics[0].Dimensions { - // skip since it's provided by dimension provider - if *dim.Name == "ClusterName" { - continue - } - - dims = append(dims, types.Dimension{ - Name: dim.Name, - Value: dim.Value, - }) - } - } - - valueFetcher := metric.MetricValueFetcher{} - values, err := valueFetcher.Fetch(containerInsightsNamespace, name, dims, metric.AVERAGE, metric.HighResolutionStatPeriod) + fetcher := metric.MetricValueFetcher{} + values, err := fetcher.Fetch("ContainerInsights", name, dims, metric.AVERAGE, metric.HighResolutionStatPeriod) if err != nil { log.Println("failed to fetch metrics", err) return testResult @@ -126,34 +170,42 @@ func (e *EKSDaemonTestRunner) validateLogs(env *environment.MetaData) status.Tes } for _, instance := range eKSInstances { + validateLogContents := func(s string) bool { + return strings.Contains(s, fmt.Sprintf("\"ClusterName\":\"%s\"", env.EKSClusterName)) + } + + var ok bool stream := *instance.InstanceName - err = awsservice.ValidateLogs( - group, - stream, - nil, - &now, - awsservice.AssertLogsNotEmpty(), - awsservice.AssertPerLog( - awsservice.AssertLogSchema(func(message string) (string, error) { - var eksClusterType awsservice.EKSClusterType - innerErr := json.Unmarshal([]byte(message), &eksClusterType) - if innerErr != nil { - return "", fmt.Errorf("failed to unmarshal log file: %w", innerErr) - } - - log.Printf("eksClusterType is: %s", eksClusterType.Type) - jsonSchema, ok := eks_resources.EksClusterValidationMap[eksClusterType.Type] - if !ok { - return "", errors.New("invalid cluster type provided") - } - return jsonSchema, nil - }), - awsservice.AssertLogContainsSubstring(fmt.Sprintf("\"ClusterName\":\"%s\"", env.EKSClusterName)), - ), - ) - - if err != nil { - log.Printf("log validation (%s/%s) failed: %v", group, stream, err) + ok, err = awsservice.ValidateLogs(group, stream, nil, &now, func(logs []string) bool { + if len(logs) < 1 { + log.Println(fmt.Sprintf("failed to get logs for instance: %s", stream)) + return false + } + + for _, l := range logs { + var eksClusterType awsservice.EKSClusterType + err := json.Unmarshal([]byte(l), &eksClusterType) + if err != nil { + log.Println("failed to unmarshal log file") + } + + log.Println(fmt.Sprintf("eksClusterType is: %s", eksClusterType.Type)) + jsonSchema, ok := eks_resources.EksClusterValidationMap[eksClusterType.Type] + if !ok { + log.Println("invalid cluster type provided") + return false + } + rs := jsonschema.Must(jsonSchema) + + if !awsservice.MatchEMFLogWithSchema(l, rs, validateLogContents) { + log.Println("failed to match log with schema") + return false + } + } + return true + }) + + if err != nil || !ok { return testResult } } @@ -176,37 +228,27 @@ func (e *EKSDaemonTestRunner) GetAgentRunDuration() time.Duration { func (e *EKSDaemonTestRunner) GetMeasuredMetrics() []string { return []string{ - "cluster_failed_node_count", - "cluster_node_count", - "namespace_number_of_running_pods", - "node_cpu_limit", "node_cpu_reserved_capacity", - "node_cpu_usage_total", "node_cpu_utilization", - "node_filesystem_utilization", - "node_memory_limit", - "node_memory_reserved_capacity", - "node_memory_utilization", - "node_memory_working_set", "node_network_total_bytes", - "node_number_of_running_containers", + "node_filesystem_utilization", "node_number_of_running_pods", - "pod_cpu_reserved_capacity", - "pod_cpu_utilization", - "pod_cpu_utilization_over_pod_limit", - "pod_memory_reserved_capacity", - "pod_memory_utilization", - "pod_memory_utilization_over_pod_limit", - "pod_network_rx_bytes", - "pod_network_tx_bytes", - "service_number_of_running_pods", + "node_number_of_running_containers", + "node_memory_utilization", + "node_memory_reserved_capacity", } } -func (t *EKSDaemonTestRunner) SetAgentConfig(config test_runner.AgentConfig) {} +func (t *EKSDaemonTestRunner) SetAgentConfig(config test_runner.AgentConfig) { +} + +func (e *EKSDaemonTestRunner) SetupBeforeAgentRun() error { + return nil +} func (e *EKSDaemonTestRunner) SetupAfterAgentRun() error { return nil } var _ test_runner.ITestRunner = (*EKSDaemonTestRunner)(nil) +