Skip to content

Commit

Permalink
Added tests for Cloud Events and verification of reception of events …
Browse files Browse the repository at this point in the history
…by the Sink (#93)

* Added e2e event tests for CE

* added code to fetch event from sink

* updated

* added verification for legacy events received by sink

* cleanup code

* fixed lint

* addressed review comments

* lint fix
  • Loading branch information
mfaizanse authored Sep 14, 2023
1 parent 8e81aae commit 343a348
Show file tree
Hide file tree
Showing 13 changed files with 560 additions and 133 deletions.
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,15 @@ e2e-setup:

# e2e-cleanup will delete the Eventing CR and check if the required resources are de-provisioned or not.
.PHONY: e2e-cleanup
e2e-cleanup:
e2e-cleanup: e2e-eventing-cleanup
go test -v ./hack/e2e/cleanup/cleanup_test.go --tags=e2e

# e2e-eventing will setup subscriptions and tests end-to-end deliver of events.
# e2e-eventing-setup will setup subscriptions and sink required for tests to check end-to-end delivery of events.
.PHONY: e2e-eventing-setup
e2e-eventing-setup:
go test -v ./hack/e2e/eventing/setup/setup_test.go --tags=e2e

# e2e-eventing will tests end-to-end delivery of events.
.PHONY: e2e-eventing
e2e-eventing:
./hack/e2e/scripts/event_delivery_tests.sh
Expand All @@ -209,4 +214,4 @@ e2e-eventing-cleanup:

# e2e will run the whole suite of end-to-end tests for eventing-manager.
.PHONY: e2e
e2e: e2e-setup e2e-eventing e2e-cleanup
e2e: e2e-setup e2e-eventing-setup e2e-eventing e2e-cleanup
65 changes: 64 additions & 1 deletion hack/e2e/common/eventing/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"net/http"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"

"github.com/cloudevents/sdk-go/v2/client"
"github.com/kyma-project/eventing-manager/hack/e2e/common"
"github.com/pkg/errors"
Expand All @@ -16,7 +19,7 @@ import (

const (
LegacyPublishEndpointFormat = "%s/%s/v1/events"
CloudEventPublishEndpointFormat = "%s/%s/v1/events"
CloudEventPublishEndpointFormat = "%s/publish"
)

type Publisher struct {
Expand All @@ -41,12 +44,22 @@ func (p *Publisher) LegacyPublishEndpoint(source string) string {
return fmt.Sprintf(LegacyPublishEndpointFormat, p.publisherURL, source)
}

func (p *Publisher) PublishEndpoint() string {
return fmt.Sprintf(CloudEventPublishEndpointFormat, p.publisherURL)
}

func (p *Publisher) SendLegacyEventWithRetries(source, eventType, payload string, attempts int, interval time.Duration) error {
return common.Retry(attempts, interval, func() error {
return p.SendLegacyEvent(source, eventType, payload)
})
}

func (p *Publisher) SendCloudEventWithRetries(event *cloudevents.Event, encoding binding.Encoding, attempts int, interval time.Duration) error {
return common.Retry(attempts, interval, func() error {
return p.SendCloudEvent(event, encoding)
})
}

func (p *Publisher) SendLegacyEvent(source, eventType, payload string) error {
url := p.LegacyPublishEndpoint(source)

Expand Down Expand Up @@ -95,3 +108,53 @@ func (p *Publisher) SendLegacyEvent(source, eventType, payload string) error {
return err2
}
}

func (p *Publisher) SendCloudEvent(event *cloudevents.Event, encoding binding.Encoding) error {
ce := *event
newCtx := context.Background()
ctx := cloudevents.ContextWithTarget(newCtx, p.PublishEndpoint())
switch encoding {
case binding.EncodingBinary:
{
ctx = binding.WithForceBinary(ctx)
}
case binding.EncodingStructured:
{
ctx = binding.WithForceStructured(ctx)
}
default:
{
return fmt.Errorf("failed to use unsupported cloudevent encoding:[%s]", encoding.String())
}
}

p.logger.Debug(fmt.Sprintf("Publishing cloud event:"+
" URL: %s,"+
" Encoding: %s,"+
" EventID: %s,"+
" EventSource: %s,"+
" EventType: %s,"+
" Payload: %s",
p.PublishEndpoint(), encoding.String(), ce.ID(), ce.Source(), ce.Type(), ce.Data()))

result := p.clientCE.Send(ctx, ce)
switch {
case cloudevents.IsUndelivered(result):
{
return fmt.Errorf("failed to send cloudevent-%s undelivered:[%s] response:[%s]", encoding.String(), ce.Type(), result)
}
case cloudevents.IsNACK(result):
{
return fmt.Errorf("failed to send cloudevent-%s nack:[%s] response:[%s]", encoding.String(), ce.Type(), result)
}
case cloudevents.IsACK(result):
{
p.logger.Debug(fmt.Sprintf("successfully sent cloudevent-%s [%s]", encoding.String(), ce.Type()))
return nil
}
default:
{
return fmt.Errorf("failed to send cloudevent-%s unknown:[%s] response:[%s]", encoding.String(), ce.Type(), result)
}
}
}
113 changes: 113 additions & 0 deletions hack/e2e/common/eventing/sinkclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package eventing

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
cev2event "github.com/cloudevents/sdk-go/v2/event"
"github.com/kyma-project/eventing-manager/hack/e2e/common"
"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
EventsEndpointFormat = "%s/events/%s"
)

type SinkClient struct {
ctx context.Context
clientHTTP *http.Client
sinkURL string
logger *zap.Logger
}

type SinkEvent struct {
// Header stores the non CE events, e.g. X-B3-Sampled and Traceparent
http.Header
cev2event.Event
}

func NewSinkClient(ctx context.Context, clientHTTP *http.Client, sinkURL string, logger *zap.Logger) *SinkClient {
return &SinkClient{
ctx: ctx,
clientHTTP: clientHTTP,
sinkURL: sinkURL,
logger: logger,
}
}

func (sc *SinkClient) EventsEndpoint(eventId string) string {
return fmt.Sprintf(EventsEndpointFormat, sc.sinkURL, eventId)
}

func (sc *SinkClient) GetEventFromSinkWithRetries(eventId string, attempts int, interval time.Duration) (*SinkEvent, error) {
var gotEvent *SinkEvent
err := common.Retry(attempts, interval, func() error {
var err1 error
gotEvent, err1 = sc.GetEventFromSink(eventId)
return err1
})
return gotEvent, err
}

func (sc *SinkClient) GetEventFromSink(eventId string) (*SinkEvent, error) {
url := sc.EventsEndpoint(eventId)
sc.logger.Debug(fmt.Sprintf("Fetching event with ID: %s from the sink URL: %s", eventId, url))

req, err := http.NewRequest(http.MethodGet, url, bytes.NewBuffer([]byte{}))
if err != nil {
err = errors.Wrap(err, "Failed to create HTTP request for fetching event from sink")
sc.logger.Debug(err.Error())
return nil, err
}

req.Header.Set("Content-Type", "application/json")
resp, err := sc.clientHTTP.Do(req)
if err != nil {
err = errors.Wrap(err, "Failed to fetch event")
sc.logger.Debug(err.Error())
return nil, err
}
defer func() {
err = resp.Body.Close()
if err != nil {
sc.logger.Error(err.Error())
}
}()

// read body.
respBody, err := io.ReadAll(resp.Body)
if err != nil {
err = errors.Wrap(err, "Failed to read response body")
sc.logger.Debug(err.Error())
return nil, err
}

// if not success, then return error.
if !Is2XX(resp.StatusCode) {
err = errors.New(fmt.Sprintf("Failed to fetch eventID:[%s] response:[%d] body:[%s]", eventId,
resp.StatusCode, string(respBody)))
sc.logger.Debug(err.Error())
return nil, err
}

// success
// convert to cloud event object.
ceEvent := cloudevents.NewEvent()
err = json.Unmarshal(respBody, &ceEvent)
if err != nil {
err = errors.Wrap(err, "failed to convert JSON to CloudEvent")
sc.logger.Debug(err.Error())
return nil, err
}

return &SinkEvent{
Event: ceEvent,
}, nil
}
24 changes: 24 additions & 0 deletions hack/e2e/common/eventing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"net/http"
"strings"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"

"github.com/google/uuid"
)

Expand All @@ -25,6 +28,14 @@ func LegacyEventPayload(eventId, eventVersion, eventType, data string) string {
return `{"data":"` + data + `","event-id":"` + eventId + `","event-type":"` + eventType + `","event-time":"2020-04-02T21:37:00Z","event-type-version":"` + eventVersion + `"}`
}

func CloudEventMode(encoding binding.Encoding) string {
return fmt.Sprintf("ce-%s", encoding.String())
}

func CloudEventData(source, eventType string, encoding binding.Encoding) map[string]interface{} {
return map[string]interface{}{keyApp: source, keyMode: CloudEventMode(encoding), keyType: eventType}
}

func ExtractSourceFromSubscriptionV1Alpha1Type(eventType string) string {
segments := strings.Split(eventType, ".")
return segments[3]
Expand Down Expand Up @@ -73,3 +84,16 @@ func NewLegacyEvent(eventSource, eventType string) (string, string, string, stri

return eventID, eventSource, legacyEventType, payload
}

func NewCloudEvent(eventSource, eventType string, encoding binding.Encoding) (*cloudevents.Event, error) {
eventID := uuid.New().String()
ce := cloudevents.NewEvent()
data := CloudEventData(eventSource, eventType, encoding)
ce.SetID(eventID)
ce.SetType(eventType)
ce.SetSource(eventSource)
if err := ce.SetData(cloudevents.ApplicationJSON, data); err != nil {
return nil, fmt.Errorf("failed to set cloudevent-%s data with error:[%s]", encoding.String(), err)
}
return &ce, nil
}
35 changes: 26 additions & 9 deletions hack/e2e/common/fixtures/fixtures.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fixtures

import (
"errors"
"fmt"
"strings"

Expand All @@ -26,6 +27,14 @@ const (
WebhookServerCertJobName = "eventing-manager-cert-handler"
EventMeshSecretNamespace = "kyma-system"
EventMeshSecretName = "eventing-backend"
EventOriginalTypeHeader = "originaltype"
)

type SubscriptionCRVersion string

const (
V1Alpha1SubscriptionCRVersion SubscriptionCRVersion = "v1alpha1"
V1Alpha2SubscriptionCRVersion SubscriptionCRVersion = "v1alpha2"
)

func EventingCR(backendType eventingv1alpha1.BackendType) *eventingv1alpha1.Eventing {
Expand Down Expand Up @@ -104,15 +113,18 @@ func PublisherSpec() eventingv1alpha1.Publisher {
func V1Alpha1SubscriptionsToTest() []eventing.TestSubscriptionInfo {
return []eventing.TestSubscriptionInfo{
{
Name: "test-sub-1-v1alpha1",
Types: []string{"sap.kyma.custom.noapp.order.tested.v1"},
Name: "test-sub-1-v1alpha1",
Description: "event type and source without any alpha-numeric characters",
Types: []string{"sap.kyma.custom.noapp.order.tested.v1"},
},
{
Name: "test-sub-2-v1alpha1",
Types: []string{"sap.kyma.custom.test-app.order-$.second.R-e-c-e-i-v-e-d.v1"},
Name: "test-sub-2-v1alpha1",
Description: "event type and source with alpha-numeric characters",
Types: []string{"sap.kyma.custom.test-app.order-$.second.R-e-c-e-i-v-e-d.v1"},
},
{
Name: "test-sub-3-with-multiple-types-v1alpha1",
Name: "test-sub-3-with-multiple-types-v1alpha1",
Description: "multiple types in same subscription",
Types: []string{
"sap.kyma.custom.connected-app.order.tested.v1",
"sap.kyma.custom.connected-app2.or-der.crea-ted.one.two.three.v4",
Expand All @@ -125,19 +137,20 @@ func V1Alpha2SubscriptionsToTest() []eventing.TestSubscriptionInfo {
return []eventing.TestSubscriptionInfo{
{
Name: "test-sub-1-v1alpha2",
Description: "Test event type and source without any alpha-numeric characters",
Description: "event type and source without any alpha-numeric characters",
Source: "noapp",
Types: []string{"order.modified.v1"},
},
{
Name: "test-sub-2-v1alpha2",
Description: "Test event type and source with any alpha-numeric characters",
Description: "event type and source with alpha-numeric characters",
Source: "test-app",
Types: []string{"Order-$.third.R-e-c-e-i-v-e-d.v1"},
},
{
Name: "test-sub-3-with-multiple-types-v1alpha2",
Source: "test-evnt",
Name: "test-sub-3-with-multiple-types-v1alpha2",
Description: "multiple types in same subscription",
Source: "test-evnt",
Types: []string{
"or-der.crea-ted.one.two.three.four.v4",
"order.testing.v1",
Expand Down Expand Up @@ -256,3 +269,7 @@ func ConvertSelectorLabelsToString(labels map[string]string) string {
}
return strings.Join(result, ",")
}

func AppendMsgToError(err error, msg string) error {
return errors.Join(err, fmt.Errorf("\n==> %s", msg))
}
19 changes: 18 additions & 1 deletion hack/e2e/common/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,29 @@ package http
import (
"net/http"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventsclient "github.com/cloudevents/sdk-go/v2/client"
)

func NewClient(transport *http.Transport) *http.Client {
func NewHttpClient(transport *http.Transport) *http.Client {
return &http.Client{Transport: transport}
}

func NewCloudEventsClient(transport *http.Transport) (*cloudeventsclient.Client, error) {
p, err := cloudevents.NewHTTP(cloudevents.WithRoundTripper(transport))
if err != nil {
return nil, err
}

client, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
return nil, err
}

return &client, nil
}

func NewTransport(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int, idleConnTimeout time.Duration) *http.Transport {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = maxIdleConns
Expand Down
Loading

0 comments on commit 343a348

Please sign in to comment.