Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow empty backend #274

Merged
merged 11 commits into from
Dec 7, 2023
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ linters-settings:
alias: kctrl
- pkg: sigs.k8s.io/controller-runtime/pkg/log
alias: kctrllog
- pkg: sigs.k8s.io/controller-runtime/pkg/client
alias: kctrlclient
- pkg: k8s.io/api/autoscaling/v1
alias: kautoscalingv1
- pkg: k8s.io/api/autoscaling/v2
Expand Down
14 changes: 8 additions & 6 deletions api/operator/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
StateProcessing string = "Processing"
StateWarning string = "Warning"

ConditionNATSAvailable ConditionType = "NATSAvailable"
ConditionBackendAvailable ConditionType = "BackendAvailable"
ConditionPublisherProxyReady ConditionType = "PublisherProxyReady"
ConditionWebhookReady ConditionType = "WebhookReady"
ConditionSubscriptionManagerReady ConditionType = "SubscriptionManagerReady"
Expand All @@ -48,8 +48,9 @@ const (
ConditionReasonDeployed ConditionReason = "Deployed"
ConditionReasonDeployedFailed ConditionReason = "DeployFailed"
ConditionReasonDeploymentStatusSyncFailed ConditionReason = "DeploymentStatusSyncFailed"
ConditionReasonNATSAvailable ConditionReason = "Available"
ConditionReasonNATSNotAvailable ConditionReason = "NotAvailable"
ConditionReasonNATSAvailable ConditionReason = "NATSAvailable"
ConditionReasonNATSNotAvailable ConditionReason = "NATSUnavailable"
ConditionReasonBackendNotSpecified ConditionReason = "BackendNotSpecified"
ConditionReasonForbidden ConditionReason = "Forbidden"
ConditionReasonWebhookFailed ConditionReason = "WebhookFailed"
ConditionReasonWebhookReady ConditionReason = "Ready"
Expand All @@ -63,6 +64,7 @@ const (
ConditionPublisherProxyProcessingMessage = "Eventing publisher proxy deployment is in progress"
ConditionSubscriptionManagerReadyMessage = "Subscription manager is ready"
ConditionSubscriptionManagerStoppedMessage = "Subscription manager is stopped"
ConditionBackendNotSpecifiedMessage = "Backend config is not provided. Please specify a backend."

// subscription manager reasons.
ConditionReasonEventMeshSubManagerReady ConditionReason = "EventMeshSubscriptionManagerReady"
Expand All @@ -82,7 +84,8 @@ type Eventing struct {
kmetav1.TypeMeta `json:",inline"`
kmetav1.ObjectMeta `json:"metadata,omitempty"`

// +kubebuilder:default:={backend:{type:"NATS", config:{natsStreamStorageType:"File", natsStreamReplicas:3, natsStreamMaxSize:"700Mi", natsMaxMsgsPerTopic:1000000}}, logging:{logLevel:Info}, publisher:{replicas:{min:2,max:2}, resources:{limits:{cpu:"500m",memory:"512Mi"}, requests:{cpu:"40m",memory:"256Mi"}}}}
// +kubebuilder:default:={logging:{logLevel:Info}, publisher:{replicas:{min:2,max:2}, resources:{limits:{cpu:"500m",memory:"512Mi"}, requests:{cpu:"40m",memory:"256Mi"}}}}
// +kubebuilder:validation:XValidation:rule="!(oldSelf!=null && has(oldSelf.backend)) || has(self.backend)", message="backend config cannot be deleted"
Spec EventingSpec `json:"spec,omitempty"`
Status EventingStatus `json:"status,omitempty"`
}
Expand All @@ -98,9 +101,8 @@ type EventingStatus struct {
// EventingSpec defines the desired state of Eventing.
type EventingSpec struct {
// Backend defines the active backend used by Eventing.
// +kubebuilder:default:={type:"NATS", config:{natsStreamStorageType:"File", natsStreamReplicas:3, natsStreamMaxSize:"700Mi", natsMaxMsgsPerTopic:1000000}}
// +kubebuilder:validation:XValidation:rule=" (self.type != 'EventMesh') || ((self.type == 'EventMesh') && (self.config.eventMeshSecret != ''))", message="secret cannot be empty if EventMesh backend is used"
Backend Backend `json:"backend"`
Backend *Backend `json:"backend,omitempty"`

// Publisher defines the configurations for eventing-publisher-proxy.
// +kubebuilder:default:={replicas:{min:2,max:2}, resources:{limits:{cpu:"500m",memory:"512Mi"}, requests:{cpu:"40m",memory:"256Mi"}}}
Expand Down
8 changes: 4 additions & 4 deletions api/operator/v1alpha1/eventing_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSyncStatusActiveBackend(t *testing.T) {
name: "it should set ActiveBackend to NATS",
givenEventing: &Eventing{
Spec: EventingSpec{
Backend: Backend{Type: NatsBackendType},
Backend: &Backend{Type: NatsBackendType},
},
Status: EventingStatus{},
},
Expand All @@ -29,7 +29,7 @@ func TestSyncStatusActiveBackend(t *testing.T) {
name: "it should set ActiveBackend to EventMesh",
givenEventing: &Eventing{
Spec: EventingSpec{
Backend: Backend{Type: EventMeshBackendType},
Backend: &Backend{Type: EventMeshBackendType},
},
Status: EventingStatus{},
},
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestIsSpecBackendTypeChanged(t *testing.T) {
name: "it should return false if backend is not changed",
givenEventing: &Eventing{
Spec: EventingSpec{
Backend: Backend{Type: NatsBackendType},
Backend: &Backend{Type: NatsBackendType},
},
Status: EventingStatus{
ActiveBackend: NatsBackendType,
Expand All @@ -77,7 +77,7 @@ func TestIsSpecBackendTypeChanged(t *testing.T) {
name: "it should return true if backend is changed",
givenEventing: &Eventing{
Spec: EventingSpec{
Backend: Backend{Type: NatsBackendType},
Backend: &Backend{Type: NatsBackendType},
},
Status: EventingStatus{
ActiveBackend: EventMeshBackendType,
Expand Down
8 changes: 4 additions & 4 deletions api/operator/v1alpha1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (es *EventingStatus) UpdateConditionNATSAvailable(status kmetav1.ConditionStatus, reason ConditionReason,
func (es *EventingStatus) UpdateConditionBackendAvailable(status kmetav1.ConditionStatus, reason ConditionReason,
message string,
) {
condition := kmetav1.Condition{
Type: string(ConditionNATSAvailable),
Type: string(ConditionBackendAvailable),
Status: status,
LastTransitionTime: kmetav1.Now(),
Reason: string(reason),
Expand Down Expand Up @@ -80,7 +80,7 @@ func (es *EventingStatus) SetSubscriptionManagerReadyConditionToTrue() {

func (es *EventingStatus) SetStateReady() {
es.State = StateReady
es.UpdateConditionNATSAvailable(kmetav1.ConditionTrue, ConditionReasonNATSAvailable, ConditionNATSAvailableMessage)
es.UpdateConditionBackendAvailable(kmetav1.ConditionTrue, ConditionReasonNATSAvailable, ConditionNATSAvailableMessage)
es.UpdateConditionPublisherProxyReady(kmetav1.ConditionTrue, ConditionReasonDeployed, ConditionPublisherProxyReadyMessage)
}

Expand All @@ -89,7 +89,7 @@ func (ns *EventingStatus) SetStateWarning() {
}

func (es *EventingStatus) SetNATSAvailableConditionToTrue() {
es.UpdateConditionNATSAvailable(kmetav1.ConditionTrue, ConditionReasonNATSAvailable, ConditionNATSAvailableMessage)
es.UpdateConditionBackendAvailable(kmetav1.ConditionTrue, ConditionReasonNATSAvailable, ConditionNATSAvailableMessage)
}

func (es *EventingStatus) SetSubscriptionManagerReadyConditionToFalse(reason ConditionReason, message string) {
Expand Down
8 changes: 6 additions & 2 deletions api/operator/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 5 additions & 18 deletions config/crd/bases/operator.kyma-project.io_eventings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ spec:
type: object
spec:
default:
backend:
config:
natsMaxMsgsPerTopic: 1000000
natsStreamMaxSize: 700Mi
natsStreamReplicas: 3
natsStreamStorageType: File
type: NATS
logging:
logLevel: Info
publisher:
Expand All @@ -67,21 +60,14 @@ spec:
requests:
cpu: 40m
memory: 256Mi
description: EventingSpec defines the desired state of Eventing
description: EventingSpec defines the desired state of Eventing.
properties:
annotations:
additionalProperties:
type: string
description: Annotations allows to add annotations to resources.
type: object
backend:
default:
config:
natsMaxMsgsPerTopic: 1000000
natsStreamMaxSize: 700Mi
natsStreamReplicas: 3
natsStreamStorageType: File
type: NATS
description: Backend defines the active backend used by Eventing.
properties:
config:
Expand Down Expand Up @@ -261,11 +247,12 @@ spec:
type: object
type: object
type: object
required:
- backend
type: object
x-kubernetes-validations:
- message: backend config cannot be deleted
rule: '!(oldSelf!=null && has(oldSelf.backend)) || has(self.backend)'
status:
description: EventingStatus defines the observed state of Eventing
description: EventingStatus defines the observed state of Eventing.
properties:
activeBackend:
type: string
Expand Down
2 changes: 1 addition & 1 deletion docs/user/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Use the following sample CRs as guidance. Each can be applied immediately when y
| Parameter | Type | Description |
|----------------------------------------------------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **annotations** | map\[string\]string | Annotations allows to add annotations to resources. |
| **backend** (required) | object | Backend defines the active backend used by Eventing. |
| **backend** | object | Backend defines the active backend used by Eventing. |
| **backend.​config** | object | Config defines configuration for eventing backend. |
| **backend.​config.​domain** | string | Domain defines the cluster public domain used to configure the EventMesh Subscriptions and their corresponding ApiRules. |
| **backend.​config.​eventMeshSecret** | string | EventMeshSecret defines the namespaced name of K8s Secret containing EventMesh credentials. The format of name is "namespace/name". |
Expand Down
4 changes: 2 additions & 2 deletions hack/e2e/common/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func EventingNATSCR() *operatorv1alpha1.Eventing {
Namespace: NamespaceName,
},
Spec: operatorv1alpha1.EventingSpec{
Backend: operatorv1alpha1.Backend{
Backend: &operatorv1alpha1.Backend{
Type: "NATS",
Config: operatorv1alpha1.BackendConfig{
NATSStreamStorageType: "File",
Expand All @@ -88,7 +88,7 @@ func EventingEventMeshCR() *operatorv1alpha1.Eventing {
Namespace: NamespaceName,
},
Spec: operatorv1alpha1.EventingSpec{
Backend: operatorv1alpha1.Backend{
Backend: &operatorv1alpha1.Backend{
Type: "EventMesh",
Config: operatorv1alpha1.BackendConfig{
EventMeshSecret: fmt.Sprintf("%s/%s", EventMeshSecretNamespace, EventMeshSecretName),
Expand Down
31 changes: 29 additions & 2 deletions internal/controller/operator/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ func (r *Reconciler) handleEventingDeletion(ctx context.Context, eventing *opera
return kctrl.Result{}, nil
}

if eventing.Spec.Backend == nil {
// backend config can only be empty during creation and nothing is created if it is missing.
// Hence, eventing can be safely removed.
return r.removeFinalizer(ctx, eventing)
}

// check if subscription resources exist
exists, err := r.eventingManager.SubscriptionExists(ctx)
if err != nil {
Expand Down Expand Up @@ -441,6 +447,12 @@ func (r *Reconciler) handleEventingReconcile(ctx context.Context,

// set state processing if not set yet
r.InitStateProcessing(eventing)
if eventing.Spec.Backend == nil {
return kctrl.Result{Requeue: true}, r.syncStatusForEmptyBackend(ctx,
operatorv1alpha1.ConditionReasonBackendNotSpecified,
operatorv1alpha1.ConditionBackendNotSpecifiedMessage,
eventing, log)
}

// sync webhooks CABundle.
if err := r.reconcileWebhooksWithCABundle(ctx); err != nil {
Expand Down Expand Up @@ -514,9 +526,13 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context, eventing *operato
// into CrashLoopBackOff.
log.Infof("NATS module not enabled, deleting publisher proxy resources")
delErr := r.eventingManager.DeletePublisherProxyResources(ctx, eventing)
if delErr != nil {
return kctrl.Result{}, delErr
}
// update the Eventing CR status.
notFoundErr := fmt.Errorf("NATS module has to be installed: %v", err)
return kctrl.Result{}, errors.Join(r.syncStatusWithNATSErr(ctx, eventing, notFoundErr, log), delErr)
return kctrl.Result{}, r.syncStatusWithNATSState(ctx, operatorv1alpha1.StateWarning, eventing,
notFoundErr, log)
}
return kctrl.Result{}, err
}
Expand Down Expand Up @@ -596,8 +612,19 @@ func (r *Reconciler) reconcileEventMeshBackend(ctx context.Context, eventing *op
return kctrl.Result{}, r.syncStatusWithSubscriptionManagerErr(ctx, eventing, apiRuleMissingErr, log)
}

// retrieve secret used to authenticate with EventMesh
eventMeshSecret, err := r.kubeClient.GetSecret(ctx, eventing.Spec.Backend.Config.EventMeshSecret)
if err != nil {
if kerrors.IsNotFound(err) {
return kctrl.Result{}, r.syncSubManagerStatusWithNATSState(ctx, operatorv1alpha1.StateWarning, eventing,
fmt.Errorf(EventMeshSecretMissingMessage), log)
}
return kctrl.Result{}, r.syncStatusWithSubscriptionManagerErr(ctx, eventing,
fmt.Errorf("failed to get EventMesh secret: %v", err), log)
}

// Start the EventMesh subscription controller
err = r.reconcileEventMeshSubManager(ctx, eventing)
err = r.reconcileEventMeshSubManager(ctx, eventing, eventMeshSecret, log)
if err != nil {
return kctrl.Result{}, r.syncStatusWithSubscriptionManagerErr(ctx, eventing, err, log)
}
Expand Down
11 changes: 5 additions & 6 deletions internal/controller/operator/eventing/eventmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@ type oauth2Credentials struct {
certsURL []byte
}

func (r *Reconciler) reconcileEventMeshSubManager(ctx context.Context, eventing *v1alpha1.Eventing) error {
const EventMeshSecretMissingMessage = "The specified EventMesh secret is not found. Please provide an existing secret."

func (r *Reconciler) reconcileEventMeshSubManager(ctx context.Context, eventing *v1alpha1.Eventing,
eventMeshSecret *kcorev1.Secret, log *zap.SugaredLogger,
) error {
// gets oauth2ClientID and secret and stops the EventMesh subscription manager if changed
err := r.syncOauth2ClientIDAndSecret(ctx, eventing)
if err != nil {
return errors.Errorf("failed to sync OAuth secret: %v", err)
}
// retrieve secret to authenticate with EventMesh
eventMeshSecret, err := r.kubeClient.GetSecret(ctx, eventing.Spec.Backend.Config.EventMeshSecret)
if err != nil {
return errors.Errorf("failed to get EventMesh secret: %v", err)
}
// CreateOrUpdate deployment for publisher proxy secret
secretForPublisher, err := r.SyncPublisherProxySecret(ctx, eventMeshSecret)
if err != nil {
Expand Down
Loading