Skip to content

Commit

Permalink
Refactor subscription validation
Browse files Browse the repository at this point in the history
  • Loading branch information
marcobebway committed Apr 15, 2024
1 parent 6d2cead commit b89bdc6
Show file tree
Hide file tree
Showing 19 changed files with 432 additions and 348 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func main() { //nolint:funlen // main function needs to initialize many object
}

if errs := webhook.CleanupResources(ctx, k8sClient); len(errs) > 0 {
setupLog.Error(errors.Join(errs...), "unable to cleanup kubernetes webhook resources")
setupLog.Error(errors.Join(errs...), "failed to cleanup kubernetes webhook resources")
syncLogger(ctrLogger)
os.Exit(1)
}
Expand Down
33 changes: 13 additions & 20 deletions internal/controller/eventing/subscription/eventmesh/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (

eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2"
controllererrors "github.com/kyma-project/eventing-manager/internal/controller/errors"
"github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator"
"github.com/kyma-project/eventing-manager/internal/controller/events"
"github.com/kyma-project/eventing-manager/pkg/backend/cleaner"
"github.com/kyma-project/eventing-manager/pkg/backend/eventmesh"
"github.com/kyma-project/eventing-manager/pkg/backend/metrics"
"github.com/kyma-project/eventing-manager/pkg/backend/sink"
backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils"
"github.com/kyma-project/eventing-manager/pkg/constants"
"github.com/kyma-project/eventing-manager/pkg/ems/api/events/types"
Expand All @@ -55,7 +55,7 @@ type Reconciler struct {
oauth2credentials *eventmesh.OAuth2ClientCredentials
// nameMapper is used to map the Kyma subscription name to a subscription name on EventMesh.
nameMapper backendutils.NameMapper
sinkValidator sink.Validator
subscriptionValidator validator.SubscriptionValidator
collector *metrics.Collector
syncConditionWebhookCallStatus syncConditionWebhookCallStatusFunc
}
Expand All @@ -73,8 +73,8 @@ const (

func NewReconciler(client client.Client, logger *logger.Logger, recorder record.EventRecorder,
cfg env.Config, cleaner cleaner.Cleaner, eventMeshBackend eventmesh.Backend,
credential *eventmesh.OAuth2ClientCredentials, mapper backendutils.NameMapper, validator sink.Validator,
collector *metrics.Collector, domain string,
credential *eventmesh.OAuth2ClientCredentials, mapper backendutils.NameMapper,
subscriptionValidator validator.SubscriptionValidator, collector *metrics.Collector, domain string,
) *Reconciler {
if err := eventMeshBackend.Initialize(cfg); err != nil {
logger.WithContext().Errorw("Failed to start reconciler", "name",
Expand All @@ -90,7 +90,7 @@ func NewReconciler(client client.Client, logger *logger.Logger, recorder record.
cleaner: cleaner,
oauth2credentials: credential,
nameMapper: mapper,
sinkValidator: validator,
subscriptionValidator: subscriptionValidator,
collector: collector,
syncConditionWebhookCallStatus: syncConditionWebhookCallStatus,
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req kctrl.Request) (kctrl.Re
}

// Validate subscription.
if validationErr := r.handleSubscriptionValidation(ctx, sub); validationErr != nil {
if validationErr := r.validateSubscription(ctx, sub); validationErr != nil {
if updateErr := r.updateStatus(ctx, currentSubscription, sub, log); updateErr != nil {
return kctrl.Result{}, errors.Join(validationErr, updateErr)
}
Expand Down Expand Up @@ -184,25 +184,18 @@ func (r *Reconciler) Reconcile(ctx context.Context, req kctrl.Request) (kctrl.Re
return result, nil
}

func (r *Reconciler) handleSubscriptionValidation(ctx context.Context, desiredSubscription *eventingv1alpha2.Subscription) error {
func (r *Reconciler) validateSubscription(ctx context.Context, subscription *eventingv1alpha2.Subscription) error {
var err error
if err = r.validateSubscriptionSpec(ctx, desiredSubscription); err != nil {
desiredSubscription.Status.SetNotReady()
desiredSubscription.Status.ClearTypes()
desiredSubscription.Status.ClearBackend()
desiredSubscription.Status.ClearConditions()
if err = r.subscriptionValidator.Validate(ctx, *subscription); err != nil {
subscription.Status.SetNotReady()
subscription.Status.ClearTypes()
subscription.Status.ClearBackend()
subscription.Status.ClearConditions()
}
desiredSubscription.Status.SetSubscriptionSpecValidCondition(err)
subscription.Status.SetSubscriptionSpecValidCondition(err)
return err
}

func (r *Reconciler) validateSubscriptionSpec(ctx context.Context, subscription *eventingv1alpha2.Subscription) error {
if errList := subscription.ValidateSpec(); len(errList) > 0 {
return errors.Join(errList.ToAggregate())
}
return r.sinkValidator.Validate(ctx, subscription)
}

// updateSubscription updates the subscription changes to k8s.
func (r *Reconciler) updateSubscription(ctx context.Context, sub *eventingv1alpha2.Subscription, logger *zap.SugaredLogger) error {
namespacedName := &ktypes.NamespacedName{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2"
"github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/validator"
"github.com/kyma-project/eventing-manager/pkg/backend/cleaner"
"github.com/kyma-project/eventing-manager/pkg/backend/eventmesh"
"github.com/kyma-project/eventing-manager/pkg/backend/eventmesh/mocks"
"github.com/kyma-project/eventing-manager/pkg/backend/metrics"
"github.com/kyma-project/eventing-manager/pkg/backend/sink"
backendutils "github.com/kyma-project/eventing-manager/pkg/backend/utils"
"github.com/kyma-project/eventing-manager/pkg/ems/api/events/types"
"github.com/kyma-project/eventing-manager/pkg/env"
Expand Down Expand Up @@ -79,8 +79,8 @@ func TestReconciler_Reconcile(t *testing.T) {
backendSyncErr := errors.New("backend sync error")
backendDeleteErr := errors.New("backend delete error")
validatorErr := errors.New("invalid sink")
happyValidator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil })
unhappyValidator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return validatorErr })
happyValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil })
unhappyValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return validatorErr })

testCases := []struct {
name string
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestReconciler_APIRuleConfig(t *testing.T) {
eventingtesting.WithMaxInFlight(10),
)

validator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil })
subscriptionValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil })

col := metrics.NewCollector()

Expand Down Expand Up @@ -292,7 +292,7 @@ func TestReconciler_APIRuleConfig(t *testing.T) {
testenv.backend,
testenv.credentials,
testenv.mapper,
validator,
subscriptionValidator,
col,
utils.Domain),
testenv.fakeClient
Expand Down Expand Up @@ -322,7 +322,7 @@ func TestReconciler_APIRuleConfig(t *testing.T) {
testenv.backend,
testenv.credentials,
testenv.mapper,
validator,
subscriptionValidator,
col,
utils.Domain),
testenv.fakeClient
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) {
eventingtesting.WithMaxInFlight(10),
)

