Skip to content

Commit

Permalink
Add PriorityClass (#145)
Browse files Browse the repository at this point in the history
* add priorityClass

* add references to pc in manager

* add `eventing` prefix to priorityClass name

* add PriorityClassName to epp deployment

* Add test

* add more test cases

* add tests for jobs

* add priorityClassName to cronjob and job

* remove colo

* test em deployment

* fix spelling
  • Loading branch information
friedrichwilken authored Oct 18, 2023
1 parent d7efc4e commit 05959b3
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 20 deletions.
1 change: 1 addition & 0 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
resources:
- manager.yaml
- service.yaml
- priority-class.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
Expand Down
1 change: 1 addition & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spec:
app.kubernetes.io/instance: eventing-manager
app.kubernetes.io/component: eventing-manager
spec:
priorityClassName: "eventing-manager-priority-class"
# TODO(user): Uncomment the following code to configure the nodeAffinity expression
# according to the platforms which are supported by your solution.
# It is considered best practice to support multiple architectures. You can
Expand Down
7 changes: 7 additions & 0 deletions config/manager/priority-class.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: eventing-manager-priority-class
value: 2000000
globalDefault: false
description: "Scheduling priority of the Eventing-Manager module. Must not be blocked by unschedulable user workloads."
1 change: 1 addition & 0 deletions config/webhook/cronjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ spec:
annotations:
sidecar.istio.io/inject: "false"
spec:
priorityClassName: eventing-manager-priority-class
restartPolicy: Never
containers:
- name: api-gateway
Expand Down
1 change: 1 addition & 0 deletions config/webhook/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ spec:
annotations:
sidecar.istio.io/inject: "false"
spec:
priorityClassName: eventing-manager-priority-class
restartPolicy: Never
containers:
- name: api-gateway
Expand Down
12 changes: 6 additions & 6 deletions hack/e2e/common/testenvironment/test_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (te *TestEnvironment) CreateTestNamespace() error {

func (te *TestEnvironment) DeleteTestNamespace() error {
return common.Retry(FewAttempts, Interval, func() error {
// It's fine if the Namespace not exists.
// It's fine if the Namespace does not exist.
return client.IgnoreNotFound(te.K8sClient.Delete(te.Context, fixtures.Namespace(te.TestConfigs.TestNamespace)))
})
}
Expand Down Expand Up @@ -122,13 +122,13 @@ func (te *TestEnvironment) InitSinkClient() {

func (te *TestEnvironment) CreateAllSubscriptions() error {
ctx := context.TODO()
// create v1alpha1 subscriptions if not exists.
// Create v1alpha1 subscriptions if not exists.
err := te.CreateV1Alpha1Subscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest())
if err != nil {
return err
}

// create v1alpha2 subscriptions if not exists.
// Create v1alpha2 subscriptions if not exists.
return te.CreateV1Alpha2Subscriptions(ctx, fixtures.V1Alpha2SubscriptionsToTest())
}

Expand All @@ -140,7 +140,7 @@ func (te *TestEnvironment) DeleteAllSubscriptions() error {
}
}

// delete v1alpha2 subscriptions if not exists.
// Delete v1alpha2 subscriptions if not exists.
for _, subToTest := range fixtures.V1Alpha2SubscriptionsToTest() {
if err := te.DeleteSubscriptionFromK8s(subToTest.Name, te.TestConfigs.TestNamespace); err != nil {
return err
Expand All @@ -151,7 +151,7 @@ func (te *TestEnvironment) DeleteAllSubscriptions() error {

func (te *TestEnvironment) WaitForAllSubscriptions() error {
ctx := context.TODO()
// wait for v1alpha1 subscriptions to get ready.
// Wait for v1alpha1 subscriptions to get ready.
err := te.WaitForSubscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest())
if err != nil {
return err
Expand All @@ -167,7 +167,7 @@ func (te *TestEnvironment) CreateV1Alpha1Subscriptions(ctx context.Context, subL
newSub := subInfo.ToSubscriptionV1Alpha1(te.TestConfigs.SubscriptionSinkURL, te.TestConfigs.TestNamespace)
return client.IgnoreAlreadyExists(te.K8sClient.Create(ctx, newSub))
})
// return error if all retries are exhausted.
// Return error if all retries are exhausted.
if err != nil {
return err
}
Expand Down
97 changes: 84 additions & 13 deletions hack/e2e/setup/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var testEnvironment *testenvironment.TestEnvironment
func TestMain(m *testing.M) {
testEnvironment = testenvironment.NewTestEnvironment()

// create test namespace,
// Create a test namespace.
if err := testEnvironment.CreateTestNamespace(); err != nil {
testEnvironment.Logger.Fatal(err.Error())
}
Expand All @@ -53,12 +53,11 @@ func TestMain(m *testing.M) {
}

// Create the Eventing CR used for testing.

if err := testEnvironment.SetupEventingCR(); err != nil {
testEnvironment.Logger.Fatal(err.Error())
}

// wait for a testenvironment.Interval for reconciliation to update status.
// Wait for a testenvironment.Interval for reconciliation to update status.
time.Sleep(testenvironment.Interval)

// Wait for Eventing CR to get ready.
Expand All @@ -77,10 +76,7 @@ func Test_WebhookServerCertSecret(t *testing.T) {
ctx := context.TODO()
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
_, getErr := testEnvironment.K8sClientset.CoreV1().Secrets(NamespaceName).Get(ctx, WebhookServerCertSecretName, metav1.GetOptions{})
if getErr != nil {
return getErr
}
return nil
return getErr
})
require.NoError(t, err)
}
Expand All @@ -90,10 +86,20 @@ func Test_WebhookServerCertJob(t *testing.T) {
t.Parallel()
ctx := context.TODO()
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
_, getErr := testEnvironment.K8sClientset.BatchV1().Jobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{})
job, getErr := testEnvironment.K8sClientset.BatchV1().Jobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{})
if getErr != nil {
return getErr
}

// Check if the PriorityClassName was set correctly.
if job.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf("Job '%s' was expected to have PriorityClassName '%s' but has '%s'",
job.GetName(),
eventing.PriorityClassName,
job.Spec.Template.Spec.PriorityClassName,
)
}

return nil
})
require.NoError(t, err)
Expand All @@ -104,10 +110,24 @@ func Test_WebhookServerCertCronJob(t *testing.T) {
t.Parallel()
ctx := context.TODO()
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
_, getErr := testEnvironment.K8sClientset.BatchV1().CronJobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{})
job, getErr := testEnvironment.K8sClientset.BatchV1().CronJobs(NamespaceName).Get(
ctx,
WebhookServerCertJobName,
metav1.GetOptions{},
)
if getErr != nil {
return getErr
}

