e2e: default policy test
e2e: Add waiting mechanism and correct event watching/listing to default policy test

Structure changes to deploy.go
jmxnzo committed Nov 18, 2024
1 parent 3a82af8 commit 728f791
Showing 3 changed files with 197 additions and 0 deletions.
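At a glance, the commit introduces a StartingBlocked wait condition and a WaitForEvent method on Kubeclient. A minimal sketch of the intended usage follows; `t`, the client `c`, and `namespace` are assumed to come from the surrounding test harness, all other identifiers are taken from the diff below:

// Minimal usage sketch, not part of the commit: `t` (*testing.T),
// `c` (*kubeclient.Kubeclient), and `namespace` are assumed from the test harness.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

// Block until the pod's FailedCreatePodSandBox event is observed, or fail when the context expires.
if err := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, namespace, "coordinator-0"); err != nil {
	t.Fatal(err)
}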
109 changes: 109 additions & 0 deletions e2e/internal/kubeclient/deploy.go
@@ -31,6 +31,8 @@ const (
	Ready
	// InitContainersRunning waits until all initial containers of all pods of the resource are running.
	InitContainersRunning
	// StartingBlocked waits until a specific FailedCreatePodSandBox event is detected, indicating that the container does not start.
	StartingBlocked
)

// ResourceWaiter is implemented by resources that can be waited for with WaitFor.
@@ -225,6 +227,39 @@ func (c *Kubeclient) checkIfRunning(ctx context.Context, name string, namespace
	return true, nil
}

// checkIfStartingBlocked checks whether a FailedCreatePodSandBox event occurred, indicating that the SetPolicy request was rejected and the Kata shim failed to start the pod sandbox.
func (c *Kubeclient) checkIfStartingBlocked(ctx context.Context, name string, namespace string, resource ResourceWaiter, evt watch.Event, startingPoint time.Time) (bool, error) {
	switch evt.Type {
	case watch.Deleted:
		return false, fmt.Errorf("%s %s/%s was deleted while waiting for it", resource.kind(), namespace, name)
	case watch.Error:
		return false, fmt.Errorf("watcher of %s %s/%s received an error event", resource.kind(), namespace, name)
	case watch.Added, watch.Modified:
		logger := c.log.With("namespace", namespace)
		eventList, err := c.Client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{FieldSelector: "involvedObject.name=" + name, TypeMeta: metav1.TypeMeta{Kind: resource.kind()}})
		if err != nil {
			return false, err
		}

		// Expected event:
		//   Type:    Warning
		//   Reason:  FailedCreatePodSandBox
		//   From:    pod/coordinator-0
		//   Message: Failed to create pod sandbox: rpc error: code = Unknown desc = failed to create containerd task: failed to create shim task: "UpdateInterfaceRequest: internal error query did not produce any values": unknown
		for _, event := range eventList.Items {
			if event.LastTimestamp.After(startingPoint) && event.Reason == "FailedCreatePodSandBox" && event.Message == "Failed to create pod sandbox: rpc error: code = Unknown desc = failed to create containerd task: failed to create shim task: \"UpdateInterfaceRequest: internal error query did not produce any values\": unknown" {
				logger.Debug("Pod did not start", "name", name, "reason", event.Reason, "timestamp of failure", event.LastTimestamp.String())
				return true, nil
			}
		}
		return false, nil
	default:
		return false, fmt.Errorf("unexpected watch event while waiting for %s %s/%s: type=%s, object=%#v", resource.kind(), namespace, name, evt.Type, evt.Object)
	}
}

// WaitFor watches the given resource kind and blocks until the desired number of pods are
// ready or the context expires (is cancelled or times out).
func (c *Kubeclient) WaitFor(ctx context.Context, condition WaitCondition, resource ResourceWaiter, namespace, name string) error {
@@ -370,6 +405,80 @@ loop:
	}
}

// WaitForEvent watches the events of the given resource and blocks until the event corresponding to the WaitCondition occurs after this function was called, or the context expires.
func (c *Kubeclient) WaitForEvent(ctx context.Context, condition WaitCondition, resource ResourceWaiter, namespace, name string) error {
	// The loop logic of this function follows the pattern of WaitFor(). Because we need to watch the event list, which should not implement the ResourceWaiter interface,
	// we cannot fall back to the resource watcher getter functions. Thus this function reimplements WaitFor(), specifically tailored to waiting for events.

	// The starting point is saved right here to ensure that only events that happened after this function was called are considered. The watcher keeps track of all events on
	// the specific resource and calls the checking function whenever the event list changes, regardless of whether the event of interest is among them. Therefore, past events
	// have to be filtered out when checking for the event occurrence.
	startingPoint := time.Now()
	retryCounter := 30
retryLoop:
	for {
		// The watcher pre-filters the event list for the given resource, based on the involvedObject name and the resource kind.
		watcher, err := c.Client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{FieldSelector: "involvedObject.name=" + name, TypeMeta: metav1.TypeMeta{Kind: resource.kind()}})
		if err != nil {
			// If the server is down (because K3s was restarted), wait for a
			// second and try again.
			retryCounter--
			if retryCounter != 0 {
				sleep, cancel := context.WithTimeout(ctx, time.Second*1)
				defer cancel()
				<-sleep.Done()
				continue retryLoop
			}
			return err
		}
		for {
			evt, ok := <-watcher.ResultChan()

			if !ok {
				origErr := ctx.Err()
				if origErr == nil {
					retryCounter--
					if retryCounter != 0 {
						continue retryLoop
					}
					return fmt.Errorf("watcher for %s %s/%s unexpectedly closed", resource.kind(), namespace, name)
				}
				logger := c.log.With("namespace", namespace)
				logger.Error("failed to wait for event", "condition", condition, "kind", resource, "name", name, "contextErr", ctx.Err())
				if ctx.Err() != context.DeadlineExceeded {
					return ctx.Err()
				}
				// Fetch and print debug information.
				ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
				defer cancel()
				pods, err := resource.getPods(ctx, c, namespace, name)
				if err != nil {
					logger.Error("could not fetch pods for resource", "kind", resource.kind(), "name", name, "error", err)
					return origErr
				}
				for _, pod := range pods {
					if !isPodReady(&pod) {
						logger.Debug("pod not ready", "name", pod.Name, "status", c.toJSON(pod.Status))
					}
				}
				return origErr
			}
			// TODO: Consider predefining the event we are searching for and handing it in as a parameter, so that no per-condition checking sub-methods are required.
			// This would also allow waiting for multiple events which might not happen sequentially. A sketch of this idea follows after this file's diff.
			switch condition {
			case StartingBlocked:
				blocked, err := c.checkIfStartingBlocked(ctx, name, namespace, resource, evt, startingPoint)
				if err != nil {
					return err
				}
				if blocked {
					return nil
				}
			}
		}
	}
}

func (c *Kubeclient) toJSON(a any) string {
	s, err := json.Marshal(a)
	if err != nil {
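
The TODO inside WaitForEvent suggests predefining the event of interest and handing it in as a parameter, which would remove the per-condition checking sub-methods and allow waiting for several events that need not happen in order. A rough sketch of that idea; the eventPredicate type and the failedCreatePodSandBox helper are illustrative names, not part of the commit:

package kubeclient

import (
	"time"

	corev1 "k8s.io/api/core/v1"
)

// eventPredicate describes the event a caller wants to wait for.
// Illustrative sketch only, not part of the commit.
type eventPredicate func(evt corev1.Event) bool

// failedCreatePodSandBox reproduces the check from checkIfStartingBlocked:
// it matches FailedCreatePodSandBox events that are newer than startingPoint.
func failedCreatePodSandBox(startingPoint time.Time) eventPredicate {
	return func(evt corev1.Event) bool {
		return evt.LastTimestamp.After(startingPoint) &&
			evt.Reason == "FailedCreatePodSandBox"
	}
}

// A predicate-based WaitForEvent would then scan the listed events generically:
//
//	for _, event := range eventList.Items {
//		if match(event) {
//			return nil
//		}
//	}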
87 changes: 87 additions & 0 deletions e2e/nopolicy/nopolicy_test.go
@@ -0,0 +1,87 @@
// Copyright 2024 Edgeless Systems GmbH
// SPDX-License-Identifier: AGPL-3.0-only

//go:build e2e

package nopolicy

import (
	"context"
	"flag"
	"os"
	"testing"
	"time"

	"github.com/edgelesssys/contrast/e2e/internal/contrasttest"
	"github.com/edgelesssys/contrast/e2e/internal/kubeclient"
	"github.com/edgelesssys/contrast/internal/kuberesource"
	"github.com/edgelesssys/contrast/internal/manifest"
	"github.com/edgelesssys/contrast/internal/platforms"
	"github.com/stretchr/testify/require"
)

const (
	// The persistent pod identifier of the Coordinator StatefulSet is used.
	coordinatorPod = "coordinator-0"
	coordinator    = "coordinator"
)

// Flags to configure the test run.
var (
	imageReplacementsFile, namespaceFile, platformStr string
	skipUndeploy                                      bool
)

// TestNoPolicy runs an e2e test, using the Coordinator pod of the OpenSSL deployment as an example, to ensure that containers without a policy annotation (default policy) do not start.
func TestNoPolicy(t *testing.T) {
	platform, err := platforms.FromString(platformStr)
	require.NoError(t, err)
	ct := contrasttest.New(t, imageReplacementsFile, namespaceFile, platform, skipUndeploy)
	runtimeHandler, err := manifest.RuntimeHandler(platform)
	require.NoError(t, err)

	// Deployment resources are limited to the Coordinator only.
	resources := kuberesource.CoordinatorBundle()
	resources = kuberesource.PatchRuntimeHandlers(resources, runtimeHandler)
	resources = kuberesource.AddPortForwarders(resources)
	ct.Init(t, resources)

	// Apply the deployment using default policies.
	require.True(t, t.Run("apply", ct.Apply), "Kubernetes resources need to be applied for subsequent tests")

	t.Run("check containers without policy annotation do not start", func(t *testing.T) {
		require := require.New(t)
		ctx, cancel := context.WithTimeout(context.Background(), ct.FactorPlatformTimeout(2*time.Minute))
		defer cancel()

		c := kubeclient.NewForTest(t)

		t.Logf("Cwd: %s", ct.WorkDir)
		t.Log("Waiting to ensure container startup failed")

		// Wait for 3 occurrences of the event.
		for range 3 {
			err := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, ct.Namespace, coordinatorPod)
			require.NoError(err)
		}
		t.Log("Restarting container")

		require.NoError(c.Restart(ctx, kubeclient.Deployment{}, ct.Namespace, coordinator))
		t.Log("Waiting to ensure container startup failed")

		for range 3 {
			err := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, ct.Namespace, coordinatorPod)
			require.NoError(err)
		}
	})
}

func TestMain(m *testing.M) {
	flag.StringVar(&imageReplacementsFile, "image-replacements", "", "path to image replacements file")
	flag.StringVar(&namespaceFile, "namespace-file", "", "file to store the namespace in")
	flag.StringVar(&platformStr, "platform", "", "Deployment platform")
	flag.BoolVar(&skipUndeploy, "skip-undeploy", false, "skip undeploy step in the test")
	flag.Parse()

	os.Exit(m.Run())
}
1 change: 1 addition & 0 deletions packages/by-name/contrast/package.nix
@@ -33,6 +33,7 @@ let
"e2e/servicemesh"
"e2e/release"
"e2e/policy"
"e2e/nopolicy"
"e2e/workloadsecret"
"e2e/volumestatefulset"
"e2e/regression"
