Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Publisher resources when NATS module is disabled #237

Merged
merged 4 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions internal/controller/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,13 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context, eventing *eventin
_, err := r.kubeClient.GetCRD(ctx, k8s.NatsGVK.GroupResource().String())
if err != nil {
if apierrors.IsNotFound(err) {
err = fmt.Errorf("NATS module has to be installed: %v", err)
return ctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventing, err, log)
// delete the publisher proxy resources, because the publisher deployment will go
// into CrashLoopBackOff.
log.Infof("NATS module not enabled, deleting publisher proxy resources")
delErr := r.eventingManager.DeletePublisherProxyResources(ctx, eventing)
grischperl marked this conversation as resolved.
Show resolved Hide resolved
// update the Eventing CR status.
notFoundErr := fmt.Errorf("NATS module has to be installed: %v", err)
return ctrl.Result{}, errors.Join(r.syncStatusWithNATSErr(ctx, eventing, notFoundErr, log), delErr)
}
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package controller_test

import (
"context"
"os"
"testing"

eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/eventing"
"github.com/kyma-project/eventing-manager/pkg/k8s"
"github.com/kyma-project/eventing-manager/test/matchers"
"github.com/kyma-project/eventing-manager/test/utils"
natstestutils "github.com/kyma-project/nats-manager/testutils"
"github.com/onsi/gomega"

eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1"
"github.com/kyma-project/eventing-manager/pkg/eventing"
testutils "github.com/kyma-project/eventing-manager/test/utils/integration"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
)

const (
projectRootDir = "../../../../../"
)

var testEnvironment *testutils.TestEnvironment //nolint:gochecknoglobals // used in tests

// TestMain pre-hook and post-hook to run before and after all tests.
func TestMain(m *testing.M) {
// Note: The setup will provision a single K8s env and
// all the tests need to create and use a separate namespace

// setup env test
var err error
testEnvironment, err = testutils.NewTestEnvironment(testutils.TestEnvironmentConfig{
ProjectRootDir: projectRootDir,
CELValidationEnabled: false,
APIRuleCRDEnabled: true,
ApplicationRuleCRDEnabled: true,
NATSCRDEnabled: true,
AllowedEventingCR: nil,
})
if err != nil {
panic(err)
}

// run tests
code := m.Run()

// tear down test env
if err = testEnvironment.TearDown(); err != nil {
panic(err)
}

os.Exit(code)
}

