Skip to content

Commit

Permalink
Rely on the NATS cluster connection status when reporting the Eventin…
Browse files Browse the repository at this point in the history
…g CR status (#331)

* Rely on the NATS cluster connection status when reporting the Eventing CR status

* Remove unsupported conditions

* Fix lint

* Address the review comments
  • Loading branch information
marcobebway authored Jan 29, 2024
1 parent bdbf3e4 commit f4a813e
Show file tree
Hide file tree
Showing 26 changed files with 915 additions and 123 deletions.
8 changes: 8 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ linters-settings:
alias: istio$1$2
- pkg: github.com/nats-io/nats-server/v2/(\w+)$
alias: natsio$1
- pkg: github.com/nats-io/nats.go
alias: natsio
- pkg: github.com/kyma-project/eventing-manager/internal/controller/(\w+)$
alias: controller$1
- pkg: github.com/kyma-project/kyma/common/logging/logger
Expand All @@ -159,6 +161,12 @@ linters-settings:
alias: natsv1alpha1
- pkg: github.com/kyma-project/nats-manager/testutils
alias: natstestutils
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats
alias: natsconnection
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats/errors
alias: natsconnectionerrors
- pkg: github.com/kyma-project/eventing-manager/internal/connection/nats/mocks
alias: natsconnectionmocks
- pkg: github.com/kyma-project/eventing-manager/internal/controller/eventing/subscription/(\w+)$
alias: subscriptioncontroller$1
- pkg: github.com/kyma-project/eventing-manager/internal/controller/operator/eventing
Expand Down
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ RUN go mod download
# Copy the go source
COPY cmd/main.go cmd/main.go
COPY api/ api/
COPY internal/controller/ internal/controller/
COPY internal/label/ internal/label/
COPY internal/ internal/
COPY pkg/ pkg/
COPY testing/ testing/
COPY options/ options/
Expand Down
15 changes: 15 additions & 0 deletions api/operator/v1alpha1/eventing_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ const (
ConditionReasonEventMeshSubManagerStopFailed ConditionReason = "EventMeshSubscriptionManagerStopFailed"
)

// getSupportedConditionsTypes returns a map of supported condition types.
func getSupportedConditionsTypes() map[ConditionType]interface{} {
return map[ConditionType]interface{}{
ConditionBackendAvailable: nil,
ConditionPublisherProxyReady: nil,
ConditionWebhookReady: nil,
ConditionSubscriptionManagerReady: nil,
ConditionDeleted: nil,
}
}

// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// Eventing is the Schema for the eventing API.
Expand Down Expand Up @@ -225,6 +236,10 @@ func (e *Eventing) SyncStatusActiveBackend() {
e.Status.ActiveBackend = e.Spec.Backend.Type
}

func (e *Eventing) IsPreviousBackendEmpty() bool {
return e.Status.ActiveBackend == ""
}

func (e *Eventing) IsSpecBackendTypeChanged() bool {
return e.Status.ActiveBackend != e.Spec.Backend.Type
}
12 changes: 12 additions & 0 deletions api/operator/v1alpha1/eventing_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,15 @@ func TestIsSpecBackendTypeChanged(t *testing.T) {
})
}
}

func Test_getSupportedConditionsTypes(t *testing.T) {
want := map[ConditionType]interface{}{
ConditionBackendAvailable: nil,
ConditionPublisherProxyReady: nil,
ConditionWebhookReady: nil,
ConditionSubscriptionManagerReady: nil,
ConditionDeleted: nil,
}
got := getSupportedConditionsTypes()
require.Equal(t, want, got)
}
16 changes: 16 additions & 0 deletions api/operator/v1alpha1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,19 @@ func (es *EventingStatus) ClearPublisherService() {
func (es *EventingStatus) SetPublisherService(name, namespace string) {
es.PublisherService = fmt.Sprintf("%s.%s", name, namespace)
}

// RemoveUnsupportedConditions removes unsupported conditions from the status and keeps only the supported ones.
func (es *EventingStatus) RemoveUnsupportedConditions() {
if len(es.Conditions) == 0 {
return
}

supportedConditionsTypes := getSupportedConditionsTypes()
supportedConditions := make([]kmetav1.Condition, 0, len(es.Conditions))
for _, condition := range es.Conditions {
if _, ok := supportedConditionsTypes[ConditionType(condition.Type)]; ok {
supportedConditions = append(supportedConditions, condition)
}
}
es.Conditions = supportedConditions
}
186 changes: 186 additions & 0 deletions api/operator/v1alpha1/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
"testing"
"time"

"github.com/stretchr/testify/require"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -97,3 +98,188 @@ func TestSetPublisherService(t *testing.T) {
})
}
}

func TestRemoveUnsupportedConditions(t *testing.T) {
t.Parallel()

// given
var (
// supported conditions
backendAvailableCondition = kmetav1.Condition{
Type: "BackendAvailable",
Status: kmetav1.ConditionStatus("BackendAvailableStatus"),
ObservedGeneration: int64(1),
LastTransitionTime: kmetav1.Time{Time: time.Date(2001, 0o1, 0o1, 0o1, 0o1, 0o1, 0o00000001, time.UTC)},
Reason: "BackendAvailableReason",
Message: "BackendAvailableMessage",
}
publisherProxyReadyCondition = kmetav1.Condition{
Type: "PublisherProxyReady",
Status: kmetav1.ConditionStatus("PublisherProxyReadyStatus"),
ObservedGeneration: int64(2),
LastTransitionTime: kmetav1.Time{Time: time.Date(2002, 0o2, 0o2, 0o2, 0o2, 0o2, 0o00000002, time.UTC)},
Reason: "PublisherProxyReadyReason",
Message: "PublisherProxyReadyMessage",
}
webhookReadyCondition = kmetav1.Condition{
Type: "WebhookReady",
Status: kmetav1.ConditionStatus("WebhookReadyStatus"),
ObservedGeneration: int64(3),
LastTransitionTime: kmetav1.Time{Time: time.Date(2003, 0o3, 0o3, 0o3, 0o3, 0o3, 0o00000003, time.UTC)},
Reason: "WebhookReadyReason",
Message: "WebhookReadyMessage",
}
subscriptionManagerReadyCondition = kmetav1.Condition{
Type: "SubscriptionManagerReady",
Status: kmetav1.ConditionStatus("SubscriptionManagerReadyStatus"),
ObservedGeneration: int64(4),
LastTransitionTime: kmetav1.Time{Time: time.Date(2004, 0o4, 0o4, 0o4, 0o4, 0o4, 0o00000004, time.UTC)},
Reason: "SubscriptionManagerReadyReason",
Message: "SubscriptionManagerReadyMessage",
}
deletedCondition = kmetav1.Condition{
Type: "Deleted",
Status: kmetav1.ConditionStatus("DeletedStatus"),
ObservedGeneration: int64(5),
LastTransitionTime: kmetav1.Time{Time: time.Date(2005, 0o5, 0o5, 0o5, 0o5, 0o5, 0o00000005, time.UTC)},
Reason: "DeletedReason",
Message: "DeletedMessage",
}

// unsupported conditions
unsupportedTypeCondition1 = kmetav1.Condition{
Type: "Unsupported1",
Status: kmetav1.ConditionStatus("UnsupportedStatus1"),
ObservedGeneration: int64(-1),
LastTransitionTime: kmetav1.Time{Time: time.Date(2011, 11, 11, 11, 11, 11, 0o00000011, time.UTC)},
Reason: "UnsupportedReason1",
Message: "UnsupportedMessage1",
}
unsupportedTypeCondition2 = kmetav1.Condition{
Type: "Unsupported2",
Status: kmetav1.ConditionStatus("UnsupportedStatus2"),
ObservedGeneration: int64(-2),
LastTransitionTime: kmetav1.Time{Time: time.Date(2012, 12, 12, 12, 12, 12, 0o00000012, time.UTC)},
Reason: "UnsupportedReason2",
Message: "UnsupportedMessage2",
}
unsupportedTypeCondition3 = kmetav1.Condition{
Type: "Unsupported3",
Status: kmetav1.ConditionStatus("UnsupportedStatus3"),
ObservedGeneration: int64(-3),
LastTransitionTime: kmetav1.Time{Time: time.Date(2013, 13, 13, 13, 13, 13, 0o00000013, time.UTC)},
Reason: "UnsupportedReason3",
Message: "UnsupportedMessage3",
}
)

tests := []struct {
name string
givenStatus *EventingStatus
wantStatus *EventingStatus
}{
{
name: "given nil conditions",
givenStatus: &EventingStatus{
Conditions: nil,
},
wantStatus: &EventingStatus{
Conditions: nil,
},
},
{
name: "given empty conditions",
givenStatus: &EventingStatus{
Conditions: []kmetav1.Condition{},
},
wantStatus: &EventingStatus{
Conditions: []kmetav1.Condition{},
},
},
{
name: "given few supported condition",
givenStatus: &EventingStatus{
Conditions: []kmetav1.Condition{
backendAvailableCondition,
subscriptionManagerReadyCondition,
},
},
wantStatus: &EventingStatus{
Conditions: []kmetav1.Condition{
backendAvailableCondition,
subscriptionManagerReadyCondition,
},
},
},
{
name: "given all supported conditions",
givenStatus: &EventingStatus{
Conditions: []kmetav1.Condition{
backendAvailableCondition,
publisherProxyReadyCondition,
webhookReadyCondition,
subscriptionManagerReadyCondition,
deletedCondition,
},
},
wantStatus: &EventingStatus{
Conditions: []kmetav1.Condition{
backendAvailableCondition,
publisherProxyReadyCondition,
webhookReadyCondition,
subscriptionManagerReadyCondition,
deletedCondition,
},
},
},
{
name: "given all unsupported conditions",
givenStatus: &EventingStatus{
Conditions: []kmetav1.Condition{
unsupportedTypeCondition1,
unsupportedTypeCondition2,
unsupportedTypeCondition3,
},
},
wantStatus: &EventingStatus{
Conditions: []kmetav1.Condition{},
},
},
{
name: "given supported and unsupported conditions",
givenStatus: &EventingStatus{
Conditions: []kmetav1.Condition{
unsupportedTypeCondition1,
unsupportedTypeCondition2,
unsupportedTypeCondition3,
backendAvailableCondition,
publisherProxyReadyCondition,
webhookReadyCondition,
subscriptionManagerReadyCondition,
deletedCondition,
},
},
wantStatus: &EventingStatus{
Conditions: []kmetav1.Condition{
backendAvailableCondition,
publisherProxyReadyCondition,
webhookReadyCondition,
subscriptionManagerReadyCondition,
deletedCondition,
},
},
},
}
for _, tt := range tests {
ttc := tt
t.Run(ttc.name, func(t *testing.T) {
t.Parallel()

// when
ttc.givenStatus.RemoveUnsupportedConditions()

// then
require.Equal(t, ttc.wantStatus, ttc.givenStatus)
})
}
}
39 changes: 39 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"flag"
"log"
"os"
"time"

"github.com/go-logr/zapr"
apigatewayv1beta1 "github.com/kyma-project/api-gateway/apis/gateway/v1beta1"
natsio "github.com/nats-io/nats.go"
kapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
kapixclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -40,6 +42,7 @@ import (
eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha1"
eventingv1alpha2 "github.com/kyma-project/eventing-manager/api/eventing/v1alpha2"
operatorv1alpha1 "github.com/kyma-project/eventing-manager/api/operator/v1alpha1"
natsconnection "github.com/kyma-project/eventing-manager/internal/connection/nats"
controllercache "github.com/kyma-project/eventing-manager/internal/controller/cache"
controllerclient "github.com/kyma-project/eventing-manager/internal/controller/client"
eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/operator/eventing"
Expand Down Expand Up @@ -164,6 +167,14 @@ func main() { //nolint:funlen // main function needs to initialize many object
ctrLogger,
)

// init NATS connection builder
natsConnectionBuilder, err := initNATSConnectionBuilder()
if err != nil {
setupLog.Error(err, "failed to get a NATS connection builder")
syncLogger(ctrLogger)
os.Exit(1)
}

// create Eventing reconciler instance
eventingReconciler := eventingcontroller.NewReconciler(
k8sClient,
Expand All @@ -182,6 +193,7 @@ func main() { //nolint:funlen // main function needs to initialize many object
Namespace: backendConfig.EventingCRNamespace,
},
},
natsConnectionBuilder,
)

if err = (eventingReconciler).SetupWithManager(mgr); err != nil {
Expand Down Expand Up @@ -230,3 +242,30 @@ func main() { //nolint:funlen // main function needs to initialize many object
}
syncLogger(ctrLogger)
}

func initNATSConnectionBuilder() (natsconnection.Builder, error) {
const (
// connectionURL is the NATS connection URL.
// It should be configured as part of https://github.com/kyma-project/eventing-manager/issues/272.
connectionURL = "nats://eventing-nats.kyma-system.svc.cluster.local:4222"

// connectionName is the name to identify the NATS connection.
connectionName = "Eventing Reconciler"
)

// The following constants are used to configure the NATS client re-connectivity.
// Please do not change these values to not change the intended behavior.
const (
maxReconnects = -1
retryOnFailedConnect = true
reconnectWait = time.Second
)

return natsconnection.NewBuilder(
connectionURL,
connectionName,
natsio.MaxReconnects(maxReconnects),
natsio.RetryOnFailedConnect(retryOnFailedConnect),
natsio.ReconnectWait(reconnectWait),
)
}
13 changes: 13 additions & 0 deletions cmd/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

import (
"testing"

"github.com/stretchr/testify/require"
)

func Test_initNATSConnectionBuilder(t *testing.T) {
natsConnectionBuilder, err := initNATSConnectionBuilder()
require.NoError(t, err)
require.NotNil(t, natsConnectionBuilder)
}
Loading

0 comments on commit f4a813e

Please sign in to comment.