From 9aaaa329290b24fc6b71c35d9866c3119f878224 Mon Sep 17 00:00:00 2001 From: Mansur Uralov Date: Wed, 20 Sep 2023 11:48:20 +0200 Subject: [PATCH] Block deletion if customer stream and sap stream consumer exists * Don't delete if customer stream exists other than sap stream * Don't delete if sap stream consumer exists * Add WARNING state if deletion is blocked * Add and adapt tests for deletion block --- api/v1alpha1/nats_status.go | 4 + api/v1alpha1/nats_types.go | 1 + internal/controller/nats/deprovisioner.go | 68 +++++++- .../controller/nats/deprovisioner_test.go | 86 +++++++++- internal/controller/nats/mocks/client.go | 156 ------------------ pkg/nats/client.go | 44 ++++- pkg/nats/mocks/client.go | 111 ++++++++++++- 7 files changed, 293 insertions(+), 177 deletions(-) delete mode 100644 internal/controller/nats/mocks/client.go diff --git a/api/v1alpha1/nats_status.go b/api/v1alpha1/nats_status.go index db4d0d1a..6e624396 100644 --- a/api/v1alpha1/nats_status.go +++ b/api/v1alpha1/nats_status.go @@ -63,6 +63,10 @@ func (ns *NATSStatus) SetStateProcessing() { ns.State = StateProcessing } +func (ns *NATSStatus) SetStateWarning() { + ns.State = StateWarning +} + func (ns *NATSStatus) SetWaitingStateForStatefulSet() { ns.SetStateProcessing() ns.UpdateConditionStatefulSet(metav1.ConditionFalse, diff --git a/api/v1alpha1/nats_types.go b/api/v1alpha1/nats_types.go index a7938e3f..09b62888 100644 --- a/api/v1alpha1/nats_types.go +++ b/api/v1alpha1/nats_types.go @@ -34,6 +34,7 @@ const ( StateError string = "Error" StateProcessing string = "Processing" StateDeleting string = "Deleting" + StateWarning string = "Warning" ConditionAvailable ConditionType = "Available" ConditionStatefulSet ConditionType = "StatefulSet" diff --git a/internal/controller/nats/deprovisioner.go b/internal/controller/nats/deprovisioner.go index ff4fabaa..d6f3a91a 100644 --- a/internal/controller/nats/deprovisioner.go +++ b/internal/controller/nats/deprovisioner.go @@ -15,9 +15,11 @@ import ( ) const ( - StreamExistsErrorMsg = "Cannot delete NATS cluster as stream exists" - natsClientPort = 4222 - InstanceLabelKey = "app.kubernetes.io/instance" + StreamExistsErrorMsg = "Cannot delete NATS cluster as customer stream exists" + ConsumerExistsErrorMsg = "Cannot delete NATS cluster as stream consumer exists" + natsClientPort = 4222 + InstanceLabelKey = "app.kubernetes.io/instance" + SAP_STREAM_NAME = "sap" ) func (r *Reconciler) handleNATSDeletion(ctx context.Context, nats *natsv1alpha1.NATS, @@ -34,26 +36,74 @@ func (r *Reconciler) handleNATSDeletion(ctx context.Context, nats *natsv1alpha1. // create a new NATS client instance if err := r.createAndConnectNatsClient(nats); err != nil { - // delete a PVC if NATS client cannot be created return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger) } - // check if NATS JetStream stream exists - streamExists, err := r.getNatsClient(nats).StreamExists() + + customerStreamExists, err := r.customerStreamExists(nats) if err != nil { - // delete a PVC if NATS client cannot be created return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger) } - if streamExists { - // if a stream exists, do not delete the NATS cluster + // if any streams exists except for 'sap' stream, block the deletion + if customerStreamExists { + nats.Status.SetStateWarning() nats.Status.UpdateConditionDeletion(metav1.ConditionFalse, natsv1alpha1.ConditionReasonDeletionError, StreamExistsErrorMsg) events.Warn(r.recorder, nats, natsv1alpha1.ConditionReasonDeletionError, StreamExistsErrorMsg) return ctrl.Result{Requeue: true}, r.syncNATSStatus(ctx, nats, log) } + sapStreamConsumerExists, err := r.sapStreamConsumerExists(nats) + if err != nil { + return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger) + } + // if any 'sap' stream consumer exists, block the deletion + if sapStreamConsumerExists { + nats.Status.SetStateWarning() + nats.Status.UpdateConditionDeletion(metav1.ConditionFalse, + natsv1alpha1.ConditionReasonDeletionError, ConsumerExistsErrorMsg) + events.Warn(r.recorder, nats, natsv1alpha1.ConditionReasonDeletionError, ConsumerExistsErrorMsg) + return ctrl.Result{Requeue: true}, r.syncNATSStatus(ctx, nats, log) + } + return r.deletePVCsAndRemoveFinalizer(ctx, nats, r.logger) } +// check if any other stream exists except for 'sap' stream +func (r *Reconciler) customerStreamExists(nats *natsv1alpha1.NATS) (bool, error) { + // check if any other stream exists except for 'sap' stream + streams, err := r.getNatsClient(nats).GetStreams() + if err != nil { + return false, err + } + for _, stream := range streams { + if stream.Config.Name != SAP_STREAM_NAME { + return true, nil + } + } + return false, nil +} + +func (r *Reconciler) sapStreamConsumerExists(nats *natsv1alpha1.NATS) (bool, error) { + // check if 'sap' stream exists + streams, err := r.getNatsClient(nats).GetStreams() + if err != nil { + return false, err + } + sapStreamExists := false + for _, stream := range streams { + if stream.Config.Name == SAP_STREAM_NAME { + sapStreamExists = true + break + } + } + // if 'sap' stream does not exist, return false + if !sapStreamExists { + return false, nil + } + + return r.getNatsClient(nats).ConsumersExist(SAP_STREAM_NAME) +} + // create a new NATS client instance and connect to the NATS server. func (r *Reconciler) createAndConnectNatsClient(nats *natsv1alpha1.NATS) error { // create a new instance if it does not exist diff --git a/internal/controller/nats/deprovisioner_test.go b/internal/controller/nats/deprovisioner_test.go index 1169dd51..2d38f9ed 100644 --- a/internal/controller/nats/deprovisioner_test.go +++ b/internal/controller/nats/deprovisioner_test.go @@ -9,13 +9,14 @@ import ( "github.com/kyma-project/nats-manager/pkg/nats" "go.uber.org/zap" - "github.com/kyma-project/nats-manager/internal/controller/nats/mocks" natsmanager "github.com/kyma-project/nats-manager/pkg/manager" + "github.com/kyma-project/nats-manager/pkg/nats/mocks" natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1" "github.com/kyma-project/nats-manager/pkg/k8s/chart" k8smocks "github.com/kyma-project/nats-manager/pkg/k8s/mocks" "github.com/kyma-project/nats-manager/testutils" + natssdk "github.com/nats-io/nats.go" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -62,13 +63,13 @@ func Test_handleNATSDeletion(t *testing.T) { wantResult: ctrl.Result{}, }, { - name: "should delete resources if natsClients StreamExists returns error", + name: "should delete resources if natsClients GetStreams returns unexpected error", givenWithNATSCreated: true, wantNATSStatusState: natsv1alpha1.StateDeleting, mockNatsClientFunc: func() nats.Client { natsClient := new(mocks.Client) natsClient.On("Init").Return(nil) - natsClient.On("StreamExists").Return(false, errors.New("unexpected error")) + natsClient.On("GetStreams").Return(nil, errors.New("unexpected error")) natsClient.On("Close").Return() return natsClient }, @@ -76,9 +77,30 @@ func Test_handleNATSDeletion(t *testing.T) { wantResult: ctrl.Result{}, }, { - name: "should add deleted condition with error when stream exists", + name: "should delete resources if natsClients ConsumersExist returns unexpected error", givenWithNATSCreated: true, wantNATSStatusState: natsv1alpha1.StateDeleting, + mockNatsClientFunc: func() nats.Client { + natsClient := new(mocks.Client) + natsClient.On("Init").Return(nil) + natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{ + { + Config: natssdk.StreamConfig{ + Name: "sap", + }, + }, + }, nil) + natsClient.On("ConsumersExist", mock.Anything).Return(false, errors.New("unexpected error")) + natsClient.On("Close").Return() + return natsClient + }, + wantK8sEvents: []string{"Normal Deleting Deleting the NATS cluster."}, + wantResult: ctrl.Result{}, + }, + { + name: "should block deletion if non 'sap' stream exists", + givenWithNATSCreated: true, + wantNATSStatusState: natsv1alpha1.StateWarning, wantCondition: &metav1.Condition{ Type: string(natsv1alpha1.ConditionDeleted), Status: metav1.ConditionFalse, @@ -89,27 +111,73 @@ func Test_handleNATSDeletion(t *testing.T) { mockNatsClientFunc: func() nats.Client { natsClient := new(mocks.Client) natsClient.On("Init").Return(nil) - natsClient.On("StreamExists").Return(true, nil) + natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{ + { + Config: natssdk.StreamConfig{ + Name: "non-sap", + }, + }, + }, nil) + natsClient.On("Close").Return() + return natsClient + }, + wantFinalizerExists: true, + wantK8sEvents: []string{ + "Normal Deleting Deleting the NATS cluster.", + "Warning DeletionError " + StreamExistsErrorMsg, + }, + wantResult: ctrl.Result{Requeue: true}, + }, + { + name: "should block deletion if 'sap' stream consumer exists", + givenWithNATSCreated: true, + wantNATSStatusState: natsv1alpha1.StateWarning, + wantCondition: &metav1.Condition{ + Type: string(natsv1alpha1.ConditionDeleted), + Status: metav1.ConditionFalse, + LastTransitionTime: metav1.Now(), + Reason: string(natsv1alpha1.ConditionReasonDeletionError), + Message: ConsumerExistsErrorMsg, + }, + mockNatsClientFunc: func() nats.Client { + natsClient := new(mocks.Client) + natsClient.On("Init").Return(nil) + natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{ + { + Config: natssdk.StreamConfig{ + Name: "sap", + }, + }, + }, nil) + natsClient.On("ConsumersExist", mock.Anything).Return(true, nil) + natsClient.On("Close").Return() return natsClient }, wantFinalizerExists: true, wantK8sEvents: []string{ "Normal Deleting Deleting the NATS cluster.", - "Warning DeletionError Cannot delete NATS cluster as stream exists", + "Warning DeletionError " + ConsumerExistsErrorMsg, }, wantResult: ctrl.Result{Requeue: true}, }, { - name: "should delete resources if stream does not exist", + name: "should delete resources if neither consumer stream nor 'sap' stream exists", givenWithNATSCreated: true, + wantNATSStatusState: natsv1alpha1.StateDeleting, mockNatsClientFunc: func() nats.Client { natsClient := new(mocks.Client) natsClient.On("Init").Return(nil) - natsClient.On("StreamExists").Return(false, nil) + natsClient.On("GetStreams").Return([]*natssdk.StreamInfo{ + { + Config: natssdk.StreamConfig{ + Name: "sap", + }, + }, + }, nil) + natsClient.On("ConsumersExist", mock.Anything).Return(false, nil) natsClient.On("Close").Return() return natsClient }, - wantNATSStatusState: natsv1alpha1.StateDeleting, wantK8sEvents: []string{ "Normal Deleting Deleting the NATS cluster.", }, diff --git a/internal/controller/nats/mocks/client.go b/internal/controller/nats/mocks/client.go deleted file mode 100644 index e68f8da9..00000000 --- a/internal/controller/nats/mocks/client.go +++ /dev/null @@ -1,156 +0,0 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. - -package mocks - -import mock "github.com/stretchr/testify/mock" - -// Client is an autogenerated mock type for the Client type -type Client struct { - mock.Mock -} - -type Client_Expecter struct { - mock *mock.Mock -} - -func (_m *Client) EXPECT() *Client_Expecter { - return &Client_Expecter{mock: &_m.Mock} -} - -// Close provides a mock function with given fields: -func (_m *Client) Close() { - _m.Called() -} - -// Client_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' -type Client_Close_Call struct { - *mock.Call -} - -// Close is a helper method to define mock.On call -func (_e *Client_Expecter) Close() *Client_Close_Call { - return &Client_Close_Call{Call: _e.mock.On("Close")} -} - -func (_c *Client_Close_Call) Run(run func()) *Client_Close_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Client_Close_Call) Return() *Client_Close_Call { - _c.Call.Return() - return _c -} - -func (_c *Client_Close_Call) RunAndReturn(run func()) *Client_Close_Call { - _c.Call.Return(run) - return _c -} - -// Init provides a mock function with given fields: -func (_m *Client) Init() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Client_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Init' -type Client_Init_Call struct { - *mock.Call -} - -// Init is a helper method to define mock.On call -func (_e *Client_Expecter) Init() *Client_Init_Call { - return &Client_Init_Call{Call: _e.mock.On("Init")} -} - -func (_c *Client_Init_Call) Run(run func()) *Client_Init_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Client_Init_Call) Return(_a0 error) *Client_Init_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Client_Init_Call) RunAndReturn(run func() error) *Client_Init_Call { - _c.Call.Return(run) - return _c -} - -// StreamExists provides a mock function with given fields: -func (_m *Client) StreamExists() (bool, error) { - ret := _m.Called() - - var r0 bool - var r1 error - if rf, ok := ret.Get(0).(func() (bool, error)); ok { - return rf() - } - if rf, ok := ret.Get(0).(func() bool); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(bool) - } - - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Client_StreamExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamExists' -type Client_StreamExists_Call struct { - *mock.Call -} - -// StreamExists is a helper method to define mock.On call -func (_e *Client_Expecter) StreamExists() *Client_StreamExists_Call { - return &Client_StreamExists_Call{Call: _e.mock.On("StreamExists")} -} - -func (_c *Client_StreamExists_Call) Run(run func()) *Client_StreamExists_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Client_StreamExists_Call) Return(_a0 bool, _a1 error) *Client_StreamExists_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Client_StreamExists_Call) RunAndReturn(run func() (bool, error)) *Client_StreamExists_Call { - _c.Call.Return(run) - return _c -} - -// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewClient(t interface { - mock.TestingT - Cleanup(func()) -}) *Client { - mock := &Client{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/pkg/nats/client.go b/pkg/nats/client.go index 959ee376..8b054ea1 100644 --- a/pkg/nats/client.go +++ b/pkg/nats/client.go @@ -7,17 +7,20 @@ import ( "github.com/nats-io/nats.go" ) +//go:generate go run github.com/vektra/mockery/v2 --name=Client --outpkg=mocks --case=underscore type Client interface { // initialize NATS connection Init() error // check if any stream exists in NATS JetStream StreamExists() (bool, error) + // GetStreams returns all the streams in NATS JetStream + GetStreams() ([]*nats.StreamInfo, error) + // ConsumersExist checks if any consumer exists for the given stream + ConsumersExist(streamName string) (bool, error) // close NATS connection Close() } -//go:generate go run github.com/vektra/mockery/v2 --name=Client --outpkg=mocks --case=underscore - type Config struct { URL string Timeout time.Duration `default:"5s"` @@ -64,6 +67,43 @@ func (c *natsClient) StreamExists() (bool, error) { return true, nil } +func (c *natsClient) GetStreams() ([]*nats.StreamInfo, error) { + // get JetStream context + jetStreamCtx, err := c.conn.JetStream() + if err != nil { + return nil, fmt.Errorf("failed to get JetStream: %w", err) + } + // read all the streams from the channel + var streams []*nats.StreamInfo + for stream := range jetStreamCtx.Streams() { + streams = append(streams, stream) + } + + // if it has no streams, return nil + if len(streams) == 0 { + return nil, nil + } + + return streams, nil +} + +func (c *natsClient) ConsumersExist(streamName string) (bool, error) { + // get JetStream context + jetStreamCtx, err := c.conn.JetStream() + if err != nil { + return false, fmt.Errorf("failed to get JetStream: %w", err) + } + // get all consumers and check if any exists + consumers := jetStreamCtx.Consumers(streamName) + // if it has no consumers, it will return false + _, ok := <-consumers + if !ok { + return false, nil + } + + return true, nil +} + func (c *natsClient) Close() { if c.conn != nil { c.conn.Close() diff --git a/pkg/nats/mocks/client.go b/pkg/nats/mocks/client.go index 99f33038..9262a275 100644 --- a/pkg/nats/mocks/client.go +++ b/pkg/nats/mocks/client.go @@ -2,7 +2,11 @@ package mocks -import mock "github.com/stretchr/testify/mock" +import ( + mock "github.com/stretchr/testify/mock" + + nats_go "github.com/nats-io/nats.go" +) // Client is an autogenerated mock type for the Client type type Client struct { @@ -49,6 +53,111 @@ func (_c *Client_Close_Call) RunAndReturn(run func()) *Client_Close_Call { return _c } +// ConsumersExist provides a mock function with given fields: streamName +func (_m *Client) ConsumersExist(streamName string) (bool, error) { + ret := _m.Called(streamName) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(string) (bool, error)); ok { + return rf(streamName) + } + if rf, ok := ret.Get(0).(func(string) bool); ok { + r0 = rf(streamName) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(streamName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Client_ConsumersExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ConsumersExist' +type Client_ConsumersExist_Call struct { + *mock.Call +} + +// ConsumersExist is a helper method to define mock.On call +// - streamName string +func (_e *Client_Expecter) ConsumersExist(streamName interface{}) *Client_ConsumersExist_Call { + return &Client_ConsumersExist_Call{Call: _e.mock.On("ConsumersExist", streamName)} +} + +func (_c *Client_ConsumersExist_Call) Run(run func(streamName string)) *Client_ConsumersExist_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *Client_ConsumersExist_Call) Return(_a0 bool, _a1 error) *Client_ConsumersExist_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Client_ConsumersExist_Call) RunAndReturn(run func(string) (bool, error)) *Client_ConsumersExist_Call { + _c.Call.Return(run) + return _c +} + +// GetStreams provides a mock function with given fields: +func (_m *Client) GetStreams() ([]*nats_go.StreamInfo, error) { + ret := _m.Called() + + var r0 []*nats_go.StreamInfo + var r1 error + if rf, ok := ret.Get(0).(func() ([]*nats_go.StreamInfo, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() []*nats_go.StreamInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*nats_go.StreamInfo) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Client_GetStreams_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStreams' +type Client_GetStreams_Call struct { + *mock.Call +} + +// GetStreams is a helper method to define mock.On call +func (_e *Client_Expecter) GetStreams() *Client_GetStreams_Call { + return &Client_GetStreams_Call{Call: _e.mock.On("GetStreams")} +} + +func (_c *Client_GetStreams_Call) Run(run func()) *Client_GetStreams_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Client_GetStreams_Call) Return(_a0 []*nats_go.StreamInfo, _a1 error) *Client_GetStreams_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Client_GetStreams_Call) RunAndReturn(run func() ([]*nats_go.StreamInfo, error)) *Client_GetStreams_Call { + _c.Call.Return(run) + return _c +} + // Init provides a mock function with given fields: func (_m *Client) Init() error { ret := _m.Called()