Skip to content

Commit

Permalink
Ignore reconciling the EPP deployment if the only diff is in the spec…
Browse files Browse the repository at this point in the history
… replicas (#227)

* Ignore reconciling the EPP deployment if the only diff is in the spec replicas

* Fix tests

* Fix lint issues

* Set desired EPP deployment replicas to nil if there is not HPA

* Add integration test

* Ignore setting HPA

* fix equality tests for EPP

* moarcrap

* Update module docs (#230)

* Update module docs

* Further improvements

* Remove parallel test run (#224)

* Change spelling.

* Remove parallel execution from test `Update Eventing CR`.

* Remove all parallel running.

* Eventing Manager Module Documentation (#226)

* Add the docs for contributor

* User documentation

* Add automatically generated CRD documentation

* Small improvements

* Replace link

* Apply review comments

* Add review comments

* Fix typo

* Add review comments

* Bump golang.org/x/oauth2 from 0.13.0 to 0.14.0 (#232)

Bumps [golang.org/x/oauth2](https://github.com/golang/oauth2) from 0.13.0 to 0.14.0.
- [Commits](golang/oauth2@v0.13.0...v0.14.0)

---
updated-dependencies:
- dependency-name: golang.org/x/oauth2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump github.com/vektra/mockery/v2 from 2.36.0 to 2.36.1 (#219)

Bumps [github.com/vektra/mockery/v2](https://github.com/vektra/mockery) from 2.36.0 to 2.36.1.
- [Release notes](https://github.com/vektra/mockery/releases)
- [Changelog](https://github.com/vektra/mockery/blob/master/docs/changelog.md)
- [Commits](vektra/mockery@v2.36.0...v2.36.1)

---
updated-dependencies:
- dependency-name: github.com/vektra/mockery/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump github.com/onsi/gomega from 1.29.0 to 1.30.0 (#229)

Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.29.0 to 1.30.0.
- [Release notes](https://github.com/onsi/gomega/releases)
- [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md)
- [Commits](onsi/gomega@v1.29.0...v1.30.0)

---
updated-dependencies:
- dependency-name: github.com/onsi/gomega
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Closing NATS connection on subscription manager shutdown and disable parallel reconciliations (#233)

* Closing NATS connection on subscription manager shutdown

* added logs

* set concurrent reconciliations to 0

* updated

* revert doc change

* fix tests by reverting most of the previous changes and introducing
predicates for all watched resources

* change predicates

introduce some logging

* skip on create

* ignore updates on cr /crb

* cleanup predicate code

* add more info to predicates

* change loglevel for predicates

* fix broken build

* more cleanup

* debug

* debug

* debug

* debug

* Address review comments

- Refactoring
- Add unit tests

* fix imports

* fix tests

* fix tests

* fix equality for envs

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Korbinian Stoemmer <[email protected]>
Co-authored-by: Iwona Langer <[email protected]>
Co-authored-by: Friedrich <[email protected]>
Co-authored-by: Carina Kothe <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Muhammad Faizan <[email protected]>
  • Loading branch information
7 people authored Nov 14, 2023
1 parent f0d0636 commit f992d70
Show file tree
Hide file tree
Showing 14 changed files with 1,750 additions and 88 deletions.
99 changes: 79 additions & 20 deletions internal/controller/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import (
"context"
"errors"
"fmt"

"github.com/kyma-project/eventing-manager/pkg/watcher"

"k8s.io/client-go/dynamic"
"strings"

"go.uber.org/zap"
v1 "k8s.io/api/apps/v1"
Expand All @@ -34,8 +31,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand All @@ -50,8 +49,10 @@ import (
"github.com/kyma-project/eventing-manager/pkg/eventing"
"github.com/kyma-project/eventing-manager/pkg/k8s"
"github.com/kyma-project/eventing-manager/pkg/logger"
"github.com/kyma-project/eventing-manager/pkg/object"
"github.com/kyma-project/eventing-manager/pkg/subscriptionmanager"
"github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager"
"github.com/kyma-project/eventing-manager/pkg/watcher"
)

const (
Expand Down Expand Up @@ -169,36 +170,36 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

// copy the object, so we don't modify the source object
eventing := currentEventing.DeepCopy()
eventingCR := currentEventing.DeepCopy()

// watch cluster-scoped resources, such as clusterrole and clusterrolebinding.
// watch cluster-scoped resources, such as ClusterRole and ClusterRoleBinding.
if !r.clusterScopedResourcesWatched {
if err := r.watchResource(&rbacv1.ClusterRole{}, eventing); err != nil {
if err := r.watchResource(&rbacv1.ClusterRole{}, eventingCR); err != nil {
return ctrl.Result{}, err
}
if err := r.watchResource(&rbacv1.ClusterRoleBinding{}, eventing); err != nil {
if err := r.watchResource(&rbacv1.ClusterRoleBinding{}, eventingCR); err != nil {
return ctrl.Result{}, err
}
r.clusterScopedResourcesWatched = true
}

// logger with eventing details
log := r.loggerWithEventing(eventing)
log := r.loggerWithEventing(eventingCR)

// check if eventing is in deletion state
if !eventing.DeletionTimestamp.IsZero() {
return r.handleEventingDeletion(ctx, eventing, log)
if !eventingCR.DeletionTimestamp.IsZero() {
return r.handleEventingDeletion(ctx, eventingCR, log)
}

// check if the Eventing CR is allowed to be created.
if r.allowedEventingCR != nil {
if result, err := r.handleEventingCRAllowedCheck(ctx, eventing, log); !result || err != nil {
if result, err := r.handleEventingCRAllowedCheck(ctx, eventingCR, log); !result || err != nil {
return ctrl.Result{}, err
}
}

// handle reconciliation
return r.handleEventingReconcile(ctx, eventing, log)
return r.handleEventingReconcile(ctx, eventingCR, log)
}

// handleEventingCRAllowedCheck checks if Eventing CR is allowed to be created or not.
Expand All @@ -221,17 +222,76 @@ func (r *Reconciler) handleEventingCRAllowedCheck(ctx context.Context, eventing
return false, r.syncEventingStatus(ctx, eventing, log)
}

func (r *Reconciler) logEventForResource(name, namespace, resourceType, eventType string, enqueue bool) {
r.namedLogger().
With("Name", name).
With("Namespace", namespace).
With("ResourceType", resourceType).
With("EventType", eventType).
With("Enqueue", enqueue).
Debug("Event received")
}

func (r *Reconciler) SkipEnqueueOnCreate() func(event.CreateEvent) bool {
return func(e event.CreateEvent) bool {
// always skip enqueue
return false
}
}

func (r *Reconciler) SkipEnqueueOnUpdateAfterSemanticCompare(resourceType, nameFilter string) func(event.UpdateEvent) bool {
return func(e event.UpdateEvent) bool {
// skip enqueue if the resource is not related to Eventing
if !strings.Contains(e.ObjectOld.GetName(), nameFilter) {
return false
}

// enqueue only if the resource changed
enqueue := !object.Semantic.DeepEqual(e.ObjectOld, e.ObjectNew)
r.logEventForResource(e.ObjectOld.GetName(), e.ObjectOld.GetNamespace(), resourceType, "Update", enqueue)
return enqueue
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
r.ctrlManager = mgr

var err error
r.controller, err = ctrl.NewControllerManagedBy(mgr).
For(&eventingv1alpha1.Eventing{}).
Owns(&v1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&corev1.ServiceAccount{}).
Owns(&autoscalingv2.HorizontalPodAutoscaler{}).
Owns(&v1.Deployment{},
builder.WithPredicates(
predicate.Funcs{
CreateFunc: r.SkipEnqueueOnCreate(),
UpdateFunc: r.SkipEnqueueOnUpdateAfterSemanticCompare("Deployment", "eventing"),
},
),
).
Owns(&corev1.Service{},
builder.WithPredicates(
predicate.Funcs{
CreateFunc: r.SkipEnqueueOnCreate(),
UpdateFunc: r.SkipEnqueueOnUpdateAfterSemanticCompare("Service", "eventing"),
},
),
).
Owns(&corev1.ServiceAccount{},
builder.WithPredicates(
predicate.Funcs{
CreateFunc: r.SkipEnqueueOnCreate(),
UpdateFunc: r.SkipEnqueueOnUpdateAfterSemanticCompare("SA", "eventing"),
},
),
).
Owns(&autoscalingv2.HorizontalPodAutoscaler{},
builder.WithPredicates(
predicate.Funcs{
CreateFunc: r.SkipEnqueueOnCreate(),
UpdateFunc: r.SkipEnqueueOnUpdateAfterSemanticCompare("HPA", "eventing"),
},
),
).
WithOptions(controller.Options{
MaxConcurrentReconciles: 0,
}).
Expand All @@ -255,9 +315,8 @@ func (r *Reconciler) watchResource(kind client.Object, eventing *eventingv1alpha
predicate.ResourceVersionChangedPredicate{},
predicate.Funcs{
// don't reconcile for create events
CreateFunc: func(e event.CreateEvent) bool {
return false
},
CreateFunc: r.SkipEnqueueOnCreate(),
UpdateFunc: r.SkipEnqueueOnUpdateAfterSemanticCompare("CR/CRB", "eventing"),
},
)
return err
Expand Down
11 changes: 6 additions & 5 deletions internal/controller/eventing/eventmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"fmt"
"os"

"github.com/kyma-project/eventing-manager/api/v1alpha1"
"github.com/kyma-project/eventing-manager/pkg/env"
"github.com/kyma-project/eventing-manager/pkg/eventing"
subscriptionmanager "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager"
"github.com/kyma-project/eventing-manager/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kyma-project/eventing-manager/api/v1alpha1"
"github.com/kyma-project/eventing-manager/pkg/env"
"github.com/kyma-project/eventing-manager/pkg/eventing"
subscriptionmanager "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager"
"github.com/kyma-project/eventing-manager/pkg/utils"
)

const (
Expand Down
15 changes: 8 additions & 7 deletions internal/controller/eventing/eventmesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ import (
"github.com/kyma-project/eventing-manager/pkg/k8s"
k8smocks "github.com/kyma-project/eventing-manager/pkg/k8s/mocks"

"github.com/kyma-project/eventing-manager/pkg/env"
managermocks "github.com/kyma-project/eventing-manager/pkg/eventing/mocks"
"github.com/kyma-project/eventing-manager/pkg/logger"
submanagermocks "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager/mocks"
subscriptionmanagermocks "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/mocks"
"github.com/kyma-project/eventing-manager/test/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/kyma-project/eventing-manager/pkg/env"
managermocks "github.com/kyma-project/eventing-manager/pkg/eventing/mocks"
"github.com/kyma-project/eventing-manager/pkg/logger"
submanagermocks "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager/mocks"
subscriptionmanagermocks "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/mocks"
"github.com/kyma-project/eventing-manager/test/utils"
)

const (
Expand Down Expand Up @@ -918,7 +919,7 @@ func Test_SyncPublisherProxySecret(t *testing.T) {
},
// patchApply error
{
name: "pathApply should fail",
name: "patchApply should fail",
givenSecret: utils.NewEventMeshSecret("valid", "test-namespace"),
mockKubeClient: func() *k8smocks.Client {
kubeClient := new(k8smocks.Client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ import (
"os"
"testing"

"github.com/kyma-project/eventing-manager/pkg/k8s"
"github.com/onsi/gomega"
gomegatypes "github.com/onsi/gomega/types"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/apps/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventinv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2"
natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1"
natstestutils "github.com/kyma-project/nats-manager/testutils"

eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1"
eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/eventing"
"github.com/kyma-project/eventing-manager/pkg/eventing"
"github.com/kyma-project/eventing-manager/pkg/k8s"
"github.com/kyma-project/eventing-manager/test/matchers"
"github.com/kyma-project/eventing-manager/test/utils"
testutils "github.com/kyma-project/eventing-manager/test/utils/integration"
natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1"
natstestutils "github.com/kyma-project/nats-manager/testutils"
"github.com/onsi/gomega"
gomegatypes "github.com/onsi/gomega/types"
"github.com/stretchr/testify/require"

eventinv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2"
v1 "k8s.io/api/apps/v1"
)

const (
Expand Down Expand Up @@ -243,7 +243,6 @@ func Test_UpdateEventingCR(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// given
eventingcontroller.IsDeploymentReady = func(deployment *v1.Deployment) bool {
return true
Expand Down Expand Up @@ -293,6 +292,88 @@ func Test_UpdateEventingCR(t *testing.T) {
}
}

func Test_ReconcileSameEventingCR(t *testing.T) {
t.Parallel()

////
// given
////
eventingcontroller.IsDeploymentReady = func(deployment *v1.Deployment) bool { return true }

eventingCR := utils.NewEventingCR(
utils.WithEventingCRMinimal(),
utils.WithEventingStreamData("Memory", "1M", 1, 1),
utils.WithEventingPublisherData(1, 1, "199m", "99Mi", "399m", "199Mi"),
)

namespace := eventingCR.GetNamespace()

natsCR := natstestutils.NewNATSCR(
natstestutils.WithNATSCRNamespace(namespace),
natstestutils.WithNATSCRDefaults(),
natstestutils.WithNATSStateReady(),
)

testEnvironment.EnsureNamespaceCreation(t, namespace)
testEnvironment.EnsureK8sResourceCreated(t, natsCR)
testEnvironment.EnsureNATSResourceStateReady(t, natsCR)
testEnvironment.EnsureK8sResourceCreated(t, eventingCR)

eppDeploymentName := eventing.GetPublisherDeploymentName(*eventingCR)
testEnvironment.EnsureDeploymentExists(t, eppDeploymentName, namespace)
testEnvironment.EnsureHPAExists(t, eppDeploymentName, namespace)

eppDeployment, err := testEnvironment.GetDeploymentFromK8s(eppDeploymentName, namespace)
require.NoError(t, err)
require.NotNil(t, eppDeployment)

defer func() {
testEnvironment.EnsureEventingResourceDeletion(t, eventingCR.Name, namespace)
if !*testEnvironment.EnvTestInstance.UseExistingCluster {
testEnvironment.EnsureDeploymentDeletion(t, eppDeploymentName, namespace)
}
testEnvironment.EnsureK8sResourceDeleted(t, natsCR)
testEnvironment.EnsureNamespaceDeleted(t, namespace)
}()

// Ensure reconciling the same Eventing CR multiple times does not update the EPP deployment.
const runs = 3
var resourceVersionBefore = eppDeployment.ObjectMeta.ResourceVersion
for r := 0; r < runs; r++ {
////
// when
////
runId := fmt.Sprintf("run-%d", r)

eventingCR, err = testEnvironment.GetEventingFromK8s(eventingCR.Name, namespace)
require.NoError(t, err)
require.NotNil(t, eventingCR)

eventingCR = eventingCR.DeepCopy()
eventingCR.ObjectMeta.Labels = map[string]string{"reconcile": runId} // force new reconciliation
testEnvironment.EnsureK8sResourceUpdated(t, eventingCR)

eventingCR, err = testEnvironment.GetEventingFromK8s(eventingCR.Name, namespace)
require.NoError(t, err)
require.NotNil(t, eventingCR)
require.Equal(t, eventingCR.ObjectMeta.Labels["reconcile"], runId)

////
// then
////
testEnvironment.EnsureEventingSpecPublisherReflected(t, eventingCR)
testEnvironment.EnsureEventingReplicasReflected(t, eventingCR)
testEnvironment.EnsureDeploymentOwnerReferenceSet(t, eventingCR)

eppDeployment, err = testEnvironment.GetDeploymentFromK8s(eppDeploymentName, namespace)
require.NoError(t, err)
require.NotNil(t, eppDeployment)

resourceVersionAfter := eppDeployment.ObjectMeta.ResourceVersion
require.Equal(t, resourceVersionBefore, resourceVersionAfter)
}
}

// Test_WatcherEventingCRK8sObjects tests that deleting the k8s objects deployed by Eventing CR
// should trigger reconciliation.
func Test_WatcherEventingCRK8sObjects(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

apigatewayv1beta1 "github.com/kyma-incubator/api-gateway/api/v1beta1"
eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2"

eventMeshtypes "github.com/kyma-project/eventing-manager/pkg/ems/api/events/types"
"github.com/kyma-project/eventing-manager/pkg/object"
reconcilertesting "github.com/kyma-project/eventing-manager/testing"
eventmeshsubmatchers "github.com/kyma-project/eventing-manager/testing/eventmeshsub"
eventingv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2"
)

const (
Expand Down Expand Up @@ -132,7 +133,6 @@ func Test_ValidationWebhook(t *testing.T) {
}

func Test_CreateSubscription(t *testing.T) {
t.Parallel()
var testCases = []struct {
name string
givenSubscriptionFunc func(namespace string) *eventingv1alpha2.Subscription
Expand Down Expand Up @@ -304,7 +304,6 @@ func Test_CreateSubscription(t *testing.T) {
for _, testCase := range testCases {
tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
g := gomega.NewGomegaWithT(t)
ctx := context.Background()

Expand Down
Loading

0 comments on commit f992d70

Please sign in to comment.