diff --git a/api/v1alpha1/eventing_types.go b/api/v1alpha1/eventing_types.go index 192610a1..aeee08b8 100644 --- a/api/v1alpha1/eventing_types.go +++ b/api/v1alpha1/eventing_types.go @@ -31,11 +31,13 @@ const ( StateReady string = "Ready" StateError string = "Error" StateProcessing string = "Processing" + StateWarning string = "Warning" ConditionNATSAvailable ConditionType = "NATSAvailable" ConditionPublisherProxyReady ConditionType = "PublisherProxyReady" ConditionWebhookReady ConditionType = "WebhookReady" ConditionSubscriptionManagerReady ConditionType = "SubscriptionManagerReady" + ConditionDeleted ConditionType = "Deleted" // common reasons ConditionReasonProcessing ConditionReason = "Processing" @@ -51,7 +53,7 @@ const ( ConditionReasonForbidden ConditionReason = "Forbidden" ConditionReasonWebhookFailed ConditionReason = "WebhookFailed" ConditionReasonWebhookReady ConditionReason = "Ready" - ConditionReasonDeletedFailed ConditionReason = "DeletionFailed" + ConditionReasonDeletionError ConditionReason = "DeletionError" // message for conditions ConditionPublisherProxyReadyMessage = "Publisher proxy is deployed" @@ -66,7 +68,6 @@ const ( ConditionReasonEventMeshSubManagerReady ConditionReason = "EventMeshSubscriptionManagerReady" ConditionReasonEventMeshSubManagerFailed ConditionReason = "EventMeshSubscriptionManagerFailed" ConditionReasonEventMeshSubManagerStopFailed ConditionReason = "EventMeshSubscriptionManagerStopFailed" - ConditionReasonSubscriptionManagerProcessing ConditionReason = "SubscriptionManagerProcessing" ) // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. diff --git a/api/v1alpha1/status.go b/api/v1alpha1/status.go index 71fc0287..a7b3c883 100644 --- a/api/v1alpha1/status.go +++ b/api/v1alpha1/status.go @@ -56,6 +56,18 @@ func (sm *EventingStatus) UpdateConditionSubscriptionManagerReady(status metav1. meta.SetStatusCondition(&sm.Conditions, condition) } +func (es *EventingStatus) UpdateConditionDeletion(status metav1.ConditionStatus, reason ConditionReason, + message string) { + condition := metav1.Condition{ + Type: string(ConditionDeleted), + Status: status, + LastTransitionTime: metav1.Now(), + Reason: string(reason), + Message: message, + } + meta.SetStatusCondition(&es.Conditions, condition) +} + func (es *EventingStatus) SetSubscriptionManagerReadyConditionToTrue() { es.UpdateConditionSubscriptionManagerReady(metav1.ConditionTrue, ConditionReasonEventMeshSubManagerReady, ConditionSubscriptionManagerReadyMessage) @@ -67,6 +79,10 @@ func (es *EventingStatus) SetStateReady() { es.UpdateConditionPublisherProxyReady(metav1.ConditionTrue, ConditionReasonDeployed, ConditionPublisherProxyReadyMessage) } +func (ns *EventingStatus) SetStateWarning() { + ns.State = StateWarning +} + func (es *EventingStatus) SetNATSAvailableConditionToTrue() { es.UpdateConditionNATSAvailable(metav1.ConditionTrue, ConditionReasonNATSAvailable, ConditionNATSAvailableMessage) } diff --git a/internal/controller/eventing/controller.go b/internal/controller/eventing/controller.go index 1c82a559..727eaccc 100644 --- a/internal/controller/eventing/controller.go +++ b/internal/controller/eventing/controller.go @@ -18,8 +18,8 @@ package eventing import ( "context" + "errors" "fmt" - eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1" "github.com/kyma-project/eventing-manager/options" "github.com/kyma-project/eventing-manager/pkg/env" @@ -60,6 +60,8 @@ const ( NamespacePrefix = "/" EventMeshPublishEndpointForSubscriber = "/sap/ems/v1" EventMeshPublishEndpointForPublisher = "/sap/ems/v1/events" + + SubscriptionExistsErrMessage = "cannot delete the eventing module as subscription exists" ) // Reconciler reconciles an Eventing object @@ -265,6 +267,18 @@ func (r *Reconciler) handleEventingDeletion(ctx context.Context, eventing *event return ctrl.Result{}, nil } + // check if subscription resources exist + exists, err := r.eventingManager.SubscriptionExists(ctx) + if err != nil { + eventing.Status.SetStateError() + return ctrl.Result{}, r.syncStatusWithDeletionErr(ctx, eventing, err, log) + } + if exists { + eventing.Status.SetStateWarning() + return ctrl.Result{Requeue: true}, r.syncStatusWithDeletionErr(ctx, eventing, + errors.New(SubscriptionExistsErrMessage), log) + } + log.Info("handling Eventing deletion...") if eventing.Spec.Backend.Type == eventingv1alpha1.NatsBackendType { if err := r.stopNATSSubManager(true, log); err != nil { @@ -284,7 +298,7 @@ func (r *Reconciler) handleEventingDeletion(ctx context.Context, eventing *event // delete cluster-scoped resources, such as clusterrole and clusterrolebinding. if err := r.deleteClusterScopedResources(ctx, eventing); err != nil { return ctrl.Result{}, r.syncStatusWithPublisherProxyErrWithReason(ctx, - eventingv1alpha1.ConditionReasonDeletedFailed, eventing, err, log) + eventingv1alpha1.ConditionReasonDeletionError, eventing, err, log) } eventing.Status.SetPublisherProxyConditionToFalse( eventingv1alpha1.ConditionReasonDeleted, diff --git a/internal/controller/eventing/integrationtests/controller/integration_test.go b/internal/controller/eventing/integrationtests/controller/integration_test.go index 3de330f5..76368968 100644 --- a/internal/controller/eventing/integrationtests/controller/integration_test.go +++ b/internal/controller/eventing/integrationtests/controller/integration_test.go @@ -7,7 +7,6 @@ import ( eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1" eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/eventing" "github.com/kyma-project/eventing-manager/pkg/eventing" - ecsubmanagermocks "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/mocks/ec" "github.com/kyma-project/eventing-manager/test/matchers" "github.com/kyma-project/eventing-manager/test/utils" testutils "github.com/kyma-project/eventing-manager/test/utils/integration" @@ -16,6 +15,7 @@ import ( gomegatypes "github.com/onsi/gomega/types" "github.com/stretchr/testify/require" + eventinv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" natstestutils "github.com/kyma-project/nats-manager/testutils" v1 "k8s.io/api/apps/v1" ) @@ -560,9 +560,10 @@ func Test_CreateEventingCR_EventMesh(t *testing.T) { func Test_DeleteEventingCR(t *testing.T) { t.Parallel() testCases := []struct { - name string - givenEventing *eventingv1alpha1.Eventing - subscriptionManagerMock *ecsubmanagermocks.Manager + name string + givenEventing *eventingv1alpha1.Eventing + givenSubscription *eventinv1alpha2.Subscription + wantMatches gomegatypes.GomegaMatcher }{ { name: "Delete Eventing CR should delete the owned resources", @@ -580,12 +581,42 @@ func Test_DeleteEventingCR(t *testing.T) { utils.WithEventingEventTypePrefix("test-prefix"), ), }, + { + name: "Delete should be blocked as subscription exists for NATS", + givenEventing: utils.NewEventingCR( + utils.WithEventingCRMinimal(), + utils.WithEventingStreamData("Memory", "1M", 1, 1), + utils.WithEventingPublisherData(1, 1, "199m", "99Mi", "399m", "199Mi"), + ), + givenSubscription: utils.NewSubscription("test-nats-subscription", "test-nats-namespace"), + wantMatches: gomega.And( + matchers.HaveStatusWarning(), + matchers.HaveDeletionErrorCondition(eventingcontroller.SubscriptionExistsErrMessage), + matchers.HaveFinalizer(), + ), + }, + { + name: "Delete should be blocked as subscription exist for EventMesh", + givenEventing: utils.NewEventingCR( + utils.WithEventMeshBackend("test-secret-name"), + utils.WithEventingPublisherData(1, 1, "199m", "99Mi", "399m", "199Mi"), + utils.WithEventingEventTypePrefix("test-prefix"), + ), + givenSubscription: utils.NewSubscription("test-eventmesh-subscription", "test-eventmesh-namespace"), + wantMatches: gomega.And( + matchers.HaveStatusWarning(), + matchers.HaveDeletionErrorCondition(eventingcontroller.SubscriptionExistsErrMessage), + matchers.HaveFinalizer(), + ), + }, } for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() + g := gomega.NewWithT(t) + // given eventingcontroller.IsDeploymentReady = func(deployment *v1.Deployment) bool { return true @@ -615,6 +646,10 @@ func Test_DeleteEventingCR(t *testing.T) { testEnvironment.EnsureK8sResourceCreated(t, tc.givenEventing) defer func() { + if tc.givenSubscription != nil { + testEnvironment.EnsureSubscriptionResourceDeletion(t, tc.givenSubscription.Name, tc.givenSubscription.Namespace) + } + if !*testEnvironment.EnvTestInstance.UseExistingCluster { testEnvironment.EnsureDeploymentDeletion(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace) } @@ -627,30 +662,43 @@ func Test_DeleteEventingCR(t *testing.T) { testEnvironment.EnsureDeploymentExists(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace) testEnvironment.EnsureHPAExists(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace) - testEnvironment.EnsureEventingResourceDeletion(t, tc.givenEventing.Name, givenNamespace) + if tc.givenSubscription != nil { + // create subscriptions if given. + testEnvironment.EnsureNamespaceCreation(t, tc.givenSubscription.Namespace) + testEnvironment.EnsureK8sResourceCreated(t, tc.givenSubscription) + testEnvironment.EnsureSubscriptionExists(t, tc.givenSubscription.Name, tc.givenSubscription.Namespace) - // then - if *testEnvironment.EnvTestInstance.UseExistingCluster { - testEnvironment.EnsureDeploymentNotFound(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace) - testEnvironment.EnsureHPANotFound(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace) - testEnvironment.EnsureK8sServiceNotFound(t, - eventing.GetPublisherPublishServiceName(*tc.givenEventing), givenNamespace) - testEnvironment.EnsureK8sServiceNotFound(t, - eventing.GetPublisherMetricsServiceName(*tc.givenEventing), givenNamespace) - testEnvironment.EnsureK8sServiceNotFound(t, - eventing.GetPublisherHealthServiceName(*tc.givenEventing), givenNamespace) - testEnvironment.EnsureK8sServiceAccountNotFound(t, - eventing.GetPublisherServiceAccountName(*tc.givenEventing), givenNamespace) + //then + // givenSubscription existence means deletion of Eventing CR should be blocked. + testEnvironment.EnsureK8sResourceDeleted(t, tc.givenEventing) + testEnvironment.GetEventingAssert(g, tc.givenEventing).Should(tc.wantMatches) } else { - // check if the owner reference is set. - // if owner reference is set then these resources would be garbage collected in real k8s cluster. - testEnvironment.EnsureEPPK8sResourcesHaveOwnerReference(t, *tc.givenEventing) - // ensure clusterrole and clusterrolebindings are deleted. + // then + // givenSubscription is nil means deletion of Eventing CR should be successful. + testEnvironment.EnsureEventingResourceDeletion(t, tc.givenEventing.Name, givenNamespace) + + // then + if *testEnvironment.EnvTestInstance.UseExistingCluster { + testEnvironment.EnsureDeploymentNotFound(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace) + testEnvironment.EnsureHPANotFound(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace) + testEnvironment.EnsureK8sServiceNotFound(t, + eventing.GetPublisherPublishServiceName(*tc.givenEventing), givenNamespace) + testEnvironment.EnsureK8sServiceNotFound(t, + eventing.GetPublisherMetricsServiceName(*tc.givenEventing), givenNamespace) + testEnvironment.EnsureK8sServiceNotFound(t, + eventing.GetPublisherHealthServiceName(*tc.givenEventing), givenNamespace) + testEnvironment.EnsureK8sServiceAccountNotFound(t, + eventing.GetPublisherServiceAccountName(*tc.givenEventing), givenNamespace) + } else { + // check if the owner reference is set. + // if owner reference is set then these resources would be garbage collected in real k8s cluster. + testEnvironment.EnsureEPPK8sResourcesHaveOwnerReference(t, *tc.givenEventing) + } + testEnvironment.EnsureK8sClusterRoleNotFound(t, + eventing.GetPublisherClusterRoleName(*tc.givenEventing), givenNamespace) + testEnvironment.EnsureK8sClusterRoleBindingNotFound(t, + eventing.GetPublisherClusterRoleBindingName(*tc.givenEventing), givenNamespace) } - testEnvironment.EnsureK8sClusterRoleNotFound(t, - eventing.GetPublisherClusterRoleName(*tc.givenEventing), givenNamespace) - testEnvironment.EnsureK8sClusterRoleBindingNotFound(t, - eventing.GetPublisherClusterRoleBindingName(*tc.givenEventing), givenNamespace) }) } } diff --git a/internal/controller/eventing/mocks/controller.go b/internal/controller/eventing/mocks/controller.go index ad5f91df..b78a62f1 100644 --- a/internal/controller/eventing/mocks/controller.go +++ b/internal/controller/eventing/mocks/controller.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/mocks/manager.go b/internal/controller/eventing/mocks/manager.go index 1fba02c9..b16fb5e9 100644 --- a/internal/controller/eventing/mocks/manager.go +++ b/internal/controller/eventing/mocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/mocks/nats_config_handler.go b/internal/controller/eventing/mocks/nats_config_handler.go index 88b6f69f..7a9b699f 100644 --- a/internal/controller/eventing/mocks/nats_config_handler.go +++ b/internal/controller/eventing/mocks/nats_config_handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/status.go b/internal/controller/eventing/status.go index 5fcdb0ff..39715cc6 100644 --- a/internal/controller/eventing/status.go +++ b/internal/controller/eventing/status.go @@ -93,6 +93,14 @@ func (r *Reconciler) syncStatusWithWebhookErr(ctx context.Context, return errors.Join(err, r.syncEventingStatus(ctx, eventing, log)) } +func (r *Reconciler) syncStatusWithDeletionErr(ctx context.Context, + eventing *eventingv1alpha1.Eventing, err error, log *zap.SugaredLogger) error { + eventing.Status.UpdateConditionDeletion(metav1.ConditionFalse, + eventingv1alpha1.ConditionReasonDeletionError, err.Error()) + + return errors.Join(err, r.syncEventingStatus(ctx, eventing, log)) +} + // syncEventingStatus syncs Eventing status. func (r *Reconciler) syncEventingStatus(ctx context.Context, eventing *eventingv1alpha1.Eventing, log *zap.SugaredLogger) error { diff --git a/pkg/backend/jetstream/mocks/Backend.go b/pkg/backend/jetstream/mocks/Backend.go index 91c7a1c8..c0965e78 100644 --- a/pkg/backend/jetstream/mocks/Backend.go +++ b/pkg/backend/jetstream/mocks/Backend.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks diff --git a/pkg/backend/jetstream/mocks/JetStreamContext.go b/pkg/backend/jetstream/mocks/JetStreamContext.go index 8c51b94f..a180b0c0 100644 --- a/pkg/backend/jetstream/mocks/JetStreamContext.go +++ b/pkg/backend/jetstream/mocks/JetStreamContext.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks diff --git a/pkg/ems/api/events/client/mocks/PublisherManager.go b/pkg/ems/api/events/client/mocks/PublisherManager.go index 5bf56d03..ccb081a7 100644 --- a/pkg/ems/api/events/client/mocks/PublisherManager.go +++ b/pkg/ems/api/events/client/mocks/PublisherManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks diff --git a/pkg/eventing/manager.go b/pkg/eventing/manager.go index ed02e363..8d8deb80 100644 --- a/pkg/eventing/manager.go +++ b/pkg/eventing/manager.go @@ -39,6 +39,7 @@ type Manager interface { DeployPublisherProxyResources(context.Context, *v1alpha1.Eventing, *appsv1.Deployment) error GetBackendConfig() *env.BackendConfig SetBackendConfig(env.BackendConfig) + SubscriptionExists(ctx context.Context) (bool, error) } type EventingManager struct { @@ -215,6 +216,17 @@ func (em EventingManager) DeployPublisherProxyResources( return nil } +func (em *EventingManager) SubscriptionExists(ctx context.Context) (bool, error) { + subscriptionList, err := em.kubeClient.GetSubscriptions(ctx) + if err != nil { + return false, err + } + if len(subscriptionList.Items) > 0 { + return true, nil + } + return false, nil +} + func convertECBackendType(backendType v1alpha1.BackendType) (ecv1alpha1.BackendType, error) { switch backendType { case v1alpha1.EventMeshBackendType: diff --git a/pkg/eventing/manager_test.go b/pkg/eventing/manager_test.go index fe20f836..fc177939 100644 --- a/pkg/eventing/manager_test.go +++ b/pkg/eventing/manager_test.go @@ -21,6 +21,7 @@ import ( k8smocks "github.com/kyma-project/eventing-manager/pkg/k8s/mocks" testutils "github.com/kyma-project/eventing-manager/test/utils" ecv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" + eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1" natstestutils "github.com/kyma-project/nats-manager/testutils" "github.com/stretchr/testify/mock" @@ -461,3 +462,69 @@ func Test_DeployPublisherProxyResources(t *testing.T) { }) } } + +func Test_SubscriptionExists(t *testing.T) { + // Define test cases + testCases := []struct { + name string + givenSubscriptions *eventingv1alpha2.SubscriptionList + wantResult bool + wantError error + }{ + { + name: "no subscription should exist", + givenSubscriptions: &eventingv1alpha2.SubscriptionList{}, + wantResult: false, + wantError: nil, + }, + { + name: "subscriptions should exist", + givenSubscriptions: &eventingv1alpha2.SubscriptionList{ + TypeMeta: metav1.TypeMeta{ + Kind: "SubscriptionList", + APIVersion: "eventing.kyma-project.io/v1alpha2", + }, + Items: []eventingv1alpha2.Subscription{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Subscription", + APIVersion: "eventing.kyma-project.io/v1alpha2", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-subscription", + Namespace: "test-namespace", + }, + }, + }, + }, + wantResult: true, + wantError: nil, + }, + { + name: "error should have occurred", + wantResult: false, + wantError: errors.New("client error"), + }, + } + + // Iterate over test cases + for _, tc := range testCases { + // Create a new instance of the mock client + kubeClient := new(k8smocks.Client) + + // Set up the behavior of the mock client + kubeClient.On("GetSubscriptions", mock.Anything).Return(tc.givenSubscriptions, tc.wantError) + + // Create a new instance of the EventingManager with the mock client + em := &EventingManager{ + kubeClient: kubeClient, + } + + // Call the SubscriptionExists method + result, err := em.SubscriptionExists(context.Background()) + + // Assert the result of the method + require.Equal(t, tc.wantResult, result, tc.name) + require.Equal(t, tc.wantError, err, tc.name) + } +} diff --git a/pkg/eventing/mocks/manager.go b/pkg/eventing/mocks/manager.go index b9e701bf..d22a8ba2 100644 --- a/pkg/eventing/mocks/manager.go +++ b/pkg/eventing/mocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks @@ -257,6 +257,58 @@ func (_c *Manager_SetBackendConfig_Call) RunAndReturn(run func(env.BackendConfig return _c } +// SubscriptionExists provides a mock function with given fields: ctx +func (_m *Manager) SubscriptionExists(ctx context.Context) (bool, error) { + ret := _m.Called(ctx) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (bool, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Manager_SubscriptionExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubscriptionExists' +type Manager_SubscriptionExists_Call struct { + *mock.Call +} + +// SubscriptionExists is a helper method to define mock.On call +// - ctx context.Context +func (_e *Manager_Expecter) SubscriptionExists(ctx interface{}) *Manager_SubscriptionExists_Call { + return &Manager_SubscriptionExists_Call{Call: _e.mock.On("SubscriptionExists", ctx)} +} + +func (_c *Manager_SubscriptionExists_Call) Run(run func(ctx context.Context)) *Manager_SubscriptionExists_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Manager_SubscriptionExists_Call) Return(_a0 bool, _a1 error) *Manager_SubscriptionExists_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Manager_SubscriptionExists_Call) RunAndReturn(run func(context.Context) (bool, error)) *Manager_SubscriptionExists_Call { + _c.Call.Return(run) + return _c +} + // NewManager creates a new instance of Manager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewManager(t interface { diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go index 651b897a..b7a1d1c9 100644 --- a/pkg/k8s/client.go +++ b/pkg/k8s/client.go @@ -5,6 +5,7 @@ import ( "errors" "strings" + eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1" admissionv1 "k8s.io/api/admissionregistration/v1" v1 "k8s.io/api/apps/v1" @@ -33,6 +34,7 @@ type Client interface { name string) (*admissionv1.ValidatingWebhookConfiguration, error) GetCRD(context.Context, string) (*apiextensionsv1.CustomResourceDefinition, error) ApplicationCRDExists(context.Context) (bool, error) + GetSubscriptions(ctx context.Context) (*eventingv1alpha2.SubscriptionList, error) } type KubeClient struct { @@ -174,3 +176,12 @@ func (c *KubeClient) GetValidatingWebHookConfiguration(ctx context.Context, } return &validatingWH, nil } + +func (c *KubeClient) GetSubscriptions(ctx context.Context) (*eventingv1alpha2.SubscriptionList, error) { + subscriptions := &eventingv1alpha2.SubscriptionList{} + err := c.client.List(ctx, subscriptions) + if err != nil { + return nil, err + } + return subscriptions, nil +} diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index 3ed8fbc8..560d1a04 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -9,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" testutils "github.com/kyma-project/eventing-manager/test/utils" + eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" admissionv1 "k8s.io/api/admissionregistration/v1" apiclientsetfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" @@ -634,3 +635,71 @@ func Test_ApplicationCRDExists(t *testing.T) { }) } } + +func TestGetSubscriptions(t *testing.T) { + // Define test cases + testCases := []struct { + name string + wantSubscriptionList *eventingv1alpha2.SubscriptionList + }{ + { + name: "exists subscription", + wantSubscriptionList: &eventingv1alpha2.SubscriptionList{ + TypeMeta: metav1.TypeMeta{ + Kind: "SubscriptionList", + APIVersion: "eventing.kyma-project.io/v1alpha2", + }, + Items: []eventingv1alpha2.Subscription{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Subscription", + APIVersion: "eventing.kyma-project.io/v1alpha2", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-subscription", + Namespace: "test-namespace", + }, + }, + }, + }, + }, + { + name: "no subscription", + wantSubscriptionList: &eventingv1alpha2.SubscriptionList{}, + }, + } + + // Iterate over test cases + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + // given + ctx := context.Background() + scheme := runtime.NewScheme() + err := eventingv1alpha2.AddToScheme(scheme) + require.NoError(t, err) + + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + + kubeClient := &KubeClient{ + client: fakeClient, + } + + // Create the secret if it should exist + if tc.wantSubscriptionList != nil && len(tc.wantSubscriptionList.Items) > 0 { + require.NoError(t, fakeClient.Create(ctx, &tc.wantSubscriptionList.Items[0])) + } + + // Call the GetSubscriptions method + result, _ := kubeClient.GetSubscriptions(context.Background()) + + // Assert the result of the method + if tc.wantSubscriptionList != nil && len(tc.wantSubscriptionList.Items) > 0 { + require.True(t, len(result.Items) > 0) + } else { + require.Equal(t, 0, len(result.Items)) + } + }) + } +} diff --git a/pkg/k8s/mocks/client.go b/pkg/k8s/mocks/client.go index 2798336e..7209202f 100644 --- a/pkg/k8s/mocks/client.go +++ b/pkg/k8s/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks @@ -17,6 +17,8 @@ import ( v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" v1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1" + + v1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" ) // Client is an autogenerated mock type for the Client type @@ -492,6 +494,60 @@ func (_c *Client_GetSecret_Call) RunAndReturn(run func(context.Context, string) return _c } +// GetSubscriptions provides a mock function with given fields: ctx +func (_m *Client) GetSubscriptions(ctx context.Context) (*v1alpha2.SubscriptionList, error) { + ret := _m.Called(ctx) + + var r0 *v1alpha2.SubscriptionList + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*v1alpha2.SubscriptionList, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *v1alpha2.SubscriptionList); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1alpha2.SubscriptionList) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Client_GetSubscriptions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSubscriptions' +type Client_GetSubscriptions_Call struct { + *mock.Call +} + +// GetSubscriptions is a helper method to define mock.On call +// - ctx context.Context +func (_e *Client_Expecter) GetSubscriptions(ctx interface{}) *Client_GetSubscriptions_Call { + return &Client_GetSubscriptions_Call{Call: _e.mock.On("GetSubscriptions", ctx)} +} + +func (_c *Client_GetSubscriptions_Call) Run(run func(ctx context.Context)) *Client_GetSubscriptions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Client_GetSubscriptions_Call) Return(_a0 *v1alpha2.SubscriptionList, _a1 error) *Client_GetSubscriptions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Client_GetSubscriptions_Call) RunAndReturn(run func(context.Context) (*v1alpha2.SubscriptionList, error)) *Client_GetSubscriptions_Call { + _c.Call.Return(run) + return _c +} + // GetValidatingWebHookConfiguration provides a mock function with given fields: ctx, name func (_m *Client) GetValidatingWebHookConfiguration(ctx context.Context, name string) (*admissionregistrationv1.ValidatingWebhookConfiguration, error) { ret := _m.Called(ctx, name) diff --git a/pkg/subscriptionmanager/manager/mocks/manager.go b/pkg/subscriptionmanager/manager/mocks/manager.go index c41e5e6d..b245da66 100644 --- a/pkg/subscriptionmanager/manager/mocks/manager.go +++ b/pkg/subscriptionmanager/manager/mocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks diff --git a/pkg/subscriptionmanager/mocks/manager_factory.go b/pkg/subscriptionmanager/mocks/manager_factory.go index 1909b489..ed2b5e0c 100644 --- a/pkg/subscriptionmanager/mocks/manager_factory.go +++ b/pkg/subscriptionmanager/mocks/manager_factory.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.35.4. DO NOT EDIT. package mocks diff --git a/test/matchers/matchers.go b/test/matchers/matchers.go index 7bff53e1..9a059e89 100644 --- a/test/matchers/matchers.go +++ b/test/matchers/matchers.go @@ -26,6 +26,13 @@ func HaveStatusProcessing() gomegatypes.GomegaMatcher { }, gomega.Equal(v1alpha1.StateProcessing)) } +func HaveStatusWarning() gomegatypes.GomegaMatcher { + return gomega.WithTransform( + func(n *v1alpha1.Eventing) string { + return n.Status.State + }, gomega.Equal(v1alpha1.StateWarning)) +} + func HaveStatusError() gomegatypes.GomegaMatcher { return gomega.WithTransform( func(n *v1alpha1.Eventing) string { @@ -116,11 +123,11 @@ func HaveEventMeshSubManagerNotReadyCondition(message string) gomegatypes.Gomega }) } -func HaveEventMeshSubManagerStopFailedCondition(message string) gomegatypes.GomegaMatcher { +func HaveDeletionErrorCondition(message string) gomegatypes.GomegaMatcher { return HaveCondition(metav1.Condition{ - Type: string(v1alpha1.ConditionSubscriptionManagerReady), + Type: string(v1alpha1.ConditionDeleted), Status: metav1.ConditionFalse, - Reason: string(v1alpha1.ConditionReasonEventMeshSubManagerStopFailed), + Reason: string(v1alpha1.ConditionReasonDeletionError), Message: message, }) } diff --git a/test/utils/integration/integration.go b/test/utils/integration/integration.go index 2c49a430..9748144e 100644 --- a/test/utils/integration/integration.go +++ b/test/utils/integration/integration.go @@ -48,6 +48,7 @@ import ( "github.com/kyma-project/eventing-manager/pkg/k8s" "github.com/kyma-project/eventing-manager/pkg/logger" evnttestutils "github.com/kyma-project/eventing-manager/test/utils" + eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1" "github.com/kyma-project/nats-manager/testutils" admissionv1 "k8s.io/api/admissionregistration/v1" @@ -63,14 +64,10 @@ const ( attachControlPlaneOutput = false testEnvStartDelay = time.Minute testEnvStartAttempts = 10 - namespacePrefixLength = 5 - TwoMinTimeOut = 120 * time.Second BigPollingInterval = 3 * time.Second BigTimeOut = 60 * time.Second SmallTimeOut = 5 * time.Second SmallPollingInterval = 1 * time.Second - EventTypePrefix = "prefix" - JSStreamName = "kyma" ) // TestEnvironment provides mocked resources for integration tests. @@ -125,6 +122,12 @@ func NewTestEnvironment(projectRootDir string, celValidationEnabled bool, return nil, err } + // add subscription CRD scheme + err = eventingv1alpha2.AddToScheme(scheme.Scheme) + if err != nil { + return nil, err + } + // +kubebuilder:scaffold:scheme k8sClient, err := client.New(envTestKubeCfg, client.Options{Scheme: scheme.Scheme}) @@ -446,6 +449,13 @@ func (env TestEnvironment) EnsureHPAExists(t *testing.T, name, namespace string) }, SmallTimeOut, SmallPollingInterval, "failed to ensure existence of HPA") } +func (env TestEnvironment) EnsureSubscriptionExists(t *testing.T, name, namespace string) { + require.Eventually(t, func() bool { + result, err := env.GetSubscriptionFromK8s(name, namespace) + return err == nil && result != nil + }, SmallTimeOut, SmallPollingInterval, "failed to ensure existence of Subscription") +} + func (env TestEnvironment) EnsureK8sResourceUpdated(t *testing.T, obj client.Object) { require.NoError(t, env.k8sClient.Update(env.Context, obj)) } @@ -560,6 +570,20 @@ func (env TestEnvironment) EnsureEventingResourceDeletionStateError(t *testing.T }, SmallTimeOut, SmallPollingInterval, "failed to ensure deletion of Eventing") } +func (env TestEnvironment) EnsureSubscriptionResourceDeletion(t *testing.T, name, namespace string) { + subscription := &eventingv1alpha2.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + env.EnsureK8sResourceDeleted(t, subscription) + require.Eventually(t, func() bool { + _, err := env.GetSubscriptionFromK8s(name, namespace) + return err != nil && errors.IsNotFound(err) + }, BigTimeOut, BigPollingInterval, "failed to ensure deletion of Subscription") +} + func (env TestEnvironment) EnsureNATSResourceStateReady(t *testing.T, nats *natsv1alpha1.NATS) { env.makeNatsCrReady(t, nats) require.Eventually(t, func() bool { @@ -1026,6 +1050,18 @@ func (env TestEnvironment) GetHPAFromK8s(name, namespace string) (*autoscalingv1 return result, nil } +func (env TestEnvironment) GetSubscriptionFromK8s(name, namespace string) (*eventingv1alpha2.Subscription, error) { + nn := types.NamespacedName{ + Name: name, + Namespace: namespace, + } + result := &eventingv1alpha2.Subscription{} + if err := env.k8sClient.Get(env.Context, nn, result); err != nil { + return nil, err + } + return result, nil +} + func (env TestEnvironment) CreateUnstructuredK8sResource(obj *unstructured.Unstructured) error { return env.k8sClient.Create(env.Context, obj) } diff --git a/test/utils/utils.go b/test/utils/utils.go index 8a7026d3..c110ec60 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -3,6 +3,7 @@ package utils import ( "errors" "fmt" + eventinv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" "math/rand" "reflect" "time" @@ -310,6 +311,20 @@ func NewOAuthSecret(name, namespace string) *v1.Secret { return secret } +func NewSubscription(name, namespace string) *eventinv1alpha2.Subscription { + return &eventinv1alpha2.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: eventinv1alpha2.SubscriptionSpec{ + Sink: "test-sink", + Source: "test-source", + Types: []string{"test1.nats.type", "test2.nats.type"}, + }, + } +} + func FindObjectByKind(kind string, objects []client.Object) (client.Object, error) { for _, obj := range objects { if obj.GetObjectKind().GroupVersionKind().Kind == kind {