Skip to content

Commit

Permalink
Rely on the NATS cluster connection status when reporting the Eventin…
Browse files Browse the repository at this point in the history
…g CR status
  • Loading branch information
marcobebway committed Jan 15, 2024
1 parent 0df1b52 commit ad624bf
Show file tree
Hide file tree
Showing 21 changed files with 597 additions and 126 deletions.
8 changes: 8 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ linters-settings:
alias: istio$1$2
- pkg: github.com/nats-io/nats-server/v2/(\w+)$
alias: natsio$1
- pkg: github.com/nats-io/nats.go
alias: natsio
- pkg: github.com/kyma-project/eventing-manager/internal/controller/(\w+)$
alias: controller$1
- pkg: github.com/kyma-project/kyma/common/logging/logger
Expand All @@ -159,6 +161,12 @@ linters-settings:
alias: natsv1alpha1
- pkg: github.com/kyma-project/nats-manager/testutils
alias: natstestutils
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats
alias: natsconnection
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats/errors
alias: natsconnectionerrors
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats/mocks
alias: natsconnectionmocks
- pkg: github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/(\w+)$
alias: subscriptioncontroller$1
- pkg: github.com/kyma-project/eventing-manager/internal/controller/operator/eventing
Expand Down
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ RUN go mod download
# Copy the go source
COPY cmd/main.go cmd/main.go
COPY api/ api/
COPY internal/controller/ internal/controller/
COPY internal/label/ internal/label/
COPY internal/ internal/
COPY pkg/ pkg/
COPY testing/ testing/
COPY options/ options/
Expand Down
4 changes: 4 additions & 0 deletions api/operator/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ func (e *Eventing) SyncStatusActiveBackend() {
e.Status.ActiveBackend = e.Spec.Backend.Type
}

func (e *Eventing) IsPreviousBackendEmpty() bool {
return e.Status.ActiveBackend == ""
}

func (e *Eventing) IsSpecBackendTypeChanged() bool {
return e.Status.ActiveBackend != e.Spec.Backend.Type
}
31 changes: 31 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"flag"
"log"
"os"
"time"

"github.com/go-logr/zapr"
apigatewayv1beta1 "github.com/kyma-project/api-gateway/apis/gateway/v1beta1"
natsio "github.com/nats-io/nats.go"
kapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
kapixclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -40,6 +42,7 @@ import (
eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha1"
eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2"
operatorv1alpha1 "github.com/kyma-project/eventing-manager/api/operator/v1alpha1"
natsconnection "github.com/kyma-project/eventing-manager/internal/connection/nats"
controllercache "github.com/kyma-project/eventing-manager/internal/controller/cache"
controllerclient "github.com/kyma-project/eventing-manager/internal/controller/client"
eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/operator/eventing"
Expand Down Expand Up @@ -182,6 +185,7 @@ func main() { //nolint:funlen // main function needs to initialize many object
Namespace: backendConfig.EventingCRNamespace,
},
},
getNATSConnectionBuilder(),
)

if err = (eventingReconciler).SetupWithManager(mgr); err != nil {
Expand Down Expand Up @@ -230,3 +234,30 @@ func main() { //nolint:funlen // main function needs to initialize many object
}
syncLogger(ctrLogger)
}

func getNATSConnectionBuilder() natsconnection.Builder {
const (
// connectionURL is the NATS connection URL.
// It should be configured as part of https://github.com/kyma-project/eventing-manager/issues/272.
connectionURL = "nats://eventing-nats.kyma-system.svc.cluster.local:4222"

// connectionName is the name to identify the NATS connection.
connectionName = "Eventing Reconciler"
)

// The following constants are used to configure the NATS client re-connectivity.
// Please do not change these values to not change the intended behavior.
const (
maxReconnects = -1
retryOnFailedConnect = true
reconnectWait = time.Second
)

return natsconnection.NewBuilder(
connectionURL,
connectionName,
natsio.MaxReconnects(maxReconnects),
natsio.RetryOnFailedConnect(retryOnFailedConnect),
natsio.ReconnectWait(reconnectWait),
)
}
11 changes: 11 additions & 0 deletions cmd/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package main

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_getNATSConnectionBuilder(t *testing.T) {
assert.NotNil(t, getNATSConnectionBuilder())
}
32 changes: 32 additions & 0 deletions internal/connection/nats/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package nats

