Skip to content

Commit

Permalink
Refactor subscription validation
Browse files Browse the repository at this point in the history
  • Loading branch information
marcobebway committed Apr 15, 2024
1 parent 7b03f30 commit 1562614
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 266 deletions.
41 changes: 0 additions & 41 deletions api/eventing/v1alpha2/errors.go

This file was deleted.

148 changes: 0 additions & 148 deletions api/eventing/v1alpha2/subscription_validation.go

This file was deleted.

41 changes: 41 additions & 0 deletions internal/controller/eventing/subscription/validator/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package validator

import (
"fmt"
"strconv"

"k8s.io/apimachinery/pkg/util/validation/field"

eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2"
"github.com/kyma-project/eventing-manager/pkg/ems/api/events/types"
)

//nolint:gochecknoglobals // these are required for testing
var (
sourcePath = field.NewPath("spec").Child("source")
typesPath = field.NewPath("spec").Child("types")
configPath = field.NewPath("spec").Child("config")
sinkPath = field.NewPath("spec").Child("sink")
namespacePath = field.NewPath("metadata").Child("namespace")

emptyErrDetail = "must not be empty"
invalidURIErrDetail = "must be valid as per RFC 3986"
duplicateTypesErrDetail = "must not have duplicate types"
lengthErrDetail = "must not be of length zero"
minSegmentErrDetail = fmt.Sprintf("must have minimum %s segments", strconv.Itoa(minEventTypeSegments))
invalidPrefixErrDetail = fmt.Sprintf("must not have %s as type prefix", validPrefix)
stringIntErrDetail = fmt.Sprintf("%s must be a stringified int value", eventingv1alpha2.MaxInFlightMessages)

invalidQosErrDetail = fmt.Sprintf("must be a valid QoS value %s or %s", types.QosAtLeastOnce, types.QosAtMostOnce)
invalidAuthTypeErrDetail = fmt.Sprintf("must be a valid Auth Type value %s", types.AuthTypeClientCredentials)
invalidGrantTypeErrDetail = fmt.Sprintf("must be a valid Grant Type value %s", types.GrantTypeClientCredentials)

missingSchemeErrDetail = "must have URL scheme 'http' or 'https'"
suffixMissingErrDetail = fmt.Sprintf("must have valid sink URL suffix %s", clusterLocalURLSuffix)
subDomainsErrDetail = fmt.Sprintf("must have sink URL with %d sub-domains: ", subdomainSegments)
namespaceMismatchErrDetail = "must have the same namespace as the subscriber: "
)

func makeInvalidFieldError(path *field.Path, subName, detail string) *field.Error {
return field.Invalid(path, subName, detail)
}
148 changes: 148 additions & 0 deletions internal/controller/eventing/subscription/validator/spec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package validator

import (
"strconv"
"strings"

"k8s.io/apimachinery/pkg/util/validation/field"

eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2"
"github.com/kyma-project/eventing-manager/pkg/ems/api/events/types"
"github.com/kyma-project/eventing-manager/pkg/utils"
)

const (
minEventTypeSegments = 2
subdomainSegments = 5
validPrefix = "sap.kyma.custom"
clusterLocalURLSuffix = "svc.cluster.local"
)

func validateSpec(subscription eventingv1alpha2.Subscription) field.ErrorList {
var allErrs field.ErrorList
if err := validateSource(subscription); err != nil {
allErrs = append(allErrs, err)
}
if err := validateTypes(subscription); err != nil {
allErrs = append(allErrs, err)
}
if err := validateConfig(subscription); err != nil {
allErrs = append(allErrs, err...)
}
if err := validateSink(subscription); err != nil {
allErrs = append(allErrs, err)
}
if len(allErrs) == 0 {
return nil
}
return allErrs
}

func validateSource(subscription eventingv1alpha2.Subscription) *field.Error {
if subscription.Spec.Source == "" && subscription.Spec.TypeMatching != eventingv1alpha2.TypeMatchingExact {
return makeInvalidFieldError(sourcePath, subscription.Spec.Source, emptyErrDetail)
}
// Check only if the source is valid for the cloud event, with a valid event type.
if IsInvalidCE(subscription.Spec.Source, "") {
return makeInvalidFieldError(sourcePath, subscription.Spec.Source, invalidURIErrDetail)
}
return nil
}

