Skip to content

Commit

Permalink
Remove Publisher resources when NATS module is disabled (#237)
Browse files Browse the repository at this point in the history
* Remove Publisher resources when NATS module is disabled

* added tests

* added integration tests

* added comments
  • Loading branch information
mfaizanse authored Nov 13, 2023
1 parent d8e77ef commit f0d0636
Show file tree
Hide file tree
Showing 10 changed files with 473 additions and 10 deletions.
9 changes: 7 additions & 2 deletions internal/controller/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,13 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context, eventing *eventin
_, err := r.kubeClient.GetCRD(ctx, k8s.NatsGVK.GroupResource().String())
if err != nil {
if apierrors.IsNotFound(err) {
err = fmt.Errorf("NATS module has to be installed: %v", err)
return ctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventing, err, log)
// delete the publisher proxy resources, because the publisher deployment will go
// into CrashLoopBackOff.
log.Infof("NATS module not enabled, deleting publisher proxy resources")
delErr := r.eventingManager.DeletePublisherProxyResources(ctx, eventing)
// update the Eventing CR status.
notFoundErr := fmt.Errorf("NATS module has to be installed: %v", err)
return ctrl.Result{}, errors.Join(r.syncStatusWithNATSErr(ctx, eventing, notFoundErr, log), delErr)
}
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package controller_test

import (
"context"
"os"
"testing"

eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/eventing"
"github.com/kyma-project/eventing-manager/pkg/k8s"
"github.com/kyma-project/eventing-manager/test/matchers"
"github.com/kyma-project/eventing-manager/test/utils"
natstestutils "github.com/kyma-project/nats-manager/testutils"
"github.com/onsi/gomega"

eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1"
"github.com/kyma-project/eventing-manager/pkg/eventing"
testutils "github.com/kyma-project/eventing-manager/test/utils/integration"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
)

const (
projectRootDir = "../../../../../"
)

var testEnvironment *testutils.TestEnvironment //nolint:gochecknoglobals // used in tests

// TestMain pre-hook and post-hook to run before and after all tests.
func TestMain(m *testing.M) {
// Note: The setup will provision a single K8s env and
// all the tests need to create and use a separate namespace

// setup env test
var err error
testEnvironment, err = testutils.NewTestEnvironment(testutils.TestEnvironmentConfig{
ProjectRootDir: projectRootDir,
CELValidationEnabled: false,
APIRuleCRDEnabled: true,
ApplicationRuleCRDEnabled: true,
NATSCRDEnabled: true,
AllowedEventingCR: nil,
})
if err != nil {
panic(err)
}

// run tests
code := m.Run()

// tear down test env
if err = testEnvironment.TearDown(); err != nil {
panic(err)
}

os.Exit(code)
}

// Test_DeletionOfPublisherResourcesWhenNATSNotEnabled tests that the publisher proxy resources are deleted when
// NATS modules is disabled. Steps of this test:
// 1. Enable NATS modules.
// 2. Create Eventing CR with NATS backend.
// 3. Make sure publisher proxy resources exists.
// 4. Delete NATS CRD and trigger reconciliation of Eventing CR.
// 5. Verify that the publisher proxy resources are deleted.
func Test_DeletionOfPublisherResourcesWhenNATSNotEnabled(t *testing.T) {
g := gomega.NewWithT(t)

// given
eventingcontroller.IsDeploymentReady = func(deployment *appsv1.Deployment) bool { return true }
// define CRs.
givenEventing := utils.NewEventingCR(
utils.WithEventingCRMinimal(),
utils.WithEventingStreamData("Memory", "1M", 1, 1),
utils.WithEventingPublisherData(2, 2, "199m", "99Mi", "399m", "199Mi"),
utils.WithEventingEventTypePrefix("test-prefix"),
utils.WithEventingDomain(utils.Domain),
)
givenNATS := natstestutils.NewNATSCR(
natstestutils.WithNATSCRDefaults(),
natstestutils.WithNATSCRNamespace(givenEventing.Namespace),
)
// create unique namespace for this test run.
givenNamespace := givenEventing.Namespace
testEnvironment.EnsureNamespaceCreation(t, givenNamespace)

// create NATS with Ready status.
testEnvironment.EnsureK8sResourceCreated(t, givenNATS)
testEnvironment.EnsureNATSResourceStateReady(t, givenNATS)

// create Eventing CR.
testEnvironment.EnsureK8sResourceCreated(t, givenEventing)

// wait until Eventing CR status is Ready.
testEnvironment.GetEventingAssert(g, givenEventing).Should(matchers.HaveStatusReady())

// check if EPP deployment, HPA resources created.
ensureEPPDeploymentAndHPAResources(t, givenEventing, testEnvironment)
// check if EPP resources exists.
ensureK8sResources(t, givenEventing, testEnvironment)

natsCRD, err := testEnvironment.KubeClient.GetCRD(context.TODO(), k8s.NatsGVK.GroupResource().String())
require.NoError(t, err)

// define cleanup.
defer func() {
// Important: install NATS CRD again.
testEnvironment.EnsureCRDCreated(t, natsCRD)
testEnvironment.EnsureEventingResourceDeletion(t, givenEventing.Name, givenNamespace)
testEnvironment.EnsureNamespaceDeleted(t, givenNamespace)
}()

// when
// delete NATS CR & CRD
testEnvironment.EnsureK8sResourceDeleted(t, givenNATS)
testEnvironment.EnsureNATSCRDDeleted(t)

// then
// wait until Eventing CR status is Error.
testEnvironment.GetEventingAssert(g, givenEventing).Should(gomega.And(
matchers.HaveStatusError(),
matchers.HaveNATSNotAvailableConditionWith("NATS module has to be installed: customresourcedefinitions.apiextensions.k8s.io \"nats.operator.kyma-project.io\" not found"),
))

// verify that the publisher proxy resources are deleted.
testEnvironment.EnsureDeploymentNotFound(t, eventing.GetPublisherDeploymentName(*givenEventing), givenNamespace)
testEnvironment.EnsureHPANotFound(t, eventing.GetPublisherDeploymentName(*givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceNotFound(t,
eventing.GetPublisherPublishServiceName(*givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceNotFound(t,
eventing.GetPublisherMetricsServiceName(*givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceNotFound(t,
eventing.GetPublisherHealthServiceName(*givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceAccountNotFound(t,
eventing.GetPublisherServiceAccountName(*givenEventing), givenNamespace)
}

func ensureEPPDeploymentAndHPAResources(t *testing.T, givenEventing *eventingv1alpha1.Eventing, testEnvironment *testutils.TestEnvironment) {
testEnvironment.EnsureDeploymentExists(t, eventing.GetPublisherDeploymentName(*givenEventing), givenEventing.Namespace)
testEnvironment.EnsureHPAExists(t, eventing.GetPublisherDeploymentName(*givenEventing), givenEventing.Namespace)
testEnvironment.EnsureEventingSpecPublisherReflected(t, givenEventing)
testEnvironment.EnsureEventingReplicasReflected(t, givenEventing)
testEnvironment.EnsureDeploymentOwnerReferenceSet(t, givenEventing)
testEnvironment.EnsurePublisherDeploymentENVSet(t, givenEventing)
}

func ensureK8sResources(t *testing.T, givenEventing *eventingv1alpha1.Eventing, testEnvironment *testutils.TestEnvironment) {
testEnvironment.EnsureEPPK8sResourcesExists(t, *givenEventing)

// check if the owner reference is set.
testEnvironment.EnsureEPPK8sResourcesHaveOwnerReference(t, *givenEventing)

// check if EPP resources are correctly created.
deployment, err := testEnvironment.GetDeploymentFromK8s(eventing.GetPublisherDeploymentName(*givenEventing), givenEventing.Namespace)
require.NoError(t, err)
// K8s Services
testEnvironment.EnsureEPPPublishServiceCorrect(t, deployment, *givenEventing)
testEnvironment.EnsureEPPMetricsServiceCorrect(t, deployment, *givenEventing)
testEnvironment.EnsureEPPHealthServiceCorrect(t, deployment, *givenEventing)
// ClusterRole
testEnvironment.EnsureEPPClusterRoleCorrect(t, *givenEventing)
// ClusterRoleBinding
testEnvironment.EnsureEPPClusterRoleBindingCorrect(t, *givenEventing)
}
43 changes: 42 additions & 1 deletion pkg/eventing/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

ecv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -38,6 +40,7 @@ type Manager interface {
natsConfig *env.NATSConfig,
backendType v1alpha1.BackendType) (*appsv1.Deployment, error)
DeployPublisherProxyResources(context.Context, *v1alpha1.Eventing, *appsv1.Deployment) error
DeletePublisherProxyResources(ctx context.Context, eventing *v1alpha1.Eventing) error
GetBackendConfig() *env.BackendConfig
SetBackendConfig(env.BackendConfig)
SubscriptionExists(ctx context.Context) (bool, error)
Expand Down Expand Up @@ -198,7 +201,7 @@ func (em EventingManager) DeployPublisherProxyResources(
newPublisherProxyHealthService(GetPublisherHealthServiceName(*eventing), eventing.Namespace, publisherDeployment.Labels,
publisherDeployment.Spec.Template.Labels),
// HPA to auto-scale publisher proxy.
newHorizontalPodAutoscaler(publisherDeployment, int32(eventing.Spec.Publisher.Min),
newHorizontalPodAutoscaler(publisherDeployment.Name, publisherDeployment.Namespace, int32(eventing.Spec.Publisher.Min),
int32(eventing.Spec.Publisher.Max), cpuUtilization, memoryUtilization),
}

Expand All @@ -217,6 +220,44 @@ func (em EventingManager) DeployPublisherProxyResources(
return nil
}

func (em EventingManager) DeletePublisherProxyResources(ctx context.Context, eventing *v1alpha1.Eventing) error {
// define list of resources to delete for EPP.
publisherDeployment := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: GetPublisherDeploymentName(*eventing),
Namespace: eventing.Namespace,
},
}

resources := []client.Object{
// Deployment
publisherDeployment,
// ServiceAccount
newPublisherProxyServiceAccount(GetPublisherServiceAccountName(*eventing), eventing.Namespace, map[string]string{}),
// Service to expose event publishing endpoint of EPP.
newPublisherProxyService(GetPublisherPublishServiceName(*eventing), eventing.Namespace, map[string]string{}, map[string]string{}),
// Service to expose metrics endpoint of EPP.
newPublisherProxyMetricsService(GetPublisherMetricsServiceName(*eventing), eventing.Namespace, map[string]string{}, map[string]string{}),
// Service to expose health endpoint of EPP.
newPublisherProxyHealthService(GetPublisherHealthServiceName(*eventing), eventing.Namespace, map[string]string{}, map[string]string{}),
// HPA to auto-scale publisher proxy.
newHorizontalPodAutoscaler(publisherDeployment.Name, eventing.Namespace, 0, 0, 0, 0),
}

// delete the resources on k8s.
for _, object := range resources {
// delete the object.
if err := em.kubeClient.DeleteResource(ctx, object); err != nil {
return err
}
}
return nil
}

func (em *EventingManager) SubscriptionExists(ctx context.Context) (bool, error) {
subscriptionList, err := em.kubeClient.GetSubscriptions(ctx)
if err != nil {
Expand Down
76 changes: 76 additions & 0 deletions pkg/eventing/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,82 @@ func Test_DeployPublisherProxyResources(t *testing.T) {
}
}

func Test_DeletePublisherProxyResources(t *testing.T) {
t.Parallel()

// given
newScheme := runtime.NewScheme()
require.NoError(t, v1alpha1.AddToScheme(newScheme))

// test cases
testCases := []struct {
name string
givenEventing *v1alpha1.Eventing
givenEPPDeployment *appsv1.Deployment
wantError bool
wantDeletedResourcesCount int
}{
{
name: "should have delete EPP resources",
givenEventing: testutils.NewEventingCR(
testutils.WithEventingCRName("test-eventing"),
testutils.WithEventingCRNamespace("test"),
testutils.WithEventingCRMinimal(),
testutils.WithEventingPublisherData(2, 4, "100m", "256Mi", "200m", "512Mi"),
),
givenEPPDeployment: testutils.NewDeployment("test", "test", map[string]string{}),
wantDeletedResourcesCount: 6,
},
{
name: "should return error when delete fails",
givenEventing: testutils.NewEventingCR(
testutils.WithEventingCRName("test-eventing"),
testutils.WithEventingCRNamespace("test"),
testutils.WithEventingCRMinimal(),
testutils.WithEventingPublisherData(2, 4, "100m", "256Mi", "200m", "512Mi"),
),
givenEPPDeployment: testutils.NewDeployment("test", "test", map[string]string{}),
wantError: true,
},
}

// Iterate over the test cases.
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// given
ctx := context.Background()
mockClient := fake.NewClientBuilder().WithScheme(newScheme).WithObjects().Build()
kubeClient := new(k8smocks.Client)

// define mocks behaviours.
if tc.wantError {
kubeClient.On("DeleteResource", ctx, mock.Anything).Return(errors.New("failed"))
} else {
kubeClient.On("DeleteResource", ctx, mock.Anything).Return(nil).Times(tc.wantDeletedResourcesCount)
}

// initialize EventingManager object.
em := EventingManager{
Client: mockClient,
kubeClient: kubeClient,
}

// when
err := em.DeletePublisherProxyResources(ctx, tc.givenEventing)

// then
if tc.wantError {
require.Error(t, err)
} else {
require.NoError(t, err)
kubeClient.AssertExpectations(t)
}
})
}
}

func Test_SubscriptionExists(t *testing.T) {
// Define test cases
testCases := []struct {
Expand Down
43 changes: 43 additions & 0 deletions pkg/eventing/mocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f0d0636

Please sign in to comment.