Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block deletion if customer stream and sap stream consumer exists #145

Merged
merged 3 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/v1alpha1/nats_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (ns *NATSStatus) SetStateProcessing() {
ns.State = StateProcessing
}

func (ns *NATSStatus) SetStateWarning() {
ns.State = StateWarning
}

func (ns *NATSStatus) SetWaitingStateForStatefulSet() {
ns.SetStateProcessing()
ns.UpdateConditionStatefulSet(metav1.ConditionFalse,
Expand Down
1 change: 1 addition & 0 deletions api/v1alpha1/nats_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
StateError string = "Error"
StateProcessing string = "Processing"
StateDeleting string = "Deleting"
StateWarning string = "Warning"

ConditionAvailable ConditionType = "Available"
ConditionStatefulSet ConditionType = "StatefulSet"
Expand Down
78 changes: 64 additions & 14 deletions internal/controller/nats/deprovisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
)

const (
StreamExistsErrorMsg = "Cannot delete NATS cluster as stream exists"
natsClientPort = 4222
InstanceLabelKey = "app.kubernetes.io/instance"
StreamExistsErrorMsg = "Cannot delete NATS cluster as customer stream exists"
ConsumerExistsErrorMsg = "Cannot delete NATS cluster as stream consumer exists"
natsClientPort = 4222
InstanceLabelKey = "app.kubernetes.io/instance"
SapStreamName = "sap"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this constant should be sourced from a single location otherwise it will at eventually go out of sync in the various components that use it

)

func (r *Reconciler) handleNATSDeletion(ctx context.Context, nats *natsv1alpha1.NATS,
Expand All @@ -32,31 +34,79 @@ func (r *Reconciler) handleNATSDeletion(ctx context.Context, nats *natsv1alpha1.
nats.Status.SetStateDeleting()
events.Normal(r.recorder, nats, natsv1alpha1.ConditionReasonDeleting, "Deleting the NATS cluster.")

// create a new NATS client instance
// create a new NATS client instance.
if err := r.createAndConnectNatsClient(nats); err != nil {
// delete a PVC if NATS client cannot be created
return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger)
}
// check if NATS JetStream stream exists
streamExists, err := r.getNatsClient(nats).StreamExists()

customerStreamExists, err := r.customerStreamExists(nats)
if err != nil {
// delete a PVC if NATS client cannot be created
return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger)
}
if streamExists {
// if a stream exists, do not delete the NATS cluster
// if any streams exists except for 'sap' stream, block the deletion.
if customerStreamExists {
nats.Status.SetStateWarning()
nats.Status.UpdateConditionDeletion(metav1.ConditionFalse,
natsv1alpha1.ConditionReasonDeletionError, StreamExistsErrorMsg)
events.Warn(r.recorder, nats, natsv1alpha1.ConditionReasonDeletionError, StreamExistsErrorMsg)
return ctrl.Result{Requeue: true}, r.syncNATSStatus(ctx, nats, log)
}

sapStreamConsumerExists, err := r.sapStreamConsumerExists(nats)
if err != nil {
return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger)
}
// if any 'sap' stream consumer exists, block the deletion.
if sapStreamConsumerExists {
nats.Status.SetStateWarning()
nats.Status.UpdateConditionDeletion(metav1.ConditionFalse,
natsv1alpha1.ConditionReasonDeletionError, ConsumerExistsErrorMsg)
events.Warn(r.recorder, nats, natsv1alpha1.ConditionReasonDeletionError, ConsumerExistsErrorMsg)
return ctrl.Result{Requeue: true}, r.syncNATSStatus(ctx, nats, log)
}

return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger)
}

// check if any other stream exists except for 'sap' stream.
func (r *Reconciler) customerStreamExists(nats *natsv1alpha1.NATS) (bool, error) {
// check if any other stream exists except for 'sap' stream.
streams, err := r.getNatsClient(nats).GetStreams()
if err != nil {
return false, err
}
for _, stream := range streams {
if stream.Config.Name != SapStreamName {
return true, nil
}
}
return false, nil
}

func (r *Reconciler) sapStreamConsumerExists(nats *natsv1alpha1.NATS) (bool, error) {
// check if 'sap' stream exists.
streams, err := r.getNatsClient(nats).GetStreams()
if err != nil {
return false, err
}
sapStreamExists := false
for _, stream := range streams {
if stream.Config.Name == SapStreamName {
sapStreamExists = true
break
}
}
// if 'sap' stream does not exist, return false.
if !sapStreamExists {
return false, nil
}

return r.getNatsClient(nats).ConsumersExist(SapStreamName)
}