func validateTypes(subscription eventingv1alpha2.Subscription) *field.Error {
if subscription.Spec.Types == nil || len(subscription.Spec.Types) == 0 {
return makeInvalidFieldError(typesPath, "", emptyErrDetail)
}
if duplicates := subscription.GetDuplicateTypes(); len(duplicates) > 0 {
return makeInvalidFieldError(typesPath, strings.Join(duplicates, ","), duplicateTypesErrDetail)
}
for _, eventType := range subscription.Spec.Types {
if len(eventType) == 0 {
return makeInvalidFieldError(typesPath, eventType, lengthErrDetail)
}
if segments := strings.Split(eventType, "."); len(segments) < minEventTypeSegments {
return makeInvalidFieldError(typesPath, eventType, minSegmentErrDetail)
}
if subscription.Spec.TypeMatching != eventingv1alpha2.TypeMatchingExact && strings.HasPrefix(eventType, validPrefix) {
return makeInvalidFieldError(typesPath, eventType, invalidPrefixErrDetail)
}
// Check only is the event type is valid for the cloud event, with a valid source.
const validSource = "source"
if IsInvalidCE(validSource, eventType) {
return makeInvalidFieldError(typesPath, eventType, invalidURIErrDetail)
}
}
return nil
}

func validateConfig(subscription eventingv1alpha2.Subscription) field.ErrorList {
var allErrs field.ErrorList
if isNotInt(subscription.Spec.Config[eventingv1alpha2.MaxInFlightMessages]) {
allErrs = append(allErrs, makeInvalidFieldError(configPath, subscription.Spec.Config[eventingv1alpha2.MaxInFlightMessages], stringIntErrDetail))
}
if ifKeyExistsInConfig(subscription, eventingv1alpha2.ProtocolSettingsQos) && types.IsInvalidQoS(subscription.Spec.Config[eventingv1alpha2.ProtocolSettingsQos]) {
allErrs = append(allErrs, makeInvalidFieldError(configPath, subscription.Spec.Config[eventingv1alpha2.ProtocolSettingsQos], invalidQosErrDetail))
}
if ifKeyExistsInConfig(subscription, eventingv1alpha2.WebhookAuthType) && types.IsInvalidAuthType(subscription.Spec.Config[eventingv1alpha2.WebhookAuthType]) {
allErrs = append(allErrs, makeInvalidFieldError(configPath, subscription.Spec.Config[eventingv1alpha2.WebhookAuthType], invalidAuthTypeErrDetail))
}
if ifKeyExistsInConfig(subscription, eventingv1alpha2.WebhookAuthGrantType) && types.IsInvalidGrantType(subscription.Spec.Config[eventingv1alpha2.WebhookAuthGrantType]) {
allErrs = append(allErrs, makeInvalidFieldError(configPath, subscription.Spec.Config[eventingv1alpha2.WebhookAuthGrantType], invalidGrantTypeErrDetail))
}
return allErrs
}

func validateSink(subscription eventingv1alpha2.Subscription) *field.Error {
if subscription.Spec.Sink == "" {
return makeInvalidFieldError(sinkPath, subscription.Spec.Sink, emptyErrDetail)
}

if !utils.IsValidScheme(subscription.Spec.Sink) {
return makeInvalidFieldError(sinkPath, subscription.Spec.Sink, missingSchemeErrDetail)
}

trimmedHost, subDomains, err := utils.GetSinkData(subscription.Spec.Sink)
if err != nil {
return makeInvalidFieldError(sinkPath, subscription.Spec.Sink, err.Error())
}

// Validate sink URL is a cluster local URL.
if !strings.HasSuffix(trimmedHost, clusterLocalURLSuffix) {
return makeInvalidFieldError(sinkPath, subscription.Spec.Sink, suffixMissingErrDetail)
}

// We expected a sink in the format "service.namespace.svc.cluster.local".
if len(subDomains) != subdomainSegments {
return makeInvalidFieldError(sinkPath, subscription.Spec.Sink, subDomainsErrDetail+trimmedHost)
}

// Assumption: Subscription CR and Subscriber should be deployed in the same namespace.
svcNs := subDomains[1]
if subscription.Namespace != svcNs {
return makeInvalidFieldError(namespacePath, subscription.Spec.Sink, namespaceMismatchErrDetail+svcNs)
}

return nil
}

func ifKeyExistsInConfig(subscription eventingv1alpha2.Subscription, key string) bool {
_, ok := subscription.Spec.Config[key]
return ok
}

func isNotInt(value string) bool {
if _, err := strconv.Atoi(value); err != nil {
return true
}
return false
}

func IsInvalidCE(source, eventType string) bool {
if source == "" {
return false
}
newEvent := utils.GetCloudEvent(eventType)
newEvent.SetSource(source)
err := newEvent.Validate()
return err != nil
}
Loading

0 comments on commit 1562614

Please sign in to comment.