import (
natsio "github.com/nats-io/nats.go"
)

type Builder interface {
Build() NATS
}

type ConnectionBuilder struct {
url string
opts []natsio.Option
}

func NewBuilder(url, name string, opts ...natsio.Option) *ConnectionBuilder {
opts = append(opts, natsio.Name(name)) // enforce configuring the connection name
return &ConnectionBuilder{
url: url,
opts: opts,
}
}

func (b *ConnectionBuilder) Build() NATS {
return &connection{
url: b.url,
conn: nil,
opts: b.opts,
reconnectHandlerRegistered: false,
disconnectErrHandlerRegistered: false,
}
}
66 changes: 66 additions & 0 deletions internal/connection/nats/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package nats

import (
natsio "github.com/nats-io/nats.go"

natsconnectionerrors "github.com/kyma-project/eventing-manager/internal/connection/nats/errors"
)

// compile-time check.
var _ NATS = &connection{}

// connection represents a NATS connection.
type connection struct {
url string
conn *natsio.Conn
opts []natsio.Option
reconnectHandlerRegistered bool
disconnectErrHandlerRegistered bool
}

func (c *connection) Connect() error {
if c.isConnected() {
return nil
}

var err error
if c.conn, err = natsio.Connect(c.url, c.opts...); err != nil {
return err
}

if c.isConnected() {
return nil
}

return natsconnectionerrors.ErrCannotConnect
}

func (c *connection) Disconnect() {
if c.conn == nil || c.conn.IsClosed() {
return
}
c.conn.Close()
}

func (c *connection) isConnected() bool {
if c.conn == nil {
return false
}
return c.conn.IsConnected()
}

func (c *connection) RegisterReconnectHandlerIfNotRegistered(handler natsio.ConnHandler) {
if c.conn == nil || c.reconnectHandlerRegistered {
return
}
c.conn.SetReconnectHandler(handler)
c.reconnectHandlerRegistered = true
}

func (c *connection) RegisterDisconnectErrHandlerIfNotRegistered(handler natsio.ConnErrHandler) {
if c.conn == nil || c.disconnectErrHandlerRegistered {
return
}
c.conn.SetDisconnectErrHandler(handler)
c.disconnectErrHandlerRegistered = true
}
12 changes: 12 additions & 0 deletions internal/connection/nats/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package errors

import (
"errors"
)

const (
errCannotConnect = "cannot connect to NATS"
)

// ErrCannotConnect represents an error when NATS connection failed.
var ErrCannotConnect = errors.New(errCannotConnect)
20 changes: 20 additions & 0 deletions internal/connection/nats/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package nats

import (
natsio "github.com/nats-io/nats.go"
)

//go:generate go run github.com/vektra/mockery/v2 --name=NATS --filename=nats.go
type NATS interface {
// Connect connects to NATS and returns an error if it cannot.
Connect() error

// Disconnect disconnects the active connection.
Disconnect()

// RegisterReconnectHandlerIfNotRegistered registers a ReconnectHandler only if not registered.
RegisterReconnectHandlerIfNotRegistered(natsio.ConnHandler)

// RegisterDisconnectErrHandlerIfNotRegistered registers a DisconnectErrHandler only if not registered.
RegisterDisconnectErrHandlerIfNotRegistered(natsio.ConnErrHandler)
}
17 changes: 17 additions & 0 deletions internal/connection/nats/mocks/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package mocks

import (
natsconnection "github.com/kyma-project/eventing-manager/internal/connection/nats"
)

type Builder struct {
mock *NATS
}

func NewBuilder(mock *NATS) *Builder {
return &Builder{mock: mock}
}

func (b *Builder) Build() natsconnection.NATS {
return b.mock
}
Loading

0 comments on commit ad624bf

Please sign in to comment.