From 077b95a68808b7cf9e18792546034c5bf3096d7c Mon Sep 17 00:00:00 2001 From: jmxnzo Date: Thu, 14 Nov 2024 16:58:01 +0100 Subject: [PATCH] e2e: default policy test e2e: Add waiting mechanism and correct event watching/listing to default policy test Structure changes to deploy.go e2e: rework and fetch event from the watch.Event directly e2e: Add default policy into policy test package --- e2e/internal/kubeclient/deploy.go | 102 ++++++++++++++++++++++++++++++ e2e/policy/policy_test.go | 31 +++++++++ 2 files changed, 133 insertions(+) diff --git a/e2e/internal/kubeclient/deploy.go b/e2e/internal/kubeclient/deploy.go index 05c03d069d..c38a38f7eb 100644 --- a/e2e/internal/kubeclient/deploy.go +++ b/e2e/internal/kubeclient/deploy.go @@ -33,6 +33,15 @@ const ( InitContainersRunning ) +// WaitEventCondition is an enum type for the possible wait conditions when using `kubeclient.WaitForEvent`. +type WaitEventCondition int + +const ( + _ WaitEventCondition = iota + // StartingBlocked waits until a specific FailedCreatePodSandBox Event is detected which indicates that the container does not start. + StartingBlocked +) + // ResourceWaiter is implemented by resources that can be waited for with WaitFor. type ResourceWaiter interface { kind() string @@ -225,6 +234,31 @@ func (c *Kubeclient) checkIfRunning(ctx context.Context, name string, namespace return true, nil } +// Checks whether the FailedCreatePodSandBox Event occurred which indicates that the SetPolicy request is rejected and the Kata Shim fails to start the Pod sandbox. +func (c *Kubeclient) checkIfStartingBlocked(name string, namespace string, resource ResourceWaiter, evt watch.Event, startingPoint time.Time) (bool, error) { + switch evt.Type { + case watch.Error: + return false, fmt.Errorf("Watcher of %s %s/%s received an error event", resource.kind(), namespace, name) + case watch.Added: + fallthrough + case watch.Modified: + logger := c.log.With("namespace", namespace) + event, ok := evt.Object.(*corev1.Event) + if !ok { + return false, fmt.Errorf("watcher received unexpected type %T", evt.Object) + } + + // Expected event: Reason: FailedCreatePodSandBox + if (event.LastTimestamp.After(startingPoint)) && (event.Reason == "FailedCreatePodSandBox") { + logger.Debug("Pod did not start", "name", name, "reason", event.Reason, "timestamp of failure", event.LastTimestamp.String()) + return true, nil + } + return false, nil + default: + return false, fmt.Errorf("unexpected watch event while waiting for %s %s/%s: type=%s, object=%#v", resource.kind(), namespace, name, evt.Type, evt.Object) + } +} + // WaitFor watches the given resource kind and blocks until the desired number of pods are // ready or the context expires (is cancelled or times out). func (c *Kubeclient) WaitFor(ctx context.Context, condition WaitCondition, resource ResourceWaiter, namespace, name string) error { @@ -370,6 +404,74 @@ loop: } } +// WaitForEvent watches the EventList as long as the Event corresponding to the WaitCondition occurred after calling the function. It reimplements WaitFor() specifically designed to wait for ocuring events. +func (c *Kubeclient) WaitForEvent(ctx context.Context, condition WaitEventCondition, resource ResourceWaiter, namespace, name string) error { + // StartingPoint is saved right here to avoid the processing of past events in the checking function! This was introduced, because otherwise calling waitForEvent multiple times + // resulted in reusing events with the same timestamp. + startingPoint := time.Now() + retryCounter := 30 +retryLoop: + for { + // Watcher which preprocesses the eventList for the defined resource, based on the involvedObject name and the resource kind. + watcher, err := c.Client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{FieldSelector: "involvedObject.name=" + name, TypeMeta: metav1.TypeMeta{Kind: resource.kind()}}) + if err != nil { + // If the server is down (because K3s was restarted), wait for a + // second and try again. + retryCounter-- + if retryCounter != 0 { + sleep, cancel := context.WithTimeout(ctx, time.Second*1) + defer cancel() + <-sleep.Done() + continue retryLoop + } + return err + } + for { + evt, ok := <-watcher.ResultChan() + + if !ok { + origErr := ctx.Err() + if origErr == nil { + retryCounter-- + if retryCounter != 0 { + continue retryLoop + } + return fmt.Errorf("watcher for %s %s/%s unexpectedly closed", resource.kind(), namespace, name) + } + logger := c.log.With("namespace", namespace) + logger.Error("failed to wait for event", "condition", condition, "kind", resource, "name", name, "contextErr", ctx.Err()) + if ctx.Err() != context.DeadlineExceeded { + return ctx.Err() + } + // Fetch and print debug information. + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + pods, err := resource.getPods(ctx, c, namespace, name) //nolint:contextcheck // The parent context expired. + if err != nil { + logger.Error("could not fetch pods for resource", "kind", resource.kind(), "name", name, "error", err) + return origErr + } + for _, pod := range pods { + if !isPodReady(&pod) { + logger.Debug("pod not ready", "name", pod.Name, "status", c.toJSON(pod.Status)) + } + } + return origErr + } + switch condition { + case StartingBlocked: + blocked, err := c.checkIfStartingBlocked(name, namespace, resource, evt, startingPoint) + if err != nil { + return err + } + if blocked { + return nil + } + } + } + } +} + func (c *Kubeclient) toJSON(a any) string { s, err := json.Marshal(a) if err != nil { diff --git a/e2e/policy/policy_test.go b/e2e/policy/policy_test.go index bdd5e91a49..4d1eece77c 100644 --- a/e2e/policy/policy_test.go +++ b/e2e/policy/policy_test.go @@ -32,6 +32,8 @@ const ( opensslBackend = "openssl-backend" opensslFrontend = "openssl-frontend" coordinator = "coordinator" + // Persistent pod identifier of StatefulSet Coordinator is used. + coordinatorPod = "coordinator-0" ) var ( @@ -58,6 +60,35 @@ func TestPolicy(t *testing.T) { ct.Init(t, resources) + // Apply deployment using default policies + require.True(t, t.Run("apply", ct.Apply), "Kubernetes resources need to be applied for subsequent tests") + + t.Run("check containers without policy annotation do not start", func(t *testing.T) { + require := require.New(t) + ctx, cancel := context.WithTimeout(context.Background(), ct.FactorPlatformTimeout(2*time.Minute)) + defer cancel() + + c := kubeclient.NewForTest(t) + + t.Log("Waiting to ensure container start up failed") + + // waiting for 2 ocurrences of the event + for range 2 { + err := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, ct.Namespace, coordinatorPod) + require.NoError(err) + } + t.Log("Restarting container") + + require.NoError(c.Restart(ctx, kubeclient.Deployment{}, ct.Namespace, coordinator)) + t.Log("Waiting to ensure container start up failed") + + for range 2 { + err := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, ct.Namespace, coordinatorPod) + require.NoError(err) + } + }) + + // initial deployment with pod allowed require.True(t, t.Run("generate", ct.Generate), "contrast generate needs to succeed for subsequent tests")