Skip to content

Commit

Permalink
Fix nats stream replica update (#248)
Browse files Browse the repository at this point in the history
* Fix nats stream replica update

* Add unit test for the fix

Test the case when eventing CR changes but NATS sub manager failed to
start
  • Loading branch information
muralov authored Nov 16, 2023
1 parent 99c6afc commit 16da476
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
4 changes: 4 additions & 0 deletions internal/controller/eventing/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (r *Reconciler) reconcileNATSSubManager(ctx context.Context, eventing *v1al
if err := r.stopNATSSubManager(false, log); err != nil {
return err
}
} else if eventing.Status.BackendConfigHash != specHash {
// in case spec is change and subManager is not started yet (e.g. due to error)
// make natsSubManager nil to create a subManager with new values
r.natsSubManager = nil
}

if r.natsSubManager == nil {
Expand Down
34 changes: 32 additions & 2 deletions internal/controller/eventing/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func Test_reconcileNATSSubManager(t *testing.T) {
name: "it should retry to start subscription manager when subscription manager was " +
"successfully initialized but failed to start",
givenIsNATSSubManagerStarted: false,
givenHashBefore: int64(0),
givenHashBefore: int64(-7550677537009891034),
givenNATSSubManagerMock: func() *submanagermocks.Manager {
jetStreamSubManagerMock := new(submanagermocks.Manager)
jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once()
Expand All @@ -156,7 +156,7 @@ func Test_reconcileNATSSubManager(t *testing.T) {
wantAssertCheck: true,
givenShouldRetry: true,
wantError: errors.New("failed to start"),
wantHashAfter: int64(0),
wantHashAfter: int64(-7550677537009891034),
},
{
name: "it should update the subscription manager when the backend config changes",
Expand Down Expand Up @@ -188,6 +188,36 @@ func Test_reconcileNATSSubManager(t *testing.T) {
wantAssertCheck: true,
wantHashAfter: int64(-7550677537009891034),
},
{
name: "it should update the subscription manager when the backend config changes" +
"but subscription manager failed to start",
givenIsNATSSubManagerStarted: false,
givenHashBefore: int64(-8550677537009891034),
givenUpdateTest: true,
givenNATSSubManagerMock: func() *submanagermocks.Manager {
jetStreamSubManagerMock := new(submanagermocks.Manager)
jetStreamSubManagerMock.On("Init", mock.Anything).Return(nil).Once()
jetStreamSubManagerMock.On("Start", mock.Anything, mock.Anything).Return(nil).Once()
return jetStreamSubManagerMock
},
givenEventingManagerMock: func() *managermocks.Manager {
emMock := new(managermocks.Manager)
emMock.On("GetBackendConfig").Return(givenBackendConfig).Twice()
return emMock
},
givenNatsConfigHandlerMock: func() *mocks.NatsConfigHandler {
nchMock := new(mocks.NatsConfigHandler)
nchMock.On("GetNatsConfig", mock.Anything, mock.Anything).Return(givenNATSConfig, nil)
return nchMock
},
givenManagerFactoryMock: func(subManager *submanagermocks.Manager) *subscriptionmanagermocks.ManagerFactory {
subManagerFactoryMock := new(subscriptionmanagermocks.ManagerFactory)
subManagerFactoryMock.On("NewJetStreamManager", mock.Anything, mock.Anything).Return(subManager).Once()
return subManagerFactoryMock
},
wantAssertCheck: true,
wantHashAfter: int64(-7550677537009891034),
},
}

// run test cases
Expand Down

0 comments on commit 16da476

Please sign in to comment.