Skip to content

Commit

Permalink
Add support for request mirror filter
Browse files Browse the repository at this point in the history
  • Loading branch information
sgayangi committed Jun 20, 2024
1 parent c924cec commit 25f6683
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 41 deletions.
1 change: 1 addition & 0 deletions adapter/internal/oasparser/envoyconf/internal_dtos.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type routeCreateParams struct {
apiProperties []dpv1alpha2.Property
environment string
envType string
mirrorClusterNames map[string][]string
}

// RatelimitCriteria criterias of rate limiting
Expand Down
68 changes: 65 additions & 3 deletions adapter/internal/oasparser/envoyconf/routes_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@ import (
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

func generateRouteConfig(routeName string, match *routev3.RouteMatch, action *routev3.Route_Route,
func generateRouteConfig(routeName string, match *routev3.RouteMatch, action *routev3.Route_Route, redirectAction *routev3.Route_Redirect,
metadata *corev3.Metadata, decorator *routev3.Decorator, typedPerFilterConfig map[string]*anypb.Any,
requestHeadersToAdd []*corev3.HeaderValueOption, requestHeadersToRemove []string,
responseHeadersToAdd []*corev3.HeaderValueOption, responseHeadersToRemove []string) *routev3.Route {

route := &routev3.Route{
Name: routeName,
Match: match,
Action: action,
Metadata: metadata,
Decorator: decorator,
TypedPerFilterConfig: typedPerFilterConfig,
Expand All @@ -62,6 +61,12 @@ func generateRouteConfig(routeName string, match *routev3.RouteMatch, action *ro
ResponseHeadersToRemove: responseHeadersToRemove,
}

if redirectAction != nil {
route.Action = redirectAction
} else if action != nil {
route.Action = action
}

return route
}

Expand All @@ -76,7 +81,7 @@ func generateRouteMatch(routeRegex string) *routev3.RouteMatch {
return match
}

func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, ratelimitCriteria *ratelimitCriteria) (action *routev3.Route_Route) {
func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, ratelimitCriteria *ratelimitCriteria, mirrorClusterNames []string) (action *routev3.Route_Route) {
action = &routev3.Route_Route{
Route: &routev3.RouteAction{
HostRewriteSpecifier: &routev3.RouteAction_AutoHostRewrite{
Expand Down Expand Up @@ -108,9 +113,66 @@ func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, rate
action.Route.RateLimits = generateRateLimitPolicy(ratelimitCriteria)
}

// Add request mirroring configurations
if mirrorClusterNames != nil && len(mirrorClusterNames) > 0 {
mirrorPolicies := []*routev3.RouteAction_RequestMirrorPolicy{}
for _, clusterName := range mirrorClusterNames {
mirrorPolicy := &routev3.RouteAction_RequestMirrorPolicy{
Cluster: clusterName,
}
mirrorPolicies = append(mirrorPolicies, mirrorPolicy)
}
action.Route.RequestMirrorPolicies = mirrorPolicies
}

return action
}

func generateRequestRedirectRoute(route string, policyParams interface{}) (action *routev3.Route_Redirect) {
policyParameters, _ := policyParams.(map[string]interface{})
scheme, _ := policyParameters[constants.RedirectScheme].(string)
hostname, _ := policyParameters[constants.RedirectHostname].(string)
port, _ := policyParameters[constants.RedirectPort].(int)
statusCode, _ := policyParameters[constants.RedirectStatusCode].(int)
replaceFullPath, _ := policyParameters[constants.RedirectPath].(string)
redirectActionStatusCode := mapStatusCodeToEnum(statusCode)
if redirectActionStatusCode == -1 {
_ = fmt.Errorf("Invalid status code provided")
}

action = &routev3.Route_Redirect{
Redirect: &routev3.RedirectAction{
SchemeRewriteSpecifier: &routev3.RedirectAction_HttpsRedirect{
HttpsRedirect: scheme == "https",
},
HostRedirect: hostname,
PortRedirect: uint32(port),
PathRewriteSpecifier: &routev3.RedirectAction_PathRedirect{
PathRedirect: replaceFullPath,
},
ResponseCode: routev3.RedirectAction_RedirectResponseCode(redirectActionStatusCode),
},
}
return action
}

func mapStatusCodeToEnum(statusCode int) int {
switch statusCode {
case 301:
return 0
case 302:
return 1
case 303:
return 2
case 307:
return 3
case 308:
return 4
default:
return -1
}
}

func generateRateLimitPolicy(ratelimitCriteria *ratelimitCriteria) []*routev3.RateLimit {

environmentValue := ratelimitCriteria.environment
Expand Down
74 changes: 57 additions & 17 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
gqlop := model.NewOperationWithPolicies("POST", policies)
resource := model.CreateMinimalResource(adapterInternalAPI.GetXWso2Basepath(), []*model.Operation{gqlop}, "", adapterInternalAPI.Endpoints, true, false, gwapiv1.PathMatchExact)
routesP, err := createRoutes(genRouteCreateParams(adapterInternalAPI, &resource, vHost, basePath, clusterName, nil,
nil, organizationID, false, false))
nil, organizationID, false, false, nil))
if err != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR,
"Error while creating routes for GQL API %s %s Error: %s", adapterInternalAPI.GetTitle(),
Expand All @@ -188,7 +188,7 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
routes = append(routes, routesP...)
if adapterInternalAPI.IsDefaultVersion {
defaultRoutes, errDefaultPath := createRoutes(genRouteCreateParams(adapterInternalAPI, &resource, vHost, basePath, clusterName, nil, nil, organizationID,
false, true))
false, true, nil))
if errDefaultPath != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR, "Error while creating routes for API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), removeFirstOccurrence(resource.GetPath(), adapterInternalAPI.GetVersion()), errDefaultPath.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", errDefaultPath)
Expand All @@ -199,6 +199,7 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
}
for _, resource := range adapterInternalAPI.GetResources() {
var clusterName string
mirrorClusterNames := map[string][]string{}
resourcePath := resource.GetPath()
endpoint := resource.GetEndpoints()
basePath := ""
Expand All @@ -220,13 +221,43 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
} else {
clusterName = existingClusterName
}

