From d7b32c4f5798d94490d08867f0d352ed20073857 Mon Sep 17 00:00:00 2001 From: "saica.go" Date: Tue, 15 Aug 2023 03:12:39 +0800 Subject: [PATCH] feat: preliminary implementation of traffic routing --- core/route/base/headers.go | 12 ++ core/route/base/http_match_request.go | 61 ++++++++ core/route/base/http_route.go | 16 +++ core/route/base/http_route_destination.go | 20 +++ core/route/base/instance.go | 9 ++ core/route/base/string_match.go | 25 ++++ core/route/base/traffic_context.go | 14 ++ core/route/base/traffic_policy.go | 5 + core/route/base/traffic_router.go | 11 ++ core/route/base/virtual_workload.go | 12 ++ core/route/cluster_manager.go | 51 +++++++ core/route/instance_manager.go | 24 ++++ core/route/load_balancer.go | 47 ++++++ core/route/route_test.go | 167 ++++++++++++++++++++++ core/route/router_filter.go | 131 +++++++++++++++++ core/route/traffic_router_rule_manager.go | 35 +++++ 16 files changed, 640 insertions(+) create mode 100644 core/route/base/headers.go create mode 100644 core/route/base/http_match_request.go create mode 100644 core/route/base/http_route.go create mode 100644 core/route/base/http_route_destination.go create mode 100644 core/route/base/instance.go create mode 100644 core/route/base/string_match.go create mode 100644 core/route/base/traffic_context.go create mode 100644 core/route/base/traffic_policy.go create mode 100644 core/route/base/traffic_router.go create mode 100644 core/route/base/virtual_workload.go create mode 100644 core/route/cluster_manager.go create mode 100644 core/route/instance_manager.go create mode 100644 core/route/load_balancer.go create mode 100644 core/route/route_test.go create mode 100644 core/route/router_filter.go create mode 100644 core/route/traffic_router_rule_manager.go diff --git a/core/route/base/headers.go b/core/route/base/headers.go new file mode 100644 index 00000000..fbfeb575 --- /dev/null +++ b/core/route/base/headers.go @@ -0,0 +1,12 @@ +package base + +type Headers struct { + Request *HeaderOperations + Response *HeaderOperations +} + +type HeaderOperations struct { + Set map[string]string + Add map[string]string + Remove []string +} diff --git a/core/route/base/http_match_request.go b/core/route/base/http_match_request.go new file mode 100644 index 00000000..18e01af8 --- /dev/null +++ b/core/route/base/http_match_request.go @@ -0,0 +1,61 @@ +package base + +import ( + "net/url" + "strconv" +) + +type HTTPMatchRequest struct { + Name string + Headers map[string]*StringMatch + Uri *StringMatch + Scheme *StringMatch + Authority *StringMatch + Method *StringMatch + Port *int + QueryParams map[string]*StringMatch +} + +// TODO +func (h *HTTPMatchRequest) IsMatch(context *TrafficContext) bool { + + for key, match := range h.Headers { + if v, ok := context.Headers[key]; ok && !match.IsMatch(v) { + return false + } + } + + if h.Uri != nil && !h.Uri.IsMatch(context.Uri) { + return false + } + + var parsedURL *url.URL + var err error + + if h.Uri != nil || h.Scheme != nil || h.Authority != nil || h.Port != nil { + parsedURL, err = url.Parse(context.Uri) + if err != nil { + return false + } + } + if h.Uri != nil && !h.Uri.IsMatch(parsedURL.Path) { + return false + } + if h.Scheme != nil && !h.Scheme.IsMatch(parsedURL.Scheme) { + return false + } + if h.Authority != nil && !h.Authority.IsMatch(parsedURL.Host) { + return false + } + if h.Port != nil { + p, err := strconv.Atoi(parsedURL.Port()) + if err != nil || *h.Port != p { + return false + } + } + if !h.Method.IsMatch(context.MethodName) { + return false + } + + return true +} diff --git a/core/route/base/http_route.go b/core/route/base/http_route.go new file mode 100644 index 00000000..a16828b2 --- /dev/null +++ b/core/route/base/http_route.go @@ -0,0 +1,16 @@ +package base + +type HTTPRoute struct { + Name string + Match []*HTTPMatchRequest + Route []*HTTPRouteDestination +} + +func (h *HTTPRoute) IsMatch(context *TrafficContext) bool { + for _, match := range h.Match { + if match.IsMatch(context) { + return true + } + } + return false +} diff --git a/core/route/base/http_route_destination.go b/core/route/base/http_route_destination.go new file mode 100644 index 00000000..2869a1eb --- /dev/null +++ b/core/route/base/http_route_destination.go @@ -0,0 +1,20 @@ +package base + +import "fmt" + +type HTTPRouteDestination struct { + Weight int + Destination *Destination + Headers Headers // TODO modifies headers +} + +func (H HTTPRouteDestination) String() string { + return fmt.Sprintf("{Weight: %v, Destination: %+v}\n", H.Weight, H.Destination) +} + +type Destination struct { + Host string + Subset string + Port uint32 + Fallback *HTTPRouteDestination +} diff --git a/core/route/base/instance.go b/core/route/base/instance.go new file mode 100644 index 00000000..4230497f --- /dev/null +++ b/core/route/base/instance.go @@ -0,0 +1,9 @@ +package base + +type Instance struct { + AppName string + Host string + Port int + Metadata map[string]string + TargetInstance interface{} +} diff --git a/core/route/base/string_match.go b/core/route/base/string_match.go new file mode 100644 index 00000000..0e56e4a8 --- /dev/null +++ b/core/route/base/string_match.go @@ -0,0 +1,25 @@ +package base + +import "regexp" + +type StringMatch struct { + Exact string + Prefix string + Regex string +} + +func (s *StringMatch) IsMatch(input string) bool { + if input == "" { + return false + } + + if s.Exact != "" { + return input == s.Exact + } else if s.Prefix != "" { + return len(input) >= len(s.Prefix) && input[:len(s.Prefix)] == s.Prefix + } else if s.Regex != "" { + matched, _ := regexp.MatchString(s.Regex, input) + return matched + } + return true +} diff --git a/core/route/base/traffic_context.go b/core/route/base/traffic_context.go new file mode 100644 index 00000000..286982b3 --- /dev/null +++ b/core/route/base/traffic_context.go @@ -0,0 +1,14 @@ +package base + +type TrafficContext struct { + Path string + Uri string + ServiceName string + Group string + Version string + MethodName string + ParamTypes []string + Args []interface{} + Headers map[string]string + Baggage map[string]string +} diff --git a/core/route/base/traffic_policy.go b/core/route/base/traffic_policy.go new file mode 100644 index 00000000..6e0bc991 --- /dev/null +++ b/core/route/base/traffic_policy.go @@ -0,0 +1,5 @@ +package base + +type TrafficPolicy struct { + LoadBalancer string +} diff --git a/core/route/base/traffic_router.go b/core/route/base/traffic_router.go new file mode 100644 index 00000000..68048070 --- /dev/null +++ b/core/route/base/traffic_router.go @@ -0,0 +1,11 @@ +package base + +type TrafficRouter struct { + Host []string + Http []*HTTPRoute +} + +type Fallback struct { + Host string + Subset string +} diff --git a/core/route/base/virtual_workload.go b/core/route/base/virtual_workload.go new file mode 100644 index 00000000..07736892 --- /dev/null +++ b/core/route/base/virtual_workload.go @@ -0,0 +1,12 @@ +package base + +type VirtualWorkload struct { + Host string + trafficPolicy *TrafficPolicy + Subsets []*Subset +} + +type Subset struct { + Name string + Labels map[string]string +} diff --git a/core/route/cluster_manager.go b/core/route/cluster_manager.go new file mode 100644 index 00000000..cff534bc --- /dev/null +++ b/core/route/cluster_manager.go @@ -0,0 +1,51 @@ +package route + +import ( + "github.com/alibaba/sentinel-golang/core/route/base" + "github.com/pkg/errors" +) + +type ClusterManager struct { + InstanceManager InstanceManager + RouterFilterList []RouterFilter + LoadBalancer LoadBalancer +} + +func NewClusterManager(instanceManager InstanceManager, routerFilters []RouterFilter, loadBalancer LoadBalancer) *ClusterManager { + return &ClusterManager{ + InstanceManager: instanceManager, + RouterFilterList: routerFilters, + LoadBalancer: loadBalancer, + } +} + +func (m *ClusterManager) Route(context *base.TrafficContext) ([]*base.Instance, error) { + instances := m.InstanceManager.GetInstances() + + var err error + for _, routerFilter := range m.RouterFilterList { + instances, err = routerFilter.Filter(instances, context) + if err != nil { + return nil, err + } + } + if len(instances) == 0 { + return nil, errors.New("no matching instances") + } + return instances, nil +} + +func (m *ClusterManager) GetOne(context *base.TrafficContext) (*base.Instance, error) { + instances, err := m.Route(context) + if err != nil { + return nil, err + } + if m.LoadBalancer == nil { + return instances[0], nil + } + instance, err := m.LoadBalancer.Select(instances, context) + if err != nil { + return nil, err + } + return instance, nil +} diff --git a/core/route/instance_manager.go b/core/route/instance_manager.go new file mode 100644 index 00000000..25a8a71e --- /dev/null +++ b/core/route/instance_manager.go @@ -0,0 +1,24 @@ +package route + +import "github.com/alibaba/sentinel-golang/core/route/base" + +type InstanceManager interface { + StoreInstances(instances []*base.Instance) + GetInstances() []*base.Instance +} + +type BasicInstanceManager struct { + Instances []*base.Instance +} + +func NewBasicInstanceManager() *BasicInstanceManager { + return &BasicInstanceManager{} +} + +func (b *BasicInstanceManager) StoreInstances(instances []*base.Instance) { + b.Instances = instances +} + +func (b *BasicInstanceManager) GetInstances() []*base.Instance { + return b.Instances +} diff --git a/core/route/load_balancer.go b/core/route/load_balancer.go new file mode 100644 index 00000000..9760e699 --- /dev/null +++ b/core/route/load_balancer.go @@ -0,0 +1,47 @@ +package route + +import ( + "github.com/alibaba/sentinel-golang/core/route/base" + "math/rand" + "sync" +) + +type LoadBalancer interface { + Select(instances []*base.Instance, context *base.TrafficContext) (*base.Instance, error) +} + +type RandomLoadBalancer struct { +} + +func NewRandomLoadBalancer() *RandomLoadBalancer { + return &RandomLoadBalancer{} +} + +func (r *RandomLoadBalancer) Select(instances []*base.Instance, context *base.TrafficContext) (*base.Instance, error) { + if len(instances) == 0 { + return nil, nil + } + + return instances[rand.Intn(len(instances))], nil +} + +type RoundRobinLoadBalancer struct { + idx int + mu sync.Mutex +} + +func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer { + return &RoundRobinLoadBalancer{idx: 0} +} + +func (r *RoundRobinLoadBalancer) Select(instances []*base.Instance, context *base.TrafficContext) (*base.Instance, error) { + if len(instances) == 0 { + return nil, nil + } + + r.mu.Lock() + defer r.mu.Unlock() + + r.idx = (r.idx + 1) % len(instances) + return instances[r.idx], nil +} diff --git a/core/route/route_test.go b/core/route/route_test.go new file mode 100644 index 00000000..f2355227 --- /dev/null +++ b/core/route/route_test.go @@ -0,0 +1,167 @@ +package route + +import ( + "fmt" + "github.com/alibaba/sentinel-golang/core/route/base" + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +var clusterManager *ClusterManager + +func TestMain(m *testing.M) { + trafficRouter := base.TrafficRouter{ + Host: []string{"test-provider"}, + Http: []*base.HTTPRoute{ + { + Name: "test-traffic-provider-rule-basic", + Match: []*base.HTTPMatchRequest{ + { + Headers: map[string]*base.StringMatch{ + "test-tag": {Exact: "basic-test"}, + }, + Method: &base.StringMatch{ + Exact: "hello", + }, + }, + }, + Route: []*base.HTTPRouteDestination{ + { + Weight: 1, + Destination: &base.Destination{ + Host: "test-provider", + Subset: "v1", + }, + }, + }, + }, { + Name: "test-traffic-provider-rule-fallback", + Match: []*base.HTTPMatchRequest{ + { + Headers: map[string]*base.StringMatch{ + "test-tag": {Exact: "fallback-test"}, + }, + Method: &base.StringMatch{ + Exact: "hello", + }, + }, + }, + Route: []*base.HTTPRouteDestination{ + { + Weight: 1, + Destination: &base.Destination{ + Host: "test-provider", + Subset: "v4", + Fallback: &base.HTTPRouteDestination{ + Destination: &base.Destination{ + Host: "test-provider", + Subset: "v3", + }, + }, + }, + }, + }, + }, + }, + } + + virtualWorkload := base.VirtualWorkload{ + Host: "test-provider", + Subsets: []*base.Subset{ + { + Name: "v1", + Labels: map[string]string{ + "instance-tag": "v1", + }, + }, { + Name: "v2", + Labels: map[string]string{ + "instance-tag": "v2", + }, + }, { + Name: "v3", + Labels: map[string]string{ + "instance-tag": "v3", + }, + }, + }, + } + + SetAppName("test-consumer") + SetTrafficRouterList([]*base.TrafficRouter{&trafficRouter}) + SetVirtualWorkloadList([]*base.VirtualWorkload{&virtualWorkload}) + + clusterManager = &ClusterManager{ + InstanceManager: NewBasicInstanceManager(), + LoadBalancer: NewRandomLoadBalancer(), + RouterFilterList: []RouterFilter{ + NewBasicRouterFilter(), + }, + } + + instanceList := []*base.Instance{ + { + AppName: "test-provider", + Host: "127.0.0.1", + Port: 80081, + Metadata: map[string]string{ + "instance-tag": "v1", + }, + }, { + AppName: "test-provider", + Host: "127.0.0.2", + Port: 80082, + Metadata: map[string]string{ + "instance-tag": "v2", + }, + }, { + AppName: "test-provider", + Host: "127.0.0.3", + Port: 80083, + Metadata: map[string]string{ + "instance-tag": "v3", + }, + }, + } + + clusterManager.InstanceManager.StoreInstances(instanceList) + + exitCode := m.Run() + + os.Exit(exitCode) +} + +func TestRouteBasic(t *testing.T) { + context := &base.TrafficContext{ + MethodName: "hello", + Headers: map[string]string{ + "test-tag": "basic-test", + }, + } + res, err := clusterManager.GetOne(context) + if err != nil { + fmt.Println(err) + t.Failed() + } + assert.Equal(t, "127.0.0.1", res.Host) + assert.Equal(t, 80081, res.Port) + assert.Equal(t, "v1", res.Metadata["instance-tag"]) +} + +func TestRouteFallback(t *testing.T) { + context := &base.TrafficContext{ + MethodName: "hello", + Headers: map[string]string{ + "test-tag": "fallback-test", + }, + } + res, err := clusterManager.GetOne(context) + if err != nil { + fmt.Println(err) + t.Failed() + } + assert.Equal(t, "127.0.0.3", res.Host) + assert.Equal(t, 80083, res.Port) + assert.Equal(t, "v3", res.Metadata["instance-tag"]) +} diff --git a/core/route/router_filter.go b/core/route/router_filter.go new file mode 100644 index 00000000..c76054c1 --- /dev/null +++ b/core/route/router_filter.go @@ -0,0 +1,131 @@ +package route + +import ( + "github.com/alibaba/sentinel-golang/core/route/base" + "math/rand" +) + +type RouterFilter interface { + Filter(instanceList []*base.Instance, context *base.TrafficContext) ([]*base.Instance, error) +} + +type BasicRouterFilter struct { +} + +func NewBasicRouterFilter() *BasicRouterFilter { + return &BasicRouterFilter{} +} + +func (b *BasicRouterFilter) Filter(instanceList []*base.Instance, context *base.TrafficContext) ([]*base.Instance, error) { + if len(instanceList) == 0 { + return instanceList, nil + } + + routeDestinationList := getRouteDestination(context) + + targets := make([]*base.Instance, 0) + if len(routeDestinationList) == 0 { + return targets, nil + } + appName := instanceList[0].AppName + subset := randomSelectDestination(appName, routeDestinationList, instanceList) + if subset == "" { + return targets, nil + } + return getSubsetInstances(appName, subset, instanceList), nil +} + +func getRouteDestination(context *base.TrafficContext) []*base.HTTPRouteDestination { + routerList := GetTrafficRouterList() + for _, router := range routerList { + routes := router.Http + if len(routes) == 0 { + continue + } + for _, route := range routes { + if route.IsMatch(context) { + return route.Route + } + } + } + return nil +} + +func randomSelectDestination(appName string, + routeDestination []*base.HTTPRouteDestination, instanceList []*base.Instance) string { + + totalWeight := 0 + for _, dest := range routeDestination { + if dest.Weight < 1 { + totalWeight += 1 + } else { + totalWeight += dest.Weight + } + } + target := rand.Intn(totalWeight + 1) + for _, dest := range routeDestination { + if dest.Weight < 1 { + target -= 1 + } else { + target -= dest.Weight + } + if target <= 0 { + result := getDestination(appName, dest.Destination, instanceList) + if result != "" { + return result + } + } + } + return "" +} + +func getDestination(appName string, destination *base.Destination, instanceList []*base.Instance) string { + subset := destination.Subset + + for { + result := getSubsetInstances(appName, subset, instanceList) + newResult := make([]*base.Instance, 0) + + newResult = append(newResult, result...) + if len(newResult) > 0 { + return subset + } + + // fallback + routeDestination := destination.Fallback + if routeDestination == nil || routeDestination.Destination == nil { + break + } + subset = routeDestination.Destination.Subset + } + return "" +} + +func getSubsetInstances(appName string, subset string, instanceList []*base.Instance) []*base.Instance { + virtualLoads := GetVirtualWorkloadList() + result := make([]*base.Instance, 0) + for _, virtualWorkload := range virtualLoads { + if virtualWorkload.Host != appName { + continue + } + for _, subsetRule := range virtualWorkload.Subsets { + if subsetRule.Name != subset { + continue + } + labels := subsetRule.Labels + for _, instance := range instanceList { + match := true + for key, value := range labels { + if value != instance.Metadata[key] { // TODO + match = false + break + } + } + if match { + result = append(result, instance) + } + } + } + } + return result +} diff --git a/core/route/traffic_router_rule_manager.go b/core/route/traffic_router_rule_manager.go new file mode 100644 index 00000000..bb330265 --- /dev/null +++ b/core/route/traffic_router_rule_manager.go @@ -0,0 +1,35 @@ +package route + +import "github.com/alibaba/sentinel-golang/core/route/base" + +type TrafficRoutingRuleGroup struct { + AppName string + TrafficRouterList []*base.TrafficRouter + VirtualWorkloadList []*base.VirtualWorkload +} + +var group = &TrafficRoutingRuleGroup{} + +func SetAppName(appName string) { + group.AppName = appName +} + +func SetTrafficRouterList(list []*base.TrafficRouter) { + group.TrafficRouterList = list +} + +func SetVirtualWorkloadList(list []*base.VirtualWorkload) { + group.VirtualWorkloadList = list +} + +func GetAppName() string { + return group.AppName +} + +func GetTrafficRouterList() []*base.TrafficRouter { + return group.TrafficRouterList +} + +func GetVirtualWorkloadList() []*base.VirtualWorkload { + return group.VirtualWorkloadList +}