diff --git a/api/v1alpha1/eventing_types.go b/api/v1alpha1/eventing_types.go index b0998694..192610a1 100644 --- a/api/v1alpha1/eventing_types.go +++ b/api/v1alpha1/eventing_types.go @@ -88,7 +88,7 @@ type Eventing struct { // EventingStatus defines the observed state of Eventing type EventingStatus struct { ActiveBackend BackendType `json:"activeBackend"` - BackendConfigHash uint64 `json:"specHash"` + BackendConfigHash int64 `json:"specHash"` State string `json:"state"` Conditions []metav1.Condition `json:"conditions,omitempty"` } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 99b1cdc5..28204a61 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -22,7 +22,7 @@ limitations under the License. package v1alpha1 import ( - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) diff --git a/config/crd/external/apirules.gateway.crd.yaml b/config/crd/external/apirules.gateway.crd.yaml index 3913d322..823d1293 100644 --- a/config/crd/external/apirules.gateway.crd.yaml +++ b/config/crd/external/apirules.gateway.crd.yaml @@ -14,205 +14,8 @@ spec: plural: apirules singular: apirule scope: Namespaced - conversion: - strategy: Webhook - webhook: - clientConfig: - service: - namespace: kyma-system - name: api-gateway-webhook-service - path: /convert - conversionReviewVersions: - - v1beta1 - - v1alpha1 preserveUnknownFields: false versions: - - deprecated: true - deprecationWarning: Since Kyma 2.5.X, APIRule in version v1alpha1 has been deprecated. - Consider using v1beta1. - name: v1alpha1 - schema: - openAPIV3Schema: - description: APIRule is the Schema for ApiRule APIs. - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: Defines the desired state of ApiRule. - properties: - gateway: - description: Specifies the Istio Gateway to be used. - pattern: ^[0-9a-z-_]+(\/[0-9a-z-_]+|(\.[0-9a-z-_]+)*)$ - type: string - rules: - description: Represents the array of Oathkeeper access rules to be - applied. - items: - properties: - accessStrategies: - description: Specifies the list of access strategies. All strategies - listed in [Oathkeeper documentation](https://www.ory.sh/docs/oathkeeper/pipeline/authn) - are supported. - items: - description: Represents a handler that authenticates provided - credentials. See the corresponding type in the oathkeeper-maester - project. - properties: - config: - description: Configures the handler. Configuration keys - vary per handler. - type: object - x-kubernetes-preserve-unknown-fields: true - properties: - jwks_urls: - description: Specifies the array of URLs from which Ory Oathkeeper can retrieve JSON Web Keys for validating JSON Web Token. - The value must begin with either `http://`, `https://`, or `file://`. - type: array - items: - type: string - pattern: ^(http://|https://|file://).*$ - trusted_issuers: - description: If the **trusted_issuers** field is set, the JWT must contain a value for the claim `iss` - that matches exactly (case-sensitive) one of the values of **trusted_issuers**. - The value must begin with either `http://`, `https://`, or `file://`. - type: array - items: - type: string - pattern: ^(http://|https://|file://).*$ - handler: - description: Specifies the name of the handler. - type: string - required: - - handler - type: object - minItems: 1 - type: array - methods: - description: Represents the list of allowed HTTP request methods - available for the **spec.rules.path**. - items: - type: string - minItems: 1 - type: array - mutators: - description: Specifies the list of [Ory Oathkeeper mutators](https://www.ory.sh/docs/oathkeeper/pipeline/mutator). - items: - description: Mutator represents a handler that transforms - the HTTP request before forwarding it. See the corresponding - in the oathkeeper-maester project. - properties: - config: - description: Configures the handler. Configuration keys - vary per handler. - type: object - x-kubernetes-preserve-unknown-fields: true - handler: - description: Specifies the name of the handler. - type: string - required: - - handler - type: object - type: array - path: - description: Specifies the path of the exposed service. - pattern: ^([0-9a-zA-Z./*()?!\\_-]+) - type: string - required: - - accessStrategies - - methods - - path - type: object - minItems: 1 - type: array - service: - description: Describes the service to expose. - properties: - external: - description: Specifies if the service is internal (in cluster) or - external. - type: boolean - host: - description: Specifies the URL of the exposed service. - maxLength: 256 - minLength: 3 - pattern: ^([a-zA-Z0-9][a-zA-Z0-9-_]*\.)*[a-zA-Z0-9]*[a-zA-Z0-9-_]*[[a-zA-Z0-9]+$ - type: string - name: - description: Specifies the name of the exposed service. - type: string - port: - description: Specifies the communication port of the exposed service. - format: int32 - maximum: 65535 - minimum: 1 - type: integer - required: - - host - - name - - port - type: object - required: - - gateway - - rules - - service - type: object - status: - description: Describes the observed state of ApiRule. - properties: - APIRuleStatus: - description: Describes the status of APIRule. - properties: - code: - description: Status code describing APIRule. - type: string - desc: - description: Explains the status of APIRule. - type: string - type: object - accessRuleStatus: - description: Describes the status of ORY Oathkeeper Rule. - properties: - code: - description: Status code describing ORY Oathkeeper Rule. - type: string - desc: - description: Explains the status of ORY Oathkeeper Rule. - type: string - type: object - lastProcessedTime: - description: Indicates the timestamp when the API Gateway controller last processed APIRule. - format: date-time - type: string - observedGeneration: - description: Specifies the generation of the resource that was observed by the API Gateway controller. - format: int64 - type: integer - virtualServiceStatus: - description: Describes the status of Istio VirtualService. - properties: - code: - description: Status code describing Istio VirtualService. - type: string - desc: - description: Explains the status of Istio VirtualService. - type: string - type: object - type: object - type: object - served: true - storage: false - subresources: - status: {} - additionalPrinterColumns: - jsonPath: .status.APIRuleStatus.code name: Status diff --git a/go.mod b/go.mod index 099acc61..f8c2635c 100644 --- a/go.mod +++ b/go.mod @@ -17,10 +17,14 @@ require ( github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/nats-io/nats-server/v2 v2.9.21 github.com/nats-io/nats.go v1.28.0 + github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.27.10 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.16.0 github.com/stretchr/testify v1.8.4 + go.uber.org/atomic v1.11.0 go.uber.org/zap v1.25.0 + golang.org/x/oauth2 v0.10.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 k8s.io/api v0.28.1 k8s.io/apiextensions-apiserver v0.28.0 @@ -62,19 +66,15 @@ require ( github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/nxadm/tail v1.4.8 // indirect - github.com/onsi/ginkgo v1.16.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.0 // indirect - go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.11.0 // indirect golang.org/x/net v0.13.0 // indirect - golang.org/x/oauth2 v0.10.0 // indirect golang.org/x/sys v0.11.0 // indirect golang.org/x/term v0.10.0 // indirect golang.org/x/text v0.11.0 // indirect diff --git a/internal/controller/eventing/eventmesh.go b/internal/controller/eventing/eventmesh.go index 46a46347..d083a384 100644 --- a/internal/controller/eventing/eventmesh.go +++ b/internal/controller/eventing/eventmesh.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/kyma-project/eventing-manager/pkg/env" "os" "github.com/kyma-project/eventing-manager/pkg/eventing" @@ -41,7 +42,6 @@ func (r *Reconciler) reconcileEventMeshSubManager(ctx context.Context, eventing if err != nil { return errors.Errorf("failed to sync OAuth secret: %v", err) } - // retrieve secret to authenticate with EventMesh eventMeshSecret, err := r.kubeClient.GetSecret(ctx, eventing.Spec.Backend.Config.EventMeshSecret) if err != nil { @@ -59,6 +59,25 @@ func (r *Reconciler) reconcileEventMeshSubManager(ctx context.Context, eventing return fmt.Errorf("failed to setup environment variables for EventMesh controller: %v", err) } + // get the subscription config + defaultSubsConfig := r.getDefaultSubscriptionConfig() + // get the subManager parameters + eventMeshSubMgrParams := r.getEventMeshSubManagerParams() + // get the hash of current config + specHash, err := r.getEventMeshBackendConfigHash(eventing.Spec.Backend.Config.EventMeshSecret, + eventing.Spec.Backend.Config.EventTypePrefix) + if err != nil { + return err + } + + // update the config if hashes differ + if eventing.Status.BackendConfigHash != specHash { + // stop the subsManager without cleanup + if err := r.stopEventMeshSubManager(false, r.namedLogger()); err != nil { + return err + } + } + if r.eventMeshSubManager == nil { // create instance of EventMesh subscription manager eventMeshSubManager, err := r.subManagerFactory.NewEventMeshManager() @@ -81,19 +100,36 @@ func (r *Reconciler) reconcileEventMeshSubManager(ctx context.Context, eventing return nil } - defaultSubsConfig := r.eventingManager.GetBackendConfig().DefaultSubscriptionConfig - eventMeshSubMgrParams := subscriptionmanager.Params{ + err = r.startEventMeshSubManager(defaultSubsConfig, eventMeshSubMgrParams) + if err != nil { + return err + } + + // update the hash of the current config only once subManager is started + eventing.Status.BackendConfigHash = specHash + r.namedLogger().Info(fmt.Sprintf("NATS subscription-manager has been updated, new hash: %d", specHash)) + + return nil +} + +func (r *Reconciler) getEventMeshSubManagerParams() subscriptionmanager.Params { + return subscriptionmanager.Params{ subscriptionmanager.ParamNameClientID: r.oauth2credentials.clientID, subscriptionmanager.ParamNameClientSecret: r.oauth2credentials.clientSecret, subscriptionmanager.ParamNameTokenURL: r.oauth2credentials.tokenURL, subscriptionmanager.ParamNameCertsURL: r.oauth2credentials.certsURL, } - if err = r.eventMeshSubManager.Start(defaultSubsConfig, eventMeshSubMgrParams); err != nil { +} + +func (r *Reconciler) startEventMeshSubManager(defaultSubsConfig env.DefaultSubscriptionConfig, + eventMeshSubMgrParams subscriptionmanager.Params) error { + if err := r.eventMeshSubManager.Start(defaultSubsConfig, eventMeshSubMgrParams); err != nil { return err } + r.namedLogger().Info("EventMesh subscription-manager started") + // update flag so it does not try to start the manager again r.isEventMeshSubManagerStarted = true - return nil } diff --git a/internal/controller/eventing/eventmesh_test.go b/internal/controller/eventing/eventmesh_test.go index d2f106f4..13720e54 100644 --- a/internal/controller/eventing/eventmesh_test.go +++ b/internal/controller/eventing/eventmesh_test.go @@ -36,12 +36,15 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { // given - common for all test cases. givenEventing := utils.NewEventingCR( + utils.WithEventingCRName("eventing"), utils.WithEventingCRNamespace("test-namespace"), utils.WithEventMeshBackend("test-namespace/test-secret-name"), utils.WithEventingPublisherData(2, 2, "199m", "99Mi", "399m", "199Mi"), utils.WithEventingEventTypePrefix("test-prefix"), ) + givenOauthSecret := utils.NewOAuthSecret("eventing-webhook-auth", givenEventing.Namespace) + givenBackendConfig := &env.BackendConfig{ EventingWebhookAuthSecretName: "eventing-webhook-auth", } @@ -52,6 +55,8 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { name string givenIsEventMeshSubManagerStarted bool givenShouldRetry bool + givenUpdateTest bool + givenHashBefore int64 givenEventMeshSubManagerMock func() *submanagermocks.Manager givenEventingManagerMock func() *managermocks.Manager givenManagerFactoryMock func(*submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory @@ -59,10 +64,12 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { givenKubeClientMock func() k8s.Client wantAssertCheck bool wantError error + wantHashAfter int64 }{ { name: "it should do nothing because syncing OAuth secret failed", givenIsEventMeshSubManagerStarted: true, + givenHashBefore: int64(0), givenEventMeshSubManagerMock: func() *submanagermocks.Manager { eventMeshSubManagerMock := new(submanagermocks.Manager) eventMeshSubManagerMock.On("Stop", mock.Anything).Return(nil).Once() @@ -74,11 +81,13 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { givenManagerFactoryMock: func(_ *submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory { return nil }, - wantError: errors.New("failed to sync OAuth secret"), + wantError: errors.New("failed to sync OAuth secret"), + wantHashAfter: int64(0), }, { name: "it should do nothing because failed syncing EventMesh secret", givenIsEventMeshSubManagerStarted: true, + givenHashBefore: int64(0), givenEventMeshSubManagerMock: func() *submanagermocks.Manager { eventMeshSubManagerMock := new(submanagermocks.Manager) eventMeshSubManagerMock.On("Stop", mock.Anything).Return(nil).Once() @@ -90,23 +99,19 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { givenManagerFactoryMock: func(_ *submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory { return nil }, - givenClientMock: func() client.Client { - mockClient := fake.NewClientBuilder().WithObjects().Build() - oauthSecret := utils.NewOAuthSecret("eventing-webhook-auth", givenEventing.Namespace) - require.NoError(t, mockClient.Create(ctx, oauthSecret)) - return mockClient - }, givenKubeClientMock: func() k8s.Client { mockKubeClient := new(k8smocks.Client) mockKubeClient.On("GetSecret", ctx, mock.Anything, mock.Anything).Return(nil, errors.New("failed getting secret")).Once() return mockKubeClient }, - wantError: errors.New("failed to get EventMesh secret"), + wantError: errors.New("failed to get EventMesh secret"), + wantHashAfter: int64(0), }, { name: "it should do nothing because failed sync Publisher Proxy secret", givenIsEventMeshSubManagerStarted: true, + givenHashBefore: int64(0), givenEventMeshSubManagerMock: func() *submanagermocks.Manager { eventMeshSubManagerMock := new(submanagermocks.Manager) eventMeshSubManagerMock.On("Stop", mock.Anything).Return(nil).Once() @@ -118,12 +123,6 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { givenManagerFactoryMock: func(_ *submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory { return nil }, - givenClientMock: func() client.Client { - mockClient := fake.NewClientBuilder().WithObjects().Build() - oauthSecret := utils.NewOAuthSecret("eventing-webhook-auth", givenEventing.Namespace) - require.NoError(t, mockClient.Create(ctx, oauthSecret)) - return mockClient - }, givenKubeClientMock: func() k8s.Client { mockKubeClient := new(k8smocks.Client) mockKubeClient.On("GetSecret", ctx, mock.Anything, mock.Anything).Return( @@ -136,23 +135,20 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { { name: "it should do nothing because subscription manager is already started", givenIsEventMeshSubManagerStarted: true, + givenHashBefore: int64(-8279197549452913403), givenEventMeshSubManagerMock: func() *submanagermocks.Manager { eventMeshSubManagerMock := new(submanagermocks.Manager) eventMeshSubManagerMock.On("Stop", mock.Anything).Return(nil).Once() return eventMeshSubManagerMock }, givenEventingManagerMock: func() *managermocks.Manager { - return nil + emMock := new(managermocks.Manager) + emMock.On("GetBackendConfig").Return(givenBackendConfig) + return emMock }, givenManagerFactoryMock: func(_ *submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory { return nil }, - givenClientMock: func() client.Client { - mockClient := fake.NewClientBuilder().WithObjects().Build() - oauthSecret := utils.NewOAuthSecret("eventing-webhook-auth", givenEventing.Namespace) - require.NoError(t, mockClient.Create(ctx, oauthSecret)) - return mockClient - }, givenKubeClientMock: func() k8s.Client { mockKubeClient := new(k8smocks.Client) mockKubeClient.On("PatchApply", ctx, mock.Anything).Return(nil).Once() @@ -160,11 +156,13 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { utils.NewEventMeshSecret("test-secret", givenEventing.Namespace), nil).Once() return mockKubeClient }, + wantHashAfter: int64(-8279197549452913403), }, { name: "it should initialize and start subscription manager because " + "subscription manager is not started", givenIsEventMeshSubManagerStarted: false, + givenHashBefore: int64(0), givenEventMeshSubManagerMock: func() *submanagermocks.Manager { eventMeshSubManagerMock := new(submanagermocks.Manager) eventMeshSubManagerMock.On("Init", mock.Anything).Return(nil).Once() @@ -181,12 +179,6 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { subManagerFactoryMock.On("NewEventMeshManager", mock.Anything).Return(subManager, nil).Once() return subManagerFactoryMock }, - givenClientMock: func() client.Client { - mockClient := fake.NewClientBuilder().WithObjects().Build() - oauthSecret := utils.NewOAuthSecret("eventing-webhook-auth", givenEventing.Namespace) - require.NoError(t, mockClient.Create(ctx, oauthSecret)) - return mockClient - }, givenKubeClientMock: func() k8s.Client { mockKubeClient := new(k8smocks.Client) mockKubeClient.On("PatchApply", ctx, mock.Anything).Return(nil).Once() @@ -195,11 +187,13 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { return mockKubeClient }, wantAssertCheck: true, + wantHashAfter: int64(-8279197549452913403), }, { name: "it should retry to start subscription manager when subscription manager was " + "successfully initialized but failed to start", givenIsEventMeshSubManagerStarted: false, + givenHashBefore: int64(0), givenEventMeshSubManagerMock: func() *submanagermocks.Manager { eventMeshSubManagerMock := new(submanagermocks.Manager) eventMeshSubManagerMock.On("Init", mock.Anything).Return(nil).Once() @@ -216,12 +210,6 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { subManagerFactoryMock.On("NewEventMeshManager", mock.Anything).Return(subManager, nil).Once() return subManagerFactoryMock }, - givenClientMock: func() client.Client { - mockClient := fake.NewClientBuilder().WithObjects().Build() - oauthSecret := utils.NewOAuthSecret("eventing-webhook-auth", givenEventing.Namespace) - require.NoError(t, mockClient.Create(ctx, oauthSecret)) - return mockClient - }, givenKubeClientMock: func() k8s.Client { mockKubeClient := new(k8smocks.Client) mockKubeClient.On("PatchApply", ctx, mock.Anything).Return(nil).Twice() @@ -232,6 +220,39 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { wantAssertCheck: true, givenShouldRetry: true, wantError: errors.New("failed to start"), + wantHashAfter: int64(0), + }, + { + name: "it should update the subscription manager when the backend config changes", + givenIsEventMeshSubManagerStarted: true, + givenHashBefore: int64(-2279197549452913403), + givenUpdateTest: true, + givenEventMeshSubManagerMock: func() *submanagermocks.Manager { + eventMeshSubManagerMock := new(submanagermocks.Manager) + eventMeshSubManagerMock.On("Init", mock.Anything).Return(nil).Once() + eventMeshSubManagerMock.On("Start", mock.Anything, mock.Anything).Return(nil).Once() + eventMeshSubManagerMock.On("Stop", mock.Anything, mock.Anything).Return(nil).Once() + return eventMeshSubManagerMock + }, + givenEventingManagerMock: func() *managermocks.Manager { + emMock := new(managermocks.Manager) + emMock.On("GetBackendConfig").Return(givenBackendConfig).Twice() + return emMock + }, + givenManagerFactoryMock: func(subManager *submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory { + subManagerFactoryMock := new(subscriptionmanagermocks.ManagerFactory) + subManagerFactoryMock.On("NewEventMeshManager", mock.Anything).Return(subManager, nil).Once() + return subManagerFactoryMock + }, + givenKubeClientMock: func() k8s.Client { + mockKubeClient := new(k8smocks.Client) + mockKubeClient.On("PatchApply", ctx, mock.Anything).Return(nil).Twice() + mockKubeClient.On("GetSecret", ctx, mock.Anything, mock.Anything).Return( + utils.NewEventMeshSecret("test-secret", givenEventing.Namespace), nil).Twice() + return mockKubeClient + }, + wantAssertCheck: true, + wantHashAfter: int64(-8279197549452913403), }, } @@ -239,33 +260,31 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - t.Parallel() - // given - testEnv := NewMockedUnitTestEnvironment(t) + testEnv := NewMockedUnitTestEnvironment(t, givenEventing, givenOauthSecret) testEnv.Reconciler.backendConfig = *givenBackendConfig + // get mocks from test-case. givenEventMeshSubManagerMock := tc.givenEventMeshSubManagerMock() givenManagerFactoryMock := tc.givenManagerFactoryMock(givenEventMeshSubManagerMock) givenEventingManagerMock := tc.givenEventingManagerMock() // connect mocks with reconciler. - if tc.givenKubeClientMock != nil { testEnv.Reconciler.kubeClient = tc.givenKubeClientMock() } - if tc.givenClientMock != nil { - testEnv.Reconciler.Client = tc.givenClientMock() - } testEnv.Reconciler.isEventMeshSubManagerStarted = tc.givenIsEventMeshSubManagerStarted testEnv.Reconciler.eventingManager = givenEventingManagerMock testEnv.Reconciler.subManagerFactory = givenManagerFactoryMock testEnv.Reconciler.eventMeshSubManager = nil - if givenManagerFactoryMock == nil { + if givenManagerFactoryMock == nil || tc.givenUpdateTest { testEnv.Reconciler.eventMeshSubManager = givenEventMeshSubManagerMock } + // set the backend hash before depending on test + givenEventing.Status.BackendConfigHash = tc.givenHashBefore + // when err := testEnv.Reconciler.reconcileEventMeshSubManager(ctx, givenEventing) if err != nil && tc.givenShouldRetry { @@ -273,6 +292,14 @@ func Test_reconcileEventMeshSubManager(t *testing.T) { // starting the eventMeshSubManager failed. So on next try it should again try to start the eventMeshSubManager. err = testEnv.Reconciler.reconcileEventMeshSubManager(ctx, givenEventing) } + if tc.givenUpdateTest { + // Run reconcile again with newBackendConfig: + err = testEnv.Reconciler.reconcileEventMeshSubManager(ctx, givenEventing) + require.NoError(t, err) + } + + // check for backend hash after + require.Equal(t, tc.wantHashAfter, givenEventing.Status.BackendConfigHash) // then if tc.wantError != nil { diff --git a/internal/controller/eventing/integrationtests/controller_switching/integration_test.go b/internal/controller/eventing/integrationtests/controller_switching/integration_test.go index 8fe435f9..8df7306b 100644 --- a/internal/controller/eventing/integrationtests/controller_switching/integration_test.go +++ b/internal/controller/eventing/integrationtests/controller_switching/integration_test.go @@ -49,8 +49,6 @@ func TestMain(m *testing.M) { } func Test_Switching(t *testing.T) { - t.Parallel() - // given - common for all test cases. setEventMeshSecretConfig := func(eventingCR *eventingv1alpha1.Eventing, name, namespace string) { eventingCR.Spec.Backend.Config.EventMeshSecret = fmt.Sprintf("%s/%s", namespace, name) diff --git a/internal/controller/eventing/mocks/controller.go b/internal/controller/eventing/mocks/controller.go index 38727445..ad5f91df 100644 --- a/internal/controller/eventing/mocks/controller.go +++ b/internal/controller/eventing/mocks/controller.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/mocks/manager.go b/internal/controller/eventing/mocks/manager.go index 33186672..1fba02c9 100644 --- a/internal/controller/eventing/mocks/manager.go +++ b/internal/controller/eventing/mocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/mocks/nats_config_handler.go b/internal/controller/eventing/mocks/nats_config_handler.go index 75479e12..88b6f69f 100644 --- a/internal/controller/eventing/mocks/nats_config_handler.go +++ b/internal/controller/eventing/mocks/nats_config_handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/internal/controller/eventing/nats.go b/internal/controller/eventing/nats.go index 632aff65..1f4f06bb 100644 --- a/internal/controller/eventing/nats.go +++ b/internal/controller/eventing/nats.go @@ -7,7 +7,6 @@ import ( "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager" "github.com/kyma-project/eventing-manager/api/v1alpha1" - eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1" "github.com/kyma-project/eventing-manager/options" "github.com/kyma-project/eventing-manager/pkg/env" "github.com/kyma-project/eventing-manager/pkg/k8s" @@ -28,6 +27,14 @@ func (r *Reconciler) reconcileNATSSubManager(ctx context.Context, eventing *v1al return err } + // update the config if hashes differ + if eventing.Status.BackendConfigHash != specHash && r.isNATSSubManagerStarted { + // stop the subsManager without cleanup + if err := r.stopNATSSubManager(false, log); err != nil { + return err + } + } + if r.natsSubManager == nil { // create instance of NATS subscription manager natsSubManager := r.subManagerFactory.NewJetStreamManager(*eventing, *natsConfig) @@ -40,24 +47,6 @@ func (r *Reconciler) reconcileNATSSubManager(ctx context.Context, eventing *v1al log.Info("NATS subscription-manager initialized") // save instance only when init is successful. r.natsSubManager = natsSubManager - } else { - // update the config if hashes differ - if eventing.Status.BackendConfigHash != specHash && r.isNATSSubManagerStarted { - // set the eventing CR status to processing - if err = r.syncStatusWithSubscriptionManagerProcessingWithReason(ctx, - eventingv1alpha1.ConditionReasonSubscriptionManagerProcessing, - eventing, "Updating NATS subscription-manager with new config.", log); err != nil { - return err - } - - // stop the subsManager without cleanup - - if err := r.stopNATSSubManager(false, log); err != nil { - return err - } - return nil - - } } if r.isNATSSubManagerStarted { diff --git a/internal/controller/eventing/nats_test.go b/internal/controller/eventing/nats_test.go index 0b9e16cc..ced1a7b6 100644 --- a/internal/controller/eventing/nats_test.go +++ b/internal/controller/eventing/nats_test.go @@ -64,19 +64,19 @@ func Test_reconcileNATSSubManager(t *testing.T) { givenIsNATSSubManagerStarted bool givenShouldRetry bool givenUpdateTest bool - givenHashBefore uint64 + givenHashBefore int64 givenNATSSubManagerMock func() *submanagermocks.Manager givenEventingManagerMock func() *managermocks.Manager givenNatsConfigHandlerMock func() *mocks.NatsConfigHandler givenManagerFactoryMock func(*submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory wantAssertCheck bool wantError error - wantHashAfter uint64 + wantHashAfter int64 }{ { name: "it should do nothing because subscription manager is already started", givenIsNATSSubManagerStarted: true, - givenHashBefore: uint64(10896066536699660582), + givenHashBefore: int64(-7550677537009891034), givenNATSSubManagerMock: func() *submanagermocks.Manager { jetStreamSubManagerMock := new(submanagermocks.Manager) jetStreamSubManagerMock.On("Start", mock.Anything, mock.Anything).Return(nil).Once() @@ -96,13 +96,13 @@ func Test_reconcileNATSSubManager(t *testing.T) { givenManagerFactoryMock: func(_ *submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory { return nil }, - wantHashAfter: uint64(10896066536699660582), + wantHashAfter: int64(-7550677537009891034), }, { name: "it should initialize and start subscription manager because " + "subscription manager is not started", givenIsNATSSubManagerStarted: false, - givenHashBefore: uint64(0), + givenHashBefore: int64(0), givenNATSSubManagerMock: func() *submanagermocks.Manager { jetStreamSubManagerMock := new(submanagermocks.Manager) jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once() @@ -125,13 +125,13 @@ func Test_reconcileNATSSubManager(t *testing.T) { return subManagerFactoryMock }, wantAssertCheck: true, - wantHashAfter: uint64(10896066536699660582), + wantHashAfter: int64(-7550677537009891034), }, { name: "it should retry to start subscription manager when subscription manager was " + "successfully initialized but failed to start", givenIsNATSSubManagerStarted: false, - givenHashBefore: uint64(0), + givenHashBefore: int64(0), givenNATSSubManagerMock: func() *submanagermocks.Manager { jetStreamSubManagerMock := new(submanagermocks.Manager) jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once() @@ -156,12 +156,12 @@ func Test_reconcileNATSSubManager(t *testing.T) { wantAssertCheck: true, givenShouldRetry: true, wantError: errors.New("failed to start"), - wantHashAfter: uint64(0), + wantHashAfter: int64(0), }, { name: "it should update the subscription manager when the backend config changes", givenIsNATSSubManagerStarted: true, - givenHashBefore: uint64(17644964695675018020), + givenHashBefore: int64(-8550677537009891034), givenUpdateTest: true, givenNATSSubManagerMock: func() *submanagermocks.Manager { jetStreamSubManagerMock := new(submanagermocks.Manager) @@ -185,9 +185,8 @@ func Test_reconcileNATSSubManager(t *testing.T) { subManagerFactoryMock.On("NewJetStreamManager", mock.Anything, mock.Anything).Return(subManager).Once() return subManagerFactoryMock }, - wantAssertCheck: true, - givenShouldRetry: true, - wantHashAfter: uint64(10896066536699660582), + wantAssertCheck: true, + wantHashAfter: int64(-7550677537009891034), }, } @@ -225,7 +224,7 @@ func Test_reconcileNATSSubManager(t *testing.T) { // starting the natsSubManager failed. So on next try it should again try to start the natsSubManager. err = testEnv.Reconciler.reconcileNATSSubManager(testEnv.Context, givenEventing, logger) } - if err == nil && tc.givenShouldRetry { + if tc.givenUpdateTest { // Run reconcile again with newBackendConfig: err = testEnv.Reconciler.reconcileNATSSubManager(testEnv.Context, givenEventing, logger) require.NoError(t, err) diff --git a/internal/controller/eventing/status.go b/internal/controller/eventing/status.go index 79a22472..5fcdb0ff 100644 --- a/internal/controller/eventing/status.go +++ b/internal/controller/eventing/status.go @@ -71,16 +71,6 @@ func (r *Reconciler) syncStatusWithSubscriptionManagerErrWithReason(ctx context. return errors.Join(err, r.syncEventingStatus(ctx, eventing, log)) } -func (r *Reconciler) syncStatusWithSubscriptionManagerProcessingWithReason(ctx context.Context, - reason eventingv1alpha1.ConditionReason, - eventing *eventingv1alpha1.Eventing, - message string, log *zap.SugaredLogger) error { - // Set processing state in status - eventing.Status.SetStateProcessing() - eventing.Status.UpdateConditionSubscriptionManagerReady(metav1.ConditionFalse, reason, message) - return r.syncEventingStatus(ctx, eventing, log) -} - // syncStatusWithSubscriptionManagerFailedCondition updates subscription manager condition and // sets an error state. It doesn't return the incoming error. func (r *Reconciler) syncStatusWithSubscriptionManagerFailedCondition(ctx context.Context, diff --git a/internal/controller/eventing/utils.go b/internal/controller/eventing/utils.go index 90c16ba5..cfe1fe02 100644 --- a/internal/controller/eventing/utils.go +++ b/internal/controller/eventing/utils.go @@ -32,7 +32,7 @@ func (r *Reconciler) removeFinalizer(ctx context.Context, eventing *eventingv1al return ctrl.Result{}, nil } -func (r *Reconciler) getNATSBackendConfigHash(defaultSubscriptionConfig env.DefaultSubscriptionConfig, natsConfig env.NATSConfig) (uint64, error) { +func (r *Reconciler) getNATSBackendConfigHash(defaultSubscriptionConfig env.DefaultSubscriptionConfig, natsConfig env.NATSConfig) (int64, error) { natsBackendConfig := struct { env.DefaultSubscriptionConfig env.NATSConfig @@ -41,5 +41,15 @@ func (r *Reconciler) getNATSBackendConfigHash(defaultSubscriptionConfig env.Defa if err != nil { return 0, err } - return hash, nil + return int64(hash), nil +} + +func (r *Reconciler) getEventMeshBackendConfigHash(eventMeshSecret, eventTypePrefix string) (int64, error) { + eventMeshBackendConfig := eventMeshSecret + eventTypePrefix + + hash, err := hashstructure.Hash(eventMeshBackendConfig, hashstructure.FormatV2, nil) + if err != nil { + return 0, err + } + return int64(hash), nil } diff --git a/pkg/backend/jetstream/mocks/Backend.go b/pkg/backend/jetstream/mocks/Backend.go index ebfd5b94..91c7a1c8 100644 --- a/pkg/backend/jetstream/mocks/Backend.go +++ b/pkg/backend/jetstream/mocks/Backend.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/backend/jetstream/mocks/JetStreamContext.go b/pkg/backend/jetstream/mocks/JetStreamContext.go index e3cda65a..8c51b94f 100644 --- a/pkg/backend/jetstream/mocks/JetStreamContext.go +++ b/pkg/backend/jetstream/mocks/JetStreamContext.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/ems/api/events/client/mocks/PublisherManager.go b/pkg/ems/api/events/client/mocks/PublisherManager.go index e2bea779..5bf56d03 100644 --- a/pkg/ems/api/events/client/mocks/PublisherManager.go +++ b/pkg/ems/api/events/client/mocks/PublisherManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/eventing/mocks/manager.go b/pkg/eventing/mocks/manager.go index ec99f2a6..b9e701bf 100644 --- a/pkg/eventing/mocks/manager.go +++ b/pkg/eventing/mocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/k8s/mocks/client.go b/pkg/k8s/mocks/client.go index 6772678f..2798336e 100644 --- a/pkg/k8s/mocks/client.go +++ b/pkg/k8s/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/subscriptionmanager/manager/mocks/manager.go b/pkg/subscriptionmanager/manager/mocks/manager.go index 0557a647..c41e5e6d 100644 --- a/pkg/subscriptionmanager/manager/mocks/manager.go +++ b/pkg/subscriptionmanager/manager/mocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/subscriptionmanager/mocks/ec/manager.go b/pkg/subscriptionmanager/mocks/ec/manager.go index 44007a8f..913f6b72 100644 --- a/pkg/subscriptionmanager/mocks/ec/manager.go +++ b/pkg/subscriptionmanager/mocks/ec/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks diff --git a/pkg/subscriptionmanager/mocks/manager_factory.go b/pkg/subscriptionmanager/mocks/manager_factory.go index c56f6509..1909b489 100644 --- a/pkg/subscriptionmanager/mocks/manager_factory.go +++ b/pkg/subscriptionmanager/mocks/manager_factory.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.30.16. DO NOT EDIT. package mocks