Skip to content

Commit

Permalink
support backend kind endpoints in new controller
Browse files Browse the repository at this point in the history
  • Loading branch information
AmaliMatharaarachchi committed Sep 19, 2024
1 parent e410748 commit 6ca847f
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 65 deletions.
2 changes: 1 addition & 1 deletion adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,7 @@ func (apiReconciler *APIReconciler) getAPIsForConfigMap(ctx context.Context, obj
return []reconcile.Request{}
}

backendList := &dpv1alpha1.BackendList{}
backendList := &dpv1alpha2.BackendList{}
err := apiReconciler.client.List(ctx, backendList, &k8client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(configMapBackend, utils.NamespacedName(configMap).String()),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func NewGatewayController(mgr manager.Manager, operatorDataStore *synchronizer.O
return err
}

if err := c.Watch(source.Kind(mgr.GetCache(), &dpv1alpha1.Backend{}), handler.EnqueueRequestsFromMapFunc(r.getGatewaysForBackend),
if err := c.Watch(source.Kind(mgr.GetCache(), &dpv1alpha2.Backend{}), handler.EnqueueRequestsFromMapFunc(r.getGatewaysForBackend),
predicates...); err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3102, logging.BLOCKER, "Error watching Backend resources: %v", err))
return err
Expand Down Expand Up @@ -321,7 +321,7 @@ func (gatewayReconciler *GatewayReconciler) getResolvedBackendsMapping(ctx conte
// getGatewaysForBackend triggers the Gateway controller reconcile method based on the changes detected
// in backend resources.
func (gatewayReconciler *GatewayReconciler) getGatewaysForBackend(ctx context.Context, obj k8client.Object) []reconcile.Request {
backend, ok := obj.(*dpv1alpha1.Backend)
backend, ok := obj.(*dpv1alpha2.Backend)
if !ok {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3107, logging.TRIVIAL, "Unexpected object type, bypassing reconciliation: %v", backend))
return []reconcile.Request{}
Expand Down Expand Up @@ -408,7 +408,7 @@ func (gatewayReconciler *GatewayReconciler) getGatewaysForSecret(ctx context.Con
return []reconcile.Request{}
}

backendList := &dpv1alpha1.BackendList{}
backendList := &dpv1alpha2.BackendList{}
if err := gatewayReconciler.client.List(ctx, backendList, &k8client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(secretBackend, utils.NamespacedName(secret).String()),
}); err != nil {
Expand All @@ -433,7 +433,7 @@ func (gatewayReconciler *GatewayReconciler) getGatewaysForConfigMap(ctx context.
return []reconcile.Request{}
}

backendList := &dpv1alpha1.BackendList{}
backendList := &dpv1alpha2.BackendList{}
if err := gatewayReconciler.client.List(ctx, backendList, &k8client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(configMapBackend, utils.NamespacedName(configMap).String()),
}); err != nil {
Expand Down
5 changes: 1 addition & 4 deletions adapter/internal/operator/gateway-api/contexts.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ func (g *GatewayContext) ResetListeners() {
}
}