// Check if the PriorityClassName was set correctly.
if job.Spec.JobTemplate.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf("ChronJob '%s' was expected to have PriorityClassName '%s' but has '%s'",
job.GetName(),
eventing.PriorityClassName,
job.Spec.JobTemplate.Spec.Template.Spec.PriorityClassName,
)
}

return nil
})
require.NoError(t, err)
Expand Down Expand Up @@ -240,6 +260,15 @@ func Test_PublisherProxyDeployment(t *testing.T) {
return err
}

if gotDeployment.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf(
"error while checking deployment '%s'; PriorityClasssName was supposed to be %s, but was %s",
gotDeployment.GetName(),
eventing.PriorityClassName,
gotDeployment.Spec.Template.Spec.PriorityClassName,
)
}

return nil
})
require.NoError(t, err)
Expand All @@ -254,7 +283,7 @@ func Test_PublisherProxyPods(t *testing.T) {

ctx := context.TODO()
eventingCR := EventingCR(eventingv1alpha1.BackendType(testEnvironment.TestConfigs.BackendType))
// RetryGet the Pods and test them.
// Retry to get the Pods and test them.
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
// get publisher deployment
gotDeployment, getErr := testEnvironment.GetDeployment(eventing.GetPublisherDeploymentName(*eventingCR), NamespaceName)
Expand All @@ -280,7 +309,7 @@ func Test_PublisherProxyPods(t *testing.T) {
)
}

