From 4afd8859d89dd7b3650ea86dd2637727e1995008 Mon Sep 17 00:00:00 2001 From: Muhammad Faizan Date: Tue, 12 Sep 2023 13:20:54 +0200 Subject: [PATCH] Added E2E tests for legacy events (#90) * added e2e tests * added tests * added support for EventMesh * updated * updated * updated * Added e2e event delivery tests * updated * updated * updated * updated * updated * Apply suggestions from code review Co-authored-by: Friedrich --------- Co-authored-by: Friedrich --- Makefile | 14 + config/manager/kustomization.yaml | 4 +- hack/e2e/common/eventing/publisher.go | 97 +++++ .../common/eventing/testsubscriptioninfo.go | 64 +++ hack/e2e/common/eventing/utils.go | 75 ++++ hack/e2e/common/fixtures/fixtures.go | 143 ++++++- hack/e2e/common/http/http.go | 19 + hack/e2e/common/k8s.go | 15 + .../testenvironment/test_environment.go | 377 ++++++++++++++++++ hack/e2e/env/env.go | 17 +- hack/e2e/eventing/cleanup/cleanup_test.go | 42 ++ hack/e2e/eventing/eventing_test.go | 94 +++++ hack/e2e/scripts/event_delivery_tests.sh | 21 + hack/e2e/setup/setup_test.go | 2 +- 14 files changed, 976 insertions(+), 8 deletions(-) create mode 100644 hack/e2e/common/eventing/publisher.go create mode 100644 hack/e2e/common/eventing/testsubscriptioninfo.go create mode 100644 hack/e2e/common/eventing/utils.go create mode 100644 hack/e2e/common/http/http.go create mode 100644 hack/e2e/common/testenvironment/test_environment.go create mode 100644 hack/e2e/eventing/cleanup/cleanup_test.go create mode 100644 hack/e2e/eventing/eventing_test.go create mode 100755 hack/e2e/scripts/event_delivery_tests.sh diff --git a/Makefile b/Makefile index cfb55598..0707aea3 100644 --- a/Makefile +++ b/Makefile @@ -196,3 +196,17 @@ e2e-setup: .PHONY: e2e-cleanup e2e-cleanup: go test -v ./hack/e2e/cleanup/cleanup_test.go --tags=e2e + +# e2e-eventing will setup subscriptions and tests end-to-end deliver of events. +.PHONY: e2e-eventing +e2e-eventing: + ./hack/e2e/scripts/event_delivery_tests.sh + +# e2e-eventing-cleanup will delete all subscriptions and other resources created for event delivery tests. +.PHONY: e2e-eventing-cleanup +e2e-eventing-cleanup: + go test -v ./hack/e2e/eventing/cleanup/cleanup_test.go --tags=e2e + +# e2e will run the whole suite of end-to-end tests for eventing-manager. +.PHONY: e2e +e2e: e2e-setup e2e-eventing e2e-cleanup diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 76fb3485..627635b4 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -5,5 +5,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: europe-docker.pkg.dev/kyma-project/dev/eventing-manager - newTag: PR-78 + newName: europe-docker.pkg.dev/kyma-project/prod/eventing-manager + newTag: v20230907-01f5bd76 diff --git a/hack/e2e/common/eventing/publisher.go b/hack/e2e/common/eventing/publisher.go new file mode 100644 index 00000000..4f506d2d --- /dev/null +++ b/hack/e2e/common/eventing/publisher.go @@ -0,0 +1,97 @@ +package eventing + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "time" + + "github.com/cloudevents/sdk-go/v2/client" + "github.com/kyma-project/eventing-manager/hack/e2e/common" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +const ( + LegacyPublishEndpointFormat = "%s/%s/v1/events" + CloudEventPublishEndpointFormat = "%s/%s/v1/events" +) + +type Publisher struct { + ctx context.Context + clientCE client.Client + clientHTTP *http.Client + publisherURL string + logger *zap.Logger +} + +func NewPublisher(ctx context.Context, clientCE client.Client, clientHTTP *http.Client, publisherURL string, logger *zap.Logger) *Publisher { + return &Publisher{ + ctx: ctx, + clientCE: clientCE, + clientHTTP: clientHTTP, + publisherURL: publisherURL, + logger: logger, + } +} + +func (p *Publisher) LegacyPublishEndpoint(source string) string { + return fmt.Sprintf(LegacyPublishEndpointFormat, p.publisherURL, source) +} + +func (p *Publisher) SendLegacyEventWithRetries(source, eventType, payload string, attempts int, interval time.Duration) error { + return common.Retry(attempts, interval, func() error { + return p.SendLegacyEvent(source, eventType, payload) + }) +} + +func (p *Publisher) SendLegacyEvent(source, eventType, payload string) error { + url := p.LegacyPublishEndpoint(source) + + req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer([]byte(payload))) + if err != nil { + err = errors.Wrap(err, "Failed to create HTTP request for sending legacy event") + p.logger.Debug(err.Error()) + return err + } + + p.logger.Debug(fmt.Sprintf("Publishing legacy event:"+ + " URL: %s,"+ + " EventSource: %s,"+ + " EventType: %s,"+ + " Payload: %s", + url, source, eventType, payload)) + + req.Header.Set("Content-Type", "application/json") + resp, err := p.clientHTTP.Do(req) + if err != nil { + err = errors.Wrap(err, "Failed to send legacy-event") + p.logger.Debug(err.Error()) + return err + } + defer func() { + err = resp.Body.Close() + if err != nil { + p.logger.Error(err.Error()) + } + }() + + // if success, then return. + if Is2XX(resp.StatusCode) { + return nil + } + + // read body and return error. + if body, err2 := io.ReadAll(resp.Body); err2 != nil { + err2 = errors.Wrap(err2, "Failed to read response body") + p.logger.Debug(err2.Error()) + return err2 + } else { + err2 = errors.New(fmt.Sprintf("Failed to send legacy-event:[%s] response:[%d] body:[%s]", eventType, + resp.StatusCode, string(body))) + p.logger.Debug(err2.Error()) + return err2 + } +} diff --git a/hack/e2e/common/eventing/testsubscriptioninfo.go b/hack/e2e/common/eventing/testsubscriptioninfo.go new file mode 100644 index 00000000..9d10b5b8 --- /dev/null +++ b/hack/e2e/common/eventing/testsubscriptioninfo.go @@ -0,0 +1,64 @@ +package eventing + +import ( + ecv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" + ecv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type TestSubscriptionInfo struct { + Name string + Description string + Source string + Types []string +} + +func (s TestSubscriptionInfo) V1Alpha1SpecFilters() []*ecv1alpha1.EventMeshFilter { + var filters []*ecv1alpha1.EventMeshFilter + for _, etype := range s.Types { + filter := &ecv1alpha1.EventMeshFilter{ + EventSource: &ecv1alpha1.Filter{ + Type: "exact", + Property: "source", + Value: "", + }, + EventType: &ecv1alpha1.Filter{ + Type: "exact", + Property: "type", + Value: etype, + }, + } + filters = append(filters, filter) + } + return filters +} + +func (s TestSubscriptionInfo) ToSubscriptionV1Alpha1(sink, namespace string) *ecv1alpha1.Subscription { + return &ecv1alpha1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.Name, + Namespace: namespace, + }, + Spec: ecv1alpha1.SubscriptionSpec{ + Sink: sink, + Filter: &ecv1alpha1.BEBFilters{ + Filters: s.V1Alpha1SpecFilters(), + }, + }, + } +} + +func (s TestSubscriptionInfo) ToSubscriptionV1Alpha2(sink, namespace string) *ecv1alpha2.Subscription { + return &ecv1alpha2.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.Name, + Namespace: namespace, + }, + Spec: ecv1alpha2.SubscriptionSpec{ + Sink: sink, + TypeMatching: ecv1alpha2.TypeMatchingStandard, + Source: s.Source, + Types: s.Types, + }, + } +} diff --git a/hack/e2e/common/eventing/utils.go b/hack/e2e/common/eventing/utils.go new file mode 100644 index 00000000..b685aa09 --- /dev/null +++ b/hack/e2e/common/eventing/utils.go @@ -0,0 +1,75 @@ +package eventing + +import ( + "fmt" + "net/http" + "strings" + + "github.com/google/uuid" +) + +const ( + keyApp = "app" + keyMode = "mode" + keyType = "type" +) + +func Is2XX(statusCode int) bool { + return http.StatusOK <= statusCode && statusCode <= http.StatusIMUsed +} + +func LegacyEventData(source, eventType string) string { + return `{\"` + keyApp + `\":\"` + source + `\",\"` + keyMode + `\":\"legacy\",\"` + keyType + `\":\"` + eventType + `\"}` +} +func LegacyEventPayload(eventId, eventVersion, eventType, data string) string { + return `{"data":"` + data + `","event-id":"` + eventId + `","event-type":"` + eventType + `","event-time":"2020-04-02T21:37:00Z","event-type-version":"` + eventVersion + `"}` +} + +func ExtractSourceFromSubscriptionV1Alpha1Type(eventType string) string { + segments := strings.Split(eventType, ".") + return segments[3] +} + +func ExtractLegacyTypeFromSubscriptionV1Alpha1Type(eventTypePrefix, eventSource, eventVersion, eventType string) string { + tmp := strings.TrimPrefix(eventType, fmt.Sprintf("%s.%s.", eventTypePrefix, eventSource)) + return strings.TrimSuffix(tmp, fmt.Sprintf(".%s", eventVersion)) +} + +func ExtractLegacyTypeFromSubscriptionV1Alpha2Type(eventVersion, eventType string) string { + return strings.TrimSuffix(eventType, fmt.Sprintf(".%s", eventVersion)) +} + +func ExtractVersionFromEventType(eventType string) string { + segments := strings.Split(eventType, ".") + return segments[len(segments)-1] +} + +func NewLegacyEventForV1Alpha1(eventType, eventTypePrefix string) (string, string, string, string) { + // If the eventType is sap.kyma.custom.noapp.order.created.v1, then for legacy event: + // eventSource should be: noapp + // eventType should be: order.created + // eventVersion should be: v1 + eventID := uuid.New().String() + eventSource := ExtractSourceFromSubscriptionV1Alpha1Type(eventType) + eventVersion := ExtractVersionFromEventType(eventType) + legacyEventType := ExtractLegacyTypeFromSubscriptionV1Alpha1Type(eventTypePrefix, + eventSource, eventVersion, eventType) + eventData := LegacyEventData(eventSource, legacyEventType) + payload := LegacyEventPayload(eventID, eventVersion, legacyEventType, eventData) + + return eventID, eventSource, legacyEventType, payload +} + +func NewLegacyEvent(eventSource, eventType string) (string, string, string, string) { + // If the eventType is order.created.v1 and source is noapp, then for legacy event: + // eventSource should be: noapp + // eventType should be: order.created + // eventVersion should be: v1 + eventID := uuid.New().String() + eventVersion := ExtractVersionFromEventType(eventType) + legacyEventType := ExtractLegacyTypeFromSubscriptionV1Alpha2Type(eventVersion, eventType) + eventData := LegacyEventData(eventSource, legacyEventType) + payload := LegacyEventPayload(eventID, eventVersion, legacyEventType, eventData) + + return eventID, eventSource, legacyEventType, payload +} diff --git a/hack/e2e/common/fixtures/fixtures.go b/hack/e2e/common/fixtures/fixtures.go index 4ef33b34..b374aa97 100644 --- a/hack/e2e/common/fixtures/fixtures.go +++ b/hack/e2e/common/fixtures/fixtures.go @@ -4,6 +4,11 @@ import ( "fmt" "strings" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/kyma-project/eventing-manager/hack/e2e/common/eventing" + eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -11,6 +16,7 @@ import ( ) const ( + FieldManager = "eventing-tests" NamespaceName = "kyma-system" ManagerDeploymentName = "eventing-manager" CRName = "eventing" @@ -45,7 +51,7 @@ func EventingNATSCR() *eventingv1alpha1.Eventing { Config: eventingv1alpha1.BackendConfig{ NATSStreamStorageType: "File", NATSStreamReplicas: 3, - NATSStreamMaxSize: resource.MustParse("700m"), + NATSStreamMaxSize: resource.MustParse("700Mi"), NATSMaxMsgsPerTopic: 1000000, }, }, @@ -95,10 +101,141 @@ func PublisherSpec() eventingv1alpha1.Publisher { } } -func Namespace() *corev1.Namespace { +func V1Alpha1SubscriptionsToTest() []eventing.TestSubscriptionInfo { + return []eventing.TestSubscriptionInfo{ + { + Name: "test-sub-1-v1alpha1", + Types: []string{"sap.kyma.custom.noapp.order.tested.v1"}, + }, + { + Name: "test-sub-2-v1alpha1", + Types: []string{"sap.kyma.custom.test-app.order-$.second.R-e-c-e-i-v-e-d.v1"}, + }, + { + Name: "test-sub-3-with-multiple-types-v1alpha1", + Types: []string{ + "sap.kyma.custom.connected-app.order.tested.v1", + "sap.kyma.custom.connected-app2.or-der.crea-ted.one.two.three.v4", + }, + }, + } +} + +func V1Alpha2SubscriptionsToTest() []eventing.TestSubscriptionInfo { + return []eventing.TestSubscriptionInfo{ + { + Name: "test-sub-1-v1alpha2", + Description: "Test event type and source without any alpha-numeric characters", + Source: "noapp", + Types: []string{"order.modified.v1"}, + }, + { + Name: "test-sub-2-v1alpha2", + Description: "Test event type and source with any alpha-numeric characters", + Source: "test-app", + Types: []string{"Order-$.third.R-e-c-e-i-v-e-d.v1"}, + }, + { + Name: "test-sub-3-with-multiple-types-v1alpha2", + Source: "test-evnt", + Types: []string{ + "or-der.crea-ted.one.two.three.four.v4", + "order.testing.v1", + }, + }, + } +} + +func Namespace(name string) *corev1.Namespace { return &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ - Name: NamespaceName, + Name: name, + }, + } +} + +func NewSinkDeployment(name, namespace, image string) *appsv1.Deployment { + labels := map[string]string{ + "source": "eventing-tests", + "name": name, + } + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: labels, + }, + Spec: appsv1.DeploymentSpec{ + Selector: metav1.SetAsLabelSelector(labels), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: labels, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyAlways, + Containers: []corev1.Container{ + { + Name: name, + Image: image, + Args: []string{ + "subscriber", + "--listen-port=8080", + }, + Ports: []corev1.ContainerPort{ + { + Name: "http", + ContainerPort: 8080, + }, + }, + ImagePullPolicy: corev1.PullAlways, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + "cpu": resource.MustParse("300m"), + "memory": resource.MustParse("312Mi"), + }, + Requests: corev1.ResourceList{ + "cpu": resource.MustParse("100m"), + "memory": resource.MustParse("156Mi"), + }, + }, + }, + }, + }, + }, + }, + } +} + +func NewSinkService(name, namespace string) *corev1.Service { + labels := map[string]string{ + "source": "eventing-tests", + "name": name, + } + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: labels, + }, + Spec: corev1.ServiceSpec{ + Selector: labels, + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: "TCP", + Port: 80, + TargetPort: intstr.FromString("http"), + }, + }, }, } } diff --git a/hack/e2e/common/http/http.go b/hack/e2e/common/http/http.go new file mode 100644 index 00000000..a032874d --- /dev/null +++ b/hack/e2e/common/http/http.go @@ -0,0 +1,19 @@ +package http + +import ( + "net/http" + "time" +) + +func NewClient(transport *http.Transport) *http.Client { + return &http.Client{Transport: transport} +} + +func NewTransport(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int, idleConnTimeout time.Duration) *http.Transport { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = maxIdleConns + transport.MaxConnsPerHost = maxConnsPerHost + transport.MaxIdleConnsPerHost = maxIdleConnsPerHost + transport.IdleConnTimeout = idleConnTimeout + return transport +} diff --git a/hack/e2e/common/k8s.go b/hack/e2e/common/k8s.go index 61252f6b..88b13c1e 100644 --- a/hack/e2e/common/k8s.go +++ b/hack/e2e/common/k8s.go @@ -4,6 +4,9 @@ import ( "os" "path/filepath" + ecv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" + ecv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" + eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1" "k8s.io/client-go/kubernetes" @@ -46,6 +49,18 @@ func GetK8sClients() (*kubernetes.Clientset, client.Client, error) { return nil, nil, err } + // add Subscription v1alpha1 CRD to the scheme. + err = ecv1alpha1.AddToScheme(scheme.Scheme) + if err != nil { + return nil, nil, err + } + + // add Subscription v1alpha2 CRD to the scheme. + err = ecv1alpha2.AddToScheme(scheme.Scheme) + if err != nil { + return nil, nil, err + } + // We need to add the Eventing CRD to the scheme, so we can create a client that can access Eventing objects. err = eventingv1alpha1.AddToScheme(scheme.Scheme) if err != nil { diff --git a/hack/e2e/common/testenvironment/test_environment.go b/hack/e2e/common/testenvironment/test_environment.go new file mode 100644 index 00000000..3a52551a --- /dev/null +++ b/hack/e2e/common/testenvironment/test_environment.go @@ -0,0 +1,377 @@ +package testenvironment + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/kyma-project/eventing-manager/hack/e2e/common" + "github.com/kyma-project/eventing-manager/hack/e2e/common/eventing" + "github.com/kyma-project/eventing-manager/hack/e2e/common/fixtures" + pkghttp "github.com/kyma-project/eventing-manager/hack/e2e/common/http" + "github.com/kyma-project/eventing-manager/hack/e2e/env" + ecv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" + "go.uber.org/zap" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + Interval = 2 * time.Second + SmallInterval = 200 * time.Millisecond + Attempts = 60 + FewAttempts = 5 + ThreeAttempts = 3 +) + +// TestEnvironment provides mocked resources for integration tests. +type TestEnvironment struct { + Context context.Context + Logger *zap.Logger + K8sClientset *kubernetes.Clientset + K8sClient client.Client + EventPublisher *eventing.Publisher + TestConfigs *env.E2EConfig +} + +func NewTestEnvironment() *TestEnvironment { + var err error + logger, err := common.SetupLogger() + if err != nil { + logger.Error(err.Error()) + panic(err) + } + + testConfigs, err := env.GetE2EConfig() + if err != nil { + logger.Error(err.Error()) + panic(err) + + } + logger.Info(fmt.Sprintf("##### NOTE: Tests will run w.r.t. backend: %s", testConfigs.BackendType)) + + clientSet, k8sClient, err := common.GetK8sClients() + if err != nil { + logger.Error(err.Error()) + panic(err) + } + + return &TestEnvironment{ + Context: context.TODO(), + Logger: logger, + K8sClientset: clientSet, + K8sClient: k8sClient, + TestConfigs: testConfigs, + } +} + +func (te *TestEnvironment) CreateTestNamespace() error { + return common.Retry(Attempts, Interval, func() error { + // It's fine if the Namespace already exists. + return client.IgnoreAlreadyExists(te.K8sClient.Create(te.Context, fixtures.Namespace(te.TestConfigs.TestNamespace))) + }) +} + +func (te *TestEnvironment) DeleteTestNamespace() error { + return common.Retry(FewAttempts, Interval, func() error { + // It's fine if the Namespace already exists. + return client.IgnoreAlreadyExists(te.K8sClient.Delete(te.Context, fixtures.Namespace(te.TestConfigs.TestNamespace))) + }) +} + +func (te *TestEnvironment) InitEventPublisherClient() { + maxIdleConns := 10 + maxConnsPerHost := 10 + maxIdleConnsPerHost := 10 + idleConnTimeout := 1 * time.Minute + t := pkghttp.NewTransport(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost, idleConnTimeout) + clientHTTP := pkghttp.NewClient(t.Clone()) + te.EventPublisher = eventing.NewPublisher(context.Background(), nil, clientHTTP, te.TestConfigs.PublisherURL, te.Logger) +} + +func (te *TestEnvironment) CreateAllSubscriptions() error { + ctx := context.TODO() + // create v1alpha1 subscriptions if not exists. + err := te.CreateV1Alpha1Subscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest()) + if err != nil { + return err + } + + // create v1alpha2 subscriptions if not exists. + return te.CreateV1Alpha2Subscriptions(ctx, fixtures.V1Alpha2SubscriptionsToTest()) +} + +func (te *TestEnvironment) DeleteAllSubscriptions() error { + // delete v1alpha1 subscriptions if not exists. + for _, subToTest := range fixtures.V1Alpha1SubscriptionsToTest() { + if err := te.DeleteSubscriptionFromK8s(subToTest.Name, te.TestConfigs.TestNamespace); err != nil { + return err + } + } + + // delete v1alpha2 subscriptions if not exists. + for _, subToTest := range fixtures.V1Alpha2SubscriptionsToTest() { + if err := te.DeleteSubscriptionFromK8s(subToTest.Name, te.TestConfigs.TestNamespace); err != nil { + return err + } + } + return nil +} + +func (te *TestEnvironment) WaitForAllSubscriptions() error { + ctx := context.TODO() + // wait for v1alpha1 subscriptions to get ready. + err := te.WaitForSubscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest()) + if err != nil { + return err + } + + // wait for v1alpha2 subscriptions to get ready + return te.WaitForSubscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest()) +} + +func (te *TestEnvironment) CreateV1Alpha1Subscriptions(ctx context.Context, subList []eventing.TestSubscriptionInfo) error { + for _, subInfo := range subList { + err := common.Retry(FewAttempts, SmallInterval, func() error { + newSub := subInfo.ToSubscriptionV1Alpha1(te.TestConfigs.SubscriptionSinkURL, te.TestConfigs.TestNamespace) + return client.IgnoreAlreadyExists(te.K8sClient.Create(ctx, newSub)) + }) + // return error if all retries are exhausted. + if err != nil { + return err + } + } + return nil +} + +func (te *TestEnvironment) CreateV1Alpha2Subscriptions(ctx context.Context, subList []eventing.TestSubscriptionInfo) error { + for _, subInfo := range subList { + err := common.Retry(FewAttempts, SmallInterval, func() error { + newSub := subInfo.ToSubscriptionV1Alpha2(te.TestConfigs.SubscriptionSinkURL, te.TestConfigs.TestNamespace) + return client.IgnoreAlreadyExists(te.K8sClient.Create(ctx, newSub)) + }) + // return error if all retries are exhausted. + if err != nil { + return err + } + } + return nil +} + +func (te *TestEnvironment) WaitForSubscriptions(ctx context.Context, subsToTest []eventing.TestSubscriptionInfo) error { + for _, subToTest := range subsToTest { + return te.WaitForSubscription(ctx, subToTest) + } + return nil +} + +func (te *TestEnvironment) WaitForSubscription(ctx context.Context, subsToTest eventing.TestSubscriptionInfo) error { + return common.Retry(Attempts, Interval, func() error { + // get subscription from cluster. + gotSub := ecv1alpha2.Subscription{} + err := te.K8sClient.Get(ctx, k8stypes.NamespacedName{ + Name: subsToTest.Name, + Namespace: te.TestConfigs.TestNamespace, + }, &gotSub) + if err != nil { + te.Logger.Debug(fmt.Sprintf("failed to check readiness; failed to fetch subscription: %s "+ + "in namespace: %s", subsToTest.Name, te.TestConfigs.TestNamespace)) + return err + } + + // check if subscription is reconciled by correct backend. + if !te.IsSubscriptionReconcileByBackend(gotSub, te.TestConfigs.BackendType) { + errMsg := fmt.Sprintf("waiting subscription: %s "+ + "in namespace: %s to get recocniled by backend: %s", subsToTest.Name, te.TestConfigs.TestNamespace, + te.TestConfigs.BackendType) + te.Logger.Debug(errMsg) + return errors.New(errMsg) + } + + // check if subscription is ready. + if !gotSub.Status.Ready { + errMsg := fmt.Sprintf("waiting subscription: %s "+ + "in namespace: %s to get ready", subsToTest.Name, te.TestConfigs.TestNamespace) + te.Logger.Debug(errMsg) + return errors.New(errMsg) + } + return nil + }) +} + +func (te *TestEnvironment) IsSubscriptionReconcileByBackend(sub ecv1alpha2.Subscription, activeBackend string) bool { + condition := sub.Status.FindCondition(ecv1alpha2.ConditionSubscriptionActive) + if condition == nil { + return false + } + return strings.Contains(strings.ToLower(string(condition.Reason)), strings.ToLower(activeBackend)) +} + +func (te *TestEnvironment) SetupSink() error { + if err := te.CreateSinkDeployment(te.TestConfigs.SubscriptionSinkName, te.TestConfigs.TestNamespace, + te.TestConfigs.SubscriptionSinkImage); err != nil { + return err + } + + if err := te.CreateSinkService(te.TestConfigs.SubscriptionSinkName, te.TestConfigs.TestNamespace); err != nil { + return err + } + + return te.WaitForDeploymentReady(te.TestConfigs.SubscriptionSinkName, te.TestConfigs.TestNamespace, + te.TestConfigs.SubscriptionSinkImage) +} + +func (te *TestEnvironment) DeleteSinkResources() error { + if err := te.DeleteDeployment(te.TestConfigs.SubscriptionSinkName, te.TestConfigs.TestNamespace); err != nil { + return err + } + return te.DeleteService(te.TestConfigs.SubscriptionSinkName, te.TestConfigs.TestNamespace) +} + +func (te *TestEnvironment) CreateSinkDeployment(name, namespace, image string) error { + return common.Retry(FewAttempts, Interval, func() error { + return te.K8sClient.Patch(te.Context, fixtures.NewSinkDeployment(name, namespace, image), + client.Apply, + &client.PatchOptions{ + Force: pointer.Bool(true), + FieldManager: fixtures.FieldManager, + }) + }) +} + +func (te *TestEnvironment) CreateSinkService(name, namespace string) error { + return common.Retry(FewAttempts, Interval, func() error { + return te.K8sClient.Patch(te.Context, fixtures.NewSinkService(name, namespace), + client.Apply, + &client.PatchOptions{ + Force: pointer.Bool(true), + FieldManager: fixtures.FieldManager, + }) + }) +} + +func (te *TestEnvironment) DeleteDeployment(name, namespace string) error { + return common.Retry(FewAttempts, Interval, func() error { + return te.K8sClient.Delete(te.Context, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }) + }) +} + +func (te *TestEnvironment) DeleteService(name, namespace string) error { + return common.Retry(FewAttempts, Interval, func() error { + return te.K8sClient.Delete(te.Context, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }) + }) +} + +func (te *TestEnvironment) GetDeploymentFromK8s(name, namespace string) (*appsv1.Deployment, error) { + return te.K8sClientset.AppsV1().Deployments(namespace).Get(te.Context, name, metav1.GetOptions{}) +} + +func (te *TestEnvironment) WaitForDeploymentReady(name, namespace, image string) error { + // RetryGet the Eventing Manager and test status. + return common.Retry(Attempts, Interval, func() error { + te.Logger.Debug(fmt.Sprintf("waiting for deployment: %s to get ready with image: %s", name, image)) + // Get the deployment from the cluster. + gotDeployment, err := common.RetryGet(FewAttempts, SmallInterval, func() (*appsv1.Deployment, error) { + return te.GetDeploymentFromK8s(name, namespace) + }) + if err != nil { + return err + } + + // if image is provided, then check if the deployment has correct image. + if image != "" && gotDeployment.Spec.Template.Spec.Containers[0].Image != image { + err = fmt.Errorf("expected deployment (%s) image to be: %s, but found: %s", name, image, + gotDeployment.Spec.Template.Spec.Containers[0].Image, + ) + te.Logger.Debug(err.Error()) + return err + } + + // check if the deployment is ready. + if *gotDeployment.Spec.Replicas != gotDeployment.Status.UpdatedReplicas || + *gotDeployment.Spec.Replicas != gotDeployment.Status.ReadyReplicas || + *gotDeployment.Spec.Replicas != gotDeployment.Status.AvailableReplicas { + err = fmt.Errorf("waiting for deployment: %s to get ready", name) + te.Logger.Debug(err.Error()) + return err + } + + // Everything is fine. + te.Logger.Debug(fmt.Sprintf("deployment: %s is ready with image: %s", name, image)) + return nil + }) +} + +func (te *TestEnvironment) DeleteSubscriptionFromK8s(name, namespace string) error { + // define subscription to delete. + sub := &ecv1alpha2.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + + // delete with retries. + return common.Retry(FewAttempts, Interval, func() error { + // delete subscription from cluster. + err := te.K8sClient.Delete(te.Context, sub) + if err != nil && !k8serrors.IsNotFound(err) { + te.Logger.Debug(fmt.Sprintf("failed to delete subscription: %s "+ + "in namespace: %s", name, te.TestConfigs.TestNamespace)) + return err + } + return nil + }) +} + +func (te *TestEnvironment) TestDeliveryOfLegacyEventForSubV1Alpha1(eventType string) error { + // define the event + eventID, eventSource, legacyEventType, payload := eventing.NewLegacyEventForV1Alpha1(eventType, te.TestConfigs.EventTypePrefix) + + // publish the event + if err := te.EventPublisher.SendLegacyEvent(eventSource, legacyEventType, payload); err != nil { + te.Logger.Debug(err.Error()) + return err + } + + // verify if the event was received by the sink. + te.Logger.Debug(eventID) + // TODO: implement me! + + return nil +} + +func (te *TestEnvironment) TestDeliveryOfLegacyEvent(eventSource, eventType string) error { + // define the event + eventID, eventSource, legacyEventType, payload := eventing.NewLegacyEvent(eventSource, eventType) + + // publish the event + if err := te.EventPublisher.SendLegacyEvent(eventSource, legacyEventType, payload); err != nil { + te.Logger.Debug(err.Error()) + return err + } + + // verify if the event was received by the sink. + te.Logger.Debug(eventID) + // TODO: implement me! + + return nil +} diff --git a/hack/e2e/env/env.go b/hack/e2e/env/env.go index a3ae8301..a8bb2640 100644 --- a/hack/e2e/env/env.go +++ b/hack/e2e/env/env.go @@ -1,14 +1,23 @@ package env import ( + "fmt" + "github.com/kelseyhightower/envconfig" eventingv1alpha1 "github.com/kyma-project/eventing-manager/api/v1alpha1" ) // E2EConfig represents the environment config for the end-to-end tests for eventing-manager. type E2EConfig struct { - BackendType string `envconfig:"BACKEND_TYPE" default:"NATS"` // NATS or EventMesh - ManagerImage string `envconfig:"MANAGER_IMAGE" default:""` + BackendType string `envconfig:"BACKEND_TYPE" default:"NATS"` // NATS or EventMesh + ManagerImage string `envconfig:"MANAGER_IMAGE" default:""` + EventTypePrefix string `envconfig:"EVENT_TYPE_PREFIX" default:"sap.kyma.custom"` + EventMeshNamespace string `envconfig:"EVENTMESH_NAMESPACE" default:"xxxxxx"` + SubscriptionSinkImage string `envconfig:"SUBSCRIPTION_SINK_IMAGE" default:"eu.gcr.io/kyma-project/eventing-tools:v20230329-fc309b92"` + SubscriptionSinkName string `envconfig:"SUBSCRIPTION_SINK_Name" default:"test-sink"` + SubscriptionSinkURL string `envconfig:"SUBSCRIPTION_SINK_URL" default:""` + TestNamespace string `envconfig:"TEST_NAMESPACE" default:"eventing-tests"` + PublisherURL string `envconfig:"PUBLISHER_URL" default:"http://localhost:38081"` } func (cfg E2EConfig) IsNATSBackend() bool { @@ -24,5 +33,9 @@ func GetE2EConfig() (*E2EConfig, error) { if err := envconfig.Process("", &cfg); err != nil { return nil, err } + // set subscription sink URL if its empty. + if cfg.SubscriptionSinkURL == "" { + cfg.SubscriptionSinkURL = fmt.Sprintf("http://%s.%s.svc.cluster.local", cfg.SubscriptionSinkName, cfg.TestNamespace) + } return &cfg, nil } diff --git a/hack/e2e/eventing/cleanup/cleanup_test.go b/hack/e2e/eventing/cleanup/cleanup_test.go new file mode 100644 index 00000000..5c46245a --- /dev/null +++ b/hack/e2e/eventing/cleanup/cleanup_test.go @@ -0,0 +1,42 @@ +//go:build e2e +// +build e2e + +package cleanup + +import ( + "os" + "testing" + + "github.com/kyma-project/eventing-manager/hack/e2e/common/testenvironment" + "github.com/stretchr/testify/require" +) + +var testEnvironment *testenvironment.TestEnvironment + +// TestMain runs before all the other test functions. It sets up all the resources that are shared between the different +// test functions. It will then run the tests and finally shuts everything down. +func TestMain(m *testing.M) { + testEnvironment = testenvironment.NewTestEnvironment() + + // Run the tests and exit. + code := m.Run() + + // delete test namespace, + if err := testEnvironment.DeleteTestNamespace(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + os.Exit(code) +} + +// Test_CleanupAllSubscriptions deletes all the subscriptions created for testing. +func Test_CleanupAllSubscriptions(t *testing.T) { + t.Parallel() + require.NoError(t, testEnvironment.DeleteAllSubscriptions()) +} + +// Test_CleanupSubscriptionSink deletes the subscription sink created for testing. +func Test_CleanupSubscriptionSink(t *testing.T) { + t.Parallel() + require.NoError(t, testEnvironment.DeleteSinkResources()) +} diff --git a/hack/e2e/eventing/eventing_test.go b/hack/e2e/eventing/eventing_test.go new file mode 100644 index 00000000..62bc17ea --- /dev/null +++ b/hack/e2e/eventing/eventing_test.go @@ -0,0 +1,94 @@ +//go:build e2e +// +build e2e + +package eventing + +import ( + "fmt" + "os" + "testing" + + "github.com/kyma-project/eventing-manager/hack/e2e/common" + "github.com/kyma-project/eventing-manager/hack/e2e/common/fixtures" + "github.com/kyma-project/eventing-manager/hack/e2e/common/testenvironment" + "github.com/stretchr/testify/require" +) + +var testEnvironment *testenvironment.TestEnvironment + +// TestMain runs before all the other test functions. It sets up all the resources that are shared between the different +// test functions. It will then run the tests and finally shuts everything down. +func TestMain(m *testing.M) { + testEnvironment = testenvironment.NewTestEnvironment() + + // create test namespace, + if err := testEnvironment.CreateTestNamespace(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // setup sink for subscriptions. + if err := testEnvironment.SetupSink(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // create subscriptions. + if err := testEnvironment.CreateAllSubscriptions(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // wait for subscriptions. + if err := testEnvironment.WaitForAllSubscriptions(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // initialize event publisher client. + testEnvironment.InitEventPublisherClient() + + // Run the tests and exit. + code := m.Run() + os.Exit(code) +} + +func Test_LegacyEvents_SubscriptionV1Alpha1(t *testing.T) { + t.Parallel() + for _, subToTest := range fixtures.V1Alpha1SubscriptionsToTest() { + subToTest := subToTest + for _, eventTypeToTest := range subToTest.Types { + eventTypeToTest := eventTypeToTest + testName := fmt.Sprintf("legacy event should work for subscription: %s with type: %s", subToTest.Name, eventTypeToTest) + // run test for the eventType. + t.Run(testName, func(t *testing.T) { + t.Parallel() + + // when + err := common.Retry(testenvironment.ThreeAttempts, testenvironment.Interval, func() error { + return testEnvironment.TestDeliveryOfLegacyEventForSubV1Alpha1(eventTypeToTest) + }) + + // then + require.NoError(t, err) + }) + } + } +} + +func Test_LegacyEvents(t *testing.T) { + t.Parallel() + for _, subToTest := range fixtures.V1Alpha2SubscriptionsToTest() { + subToTest := subToTest + for _, eventTypeToTest := range subToTest.Types { + eventTypeToTest := eventTypeToTest + testName := fmt.Sprintf("legacy event should work for subscription: %s with type: %s", subToTest.Name, eventTypeToTest) + // run test for the eventType. + t.Run(testName, func(t *testing.T) { + t.Parallel() + + err := common.Retry(testenvironment.ThreeAttempts, testenvironment.Interval, func() error { + // It's fine if the Namespace already exists. + return testEnvironment.TestDeliveryOfLegacyEvent(subToTest.Source, eventTypeToTest) + }) + require.NoError(t, err) + }) + } + } +} diff --git a/hack/e2e/scripts/event_delivery_tests.sh b/hack/e2e/scripts/event_delivery_tests.sh new file mode 100755 index 00000000..5a6c1f99 --- /dev/null +++ b/hack/e2e/scripts/event_delivery_tests.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +set -e + +# for our tests we need to port-forward the eventing-publisher-proxy. +echo "Port-forwarding to the eventing-publisher-proxy using port: 38081" +kubectl -n kyma-system port-forward svc/eventing-publisher-proxy 38081:80 & +PID1=$! + +# This will kill all the port-forwarding. We need this to be in a function so we can even call it, if our tests fails +# since `set -e` would stop the script in case of an failing test. +function kill_port_forward() { + echo "Killing the port-forwarding for port: 38081" + kill ${PID1} +} +# This kills the port-forwards even if the test fails. +trap kill_port_forward ERR + +echo "Running tests..." +go test -v ./hack/e2e/eventing/eventing_test.go --tags=e2e + +kill_port_forward diff --git a/hack/e2e/setup/setup_test.go b/hack/e2e/setup/setup_test.go index 616807b0..643dc2ca 100644 --- a/hack/e2e/setup/setup_test.go +++ b/hack/e2e/setup/setup_test.go @@ -73,7 +73,7 @@ func TestMain(m *testing.M) { // Create the Namespace used for testing. err = Retry(attempts, interval, func() error { // It's fine if the Namespace already exists. - return client.IgnoreAlreadyExists(k8sClient.Create(ctx, Namespace())) + return client.IgnoreAlreadyExists(k8sClient.Create(ctx, Namespace(testConfigs.TestNamespace))) }) if err != nil { logger.Fatal(err.Error())