Skip to content

Commit

Permalink
React to NATS backend config changes (#87)
Browse files Browse the repository at this point in the history
* Mockery update

* React to changes in NATS config

* React to NATS specific config

* Update eventing status

* Working version with correct subsConfig

* Calculate the hash from NATS and Subs config

* Add status change to processing
Fix tests

* Use the right ConditionReason and remove unused variable

* Remove unused function

* Simplify if-statement
Simplify test-case

* Fix reconcile if-statement
  • Loading branch information
grischperl authored Sep 28, 2023
1 parent 4046ccf commit 7e5da78
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 33 deletions.
8 changes: 5 additions & 3 deletions api/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
ConditionReasonEventMeshSubManagerReady ConditionReason = "EventMeshSubscriptionManagerReady"
ConditionReasonEventMeshSubManagerFailed ConditionReason = "EventMeshSubscriptionManagerFailed"
ConditionReasonEventMeshSubManagerStopFailed ConditionReason = "EventMeshSubscriptionManagerStopFailed"
ConditionReasonSubscriptionManagerProcessing ConditionReason = "SubscriptionManagerProcessing"
)

// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
Expand All @@ -86,9 +87,10 @@ type Eventing struct {

// EventingStatus defines the observed state of Eventing
type EventingStatus struct {
ActiveBackend BackendType `json:"activeBackend"`
State string `json:"state"`
Conditions []metav1.Condition `json:"conditions,omitempty"`
ActiveBackend BackendType `json:"activeBackend"`
BackendConfigHash uint64 `json:"specHash"`
State string `json:"state"`
Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// EventingSpec defines the desired state of Eventing
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/operator.kyma-project.io_eventings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,14 @@ spec:
- type
type: object
type: array
specHash:
format: int64
type: integer
state:
type: string
required:
- activeBackend
- specHash
- state
type: object
type: object
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/kyma-incubator/api-gateway v0.0.0-20220819093753-296e6704d413
github.com/kyma-project/kyma/components/eventing-controller v0.0.0-20230921090956-f27b81d3d8cb
github.com/kyma-project/nats-manager v0.0.0-20230718133808-9241d3b926bd
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/gomega v1.27.10
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -50,7 +51,6 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context, eventing *eventin
}

// start NATS subscription manager
if err := r.reconcileNATSSubManager(eventing, log); err != nil {
if err := r.reconcileNATSSubManager(ctx, eventing, log); err != nil {
return ctrl.Result{}, r.syncStatusWithNATSErr(ctx, eventing, err, log)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/controller/eventing/mocks/controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/controller/eventing/mocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/controller/eventing/mocks/nats_config_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 51 additions & 9 deletions internal/controller/eventing/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package eventing
import (
"context"
"fmt"
eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1"
ecenv "github.com/kyma-project/kyma/components/eventing-controller/pkg/env"

"github.com/kyma-project/eventing-manager/api/v1alpha1"
"github.com/kyma-project/eventing-manager/pkg/env"
Expand All @@ -12,12 +14,21 @@ import (
"go.uber.org/zap"
)

func (r *Reconciler) reconcileNATSSubManager(eventing *v1alpha1.Eventing, log *zap.SugaredLogger) error {
func (r *Reconciler) reconcileNATSSubManager(ctx context.Context, eventing *v1alpha1.Eventing, log *zap.SugaredLogger) error {
// get the subscription config
defaultSubsConfig := r.getDefaultSubscriptionConfig()
// get the nats config
natsConfig, err := r.natsConfigHandler.GetNatsConfig(context.Background(), *eventing)
if err != nil {
return err
}
// get the hash of current config
specHash, err := r.getNATSBackendConfigHash(defaultSubsConfig, *natsConfig)
if err != nil {
return err
}

if r.natsSubManager == nil {
natsConfig, err := r.natsConfigHandler.GetNatsConfig(context.Background(), *eventing)
if err != nil {
return err
}
// create instance of NATS subscription manager
natsSubManager := r.subManagerFactory.NewJetStreamManager(*eventing, *natsConfig)

Expand All @@ -29,28 +40,59 @@ func (r *Reconciler) reconcileNATSSubManager(eventing *v1alpha1.Eventing, log *z
log.Info("NATS subscription-manager initialized")
// save instance only when init is successful.
r.natsSubManager = natsSubManager
} else {
// update the config if hashes differ
if eventing.Status.BackendConfigHash != specHash && r.isNATSSubManagerStarted {
// set the eventing CR status to processing
if err = r.syncStatusWithSubscriptionManagerProcessingWithReason(ctx,
eventingv1alpha1.ConditionReasonSubscriptionManagerProcessing,
eventing, "Updating NATS subscription-manager with new config.", log); err != nil {
return err
}

// stop the subsManager without cleanup

if err := r.stopNATSSubManager(false, log); err != nil {
return err
}
return nil

}
}

if r.isNATSSubManagerStarted {
log.Info("NATS subscription-manager is already started")
return nil
}

// start the subscription manager.
defaultSubsConfig := r.eventingManager.GetBackendConfig().
DefaultSubscriptionConfig.ToECENVDefaultSubscriptionConfig()
err = r.startNATSSubManager(defaultSubsConfig, log)
if err != nil {
return err
}

// update the hash of the current config only once subManager is started
eventing.Status.BackendConfigHash = specHash
log.Info(fmt.Sprintf("NATS subscription-manager has been updated, new hash: %d", specHash))

return nil
}

func (r *Reconciler) startNATSSubManager(defaultSubsConfig ecenv.DefaultSubscriptionConfig, log *zap.SugaredLogger) error {
if err := r.natsSubManager.Start(defaultSubsConfig, ecsubscriptionmanager.Params{}); err != nil {
return err
}

log.Info("NATS subscription-manager started")
// update flag so it do not try to start the manager again.
r.isNATSSubManagerStarted = true

return nil
}

func (r *Reconciler) getDefaultSubscriptionConfig() ecenv.DefaultSubscriptionConfig {
return r.eventingManager.GetBackendConfig().
DefaultSubscriptionConfig.ToECENVDefaultSubscriptionConfig()
}

func (r *Reconciler) stopNATSSubManager(runCleanup bool, log *zap.SugaredLogger) error {
log.Debug("stopping NATS subscription-manager")
if r.natsSubManager == nil || !r.isNATSSubManagerStarted {
Expand Down
82 changes: 71 additions & 11 deletions internal/controller/eventing/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func Test_reconcileNATSSubManager(t *testing.T) {

// given - common for all test cases.
givenEventing := utils.NewEventingCR(
utils.WithEventingCRName("eventing"),
utils.WithEventingCRNamespace("kyma-system"),
utils.WithEventingStreamData("Memory", "650M", 99, 98),
utils.WithEventingEventTypePrefix("one.two.three"),
)
Expand Down Expand Up @@ -60,33 +62,46 @@ func Test_reconcileNATSSubManager(t *testing.T) {
name string
givenIsNATSSubManagerStarted bool
givenShouldRetry bool
givenUpdateTest bool
givenHashBefore uint64
givenNATSSubManagerMock func() *ecsubmanagermocks.Manager
givenEventingManagerMock func() *managermocks.Manager
givenNatsConfigHandlerMock func() *mocks.NatsConfigHandler
givenManagerFactoryMock func(*ecsubmanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory
wantAssertCheck bool
wantError error
wantHashAfter uint64
}{
{
name: "it should do nothing because subscription manager is already started",
givenIsNATSSubManagerStarted: true,
givenHashBefore: uint64(10896066536699660582),
givenNATSSubManagerMock: func() *ecsubmanagermocks.Manager {
return new(ecsubmanagermocks.Manager)
jetStreamSubManagerMock := new(ecsubmanagermocks.Manager)
jetStreamSubManagerMock.On("Start", mock.Anything, mock.Anything).Return(nil).Once()
jetStreamSubManagerMock.On("Stop", mock.Anything, mock.Anything).Return(nil).Once()
return jetStreamSubManagerMock
},
givenEventingManagerMock: func() *managermocks.Manager {
return nil
emMock := new(managermocks.Manager)
emMock.On("GetBackendConfig").Return(givenBackendConfig)
return emMock
},
givenNatsConfigHandlerMock: func() *mocks.NatsConfigHandler {
return nil
nchMock := new(mocks.NatsConfigHandler)
nchMock.On("GetNatsConfig", mock.Anything, mock.Anything).Return(givenNATSConfig, nil)
return nchMock
},
givenManagerFactoryMock: func(_ *ecsubmanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory {
return nil
},
wantHashAfter: uint64(10896066536699660582),
},
{
name: "it should initialize and start subscription manager because " +
"subscription manager is not started",
givenIsNATSSubManagerStarted: false,
givenHashBefore: uint64(0),
givenNATSSubManagerMock: func() *ecsubmanagermocks.Manager {
jetStreamSubManagerMock := new(ecsubmanagermocks.Manager)
jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once()
Expand All @@ -109,11 +124,13 @@ func Test_reconcileNATSSubManager(t *testing.T) {
return subManagerFactoryMock
},
wantAssertCheck: true,
wantHashAfter: uint64(10896066536699660582),
},
{
name: "it should retry to start subscription manager when subscription manager was " +
"successfully initialized but failed to start",
givenIsNATSSubManagerStarted: false,
givenHashBefore: uint64(0),
givenNATSSubManagerMock: func() *ecsubmanagermocks.Manager {
jetStreamSubManagerMock := new(ecsubmanagermocks.Manager)
jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once()
Expand All @@ -138,17 +155,47 @@ func Test_reconcileNATSSubManager(t *testing.T) {
wantAssertCheck: true,
givenShouldRetry: true,
wantError: errors.New("failed to start"),
wantHashAfter: uint64(0),
},
{
name: "it should update the subscription manager when the backend config changes",
givenIsNATSSubManagerStarted: true,
givenHashBefore: uint64(17644964695675018020),
givenUpdateTest: true,
givenNATSSubManagerMock: func() *ecsubmanagermocks.Manager {
jetStreamSubManagerMock := new(ecsubmanagermocks.Manager)
jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once()
jetStreamSubManagerMock.On("Start", mock.Anything, mock.Anything).Return(nil).Once()
jetStreamSubManagerMock.On("Stop", mock.Anything, mock.Anything).Return(nil).Once()
return jetStreamSubManagerMock
},
givenEventingManagerMock: func() *managermocks.Manager {
emMock := new(managermocks.Manager)
emMock.On("GetBackendConfig").Return(givenBackendConfig).Twice()
return emMock
},
givenNatsConfigHandlerMock: func() *mocks.NatsConfigHandler {
nchMock := new(mocks.NatsConfigHandler)
nchMock.On("GetNatsConfig", mock.Anything, mock.Anything).Return(givenNATSConfig, nil)
return nchMock
},
givenManagerFactoryMock: func(subManager *ecsubmanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory {
subManagerFactoryMock := new(subscriptionmanagermocks.ManagerFactory)
subManagerFactoryMock.On("NewJetStreamManager", mock.Anything, mock.Anything).Return(subManager).Once()
return subManagerFactoryMock
},
wantAssertCheck: true,
givenShouldRetry: true,
wantHashAfter: uint64(10896066536699660582),
},
}

// run test cases
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

// given
testEnv := NewMockedUnitTestEnvironment(t)
testEnv := NewMockedUnitTestEnvironment(t, givenEventing)
logger := testEnv.Reconciler.logger.WithContext().Named(ControllerName)

// get mocks from test-case.
Expand All @@ -163,18 +210,29 @@ func Test_reconcileNATSSubManager(t *testing.T) {
testEnv.Reconciler.natsConfigHandler = givenNatConfigHandlerMock
testEnv.Reconciler.subManagerFactory = givenManagerFactoryMock
testEnv.Reconciler.natsSubManager = nil
if givenManagerFactoryMock == nil {
if givenManagerFactoryMock == nil || tc.givenUpdateTest {
testEnv.Reconciler.natsSubManager = givenNATSSubManagerMock
}

// set the backend hash before depending on test
givenEventing.Status.BackendConfigHash = tc.givenHashBefore

// when
err := testEnv.Reconciler.reconcileNATSSubManager(givenEventing, logger)
err := testEnv.Reconciler.reconcileNATSSubManager(testEnv.Context, givenEventing, logger)
if err != nil && tc.givenShouldRetry {
// This is to test the scenario where initialization of natsSubManager was successful but
// starting the natsSubManager failed. So on next try it should again try to start the natsSubManager.
err = testEnv.Reconciler.reconcileNATSSubManager(givenEventing, logger)
err = testEnv.Reconciler.reconcileNATSSubManager(testEnv.Context, givenEventing, logger)
}
if err == nil && tc.givenShouldRetry {
// Run reconcile again with newBackendConfig:
err = testEnv.Reconciler.reconcileNATSSubManager(testEnv.Context, givenEventing, logger)
require.NoError(t, err)
}

// check for backend hash after
require.Equal(t, tc.wantHashAfter, givenEventing.Status.BackendConfigHash)

// then
if tc.wantError != nil {
require.Error(t, err)
Expand All @@ -187,9 +245,11 @@ func Test_reconcileNATSSubManager(t *testing.T) {

if tc.wantAssertCheck {
givenNATSSubManagerMock.AssertExpectations(t)
givenManagerFactoryMock.AssertExpectations(t)
givenEventingManagerMock.AssertExpectations(t)
givenNATSSubManagerMock.AssertExpectations(t)
givenNatConfigHandlerMock.AssertExpectations(t)
}
if !tc.givenIsNATSSubManagerStarted {
givenManagerFactoryMock.AssertExpectations(t)
}
})
}
Expand Down
10 changes: 10 additions & 0 deletions internal/controller/eventing/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ func (r *Reconciler) syncStatusWithSubscriptionManagerErrWithReason(ctx context.
return errors.Join(err, r.syncEventingStatus(ctx, eventing, log))
}

func (r *Reconciler) syncStatusWithSubscriptionManagerProcessingWithReason(ctx context.Context,
reason eventingv1alpha1.ConditionReason,
eventing *eventingv1alpha1.Eventing,
message string, log *zap.SugaredLogger) error {
// Set processing state in status
eventing.Status.SetStateProcessing()
eventing.Status.UpdateConditionSubscriptionManagerReady(metav1.ConditionFalse, reason, message)
return r.syncEventingStatus(ctx, eventing, log)
}

// syncStatusWithSubscriptionManagerFailedCondition updates subscription manager condition and
// sets an error state. It doesn't return the incoming error.
func (r *Reconciler) syncStatusWithSubscriptionManagerFailedCondition(ctx context.Context,
Expand Down
16 changes: 15 additions & 1 deletion internal/controller/eventing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package eventing

import (
"context"

eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1"
"github.com/kyma-project/eventing-manager/pkg/env"
ecenv "github.com/kyma-project/kyma/components/eventing-controller/pkg/env"
"github.com/mitchellh/hashstructure/v2"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -29,3 +31,15 @@ func (r *Reconciler) removeFinalizer(ctx context.Context, eventing *eventingv1al

return ctrl.Result{}, nil
}

func (r *Reconciler) getNATSBackendConfigHash(defaultSubscriptionConfig ecenv.DefaultSubscriptionConfig, natsConfig env.NATSConfig) (uint64, error) {
natsBackendConfig := struct {
ecenv.DefaultSubscriptionConfig
env.NATSConfig
}{defaultSubscriptionConfig, natsConfig}
hash, err := hashstructure.Hash(natsBackendConfig, hashstructure.FormatV2, nil)
if err != nil {
return 0, err
}
return hash, nil
}
Loading

0 comments on commit 7e5da78

Please sign in to comment.