// create a new NATS client instance and connect to the NATS server.
func (r *Reconciler) createAndConnectNatsClient(nats *natsv1alpha1.NATS) error {
// create a new instance if it does not exist
// create a new instance if it does not exist.
if r.getNatsClient(nats) == nil {
r.setNatsClient(nats, natspkg.NewNatsClient(&natspkg.Config{
URL: fmt.Sprintf("nats://%s.%s.svc.cluster.local:%d", nats.Name, nats.Namespace, natsClientPort),
Expand All @@ -72,12 +122,12 @@ func (r *Reconciler) deletePVCsAndRemoveFinalizer(ctx context.Context,
if nats.Name == "eventing-nats" {
labelValue = "eventing"
}
// delete PVCs with the label selector
// delete PVCs with the label selector.
labelSelector := fmt.Sprintf("%s=%s", InstanceLabelKey, labelValue)
if err := r.kubeClient.DeletePVCsWithLabel(ctx, labelSelector, nats.Name, nats.Namespace); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// close the nats connection and remove the client instance
// close the nats connection and remove the client instance.
r.closeNatsClient(nats)

log.Debugf("deleted PVCs with a namespace: %s and label selector: %s", nats.Namespace, labelSelector)
Expand All @@ -96,7 +146,7 @@ func (r *Reconciler) setNatsClient(nats *natsv1alpha1.NATS, newNatsClient natspk

// close the nats connection and remove the client instance.
func (r *Reconciler) closeNatsClient(nats *natsv1alpha1.NATS) {
// check if nats client exists
// check if nats client exists.
if r.getNatsClient(nats) != nil {
r.getNatsClient(nats).Close()
r.setNatsClient(nats, nil)
Expand Down
86 changes: 77 additions & 9 deletions internal/controller/nats/deprovisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"github.com/kyma-project/nats-manager/pkg/nats"
"go.uber.org/zap"

"github.com/kyma-project/nats-manager/internal/controller/nats/mocks"
natsmanager "github.com/kyma-project/nats-manager/pkg/manager"
"github.com/kyma-project/nats-manager/pkg/nats/mocks"

natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1"
"github.com/kyma-project/nats-manager/pkg/k8s/chart"
k8smocks "github.com/kyma-project/nats-manager/pkg/k8s/mocks"
"github.com/kyma-project/nats-manager/testutils"
natssdk "github.com/nats-io/nats.go"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -62,23 +63,44 @@ func Test_handleNATSDeletion(t *testing.T) {
wantResult: ctrl.Result{},
},
{
name: "should delete resources if natsClients StreamExists returns error",
name: "should delete resources if natsClients GetStreams returns unexpected error",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateDeleting,
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("StreamExists").Return(false, errors.New("unexpected error"))
natsClient.On("GetStreams").Return(nil, errors.New("unexpected error"))
natsClient.On("Close").Return()
return natsClient
},
wantK8sEvents: []string{"Normal Deleting Deleting the NATS cluster."},
wantResult: ctrl.Result{},
},
{
name: "should add deleted condition with error when stream exists",
name: "should delete resources if natsClients ConsumersExist returns unexpected error",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateDeleting,
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{
{
Config: natssdk.StreamConfig{
Name: SapStreamName,
},
},
}, nil)
natsClient.On("ConsumersExist", mock.Anything).Return(false, errors.New("unexpected error"))
natsClient.On("Close").Return()
return natsClient
},
wantK8sEvents: []string{"Normal Deleting Deleting the NATS cluster."},
wantResult: ctrl.Result{},
},
{
name: "should block deletion if non 'sap' stream exists",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateWarning,
wantCondition: &metav1.Condition{
Type: string(natsv1alpha1.ConditionDeleted),
Status: metav1.ConditionFalse,
Expand All @@ -89,27 +111,73 @@ func Test_handleNATSDeletion(t *testing.T) {
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("StreamExists").Return(true, nil)
natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{
{
Config: natssdk.StreamConfig{
Name: "non-sap",
},
},
}, nil)
natsClient.On("Close").Return()
return natsClient
},
wantFinalizerExists: true,
wantK8sEvents: []string{
"Normal Deleting Deleting the NATS cluster.",
"Warning DeletionError " + StreamExistsErrorMsg,
},
wantResult: ctrl.Result{Requeue: true},
},
{
name: "should block deletion if 'sap' stream consumer exists",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateWarning,
wantCondition: &metav1.Condition{
Type: string(natsv1alpha1.ConditionDeleted),
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: string(natsv1alpha1.ConditionReasonDeletionError),
Message: ConsumerExistsErrorMsg,
},
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{
{
Config: natssdk.StreamConfig{
Name: SapStreamName,
},
},
}, nil)
natsClient.On("ConsumersExist", mock.Anything).Return(true, nil)
natsClient.On("Close").Return()
return natsClient
},
wantFinalizerExists: true,
wantK8sEvents: []string{
"Normal Deleting Deleting the NATS cluster.",
"Warning DeletionError Cannot delete NATS cluster as stream exists",
"Warning DeletionError " + ConsumerExistsErrorMsg,
},
wantResult: ctrl.Result{Requeue: true},
},
{
name: "should delete resources if stream does not exist",
name: "should delete resources if neither consumer stream nor 'sap' stream exists",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateDeleting,
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("StreamExists").Return(false, nil)
natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{
{
Config: natssdk.StreamConfig{
Name: SapStreamName,
},
},
}, nil)
natsClient.On("ConsumersExist", mock.Anything).Return(false, nil)
natsClient.On("Close").Return()
return natsClient
},
wantNATSStatusState: natsv1alpha1.StateDeleting,
wantK8sEvents: []string{
"Normal Deleting Deleting the NATS cluster.",
},
Expand Down
Loading