// Creating clusters for request mirroring endpoints
for _, op := range resource.GetOperations() {
if op.GetMirrorEndpoints() != nil && len(op.GetMirrorEndpoints().Endpoints) > 0 {
mirrorEndpointCluster := op.GetMirrorEndpoints()
for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints {
mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/")
existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints)
var mirrorClusterName string
if existingMirrorClusterName == "" {
mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, resource.GetID())
mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, mirrorCluster)
endpoints = append(endpoints, mirrorAddress...)
processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster
}
} else {
mirrorClusterName = existingMirrorClusterName
}
if _, exists := mirrorClusterNames[op.GetID()]; !exists {
mirrorClusterNames[op.GetID()] = []string{}
}
mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName)
}
}
}

// Create resource level interceptor clusters if required
clustersI, endpointsI, operationalReqInterceptors, operationalRespInterceptorVal := createInterceptorResourceClusters(adapterInternalAPI,
interceptorCerts, vHost, organizationID, apiRequestInterceptor, apiResponseInterceptor, resource)
clusters = append(clusters, clustersI...)
endpoints = append(endpoints, endpointsI...)
routeParams := genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, false)
false, false, mirrorClusterNames)

routeP, err := createRoutes(routeParams)
if err != nil {
Expand All @@ -238,7 +269,7 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
routes = append(routes, routeP...)
if adapterInternalAPI.IsDefaultVersion {
defaultRoutes, errDefaultPath := createRoutes(genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, true))
false, true, mirrorClusterNames))
if errDefaultPath != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR, "Error while creating routes for API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), removeFirstOccurrence(resource.GetPath(), adapterInternalAPI.GetVersion()), errDefaultPath.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", errDefaultPath)
Expand Down Expand Up @@ -704,6 +735,7 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
vHost := params.vHost
xWso2Basepath := params.xWSO2BasePath
apiType := params.apiType
mirrorClusterNames := params.mirrorClusterNames

// cors policy
corsPolicy := getCorsPolicy(params.corsPolicy)
Expand Down Expand Up @@ -907,7 +939,7 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
var responseHeadersToAdd []*corev3.HeaderValueOption
var responseHeadersToRemove []string
var pathRewriteConfig *envoy_type_matcherv3.RegexMatchAndSubstitute

var requestRedirectAction *routev3.Route_Redirect
hasMethodRewritePolicy := false
var newMethod string

Expand Down Expand Up @@ -963,6 +995,10 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
if err != nil {
return nil, err
}
case constants.ActionRedirectRequest:
logger.LoggerOasparser.Debugf("Adding %s policy to request flow for %s %s",
constants.ActionRedirectRequest, resourcePath, operation.GetMethod())
requestRedirectAction = generateRequestRedirectRoute(resourcePath, requestPolicy.Parameters)
}
}

Expand All @@ -980,7 +1016,6 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
" %v", responsePolicy.Action, operation.GetMethod(), resourcePath, err)
}
responseHeadersToAdd = append(responseHeadersToAdd, responseHeaderToAdd)

