From a117e06e39dfee2f2b67468076212055387efe68 Mon Sep 17 00:00:00 2001 From: marcobebway Date: Mon, 15 Apr 2024 11:37:36 +0200 Subject: [PATCH] Refactor subscription validation --- cmd/main.go | 2 +- .../subscription/eventmesh/reconciler.go | 33 ++--- .../reconciler_internal_integration_test.go | 24 ++-- .../test/reconciler_integration_test.go | 120 ++++++++++-------- .../subscription/eventmesh/test/utils.go | 9 +- .../eventmesh/testwebhookauth/utils.go | 10 +- .../subscription/jetstream/reconciler.go | 59 ++++----- .../jetstream/reconciler_integration_test.go | 6 +- .../reconciler_internal_unit_test.go | 25 ++-- .../subscription/jetstream/test_utils_test.go | 8 +- .../eventing/subscription/validator/sink.go | 62 +++++++++ .../subscription/validator/sink_test.go | 83 ++++++++++++ .../subscription/validator/subscription.go | 47 +++++++ .../validator/subscription_test.go | 3 + internal/webhook/cleanup.go | 110 ++++++++++------ pkg/backend/sink/validator.go | 68 ---------- pkg/backend/sink/validator_test.go | 95 -------------- .../eventmesh/eventmesh.go | 8 +- .../jetstream/jetstream.go | 8 +- 19 files changed, 432 insertions(+), 348 deletions(-) create mode 100644 internal/controller/eventing/subscription/validator/sink.go create mode 100644 internal/controller/eventing/subscription/validator/sink_test.go create mode 100644 internal/controller/eventing/subscription/validator/subscription.go create mode 100644 internal/controller/eventing/subscription/validator/subscription_test.go delete mode 100644 pkg/backend/sink/validator.go delete mode 100644 pkg/backend/sink/validator_test.go diff --git a/cmd/main.go b/cmd/main.go index 95ab7d61..0eddd564 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -219,7 +219,7 @@ func main() { //nolint:funlen // main function needs to initialize many object } if errs := webhook.CleanupResources(ctx, k8sClient); len(errs) > 0 { - setupLog.Error(errors.Join(errs...), "unable to cleanup kubernetes webhook resources") + setupLog.Error(errors.Join(errs...), "failed to cleanup kubernetes webhook resources") syncLogger(ctrLogger) os.Exit(1) } diff --git a/internal/controller/eventing/subscription/eventmesh/reconciler.go b/internal/controller/eventing/subscription/eventmesh/reconciler.go index 7552b74e..fcaecff1 100644 --- a/internal/controller/eventing/subscription/eventmesh/reconciler.go +++ b/internal/controller/eventing/subscription/eventmesh/reconciler.go @@ -28,11 +28,11 @@ import ( eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" controllererrors "github.com/kyma-project/eventing-manager/internal/controller/errors" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/internal/controller/events" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" "github.com/kyma-project/eventing-manager/pkg/backend/eventmesh" "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils" "github.com/kyma-project/eventing-manager/pkg/constants" "github.com/kyma-project/eventing-manager/pkg/ems/api/events/types" @@ -55,7 +55,7 @@ type Reconciler struct { oauth2credentials *eventmesh.OAuth2ClientCredentials // nameMapper is used to map the Kyma subscription name to a subscription name on EventMesh. nameMapper backendutils.NameMapper - sinkValidator sink.Validator + subscriptionValidator validator.SubscriptionValidator collector *metrics.Collector syncConditionWebhookCallStatus syncConditionWebhookCallStatusFunc } @@ -73,8 +73,8 @@ const ( func NewReconciler(client client.Client, logger *logger.Logger, recorder record.EventRecorder, cfg env.Config, cleaner cleaner.Cleaner, eventMeshBackend eventmesh.Backend, - credential *eventmesh.OAuth2ClientCredentials, mapper backendutils.NameMapper, validator sink.Validator, - collector *metrics.Collector, domain string, + credential *eventmesh.OAuth2ClientCredentials, mapper backendutils.NameMapper, + subscriptionValidator validator.SubscriptionValidator, collector *metrics.Collector, domain string, ) *Reconciler { if err := eventMeshBackend.Initialize(cfg); err != nil { logger.WithContext().Errorw("Failed to start reconciler", "name", @@ -90,7 +90,7 @@ func NewReconciler(client client.Client, logger *logger.Logger, recorder record. cleaner: cleaner, oauth2credentials: credential, nameMapper: mapper, - sinkValidator: validator, + subscriptionValidator: subscriptionValidator, collector: collector, syncConditionWebhookCallStatus: syncConditionWebhookCallStatus, } @@ -144,7 +144,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req kctrl.Request) (kctrl.Re } // Validate subscription. - if validationErr := r.handleSubscriptionValidation(ctx, sub); validationErr != nil { + if validationErr := r.validateSubscription(ctx, sub); validationErr != nil { if updateErr := r.updateStatus(ctx, currentSubscription, sub, log); updateErr != nil { return kctrl.Result{}, errors.Join(validationErr, updateErr) } @@ -184,25 +184,18 @@ func (r *Reconciler) Reconcile(ctx context.Context, req kctrl.Request) (kctrl.Re return result, nil } -func (r *Reconciler) handleSubscriptionValidation(ctx context.Context, desiredSubscription *eventingv1alpha2.Subscription) error { +func (r *Reconciler) validateSubscription(ctx context.Context, subscription *eventingv1alpha2.Subscription) error { var err error - if err = r.validateSubscriptionSpec(ctx, desiredSubscription); err != nil { - desiredSubscription.Status.SetNotReady() - desiredSubscription.Status.ClearTypes() - desiredSubscription.Status.ClearBackend() - desiredSubscription.Status.ClearConditions() + if err = r.subscriptionValidator.Validate(ctx, *subscription); err != nil { + subscription.Status.SetNotReady() + subscription.Status.ClearTypes() + subscription.Status.ClearBackend() + subscription.Status.ClearConditions() } - desiredSubscription.Status.SetSubscriptionSpecValidCondition(err) + subscription.Status.SetSubscriptionSpecValidCondition(err) return err } -func (r *Reconciler) validateSubscriptionSpec(ctx context.Context, subscription *eventingv1alpha2.Subscription) error { - if errList := subscription.ValidateSpec(); len(errList) > 0 { - return errors.Join(errList.ToAggregate()) - } - return r.sinkValidator.Validate(ctx, subscription) -} - // updateSubscription updates the subscription changes to k8s. func (r *Reconciler) updateSubscription(ctx context.Context, sub *eventingv1alpha2.Subscription, logger *zap.SugaredLogger) error { namespacedName := &ktypes.NamespacedName{ diff --git a/internal/controller/eventing/subscription/eventmesh/reconciler_internal_integration_test.go b/internal/controller/eventing/subscription/eventmesh/reconciler_internal_integration_test.go index 0171d0d6..7603a498 100644 --- a/internal/controller/eventing/subscription/eventmesh/reconciler_internal_integration_test.go +++ b/internal/controller/eventing/subscription/eventmesh/reconciler_internal_integration_test.go @@ -24,11 +24,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" "github.com/kyma-project/eventing-manager/pkg/backend/eventmesh" "github.com/kyma-project/eventing-manager/pkg/backend/eventmesh/mocks" "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils" "github.com/kyma-project/eventing-manager/pkg/ems/api/events/types" "github.com/kyma-project/eventing-manager/pkg/env" @@ -79,8 +79,8 @@ func TestReconciler_Reconcile(t *testing.T) { backendSyncErr := errors.New("backend sync error") backendDeleteErr := errors.New("backend delete error") validatorErr := errors.New("invalid sink") - happyValidator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil }) - unhappyValidator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return validatorErr }) + happyValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil }) + unhappyValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return validatorErr }) testCases := []struct { name string @@ -262,7 +262,7 @@ func TestReconciler_APIRuleConfig(t *testing.T) { eventingtesting.WithMaxInFlight(10), ) - validator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil }) + subscriptionValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil }) col := metrics.NewCollector() @@ -292,7 +292,7 @@ func TestReconciler_APIRuleConfig(t *testing.T) { testenv.backend, testenv.credentials, testenv.mapper, - validator, + subscriptionValidator, col, utils.Domain), testenv.fakeClient @@ -322,7 +322,7 @@ func TestReconciler_APIRuleConfig(t *testing.T) { testenv.backend, testenv.credentials, testenv.mapper, - validator, + subscriptionValidator, col, utils.Domain), testenv.fakeClient @@ -393,7 +393,7 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) { eventingtesting.WithMaxInFlight(10), ) - validator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil }) + subscriptionValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil }) col := metrics.NewCollector() testCases := []struct { @@ -423,7 +423,7 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) { testenv.backend, testenv.credentials, testenv.mapper, - validator, + subscriptionValidator, col, utils.Domain), testenv.fakeClient @@ -459,7 +459,7 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) { testenv.backend, testenv.credentials, testenv.mapper, - validator, + subscriptionValidator, col, utils.Domain), testenv.fakeClient @@ -579,7 +579,7 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) { func TestReconciler_PreserveBackendHashes(t *testing.T) { ctx := context.Background() collector := metrics.NewCollector() - validator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil }) + subscriptionValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil }) const ( ev2hash = int64(118518533334734) @@ -620,7 +620,7 @@ func TestReconciler_PreserveBackendHashes(t *testing.T) { te.backend.On("Initialize", mock.Anything).Return(nil) te.backend.On("SyncSubscription", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) return NewReconciler(te.fakeClient, te.logger, te.recorder, te.cfg, te.cleaner, - te.backend, te.credentials, te.mapper, validator, collector, utils.Domain), te.fakeClient + te.backend, te.credentials, te.mapper, subscriptionValidator, collector, utils.Domain), te.fakeClient }, wantEv2Hash: ev2hash, wantEventMeshHash: eventMeshHash, @@ -650,7 +650,7 @@ func TestReconciler_PreserveBackendHashes(t *testing.T) { te.backend.On("Initialize", mock.Anything).Return(nil) te.backend.On("SyncSubscription", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) return NewReconciler(te.fakeClient, te.logger, te.recorder, te.cfg, te.cleaner, - te.backend, te.credentials, te.mapper, validator, collector, utils.Domain), te.fakeClient + te.backend, te.credentials, te.mapper, subscriptionValidator, collector, utils.Domain), te.fakeClient }, wantEv2Hash: ev2hash, wantEventMeshHash: eventMeshHash, diff --git a/internal/controller/eventing/subscription/eventmesh/test/reconciler_integration_test.go b/internal/controller/eventing/subscription/eventmesh/test/reconciler_integration_test.go index 4f65d09d..25f4acbc 100644 --- a/internal/controller/eventing/subscription/eventmesh/test/reconciler_integration_test.go +++ b/internal/controller/eventing/subscription/eventmesh/test/reconciler_integration_test.go @@ -53,7 +53,7 @@ func Test_CreateSubscription(t *testing.T) { testCases := []struct { name string givenSubscriptionFunc func(namespace string) *eventingv1alpha2.Subscription - wantSubscriptionMatchers gomegatypes.GomegaMatcher + wantSubscriptionMatchers func(namespace string) gomegatypes.GomegaMatcher wantEventMeshSubMatchers gomegatypes.GomegaMatcher wantEventMeshSubCheck bool wantAPIRuleCheck bool @@ -74,13 +74,15 @@ func Test_CreateSubscription(t *testing.T) { eventingtesting.WithMaxInFlight(10), ) }, - wantSubscriptionMatchers: gomega.And( - eventingtesting.HaveSubscriptionNotReady(), - eventingtesting.HaveCondition(eventingv1alpha2.MakeCondition( - eventingv1alpha2.ConditionSubscriptionSpecValid, - eventingv1alpha2.ConditionReasonSubscriptionSpecHasValidationErrors, - kcorev1.ConditionFalse, "Sink validation failed: Service \"invalid\" not found")), - ), + wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher { + return gomega.And( + eventingtesting.HaveSubscriptionNotReady(), + eventingtesting.HaveCondition(eventingv1alpha2.MakeCondition( + eventingv1alpha2.ConditionSubscriptionSpecValid, + eventingv1alpha2.ConditionReasonSubscriptionSpecHasValidationErrors, + kcorev1.ConditionFalse, fmt.Sprintf("Subscription validation failed: Sink validation failed: service invalid.%s not found in the cluster", namespace))), + ) + }, }, { name: "should succeed to create subscription if types are non-empty", @@ -96,19 +98,21 @@ func Test_CreateSubscription(t *testing.T) { eventingtesting.WithMaxInFlight(10), ) }, - wantSubscriptionMatchers: gomega.And( - eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer), - eventingtesting.HaveSubscriptionActiveCondition(), - eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{ - { - OriginalType: fmt.Sprintf("%s0", eventingtesting.OrderCreatedV1EventNotClean), - CleanType: fmt.Sprintf("%s0", eventingtesting.OrderCreatedV1Event), - }, { - OriginalType: fmt.Sprintf("%s1", eventingtesting.OrderCreatedV1EventNotClean), - CleanType: fmt.Sprintf("%s1", eventingtesting.OrderCreatedV1Event), - }, - }), - ), + wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher { + return gomega.And( + eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer), + eventingtesting.HaveSubscriptionActiveCondition(), + eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{ + { + OriginalType: fmt.Sprintf("%s0", eventingtesting.OrderCreatedV1EventNotClean), + CleanType: fmt.Sprintf("%s0", eventingtesting.OrderCreatedV1Event), + }, { + OriginalType: fmt.Sprintf("%s1", eventingtesting.OrderCreatedV1EventNotClean), + CleanType: fmt.Sprintf("%s1", eventingtesting.OrderCreatedV1Event), + }, + }), + ) + }, wantEventMeshSubMatchers: gomega.And( eventmeshsubmatchers.HaveEvents(emstypes.Events{ { @@ -135,9 +139,11 @@ func Test_CreateSubscription(t *testing.T) { eventingtesting.WithSinkURL(eventingtesting.ValidSinkURL(namespace, testName)), ) }, - wantSubscriptionMatchers: gomega.And( - eventingtesting.HaveSubscriptionActiveCondition(), - ), + wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher { + return gomega.And( + eventingtesting.HaveSubscriptionActiveCondition(), + ) + }, wantEventMeshSubMatchers: gomega.And( // should have default values for protocol and webhook auth eventmeshsubmatchers.HaveContentMode(emTestEnsemble.envConfig.ContentMode), @@ -163,16 +169,18 @@ func Test_CreateSubscription(t *testing.T) { eventingtesting.WithSinkURL(eventingtesting.ValidSinkURL(namespace, testName)), ) }, - wantSubscriptionMatchers: gomega.And( - eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer), - eventingtesting.HaveSubscriptionActiveCondition(), - eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{ - { - OriginalType: eventingtesting.EventMeshExactType, - CleanType: eventingtesting.EventMeshExactType, - }, - }), - ), + wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher { + return gomega.And( + eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer), + eventingtesting.HaveSubscriptionActiveCondition(), + eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{ + { + OriginalType: eventingtesting.EventMeshExactType, + CleanType: eventingtesting.EventMeshExactType, + }, + }), + ) + }, wantEventMeshSubMatchers: gomega.And( eventmeshsubmatchers.HaveEvents(emstypes.Events{ { @@ -197,16 +205,18 @@ func Test_CreateSubscription(t *testing.T) { eventingtesting.WithMaxInFlight(10), ) }, - wantSubscriptionMatchers: gomega.And( - eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer), - eventingtesting.HaveSubscriptionActiveCondition(), - eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{ - { - OriginalType: eventingtesting.OrderCreatedV1EventNotClean, - CleanType: eventingtesting.OrderCreatedV1Event, - }, - }), - ), + wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher { + return gomega.And( + eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer), + eventingtesting.HaveSubscriptionActiveCondition(), + eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{ + { + OriginalType: eventingtesting.OrderCreatedV1EventNotClean, + CleanType: eventingtesting.OrderCreatedV1Event, + }, + }), + ) + }, wantEventMeshSubMatchers: gomega.And( eventmeshsubmatchers.HaveEvents(emstypes.Events{ { @@ -242,7 +252,7 @@ func Test_CreateSubscription(t *testing.T) { ensureK8sResourceCreated(ctx, t, givenSubscription) // check if the subscription is as required - getSubscriptionAssert(ctx, g, givenSubscription).Should(testcase.wantSubscriptionMatchers) + getSubscriptionAssert(ctx, g, givenSubscription).Should(testcase.wantSubscriptionMatchers(testNamespace)) if testcase.wantAPIRuleCheck { // check if an APIRule was created for the subscription @@ -545,14 +555,16 @@ func Test_FixingSinkAndApiRule(t *testing.T) { ) } - wantSubscriptionWithoutSinkMatchers := gomega.And( - eventingtesting.HaveCondition(eventingv1alpha2.MakeCondition( - eventingv1alpha2.ConditionSubscriptionSpecValid, - eventingv1alpha2.ConditionReasonSubscriptionSpecHasValidationErrors, - kcorev1.ConditionFalse, - "Sink validation failed: Service \"invalid\" not found", - )), - ) + wantSubscriptionWithoutSinkMatchers := func(namespace string) gomegatypes.GomegaMatcher { + return gomega.And( + eventingtesting.HaveCondition(eventingv1alpha2.MakeCondition( + eventingv1alpha2.ConditionSubscriptionSpecValid, + eventingv1alpha2.ConditionReasonSubscriptionSpecHasValidationErrors, + kcorev1.ConditionFalse, + fmt.Sprintf("Subscription validation failed: Sink validation failed: service invalid.%s not found in the cluster", namespace), + )), + ) + } givenUpdateSubscriptionWithSinkFunc := func(namespace, name, sinkFormat, path string) *eventingv1alpha2.Subscription { return eventingtesting.NewSubscription(name, namespace, @@ -611,7 +623,7 @@ func Test_FixingSinkAndApiRule(t *testing.T) { ensureK8sResourceCreated(ctx, t, givenSubscription) createdSubscription := givenSubscription.DeepCopy() // check if the created subscription is correct - getSubscriptionAssert(ctx, g, createdSubscription).Should(wantSubscriptionWithoutSinkMatchers) + getSubscriptionAssert(ctx, g, createdSubscription).Should(wantSubscriptionWithoutSinkMatchers(testNamespace)) // update subscription with valid sink givenUpdateSubscription.ResourceVersion = createdSubscription.ResourceVersion diff --git a/internal/controller/eventing/subscription/eventmesh/test/utils.go b/internal/controller/eventing/subscription/eventmesh/test/utils.go index 0b083732..d0fafa71 100644 --- a/internal/controller/eventing/subscription/eventmesh/test/utils.go +++ b/internal/controller/eventing/subscription/eventmesh/test/utils.go @@ -32,10 +32,10 @@ import ( eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" subscriptioncontrollereventmesh "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/eventmesh" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" backendeventmesh "github.com/kyma-project/eventing-manager/pkg/backend/eventmesh" "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils" "github.com/kyma-project/eventing-manager/pkg/constants" emstypes "github.com/kyma-project/eventing-manager/pkg/ems/api/events/types" @@ -122,11 +122,14 @@ func setupSuite() error { // setup eventMesh reconciler recorder := k8sManager.GetEventRecorderFor("eventing-controller") - sinkValidator := sink.NewValidator(k8sManager.GetClient(), recorder) emTestEnsemble.envConfig = getEnvConfig() eventMesh, credentials := setupEventMesh(defaultLogger) + // Init the Subscription validator. + sinkValidator := validator.NewSinkValidator(k8sManager.GetClient()) + subscriptionValidator := validator.NewSubscriptionValidator(sinkValidator) + col := metrics.NewCollector() testReconciler := subscriptioncontrollereventmesh.NewReconciler( k8sManager.GetClient(), @@ -137,7 +140,7 @@ func setupSuite() error { eventMesh, credentials, emTestEnsemble.nameMapper, - sinkValidator, + subscriptionValidator, col, testutils.Domain, ) diff --git a/internal/controller/eventing/subscription/eventmesh/testwebhookauth/utils.go b/internal/controller/eventing/subscription/eventmesh/testwebhookauth/utils.go index 30bb7adb..509f64b1 100644 --- a/internal/controller/eventing/subscription/eventmesh/testwebhookauth/utils.go +++ b/internal/controller/eventing/subscription/eventmesh/testwebhookauth/utils.go @@ -30,10 +30,10 @@ import ( eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" subscriptioncontrollereventmesh "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/eventmesh" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" backendeventmesh "github.com/kyma-project/eventing-manager/pkg/backend/eventmesh" "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils" emstypes "github.com/kyma-project/eventing-manager/pkg/ems/api/events/types" "github.com/kyma-project/eventing-manager/pkg/env" @@ -124,10 +124,14 @@ func setupSuite() (*eventMeshTestEnsemble, error) { // setup eventMesh reconciler recorder := k8sManager.GetEventRecorderFor("eventing-controller") - sinkValidator := sink.NewValidator(k8sManager.GetClient(), recorder) emTestEnsemble.envConfig = getEnvConfig(emTestEnsemble) eventMeshBackend = backendeventmesh.NewEventMesh(credentials, emTestEnsemble.nameMapper, defaultLogger) col := metrics.NewCollector() + + // Init the Subscription validator. + sinkValidator := validator.NewSinkValidator(k8sManager.GetClient()) + subscriptionValidator := validator.NewSubscriptionValidator(sinkValidator) + testReconciler = subscriptioncontrollereventmesh.NewReconciler( k8sManager.GetClient(), defaultLogger, @@ -137,7 +141,7 @@ func setupSuite() (*eventMeshTestEnsemble, error) { eventMeshBackend, credentials, emTestEnsemble.nameMapper, - sinkValidator, + subscriptionValidator, col, testutils.Domain, ) diff --git a/internal/controller/eventing/subscription/jetstream/reconciler.go b/internal/controller/eventing/subscription/jetstream/reconciler.go index e816d70d..199f2f02 100644 --- a/internal/controller/eventing/subscription/jetstream/reconciler.go +++ b/internal/controller/eventing/subscription/jetstream/reconciler.go @@ -20,11 +20,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/internal/controller/events" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" "github.com/kyma-project/eventing-manager/pkg/backend/jetstream" "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils" emerrors "github.com/kyma-project/eventing-manager/pkg/errors" "github.com/kyma-project/eventing-manager/pkg/logger" @@ -40,28 +40,28 @@ const ( type Reconciler struct { client.Client - Backend jetstream.Backend - recorder record.EventRecorder - logger *logger.Logger - cleaner cleaner.Cleaner - sinkValidator sink.Validator - customEventsChannel chan event.GenericEvent - collector *metrics.Collector + Backend jetstream.Backend + recorder record.EventRecorder + logger *logger.Logger + cleaner cleaner.Cleaner + subscriptionValidator validator.SubscriptionValidator + customEventsChannel chan event.GenericEvent + collector *metrics.Collector } func NewReconciler(client client.Client, jsBackend jetstream.Backend, logger *logger.Logger, recorder record.EventRecorder, cleaner cleaner.Cleaner, - defaultSinkValidator sink.Validator, collector *metrics.Collector, + subscriptionValidator validator.SubscriptionValidator, collector *metrics.Collector, ) *Reconciler { reconciler := &Reconciler{ - Client: client, - Backend: jsBackend, - recorder: recorder, - logger: logger, - cleaner: cleaner, - sinkValidator: defaultSinkValidator, - customEventsChannel: make(chan event.GenericEvent), - collector: collector, + Client: client, + Backend: jsBackend, + recorder: recorder, + logger: logger, + cleaner: cleaner, + subscriptionValidator: subscriptionValidator, + customEventsChannel: make(chan event.GenericEvent), + collector: collector, } return reconciler } @@ -131,8 +131,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req kctrl.Request) (kctrl.Re } // Validate subscription. - if validationErr := r.handleSubscriptionValidation(ctx, desiredSubscription); validationErr != nil { - if errors.Is(validationErr, sink.ErrSinkValidationFailed) { + if validationErr := r.validateSubscription(ctx, desiredSubscription); validationErr != nil { + if errors.Is(validationErr, validator.ErrSinkValidationFailed) { if deleteErr := r.Backend.DeleteSubscriptionsOnly(desiredSubscription); deleteErr != nil { log.Errorw("Failed to delete JetStream subscriptions", "error", deleteErr) return kctrl.Result{}, deleteErr @@ -171,25 +171,18 @@ func (r *Reconciler) Reconcile(ctx context.Context, req kctrl.Request) (kctrl.Re return kctrl.Result{}, r.syncSubscriptionStatus(ctx, desiredSubscription, nil, log) } -func (r *Reconciler) handleSubscriptionValidation(ctx context.Context, desiredSubscription *eventingv1alpha2.Subscription) error { +func (r *Reconciler) validateSubscription(ctx context.Context, subscription *eventingv1alpha2.Subscription) error { var err error - if err = r.validateSubscriptionSpec(ctx, desiredSubscription); err != nil { - desiredSubscription.Status.SetNotReady() - desiredSubscription.Status.ClearTypes() - desiredSubscription.Status.ClearBackend() - desiredSubscription.Status.ClearConditions() + if err = r.subscriptionValidator.Validate(ctx, *subscription); err != nil { + subscription.Status.SetNotReady() + subscription.Status.ClearTypes() + subscription.Status.ClearBackend() + subscription.Status.ClearConditions() } - desiredSubscription.Status.SetSubscriptionSpecValidCondition(err) + subscription.Status.SetSubscriptionSpecValidCondition(err) return err } -func (r *Reconciler) validateSubscriptionSpec(ctx context.Context, subscription *eventingv1alpha2.Subscription) error { - if errList := subscription.ValidateSpec(); len(errList) > 0 { - return errors.Join(errList.ToAggregate()) - } - return r.sinkValidator.Validate(ctx, subscription) -} - func (r *Reconciler) updateSubscriptionMetrics(current, desired *eventingv1alpha2.Subscription) { for _, currentType := range current.Status.Backend.Types { found := false diff --git a/internal/controller/eventing/subscription/jetstream/reconciler_integration_test.go b/internal/controller/eventing/subscription/jetstream/reconciler_integration_test.go index 1f117d8b..f03f1099 100644 --- a/internal/controller/eventing/subscription/jetstream/reconciler_integration_test.go +++ b/internal/controller/eventing/subscription/jetstream/reconciler_integration_test.go @@ -11,7 +11,6 @@ import ( gomegatypes "github.com/onsi/gomega/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - kcorev1 "k8s.io/api/core/v1" eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" eventingtesting "github.com/kyma-project/eventing-manager/testing" @@ -187,12 +186,9 @@ func Test_CreateSubscription(t *testing.T) { K8sSubscription: []gomegatypes.GomegaMatcher{ eventingtesting.HaveCondition( ConditionInvalidSink( - "Sink validation failed: Service \"testapp\" not found", + "Subscription validation failed: Sink validation failed: service testapp.test not found in the cluster", )), }, - K8sEvents: []kcorev1.Event{ - EventInvalidSink("Sink does not correspond to a valid cluster local svc"), - }, }, }, { diff --git a/internal/controller/eventing/subscription/jetstream/reconciler_internal_unit_test.go b/internal/controller/eventing/subscription/jetstream/reconciler_internal_unit_test.go index b713db95..ab618553 100644 --- a/internal/controller/eventing/subscription/jetstream/reconciler_internal_unit_test.go +++ b/internal/controller/eventing/subscription/jetstream/reconciler_internal_unit_test.go @@ -20,11 +20,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" "github.com/kyma-project/eventing-manager/pkg/backend/jetstream" backendjetstreammocks "github.com/kyma-project/eventing-manager/pkg/backend/jetstream/mocks" "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" "github.com/kyma-project/eventing-manager/pkg/env" "github.com/kyma-project/eventing-manager/pkg/logger" eventingtesting "github.com/kyma-project/eventing-manager/testing" @@ -64,8 +64,8 @@ func Test_Reconcile(t *testing.T) { missingSubSyncErr := jetstream.ErrMissingSubscription backendDeleteErr := errors.New("backend delete error") validatorErr := errors.New("invalid sink") - happyValidator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil }) - unhappyValidator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return validatorErr }) + happyValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil }) + unhappyValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return validatorErr }) collector := metrics.NewCollector() testCases := []struct { @@ -746,16 +746,19 @@ func setupTestEnvironment(t *testing.T, objs ...client.Object) *TestEnvironment t.Fatalf("initialize logger failed: %v", err) } jsCleaner := cleaner.NewJetStreamCleaner(defaultLogger) - defaultSinkValidator := sink.NewValidator(fakeClient, recorder) + + // Init the Subscription validator. + sinkValidator := validator.NewSinkValidator(fakeClient) + subscriptionValidator := validator.NewSubscriptionValidator(sinkValidator) reconciler := Reconciler{ - Backend: mockedBackend, - Client: fakeClient, - logger: defaultLogger, - recorder: recorder, - sinkValidator: defaultSinkValidator, - cleaner: jsCleaner, - collector: metrics.NewCollector(), + Backend: mockedBackend, + Client: fakeClient, + logger: defaultLogger, + recorder: recorder, + subscriptionValidator: subscriptionValidator, + cleaner: jsCleaner, + collector: metrics.NewCollector(), } return &TestEnvironment{ diff --git a/internal/controller/eventing/subscription/jetstream/test_utils_test.go b/internal/controller/eventing/subscription/jetstream/test_utils_test.go index ffba32e1..9867a251 100644 --- a/internal/controller/eventing/subscription/jetstream/test_utils_test.go +++ b/internal/controller/eventing/subscription/jetstream/test_utils_test.go @@ -29,11 +29,11 @@ import ( eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" subscriptioncontrollerjetstream "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/jetstream" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/internal/controller/events" backendcleaner "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" "github.com/kyma-project/eventing-manager/pkg/backend/jetstream" "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" "github.com/kyma-project/eventing-manager/pkg/env" "github.com/kyma-project/eventing-manager/pkg/logger" eventingtesting "github.com/kyma-project/eventing-manager/testing" @@ -172,13 +172,17 @@ func startReconciler() error { k8sClient := k8sManager.GetClient() recorder := k8sManager.GetEventRecorderFor("eventing-controller-jetstream") + // Init the Subscription validator. + sinkValidator := validator.NewSinkValidator(k8sClient) + subscriptionValidator := validator.NewSubscriptionValidator(sinkValidator) + jsTestEnsemble.Reconciler = subscriptioncontrollerjetstream.NewReconciler( k8sClient, jetStreamHandler, defaultLogger, recorder, cleaner, - sink.NewValidator(k8sClient, recorder), + subscriptionValidator, metricsCollector, ) diff --git a/internal/controller/eventing/subscription/validator/sink.go b/internal/controller/eventing/subscription/validator/sink.go new file mode 100644 index 00000000..26fe2629 --- /dev/null +++ b/internal/controller/eventing/subscription/validator/sink.go @@ -0,0 +1,62 @@ +package validator + +import ( + "context" + "fmt" + + pkgerrors "github.com/pkg/errors" + kcorev1 "k8s.io/api/core/v1" + ktypes "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kyma-project/eventing-manager/pkg/utils" +) + +var ErrSinkValidationFailed = pkgerrors.New("Sink validation failed") + +type SinkValidator interface { + Validate(ctx context.Context, sink string) error +} + +type sinkValidator struct { + client client.Client +} + +// Perform a compile-time check. +var _ SinkValidator = &sinkValidator{} + +func NewSinkValidator(client client.Client) SinkValidator { + return &sinkValidator{client: client} +} + +func (sv sinkValidator) Validate(ctx context.Context, sink string) error { + _, subDomains, err := utils.GetSinkData(sink) + if err != nil { + return fmt.Errorf("%w: %w", ErrSinkValidationFailed, err) + } + + if len(subDomains) < 2 { + return fmt.Errorf("%w: sink format should contain the service name.namespace", ErrSinkValidationFailed) + } + + namespace, name := subDomains[1], subDomains[0] + if !sv.serviceExists(ctx, namespace, name) { + return fmt.Errorf("%w: service %s.%s not found in the cluster", ErrSinkValidationFailed, name, namespace) + } + + return nil +} + +func (sv sinkValidator) serviceExists(ctx context.Context, namespace, name string) bool { + return sv.client.Get(ctx, ktypes.NamespacedName{Namespace: namespace, Name: name}, &kcorev1.Service{}) == nil +} + +// SinkValidatorFunc implements the SinkValidator interface. +type SinkValidatorFunc func(ctx context.Context, sink string) error + +// Perform a compile-time check. +var _ SinkValidator = SinkValidatorFunc(func(_ context.Context, _ string) error { return nil }) + +func (svf SinkValidatorFunc) Validate(ctx context.Context, sink string) error { + return svf(ctx, sink) +} diff --git a/internal/controller/eventing/subscription/validator/sink_test.go b/internal/controller/eventing/subscription/validator/sink_test.go new file mode 100644 index 00000000..301f2801 --- /dev/null +++ b/internal/controller/eventing/subscription/validator/sink_test.go @@ -0,0 +1,83 @@ +package validator + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + kcorev1 "k8s.io/api/core/v1" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestSinkValidator(t *testing.T) { + // given + const ( + namespaceName = "test-namespace" + serviceName = "test-service" + ) + + ctx := context.Background() + fakeClient := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build() + validator := NewSinkValidator(fakeClient) + + tests := []struct { + name string + givenSink string + givenService *kcorev1.Service + wantErr error + }{ + { + name: "With empty sink", + givenSink: "", + givenService: nil, + wantErr: ErrSinkValidationFailed, + }, + { + name: "With invalid sink URL", + givenSink: "[:invalid:url:]", + givenService: nil, + wantErr: ErrSinkValidationFailed, + }, + { + name: "With invalid sink format", + givenSink: "http://insuffecient", + givenService: nil, + wantErr: ErrSinkValidationFailed, + }, + { + name: "With non-existing service", + givenSink: fmt.Sprintf("https://%s.%s", serviceName, namespaceName), + givenService: nil, + wantErr: ErrSinkValidationFailed, + }, + { + name: "With existing service", + givenSink: fmt.Sprintf("https://%s.%s", serviceName, namespaceName), + givenService: &kcorev1.Service{ + ObjectMeta: kmetav1.ObjectMeta{Name: serviceName, Namespace: namespaceName}, + }, + wantErr: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // create service if needed + if test.givenService != nil { + require.NoError(t, fakeClient.Create(ctx, test.givenService)) + } + + // when + gotErr := validator.Validate(ctx, test.givenSink) + + // then + if test.wantErr == nil { + require.NoError(t, gotErr) + } else { + require.ErrorIs(t, gotErr, ErrSinkValidationFailed) + } + }) + } +} diff --git a/internal/controller/eventing/subscription/validator/subscription.go b/internal/controller/eventing/subscription/validator/subscription.go new file mode 100644 index 00000000..07080310 --- /dev/null +++ b/internal/controller/eventing/subscription/validator/subscription.go @@ -0,0 +1,47 @@ +package validator + +import ( + "context" + "fmt" + + pkgerrors "github.com/pkg/errors" + + eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" +) + +var ErrSubscriptionValidationFailed = pkgerrors.New("Subscription validation failed") + +type SubscriptionValidator interface { + Validate(ctx context.Context, subscription eventingv1alpha2.Subscription) error +} + +type subscriptionValidator struct { + sinkValidator SinkValidator +} + +// Perform a compile-time check. +var _ SubscriptionValidator = &subscriptionValidator{} + +func NewSubscriptionValidator(sinkValidator SinkValidator) SubscriptionValidator { + return &subscriptionValidator{sinkValidator: sinkValidator} +} + +func (sv *subscriptionValidator) Validate(ctx context.Context, subscription eventingv1alpha2.Subscription) error { + if errs := subscription.ValidateSpec(); len(errs) > 0 { + return fmt.Errorf("%w: %w", ErrSubscriptionValidationFailed, errs.ToAggregate()) + } + if err := sv.sinkValidator.Validate(ctx, subscription.Spec.Sink); err != nil { + return fmt.Errorf("%w: %w", ErrSubscriptionValidationFailed, err) + } + return nil +} + +// SubscriptionValidatorFunc implements the SinkValidator interface. +type SubscriptionValidatorFunc func(ctx context.Context, subscription eventingv1alpha2.Subscription) error + +// Perform a compile-time check. +var _ SubscriptionValidator = SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil }) + +func (svf SubscriptionValidatorFunc) Validate(ctx context.Context, subscription eventingv1alpha2.Subscription) error { + return svf(ctx, subscription) +} diff --git a/internal/controller/eventing/subscription/validator/subscription_test.go b/internal/controller/eventing/subscription/validator/subscription_test.go new file mode 100644 index 00000000..b53c73ea --- /dev/null +++ b/internal/controller/eventing/subscription/validator/subscription_test.go @@ -0,0 +1,3 @@ +package validator + +// TODO(marcobebway): Add tests for the subscription validator. diff --git a/internal/webhook/cleanup.go b/internal/webhook/cleanup.go index 2d866577..a17a9b54 100644 --- a/internal/webhook/cleanup.go +++ b/internal/webhook/cleanup.go @@ -20,56 +20,92 @@ func CleanupResources(ctx context.Context, client kctrlclient.Client) []error { mutatingWebhookConfiguration = "subscription-mutating-webhook-configuration" validatingWebhookConfiguration = "subscription-validating-webhook-configuration" ) - return []error{ - deleteService(ctx, client, namespace, service), - deleteCronJob(ctx, client, namespace, cronjob), - deleteJob(ctx, client, namespace, job), - deleteMutatingWebhookConfiguration(ctx, client, namespace, mutatingWebhookConfiguration), - deleteValidatingWebhookConfiguration(ctx, client, namespace, validatingWebhookConfiguration), - } + var errList = make([]error, 0, 5) + appendIfError(errList, deleteService(ctx, client, namespace, service)) + appendIfError(errList, deleteCronJob(ctx, client, namespace, cronjob)) + appendIfError(errList, deleteJob(ctx, client, namespace, job)) + appendIfError(errList, deleteMutatingWebhookConfiguration(ctx, client, namespace, mutatingWebhookConfiguration)) + appendIfError(errList, deleteValidatingWebhookConfiguration(ctx, client, namespace, validatingWebhookConfiguration)) + return errList } func deleteService(ctx context.Context, client kctrlclient.Client, namespace, name string) error { - return client.Delete(ctx, &kcorev1.Service{ - ObjectMeta: kmetav1.ObjectMeta{ - Namespace: namespace, - Name: name, - }, - }) + return kctrlclient.IgnoreNotFound( + client.Delete( + ctx, + &kcorev1.Service{ + ObjectMeta: kmetav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + }, + kctrlclient.PropagationPolicy(kmetav1.DeletePropagationBackground), + ), + ) } func deleteCronJob(ctx context.Context, client kctrlclient.Client, namespace, name string) error { - return client.Delete(ctx, &kbatchv1.CronJob{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - }) + return kctrlclient.IgnoreNotFound( + client.Delete( + ctx, + &kbatchv1.CronJob{ + ObjectMeta: kmetav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }, + kctrlclient.PropagationPolicy(kmetav1.DeletePropagationBackground), + ), + ) } func deleteJob(ctx context.Context, client kctrlclient.Client, namespace, name string) error { - return client.Delete(ctx, &kbatchv1.Job{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - }) + return kctrlclient.IgnoreNotFound( + client.Delete( + ctx, + &kbatchv1.Job{ + ObjectMeta: kmetav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }, + kctrlclient.PropagationPolicy(kmetav1.DeletePropagationBackground), + ), + ) } func deleteMutatingWebhookConfiguration(ctx context.Context, client kctrlclient.Client, namespace, name string) error { - return client.Delete(ctx, &kadmissionregistrationv1.MutatingWebhookConfiguration{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - }) + return kctrlclient.IgnoreNotFound( + client.Delete( + ctx, + &kadmissionregistrationv1.MutatingWebhookConfiguration{ + ObjectMeta: kmetav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }, + kctrlclient.PropagationPolicy(kmetav1.DeletePropagationBackground), + ), + ) } func deleteValidatingWebhookConfiguration(ctx context.Context, client kctrlclient.Client, namespace, name string) error { - return client.Delete(ctx, &kadmissionregistrationv1.ValidatingWebhookConfiguration{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - }) + return kctrlclient.IgnoreNotFound( + client.Delete( + ctx, + &kadmissionregistrationv1.ValidatingWebhookConfiguration{ + ObjectMeta: kmetav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }, + kctrlclient.PropagationPolicy(kmetav1.DeletePropagationBackground), + ), + ) +} + +func appendIfError(errList []error, err error) { + if err != nil { + errList = append(errList, err) + } } diff --git a/pkg/backend/sink/validator.go b/pkg/backend/sink/validator.go deleted file mode 100644 index 1682abeb..00000000 --- a/pkg/backend/sink/validator.go +++ /dev/null @@ -1,68 +0,0 @@ -package sink - -import ( - "context" - "fmt" - - "golang.org/x/xerrors" - kcorev1 "k8s.io/api/core/v1" - ktypes "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" - "github.com/kyma-project/eventing-manager/internal/controller/events" - "github.com/kyma-project/eventing-manager/pkg/utils" -) - -var ErrSinkValidationFailed = xerrors.New("Sink validation failed") - -type Validator interface { - Validate(ctx context.Context, subscription *v1alpha2.Subscription) error -} - -// ValidatorFunc implements the Validator interface. -type ValidatorFunc func(context.Context, *v1alpha2.Subscription) error - -func (vf ValidatorFunc) Validate(ctx context.Context, sub *v1alpha2.Subscription) error { - return vf(ctx, sub) -} - -type defaultSinkValidator struct { - client client.Client - recorder record.EventRecorder -} - -// Perform a compile-time check. -var _ Validator = &defaultSinkValidator{} - -func NewValidator(client client.Client, recorder record.EventRecorder) Validator { - return &defaultSinkValidator{client: client, recorder: recorder} -} - -func (s defaultSinkValidator) Validate(ctx context.Context, subscription *v1alpha2.Subscription) error { - var svcNs, svcName string - - if _, subDomains, err := utils.GetSinkData(subscription.Spec.Sink); err != nil { - return err - } else { - svcNs = subDomains[1] - svcName = subDomains[0] - } - - if _, err := GetClusterLocalService(ctx, s.client, svcNs, svcName); err != nil { - events.Warn(s.recorder, subscription, events.ReasonValidationFailed, "Sink does not correspond to a valid cluster local svc") - return fmt.Errorf("%w: %w", ErrSinkValidationFailed, err) - } - - return nil -} - -func GetClusterLocalService(ctx context.Context, client client.Client, svcNs, svcName string) (*kcorev1.Service, error) { - svcLookupKey := ktypes.NamespacedName{Name: svcName, Namespace: svcNs} - svc := &kcorev1.Service{} - if err := client.Get(ctx, svcLookupKey, svc); err != nil { - return nil, err - } - return svc, nil -} diff --git a/pkg/backend/sink/validator_test.go b/pkg/backend/sink/validator_test.go deleted file mode 100644 index a6ae8559..00000000 --- a/pkg/backend/sink/validator_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package sink - -import ( - "context" - "strings" - "testing" - - "github.com/stretchr/testify/require" - kcorev1 "k8s.io/api/core/v1" - kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" - eventingtesting "github.com/kyma-project/eventing-manager/testing" -) - -func TestSinkValidator(t *testing.T) { - // given - namespaceName := "test" - fakeClient := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build() - ctx := context.Background() - recorder := &record.FakeRecorder{} - sinkValidator := NewValidator(fakeClient, recorder) - - testCases := []struct { - name string - givenSubscriptionSink string - givenSvcNameToCreate string - wantErrString string - }{ - { - name: "With invalid URL", - givenSubscriptionSink: "http://invalid Sink", - wantErrString: "failed to parse subscription sink URL", - }, - { - name: "With no existing svc in the cluster", - givenSubscriptionSink: "https://eventing-nats.test.svc.cluster.local:8080", - wantErrString: "Sink validation failed", - }, - { - name: "With no existing svc in the cluster, service has the wrong name", - givenSubscriptionSink: "https://eventing-nats.test.svc.cluster.local:8080", - givenSvcNameToCreate: "test", // wrong name - wantErrString: "Sink validation failed", - }, - { - name: "With a valid sink", - givenSubscriptionSink: "https://eventing-nats.test.svc.cluster.local:8080", - givenSvcNameToCreate: "eventing-nats", - wantErrString: "", - }, - } - - for _, tC := range testCases { - testCase := tC - t.Run(testCase.name, func(t *testing.T) { - // given - sub := eventingtesting.NewSubscription( - "foo", namespaceName, - eventingtesting.WithConditions([]eventingv1alpha2.Condition{}), - eventingtesting.WithStatus(true), - eventingtesting.WithSink(testCase.givenSubscriptionSink), - ) - - // create the service if required for test - if testCase.givenSvcNameToCreate != "" { - svc := &kcorev1.Service{ - ObjectMeta: kmetav1.ObjectMeta{ - Name: testCase.givenSvcNameToCreate, - Namespace: namespaceName, - }, - } - - err := fakeClient.Create(ctx, svc) - require.NoError(t, err) - } - - // when - // call the defaultSinkValidator function - err := sinkValidator.Validate(ctx, sub) - - // then - // given error should match expected error - if testCase.wantErrString == "" { - require.NoError(t, err) - } else { - substringResult := strings.Contains(err.Error(), testCase.wantErrString) - require.True(t, substringResult) - } - }) - } -} diff --git a/pkg/subscriptionmanager/eventmesh/eventmesh.go b/pkg/subscriptionmanager/eventmesh/eventmesh.go index 0065e579..c0398870 100644 --- a/pkg/subscriptionmanager/eventmesh/eventmesh.go +++ b/pkg/subscriptionmanager/eventmesh/eventmesh.go @@ -20,10 +20,10 @@ import ( eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/eventmesh" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" backendeventmesh "github.com/kyma-project/eventing-manager/pkg/backend/eventmesh" "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils" "github.com/kyma-project/eventing-manager/pkg/env" "github.com/kyma-project/eventing-manager/pkg/logger" @@ -116,6 +116,10 @@ func (c *SubscriptionManager) Start(_ env.DefaultSubscriptionConfig, params subm client := c.mgr.GetClient() recorder := c.mgr.GetEventRecorderFor("eventing-controller-beb") + // Init the Subscription validator. + sinkValidator := validator.NewSinkValidator(client) + subscriptionValidator := validator.NewSubscriptionValidator(sinkValidator) + // Initialize v1alpha2 handler for EventMesh eventMeshHandler := backendeventmesh.NewEventMesh(oauth2credential, nameMapper, c.logger) eventMeshcleaner := cleaner.NewEventMeshCleaner(c.logger) @@ -128,7 +132,7 @@ func (c *SubscriptionManager) Start(_ env.DefaultSubscriptionConfig, params subm eventMeshHandler, oauth2credential, nameMapper, - sink.NewValidator(client, recorder), + subscriptionValidator, c.collector, c.domain, ) diff --git a/pkg/subscriptionmanager/jetstream/jetstream.go b/pkg/subscriptionmanager/jetstream/jetstream.go index 68795b97..50498d5b 100644 --- a/pkg/subscriptionmanager/jetstream/jetstream.go +++ b/pkg/subscriptionmanager/jetstream/jetstream.go @@ -18,10 +18,10 @@ import ( eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2" subscriptioncontrollerjetstream "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/jetstream" + "github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator" "github.com/kyma-project/eventing-manager/pkg/backend/cleaner" backendjetstream "github.com/kyma-project/eventing-manager/pkg/backend/jetstream" backendmetrics "github.com/kyma-project/eventing-manager/pkg/backend/metrics" - "github.com/kyma-project/eventing-manager/pkg/backend/sink" backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils" "github.com/kyma-project/eventing-manager/pkg/env" "github.com/kyma-project/eventing-manager/pkg/logger" @@ -91,6 +91,10 @@ func (sm *SubscriptionManager) Start(defaultSubsConfig env.DefaultSubscriptionCo client := sm.mgr.GetClient() recorder := sm.mgr.GetEventRecorderFor("eventing-controller-jetstream") + // Init the Subscription validator. + sinkValidator := validator.NewSinkValidator(client) + subscriptionValidator := validator.NewSubscriptionValidator(sinkValidator) + // Initialize v1alpha2 event type cleaner jsCleaner := cleaner.NewJetStreamCleaner(sm.logger) jetStreamHandler := backendjetstream.NewJetStream(sm.envCfg, @@ -101,7 +105,7 @@ func (sm *SubscriptionManager) Start(defaultSubsConfig env.DefaultSubscriptionCo sm.logger, recorder, jsCleaner, - sink.NewValidator(client, recorder), + subscriptionValidator, sm.metricsCollector, ) sm.backendv2 = jetStreamReconciler.Backend