// Test_DeletionOfPublisherResourcesWhenNATSNotEnabled tests that the publisher proxy resources are deleted when
mfaizanse marked this conversation as resolved.
Show resolved Hide resolved
// NATS modules is disabled. Steps of this test:
// 1. Enable NATS modules.
// 2. Create Eventing CR with NATS backend.
// 3. Make sure publisher proxy resources exists.
// 4. Delete NATS CRD and trigger reconciliation of Eventing CR.
// 5. Verify that the publisher proxy resources are deleted.
func Test_DeletionOfPublisherResourcesWhenNATSNotEnabled(t *testing.T) {
g := gomega.NewWithT(t)

// given
eventingcontroller.IsDeploymentReady = func(deployment *appsv1.Deployment) bool { return true }
// define CRs.
givenEventing := utils.NewEventingCR(
utils.WithEventingCRMinimal(),
utils.WithEventingStreamData("Memory", "1M", 1, 1),
utils.WithEventingPublisherData(2, 2, "199m", "99Mi", "399m", "199Mi"),
utils.WithEventingEventTypePrefix("test-prefix"),
utils.WithEventingDomain(utils.Domain),
)
givenNATS := natstestutils.NewNATSCR(
natstestutils.WithNATSCRDefaults(),
natstestutils.WithNATSCRNamespace(givenEventing.Namespace),
)
// create unique namespace for this test run.
givenNamespace := givenEventing.Namespace
testEnvironment.EnsureNamespaceCreation(t, givenNamespace)

// create NATS with Ready status.
testEnvironment.EnsureK8sResourceCreated(t, givenNATS)
testEnvironment.EnsureNATSResourceStateReady(t, givenNATS)

// create Eventing CR.
testEnvironment.EnsureK8sResourceCreated(t, givenEventing)

// wait until Eventing CR status is Ready.
testEnvironment.GetEventingAssert(g, givenEventing).Should(matchers.HaveStatusReady())

// check if EPP deployment, HPA resources created.
ensureEPPDeploymentAndHPAResources(t, givenEventing, testEnvironment)
// check if EPP resources exists.
ensureK8sResources(t, givenEventing, testEnvironment)

natsCRD, err := testEnvironment.KubeClient.GetCRD(context.TODO(), k8s.NatsGVK.GroupResource().String())
require.NoError(t, err)

// define cleanup.
defer func() {
// Important: install NATS CRD again.
testEnvironment.EnsureCRDCreated(t, natsCRD)
testEnvironment.EnsureEventingResourceDeletion(t, givenEventing.Name, givenNamespace)
testEnvironment.EnsureNamespaceDeleted(t, givenNamespace)
}()

// when
// delete NATS CR & CRD
testEnvironment.EnsureK8sResourceDeleted(t, givenNATS)
testEnvironment.EnsureNATSCRDDeleted(t)

// then
// wait until Eventing CR status is Error.
testEnvironment.GetEventingAssert(g, givenEventing).Should(gomega.And(
matchers.HaveStatusError(),
matchers.HaveNATSNotAvailableConditionWith("NATS module has to be installed: customresourcedefinitions.apiextensions.k8s.io \"nats.operator.kyma-project.io\" not found"),
))

// verify that the publisher proxy resources are deleted.
testEnvironment.EnsureDeploymentNotFound(t, eventing.GetPublisherDeploymentName(*givenEventing), givenNamespace)
testEnvironment.EnsureHPANotFound(t, eventing.GetPublisherDeploymentName(*givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceNotFound(t,
eventing.GetPublisherPublishServiceName(*givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceNotFound(t,
eventing.GetPublisherMetricsServiceName(*givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceNotFound(t,
eventing.GetPublisherHealthServiceName(*givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceAccountNotFound(t,
eventing.GetPublisherServiceAccountName(*givenEventing), givenNamespace)
}

func ensureEPPDeploymentAndHPAResources(t *testing.T, givenEventing *eventingv1alpha1.Eventing, testEnvironment *testutils.TestEnvironment) {
testEnvironment.EnsureDeploymentExists(t, eventing.GetPublisherDeploymentName(*givenEventing), givenEventing.Namespace)
testEnvironment.EnsureHPAExists(t, eventing.GetPublisherDeploymentName(*givenEventing), givenEventing.Namespace)
testEnvironment.EnsureEventingSpecPublisherReflected(t, givenEventing)
testEnvironment.EnsureEventingReplicasReflected(t, givenEventing)
testEnvironment.EnsureDeploymentOwnerReferenceSet(t, givenEventing)
testEnvironment.EnsurePublisherDeploymentENVSet(t, givenEventing)
}

func ensureK8sResources(t *testing.T, givenEventing *eventingv1alpha1.Eventing, testEnvironment *testutils.TestEnvironment) {
testEnvironment.EnsureEPPK8sResourcesExists(t, *givenEventing)

// check if the owner reference is set.
testEnvironment.EnsureEPPK8sResourcesHaveOwnerReference(t, *givenEventing)

// check if EPP resources are correctly created.
deployment, err := testEnvironment.GetDeploymentFromK8s(eventing.GetPublisherDeploymentName(*givenEventing), givenEventing.Namespace)
require.NoError(t, err)
// K8s Services
testEnvironment.EnsureEPPPublishServiceCorrect(t, deployment, *givenEventing)
testEnvironment.EnsureEPPMetricsServiceCorrect(t, deployment, *givenEventing)
testEnvironment.EnsureEPPHealthServiceCorrect(t, deployment, *givenEventing)
// ClusterRole
testEnvironment.EnsureEPPClusterRoleCorrect(t, *givenEventing)
// ClusterRoleBinding
testEnvironment.EnsureEPPClusterRoleBindingCorrect(t, *givenEventing)
}
43 changes: 42 additions & 1 deletion pkg/eventing/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

ecv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -38,6 +40,7 @@ type Manager interface {
natsConfig *env.NATSConfig,
backendType v1alpha1.BackendType) (*appsv1.Deployment, error)
DeployPublisherProxyResources(context.Context, *v1alpha1.Eventing, *appsv1.Deployment) error
DeletePublisherProxyResources(ctx context.Context, eventing *v1alpha1.Eventing) error
GetBackendConfig() *env.BackendConfig
SetBackendConfig(env.BackendConfig)
SubscriptionExists(ctx context.Context) (bool, error)
Expand Down Expand Up @@ -198,7 +201,7 @@ func (em EventingManager) DeployPublisherProxyResources(
newPublisherProxyHealthService(GetPublisherHealthServiceName(*eventing), eventing.Namespace, publisherDeployment.Labels,
publisherDeployment.Spec.Template.Labels),
// HPA to auto-scale publisher proxy.
newHorizontalPodAutoscaler(publisherDeployment, int32(eventing.Spec.Publisher.Min),
newHorizontalPodAutoscaler(publisherDeployment.Name, publisherDeployment.Namespace, int32(eventing.Spec.Publisher.Min),
int32(eventing.Spec.Publisher.Max), cpuUtilization, memoryUtilization),
}

Expand All @@ -217,6 +220,44 @@ func (em EventingManager) DeployPublisherProxyResources(
return nil
}

func (em EventingManager) DeletePublisherProxyResources(ctx context.Context, eventing *v1alpha1.Eventing) error {
// define list of resources to delete for EPP.
publisherDeployment := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: GetPublisherDeploymentName(*eventing),
Namespace: eventing.Namespace,
},
}

resources := []client.Object{
// Deployment
publisherDeployment,
// ServiceAccount
newPublisherProxyServiceAccount(GetPublisherServiceAccountName(*eventing), eventing.Namespace, map[string]string{}),
// Service to expose event publishing endpoint of EPP.
newPublisherProxyService(GetPublisherPublishServiceName(*eventing), eventing.Namespace, map[string]string{}, map[string]string{}),
// Service to expose metrics endpoint of EPP.
newPublisherProxyMetricsService(GetPublisherMetricsServiceName(*eventing), eventing.Namespace, map[string]string{}, map[string]string{}),
// Service to expose health endpoint of EPP.
newPublisherProxyHealthService(GetPublisherHealthServiceName(*eventing), eventing.Namespace, map[string]string{}, map[string]string{}),
// HPA to auto-scale publisher proxy.
newHorizontalPodAutoscaler(publisherDeployment.Name, eventing.Namespace, 0, 0, 0, 0),
}

// delete the resources on k8s.
for _, object := range resources {
// delete the object.
if err := em.kubeClient.DeleteResource(ctx, object); err != nil {
return err
}
}
return nil
}

func (em *EventingManager) SubscriptionExists(ctx context.Context) (bool, error) {
subscriptionList, err := em.kubeClient.GetSubscriptions(ctx)
if err != nil {
Expand Down
76 changes: 76 additions & 0 deletions pkg/eventing/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,82 @@ func Test_DeployPublisherProxyResources(t *testing.T) {
}
}

func Test_DeletePublisherProxyResources(t *testing.T) {
t.Parallel()

// given
newScheme := runtime.NewScheme()
require.NoError(t, v1alpha1.AddToScheme(newScheme))

// test cases
testCases := []struct {
name string
givenEventing *v1alpha1.Eventing
givenEPPDeployment *appsv1.Deployment
wantError bool
wantDeletedResourcesCount int
}{
{
name: "should have delete EPP resources",
givenEventing: testutils.NewEventingCR(
testutils.WithEventingCRName("test-eventing"),
testutils.WithEventingCRNamespace("test"),
testutils.WithEventingCRMinimal(),
testutils.WithEventingPublisherData(2, 4, "100m", "256Mi", "200m", "512Mi"),
),
givenEPPDeployment: testutils.NewDeployment("test", "test", map[string]string{}),
wantDeletedResourcesCount: 6,
},
{
name: "should return error when delete fails",
givenEventing: testutils.NewEventingCR(
testutils.WithEventingCRName("test-eventing"),
testutils.WithEventingCRNamespace("test"),
testutils.WithEventingCRMinimal(),
testutils.WithEventingPublisherData(2, 4, "100m", "256Mi", "200m", "512Mi"),
),
givenEPPDeployment: testutils.NewDeployment("test", "test", map[string]string{}),
wantError: true,
},
}

// Iterate over the test cases.
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// given
ctx := context.Background()
mockClient := fake.NewClientBuilder().WithScheme(newScheme).WithObjects().Build()
kubeClient := new(k8smocks.Client)

// define mocks behaviours.
if tc.wantError {
kubeClient.On("DeleteResource", ctx, mock.Anything).Return(errors.New("failed"))
} else {
kubeClient.On("DeleteResource", ctx, mock.Anything).Return(nil).Times(tc.wantDeletedResourcesCount)
}

// initialize EventingManager object.
em := EventingManager{
Client: mockClient,
kubeClient: kubeClient,
}

// when
err := em.DeletePublisherProxyResources(ctx, tc.givenEventing)

// then
if tc.wantError {
require.Error(t, err)
} else {
require.NoError(t, err)
kubeClient.AssertExpectations(t)
}
})
}
}

func Test_SubscriptionExists(t *testing.T) {
// Define test cases
testCases := []struct {
Expand Down
43 changes: 43 additions & 0 deletions pkg/eventing/mocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading