diff --git a/internal/connection/nats/connection.go b/internal/connection/nats/connection.go index 5c734ea0..cedec92b 100644 --- a/internal/connection/nats/connection.go +++ b/internal/connection/nats/connection.go @@ -49,7 +49,7 @@ func (c *connection) isConnected() bool { return c.conn.IsConnected() } -func (c *connection) RegisterReconnectHandlerIfNotRegistered(handler natsio.ConnHandler) { +func (c *connection) RegisterReconnectHandler(handler natsio.ConnHandler) { if c.conn == nil || c.reconnectHandlerRegistered { return } @@ -57,7 +57,7 @@ func (c *connection) RegisterReconnectHandlerIfNotRegistered(handler natsio.Conn c.reconnectHandlerRegistered = true } -func (c *connection) RegisterDisconnectErrHandlerIfNotRegistered(handler natsio.ConnErrHandler) { +func (c *connection) RegisterDisconnectErrHandler(handler natsio.ConnErrHandler) { if c.conn == nil || c.disconnectErrHandlerRegistered { return } diff --git a/internal/connection/nats/interface.go b/internal/connection/nats/interface.go index cc37a499..0f7706ff 100644 --- a/internal/connection/nats/interface.go +++ b/internal/connection/nats/interface.go @@ -12,9 +12,9 @@ type Interface interface { // Disconnect disconnects the NATS connection. Disconnect() - // RegisterReconnectHandlerIfNotRegistered registers a ReconnectHandler only if it was not registered before. - RegisterReconnectHandlerIfNotRegistered(natsio.ConnHandler) + // RegisterReconnectHandler registers a ReconnectHandler only if it was not registered before. + RegisterReconnectHandler(natsio.ConnHandler) - // RegisterDisconnectErrHandlerIfNotRegistered registers a DisconnectErrHandler only if it was not registered before. - RegisterDisconnectErrHandlerIfNotRegistered(natsio.ConnErrHandler) + // RegisterDisconnectErrHandler registers a DisconnectErrHandler only if it was not registered before. + RegisterDisconnectErrHandler(natsio.ConnErrHandler) } diff --git a/internal/connection/nats/mocks/connection.go b/internal/connection/nats/mocks/connection.go index 4e86d368..7f162660 100644 --- a/internal/connection/nats/mocks/connection.go +++ b/internal/connection/nats/mocks/connection.go @@ -98,68 +98,68 @@ func (_c *Connection_Disconnect_Call) RunAndReturn(run func()) *Connection_Disco return _c } -// RegisterDisconnectErrHandlerIfNotRegistered provides a mock function with given fields: _a0 -func (_m *Connection) RegisterDisconnectErrHandlerIfNotRegistered(_a0 nats_go.ConnErrHandler) { +// RegisterDisconnectErrHandler provides a mock function with given fields: _a0 +func (_m *Connection) RegisterDisconnectErrHandler(_a0 nats_go.ConnErrHandler) { _m.Called(_a0) } -// Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterDisconnectErrHandlerIfNotRegistered' -type Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call struct { +// Connection_RegisterDisconnectErrHandler_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterDisconnectErrHandler' +type Connection_RegisterDisconnectErrHandler_Call struct { *mock.Call } -// RegisterDisconnectErrHandlerIfNotRegistered is a helper method to define mock.On call +// RegisterDisconnectErrHandler is a helper method to define mock.On call // - _a0 nats_go.ConnErrHandler -func (_e *Connection_Expecter) RegisterDisconnectErrHandlerIfNotRegistered(_a0 interface{}) *Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call { - return &Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call{Call: _e.mock.On("RegisterDisconnectErrHandlerIfNotRegistered", _a0)} +func (_e *Connection_Expecter) RegisterDisconnectErrHandler(_a0 interface{}) *Connection_RegisterDisconnectErrHandler_Call { + return &Connection_RegisterDisconnectErrHandler_Call{Call: _e.mock.On("RegisterDisconnectErrHandler", _a0)} } -func (_c *Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call) Run(run func(_a0 nats_go.ConnErrHandler)) *Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call { +func (_c *Connection_RegisterDisconnectErrHandler_Call) Run(run func(_a0 nats_go.ConnErrHandler)) *Connection_RegisterDisconnectErrHandler_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(nats_go.ConnErrHandler)) }) return _c } -func (_c *Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call) Return() *Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call { +func (_c *Connection_RegisterDisconnectErrHandler_Call) Return() *Connection_RegisterDisconnectErrHandler_Call { _c.Call.Return() return _c } -func (_c *Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call) RunAndReturn(run func(nats_go.ConnErrHandler)) *Connection_RegisterDisconnectErrHandlerIfNotRegistered_Call { +func (_c *Connection_RegisterDisconnectErrHandler_Call) RunAndReturn(run func(nats_go.ConnErrHandler)) *Connection_RegisterDisconnectErrHandler_Call { _c.Call.Return(run) return _c } -// RegisterReconnectHandlerIfNotRegistered provides a mock function with given fields: _a0 -func (_m *Connection) RegisterReconnectHandlerIfNotRegistered(_a0 nats_go.ConnHandler) { +// RegisterReconnectHandler provides a mock function with given fields: _a0 +func (_m *Connection) RegisterReconnectHandler(_a0 nats_go.ConnHandler) { _m.Called(_a0) } -// Connection_RegisterReconnectHandlerIfNotRegistered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterReconnectHandlerIfNotRegistered' -type Connection_RegisterReconnectHandlerIfNotRegistered_Call struct { +// Connection_RegisterReconnectHandler_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterReconnectHandler' +type Connection_RegisterReconnectHandler_Call struct { *mock.Call } -// RegisterReconnectHandlerIfNotRegistered is a helper method to define mock.On call +// RegisterReconnectHandler is a helper method to define mock.On call // - _a0 nats_go.ConnHandler -func (_e *Connection_Expecter) RegisterReconnectHandlerIfNotRegistered(_a0 interface{}) *Connection_RegisterReconnectHandlerIfNotRegistered_Call { - return &Connection_RegisterReconnectHandlerIfNotRegistered_Call{Call: _e.mock.On("RegisterReconnectHandlerIfNotRegistered", _a0)} +func (_e *Connection_Expecter) RegisterReconnectHandler(_a0 interface{}) *Connection_RegisterReconnectHandler_Call { + return &Connection_RegisterReconnectHandler_Call{Call: _e.mock.On("RegisterReconnectHandler", _a0)} } -func (_c *Connection_RegisterReconnectHandlerIfNotRegistered_Call) Run(run func(_a0 nats_go.ConnHandler)) *Connection_RegisterReconnectHandlerIfNotRegistered_Call { +func (_c *Connection_RegisterReconnectHandler_Call) Run(run func(_a0 nats_go.ConnHandler)) *Connection_RegisterReconnectHandler_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(nats_go.ConnHandler)) }) return _c } -func (_c *Connection_RegisterReconnectHandlerIfNotRegistered_Call) Return() *Connection_RegisterReconnectHandlerIfNotRegistered_Call { +func (_c *Connection_RegisterReconnectHandler_Call) Return() *Connection_RegisterReconnectHandler_Call { _c.Call.Return() return _c } -func (_c *Connection_RegisterReconnectHandlerIfNotRegistered_Call) RunAndReturn(run func(nats_go.ConnHandler)) *Connection_RegisterReconnectHandlerIfNotRegistered_Call { +func (_c *Connection_RegisterReconnectHandler_Call) RunAndReturn(run func(nats_go.ConnHandler)) *Connection_RegisterReconnectHandler_Call { _c.Call.Return(run) return _c } diff --git a/internal/controller/operator/eventing/controller.go b/internal/controller/operator/eventing/controller.go index 131e1afb..87fa3bea 100644 --- a/internal/controller/operator/eventing/controller.go +++ b/internal/controller/operator/eventing/controller.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "os" "strings" natsio "github.com/nats-io/nats.go" @@ -76,11 +75,10 @@ const ( ) var ( - ErrSubscriptionExists = errors.New(SubscriptionExistsErrMessage) - ErrUnsupportedBackedType = errors.New("backend type not supported") - ErrNatsModuleMissing = errors.New("NATS module has to be installed") - ErrAPIGatewayModuleMissing = errors.New("API-Gateway module is needed for EventMesh backend. APIRules CRD is not installed") - ErrNilNATSConnectionBuilder = errors.New("connection builder for NATS backend is nil") + ErrSubscriptionExists = errors.New(SubscriptionExistsErrMessage) + ErrUnsupportedBackedType = errors.New("backend type not supported") + ErrNatsModuleMissing = errors.New("NATS module has to be installed") + ErrAPIGatewayModuleMissing = errors.New("API-Gateway module is needed for EventMesh backend. APIRules CRD is not installed") ) // Reconciler reconciles an Eventing object @@ -575,11 +573,6 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context, } if connErr := r.connectToNATS(eventingCR); connErr != nil { - if errors.Is(connErr, ErrNilNATSConnectionBuilder) { - r.namedLogger().Error(connErr) - os.Exit(1) - } - if errors.Is(connErr, natsconnectionerrors.ErrCannotConnect) { return kctrl.Result{}, reconcile.TerminalError( r.syncStatusWithNATSErr(ctx, eventingCR, connErr, log), @@ -608,10 +601,6 @@ func (r *Reconciler) reconcileNATSBackend(ctx context.Context, // connectToNATS connects to NATS and returns an error if it failed. // It also registers handlers for reconnection and disconnection. func (r *Reconciler) connectToNATS(eventingCR *operatorv1alpha1.Eventing) error { - if r.natsConnectionBuilder == nil { - return ErrNilNATSConnectionBuilder - } - if r.natsConnection == nil { r.natsConnection = r.natsConnectionBuilder.Build() } @@ -622,13 +611,13 @@ func (r *Reconciler) connectToNATS(eventingCR *operatorv1alpha1.Eventing) error // At this point, it is safe to register handlers // because the internal nats connection is initialized. - r.natsConnection.RegisterReconnectHandlerIfNotRegistered( + r.natsConnection.RegisterReconnectHandler( func(_ *natsio.Conn) { r.namedLogger().Debug("Handle NATS reconnection") r.genericEvents <- event.GenericEvent{Object: eventingCR} }, ) - r.natsConnection.RegisterDisconnectErrHandlerIfNotRegistered( + r.natsConnection.RegisterDisconnectErrHandler( func(_ *natsio.Conn, _ error) { r.namedLogger().Debug("Handle NATS disconnection") r.genericEvents <- event.GenericEvent{Object: eventingCR} diff --git a/internal/controller/operator/eventing/integrationtests/natsconnection/integration_test.go b/internal/controller/operator/eventing/integrationtests/natsconnection/integration_test.go index 18419f0d..44bb9baf 100644 --- a/internal/controller/operator/eventing/integrationtests/natsconnection/integration_test.go +++ b/internal/controller/operator/eventing/integrationtests/natsconnection/integration_test.go @@ -33,8 +33,8 @@ func Test_NATSConnection(t *testing.T) { conn := &natsconnectionmocks.Connection{} conn.On("Connect").Return(nil) conn.On("IsConnected").Return(true) - conn.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return() - conn.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return() + conn.On("RegisterReconnectHandler", mock.Anything).Return() + conn.On("RegisterDisconnectErrHandler", mock.Anything).Return() return conn }, wantMatches: gomega.And( @@ -50,8 +50,8 @@ func Test_NATSConnection(t *testing.T) { conn := &natsconnectionmocks.Connection{} conn.On("Connect").Return(natsconnectionerrors.ErrCannotConnect) conn.On("IsConnected").Return(false) - conn.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return() - conn.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return() + conn.On("RegisterReconnectHandler", mock.Anything).Return() + conn.On("RegisterDisconnectErrHandler", mock.Anything).Return() return conn }, wantMatches: gomega.And( diff --git a/test/utils/integration/integration.go b/test/utils/integration/integration.go index b7464575..1d88d601 100644 --- a/test/utils/integration/integration.go +++ b/test/utils/integration/integration.go @@ -199,8 +199,8 @@ func NewTestEnvironment(config TestEnvironmentConfig, connMock *natsconnectionmo connMock.On("Connect").Return(nil) connMock.On("IsConnected").Return(true) connMock.On("Disconnect").Return() - connMock.On("RegisterReconnectHandlerIfNotRegistered", mock.Anything).Return() - connMock.On("RegisterDisconnectErrHandlerIfNotRegistered", mock.Anything).Return() + connMock.On("RegisterReconnectHandler", mock.Anything).Return() + connMock.On("RegisterDisconnectErrHandler", mock.Anything).Return() } // create a new watcher