Skip to content

Commit

Permalink
Delete Cluster Scoped Resources (#92)
Browse files Browse the repository at this point in the history
* Delete Cluster Scoped Resources
* Cluster role and binding were not deleted as k8s garbage collector cannot if namespace scoped owns cluster scoped
* Add tests
* Improve conditions

* Improve code  for Review Comments

* Rename Condition Reason
  • Loading branch information
muralov authored Sep 15, 2023
1 parent 1079924 commit f66f13c
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 39 deletions.
15 changes: 10 additions & 5 deletions api/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (

// common reasons
ConditionReasonProcessing ConditionReason = "Processing"
ConditionReasonDeleted ConditionReason = "Deleted"
ConditionReasonStopped ConditionReason = "Stopped"

// publisher proxy reasons
ConditionReasonDeployed ConditionReason = "Deployed"
Expand All @@ -49,13 +51,16 @@ const (
ConditionReasonForbidden ConditionReason = "Forbidden"
ConditionReasonWebhookFailed ConditionReason = "WebhookFailed"
ConditionReasonWebhookReady ConditionReason = "Ready"
ConditionReasonDeletedFailed ConditionReason = "DeletionFailed"

// message for conditions
ConditionPublisherProxyReadyMessage = "Publisher proxy is deployed"
ConditionNATSAvailableMessage = "NATS is available"
ConditionWebhookReadyMessage = "Webhook is available"
ConditionPublisherProxyProcessingMessage = "Eventing publisher proxy deployment is in progress"
ConditionSubscriptionManagerReadyMessage = "Subscription manager is ready"
ConditionPublisherProxyReadyMessage = "Publisher proxy is deployed"
ConditionPublisherProxyDeletedMessage = "Publisher proxy is deleted"
ConditionNATSAvailableMessage = "NATS is available"
ConditionWebhookReadyMessage = "Webhook is available"
ConditionPublisherProxyProcessingMessage = "Eventing publisher proxy deployment is in progress"
ConditionSubscriptionManagerReadyMessage = "Subscription manager is ready"
ConditionSubscriptionManagerStoppedMessage = "Subscription manager is stopped"

// subscription manager reasons
ConditionReasonEventMeshSubManagerReady ConditionReason = "EventMeshSubscriptionManagerReady"
Expand Down
10 changes: 10 additions & 0 deletions api/v1alpha1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ func (es *EventingStatus) SetNATSAvailableConditionToTrue() {
es.UpdateConditionNATSAvailable(metav1.ConditionTrue, ConditionReasonNATSAvailable, ConditionNATSAvailableMessage)
}

func (es *EventingStatus) SetSubscriptionManagerReadyConditionToFalse(reason ConditionReason, message string) {
es.UpdateConditionSubscriptionManagerReady(metav1.ConditionFalse, reason,
message)
}

func (es *EventingStatus) SetPublisherProxyConditionToFalse(reason ConditionReason, message string) {
es.UpdateConditionPublisherProxyReady(metav1.ConditionFalse, reason,
message)
}

func (es *EventingStatus) SetPublisherProxyReadyToTrue() {
es.State = StateReady
es.UpdateConditionPublisherProxyReady(metav1.ConditionTrue, ConditionReasonDeployed, ConditionPublisherProxyReadyMessage)
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions internal/controller/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,32 @@ func (r *Reconciler) handleEventingDeletion(ctx context.Context, eventing *event
eventing, err, log)
}
}
eventing.Status.SetSubscriptionManagerReadyConditionToFalse(
eventingv1alpha1.ConditionReasonStopped,
eventingv1alpha1.ConditionSubscriptionManagerStoppedMessage)

// delete cluster-scoped resources, such as clusterrole and clusterrolebinding.
if err := r.deleteClusterScopedResources(ctx, eventing); err != nil {
return ctrl.Result{}, r.syncStatusWithPublisherProxyErrWithReason(ctx,
eventingv1alpha1.ConditionReasonDeletedFailed, eventing, err, log)
}
eventing.Status.SetPublisherProxyConditionToFalse(
eventingv1alpha1.ConditionReasonDeleted,
eventingv1alpha1.ConditionPublisherProxyDeletedMessage)

return r.removeFinalizer(ctx, eventing)
}

// deleteClusterScopedResources deletes cluster-scoped resources, such as clusterrole and clusterrolebinding.
// K8s doesn't support cleaning cluster-scoped resources owned by namespace-scoped resources:
// https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/#owner-references-in-object-specifications
func (r *Reconciler) deleteClusterScopedResources(ctx context.Context, eventingCR *eventingv1alpha1.Eventing) error {
if err := r.kubeClient.DeleteClusterRole(ctx, eventing.GetPublisherClusterRoleName(*eventingCR), eventingCR.Namespace); err != nil {
return err
}
return r.kubeClient.DeleteClusterRoleBinding(ctx, eventing.GetPublisherClusterRoleBindingName(*eventingCR), eventingCR.Namespace)
}

func (r *Reconciler) handleEventingReconcile(ctx context.Context,
eventing *eventingv1alpha1.Eventing, log *zap.SugaredLogger) (ctrl.Result, error) {
log.Info("handling Eventing reconciliation...")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func Test_CreateEventingCR_NATS(t *testing.T) {
testEnvironment.EnsureDeploymentDeletion(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace)
}
testEnvironment.EnsureK8sResourceDeleted(t, tc.givenNATS)
testEnvironment.EnsureNamespaceDeleted(t, givenNamespace)
}()

// then
Expand Down Expand Up @@ -225,6 +226,7 @@ func Test_UpdateEventingCR(t *testing.T) {
testEnvironment.EnsureDeploymentDeletion(t, givenEPPDeploymentName, givenNamespace)
}
testEnvironment.EnsureK8sResourceDeleted(t, nats)
testEnvironment.EnsureNamespaceDeleted(t, givenNamespace)
}()