case constants.ActionHeaderRemove:
logger.LoggerOasparser.Debugf("Adding %s policy to response flow for %s %s",
constants.ActionHeaderRemove, resourcePath, operation.GetMethod())
Expand Down Expand Up @@ -1010,12 +1045,12 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
metadataValue := operation.GetMethod() + "_to_" + newMethod
match2.DynamicMetadata = generateMetadataMatcherForInternalRoutes(metadataValue)

action1 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria)
action2 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria)
action1 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()])
action2 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()])

// Create route1 for current method.
// Do not add policies to route config. Send via enforcer
route1 := generateRouteConfig(xWso2Basepath+operation.GetMethod(), match1, action1, nil, decorator, perRouteFilterConfigs,
route1 := generateRouteConfig(xWso2Basepath+operation.GetMethod(), match1, action1, requestRedirectAction, nil, decorator, perRouteFilterConfigs,
nil, nil, nil, nil)

// Create route2 for new method.
Expand All @@ -1026,24 +1061,27 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
action2.Route.RegexRewrite = generateRegexMatchAndSubstitute(routePath, resourcePath, pathMatchType)
}
configToSkipEnforcer := generateFilterConfigToSkipEnforcer()
route2 := generateRouteConfig(xWso2Basepath, match2, action2, nil, decorator, configToSkipEnforcer,
route2 := generateRouteConfig(xWso2Basepath, match2, action2, requestRedirectAction, nil, decorator, configToSkipEnforcer,
requestHeadersToAdd, requestHeadersToRemove, responseHeadersToAdd, responseHeadersToRemove)

routes = append(routes, route1)
routes = append(routes, route2)
} else {
var action *routev3.Route_Route
if requestRedirectAction == nil {
action = generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()])
}
logger.LoggerOasparser.Debug("Creating routes for resource with policies", resourcePath, operation.GetMethod())
// create route for current method. Add policies to route config. Send via enforcer
action := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria)
match := generateRouteMatch(routePath)
match.Headers = generateHTTPMethodMatcher(operation.GetMethod(), clusterName)
match.DynamicMetadata = generateMetadataMatcherForExternalRoutes()
if pathRewriteConfig != nil {
if pathRewriteConfig != nil && requestRedirectAction == nil {
action.Route.RegexRewrite = pathRewriteConfig
} else {
} else if requestRedirectAction == nil {
action.Route.RegexRewrite = generateRegexMatchAndSubstitute(routePath, resourcePath, pathMatchType)
}
route := generateRouteConfig(xWso2Basepath, match, action, nil, decorator, perRouteFilterConfigs,
route := generateRouteConfig(xWso2Basepath, match, action, requestRedirectAction, nil, decorator, perRouteFilterConfigs,
requestHeadersToAdd, requestHeadersToRemove, responseHeadersToAdd, responseHeadersToRemove)
routes = append(routes, route)
}
Expand All @@ -1057,11 +1095,11 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
}
match := generateRouteMatch(routePath)
match.Headers = generateHTTPMethodMatcher(methodRegex, clusterName)
action := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria)
action := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, nil)
rewritePath := generateRoutePathForReWrite(basePath, resourcePath, pathMatchType)
action.Route.RegexRewrite = generateRegexMatchAndSubstitute(rewritePath, resourcePath, pathMatchType)

route := generateRouteConfig(xWso2Basepath, match, action, nil, decorator, perRouteFilterConfigs,
route := generateRouteConfig(xWso2Basepath, match, action, nil, nil, decorator, perRouteFilterConfigs,
nil, nil, nil, nil) // general headers to add and remove are included in this methods
routes = append(routes, route)
}
Expand Down Expand Up @@ -1528,7 +1566,8 @@ func getCorsPolicy(corsConfig *model.CorsConfig) *cors_filter_v3.CorsPolicy {

func genRouteCreateParams(swagger *model.AdapterInternalAPI, resource *model.Resource, vHost, endpointBasePath string,
clusterName string, requestInterceptor map[string]model.InterceptEndpoint,
responseInterceptor map[string]model.InterceptEndpoint, organizationID string, isSandbox bool, createDefaultPath bool) *routeCreateParams {
responseInterceptor map[string]model.InterceptEndpoint, organizationID string, isSandbox bool, createDefaultPath bool,
mirrorClusterNames map[string][]string) *routeCreateParams {

params := &routeCreateParams{
organizationID: organizationID,
Expand All @@ -1551,6 +1590,7 @@ func genRouteCreateParams(swagger *model.AdapterInternalAPI, resource *model.Res
createDefaultPath: createDefaultPath,
environment: swagger.GetEnvironment(),
envType: swagger.EnvType,
mirrorClusterNames: mirrorClusterNames,
}
return params
}
Expand Down
Loading

0 comments on commit 25f6683

Please sign in to comment.