generated from kyma-project/template-repository
-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added E2E tests for legacy events (#90)
* added e2e tests * added tests * added support for EventMesh * updated * updated * updated * Added e2e event delivery tests * updated * updated * updated * updated * updated * Apply suggestions from code review Co-authored-by: Friedrich <[email protected]> --------- Co-authored-by: Friedrich <[email protected]>
- Loading branch information
1 parent
2f0470f
commit 4afd885
Showing
14 changed files
with
976 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package eventing | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/cloudevents/sdk-go/v2/client" | ||
"github.com/kyma-project/eventing-manager/hack/e2e/common" | ||
"github.com/pkg/errors" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
LegacyPublishEndpointFormat = "%s/%s/v1/events" | ||
CloudEventPublishEndpointFormat = "%s/%s/v1/events" | ||
) | ||
|
||
type Publisher struct { | ||
ctx context.Context | ||
clientCE client.Client | ||
clientHTTP *http.Client | ||
publisherURL string | ||
logger *zap.Logger | ||
} | ||
|
||
func NewPublisher(ctx context.Context, clientCE client.Client, clientHTTP *http.Client, publisherURL string, logger *zap.Logger) *Publisher { | ||
return &Publisher{ | ||
ctx: ctx, | ||
clientCE: clientCE, | ||
clientHTTP: clientHTTP, | ||
publisherURL: publisherURL, | ||
logger: logger, | ||
} | ||
} | ||
|
||
func (p *Publisher) LegacyPublishEndpoint(source string) string { | ||
return fmt.Sprintf(LegacyPublishEndpointFormat, p.publisherURL, source) | ||
} | ||
|
||
func (p *Publisher) SendLegacyEventWithRetries(source, eventType, payload string, attempts int, interval time.Duration) error { | ||
return common.Retry(attempts, interval, func() error { | ||
return p.SendLegacyEvent(source, eventType, payload) | ||
}) | ||
} | ||
|
||
func (p *Publisher) SendLegacyEvent(source, eventType, payload string) error { | ||
url := p.LegacyPublishEndpoint(source) | ||
|
||
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer([]byte(payload))) | ||
if err != nil { | ||
err = errors.Wrap(err, "Failed to create HTTP request for sending legacy event") | ||
p.logger.Debug(err.Error()) | ||
return err | ||
} | ||
|
||
p.logger.Debug(fmt.Sprintf("Publishing legacy event:"+ | ||
" URL: %s,"+ | ||
" EventSource: %s,"+ | ||
" EventType: %s,"+ | ||
" Payload: %s", | ||
url, source, eventType, payload)) | ||
|
||
req.Header.Set("Content-Type", "application/json") | ||
resp, err := p.clientHTTP.Do(req) | ||
if err != nil { | ||
err = errors.Wrap(err, "Failed to send legacy-event") | ||
p.logger.Debug(err.Error()) | ||
return err | ||
} | ||
defer func() { | ||
err = resp.Body.Close() | ||
if err != nil { | ||
p.logger.Error(err.Error()) | ||
} | ||
}() | ||
|
||
// if success, then return. | ||
if Is2XX(resp.StatusCode) { | ||
return nil | ||
} | ||
|
||
// read body and return error. | ||
if body, err2 := io.ReadAll(resp.Body); err2 != nil { | ||
err2 = errors.Wrap(err2, "Failed to read response body") | ||
p.logger.Debug(err2.Error()) | ||
return err2 | ||
} else { | ||
err2 = errors.New(fmt.Sprintf("Failed to send legacy-event:[%s] response:[%d] body:[%s]", eventType, | ||
resp.StatusCode, string(body))) | ||
p.logger.Debug(err2.Error()) | ||
return err2 | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package eventing | ||
|
||
import ( | ||
ecv1alpha1 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha1" | ||
ecv1alpha2 "github.com/kyma-project/kyma/components/eventing-controller/api/v1alpha2" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
type TestSubscriptionInfo struct { | ||
Name string | ||
Description string | ||
Source string | ||
Types []string | ||
} | ||
|
||
func (s TestSubscriptionInfo) V1Alpha1SpecFilters() []*ecv1alpha1.EventMeshFilter { | ||
var filters []*ecv1alpha1.EventMeshFilter | ||
for _, etype := range s.Types { | ||
filter := &ecv1alpha1.EventMeshFilter{ | ||
EventSource: &ecv1alpha1.Filter{ | ||
Type: "exact", | ||
Property: "source", | ||
Value: "", | ||
}, | ||
EventType: &ecv1alpha1.Filter{ | ||
Type: "exact", | ||
Property: "type", | ||
Value: etype, | ||
}, | ||
} | ||
filters = append(filters, filter) | ||
} | ||
return filters | ||
} | ||
|
||
func (s TestSubscriptionInfo) ToSubscriptionV1Alpha1(sink, namespace string) *ecv1alpha1.Subscription { | ||
return &ecv1alpha1.Subscription{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: s.Name, | ||
Namespace: namespace, | ||
}, | ||
Spec: ecv1alpha1.SubscriptionSpec{ | ||
Sink: sink, | ||
Filter: &ecv1alpha1.BEBFilters{ | ||
Filters: s.V1Alpha1SpecFilters(), | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func (s TestSubscriptionInfo) ToSubscriptionV1Alpha2(sink, namespace string) *ecv1alpha2.Subscription { | ||
return &ecv1alpha2.Subscription{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: s.Name, | ||
Namespace: namespace, | ||
}, | ||
Spec: ecv1alpha2.SubscriptionSpec{ | ||
Sink: sink, | ||
TypeMatching: ecv1alpha2.TypeMatchingStandard, | ||
Source: s.Source, | ||
Types: s.Types, | ||
}, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package eventing | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
"strings" | ||
|
||
"github.com/google/uuid" | ||
) | ||
|
||
const ( | ||
keyApp = "app" | ||
keyMode = "mode" | ||
keyType = "type" | ||
) | ||
|
||
func Is2XX(statusCode int) bool { | ||
return http.StatusOK <= statusCode && statusCode <= http.StatusIMUsed | ||
} | ||
|
||
func LegacyEventData(source, eventType string) string { | ||
return `{\"` + keyApp + `\":\"` + source + `\",\"` + keyMode + `\":\"legacy\",\"` + keyType + `\":\"` + eventType + `\"}` | ||
} | ||
func LegacyEventPayload(eventId, eventVersion, eventType, data string) string { | ||
return `{"data":"` + data + `","event-id":"` + eventId + `","event-type":"` + eventType + `","event-time":"2020-04-02T21:37:00Z","event-type-version":"` + eventVersion + `"}` | ||
} | ||
|
||
func ExtractSourceFromSubscriptionV1Alpha1Type(eventType string) string { | ||
segments := strings.Split(eventType, ".") | ||
return segments[3] | ||
} | ||
|
||
func ExtractLegacyTypeFromSubscriptionV1Alpha1Type(eventTypePrefix, eventSource, eventVersion, eventType string) string { | ||
tmp := strings.TrimPrefix(eventType, fmt.Sprintf("%s.%s.", eventTypePrefix, eventSource)) | ||
return strings.TrimSuffix(tmp, fmt.Sprintf(".%s", eventVersion)) | ||
} | ||
|
||
func ExtractLegacyTypeFromSubscriptionV1Alpha2Type(eventVersion, eventType string) string { | ||
return strings.TrimSuffix(eventType, fmt.Sprintf(".%s", eventVersion)) | ||
} | ||
|
||
func ExtractVersionFromEventType(eventType string) string { | ||
segments := strings.Split(eventType, ".") | ||
return segments[len(segments)-1] | ||
} | ||
|
||
func NewLegacyEventForV1Alpha1(eventType, eventTypePrefix string) (string, string, string, string) { | ||
// If the eventType is sap.kyma.custom.noapp.order.created.v1, then for legacy event: | ||
// eventSource should be: noapp | ||
// eventType should be: order.created | ||
// eventVersion should be: v1 | ||
eventID := uuid.New().String() | ||
eventSource := ExtractSourceFromSubscriptionV1Alpha1Type(eventType) | ||
eventVersion := ExtractVersionFromEventType(eventType) | ||
legacyEventType := ExtractLegacyTypeFromSubscriptionV1Alpha1Type(eventTypePrefix, | ||
eventSource, eventVersion, eventType) | ||
eventData := LegacyEventData(eventSource, legacyEventType) | ||
payload := LegacyEventPayload(eventID, eventVersion, legacyEventType, eventData) | ||
|
||
return eventID, eventSource, legacyEventType, payload | ||
} | ||
|
||
func NewLegacyEvent(eventSource, eventType string) (string, string, string, string) { | ||
// If the eventType is order.created.v1 and source is noapp, then for legacy event: | ||
// eventSource should be: noapp | ||
// eventType should be: order.created | ||
// eventVersion should be: v1 | ||
eventID := uuid.New().String() | ||
eventVersion := ExtractVersionFromEventType(eventType) | ||
legacyEventType := ExtractLegacyTypeFromSubscriptionV1Alpha2Type(eventVersion, eventType) | ||
eventData := LegacyEventData(eventSource, legacyEventType) | ||
payload := LegacyEventPayload(eventID, eventVersion, legacyEventType, eventData) | ||
|
||
return eventID, eventSource, legacyEventType, payload | ||
} |
Oops, something went wrong.