Skip to content

Commit

Permalink
Migrate codebase from EC to eventing-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
mfaizanse committed Oct 10, 2023
1 parent 099bff5 commit a2ea3a3
Show file tree
Hide file tree
Showing 59 changed files with 7,543 additions and 192 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
eventingcontroller "github.com/kyma-project/eventing-manager/internal/controller/eventing"
"github.com/kyma-project/eventing-manager/options"
backendmetrics "github.com/kyma-project/eventing-manager/pkg/backend/metrics"
"github.com/kyma-project/kyma/components/eventing-controller/logger"
"github.com/kyma-project/eventing-manager/pkg/logger"
apiclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down
50 changes: 50 additions & 0 deletions internal/controller/errors/skip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package errors

import "golang.org/x/xerrors"

// skippable is an interface of error types that can be skipped.
type skippable interface {
skip()
}

// IsSkippable returns true if the error is skippable, otherwise returns false.
func IsSkippable(err error) bool {
if err == nil {
return true
}
_, ok := err.(skippable)
return ok
}

// skippableError is an implementation of a skippable reconcile error.
type skippableError struct {
err error
}

var (
// Perform compile-time checks.
_ error = &skippableError{}
_ skippable = &skippableError{}
_ xerrors.Wrapper = &skippableError{}
)

// NewSkippable returns a new skippable error.
func NewSkippable(err error) error {
return &skippableError{err: err}
}

// Error implements the error interface.
func (e *skippableError) Error() string {
if e.err == nil {
return ""
}
return e.err.Error()
}

// skip implements the skippable interface.
func (*skippableError) skip() {}

// Unwrap implements the Wrapper interface.
func (e *skippableError) Unwrap() error {
return e.err
}
68 changes: 68 additions & 0 deletions internal/controller/errors/skip_unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package errors_test

import (
"errors"
"fmt"
"testing"

ctrlserrors "github.com/kyma-project/eventing-manager/internal/controller/errors"
)

func Test_NewSkippable(t *testing.T) {
testCases := []struct {
error error
}{
{error: ctrlserrors.NewSkippable(nil)},
{error: ctrlserrors.NewSkippable(ctrlserrors.NewSkippable(nil))},
{error: ctrlserrors.NewSkippable(fmt.Errorf("some error"))},
{error: ctrlserrors.NewSkippable(ctrlserrors.NewSkippable(fmt.Errorf("some error")))},
}

for _, tc := range testCases {
skippableErr := ctrlserrors.NewSkippable(tc.error)
if skippableErr == nil {
t.Errorf("test NewSkippable retuned nil error")
continue
}
if err := errors.Unwrap(skippableErr); tc.error != err {
t.Errorf("test NewSkippable failed, want: %#v but got: %#v", tc.error, err)
}
}
}

func Test_IsSkippable(t *testing.T) {
testCases := []struct {
name string
givenError error
wantSkippable bool
}{
{
name: "nil error, should be skipped",
givenError: nil,
wantSkippable: true,
},
{
name: "skippable error, should be skipped",
givenError: ctrlserrors.NewSkippable(fmt.Errorf("some errore")),
wantSkippable: true,
},
{
name: "not-skippable error, should not be skipped",
givenError: fmt.Errorf("some error"),
wantSkippable: false,
},
{
name: "not-skippable error which wraps a skippable error, should not be skipped",
givenError: fmt.Errorf("some error %w", ctrlserrors.NewSkippable(fmt.Errorf("some error"))),
wantSkippable: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if gotSkippable := ctrlserrors.IsSkippable(tc.givenError); tc.wantSkippable != gotSkippable {
t.Errorf("test skippable failed, want: %v but got: %v", tc.wantSkippable, gotSkippable)
}
})
}
}
12 changes: 5 additions & 7 deletions internal/controller/eventing/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ import (
"github.com/kyma-project/eventing-manager/pkg/env"
"github.com/kyma-project/eventing-manager/pkg/eventing"
"github.com/kyma-project/eventing-manager/pkg/k8s"
"github.com/kyma-project/eventing-manager/pkg/logger"
"github.com/kyma-project/eventing-manager/pkg/subscriptionmanager"
"github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager"
"github.com/kyma-project/kyma/components/eventing-controller/logger"
"github.com/kyma-project/kyma/components/eventing-controller/pkg/deployment"
ecsubscriptionmanager "github.com/kyma-project/kyma/components/eventing-controller/pkg/subscriptionmanager"
"go.uber.org/zap"
v1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
Expand All @@ -55,7 +53,7 @@ const (
NatsServerNotAvailableMsg = "NATS server is not available"
natsClientPort = 4222

AppLabelValue = deployment.PublisherName
AppLabelValue = eventing.PublisherName
PublisherSecretEMSHostKey = "ems-publish-host"

TokenEndpointFormat = "%s?grant_type=%s&response_type=token"
Expand All @@ -79,7 +77,7 @@ type Reconciler struct {
recorder record.EventRecorder
subManagerFactory subscriptionmanager.ManagerFactory
natsSubManager manager.Manager
eventMeshSubManager ecsubscriptionmanager.Manager
eventMeshSubManager manager.Manager
isNATSSubManagerStarted bool
isEventMeshSubManagerStarted bool
natsConfigHandler NatsConfigHandler
Expand Down Expand Up @@ -453,11 +451,11 @@ func (r *Reconciler) reconcileEventMeshBackend(ctx context.Context, eventing *ev
return r.handleEventingState(ctx, deployment, eventing, log)
}

func (r *Reconciler) GetEventMeshSubManager() ecsubscriptionmanager.Manager {
func (r *Reconciler) GetEventMeshSubManager() manager.Manager {
return r.eventMeshSubManager
}

func (r *Reconciler) SetEventMeshSubManager(eventMeshSubManager ecsubscriptionmanager.Manager) {
func (r *Reconciler) SetEventMeshSubManager(eventMeshSubManager manager.Manager) {
r.eventMeshSubManager = eventMeshSubManager
}

Expand Down
23 changes: 11 additions & 12 deletions internal/controller/eventing/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1"
submanagermocks "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager/mocks"
ecsubmanagermocks "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/mocks/ec"
testutils "github.com/kyma-project/eventing-manager/test/utils"
natsv1alpha1 "github.com/kyma-project/nats-manager/api/v1alpha1"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -113,7 +112,7 @@ func Test_handleBackendSwitching(t *testing.T) {
name string
givenEventing *eventingv1alpha1.Eventing
givenNATSSubManagerMock func() *submanagermocks.Manager
givenEventMeshSubManagerMock func() *ecsubmanagermocks.Manager
givenEventMeshSubManagerMock func() *submanagermocks.Manager
wantNATSStopped bool
wantEventMeshStopped bool
wantEventingState string
Expand All @@ -131,8 +130,8 @@ func Test_handleBackendSwitching(t *testing.T) {
givenNATSSubManagerMock: func() *submanagermocks.Manager {
return new(submanagermocks.Manager)
},
givenEventMeshSubManagerMock: func() *ecsubmanagermocks.Manager {
return new(ecsubmanagermocks.Manager)
givenEventMeshSubManagerMock: func() *submanagermocks.Manager {
return new(submanagermocks.Manager)
},
wantError: nil,
wantEventingState: eventingv1alpha1.StateReady,
Expand All @@ -153,8 +152,8 @@ func Test_handleBackendSwitching(t *testing.T) {
managerMock.On("Stop", true).Return(nil).Once()
return managerMock
},
givenEventMeshSubManagerMock: func() *ecsubmanagermocks.Manager {
return new(ecsubmanagermocks.Manager)
givenEventMeshSubManagerMock: func() *submanagermocks.Manager {
return new(submanagermocks.Manager)
},
wantError: nil,
wantEventingState: eventingv1alpha1.StateProcessing,
Expand All @@ -175,8 +174,8 @@ func Test_handleBackendSwitching(t *testing.T) {
managerMock.On("Stop", true).Return(errors.New("failed to stop")).Once()
return managerMock
},
givenEventMeshSubManagerMock: func() *ecsubmanagermocks.Manager {
return new(ecsubmanagermocks.Manager)
givenEventMeshSubManagerMock: func() *submanagermocks.Manager {
return new(submanagermocks.Manager)
},
wantError: errors.New("failed to stop"),
wantEventingState: eventingv1alpha1.StateReady,
Expand All @@ -195,8 +194,8 @@ func Test_handleBackendSwitching(t *testing.T) {
givenNATSSubManagerMock: func() *submanagermocks.Manager {
return new(submanagermocks.Manager)
},
givenEventMeshSubManagerMock: func() *ecsubmanagermocks.Manager {
managerMock := new(ecsubmanagermocks.Manager)
givenEventMeshSubManagerMock: func() *submanagermocks.Manager {
managerMock := new(submanagermocks.Manager)
managerMock.On("Stop", true).Return(nil).Once()
return managerMock
},
Expand All @@ -217,8 +216,8 @@ func Test_handleBackendSwitching(t *testing.T) {
givenNATSSubManagerMock: func() *submanagermocks.Manager {
return new(submanagermocks.Manager)
},
givenEventMeshSubManagerMock: func() *ecsubmanagermocks.Manager {
managerMock := new(ecsubmanagermocks.Manager)
givenEventMeshSubManagerMock: func() *submanagermocks.Manager {
managerMock := new(submanagermocks.Manager)
managerMock.On("Stop", true).Return(errors.New("failed to stop")).Once()
return managerMock
},
Expand Down
47 changes: 24 additions & 23 deletions internal/controller/eventing/eventmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (
"fmt"
"os"

"github.com/kyma-project/eventing-manager/pkg/eventing"

subscriptionmanager "github.com/kyma-project/eventing-manager/pkg/subscriptionmanager/manager"

"github.com/kyma-project/eventing-manager/api/v1alpha1"
ecsubscriptionmanager "github.com/kyma-project/kyma/components/eventing-controller/pkg/subscriptionmanager"
"github.com/pkg/errors"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kyma-project/kyma/components/eventing-controller/pkg/deployment"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -79,13 +81,12 @@ func (r *Reconciler) reconcileEventMeshSubManager(ctx context.Context, eventing
return nil
}

defaultSubsConfig := r.eventingManager.GetBackendConfig().
DefaultSubscriptionConfig.ToECENVDefaultSubscriptionConfig()
eventMeshSubMgrParams := ecsubscriptionmanager.Params{
ecsubscriptionmanager.ParamNameClientID: r.oauth2credentials.clientID,
ecsubscriptionmanager.ParamNameClientSecret: r.oauth2credentials.clientSecret,
ecsubscriptionmanager.ParamNameTokenURL: r.oauth2credentials.tokenURL,
ecsubscriptionmanager.ParamNameCertsURL: r.oauth2credentials.certsURL,
defaultSubsConfig := r.eventingManager.GetBackendConfig().DefaultSubscriptionConfig
eventMeshSubMgrParams := subscriptionmanager.Params{
subscriptionmanager.ParamNameClientID: r.oauth2credentials.clientID,
subscriptionmanager.ParamNameClientSecret: r.oauth2credentials.clientSecret,
subscriptionmanager.ParamNameTokenURL: r.oauth2credentials.tokenURL,
subscriptionmanager.ParamNameCertsURL: r.oauth2credentials.certsURL,
}
if err = r.eventMeshSubManager.Start(defaultSubsConfig, eventMeshSubMgrParams); err != nil {
return err
Expand Down Expand Up @@ -240,10 +241,10 @@ func newSecret(name, namespace string) *corev1.Secret {
}

func getSecretForPublisher(eventMeshSecret *corev1.Secret) (*corev1.Secret, error) {
secret := newSecret(deployment.PublisherName, eventMeshSecret.Namespace)
secret := newSecret(eventing.PublisherName, eventMeshSecret.Namespace)

secret.Labels = map[string]string{
deployment.AppLabelKey: AppLabelValue,
eventing.AppLabelKey: AppLabelValue,
}

if _, ok := eventMeshSecret.Data["messaging"]; !ok {
Expand Down Expand Up @@ -290,42 +291,42 @@ func getSecretForPublisher(eventMeshSecret *corev1.Secret) (*corev1.Secret, erro

func getSecretStringData(clientID, clientSecret, tokenEndpoint, grantType, publishURL, namespace string) map[string]string {
return map[string]string{
deployment.PublisherSecretClientIDKey: clientID,
deployment.PublisherSecretClientSecretKey: clientSecret,
deployment.PublisherSecretTokenEndpointKey: fmt.Sprintf(TokenEndpointFormat, tokenEndpoint, grantType),
deployment.PublisherSecretEMSURLKey: fmt.Sprintf("%s%s", publishURL, EventMeshPublishEndpointForPublisher),
PublisherSecretEMSHostKey: publishURL,
deployment.PublisherSecretBEBNamespaceKey: namespace,
eventing.PublisherSecretClientIDKey: clientID,
eventing.PublisherSecretClientSecretKey: clientSecret,
eventing.PublisherSecretTokenEndpointKey: fmt.Sprintf(TokenEndpointFormat, tokenEndpoint, grantType),
eventing.PublisherSecretEMSURLKey: fmt.Sprintf("%s%s", publishURL, EventMeshPublishEndpointForPublisher),
PublisherSecretEMSHostKey: publishURL,
eventing.PublisherSecretBEBNamespaceKey: namespace,
}
}

func setUpEnvironmentForEventMesh(secret *corev1.Secret, eventing *v1alpha1.Eventing) error {
func setUpEnvironmentForEventMesh(secret *corev1.Secret, eventingCR *v1alpha1.Eventing) error {
err := os.Setenv("BEB_API_URL", fmt.Sprintf("%s%s", string(secret.Data[PublisherSecretEMSHostKey]), EventMeshPublishEndpointForSubscriber))
if err != nil {
return fmt.Errorf("set BEB_API_URL env var failed: %v", err)
}

err = os.Setenv("CLIENT_ID", string(secret.Data[deployment.PublisherSecretClientIDKey]))
err = os.Setenv("CLIENT_ID", string(secret.Data[eventing.PublisherSecretClientIDKey]))
if err != nil {
return fmt.Errorf("set CLIENT_ID env var failed: %v", err)
}

err = os.Setenv("CLIENT_SECRET", string(secret.Data[deployment.PublisherSecretClientSecretKey]))
err = os.Setenv("CLIENT_SECRET", string(secret.Data[eventing.PublisherSecretClientSecretKey]))
if err != nil {
return fmt.Errorf("set CLIENT_SECRET env var failed: %v", err)
}

err = os.Setenv("TOKEN_ENDPOINT", string(secret.Data[deployment.PublisherSecretTokenEndpointKey]))
err = os.Setenv("TOKEN_ENDPOINT", string(secret.Data[eventing.PublisherSecretTokenEndpointKey]))
if err != nil {
return fmt.Errorf("set TOKEN_ENDPOINT env var failed: %v", err)
}

err = os.Setenv("BEB_NAMESPACE", fmt.Sprintf("%s%s", NamespacePrefix, string(secret.Data[deployment.PublisherSecretBEBNamespaceKey])))
err = os.Setenv("BEB_NAMESPACE", fmt.Sprintf("%s%s", NamespacePrefix, string(secret.Data[eventing.PublisherSecretBEBNamespaceKey])))
if err != nil {
return fmt.Errorf("set BEB_NAMESPACE env var failed: %v", err)
}

if err := os.Setenv("EVENT_TYPE_PREFIX", eventing.Spec.Backend.Config.EventTypePrefix); err != nil {
if err := os.Setenv("EVENT_TYPE_PREFIX", eventingCR.Spec.Backend.Config.EventTypePrefix); err != nil {
return fmt.Errorf("set EVENT_TYPE_PREFIX env var failed: %v", err)
}

Expand Down
Loading

0 comments on commit a2ea3a3

Please sign in to comment.