Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tests for Cloud Events and verification of reception of events by the Sink #93

Merged
merged 8 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,15 @@ e2e-setup:

# e2e-cleanup will delete the Eventing CR and check if the required resources are de-provisioned or not.
.PHONY: e2e-cleanup
e2e-cleanup:
e2e-cleanup: e2e-eventing-cleanup
go test -v ./hack/e2e/cleanup/cleanup_test.go --tags=e2e

# e2e-eventing will setup subscriptions and tests end-to-end deliver of events.
# e2e-eventing-setup will setup subscriptions and sink required for tests to check end-to-end delivery of events.
.PHONY: e2e-eventing-setup
e2e-eventing-setup:
go test -v ./hack/e2e/eventing/setup/setup_test.go --tags=e2e

# e2e-eventing will tests end-to-end delivery of events.
.PHONY: e2e-eventing
e2e-eventing:
./hack/e2e/scripts/event_delivery_tests.sh
Expand All @@ -209,4 +214,4 @@ e2e-eventing-cleanup:

# e2e will run the whole suite of end-to-end tests for eventing-manager.
.PHONY: e2e
e2e: e2e-setup e2e-eventing e2e-cleanup
e2e: e2e-setup e2e-eventing-setup e2e-eventing e2e-cleanup
65 changes: 64 additions & 1 deletion hack/e2e/common/eventing/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"net/http"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"