func (g *GatewayContext) attachEnvoyProxy(resources *Resources) {
// if g.Spec.Infrastructure != nil && g.Spec.Infrastructure.ParametersRef != nil && !IsMergeGatewaysEnabled(resources) {
// ref := g.Spec.Infrastructure.ParametersRef
Expand All @@ -75,7 +73,6 @@ func (g *GatewayContext) attachEnvoyProxy(resources *Resources) {
// g.envoyProxy = resources.EnvoyProxyForGatewayClass
}


// ListenerContext wraps a Listener and provides helper methods for
// setting conditions and other status information on the associated
// Gateway, etc.
Expand Down Expand Up @@ -480,4 +477,4 @@ func GetBackendRef(b BackendRefContext) *gwapiv1.BackendRef {

func GetFilters(b BackendRefContext) any {
return reflect.ValueOf(b).FieldByName("Filters").Interface()
}
}
14 changes: 7 additions & 7 deletions adapter/internal/operator/gateway-api/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func irRoutePrefix(route RouteContext) string {
func GetNamespacedName(route RouteContext) string {
return types.NamespacedName{
Namespace: route.GetNamespace(),
Name: route.GetName(),
Name: route.GetName(),
}.String()
}

Expand Down Expand Up @@ -474,10 +474,10 @@ func listenersWithSameHTTPPort(xdsIR *ir.Xds, listener *ir.HTTPListener) []strin
// contains checks if a specific string exists in the provided slice of strings.
// It returns true if the string is found, and false otherwise.
func Contains(slice []string, item string) bool {
for _, v := range slice {
if v == item {
return true
}
}
return false
for _, v := range slice {
if v == item {
return true
}
}
return false
}
4 changes: 2 additions & 2 deletions adapter/internal/operator/gateway-api/ir/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ type ExtAuth struct {
// authorization server by default, no matter whether they are specified
// in HeadersToExtAuth or not.
// +optional
HeadersToExtAuth []string `json:"headersToExtAuth,omitempty"`
UseBootstrapCluster *bool `json:"useBootstrapCluster,omitempty"`
HeadersToExtAuth []string `json:"headersToExtAuth,omitempty"`
UseBootstrapCluster *bool `json:"useBootstrapCluster,omitempty"`
}

// HTTPExtAuthService defines the HTTP External Authorization service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (r *gatewayReconcilerNew) watchResources(ctx context.Context, mgr manager.M
// Watch Service CRUDs and process affected *Route objects and services belongs to gateways
backendPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.validateBackendForReconcile)}
if err := c.Watch(
source.Kind(mgr.GetCache(), &dpv1alpha1.Backend{}),
source.Kind(mgr.GetCache(), &dpv1alpha2.Backend{}),
handler.EnqueueRequestsFromMapFunc(r.enqueueClass),
backendPredicates...,
); err != nil {
Expand Down Expand Up @@ -669,7 +669,7 @@ func (r *gatewayReconcilerNew) processBackendRefs(ctx context.Context, gwcResour
endpointSliceLabelKey = discoveryv1.LabelServiceName

case gatewayapi.KindBackend:
backend := new(dpv1alpha1.Backend)
backend := new(dpv1alpha2.Backend)
err := r.client.Get(ctx, types.NamespacedName{Namespace: string(*backendRef.Namespace), Name: string(backendRef.Name)}, backend)
if err != nil {
loggers.LoggerAPKOperator.Errorf("Failed to get Backend namespace %s, name %s, Error: %v", string(*backendRef.Namespace),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/wso2/apk/adapter/internal/loggers"
gatewayapi "github.com/wso2/apk/adapter/internal/operator/gateway-api"
"github.com/wso2/apk/adapter/internal/operator/utils"
dpv1alpha1 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha1"
dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2"
corev1 "k8s.io/api/core/v1"
k8errors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -108,7 +108,7 @@ func (r *gatewayReconcilerNew) validateServiceForReconcile(obj client.Object) bo
// if it exists, finds the Gateway's Deployment, and further updates the Gateway
// status Ready condition. All Services are pushed for reconciliation.
func (r *gatewayReconcilerNew) validateBackendForReconcile(obj client.Object) bool {
backend, ok := obj.(*dpv1alpha1.Backend)
backend, ok := obj.(*dpv1alpha2.Backend)
if !ok {
loggers.LoggerAPKOperator.Info("unexpected object type, bypassing reconciliation for object", obj)
return false
Expand Down Expand Up @@ -164,15 +164,12 @@ func (r *gatewayReconcilerNew) isRouteReferencingBackend(nsName *types.Namespace
httpRouteList := &gwapiv1.HTTPRouteList{}
if err := r.client.List(ctx, httpRouteList, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(backendHTTPRouteIndex, nsName.String()),
}); err != nil {
}); err != nil || len(httpRouteList.Items) == 0 {
loggers.LoggerAPKOperator.Error("unable to find associated HTTPRoutes for the service ", err)
return false
}

// Check how many Route objects refer this Backend
allAssociatedRoutes := len(httpRouteList.Items)

return allAssociatedRoutes != 0
return true
}

// envoyServiceForGateway returns the Envoy service, returning nil if the service doesn't exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (r *gatewayReconcilerNew) processHTTPRoutes(ctx context.Context, gatewayNam
newMatches := make([]gwapiv1.HTTPRouteMatch, 0)
for _, match := range rule.Matches {
if match.Path.Value != nil {
newPath := api.Spec.BasePath + *match.Path.Value
newPath := api.Spec.BasePath + *match.Path.Value
match.Path.Value = &newPath
} else {
newPath := api.Spec.BasePath
Expand All @@ -241,7 +241,7 @@ func (r *gatewayReconcilerNew) processHTTPRoutes(ctx context.Context, gatewayNam
for _, match := range rule.Matches {
matchCopied := match.DeepCopy()
if match.Path.Value != nil {
newPath := removeFirstSubstring(*match.Path.Value, fmt.Sprintf("/%s",api.Spec.APIVersion))
newPath := removeFirstSubstring(*match.Path.Value, fmt.Sprintf("/%s", api.Spec.APIVersion))
matchCopied.Path.Value = &newPath
} else {
newPath := removeSuffix(api.Spec.BasePath, api.Spec.APIVersion)
Expand All @@ -265,7 +265,6 @@ func (r *gatewayReconcilerNew) processHTTPRoutes(ctx context.Context, gatewayNam
return nil
}


func removeSuffix(str, suffix string) string {
if strings.HasSuffix(str, suffix) {
return str[:len(str)-len(suffix)]
Expand Down
23 changes: 16 additions & 7 deletions adapter/internal/operator/gateway-api/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"reflect"

"github.com/wso2/apk/adapter/internal/operator/gateway-api/ir"
dpv1alpha1 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha1"
dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -57,7 +56,7 @@ type Resources struct {
ReferenceGrants []*gwapiv1b1.ReferenceGrant `json:"referenceGrants,omitempty" yaml:"referenceGrants,omitempty"`
Namespaces []*v1.Namespace `json:"namespaces,omitempty" yaml:"namespaces,omitempty"`
Services []*v1.Service `json:"services,omitempty" yaml:"services,omitempty"`
Backends []*dpv1alpha1.Backend `json:"backends,omitempty" yaml:"backends,omitempty"`
Backends []*dpv1alpha2.Backend `json:"backends,omitempty" yaml:"backends,omitempty"`
EndpointSlices []*discoveryv1.EndpointSlice `json:"endpointSlices,omitempty" yaml:"endpointSlices,omitempty"`
Secrets []*v1.Secret `json:"secrets,omitempty" yaml:"secrets,omitempty"`
ConfigMaps []*v1.ConfigMap `json:"configMaps,omitempty" yaml:"configMaps,omitempty"`
Expand Down Expand Up @@ -104,6 +103,16 @@ func (r *Resources) GetService(namespace, name string) *v1.Service {
return nil
}

func (r *Resources) GetBackend(namespace, name string) *dpv1alpha2.Backend {
for _, backend := range r.Backends {
if backend.Namespace == namespace && backend.Name == name {
return backend
}
}

return nil
}

func (r *Resources) GetSecret(namespace, name string) *v1.Secret {
for _, secret := range r.Secrets {
if secret.Namespace == namespace && secret.Name == name {
Expand Down Expand Up @@ -176,10 +185,10 @@ func RemoveDuplicates(apis []*dpv1alpha2.API) []*dpv1alpha2.API {
result := []*dpv1alpha2.API{}

for _, api := range apis {
if _, exists := uniqueAPIs[api]; !exists {
uniqueAPIs[api] = struct{}{}
result = append(result, api)
}
if _, exists := uniqueAPIs[api]; !exists {
uniqueAPIs[api] = struct{}{}
result = append(result, api)
}
}
return result
}
}
13 changes: 12 additions & 1 deletion adapter/internal/operator/gateway-api/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"time"

"github.com/wso2/apk/adapter/internal/loggers"
"github.com/wso2/apk/adapter/internal/operator/gateway-api/ir"
"github.com/wso2/apk/adapter/internal/operator/status"
"github.com/wso2/apk/adapter/pkg/utils/regex"
Expand Down Expand Up @@ -199,6 +200,7 @@ func (t *Translator) processHTTPRouteRules(httpRoute *HTTPRouteContext, parentRe
// processing any destinations for this route.
if route.DirectResponse == nil && route.Redirect == nil {
if ds != nil && len(ds.Endpoints) > 0 {
loggers.LoggerAPI.Errorf("Destination is not given in %s:%s", httpRoute.Namespace, httpRoute.Name)
if route.Destination == nil {
route.Destination = &ir.RouteDestination{
Name: irRouteDestinationName(httpRoute, ruleIdx),
Expand Down Expand Up @@ -1095,14 +1097,16 @@ func (t *Translator) processDestination(backendRefContext BackendRefContext,
if backendRef.Weight != nil {
weight = uint32(*backendRef.Weight)
}

loggers.LoggerAPI.Error("amalii", backendRef)
backendNamespace := NamespaceDerefOr(backendRef.Namespace, route.GetNamespace())
if !t.validateBackendRef(backendRefContext, parentRef, route, resources, backendNamespace, routeType) {
loggers.LoggerAPI.Error("amalii")
return nil, weight
}

// Skip processing backends with 0 weight
if weight == 0 {
loggers.LoggerAPI.Error("amalii")
return nil, weight
}

Expand Down Expand Up @@ -1140,6 +1144,13 @@ func (t *Translator) processDestination(backendRefContext BackendRefContext,
uint32(*backendRef.Port))
endpoints = append(endpoints, ep)
}
case KindBackend:
loggers.LoggerAPI.Error("amalii")
backend := resources.GetBackend(backendNamespace, string(backendRef.Name))
ep := ir.NewDestEndpoint(
backend.Spec.Services[0].Host,
uint32(backend.Spec.Services[0].Port))
endpoints = append(endpoints, ep)
}

// TODO: support mixed endpointslice address type for the same backendRef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func buildXdsClusterLoadAssignment(clusterName string, destSettings []*ir.Destin
},
},
},
}
}
}

for _, irEp := range ds.Endpoints {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions adapter/internal/operator/synchronizer/gateway_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,17 @@ func AddOrUpdateGateway(gatewayState GatewayState, state string) (string, error)
if state == constants.Create {
xds.GenerateGlobalClusters(gateway.Name)
}

xds.GenerateInterceptorClusters(gateway.Name, gwReqICluster, gwReqIAddresses, gwResICluster, gwResIAddresses)
xds.UpdateGatewayCache(gateway, resolvedListenerCerts, gwLuaScript, customRateLimitPolicies)
listeners, clusters, routes, endpoints, apis := xds.GenerateEnvoyResoucesForGateway(gateway.Name)
loggers.LoggerAPKOperator.Debugf("listeners: %v", listeners)
loggers.LoggerAPKOperator.Debugf("clusters: %v", clusters)
loggers.LoggerAPKOperator.Debugf("routes: %v", routes)
loggers.LoggerAPKOperator.Debugf("endpoints: %v", endpoints)
loggers.LoggerAPKOperator.Debugf("apis: %v", apis)
xds.UpdateXdsCacheWithLock(gateway.Name, endpoints, clusters, routes, listeners)
if !config.ReadConfigs().Adapter.EnableGatewayClassController {
xds.GenerateInterceptorClusters(gateway.Name, gwReqICluster, gwReqIAddresses, gwResICluster, gwResIAddresses)
xds.UpdateGatewayCache(gateway, resolvedListenerCerts, gwLuaScript, customRateLimitPolicies)
loggers.LoggerAPKOperator.Debugf("listeners: %v", listeners)
loggers.LoggerAPKOperator.Debugf("clusters: %v", clusters)
loggers.LoggerAPKOperator.Debugf("routes: %v", routes)
loggers.LoggerAPKOperator.Debugf("endpoints: %v", endpoints)
loggers.LoggerAPKOperator.Debugf("apis: %v", apis)
xds.UpdateXdsCacheWithLock(gateway.Name, endpoints, clusters, routes, listeners)
}
xds.UpdateEnforcerApis(gateway.Name, apis, "")
return "", nil
}
Expand Down
Loading

0 comments on commit 6ca847f

Please sign in to comment.