Skip to content

Commit

Permalink
Add /:app/v1/events/subscribed endpoint with the same functionality from
Browse files Browse the repository at this point in the history
event-service
  • Loading branch information
sayanh committed Nov 23, 2020
1 parent 77ca246 commit e3a3aa2
Show file tree
Hide file tree
Showing 17 changed files with 2,045 additions and 391 deletions.
37 changes: 26 additions & 11 deletions components/event-publisher-proxy/cmd/event-publisher-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package main

import (
"github.com/kelseyhightower/envconfig"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/legacy-events"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/options"
"github.com/sirupsen/logrus"

"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/env"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/handler"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/legacy-events"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/oauth"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/options"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/receiver"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/sender"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/signals"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/subscribed"
"sigs.k8s.io/controller-runtime/pkg/client/config"

"github.com/sirupsen/logrus"

_ "k8s.io/client-go/plugin/pkg/client/auth"
)

func main() {
Expand All @@ -22,15 +26,12 @@ func main() {
logger.Fatalf("Start handler failed with error: %s", err)
}

logger.Info("Start the Event Publisher Proxy")

// configure message receiver
messageReceiver := receiver.NewHttpMessageReceiver(cfg.Port)

// configure auth client
ctx := signals.NewContext()
client := oauth.NewClient(ctx, cfg)
defer client.CloseIdleConnections()
// configure message receiver
messageReceiver := receiver.NewHttpMessageReceiver(cfg.Port)

// configure message sender
messageSender := sender.NewHttpMessageSender(cfg.EmsPublishURL, client)
Expand All @@ -40,10 +41,24 @@ func main() {
cfg.BEBNamespace,
cfg.EventTypePrefix,
)

// Configure Subscription Lister
k8sConfig := config.GetConfigOrDie()
subDynamicSharedInfFactory := subscribed.GenerateSubscriptionInfFactory(k8sConfig)
subLister := subDynamicSharedInfFactory.ForResource(subscribed.GVR).Lister()
subscribedProcessor := &subscribed.Processor{
SubscriptionLister: &subLister,
Config: cfg,
Logger: logger,
}
// Sync informer cache or die
logger.Info("Waiting for informers caches to sync")
subscribed.WaitForCacheSyncOrDie(ctx, subDynamicSharedInfFactory)
logger.Info("Informers are synced successfully")

// start handler which blocks until it receives a shutdown signal
if err := handler.NewHandler(messageReceiver, messageSender, cfg.RequestTimeout, legacyTransformer, opts, logger).Start(ctx); err != nil {
if err := handler.NewHandler(messageReceiver, messageSender, cfg.RequestTimeout, legacyTransformer, opts, subscribedProcessor, logger).Start(ctx); err != nil {
logger.Fatalf("Start handler failed with error: %s", err)
}

logger.Info("Shutdown the Event Publisher Proxy")
}
16 changes: 15 additions & 1 deletion components/event-publisher-proxy/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,25 @@ module github.com/kyma-project/kyma/components/event-publisher-proxy
go 1.15

require (
github.com/cloudevents/sdk-go/v2 v2.2.0
github.com/cloudevents/sdk-go/v2 v2.3.1
github.com/google/uuid v1.1.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/kyma-project/kyma/components/console-backend-service v0.0.0-20201116133707-dd0a4cf8e9d8 // indirect
github.com/kyma-project/kyma/components/eventing-controller v0.0.0-20201116133707-dd0a4cf8e9d8
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.6.0
go.opencensus.io v0.22.4
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
k8s.io/api v0.19.4
k8s.io/apimachinery v0.19.4
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
sigs.k8s.io/controller-runtime v0.6.0
)

replace (
k8s.io/api => k8s.io/api v0.16.15
k8s.io/apimachinery => k8s.io/apimachinery v0.16.15
k8s.io/client-go => k8s.io/client-go v0.16.15
)
888 changes: 888 additions & 0 deletions components/event-publisher-proxy/go.sum

Large diffs are not rendered by default.

51 changes: 36 additions & 15 deletions components/event-publisher-proxy/pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/legacy-events"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/options"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/subscribed"

"github.com/sirupsen/logrus"

Expand All @@ -28,8 +29,9 @@ const (
// noDuration signals that the dispatch step has not started yet.
noDuration = -1

publishEndpoint = "/publish"
legacyEndpointSuffix = "/v1/events"
publishEndpoint = "/publish"
legacyEndpointSuffix = "/v1/events"
subscribedEndpointSuffix = "/v1/events/subscribed"
)

var (
Expand All @@ -54,35 +56,34 @@ type Handler struct {
LegacyTransformer *legacy.Transformer
// RequestTimeout timeout for outgoing requests
RequestTimeout time.Duration
//SubscribedProcessor processes requests for /:app/v1/events/subscribed endpoint
SubscribedProcessor *subscribed.Processor
// Logger default logger
Logger *logrus.Logger
// Options configures HTTP server
Options *options.Options
}

// NewHandler returns a new Handler instance for the Event Publisher Proxy.
func NewHandler(receiver *receiver.HttpMessageReceiver, sender *sender.HttpMessageSender, requestTimeout time.Duration, legacyTransformer *legacy.Transformer, options *options.Options, logger *logrus.Logger) *Handler {
func NewHandler(receiver *receiver.HttpMessageReceiver, sender *sender.HttpMessageSender, requestTimeout time.Duration, legacyTransformer *legacy.Transformer, opts *options.Options, subscribedProcessor *subscribed.Processor, logger *logrus.Logger) *Handler {

return &Handler{
Receiver: receiver,
Sender: sender,
RequestTimeout: requestTimeout,
LegacyTransformer: legacyTransformer,
Logger: logger,
Options: options,
Receiver: receiver,
Sender: sender,
RequestTimeout: requestTimeout,
LegacyTransformer: legacyTransformer,
SubscribedProcessor: subscribedProcessor,
Logger: logger,
Options: opts,
}
}

// Start starts the Handler with the given context.
func (h *Handler) Start(ctx context.Context) error {
return h.Receiver.StartListen(ctx, health.CheckHealth(h))
}

// ServeHTTP serves an HTTP request and returns back an HTTP response.
// It ensures that the incoming request is a valid Cloud Event, then dispatches it
// to the EMS gateway and writes back the HTTP response.
func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
// validate request method
if request.Method != http.MethodPost {
if request.Method != http.MethodPost && request.Method != http.MethodGet {
h.Logger.Warnf("Unexpected request method: %s", request.Method)
h.writeResponse(writer, http.StatusMethodNotAllowed, nil)
return
Expand All @@ -105,6 +106,13 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}

// Process /:application/v1/events/subscribed
// Fetches the list of subscriptions available for the given application
if isARequestForSubscriptions(uri) {
h.SubscribedProcessor.ExtractEventsFromSubscriptions(writer, request)
return
}

h.writeResponse(writer, http.StatusNotFound, nil)
return
}
Expand All @@ -130,6 +138,14 @@ func isARequestWithLegacyEvent(uri string) bool {
return true
}

func isARequestForSubscriptions(uri string) bool {
// Assuming the path should be of the form /:application/v1/events/subscribed
if !strings.HasSuffix(uri, subscribedEndpointSuffix) {
return false
}
return true
}

func (h *Handler) publishLegacyEventsAsCE(writer http.ResponseWriter, request *http.Request) {
event := h.LegacyTransformer.TransformsLegacyRequestsToCE(writer, request)
if event == nil {
Expand Down Expand Up @@ -234,3 +250,8 @@ func (h *Handler) sendAndRecordDispatchTime(request *http.Request) (*http.Respon
dispatchTime := time.Since(start)
return resp, dispatchTime, err
}

// Start starts the Handler with the given context.
func (h *Handler) Start(ctx context.Context) error {
return h.Receiver.StartListen(ctx, health.CheckHealth(h))
}
Loading

0 comments on commit e3a3aa2

Please sign in to comment.