From 05959b341ad26531f2221bda634b0ec1c1d6aefd Mon Sep 17 00:00:00 2001 From: Friedrich Date: Wed, 18 Oct 2023 08:52:57 +0200 Subject: [PATCH] Add PriorityClass (#145) * add priorityClass * add references to pc in manager * add `eventing` prefix to priorityClass name * add PriorityClassName to epp deployment * Add test * add more test cases * add tests for jobs * add priorityClassName to cronjob and job * remove colo * test em deployment * fix spelling --- config/manager/kustomization.yaml | 1 + config/manager/manager.yaml | 1 + config/manager/priority-class.yaml | 7 ++ config/webhook/cronjob.yaml | 1 + config/webhook/job.yaml | 1 + .../testenvironment/test_environment.go | 12 +-- hack/e2e/setup/setup_test.go | 97 ++++++++++++++++--- pkg/eventing/deployment.go | 10 ++ pkg/eventing/deployment_test.go | 3 +- 9 files changed, 113 insertions(+), 20 deletions(-) create mode 100644 config/manager/priority-class.yaml diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 08f5dbbe..72d61ce9 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -1,6 +1,7 @@ resources: - manager.yaml - service.yaml +- priority-class.yaml apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 26254ce4..66e8a51b 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -29,6 +29,7 @@ spec: app.kubernetes.io/instance: eventing-manager app.kubernetes.io/component: eventing-manager spec: + priorityClassName: "eventing-manager-priority-class" # TODO(user): Uncomment the following code to configure the nodeAffinity expression # according to the platforms which are supported by your solution. # It is considered best practice to support multiple architectures. You can diff --git a/config/manager/priority-class.yaml b/config/manager/priority-class.yaml new file mode 100644 index 00000000..209b6cb5 --- /dev/null +++ b/config/manager/priority-class.yaml @@ -0,0 +1,7 @@ +apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: eventing-manager-priority-class +value: 2000000 +globalDefault: false +description: "Scheduling priority of the Eventing-Manager module. Must not be blocked by unschedulable user workloads." diff --git a/config/webhook/cronjob.yaml b/config/webhook/cronjob.yaml index 139a7d43..918016a0 100644 --- a/config/webhook/cronjob.yaml +++ b/config/webhook/cronjob.yaml @@ -19,6 +19,7 @@ spec: annotations: sidecar.istio.io/inject: "false" spec: + priorityClassName: eventing-manager-priority-class restartPolicy: Never containers: - name: api-gateway diff --git a/config/webhook/job.yaml b/config/webhook/job.yaml index f8297e60..02368432 100644 --- a/config/webhook/job.yaml +++ b/config/webhook/job.yaml @@ -15,6 +15,7 @@ spec: annotations: sidecar.istio.io/inject: "false" spec: + priorityClassName: eventing-manager-priority-class restartPolicy: Never containers: - name: api-gateway diff --git a/hack/e2e/common/testenvironment/test_environment.go b/hack/e2e/common/testenvironment/test_environment.go index 2f4fd413..2e665f20 100644 --- a/hack/e2e/common/testenvironment/test_environment.go +++ b/hack/e2e/common/testenvironment/test_environment.go @@ -90,7 +90,7 @@ func (te *TestEnvironment) CreateTestNamespace() error { func (te *TestEnvironment) DeleteTestNamespace() error { return common.Retry(FewAttempts, Interval, func() error { - // It's fine if the Namespace not exists. + // It's fine if the Namespace does not exist. return client.IgnoreNotFound(te.K8sClient.Delete(te.Context, fixtures.Namespace(te.TestConfigs.TestNamespace))) }) } @@ -122,13 +122,13 @@ func (te *TestEnvironment) InitSinkClient() { func (te *TestEnvironment) CreateAllSubscriptions() error { ctx := context.TODO() - // create v1alpha1 subscriptions if not exists. + // Create v1alpha1 subscriptions if not exists. err := te.CreateV1Alpha1Subscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest()) if err != nil { return err } - // create v1alpha2 subscriptions if not exists. + // Create v1alpha2 subscriptions if not exists. return te.CreateV1Alpha2Subscriptions(ctx, fixtures.V1Alpha2SubscriptionsToTest()) } @@ -140,7 +140,7 @@ func (te *TestEnvironment) DeleteAllSubscriptions() error { } } - // delete v1alpha2 subscriptions if not exists. + // Delete v1alpha2 subscriptions if not exists. for _, subToTest := range fixtures.V1Alpha2SubscriptionsToTest() { if err := te.DeleteSubscriptionFromK8s(subToTest.Name, te.TestConfigs.TestNamespace); err != nil { return err @@ -151,7 +151,7 @@ func (te *TestEnvironment) DeleteAllSubscriptions() error { func (te *TestEnvironment) WaitForAllSubscriptions() error { ctx := context.TODO() - // wait for v1alpha1 subscriptions to get ready. + // Wait for v1alpha1 subscriptions to get ready. err := te.WaitForSubscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest()) if err != nil { return err @@ -167,7 +167,7 @@ func (te *TestEnvironment) CreateV1Alpha1Subscriptions(ctx context.Context, subL newSub := subInfo.ToSubscriptionV1Alpha1(te.TestConfigs.SubscriptionSinkURL, te.TestConfigs.TestNamespace) return client.IgnoreAlreadyExists(te.K8sClient.Create(ctx, newSub)) }) - // return error if all retries are exhausted. + // Return error if all retries are exhausted. if err != nil { return err } diff --git a/hack/e2e/setup/setup_test.go b/hack/e2e/setup/setup_test.go index 4ba00974..d30387a4 100644 --- a/hack/e2e/setup/setup_test.go +++ b/hack/e2e/setup/setup_test.go @@ -36,7 +36,7 @@ var testEnvironment *testenvironment.TestEnvironment func TestMain(m *testing.M) { testEnvironment = testenvironment.NewTestEnvironment() - // create test namespace, + // Create a test namespace. if err := testEnvironment.CreateTestNamespace(); err != nil { testEnvironment.Logger.Fatal(err.Error()) } @@ -53,12 +53,11 @@ func TestMain(m *testing.M) { } // Create the Eventing CR used for testing. - if err := testEnvironment.SetupEventingCR(); err != nil { testEnvironment.Logger.Fatal(err.Error()) } - // wait for a testenvironment.Interval for reconciliation to update status. + // Wait for a testenvironment.Interval for reconciliation to update status. time.Sleep(testenvironment.Interval) // Wait for Eventing CR to get ready. @@ -77,10 +76,7 @@ func Test_WebhookServerCertSecret(t *testing.T) { ctx := context.TODO() err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error { _, getErr := testEnvironment.K8sClientset.CoreV1().Secrets(NamespaceName).Get(ctx, WebhookServerCertSecretName, metav1.GetOptions{}) - if getErr != nil { - return getErr - } - return nil + return getErr }) require.NoError(t, err) } @@ -90,10 +86,20 @@ func Test_WebhookServerCertJob(t *testing.T) { t.Parallel() ctx := context.TODO() err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error { - _, getErr := testEnvironment.K8sClientset.BatchV1().Jobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{}) + job, getErr := testEnvironment.K8sClientset.BatchV1().Jobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{}) if getErr != nil { return getErr } + + // Check if the PriorityClassName was set correctly. + if job.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName { + return fmt.Errorf("Job '%s' was expected to have PriorityClassName '%s' but has '%s'", + job.GetName(), + eventing.PriorityClassName, + job.Spec.Template.Spec.PriorityClassName, + ) + } + return nil }) require.NoError(t, err) @@ -104,10 +110,24 @@ func Test_WebhookServerCertCronJob(t *testing.T) { t.Parallel() ctx := context.TODO() err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error { - _, getErr := testEnvironment.K8sClientset.BatchV1().CronJobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{}) + job, getErr := testEnvironment.K8sClientset.BatchV1().CronJobs(NamespaceName).Get( + ctx, + WebhookServerCertJobName, + metav1.GetOptions{}, + ) if getErr != nil { return getErr } + + // Check if the PriorityClassName was set correctly. + if job.Spec.JobTemplate.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName { + return fmt.Errorf("ChronJob '%s' was expected to have PriorityClassName '%s' but has '%s'", + job.GetName(), + eventing.PriorityClassName, + job.Spec.JobTemplate.Spec.Template.Spec.PriorityClassName, + ) + } + return nil }) require.NoError(t, err) @@ -240,6 +260,15 @@ func Test_PublisherProxyDeployment(t *testing.T) { return err } + if gotDeployment.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName { + return fmt.Errorf( + "error while checking deployment '%s'; PriorityClasssName was supposed to be %s, but was %s", + gotDeployment.GetName(), + eventing.PriorityClassName, + gotDeployment.Spec.Template.Spec.PriorityClassName, + ) + } + return nil }) require.NoError(t, err) @@ -254,7 +283,7 @@ func Test_PublisherProxyPods(t *testing.T) { ctx := context.TODO() eventingCR := EventingCR(eventingv1alpha1.BackendType(testEnvironment.TestConfigs.BackendType)) - // RetryGet the Pods and test them. + // Retry to get the Pods and test them. err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error { // get publisher deployment gotDeployment, getErr := testEnvironment.GetDeployment(eventing.GetPublisherDeploymentName(*eventingCR), NamespaceName) @@ -280,7 +309,7 @@ func Test_PublisherProxyPods(t *testing.T) { ) } - // Go through all Pods, check its spec. It should be same as defined in Eventing CR + // Go through all Pods, check its spec. It should be same as defined in Eventing CR. for _, pod := range pods.Items { // find the container. container := FindContainerInPod(pod, PublisherContainerName) @@ -297,13 +326,22 @@ func Test_PublisherProxyPods(t *testing.T) { ) } - // check if the ENV `BACKEND` is defined correctly. + // Check if the PriorityClassName was set as expected. + if pod.Spec.PriorityClassName != eventing.PriorityClassName { + return fmt.Errorf("'PriorityClassName' of Pod %v should be %v but was %v", + pod.GetName(), + eventing.PriorityClassName, + pod.Spec.PriorityClassName, + ) + } + + // Check if the ENV `BACKEND` is defined correctly. wantBackendENVValue := "nats" if eventingv1alpha1.BackendType(testEnvironment.TestConfigs.BackendType) == eventingv1alpha1.EventMeshBackendType { wantBackendENVValue = "beb" } - // get value of ENV `BACKEND` + // Get value of ENV `BACKEND`. gotBackendENVValue := "" for _, envVar := range container.Env { if envVar.Name == "BACKEND" { @@ -325,3 +363,36 @@ func Test_PublisherProxyPods(t *testing.T) { }) require.NoError(t, err) } + +func Test_PriorityClass(t *testing.T) { + t.Parallel() + + ctx := context.TODO() + + // Check if the PriorityClass exists in the cluster. + err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error { + _, getErr := testEnvironment.K8sClientset.SchedulingV1().PriorityClasses().Get( + ctx, eventing.PriorityClassName, metav1.GetOptions{}) + return getErr + }) + require.Nil(t, err, fmt.Errorf("error while fetching PriorityClass: %v", err)) + + // Check if the Eventing-Manager Deployment has the right PriorityClassName. This implicits that the + // corresponding Pod also has the right PriorityClassName. + err = Retry(testenvironment.Attempts, testenvironment.Interval, func() error { + deploy, getErr := testEnvironment.GetDeployment(ManagerDeploymentName, NamespaceName) + if getErr != nil { + return getErr + } + + if deploy.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName { + return fmt.Errorf("deployment '%s' should have the PriorityClassName %s but was %s", + deploy.GetName(), + eventing.PriorityClassName, + deploy.Spec.Template.Spec.PriorityClassName, + ) + } + + return nil + }) +} diff --git a/pkg/eventing/deployment.go b/pkg/eventing/deployment.go index dd4ff52b..5b30a545 100644 --- a/pkg/eventing/deployment.go +++ b/pkg/eventing/deployment.go @@ -39,6 +39,8 @@ const ( PublisherSecretEMSURLKey = "ems-publish-url" PublisherSecretBEBNamespaceKey = "beb-namespace" + + PriorityClassName = "eventing-manager-priority-class" ) var ( @@ -57,6 +59,7 @@ func newNATSPublisherDeployment( WithNATSEnvVars(natsConfig, publisherConfig, eventing), WithLogEnvVars(publisherConfig, eventing), WithAffinity(GetPublisherDeploymentName(*eventing)), + WithPriorityClassName(PriorityClassName), ) } @@ -70,6 +73,7 @@ func newEventMeshPublisherDeployment( WithContainers(publisherConfig, eventing), WithBEBEnvVars(GetPublisherDeploymentName(*eventing), publisherConfig, eventing), WithLogEnvVars(publisherConfig, eventing), + WithPriorityClassName(PriorityClassName), ) } @@ -136,6 +140,12 @@ func WithLabels(publisherName string, backendType v1alpha1.BackendType) DeployOp } } +func WithPriorityClassName(name string) DeployOpt { + return func(deployment *appsv1.Deployment) { + deployment.Spec.Template.Spec.PriorityClassName = name + } +} + func WithAffinity(publisherName string) DeployOpt { return func(d *appsv1.Deployment) { d.Spec.Template.Spec.Affinity = &v1.Affinity{ diff --git a/pkg/eventing/deployment_test.go b/pkg/eventing/deployment_test.go index caf2dce7..d136c720 100644 --- a/pkg/eventing/deployment_test.go +++ b/pkg/eventing/deployment_test.go @@ -102,7 +102,8 @@ func Test_NewDeploymentSecurityContext(t *testing.T) { testutils.WithEventingCRNamespace("test-namespace"), ) deployment := newDeployment(givenEventing, config.PublisherConfig, - WithContainers(config.PublisherConfig, givenEventing)) + WithContainers(config.PublisherConfig, givenEventing), + ) // when podSecurityContext := deployment.Spec.Template.Spec.SecurityContext