"github.com/cloudevents/sdk-go/v2/client"
"github.com/kyma-project/eventing-manager/hack/e2e/common"
"github.com/pkg/errors"
Expand All @@ -16,7 +19,7 @@ import (

const (
LegacyPublishEndpointFormat = "%s/%s/v1/events"
CloudEventPublishEndpointFormat = "%s/%s/v1/events"
CloudEventPublishEndpointFormat = "%s/publish"
)

type Publisher struct {
Expand All @@ -41,12 +44,22 @@ func (p *Publisher) LegacyPublishEndpoint(source string) string {
return fmt.Sprintf(LegacyPublishEndpointFormat, p.publisherURL, source)
}

func (p *Publisher) PublishEndpoint() string {
return fmt.Sprintf(CloudEventPublishEndpointFormat, p.publisherURL)
}

func (p *Publisher) SendLegacyEventWithRetries(source, eventType, payload string, attempts int, interval time.Duration) error {
return common.Retry(attempts, interval, func() error {
return p.SendLegacyEvent(source, eventType, payload)
})
}

func (p *Publisher) SendCloudEventWithRetries(event *cloudevents.Event, encoding binding.Encoding, attempts int, interval time.Duration) error {
return common.Retry(attempts, interval, func() error {
return p.SendCloudEvent(event, encoding)
})
}

func (p *Publisher) SendLegacyEvent(source, eventType, payload string) error {
url := p.LegacyPublishEndpoint(source)

Expand Down Expand Up @@ -95,3 +108,53 @@ func (p *Publisher) SendLegacyEvent(source, eventType, payload string) error {
return err2
}
}

func (p *Publisher) SendCloudEvent(event *cloudevents.Event, encoding binding.Encoding) error {
ce := *event
newCtx := context.Background()
ctx := cloudevents.ContextWithTarget(newCtx, p.PublishEndpoint())
switch encoding {
case binding.EncodingBinary:
{
ctx = binding.WithForceBinary(ctx)
}
case binding.EncodingStructured:
{
ctx = binding.WithForceStructured(ctx)
}
default:
{
return fmt.Errorf("failed to use unsupported cloudevent encoding:[%s]", encoding.String())
}
}

p.logger.Debug(fmt.Sprintf("Publishing cloud event:"+
" URL: %s,"+
" Encoding: %s,"+
" EventID: %s,"+
" EventSource: %s,"+
" EventType: %s,"+
" Payload: %s",
p.PublishEndpoint(), encoding.String(), ce.ID(), ce.Source(), ce.Type(), ce.Data()))

result := p.clientCE.Send(ctx, ce)
switch {
case cloudevents.IsUndelivered(result):
{
return fmt.Errorf("failed to send cloudevent-%s undelivered:[%s] response:[%s]", encoding.String(), ce.Type(), result)
}
case cloudevents.IsNACK(result):
{
return fmt.Errorf("failed to send cloudevent-%s nack:[%s] response:[%s]", encoding.String(), ce.Type(), result)
}
case cloudevents.IsACK(result):
{
p.logger.Debug(fmt.Sprintf("successfully sent cloudevent-%s [%s]", encoding.String(), ce.Type()))
return nil
}
default:
{
return fmt.Errorf("failed to send cloudevent-%s unknown:[%s] response:[%s]", encoding.String(), ce.Type(), result)
}
}
}
113 changes: 113 additions & 0 deletions hack/e2e/common/eventing/sinkclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package eventing

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
cev2event "github.com/cloudevents/sdk-go/v2/event"
"github.com/kyma-project/eventing-manager/hack/e2e/common"
"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
EventsEndpointFormat = "%s/events/%s"
)

type SinkClient struct {
ctx context.Context
clientHTTP *http.Client
sinkURL string
logger *zap.Logger
}

type SinkEvent struct {
// Header stores the non CE events, e.g. X-B3-Sampled and Traceparent
http.Header
cev2event.Event
}

func NewSinkClient(ctx context.Context, clientHTTP *http.Client, sinkURL string, logger *zap.Logger) *SinkClient {
return &SinkClient{
ctx: ctx,
clientHTTP: clientHTTP,
sinkURL: sinkURL,
logger: logger,
}
}

func (sc *SinkClient) EventsEndpoint(eventId string) string {
return fmt.Sprintf(EventsEndpointFormat, sc.sinkURL, eventId)
}

func (sc *SinkClient) GetEventFromSinkWithRetries(eventId string, attempts int, interval time.Duration) (*SinkEvent, error) {
var gotEvent *SinkEvent
err := common.Retry(attempts, interval, func() error {
var err1 error
gotEvent, err1 = sc.GetEventFromSink(eventId)
return err1
})
return gotEvent, err
}

func (sc *SinkClient) GetEventFromSink(eventId string) (*SinkEvent, error) {
url := sc.EventsEndpoint(eventId)
sc.logger.Debug(fmt.Sprintf("Fetching event with ID: %s from the sink URL: %s", eventId, url))

req, err := http.NewRequest(http.MethodGet, url, bytes.NewBuffer([]byte{}))
if err != nil {
err = errors.Wrap(err, "Failed to create HTTP request for fetching event from sink")
sc.logger.Debug(err.Error())
return nil, err
}

req.Header.Set("Content-Type", "application/json")
resp, err := sc.clientHTTP.Do(req)
if err != nil {
err = errors.Wrap(err, "Failed to fetch event")
sc.logger.Debug(err.Error())
return nil, err
}
defer func() {
err = resp.Body.Close()
if err != nil {
sc.logger.Error(err.Error())
}
}()

// read body.
respBody, err := io.ReadAll(resp.Body)
if err != nil {
err = errors.Wrap(err, "Failed to read response body")
sc.logger.Debug(err.Error())
return nil, err
}

// if not success, then return error.
if !Is2XX(resp.StatusCode) {
err = errors.New(fmt.Sprintf("Failed to fetch eventID:[%s] response:[%d] body:[%s]", eventId,
resp.StatusCode, string(respBody)))
sc.logger.Debug(err.Error())
return nil, err
}

// success
// convert to cloud event object.
ceEvent := cloudevents.NewEvent()
err = json.Unmarshal(respBody, &ceEvent)
if err != nil {
err = errors.Wrap(err, "failed to convert JSON to CloudEvent")
sc.logger.Debug(err.Error())
return nil, err
}

return &SinkEvent{
Event: ceEvent,
}, nil
}
24 changes: 24 additions & 0 deletions hack/e2e/common/eventing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"net/http"
"strings"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"

"github.com/google/uuid"
)

Expand All @@ -25,6 +28,14 @@ func LegacyEventPayload(eventId, eventVersion, eventType, data string) string {
return `{"data":"` + data + `","event-id":"` + eventId + `","event-type":"` + eventType + `","event-time":"2020-04-02T21:37:00Z","event-type-version":"` + eventVersion + `"}`
}

func CloudEventMode(encoding binding.Encoding) string {
return fmt.Sprintf("ce-%s", encoding.String())
}

func CloudEventData(source, eventType string, encoding binding.Encoding) map[string]interface{} {
return map[string]interface{}{keyApp: source, keyMode: CloudEventMode(encoding), keyType: eventType}
}

