Skip to content

Commit

Permalink
Block deletion if customer stream and sap stream consumer exists
Browse files Browse the repository at this point in the history
* Don't delete if customer stream exists other than sap stream
* Don't delete if sap stream consumer exists
* Add WARNING state if deletion is blocked
* Add and adapt tests for deletion block
  • Loading branch information
muralov committed Sep 21, 2023
1 parent cb4be49 commit 9aaaa32
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 177 deletions.
4 changes: 4 additions & 0 deletions api/v1alpha1/nats_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (ns *NATSStatus) SetStateProcessing() {
ns.State = StateProcessing
}

func (ns *NATSStatus) SetStateWarning() {
ns.State = StateWarning
}

func (ns *NATSStatus) SetWaitingStateForStatefulSet() {
ns.SetStateProcessing()
ns.UpdateConditionStatefulSet(metav1.ConditionFalse,
Expand Down
1 change: 1 addition & 0 deletions api/v1alpha1/nats_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
StateError string = "Error"
StateProcessing string = "Processing"
StateDeleting string = "Deleting"
StateWarning string = "Warning"

ConditionAvailable ConditionType = "Available"
ConditionStatefulSet ConditionType = "StatefulSet"
Expand Down
68 changes: 59 additions & 9 deletions internal/controller/nats/deprovisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
)

const (
StreamExistsErrorMsg = "Cannot delete NATS cluster as stream exists"
natsClientPort = 4222
InstanceLabelKey = "app.kubernetes.io/instance"
StreamExistsErrorMsg = "Cannot delete NATS cluster as customer stream exists"
ConsumerExistsErrorMsg = "Cannot delete NATS cluster as stream consumer exists"
natsClientPort = 4222
InstanceLabelKey = "app.kubernetes.io/instance"
SAP_STREAM_NAME = "sap"

Check warning on line 22 in internal/controller/nats/deprovisioner.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: don't use ALL_CAPS in Go names; use CamelCase (revive)
)

func (r *Reconciler) handleNATSDeletion(ctx context.Context, nats *natsv1alpha1.NATS,
Expand All @@ -34,26 +36,74 @@ func (r *Reconciler) handleNATSDeletion(ctx context.Context, nats *natsv1alpha1.

// create a new NATS client instance
if err := r.createAndConnectNatsClient(nats); err != nil {
// delete a PVC if NATS client cannot be created
return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger)
}
// check if NATS JetStream stream exists
streamExists, err := r.getNatsClient(nats).StreamExists()

customerStreamExists, err := r.customerStreamExists(nats)
if err != nil {
// delete a PVC if NATS client cannot be created
return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger)
}
if streamExists {
// if a stream exists, do not delete the NATS cluster
// if any streams exists except for 'sap' stream, block the deletion
if customerStreamExists {
nats.Status.SetStateWarning()
nats.Status.UpdateConditionDeletion(metav1.ConditionFalse,
natsv1alpha1.ConditionReasonDeletionError, StreamExistsErrorMsg)
events.Warn(r.recorder, nats, natsv1alpha1.ConditionReasonDeletionError, StreamExistsErrorMsg)
return ctrl.Result{Requeue: true}, r.syncNATSStatus(ctx, nats, log)
}

sapStreamConsumerExists, err := r.sapStreamConsumerExists(nats)
if err != nil {
return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger)
}
// if any 'sap' stream consumer exists, block the deletion
if sapStreamConsumerExists {
nats.Status.SetStateWarning()
nats.Status.UpdateConditionDeletion(metav1.ConditionFalse,
natsv1alpha1.ConditionReasonDeletionError, ConsumerExistsErrorMsg)
events.Warn(r.recorder, nats, natsv1alpha1.ConditionReasonDeletionError, ConsumerExistsErrorMsg)
return ctrl.Result{Requeue: true}, r.syncNATSStatus(ctx, nats, log)
}

return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger)
}

// check if any other stream exists except for 'sap' stream

Check failure on line 71 in internal/controller/nats/deprovisioner.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
func (r *Reconciler) customerStreamExists(nats *natsv1alpha1.NATS) (bool, error) {
// check if any other stream exists except for 'sap' stream
streams, err := r.getNatsClient(nats).GetStreams()
if err != nil {
return false, err
}
for _, stream := range streams {
if stream.Config.Name != SAP_STREAM_NAME {
return true, nil
}
}
return false, nil
}

func (r *Reconciler) sapStreamConsumerExists(nats *natsv1alpha1.NATS) (bool, error) {
// check if 'sap' stream exists
streams, err := r.getNatsClient(nats).GetStreams()
if err != nil {
return false, err
}
sapStreamExists := false
for _, stream := range streams {
if stream.Config.Name == SAP_STREAM_NAME {
sapStreamExists = true
break
}
}
// if 'sap' stream does not exist, return false
if !sapStreamExists {
return false, nil
}

return r.getNatsClient(nats).ConsumersExist(SAP_STREAM_NAME)
}

