Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e: default policy test #1007

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions e2e/internal/kubeclient/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ const (
InitContainersRunning
)

// WaitEventCondition enumerates the conditions that `kubeclient.WaitForEvent` can wait for.
type WaitEventCondition int

// The zero value of WaitEventCondition is not assigned to any condition; valid conditions start at 1.
const (
	// StartingBlocked waits until a specific FailedCreatePodSandBox Event is detected, which indicates that the container does not start.
	StartingBlocked WaitEventCondition = iota + 1
)

// ResourceWaiter is implemented by resources that can be waited for with WaitFor.
type ResourceWaiter interface {
kind() string
Expand Down Expand Up @@ -225,6 +234,32 @@ func (c *Kubeclient) checkIfRunning(ctx context.Context, name string, namespace
return true, nil
}

// IsStartingBlocked inspects a single watch event and reports whether it is a
// FailedCreatePodSandBox Event, which indicates that the SetPolicy request was rejected
// and the Kata Shim failed to start the Pod sandbox.
//
// Only Added and Modified watch events are inspected. The Event counts as a match if its
// reason is FailedCreatePodSandBox and its LastTimestamp lies after startingPoint.
// An error is returned for watch error events, for any other watch event type, and if the
// object carried by the watch event is not a *corev1.Event.
// name, namespace and resource are only used for log and error messages.
func (c *Kubeclient) IsStartingBlocked(name string, namespace string, resource ResourceWaiter, evt watch.Event, startingPoint time.Time) (bool, error) {
	if evt.Type == watch.Error {
		return false, fmt.Errorf("Watcher of %s %s/%s received an error event", resource.kind(), namespace, name)
	}
	if evt.Type != watch.Added && evt.Type != watch.Modified {
		return false, fmt.Errorf("unexpected watch event while waiting for %s %s/%s: type=%s, object=%#v", resource.kind(), namespace, name, evt.Type, evt.Object)
	}

	k8sEvent, isEvent := evt.Object.(*corev1.Event)
	if !isEvent {
		return false, fmt.Errorf("watcher received unexpected type %T", evt.Object)
	}

	// Expected event: Reason: FailedCreatePodSandBox
	// TODO(jmxnzo): Add patch to the existing error message in Kata Shim, to specifically allow detecting start-up of containers without policy annotation.
	occurredAfterStart := k8sEvent.LastTimestamp.After(startingPoint)
	if !occurredAfterStart || k8sEvent.Reason != "FailedCreatePodSandBox" {
		return false, nil
	}

	c.log.With("namespace", namespace).Debug("Pod did not start", "name", name, "reason", k8sEvent.Reason, "timestamp of failure", k8sEvent.LastTimestamp.String())
	return true, nil
}

// WaitFor watches the given resource kind and blocks until the desired number of pods are
// ready or the context expires (is cancelled or times out).
func (c *Kubeclient) WaitFor(ctx context.Context, condition WaitCondition, resource ResourceWaiter, namespace, name string) error {
Expand Down Expand Up @@ -370,6 +405,74 @@ loop:
}
}

// WaitForEvent watches the Events in namespace that involve an object called name and blocks
// until an Event satisfying condition occurs or the context expires (is cancelled or times out).
// It reimplements WaitFor() for waiting on occurring Events instead of resource states.
//
// Only Events that occurred after WaitForEvent was called are considered. If the watch cannot be
// established, or its result channel closes while ctx is still alive, the watch is re-established;
// both failure modes share a budget of 30 attempts.
//
// It returns nil once the condition is met. It returns ctx.Err() if the context expired, and an
// error if the watch could not be (re-)established, the checking function failed, or the
// condition is not supported.
func (c *Kubeclient) WaitForEvent(ctx context.Context, condition WaitEventCondition, resource ResourceWaiter, namespace, name string) error {
	// startingPoint is saved right here to avoid the processing of past events in the checking
	// function. This was introduced because otherwise calling WaitForEvent multiple times
	// resulted in reusing events with the same timestamp.
	startingPoint := time.Now()
	retryCounter := 30
	for {
		// Watcher which preprocesses the event list for the defined resource, based on the involvedObject name.
		// NOTE(review): only involvedObject.name is part of the field selector. It looks like TypeMeta.Kind
		// does not restrict the watch to objects of that kind — confirm whether involvedObject.kind
		// should be added to the selector.
		watcher, err := c.Client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{FieldSelector: "involvedObject.name=" + name, TypeMeta: metav1.TypeMeta{Kind: resource.kind()}})
		if err != nil {
			// If the server is down (because K3s was restarted), wait for a
			// second and try again.
			retryCounter--
			if retryCounter == 0 {
				return err
			}
			sleep, cancel := context.WithTimeout(ctx, time.Second*1)
			<-sleep.Done()
			// Release the timer right away instead of deferring inside the loop.
			cancel()
			continue
		}

		retry, err := c.waitForEventOnWatcher(ctx, watcher, condition, resource, namespace, name, startingPoint)
		if !retry {
			return err
		}
		retryCounter--
		if retryCounter == 0 {
			return fmt.Errorf("watcher for %s %s/%s unexpectedly closed", resource.kind(), namespace, name)
		}
	}
}