validator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil })
subscriptionValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil })
col := metrics.NewCollector()

testCases := []struct {
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) {
testenv.backend,
testenv.credentials,
testenv.mapper,
validator,
subscriptionValidator,
col,
utils.Domain),
testenv.fakeClient
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) {
testenv.backend,
testenv.credentials,
testenv.mapper,
validator,
subscriptionValidator,
col,
utils.Domain),
testenv.fakeClient
Expand Down Expand Up @@ -579,7 +579,7 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) {
func TestReconciler_PreserveBackendHashes(t *testing.T) {
ctx := context.Background()
collector := metrics.NewCollector()
validator := sink.ValidatorFunc(func(_ context.Context, s *eventingv1alpha2.Subscription) error { return nil })
subscriptionValidator := validator.SubscriptionValidatorFunc(func(_ context.Context, _ eventingv1alpha2.Subscription) error { return nil })

const (
ev2hash = int64(118518533334734)
Expand Down Expand Up @@ -620,7 +620,7 @@ func TestReconciler_PreserveBackendHashes(t *testing.T) {
te.backend.On("Initialize", mock.Anything).Return(nil)
te.backend.On("SyncSubscription", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
return NewReconciler(te.fakeClient, te.logger, te.recorder, te.cfg, te.cleaner,
te.backend, te.credentials, te.mapper, validator, collector, utils.Domain), te.fakeClient
te.backend, te.credentials, te.mapper, subscriptionValidator, collector, utils.Domain), te.fakeClient
},
wantEv2Hash: ev2hash,
wantEventMeshHash: eventMeshHash,
Expand Down Expand Up @@ -650,7 +650,7 @@ func TestReconciler_PreserveBackendHashes(t *testing.T) {
te.backend.On("Initialize", mock.Anything).Return(nil)
te.backend.On("SyncSubscription", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
return NewReconciler(te.fakeClient, te.logger, te.recorder, te.cfg, te.cleaner,
te.backend, te.credentials, te.mapper, validator, collector, utils.Domain), te.fakeClient
te.backend, te.credentials, te.mapper, subscriptionValidator, collector, utils.Domain), te.fakeClient
},
wantEv2Hash: ev2hash,
wantEventMeshHash: eventMeshHash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Test_CreateSubscription(t *testing.T) {
testCases := []struct {
name string
givenSubscriptionFunc func(namespace string) *eventingv1alpha2.Subscription
wantSubscriptionMatchers gomegatypes.GomegaMatcher
wantSubscriptionMatchers func(namespace string) gomegatypes.GomegaMatcher
wantEventMeshSubMatchers gomegatypes.GomegaMatcher
wantEventMeshSubCheck bool
wantAPIRuleCheck bool
Expand All @@ -74,13 +74,15 @@ func Test_CreateSubscription(t *testing.T) {
eventingtesting.WithMaxInFlight(10),
)
},
wantSubscriptionMatchers: gomega.And(
eventingtesting.HaveSubscriptionNotReady(),
eventingtesting.HaveCondition(eventingv1alpha2.MakeCondition(
eventingv1alpha2.ConditionSubscriptionSpecValid,
eventingv1alpha2.ConditionReasonSubscriptionSpecHasValidationErrors,
kcorev1.ConditionFalse, "Sink validation failed: Service \"invalid\" not found")),
),
wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher {
return gomega.And(
eventingtesting.HaveSubscriptionNotReady(),
eventingtesting.HaveCondition(eventingv1alpha2.MakeCondition(
eventingv1alpha2.ConditionSubscriptionSpecValid,
eventingv1alpha2.ConditionReasonSubscriptionSpecHasValidationErrors,
kcorev1.ConditionFalse, fmt.Sprintf("Subscription validation failed: Sink validation failed: service invalid.%s not found in the cluster", namespace))),
)
},
},
{
name: "should succeed to create subscription if types are non-empty",
Expand All @@ -96,19 +98,21 @@ func Test_CreateSubscription(t *testing.T) {
eventingtesting.WithMaxInFlight(10),
)
},
wantSubscriptionMatchers: gomega.And(
eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer),
eventingtesting.HaveSubscriptionActiveCondition(),
eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{
{
OriginalType: fmt.Sprintf("%s0", eventingtesting.OrderCreatedV1EventNotClean),
CleanType: fmt.Sprintf("%s0", eventingtesting.OrderCreatedV1Event),
}, {
OriginalType: fmt.Sprintf("%s1", eventingtesting.OrderCreatedV1EventNotClean),
CleanType: fmt.Sprintf("%s1", eventingtesting.OrderCreatedV1Event),
},
}),
),
wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher {
return gomega.And(
eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer),
eventingtesting.HaveSubscriptionActiveCondition(),
eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{
{
OriginalType: fmt.Sprintf("%s0", eventingtesting.OrderCreatedV1EventNotClean),
CleanType: fmt.Sprintf("%s0", eventingtesting.OrderCreatedV1Event),
}, {
OriginalType: fmt.Sprintf("%s1", eventingtesting.OrderCreatedV1EventNotClean),
CleanType: fmt.Sprintf("%s1", eventingtesting.OrderCreatedV1Event),
},
}),
)
},
wantEventMeshSubMatchers: gomega.And(
eventmeshsubmatchers.HaveEvents(emstypes.Events{
{
Expand All @@ -135,9 +139,11 @@ func Test_CreateSubscription(t *testing.T) {
eventingtesting.WithSinkURL(eventingtesting.ValidSinkURL(namespace, testName)),
)
},
wantSubscriptionMatchers: gomega.And(
eventingtesting.HaveSubscriptionActiveCondition(),
),
wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher {
return gomega.And(
eventingtesting.HaveSubscriptionActiveCondition(),
)
},
wantEventMeshSubMatchers: gomega.And(
// should have default values for protocol and webhook auth
eventmeshsubmatchers.HaveContentMode(emTestEnsemble.envConfig.ContentMode),
Expand All @@ -163,16 +169,18 @@ func Test_CreateSubscription(t *testing.T) {
eventingtesting.WithSinkURL(eventingtesting.ValidSinkURL(namespace, testName)),
)
},
wantSubscriptionMatchers: gomega.And(
eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer),
eventingtesting.HaveSubscriptionActiveCondition(),
eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{
{
OriginalType: eventingtesting.EventMeshExactType,
CleanType: eventingtesting.EventMeshExactType,
},
}),
),
wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher {
return gomega.And(
eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer),
eventingtesting.HaveSubscriptionActiveCondition(),
eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{
{
OriginalType: eventingtesting.EventMeshExactType,
CleanType: eventingtesting.EventMeshExactType,
},
}),
)
},
wantEventMeshSubMatchers: gomega.And(
eventmeshsubmatchers.HaveEvents(emstypes.Events{
{
Expand All @@ -197,16 +205,18 @@ func Test_CreateSubscription(t *testing.T) {
eventingtesting.WithMaxInFlight(10),
)
},
wantSubscriptionMatchers: gomega.And(
eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer),
eventingtesting.HaveSubscriptionActiveCondition(),
eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{
{
OriginalType: eventingtesting.OrderCreatedV1EventNotClean,
CleanType: eventingtesting.OrderCreatedV1Event,
},
}),
),
wantSubscriptionMatchers: func(namespace string) gomegatypes.GomegaMatcher {
return gomega.And(
eventingtesting.HaveSubscriptionFinalizer(eventingv1alpha2.Finalizer),
eventingtesting.HaveSubscriptionActiveCondition(),
eventingtesting.HaveCleanEventTypes([]eventingv1alpha2.EventType{
{
OriginalType: eventingtesting.OrderCreatedV1EventNotClean,
CleanType: eventingtesting.OrderCreatedV1Event,
},
}),
)
},
wantEventMeshSubMatchers: gomega.And(
eventmeshsubmatchers.HaveEvents(emstypes.Events{
{
Expand Down Expand Up @@ -242,7 +252,7 @@ func Test_CreateSubscription(t *testing.T) {
ensureK8sResourceCreated(ctx, t, givenSubscription)

// check if the subscription is as required
getSubscriptionAssert(ctx, g, givenSubscription).Should(testcase.wantSubscriptionMatchers)
getSubscriptionAssert(ctx, g, givenSubscription).Should(testcase.wantSubscriptionMatchers(testNamespace))

if testcase.wantAPIRuleCheck {
// check if an APIRule was created for the subscription
Expand Down Expand Up @@ -545,14 +555,16 @@ func Test_FixingSinkAndApiRule(t *testing.T) {
)
}

wantSubscriptionWithoutSinkMatchers := gomega.And(
eventingtesting.HaveCondition(eventingv1alpha2.MakeCondition(
eventingv1alpha2.ConditionSubscriptionSpecValid,
eventingv1alpha2.ConditionReasonSubscriptionSpecHasValidationErrors,
kcorev1.ConditionFalse,
"Sink validation failed: Service \"invalid\" not found",
)),
)
wantSubscriptionWithoutSinkMatchers := func(namespace string) gomegatypes.GomegaMatcher {
return gomega.And(
eventingtesting.HaveCondition(eventingv1alpha2.MakeCondition(
eventingv1alpha2.ConditionSubscriptionSpecValid,
eventingv1alpha2.ConditionReasonSubscriptionSpecHasValidationErrors,
kcorev1.ConditionFalse,
fmt.Sprintf("Subscription validation failed: Sink validation failed: service invalid.%s not found in the cluster", namespace),
)),
)
}

givenUpdateSubscriptionWithSinkFunc := func(namespace, name, sinkFormat, path string) *eventingv1alpha2.Subscription {
return eventingtesting.NewSubscription(name, namespace,
Expand Down Expand Up @@ -611,7 +623,7 @@ func Test_FixingSinkAndApiRule(t *testing.T) {
ensureK8sResourceCreated(ctx, t, givenSubscription)
createdSubscription := givenSubscription.DeepCopy()
// check if the created subscription is correct
getSubscriptionAssert(ctx, g, createdSubscription).Should(wantSubscriptionWithoutSinkMatchers)
getSubscriptionAssert(ctx, g, createdSubscription).Should(wantSubscriptionWithoutSinkMatchers(testNamespace))

// update subscription with valid sink
givenUpdateSubscription.ResourceVersion = createdSubscription.ResourceVersion
Expand Down
Loading

0 comments on commit b89bdc6

Please sign in to comment.