// create a new NATS client instance and connect to the NATS server.
func (r *Reconciler) createAndConnectNatsClient(nats *natsv1alpha1.NATS) error {
// create a new instance if it does not exist
Expand Down
86 changes: 77 additions & 9 deletions internal/controller/nats/deprovisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"github.com/kyma-project/nats-manager/pkg/nats"
"go.uber.org/zap"

"github.com/kyma-project/nats-manager/internal/controller/nats/mocks"
natsmanager "github.com/kyma-project/nats-manager/pkg/manager"
"github.com/kyma-project/nats-manager/pkg/nats/mocks"

natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1"
"github.com/kyma-project/nats-manager/pkg/k8s/chart"
k8smocks "github.com/kyma-project/nats-manager/pkg/k8s/mocks"
"github.com/kyma-project/nats-manager/testutils"
natssdk "github.com/nats-io/nats.go"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -62,23 +63,44 @@ func Test_handleNATSDeletion(t *testing.T) {
wantResult: ctrl.Result{},
},
{
name: "should delete resources if natsClients StreamExists returns error",
name: "should delete resources if natsClients GetStreams returns unexpected error",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateDeleting,
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("StreamExists").Return(false, errors.New("unexpected error"))
natsClient.On("GetStreams").Return(nil, errors.New("unexpected error"))
natsClient.On("Close").Return()
return natsClient
},
wantK8sEvents: []string{"Normal Deleting Deleting the NATS cluster."},
wantResult: ctrl.Result{},
},
{
name: "should add deleted condition with error when stream exists",
name: "should delete resources if natsClients ConsumersExist returns unexpected error",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateDeleting,
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{
{
Config: natssdk.StreamConfig{
Name: "sap",
},
},
}, nil)
natsClient.On("ConsumersExist", mock.Anything).Return(false, errors.New("unexpected error"))
natsClient.On("Close").Return()
return natsClient
},
wantK8sEvents: []string{"Normal Deleting Deleting the NATS cluster."},
wantResult: ctrl.Result{},
},
{
name: "should block deletion if non 'sap' stream exists",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateWarning,
wantCondition: &metav1.Condition{
Type: string(natsv1alpha1.ConditionDeleted),
Status: metav1.ConditionFalse,
Expand All @@ -89,27 +111,73 @@ func Test_handleNATSDeletion(t *testing.T) {
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("StreamExists").Return(true, nil)
natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{
{
Config: natssdk.StreamConfig{
Name: "non-sap",
},
},
}, nil)
natsClient.On("Close").Return()
return natsClient
},
wantFinalizerExists: true,
wantK8sEvents: []string{
"Normal Deleting Deleting the NATS cluster.",
"Warning DeletionError " + StreamExistsErrorMsg,
},
wantResult: ctrl.Result{Requeue: true},
},
{
name: "should block deletion if 'sap' stream consumer exists",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateWarning,
wantCondition: &metav1.Condition{
Type: string(natsv1alpha1.ConditionDeleted),
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: string(natsv1alpha1.ConditionReasonDeletionError),
Message: ConsumerExistsErrorMsg,
},
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{
{
Config: natssdk.StreamConfig{
Name: "sap",
},
},
}, nil)
natsClient.On("ConsumersExist", mock.Anything).Return(true, nil)
natsClient.On("Close").Return()
return natsClient
},
wantFinalizerExists: true,
wantK8sEvents: []string{
"Normal Deleting Deleting the NATS cluster.",
"Warning DeletionError Cannot delete NATS cluster as stream exists",
"Warning DeletionError " + ConsumerExistsErrorMsg,
},
wantResult: ctrl.Result{Requeue: true},
},
{
name: "should delete resources if stream does not exist",
name: "should delete resources if neither consumer stream nor 'sap' stream exists",
givenWithNATSCreated: true,
wantNATSStatusState: natsv1alpha1.StateDeleting,
mockNatsClientFunc: func() nats.Client {
natsClient := new(mocks.Client)
natsClient.On("Init").Return(nil)
natsClient.On("StreamExists").Return(false, nil)
natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{
{
Config: natssdk.StreamConfig{
Name: "sap",
},
},
}, nil)
natsClient.On("ConsumersExist", mock.Anything).Return(false, nil)
natsClient.On("Close").Return()
return natsClient
},
wantNATSStatusState: natsv1alpha1.StateDeleting,
wantK8sEvents: []string{
"Normal Deleting Deleting the NATS cluster.",
},
Expand Down
Loading

0 comments on commit 9aaaa32

Please sign in to comment.