// Go through all Pods, check its spec. It should be same as defined in Eventing CR
// Go through all Pods, check its spec. It should be same as defined in Eventing CR.
for _, pod := range pods.Items {
// find the container.
container := FindContainerInPod(pod, PublisherContainerName)
Expand All @@ -297,13 +326,22 @@ func Test_PublisherProxyPods(t *testing.T) {
)
}

// check if the ENV `BACKEND` is defined correctly.
// Check if the PriorityClassName was set as expected.
if pod.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf("'PriorityClassName' of Pod %v should be %v but was %v",
pod.GetName(),
eventing.PriorityClassName,
pod.Spec.PriorityClassName,
)
}

// Check if the ENV `BACKEND` is defined correctly.
wantBackendENVValue := "nats"
if eventingv1alpha1.BackendType(testEnvironment.TestConfigs.BackendType) == eventingv1alpha1.EventMeshBackendType {
wantBackendENVValue = "beb"
}

// get value of ENV `BACKEND`
// Get value of ENV `BACKEND`.
gotBackendENVValue := ""
for _, envVar := range container.Env {
if envVar.Name == "BACKEND" {
Expand All @@ -325,3 +363,36 @@ func Test_PublisherProxyPods(t *testing.T) {
})
require.NoError(t, err)
}

func Test_PriorityClass(t *testing.T) {
t.Parallel()

ctx := context.TODO()

// Check if the PriorityClass exists in the cluster.
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
_, getErr := testEnvironment.K8sClientset.SchedulingV1().PriorityClasses().Get(
ctx, eventing.PriorityClassName, metav1.GetOptions{})
return getErr
})
require.Nil(t, err, fmt.Errorf("error while fetching PriorityClass: %v", err))

// Check if the Eventing-Manager Deployment has the right PriorityClassName. This implicits that the
// corresponding Pod also has the right PriorityClassName.
err = Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
deploy, getErr := testEnvironment.GetDeployment(ManagerDeploymentName, NamespaceName)
if getErr != nil {
return getErr
}

if deploy.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf("deployment '%s' should have the PriorityClassName %s but was %s",
deploy.GetName(),
eventing.PriorityClassName,
deploy.Spec.Template.Spec.PriorityClassName,
)
}

return nil
})
}
10 changes: 10 additions & 0 deletions pkg/eventing/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (

PublisherSecretEMSURLKey = "ems-publish-url"
PublisherSecretBEBNamespaceKey = "beb-namespace"

PriorityClassName = "eventing-manager-priority-class"
)

var (
Expand All @@ -57,6 +59,7 @@ func newNATSPublisherDeployment(
WithNATSEnvVars(natsConfig, publisherConfig, eventing),
WithLogEnvVars(publisherConfig, eventing),
WithAffinity(GetPublisherDeploymentName(*eventing)),
WithPriorityClassName(PriorityClassName),
)
}

Expand All @@ -70,6 +73,7 @@ func newEventMeshPublisherDeployment(
WithContainers(publisherConfig, eventing),
WithBEBEnvVars(GetPublisherDeploymentName(*eventing), publisherConfig, eventing),
WithLogEnvVars(publisherConfig, eventing),
WithPriorityClassName(PriorityClassName),
)
}

Expand Down Expand Up @@ -136,6 +140,12 @@ func WithLabels(publisherName string, backendType v1alpha1.BackendType) DeployOp
}
}

func WithPriorityClassName(name string) DeployOpt {
return func(deployment *appsv1.Deployment) {
deployment.Spec.Template.Spec.PriorityClassName = name
}
}

func WithAffinity(publisherName string) DeployOpt {
return func(d *appsv1.Deployment) {
d.Spec.Template.Spec.Affinity = &v1.Affinity{
Expand Down
3 changes: 2 additions & 1 deletion pkg/eventing/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func Test_NewDeploymentSecurityContext(t *testing.T) {
testutils.WithEventingCRNamespace("test-namespace"),
)
deployment := newDeployment(givenEventing, config.PublisherConfig,
WithContainers(config.PublisherConfig, givenEventing))
WithContainers(config.PublisherConfig, givenEventing),
)

// when
podSecurityContext := deployment.Spec.Template.Spec.SecurityContext
Expand Down

0 comments on commit 05959b3

Please sign in to comment.