// get Eventing CR.
Expand Down Expand Up @@ -288,6 +290,7 @@ func Test_WatcherEventingCRK8sObjects(t *testing.T) {
name string
givenEventing *eventingv1alpha1.Eventing
wantResourceDeletion []deletionFunc
runForRealCluster bool
}{
{
name: "should recreate Publish Service",
Expand Down Expand Up @@ -344,29 +347,30 @@ func Test_WatcherEventingCRK8sObjects(t *testing.T) {
deleteHPAFromK8s,
},
},
// @TODO: Fix the watching of ClusterRoles and ClusterRoleBindings
//{
// name: "should recreate ClusterRole",
// givenEventing: utils.NewEventingCR(
// utils.WithEventingCRMinimal(),
// utils.WithEventingStreamData("Memory", "1M", "1M", 1, 1),
// utils.WithEventingPublisherData(1, 1, "199m", "99Mi", "399m", "199Mi"),
// ),
// wantResourceDeletion: []deletionFunc{
// deleteClusterRoleFromK8s,
// },
//},
//{
// name: "should recreate ClusterRoleBinding",
// givenEventing: utils.NewEventingCR(
// utils.WithEventingCRMinimal(),
// utils.WithEventingStreamData("Memory", "1M", "1M", 1, 1),
// utils.WithEventingPublisherData(1, 1, "199m", "99Mi", "399m", "199Mi"),
// ),
// wantResourceDeletion: []deletionFunc{
// deleteClusterRoleBindingFromK8s,
// },
//},
{
name: "should recreate ClusterRole",
givenEventing: utils.NewEventingCR(
utils.WithEventingCRMinimal(),
utils.WithEventingStreamData("Memory", "1M", 1, 1),
utils.WithEventingPublisherData(1, 1, "199m", "99Mi", "399m", "199Mi"),
),
wantResourceDeletion: []deletionFunc{
deleteClusterRoleFromK8s,
},
runForRealCluster: true,
},
{
name: "should recreate ClusterRoleBinding",
givenEventing: utils.NewEventingCR(
utils.WithEventingCRMinimal(),
utils.WithEventingStreamData("Memory", "1M", 1, 1),
utils.WithEventingPublisherData(1, 1, "199m", "99Mi", "399m", "199Mi"),
),
wantResourceDeletion: []deletionFunc{
deleteClusterRoleBindingFromK8s,
},
runForRealCluster: true,
},
{
name: "should recreate all objects",
givenEventing: utils.NewEventingCR(
Expand All @@ -390,6 +394,10 @@ func Test_WatcherEventingCRK8sObjects(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

if !*testEnvironment.EnvTestInstance.UseExistingCluster && tc.runForRealCluster {
t.Skip("Skipping test case as it can only be run on real cluster")
}

// given
g := gomega.NewWithT(t)
eventingcontroller.IsDeploymentReady = func(deployment *v1.Deployment) bool {
Expand Down Expand Up @@ -417,6 +425,7 @@ func Test_WatcherEventingCRK8sObjects(t *testing.T) {
testEnvironment.EnsureDeploymentDeletion(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace)
}
testEnvironment.EnsureK8sResourceDeleted(t, nats)
testEnvironment.EnsureNamespaceDeleted(t, givenNamespace)
}()

// check Eventing CR status.
Expand Down Expand Up @@ -526,6 +535,7 @@ func Test_CreateEventingCR_EventMesh(t *testing.T) {
if !*testEnvironment.EnvTestInstance.UseExistingCluster && !tc.shouldFailSubManager {
testEnvironment.EnsureDeploymentDeletion(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace)
}
testEnvironment.EnsureNamespaceDeleted(t, givenNamespace)
}()

// then
Expand Down Expand Up @@ -611,6 +621,7 @@ func Test_DeleteEventingCR(t *testing.T) {
if tc.givenEventing.Spec.Backend.Type == eventingv1alpha1.NatsBackendType {
testEnvironment.EnsureK8sResourceDeleted(t, nats)
}
testEnvironment.EnsureNamespaceDeleted(t, givenNamespace)
}()

testEnvironment.EnsureDeploymentExists(t, eventing.GetPublisherDeploymentName(*tc.givenEventing), givenNamespace)
Expand All @@ -630,15 +641,16 @@ func Test_DeleteEventingCR(t *testing.T) {
eventing.GetPublisherHealthServiceName(*tc.givenEventing), givenNamespace)
testEnvironment.EnsureK8sServiceAccountNotFound(t,
eventing.GetPublisherServiceAccountName(*tc.givenEventing), givenNamespace)
testEnvironment.EnsureK8sClusterRoleNotFound(t,
eventing.GetPublisherClusterRoleName(*tc.givenEventing), givenNamespace)
testEnvironment.EnsureK8sClusterRoleBindingNotFound(t,
eventing.GetPublisherClusterRoleBindingName(*tc.givenEventing), givenNamespace)
} else {
// check if the owner reference is set.
// if owner reference is set then these resources would be garbage collected in real k8s cluster.
testEnvironment.EnsureEPPK8sResourcesHaveOwnerReference(t, *tc.givenEventing)
// ensure clusterrole and clusterrolebindings are deleted.
}
testEnvironment.EnsureK8sClusterRoleNotFound(t,
eventing.GetPublisherClusterRoleName(*tc.givenEventing), givenNamespace)
testEnvironment.EnsureK8sClusterRoleBindingNotFound(t,
eventing.GetPublisherClusterRoleBindingName(*tc.givenEventing), givenNamespace)
})
}
}
Expand Down
9 changes: 8 additions & 1 deletion internal/controller/eventing/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,17 @@ func (r *Reconciler) syncStatusWithNATSErr(ctx context.Context,
// syncStatusWithPublisherProxyErr updates Publisher Proxy condition and sets an error state.
// Returns the relevant error.
func (r *Reconciler) syncStatusWithPublisherProxyErr(ctx context.Context,
eventing *eventingv1alpha1.Eventing, err error, log *zap.SugaredLogger) error {
return r.syncStatusWithPublisherProxyErrWithReason(ctx, eventingv1alpha1.ConditionReasonDeployedFailed,
eventing, err, log)
}

func (r *Reconciler) syncStatusWithPublisherProxyErrWithReason(ctx context.Context,
reason eventingv1alpha1.ConditionReason,
eventing *eventingv1alpha1.Eventing, err error, log *zap.SugaredLogger) error {
// Set error state in status
eventing.Status.SetStateError()
eventing.Status.UpdateConditionPublisherProxyReady(metav1.ConditionFalse, eventingv1alpha1.ConditionReasonDeployedFailed,
eventing.Status.UpdateConditionPublisherProxyReady(metav1.ConditionFalse, reason,
err.Error())

return errors.Join(err, r.syncEventingStatus(ctx, eventing, log))
Expand Down
29 changes: 29 additions & 0 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
admissionv1 "k8s.io/api/admissionregistration/v1"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -18,6 +19,8 @@ import (
type Client interface {
GetDeployment(context.Context, string, string) (*v1.Deployment, error)
DeleteDeployment(context.Context, string, string) error
DeleteClusterRole(context.Context, string, string) error
DeleteClusterRoleBinding(context.Context, string, string) error
GetNATSResources(context.Context, string) (*natsv1alpha1.NATSList, error)
PatchApply(context.Context, client.Object) error
GetSecret(context.Context, string) (*corev1.Secret, error)
Expand Down Expand Up @@ -60,6 +63,32 @@ func (c *KubeClient) DeleteDeployment(ctx context.Context, name, namespace strin
return nil
}

func (c *KubeClient) DeleteClusterRole(ctx context.Context, name, namespace string) error {
role := &rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
if err := c.client.Delete(ctx, role); err != nil {
return client.IgnoreNotFound(err)
}
return nil
}

func (c *KubeClient) DeleteClusterRoleBinding(ctx context.Context, name, namespace string) error {
binding := &rbac.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
if err := c.client.Delete(ctx, binding); err != nil {
return client.IgnoreNotFound(err)
}
return nil
}

func (c *KubeClient) GetNATSResources(ctx context.Context, namespace string) (*natsv1alpha1.NATSList, error) {
natsList := &natsv1alpha1.NATSList{}
err := c.client.List(ctx, natsList, &client.ListOptions{Namespace: namespace})
Expand Down
Loading

0 comments on commit f66f13c

Please sign in to comment.