func ExtractSourceFromSubscriptionV1Alpha1Type(eventType string) string {
segments := strings.Split(eventType, ".")
return segments[3]
Expand Down Expand Up @@ -73,3 +84,16 @@ func NewLegacyEvent(eventSource, eventType string) (string, string, string, stri

return eventID, eventSource, legacyEventType, payload
}

func NewCloudEvent(eventSource, eventType string, encoding binding.Encoding) (*cloudevents.Event, error) {
eventID := uuid.New().String()
ce := cloudevents.NewEvent()
data := CloudEventData(eventSource, eventType, encoding)
ce.SetID(eventID)
ce.SetType(eventType)
ce.SetSource(eventSource)
if err := ce.SetData(cloudevents.ApplicationJSON, data); err != nil {
return nil, fmt.Errorf("failed to set cloudevent-%s data with error:[%s]", encoding.String(), err)
}
return &ce, nil
}
35 changes: 26 additions & 9 deletions hack/e2e/common/fixtures/fixtures.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fixtures

import (
"errors"
"fmt"
"strings"

Expand All @@ -26,6 +27,14 @@ const (
WebhookServerCertJobName = "eventing-manager-cert-handler"
EventMeshSecretNamespace = "kyma-system"
EventMeshSecretName = "eventing-backend"
EventOriginalTypeHeader = "originaltype"
)

type SubscriptionCRVersion string

const (
V1Alpha1SubscriptionCRVersion SubscriptionCRVersion = "v1alpha1"
V1Alpha2SubscriptionCRVersion SubscriptionCRVersion = "v1alpha2"
)

func EventingCR(backendType eventingv1alpha1.BackendType) *eventingv1alpha1.Eventing {
Expand Down Expand Up @@ -104,15 +113,18 @@ func PublisherSpec() eventingv1alpha1.Publisher {
func V1Alpha1SubscriptionsToTest() []eventing.TestSubscriptionInfo {
return []eventing.TestSubscriptionInfo{
{
Name: "test-sub-1-v1alpha1",
Types: []string{"sap.kyma.custom.noapp.order.tested.v1"},
Name: "test-sub-1-v1alpha1",
Description: "event type and source without any alpha-numeric characters",
Types: []string{"sap.kyma.custom.noapp.order.tested.v1"},
},
{
Name: "test-sub-2-v1alpha1",
Types: []string{"sap.kyma.custom.test-app.order-$.second.R-e-c-e-i-v-e-d.v1"},
Name: "test-sub-2-v1alpha1",
Description: "event type and source with alpha-numeric characters",
Types: []string{"sap.kyma.custom.test-app.order-$.second.R-e-c-e-i-v-e-d.v1"},
},
{
Name: "test-sub-3-with-multiple-types-v1alpha1",
Name: "test-sub-3-with-multiple-types-v1alpha1",
Description: "multiple types in same subscription",
Types: []string{
"sap.kyma.custom.connected-app.order.tested.v1",
"sap.kyma.custom.connected-app2.or-der.crea-ted.one.two.three.v4",
Expand All @@ -125,19 +137,20 @@ func V1Alpha2SubscriptionsToTest() []eventing.TestSubscriptionInfo {
return []eventing.TestSubscriptionInfo{
{
Name: "test-sub-1-v1alpha2",
Description: "Test event type and source without any alpha-numeric characters",
Description: "event type and source without any alpha-numeric characters",
Source: "noapp",
Types: []string{"order.modified.v1"},
},
{
Name: "test-sub-2-v1alpha2",
Description: "Test event type and source with any alpha-numeric characters",
Description: "event type and source with alpha-numeric characters",
Source: "test-app",
Types: []string{"Order-$.third.R-e-c-e-i-v-e-d.v1"},
},
{
Name: "test-sub-3-with-multiple-types-v1alpha2",
Source: "test-evnt",
Name: "test-sub-3-with-multiple-types-v1alpha2",
Description: "multiple types in same subscription",
Source: "test-evnt",
Types: []string{
"or-der.crea-ted.one.two.three.four.v4",
"order.testing.v1",
Expand Down Expand Up @@ -256,3 +269,7 @@ func ConvertSelectorLabelsToString(labels map[string]string) string {
}
return strings.Join(result, ",")
}

func AppendMsgToError(err error, msg string) error {
return errors.Join(err, fmt.Errorf("\n==> %s", msg))
}
19 changes: 18 additions & 1 deletion hack/e2e/common/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,29 @@ package http
import (
"net/http"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventsclient "github.com/cloudevents/sdk-go/v2/client"
)

func NewClient(transport *http.Transport) *http.Client {
func NewHttpClient(transport *http.Transport) *http.Client {
return &http.Client{Transport: transport}
}

func NewCloudEventsClient(transport *http.Transport) (*cloudeventsclient.Client, error) {
p, err := cloudevents.NewHTTP(cloudevents.WithRoundTripper(transport))
if err != nil {
return nil, err
}

client, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
return nil, err
}

return &client, nil
}

func NewTransport(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int, idleConnTimeout time.Duration) *http.Transport {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = maxIdleConns
Expand Down
Loading