// waitForEventOnWatcher consumes events from watcher until condition is met, an error occurs or
// the result channel is closed. The returned bool is true only if the channel was closed while
// ctx was still alive, i.e. the caller may establish a new watch. The watcher is always stopped
// before returning.
func (c *Kubeclient) waitForEventOnWatcher(ctx context.Context, watcher watch.Interface, condition WaitEventCondition, resource ResourceWaiter, namespace, name string, startingPoint time.Time) (bool, error) {
	defer watcher.Stop()
	for {
		evt, ok := <-watcher.ResultChan()
		if !ok {
			if ctx.Err() == nil {
				return true, nil
			}
			return false, c.logEventWaitFailure(ctx, condition, resource, namespace, name)
		}
		switch condition {
		case StartingBlocked:
			blocked, err := c.IsStartingBlocked(name, namespace, resource, evt, startingPoint)
			if err != nil {
				return false, err
			}
			if blocked {
				return false, nil
			}
		default:
			// Fail fast instead of silently discarding events until ctx expires.
			return false, fmt.Errorf("unsupported wait event condition %d", condition)
		}
	}
}

// logEventWaitFailure logs that waiting was aborted because ctx expired and, if the deadline was
// exceeded, additionally logs the status of every pod of resource that is not ready. It returns
// the error of the expired ctx.
func (c *Kubeclient) logEventWaitFailure(ctx context.Context, condition WaitEventCondition, resource ResourceWaiter, namespace, name string) error {
	origErr := ctx.Err()
	logger := c.log.With("namespace", namespace)
	logger.Error("failed to wait for event", "condition", condition, "kind", resource.kind(), "name", name, "contextErr", origErr)
	if origErr != context.DeadlineExceeded {
		return origErr
	}
	// Fetch and print debug information. The parent context is expired, so a fresh one is needed.
	debugCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	pods, err := resource.getPods(debugCtx, c, namespace, name) //nolint:contextcheck // The parent context expired.
	if err != nil {
		logger.Error("could not fetch pods for resource", "kind", resource.kind(), "name", name, "error", err)
		return origErr
	}
	for _, pod := range pods {
		if !isPodReady(&pod) {
			logger.Debug("pod not ready", "name", pod.Name, "status", c.toJSON(pod.Status))
		}
	}
	return origErr
}

func (c *Kubeclient) toJSON(a any) string {
s, err := json.Marshal(a)
if err != nil {
Expand Down
27 changes: 26 additions & 1 deletion e2e/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
opensslBackend = "openssl-backend"
opensslFrontend = "openssl-frontend"
coordinator = "coordinator"
// Persistent pod identifier of StatefulSet Coordinator is used.
coordinatorPod = "coordinator-0"
)

var (
Expand All @@ -58,11 +60,34 @@ func TestPolicy(t *testing.T) {

ct.Init(t, resources)

// Apply deployment using default policies
require.True(t, t.Run("apply", ct.Apply), "Kubernetes resources need to be applied for subsequent tests")

t.Run("check containers without policy annotation do not start", func(t *testing.T) {
require := require.New(t)
ctx, cancel := context.WithTimeout(context.Background(), ct.FactorPlatformTimeout(2*time.Minute))
defer cancel()

c := kubeclient.NewForTest(t)

t.Log("Waiting to ensure container start up failed")

err := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, ct.Namespace, coordinatorPod)
require.NoError(err)

t.Log("Restarting container")

require.NoError(c.Restart(ctx, kubeclient.Deployment{}, ct.Namespace, coordinator))
t.Log("Waiting to ensure container start up failed")

errRst := c.WaitForEvent(ctx, kubeclient.StartingBlocked, kubeclient.Pod{}, ct.Namespace, coordinatorPod)
require.NoError(errRst)
})

// initial deployment with pod allowed
require.True(t, t.Run("generate", ct.Generate), "contrast generate needs to succeed for subsequent tests")

require.True(t, t.Run("apply", ct.Apply), "Kubernetes resources need to be applied for subsequent tests")

require.True(t, t.Run("set", ct.Set), "contrast set needs to succeed for subsequent tests")
require.True(t, t.Run("contrast verify", ct.Verify), "contrast verify needs to succeed for subsequent tests")

Expand Down