diff --git a/api/v1alpha1/eventing_types.go b/api/v1alpha1/eventing_types.go index f2d61252..b0998694 100644 --- a/api/v1alpha1/eventing_types.go +++ b/api/v1alpha1/eventing_types.go @@ -66,6 +66,7 @@ const ( ConditionReasonEventMeshSubManagerReady ConditionReason = "EventMeshSubscriptionManagerReady" ConditionReasonEventMeshSubManagerFailed ConditionReason = "EventMeshSubscriptionManagerFailed" ConditionReasonEventMeshSubManagerStopFailed ConditionReason = "EventMeshSubscriptionManagerStopFailed" + ConditionReasonSubscriptionManagerProcessing ConditionReason = "SubscriptionManagerProcessing" ) // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. @@ -86,9 +87,10 @@ type Eventing struct { // EventingStatus defines the observed state of Eventing type EventingStatus struct { - ActiveBackend BackendType `json:"activeBackend"` - State string `json:"state"` - Conditions []metav1.Condition `json:"conditions,omitempty"` + ActiveBackend BackendType `json:"activeBackend"` + BackendConfigHash uint64 `json:"specHash"` + State string `json:"state"` + Conditions []metav1.Condition `json:"conditions,omitempty"` } // EventingSpec defines the desired state of Eventing diff --git a/config/crd/bases/operator.kyma-project.io_eventings.yaml b/config/crd/bases/operator.kyma-project.io_eventings.yaml index 3458e3df..a586a239 100644 --- a/config/crd/bases/operator.kyma-project.io_eventings.yaml +++ b/config/crd/bases/operator.kyma-project.io_eventings.yaml @@ -325,10 +325,14 @@ spec: - type type: object type: array + specHash: + format: int64 + type: integer state: type: string required: - activeBackend + - specHash - state type: object type: object diff --git a/go.mod b/go.mod index 6b1327af..e72931ca 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/kyma-incubator/api-gateway v0.0.0-20220819093753-296e6704d413 github.com/kyma-project/kyma/components/eventing-controller v0.0.0-20230921090956-f27b81d3d8cb github.com/kyma-project/nats-manager v0.0.0-20230718133808-9241d3b926bd + github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/onsi/gomega v1.27.10 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 @@ -50,7 +51,6 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/highwayhash v1.0.2 // indirect - github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/internal/controller/eventing/controller.go b/internal/controller/eventing/controller.go index c409d9b2..9c20a9d5 100644 --- a/internal/controller/eventing/controller.go +++ b/internal/controller/eventing/controller.go @@ -384,7 +384,7 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context, eventing *eventin } // start NATS subscription manager - if err := r.reconcileNATSSubManager(eventing, log); err != nil { + if err := r.reconcileNATSSubManager(ctx, eventing, log); err != nil { return ctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventing, err, log) } diff --git a/internal/controller/eventing/mocks/controller.go b/internal/controller/eventing/mocks/controller.go index 38727445..ad5f91df 100644 --- a/internal/controller/eventing/mocks/controller.go +++ b/internal/controller/eventing/mocks/controller.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/mocks/manager.go b/internal/controller/eventing/mocks/manager.go index 0080ab91..9286f29b 100644 --- a/internal/controller/eventing/mocks/manager.go +++ b/internal/controller/eventing/mocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/mocks/nats_config_handler.go b/internal/controller/eventing/mocks/nats_config_handler.go index 75479e12..88b6f69f 100644 --- a/internal/controller/eventing/mocks/nats_config_handler.go +++ b/internal/controller/eventing/mocks/nats_config_handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/nats.go b/internal/controller/eventing/nats.go index 4521686e..eb935c9a 100644 --- a/internal/controller/eventing/nats.go +++ b/internal/controller/eventing/nats.go @@ -3,6 +3,8 @@ package eventing import ( "context" "fmt" + eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1" + ecenv "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" "github.com/kyma-project/eventing-manager/api/v1alpha1" "github.com/kyma-project/eventing-manager/pkg/env" @@ -12,12 +14,21 @@ import ( "go.uber.org/zap" ) -func (r *Reconciler) reconcileNATSSubManager(eventing *v1alpha1.Eventing, log *zap.SugaredLogger) error { +func (r *Reconciler) reconcileNATSSubManager(ctx context.Context, eventing *v1alpha1.Eventing, log *zap.SugaredLogger) error { + // get the subscription config + defaultSubsConfig := r.getDefaultSubscriptionConfig() + // get the nats config + natsConfig, err := r.natsConfigHandler.GetNatsConfig(context.Background(), *eventing) + if err != nil { + return err + } + // get the hash of current config + specHash, err := r.getNATSBackendConfigHash(defaultSubsConfig, *natsConfig) + if err != nil { + return err + } + if r.natsSubManager == nil { - natsConfig, err := r.natsConfigHandler.GetNatsConfig(context.Background(), *eventing) - if err != nil { - return err - } // create instance of NATS subscription manager natsSubManager := r.subManagerFactory.NewJetStreamManager(*eventing, *natsConfig) @@ -29,6 +40,24 @@ func (r *Reconciler) reconcileNATSSubManager(eventing *v1alpha1.Eventing, log *z log.Info("NATS subscription-manager initialized") // save instance only when init is successful. r.natsSubManager = natsSubManager + } else { + // update the config if hashes differ + if eventing.Status.BackendConfigHash != specHash && r.isNATSSubManagerStarted { + // set the eventing CR status to processing + if err = r.syncStatusWithSubscriptionManagerProcessingWithReason(ctx, + eventingv1alpha1.ConditionReasonSubscriptionManagerProcessing, + eventing, "Updating NATS subscription-manager with new config.", log); err != nil { + return err + } + + // stop the subsManager without cleanup + + if err := r.stopNATSSubManager(false, log); err != nil { + return err + } + return nil + + } } if r.isNATSSubManagerStarted { @@ -36,10 +65,19 @@ func (r *Reconciler) reconcileNATSSubManager(eventing *v1alpha1.Eventing, log *z return nil } - // start the subscription manager. - defaultSubsConfig := r.eventingManager.GetBackendConfig(). - DefaultSubscriptionConfig.ToECENVDefaultSubscriptionConfig() + err = r.startNATSSubManager(defaultSubsConfig, log) + if err != nil { + return err + } + // update the hash of the current config only once subManager is started + eventing.Status.BackendConfigHash = specHash + log.Info(fmt.Sprintf("NATS subscription-manager has been updated, new hash: %d", specHash)) + + return nil +} + +func (r *Reconciler) startNATSSubManager(defaultSubsConfig ecenv.DefaultSubscriptionConfig, log *zap.SugaredLogger) error { if err := r.natsSubManager.Start(defaultSubsConfig, ecsubscriptionmanager.Params{}); err != nil { return err } @@ -47,10 +85,14 @@ func (r *Reconciler) reconcileNATSSubManager(eventing *v1alpha1.Eventing, log *z log.Info("NATS subscription-manager started") // update flag so it do not try to start the manager again. r.isNATSSubManagerStarted = true - return nil } +func (r *Reconciler) getDefaultSubscriptionConfig() ecenv.DefaultSubscriptionConfig { + return r.eventingManager.GetBackendConfig(). + DefaultSubscriptionConfig.ToECENVDefaultSubscriptionConfig() +} + func (r *Reconciler) stopNATSSubManager(runCleanup bool, log *zap.SugaredLogger) error { log.Debug("stopping NATS subscription-manager") if r.natsSubManager == nil || !r.isNATSSubManagerStarted { diff --git a/internal/controller/eventing/nats_test.go b/internal/controller/eventing/nats_test.go index 188e6b75..7824ce40 100644 --- a/internal/controller/eventing/nats_test.go +++ b/internal/controller/eventing/nats_test.go @@ -27,6 +27,8 @@ func Test_reconcileNATSSubManager(t *testing.T) { // given - common for all test cases. givenEventing := utils.NewEventingCR( + utils.WithEventingCRName("eventing"), + utils.WithEventingCRNamespace("kyma-system"), utils.WithEventingStreamData("Memory", "650M", 99, 98), utils.WithEventingEventTypePrefix("one.two.three"), ) @@ -60,33 +62,46 @@ func Test_reconcileNATSSubManager(t *testing.T) { name string givenIsNATSSubManagerStarted bool givenShouldRetry bool + givenUpdateTest bool + givenHashBefore uint64 givenNATSSubManagerMock func() *ecsubmanagermocks.Manager givenEventingManagerMock func() *managermocks.Manager givenNatsConfigHandlerMock func() *mocks.NatsConfigHandler givenManagerFactoryMock func(*ecsubmanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory wantAssertCheck bool wantError error + wantHashAfter uint64 }{ { name: "it should do nothing because subscription manager is already started", givenIsNATSSubManagerStarted: true, + givenHashBefore: uint64(10896066536699660582), givenNATSSubManagerMock: func() *ecsubmanagermocks.Manager { - return new(ecsubmanagermocks.Manager) + jetStreamSubManagerMock := new(ecsubmanagermocks.Manager) + jetStreamSubManagerMock.On("Start", mock.Anything, mock.Anything).Return(nil).Once() + jetStreamSubManagerMock.On("Stop", mock.Anything, mock.Anything).Return(nil).Once() + return jetStreamSubManagerMock }, givenEventingManagerMock: func() *managermocks.Manager { - return nil + emMock := new(managermocks.Manager) + emMock.On("GetBackendConfig").Return(givenBackendConfig) + return emMock }, givenNatsConfigHandlerMock: func() *mocks.NatsConfigHandler { - return nil + nchMock := new(mocks.NatsConfigHandler) + nchMock.On("GetNatsConfig", mock.Anything, mock.Anything).Return(givenNATSConfig, nil) + return nchMock }, givenManagerFactoryMock: func(_ *ecsubmanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory { return nil }, + wantHashAfter: uint64(10896066536699660582), }, { name: "it should initialize and start subscription manager because " + "subscription manager is not started", givenIsNATSSubManagerStarted: false, + givenHashBefore: uint64(0), givenNATSSubManagerMock: func() *ecsubmanagermocks.Manager { jetStreamSubManagerMock := new(ecsubmanagermocks.Manager) jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once() @@ -109,11 +124,13 @@ func Test_reconcileNATSSubManager(t *testing.T) { return subManagerFactoryMock }, wantAssertCheck: true, + wantHashAfter: uint64(10896066536699660582), }, { name: "it should retry to start subscription manager when subscription manager was " + "successfully initialized but failed to start", givenIsNATSSubManagerStarted: false, + givenHashBefore: uint64(0), givenNATSSubManagerMock: func() *ecsubmanagermocks.Manager { jetStreamSubManagerMock := new(ecsubmanagermocks.Manager) jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once() @@ -138,6 +155,38 @@ func Test_reconcileNATSSubManager(t *testing.T) { wantAssertCheck: true, givenShouldRetry: true, wantError: errors.New("failed to start"), + wantHashAfter: uint64(0), + }, + { + name: "it should update the subscription manager when the backend config changes", + givenIsNATSSubManagerStarted: true, + givenHashBefore: uint64(17644964695675018020), + givenUpdateTest: true, + givenNATSSubManagerMock: func() *ecsubmanagermocks.Manager { + jetStreamSubManagerMock := new(ecsubmanagermocks.Manager) + jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once() + jetStreamSubManagerMock.On("Start", mock.Anything, mock.Anything).Return(nil).Once() + jetStreamSubManagerMock.On("Stop", mock.Anything, mock.Anything).Return(nil).Once() + return jetStreamSubManagerMock + }, + givenEventingManagerMock: func() *managermocks.Manager { + emMock := new(managermocks.Manager) + emMock.On("GetBackendConfig").Return(givenBackendConfig).Twice() + return emMock + }, + givenNatsConfigHandlerMock: func() *mocks.NatsConfigHandler { + nchMock := new(mocks.NatsConfigHandler) + nchMock.On("GetNatsConfig", mock.Anything, mock.Anything).Return(givenNATSConfig, nil) + return nchMock + }, + givenManagerFactoryMock: func(subManager *ecsubmanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory { + subManagerFactoryMock := new(subscriptionmanagermocks.ManagerFactory) + subManagerFactoryMock.On("NewJetStreamManager", mock.Anything, mock.Anything).Return(subManager).Once() + return subManagerFactoryMock + }, + wantAssertCheck: true, + givenShouldRetry: true, + wantHashAfter: uint64(10896066536699660582), }, } @@ -145,10 +194,8 @@ func Test_reconcileNATSSubManager(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - t.Parallel() - // given - testEnv := NewMockedUnitTestEnvironment(t) + testEnv := NewMockedUnitTestEnvironment(t, givenEventing) logger := testEnv.Reconciler.logger.WithContext().Named(ControllerName) // get mocks from test-case. @@ -163,18 +210,29 @@ func Test_reconcileNATSSubManager(t *testing.T) { testEnv.Reconciler.natsConfigHandler = givenNatConfigHandlerMock testEnv.Reconciler.subManagerFactory = givenManagerFactoryMock testEnv.Reconciler.natsSubManager = nil - if givenManagerFactoryMock == nil { + if givenManagerFactoryMock == nil || tc.givenUpdateTest { testEnv.Reconciler.natsSubManager = givenNATSSubManagerMock } + // set the backend hash before depending on test + givenEventing.Status.BackendConfigHash = tc.givenHashBefore + // when - err := testEnv.Reconciler.reconcileNATSSubManager(givenEventing, logger) + err := testEnv.Reconciler.reconcileNATSSubManager(testEnv.Context, givenEventing, logger) if err != nil && tc.givenShouldRetry { // This is to test the scenario where initialization of natsSubManager was successful but // starting the natsSubManager failed. So on next try it should again try to start the natsSubManager. - err = testEnv.Reconciler.reconcileNATSSubManager(givenEventing, logger) + err = testEnv.Reconciler.reconcileNATSSubManager(testEnv.Context, givenEventing, logger) + } + if err == nil && tc.givenShouldRetry { + // Run reconcile again with newBackendConfig: + err = testEnv.Reconciler.reconcileNATSSubManager(testEnv.Context, givenEventing, logger) + require.NoError(t, err) } + // check for backend hash after + require.Equal(t, tc.wantHashAfter, givenEventing.Status.BackendConfigHash) + // then if tc.wantError != nil { require.Error(t, err) @@ -187,9 +245,11 @@ func Test_reconcileNATSSubManager(t *testing.T) { if tc.wantAssertCheck { givenNATSSubManagerMock.AssertExpectations(t) - givenManagerFactoryMock.AssertExpectations(t) givenEventingManagerMock.AssertExpectations(t) - givenNATSSubManagerMock.AssertExpectations(t) + givenNatConfigHandlerMock.AssertExpectations(t) + } + if !tc.givenIsNATSSubManagerStarted { + givenManagerFactoryMock.AssertExpectations(t) } }) } diff --git a/internal/controller/eventing/status.go b/internal/controller/eventing/status.go index 5fcdb0ff..79a22472 100644 --- a/internal/controller/eventing/status.go +++ b/internal/controller/eventing/status.go @@ -71,6 +71,16 @@ func (r *Reconciler) syncStatusWithSubscriptionManagerErrWithReason(ctx context. return errors.Join(err, r.syncEventingStatus(ctx, eventing, log)) } +func (r *Reconciler) syncStatusWithSubscriptionManagerProcessingWithReason(ctx context.Context, + reason eventingv1alpha1.ConditionReason, + eventing *eventingv1alpha1.Eventing, + message string, log *zap.SugaredLogger) error { + // Set processing state in status + eventing.Status.SetStateProcessing() + eventing.Status.UpdateConditionSubscriptionManagerReady(metav1.ConditionFalse, reason, message) + return r.syncEventingStatus(ctx, eventing, log) +} + // syncStatusWithSubscriptionManagerFailedCondition updates subscription manager condition and // sets an error state. It doesn't return the incoming error. func (r *Reconciler) syncStatusWithSubscriptionManagerFailedCondition(ctx context.Context, diff --git a/internal/controller/eventing/utils.go b/internal/controller/eventing/utils.go index c8e25553..2543beea 100644 --- a/internal/controller/eventing/utils.go +++ b/internal/controller/eventing/utils.go @@ -2,8 +2,10 @@ package eventing import ( "context" - eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1" + "github.com/kyma-project/eventing-manager/pkg/env" + ecenv "github.com/kyma-project/kyma/components/eventing-controller/pkg/env" + "github.com/mitchellh/hashstructure/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -29,3 +31,15 @@ func (r *Reconciler) removeFinalizer(ctx context.Context, eventing *eventingv1al return ctrl.Result{}, nil } + +func (r *Reconciler) getNATSBackendConfigHash(defaultSubscriptionConfig ecenv.DefaultSubscriptionConfig, natsConfig env.NATSConfig) (uint64, error) { + natsBackendConfig := struct { + ecenv.DefaultSubscriptionConfig + env.NATSConfig + }{defaultSubscriptionConfig, natsConfig} + hash, err := hashstructure.Hash(natsBackendConfig, hashstructure.FormatV2, nil) + if err != nil { + return 0, err + } + return hash, nil +} diff --git a/pkg/eventing/mocks/manager.go b/pkg/eventing/mocks/manager.go index c7ba7c9a..4fc6d764 100644 --- a/pkg/eventing/mocks/manager.go +++ b/pkg/eventing/mocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/k8s/mocks/client.go b/pkg/k8s/mocks/client.go index 8900a22e..4f0f0808 100644 --- a/pkg/k8s/mocks/client.go +++ b/pkg/k8s/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/subscriptionmanager/mocks/ec/manager.go b/pkg/subscriptionmanager/mocks/ec/manager.go index 44007a8f..913f6b72 100644 --- a/pkg/subscriptionmanager/mocks/ec/manager.go +++ b/pkg/subscriptionmanager/mocks/ec/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/subscriptionmanager/mocks/manager_factory.go b/pkg/subscriptionmanager/mocks/manager_factory.go index 16d0caf1..850d0973 100644 --- a/pkg/subscriptionmanager/mocks/manager_factory.go +++ b/pkg/subscriptionmanager/mocks/manager_factory.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks