From 343a3487e9de868228acdcba4aa5c6810216f36d Mon Sep 17 00:00:00 2001 From: Muhammad Faizan Date: Thu, 14 Sep 2023 07:11:07 +0200 Subject: [PATCH] Added tests for Cloud Events and verification of reception of events by the Sink (#93) * Added e2e event tests for CE * added code to fetch event from sink * updated * added verification for legacy events received by sink * cleanup code * fixed lint * addressed review comments * lint fix --- Makefile | 11 +- hack/e2e/common/eventing/publisher.go | 65 +++++++- hack/e2e/common/eventing/sinkclient.go | 113 ++++++++++++++ hack/e2e/common/eventing/utils.go | 24 +++ hack/e2e/common/fixtures/fixtures.go | 35 +++-- hack/e2e/common/http/http.go | 19 ++- .../testenvironment/test_environment.go | 145 +++++++++++++++--- hack/e2e/env/env.go | 5 +- hack/e2e/eventing/cleanup/cleanup_test.go | 3 +- hack/e2e/eventing/delivery/delivery_test.go | 129 ++++++++++++++++ hack/e2e/eventing/eventing_test.go | 94 ------------ hack/e2e/eventing/setup/setup_test.go | 41 +++++ hack/e2e/scripts/event_delivery_tests.sh | 9 +- 13 files changed, 560 insertions(+), 133 deletions(-) create mode 100644 hack/e2e/common/eventing/sinkclient.go create mode 100644 hack/e2e/eventing/delivery/delivery_test.go delete mode 100644 hack/e2e/eventing/eventing_test.go create mode 100644 hack/e2e/eventing/setup/setup_test.go diff --git a/Makefile b/Makefile index 0707aea3..e1825593 100644 --- a/Makefile +++ b/Makefile @@ -194,10 +194,15 @@ e2e-setup: # e2e-cleanup will delete the Eventing CR and check if the required resources are de-provisioned or not. .PHONY: e2e-cleanup -e2e-cleanup: +e2e-cleanup: e2e-eventing-cleanup go test -v ./hack/e2e/cleanup/cleanup_test.go --tags=e2e -# e2e-eventing will setup subscriptions and tests end-to-end deliver of events. +# e2e-eventing-setup will setup subscriptions and sink required for tests to check end-to-end delivery of events. +.PHONY: e2e-eventing-setup +e2e-eventing-setup: + go test -v ./hack/e2e/eventing/setup/setup_test.go --tags=e2e + +# e2e-eventing will tests end-to-end delivery of events. .PHONY: e2e-eventing e2e-eventing: ./hack/e2e/scripts/event_delivery_tests.sh @@ -209,4 +214,4 @@ e2e-eventing-cleanup: # e2e will run the whole suite of end-to-end tests for eventing-manager. .PHONY: e2e -e2e: e2e-setup e2e-eventing e2e-cleanup +e2e: e2e-setup e2e-eventing-setup e2e-eventing e2e-cleanup diff --git a/hack/e2e/common/eventing/publisher.go b/hack/e2e/common/eventing/publisher.go index 4f506d2d..6c55dd5c 100644 --- a/hack/e2e/common/eventing/publisher.go +++ b/hack/e2e/common/eventing/publisher.go @@ -8,6 +8,9 @@ import ( "net/http" "time" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/client" "github.com/kyma-project/eventing-manager/hack/e2e/common" "github.com/pkg/errors" @@ -16,7 +19,7 @@ import ( const ( LegacyPublishEndpointFormat = "%s/%s/v1/events" - CloudEventPublishEndpointFormat = "%s/%s/v1/events" + CloudEventPublishEndpointFormat = "%s/publish" ) type Publisher struct { @@ -41,12 +44,22 @@ func (p *Publisher) LegacyPublishEndpoint(source string) string { return fmt.Sprintf(LegacyPublishEndpointFormat, p.publisherURL, source) } +func (p *Publisher) PublishEndpoint() string { + return fmt.Sprintf(CloudEventPublishEndpointFormat, p.publisherURL) +} + func (p *Publisher) SendLegacyEventWithRetries(source, eventType, payload string, attempts int, interval time.Duration) error { return common.Retry(attempts, interval, func() error { return p.SendLegacyEvent(source, eventType, payload) }) } +func (p *Publisher) SendCloudEventWithRetries(event *cloudevents.Event, encoding binding.Encoding, attempts int, interval time.Duration) error { + return common.Retry(attempts, interval, func() error { + return p.SendCloudEvent(event, encoding) + }) +} + func (p *Publisher) SendLegacyEvent(source, eventType, payload string) error { url := p.LegacyPublishEndpoint(source) @@ -95,3 +108,53 @@ func (p *Publisher) SendLegacyEvent(source, eventType, payload string) error { return err2 } } + +func (p *Publisher) SendCloudEvent(event *cloudevents.Event, encoding binding.Encoding) error { + ce := *event + newCtx := context.Background() + ctx := cloudevents.ContextWithTarget(newCtx, p.PublishEndpoint()) + switch encoding { + case binding.EncodingBinary: + { + ctx = binding.WithForceBinary(ctx) + } + case binding.EncodingStructured: + { + ctx = binding.WithForceStructured(ctx) + } + default: + { + return fmt.Errorf("failed to use unsupported cloudevent encoding:[%s]", encoding.String()) + } + } + + p.logger.Debug(fmt.Sprintf("Publishing cloud event:"+ + " URL: %s,"+ + " Encoding: %s,"+ + " EventID: %s,"+ + " EventSource: %s,"+ + " EventType: %s,"+ + " Payload: %s", + p.PublishEndpoint(), encoding.String(), ce.ID(), ce.Source(), ce.Type(), ce.Data())) + + result := p.clientCE.Send(ctx, ce) + switch { + case cloudevents.IsUndelivered(result): + { + return fmt.Errorf("failed to send cloudevent-%s undelivered:[%s] response:[%s]", encoding.String(), ce.Type(), result) + } + case cloudevents.IsNACK(result): + { + return fmt.Errorf("failed to send cloudevent-%s nack:[%s] response:[%s]", encoding.String(), ce.Type(), result) + } + case cloudevents.IsACK(result): + { + p.logger.Debug(fmt.Sprintf("successfully sent cloudevent-%s [%s]", encoding.String(), ce.Type())) + return nil + } + default: + { + return fmt.Errorf("failed to send cloudevent-%s unknown:[%s] response:[%s]", encoding.String(), ce.Type(), result) + } + } +} diff --git a/hack/e2e/common/eventing/sinkclient.go b/hack/e2e/common/eventing/sinkclient.go new file mode 100644 index 00000000..44122747 --- /dev/null +++ b/hack/e2e/common/eventing/sinkclient.go @@ -0,0 +1,113 @@ +package eventing + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + cev2event "github.com/cloudevents/sdk-go/v2/event" + "github.com/kyma-project/eventing-manager/hack/e2e/common" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +const ( + EventsEndpointFormat = "%s/events/%s" +) + +type SinkClient struct { + ctx context.Context + clientHTTP *http.Client + sinkURL string + logger *zap.Logger +} + +type SinkEvent struct { + // Header stores the non CE events, e.g. X-B3-Sampled and Traceparent + http.Header + cev2event.Event +} + +func NewSinkClient(ctx context.Context, clientHTTP *http.Client, sinkURL string, logger *zap.Logger) *SinkClient { + return &SinkClient{ + ctx: ctx, + clientHTTP: clientHTTP, + sinkURL: sinkURL, + logger: logger, + } +} + +func (sc *SinkClient) EventsEndpoint(eventId string) string { + return fmt.Sprintf(EventsEndpointFormat, sc.sinkURL, eventId) +} + +func (sc *SinkClient) GetEventFromSinkWithRetries(eventId string, attempts int, interval time.Duration) (*SinkEvent, error) { + var gotEvent *SinkEvent + err := common.Retry(attempts, interval, func() error { + var err1 error + gotEvent, err1 = sc.GetEventFromSink(eventId) + return err1 + }) + return gotEvent, err +} + +func (sc *SinkClient) GetEventFromSink(eventId string) (*SinkEvent, error) { + url := sc.EventsEndpoint(eventId) + sc.logger.Debug(fmt.Sprintf("Fetching event with ID: %s from the sink URL: %s", eventId, url)) + + req, err := http.NewRequest(http.MethodGet, url, bytes.NewBuffer([]byte{})) + if err != nil { + err = errors.Wrap(err, "Failed to create HTTP request for fetching event from sink") + sc.logger.Debug(err.Error()) + return nil, err + } + + req.Header.Set("Content-Type", "application/json") + resp, err := sc.clientHTTP.Do(req) + if err != nil { + err = errors.Wrap(err, "Failed to fetch event") + sc.logger.Debug(err.Error()) + return nil, err + } + defer func() { + err = resp.Body.Close() + if err != nil { + sc.logger.Error(err.Error()) + } + }() + + // read body. + respBody, err := io.ReadAll(resp.Body) + if err != nil { + err = errors.Wrap(err, "Failed to read response body") + sc.logger.Debug(err.Error()) + return nil, err + } + + // if not success, then return error. + if !Is2XX(resp.StatusCode) { + err = errors.New(fmt.Sprintf("Failed to fetch eventID:[%s] response:[%d] body:[%s]", eventId, + resp.StatusCode, string(respBody))) + sc.logger.Debug(err.Error()) + return nil, err + } + + // success + // convert to cloud event object. + ceEvent := cloudevents.NewEvent() + err = json.Unmarshal(respBody, &ceEvent) + if err != nil { + err = errors.Wrap(err, "failed to convert JSON to CloudEvent") + sc.logger.Debug(err.Error()) + return nil, err + } + + return &SinkEvent{ + Event: ceEvent, + }, nil +} diff --git a/hack/e2e/common/eventing/utils.go b/hack/e2e/common/eventing/utils.go index b685aa09..4df6b528 100644 --- a/hack/e2e/common/eventing/utils.go +++ b/hack/e2e/common/eventing/utils.go @@ -5,6 +5,9 @@ import ( "net/http" "strings" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/google/uuid" ) @@ -25,6 +28,14 @@ func LegacyEventPayload(eventId, eventVersion, eventType, data string) string { return `{"data":"` + data + `","event-id":"` + eventId + `","event-type":"` + eventType + `","event-time":"2020-04-02T21:37:00Z","event-type-version":"` + eventVersion + `"}` } +func CloudEventMode(encoding binding.Encoding) string { + return fmt.Sprintf("ce-%s", encoding.String()) +} + +func CloudEventData(source, eventType string, encoding binding.Encoding) map[string]interface{} { + return map[string]interface{}{keyApp: source, keyMode: CloudEventMode(encoding), keyType: eventType} +} + func ExtractSourceFromSubscriptionV1Alpha1Type(eventType string) string { segments := strings.Split(eventType, ".") return segments[3] @@ -73,3 +84,16 @@ func NewLegacyEvent(eventSource, eventType string) (string, string, string, stri return eventID, eventSource, legacyEventType, payload } + +func NewCloudEvent(eventSource, eventType string, encoding binding.Encoding) (*cloudevents.Event, error) { + eventID := uuid.New().String() + ce := cloudevents.NewEvent() + data := CloudEventData(eventSource, eventType, encoding) + ce.SetID(eventID) + ce.SetType(eventType) + ce.SetSource(eventSource) + if err := ce.SetData(cloudevents.ApplicationJSON, data); err != nil { + return nil, fmt.Errorf("failed to set cloudevent-%s data with error:[%s]", encoding.String(), err) + } + return &ce, nil +} diff --git a/hack/e2e/common/fixtures/fixtures.go b/hack/e2e/common/fixtures/fixtures.go index b374aa97..22828476 100644 --- a/hack/e2e/common/fixtures/fixtures.go +++ b/hack/e2e/common/fixtures/fixtures.go @@ -1,6 +1,7 @@ package fixtures import ( + "errors" "fmt" "strings" @@ -26,6 +27,14 @@ const ( WebhookServerCertJobName = "eventing-manager-cert-handler" EventMeshSecretNamespace = "kyma-system" EventMeshSecretName = "eventing-backend" + EventOriginalTypeHeader = "originaltype" +) + +type SubscriptionCRVersion string + +const ( + V1Alpha1SubscriptionCRVersion SubscriptionCRVersion = "v1alpha1" + V1Alpha2SubscriptionCRVersion SubscriptionCRVersion = "v1alpha2" ) func EventingCR(backendType eventingv1alpha1.BackendType) *eventingv1alpha1.Eventing { @@ -104,15 +113,18 @@ func PublisherSpec() eventingv1alpha1.Publisher { func V1Alpha1SubscriptionsToTest() []eventing.TestSubscriptionInfo { return []eventing.TestSubscriptionInfo{ { - Name: "test-sub-1-v1alpha1", - Types: []string{"sap.kyma.custom.noapp.order.tested.v1"}, + Name: "test-sub-1-v1alpha1", + Description: "event type and source without any alpha-numeric characters", + Types: []string{"sap.kyma.custom.noapp.order.tested.v1"}, }, { - Name: "test-sub-2-v1alpha1", - Types: []string{"sap.kyma.custom.test-app.order-$.second.R-e-c-e-i-v-e-d.v1"}, + Name: "test-sub-2-v1alpha1", + Description: "event type and source with alpha-numeric characters", + Types: []string{"sap.kyma.custom.test-app.order-$.second.R-e-c-e-i-v-e-d.v1"}, }, { - Name: "test-sub-3-with-multiple-types-v1alpha1", + Name: "test-sub-3-with-multiple-types-v1alpha1", + Description: "multiple types in same subscription", Types: []string{ "sap.kyma.custom.connected-app.order.tested.v1", "sap.kyma.custom.connected-app2.or-der.crea-ted.one.two.three.v4", @@ -125,19 +137,20 @@ func V1Alpha2SubscriptionsToTest() []eventing.TestSubscriptionInfo { return []eventing.TestSubscriptionInfo{ { Name: "test-sub-1-v1alpha2", - Description: "Test event type and source without any alpha-numeric characters", + Description: "event type and source without any alpha-numeric characters", Source: "noapp", Types: []string{"order.modified.v1"}, }, { Name: "test-sub-2-v1alpha2", - Description: "Test event type and source with any alpha-numeric characters", + Description: "event type and source with alpha-numeric characters", Source: "test-app", Types: []string{"Order-$.third.R-e-c-e-i-v-e-d.v1"}, }, { - Name: "test-sub-3-with-multiple-types-v1alpha2", - Source: "test-evnt", + Name: "test-sub-3-with-multiple-types-v1alpha2", + Description: "multiple types in same subscription", + Source: "test-evnt", Types: []string{ "or-der.crea-ted.one.two.three.four.v4", "order.testing.v1", @@ -256,3 +269,7 @@ func ConvertSelectorLabelsToString(labels map[string]string) string { } return strings.Join(result, ",") } + +func AppendMsgToError(err error, msg string) error { + return errors.Join(err, fmt.Errorf("\n==> %s", msg)) +} diff --git a/hack/e2e/common/http/http.go b/hack/e2e/common/http/http.go index a032874d..7b572b02 100644 --- a/hack/e2e/common/http/http.go +++ b/hack/e2e/common/http/http.go @@ -3,12 +3,29 @@ package http import ( "net/http" "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + cloudeventsclient "github.com/cloudevents/sdk-go/v2/client" ) -func NewClient(transport *http.Transport) *http.Client { +func NewHttpClient(transport *http.Transport) *http.Client { return &http.Client{Transport: transport} } +func NewCloudEventsClient(transport *http.Transport) (*cloudeventsclient.Client, error) { + p, err := cloudevents.NewHTTP(cloudevents.WithRoundTripper(transport)) + if err != nil { + return nil, err + } + + client, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + return nil, err + } + + return &client, nil +} + func NewTransport(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int, idleConnTimeout time.Duration) *http.Transport { transport := http.DefaultTransport.(*http.Transport).Clone() transport.MaxIdleConns = maxIdleConns diff --git a/hack/e2e/common/testenvironment/test_environment.go b/hack/e2e/common/testenvironment/test_environment.go index 3a52551a..4a79126e 100644 --- a/hack/e2e/common/testenvironment/test_environment.go +++ b/hack/e2e/common/testenvironment/test_environment.go @@ -2,11 +2,15 @@ package testenvironment import ( "context" + "encoding/json" "errors" "fmt" "strings" "time" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/kyma-project/eventing-manager/hack/e2e/common" "github.com/kyma-project/eventing-manager/hack/e2e/common/eventing" "github.com/kyma-project/eventing-manager/hack/e2e/common/fixtures" @@ -39,6 +43,7 @@ type TestEnvironment struct { K8sClientset *kubernetes.Clientset K8sClient client.Client EventPublisher *eventing.Publisher + SinkClient *eventing.SinkClient TestConfigs *env.E2EConfig } @@ -83,18 +88,33 @@ func (te *TestEnvironment) CreateTestNamespace() error { func (te *TestEnvironment) DeleteTestNamespace() error { return common.Retry(FewAttempts, Interval, func() error { // It's fine if the Namespace already exists. - return client.IgnoreAlreadyExists(te.K8sClient.Delete(te.Context, fixtures.Namespace(te.TestConfigs.TestNamespace))) + return client.IgnoreNotFound(te.K8sClient.Delete(te.Context, fixtures.Namespace(te.TestConfigs.TestNamespace))) }) } -func (te *TestEnvironment) InitEventPublisherClient() { +func (te *TestEnvironment) InitEventPublisherClient() error { maxIdleConns := 10 maxConnsPerHost := 10 maxIdleConnsPerHost := 10 idleConnTimeout := 1 * time.Minute t := pkghttp.NewTransport(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost, idleConnTimeout) - clientHTTP := pkghttp.NewClient(t.Clone()) - te.EventPublisher = eventing.NewPublisher(context.Background(), nil, clientHTTP, te.TestConfigs.PublisherURL, te.Logger) + clientHTTP := pkghttp.NewHttpClient(t.Clone()) + clientCE, err := pkghttp.NewCloudEventsClient(t.Clone()) + if err != nil { + return err + } + te.EventPublisher = eventing.NewPublisher(context.Background(), *clientCE, clientHTTP, te.TestConfigs.PublisherURL, te.Logger) + return nil +} + +func (te *TestEnvironment) InitSinkClient() { + maxIdleConns := 10 + maxConnsPerHost := 10 + maxIdleConnsPerHost := 10 + idleConnTimeout := 1 * time.Minute + t := pkghttp.NewTransport(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost, idleConnTimeout) + clientHTTP := pkghttp.NewHttpClient(t.Clone()) + te.SinkClient = eventing.NewSinkClient(context.Background(), clientHTTP, te.TestConfigs.SinkPortForwardedURL, te.Logger) } func (te *TestEnvironment) CreateAllSubscriptions() error { @@ -231,9 +251,9 @@ func (te *TestEnvironment) SetupSink() error { func (te *TestEnvironment) DeleteSinkResources() error { if err := te.DeleteDeployment(te.TestConfigs.SubscriptionSinkName, te.TestConfigs.TestNamespace); err != nil { - return err + return client.IgnoreNotFound(err) } - return te.DeleteService(te.TestConfigs.SubscriptionSinkName, te.TestConfigs.TestNamespace) + return client.IgnoreNotFound(te.DeleteService(te.TestConfigs.SubscriptionSinkName, te.TestConfigs.TestNamespace)) } func (te *TestEnvironment) CreateSinkDeployment(name, namespace, image string) error { @@ -342,36 +362,123 @@ func (te *TestEnvironment) DeleteSubscriptionFromK8s(name, namespace string) err }) } -func (te *TestEnvironment) TestDeliveryOfLegacyEventForSubV1Alpha1(eventType string) error { +func (te *TestEnvironment) TestDeliveryOfLegacyEvent(eventSource, eventType string, subCRVersion fixtures.SubscriptionCRVersion) error { // define the event - eventID, eventSource, legacyEventType, payload := eventing.NewLegacyEventForV1Alpha1(eventType, te.TestConfigs.EventTypePrefix) + var eventId, legacyEventSource, legacyEventType, payload string + if subCRVersion == fixtures.V1Alpha1SubscriptionCRVersion { + eventId, legacyEventSource, legacyEventType, payload = eventing.NewLegacyEventForV1Alpha1(eventType, te.TestConfigs.EventTypePrefix) + } else { + eventId, legacyEventSource, legacyEventType, payload = eventing.NewLegacyEvent(eventSource, eventType) + } // publish the event - if err := te.EventPublisher.SendLegacyEvent(eventSource, legacyEventType, payload); err != nil { + if err := te.EventPublisher.SendLegacyEventWithRetries(legacyEventSource, legacyEventType, payload, FewAttempts, Interval); err != nil { te.Logger.Debug(err.Error()) return err } // verify if the event was received by the sink. - te.Logger.Debug(eventID) - // TODO: implement me! - - return nil + te.Logger.Debug(fmt.Sprintf("Verifying if LegacyEvent (ID: %s) was received by the sink", eventId)) + return te.VerifyLegacyEventReceivedBySink(eventId, eventType, eventSource, payload) } -func (te *TestEnvironment) TestDeliveryOfLegacyEvent(eventSource, eventType string) error { +func (te *TestEnvironment) TestDeliveryOfCloudEvent(eventSource, eventType string, encoding binding.Encoding) error { // define the event - eventID, eventSource, legacyEventType, payload := eventing.NewLegacyEvent(eventSource, eventType) + ceEvent, err := eventing.NewCloudEvent(eventSource, eventType, encoding) + if err != nil { + return err + } // publish the event - if err := te.EventPublisher.SendLegacyEvent(eventSource, legacyEventType, payload); err != nil { + if err := te.EventPublisher.SendCloudEventWithRetries(ceEvent, encoding, FewAttempts, Interval); err != nil { te.Logger.Debug(err.Error()) return err } // verify if the event was received by the sink. - te.Logger.Debug(eventID) - // TODO: implement me! + te.Logger.Debug(fmt.Sprintf("Verifying if CloudEvent (ID: %s) was received by the sink", ceEvent.ID())) + return te.VerifyCloudEventReceivedBySink(*ceEvent) +} - return nil +func (te *TestEnvironment) VerifyLegacyEventReceivedBySink(eventId, eventType, eventSource, payload string) error { + // publisher-proxy converts LegacyEvent to CloudEvent, so the sink should have received a CloudEvent. + // extract data from payload of legacy event. + result := make(map[string]interface{}) + if err := json.Unmarshal([]byte(payload), &result); err != nil { + return err + } + data := result["data"] + + // define the expected CloudEvent. + expectedCEEvent := cloudevents.NewEvent() + expectedCEEvent.SetID(eventId) + expectedCEEvent.SetType(eventType) + expectedCEEvent.SetSource(eventSource) + if err := expectedCEEvent.SetData(cloudevents.ApplicationJSON, data); err != nil { + return err + } + + // verify if the event was received. + return te.VerifyCloudEventReceivedBySink(expectedCEEvent) +} + +func (te *TestEnvironment) VerifyCloudEventReceivedBySink(expectedEvent cloudevents.Event) error { + // define the event + gotSinkEvent, err := te.SinkClient.GetEventFromSinkWithRetries(expectedEvent.ID(), Attempts, Interval) + if err != nil { + te.Logger.Debug(err.Error()) + return err + } + + // verify if the event was received by the sink. + te.Logger.Debug(fmt.Sprintf("Got event (ID: %s) from sink, checking if the payload is correct", gotSinkEvent.ID())) + return te.CompareCloudEvents(expectedEvent, gotSinkEvent.Event) +} + +func (te *TestEnvironment) CompareCloudEvents(expectedEvent cloudevents.Event, gotEvent cloudevents.Event) error { + var resultError error + // check if its a valid CloudEvent. + if err := gotEvent.Validate(); err != nil { + msg := fmt.Sprintf("expected valid cloud event, but got invalid cloud event. Error: %s", err.Error()) + resultError = fixtures.AppendMsgToError(resultError, msg) + } + + if expectedEvent.ID() != gotEvent.ID() { + msg := fmt.Sprintf("expected event ID: %s, got event ID: %s", expectedEvent.ID(), gotEvent.ID()) + resultError = fixtures.AppendMsgToError(resultError, msg) + } + + if string(expectedEvent.Data()) != string(gotEvent.Data()) { + msg := fmt.Sprintf("expected event data: %s, got event data: %s", + string(expectedEvent.Data()), string(gotEvent.Data())) + resultError = fixtures.AppendMsgToError(resultError, msg) + } + + // if it is a v1alpha1 Subscription event, then we do not check further. + if strings.HasPrefix(gotEvent.Type(), te.TestConfigs.EventTypePrefix) { + return resultError + } + + // check in detail further the source and type. + if expectedEvent.Source() != gotEvent.Source() { + msg := fmt.Sprintf("expected event Source: %s, got event Source: %s", expectedEvent.Source(), gotEvent.Source()) + resultError = fixtures.AppendMsgToError(resultError, msg) + } + + if expectedEvent.Type() != gotEvent.Type() { + msg := fmt.Sprintf("expected event Type: %s, got event Type: %s", expectedEvent.Type(), gotEvent.Type()) + resultError = fixtures.AppendMsgToError(resultError, msg) + } + + originalType, ok := gotEvent.Extensions()[fixtures.EventOriginalTypeHeader] + if !ok { + msg := fmt.Sprintf("expected event to have header: %s, but its missing", fixtures.EventOriginalTypeHeader) + resultError = fixtures.AppendMsgToError(resultError, msg) + } + if expectedEvent.Type() != originalType { + msg := fmt.Sprintf("expected originaltype header to have value: %s, but got: %s", expectedEvent.Type(), originalType) + resultError = fixtures.AppendMsgToError(resultError, msg) + } + + return resultError } diff --git a/hack/e2e/env/env.go b/hack/e2e/env/env.go index a8bb2640..bc1d05d3 100644 --- a/hack/e2e/env/env.go +++ b/hack/e2e/env/env.go @@ -12,12 +12,13 @@ type E2EConfig struct { BackendType string `envconfig:"BACKEND_TYPE" default:"NATS"` // NATS or EventMesh ManagerImage string `envconfig:"MANAGER_IMAGE" default:""` EventTypePrefix string `envconfig:"EVENT_TYPE_PREFIX" default:"sap.kyma.custom"` - EventMeshNamespace string `envconfig:"EVENTMESH_NAMESPACE" default:"xxxxxx"` - SubscriptionSinkImage string `envconfig:"SUBSCRIPTION_SINK_IMAGE" default:"eu.gcr.io/kyma-project/eventing-tools:v20230329-fc309b92"` + EventMeshNamespace string `envconfig:"EVENTMESH_NAMESPACE" default:"/default/sap.kyma/tunas-develop"` + SubscriptionSinkImage string `envconfig:"SUBSCRIPTION_SINK_IMAGE" default:"ghcr.io/kyma-project/eventing-manager/e2e-tests-sink:sha-8e81aae"` SubscriptionSinkName string `envconfig:"SUBSCRIPTION_SINK_Name" default:"test-sink"` SubscriptionSinkURL string `envconfig:"SUBSCRIPTION_SINK_URL" default:""` TestNamespace string `envconfig:"TEST_NAMESPACE" default:"eventing-tests"` PublisherURL string `envconfig:"PUBLISHER_URL" default:"http://localhost:38081"` + SinkPortForwardedURL string `envconfig:"SINK_PORT_FORWARDED_URL" default:"http://localhost:38071"` } func (cfg E2EConfig) IsNATSBackend() bool { diff --git a/hack/e2e/eventing/cleanup/cleanup_test.go b/hack/e2e/eventing/cleanup/cleanup_test.go index 5c46245a..624c0327 100644 --- a/hack/e2e/eventing/cleanup/cleanup_test.go +++ b/hack/e2e/eventing/cleanup/cleanup_test.go @@ -13,8 +13,7 @@ import ( var testEnvironment *testenvironment.TestEnvironment -// TestMain runs before all the other test functions. It sets up all the resources that are shared between the different -// test functions. It will then run the tests and finally shuts everything down. +// TestMain runs before all the other test functions. func TestMain(m *testing.M) { testEnvironment = testenvironment.NewTestEnvironment() diff --git a/hack/e2e/eventing/delivery/delivery_test.go b/hack/e2e/eventing/delivery/delivery_test.go new file mode 100644 index 00000000..0f85a154 --- /dev/null +++ b/hack/e2e/eventing/delivery/delivery_test.go @@ -0,0 +1,129 @@ +package delivery + +import ( + "fmt" + "os" + "testing" + + "github.com/kyma-project/eventing-manager/hack/e2e/common/eventing" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/kyma-project/eventing-manager/hack/e2e/common" + "github.com/kyma-project/eventing-manager/hack/e2e/common/fixtures" + "github.com/stretchr/testify/require" + + "github.com/kyma-project/eventing-manager/hack/e2e/common/testenvironment" +) + +var testEnvironment *testenvironment.TestEnvironment + +type EventTestCase string + +const ( + LegacyEventCase EventTestCase = "legacy event" + StructuredCloudEventCase EventTestCase = "structured cloud event" + BinaryCloudEventCase EventTestCase = "binary cloud event" +) + +// TestMain runs before all the other test functions. +func TestMain(m *testing.M) { + testEnvironment = testenvironment.NewTestEnvironment() + + // wait for subscriptions. + if err := testEnvironment.WaitForAllSubscriptions(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // initialize event publisher client. + if err := testEnvironment.InitEventPublisherClient(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // initialize sink client for fetching events. + testEnvironment.InitSinkClient() + + // Run the tests and exit. + code := m.Run() + os.Exit(code) +} + +// ++ Tests + +func Test_LegacyEvents_SubscriptionV1Alpha1(t *testing.T) { + t.Parallel() + // binding.EncodingUnknown means legacy event. + testEventDelivery(t, LegacyEventCase, fixtures.V1Alpha1SubscriptionsToTest(), binding.EncodingUnknown, fixtures.V1Alpha1SubscriptionCRVersion) +} + +func Test_LegacyEvents(t *testing.T) { + t.Parallel() + // binding.EncodingUnknown means legacy event. + testEventDelivery(t, LegacyEventCase, fixtures.V1Alpha2SubscriptionsToTest(), binding.EncodingUnknown, fixtures.V1Alpha2SubscriptionCRVersion) +} + +func Test_StructuredCloudEvents_SubscriptionV1Alpha1(t *testing.T) { + t.Parallel() + testEventDelivery(t, StructuredCloudEventCase, fixtures.V1Alpha1SubscriptionsToTest(), binding.EncodingStructured, fixtures.V1Alpha1SubscriptionCRVersion) +} + +func Test_BinaryCloudEvents_SubscriptionV1Alpha1(t *testing.T) { + t.Parallel() + testEventDelivery(t, BinaryCloudEventCase, fixtures.V1Alpha1SubscriptionsToTest(), binding.EncodingBinary, fixtures.V1Alpha1SubscriptionCRVersion) +} + +func Test_StructuredCloudEvents(t *testing.T) { + t.Parallel() + testEventDelivery(t, StructuredCloudEventCase, fixtures.V1Alpha2SubscriptionsToTest(), binding.EncodingStructured, fixtures.V1Alpha2SubscriptionCRVersion) +} + +func Test_BinaryCloudEvents(t *testing.T) { + t.Parallel() + testEventDelivery(t, BinaryCloudEventCase, fixtures.V1Alpha2SubscriptionsToTest(), binding.EncodingBinary, fixtures.V1Alpha2SubscriptionCRVersion) +} + +// ++ Helper functions + +func testEventDelivery(t *testing.T, + testCase EventTestCase, + subsToTest []eventing.TestSubscriptionInfo, + encoding binding.Encoding, + subCRVersion fixtures.SubscriptionCRVersion) { + // In each subscription, we need to run the tests for each event type. + // loop over each subscription. + for _, subToTest := range subsToTest { + subToTest := subToTest + // loop over each event type in the subscription. + for id, eventTypeToTest := range subToTest.Types { + eventTypeToTest := eventTypeToTest + // define the test name. + testName := getTestName(testCase, subToTest, id) + // run test for the eventType. + t.Run(testName, func(t *testing.T) { + t.Parallel() + + // given + eventSourceToUse := subToTest.Source + if subCRVersion == fixtures.V1Alpha1SubscriptionCRVersion { + // For EventMesh with Subscription v1alpha1, the eventSource should be EventMesh NameSpace. + eventSourceToUse = testEnvironment.TestConfigs.EventMeshNamespace + } + + // when + err := common.Retry(testenvironment.ThreeAttempts, testenvironment.Interval, func() error { + if encoding == binding.EncodingUnknown { + // binding.EncodingUnknown means legacy event. + return testEnvironment.TestDeliveryOfLegacyEvent(eventSourceToUse, eventTypeToTest, subCRVersion) + } + return testEnvironment.TestDeliveryOfCloudEvent(eventSourceToUse, eventTypeToTest, encoding) + }) + + // then + require.NoError(t, err) + }) + } + } +} + +func getTestName(testCase EventTestCase, subToTest eventing.TestSubscriptionInfo, typeIndex int) string { + return fmt.Sprintf("%s should work for subscription(%s) (typeIndex[%v]) %s", testCase, subToTest.Name, typeIndex, subToTest.Description) +} diff --git a/hack/e2e/eventing/eventing_test.go b/hack/e2e/eventing/eventing_test.go deleted file mode 100644 index 62bc17ea..00000000 --- a/hack/e2e/eventing/eventing_test.go +++ /dev/null @@ -1,94 +0,0 @@ -//go:build e2e -// +build e2e - -package eventing - -import ( - "fmt" - "os" - "testing" - - "github.com/kyma-project/eventing-manager/hack/e2e/common" - "github.com/kyma-project/eventing-manager/hack/e2e/common/fixtures" - "github.com/kyma-project/eventing-manager/hack/e2e/common/testenvironment" - "github.com/stretchr/testify/require" -) - -var testEnvironment *testenvironment.TestEnvironment - -// TestMain runs before all the other test functions. It sets up all the resources that are shared between the different -// test functions. It will then run the tests and finally shuts everything down. -func TestMain(m *testing.M) { - testEnvironment = testenvironment.NewTestEnvironment() - - // create test namespace, - if err := testEnvironment.CreateTestNamespace(); err != nil { - testEnvironment.Logger.Fatal(err.Error()) - } - - // setup sink for subscriptions. - if err := testEnvironment.SetupSink(); err != nil { - testEnvironment.Logger.Fatal(err.Error()) - } - - // create subscriptions. - if err := testEnvironment.CreateAllSubscriptions(); err != nil { - testEnvironment.Logger.Fatal(err.Error()) - } - - // wait for subscriptions. - if err := testEnvironment.WaitForAllSubscriptions(); err != nil { - testEnvironment.Logger.Fatal(err.Error()) - } - - // initialize event publisher client. - testEnvironment.InitEventPublisherClient() - - // Run the tests and exit. - code := m.Run() - os.Exit(code) -} - -func Test_LegacyEvents_SubscriptionV1Alpha1(t *testing.T) { - t.Parallel() - for _, subToTest := range fixtures.V1Alpha1SubscriptionsToTest() { - subToTest := subToTest - for _, eventTypeToTest := range subToTest.Types { - eventTypeToTest := eventTypeToTest - testName := fmt.Sprintf("legacy event should work for subscription: %s with type: %s", subToTest.Name, eventTypeToTest) - // run test for the eventType. - t.Run(testName, func(t *testing.T) { - t.Parallel() - - // when - err := common.Retry(testenvironment.ThreeAttempts, testenvironment.Interval, func() error { - return testEnvironment.TestDeliveryOfLegacyEventForSubV1Alpha1(eventTypeToTest) - }) - - // then - require.NoError(t, err) - }) - } - } -} - -func Test_LegacyEvents(t *testing.T) { - t.Parallel() - for _, subToTest := range fixtures.V1Alpha2SubscriptionsToTest() { - subToTest := subToTest - for _, eventTypeToTest := range subToTest.Types { - eventTypeToTest := eventTypeToTest - testName := fmt.Sprintf("legacy event should work for subscription: %s with type: %s", subToTest.Name, eventTypeToTest) - // run test for the eventType. - t.Run(testName, func(t *testing.T) { - t.Parallel() - - err := common.Retry(testenvironment.ThreeAttempts, testenvironment.Interval, func() error { - // It's fine if the Namespace already exists. - return testEnvironment.TestDeliveryOfLegacyEvent(subToTest.Source, eventTypeToTest) - }) - require.NoError(t, err) - }) - } - } -} diff --git a/hack/e2e/eventing/setup/setup_test.go b/hack/e2e/eventing/setup/setup_test.go new file mode 100644 index 00000000..1264d0b2 --- /dev/null +++ b/hack/e2e/eventing/setup/setup_test.go @@ -0,0 +1,41 @@ +package setup + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kyma-project/eventing-manager/hack/e2e/common/testenvironment" +) + +var testEnvironment *testenvironment.TestEnvironment + +// TestMain runs before all the other test functions. +func TestMain(m *testing.M) { + testEnvironment = testenvironment.NewTestEnvironment() + + // create test namespace, + if err := testEnvironment.CreateTestNamespace(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // setup sink for subscriptions. + if err := testEnvironment.SetupSink(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // create subscriptions. + if err := testEnvironment.CreateAllSubscriptions(); err != nil { + testEnvironment.Logger.Fatal(err.Error()) + } + + // Run the tests and exit. + code := m.Run() + os.Exit(code) +} + +func Test_SubscriptionsReady(t *testing.T) { + t.Parallel() + require.NoError(t, testEnvironment.WaitForAllSubscriptions()) +} diff --git a/hack/e2e/scripts/event_delivery_tests.sh b/hack/e2e/scripts/event_delivery_tests.sh index 5a6c1f99..ee0487b3 100755 --- a/hack/e2e/scripts/event_delivery_tests.sh +++ b/hack/e2e/scripts/event_delivery_tests.sh @@ -2,20 +2,25 @@ set -e # for our tests we need to port-forward the eventing-publisher-proxy. -echo "Port-forwarding to the eventing-publisher-proxy using port: 38081" +echo "Port-forwarding to the svc/eventing-publisher-proxy using port: 38081" kubectl -n kyma-system port-forward svc/eventing-publisher-proxy 38081:80 & PID1=$! +echo "Port-forwarding to the svc/test-sink in namespace: eventing-tests using port: 38071" +kubectl -n eventing-tests port-forward svc/test-sink 38071:80 & +PID2=$! # This will kill all the port-forwarding. We need this to be in a function so we can even call it, if our tests fails # since `set -e` would stop the script in case of an failing test. function kill_port_forward() { echo "Killing the port-forwarding for port: 38081" kill ${PID1} + echo "Killing the port-forwarding for port: 38071" + kill ${PID2} } # This kills the port-forwards even if the test fails. trap kill_port_forward ERR echo "Running tests..." -go test -v ./hack/e2e/eventing/eventing_test.go --tags=e2e +go test -v ./hack/e2e/eventing/delivery/delivery_test.go --tags=e2e kill_port_forward