From d7b32c4f5798d94490d08867f0d352ed20073857 Mon Sep 17 00:00:00 2001 From: "saica.go" Date: Tue, 15 Aug 2023 03:12:39 +0800 Subject: [PATCH 1/4] feat: preliminary implementation of traffic routing --- core/route/base/headers.go | 12 ++ core/route/base/http_match_request.go | 61 ++++++++ core/route/base/http_route.go | 16 +++ core/route/base/http_route_destination.go | 20 +++ core/route/base/instance.go | 9 ++ core/route/base/string_match.go | 25 ++++ core/route/base/traffic_context.go | 14 ++ core/route/base/traffic_policy.go | 5 + core/route/base/traffic_router.go | 11 ++ core/route/base/virtual_workload.go | 12 ++ core/route/cluster_manager.go | 51 +++++++ core/route/instance_manager.go | 24 ++++ core/route/load_balancer.go | 47 ++++++ core/route/route_test.go | 167 ++++++++++++++++++++++ core/route/router_filter.go | 131 +++++++++++++++++ core/route/traffic_router_rule_manager.go | 35 +++++ 16 files changed, 640 insertions(+) create mode 100644 core/route/base/headers.go create mode 100644 core/route/base/http_match_request.go create mode 100644 core/route/base/http_route.go create mode 100644 core/route/base/http_route_destination.go create mode 100644 core/route/base/instance.go create mode 100644 core/route/base/string_match.go create mode 100644 core/route/base/traffic_context.go create mode 100644 core/route/base/traffic_policy.go create mode 100644 core/route/base/traffic_router.go create mode 100644 core/route/base/virtual_workload.go create mode 100644 core/route/cluster_manager.go create mode 100644 core/route/instance_manager.go create mode 100644 core/route/load_balancer.go create mode 100644 core/route/route_test.go create mode 100644 core/route/router_filter.go create mode 100644 core/route/traffic_router_rule_manager.go diff --git a/core/route/base/headers.go b/core/route/base/headers.go new file mode 100644 index 00000000..fbfeb575 --- /dev/null +++ b/core/route/base/headers.go @@ -0,0 +1,12 @@ +package base + +type Headers struct { + Request *HeaderOperations + Response *HeaderOperations +} + +type HeaderOperations struct { + Set map[string]string + Add map[string]string + Remove []string +} diff --git a/core/route/base/http_match_request.go b/core/route/base/http_match_request.go new file mode 100644 index 00000000..18e01af8 --- /dev/null +++ b/core/route/base/http_match_request.go @@ -0,0 +1,61 @@ +package base + +import ( + "net/url" + "strconv" +) + +type HTTPMatchRequest struct { + Name string + Headers map[string]*StringMatch + Uri *StringMatch + Scheme *StringMatch + Authority *StringMatch + Method *StringMatch + Port *int + QueryParams map[string]*StringMatch +} + +// TODO +func (h *HTTPMatchRequest) IsMatch(context *TrafficContext) bool { + + for key, match := range h.Headers { + if v, ok := context.Headers[key]; ok && !match.IsMatch(v) { + return false + } + } + + if h.Uri != nil && !h.Uri.IsMatch(context.Uri) { + return false + } + + var parsedURL *url.URL + var err error + + if h.Uri != nil || h.Scheme != nil || h.Authority != nil || h.Port != nil { + parsedURL, err = url.Parse(context.Uri) + if err != nil { + return false + } + } + if h.Uri != nil && !h.Uri.IsMatch(parsedURL.Path) { + return false + } + if h.Scheme != nil && !h.Scheme.IsMatch(parsedURL.Scheme) { + return false + } + if h.Authority != nil && !h.Authority.IsMatch(parsedURL.Host) { + return false + } + if h.Port != nil { + p, err := strconv.Atoi(parsedURL.Port()) + if err != nil || *h.Port != p { + return false + } + } + if !h.Method.IsMatch(context.MethodName) { + return false + } + + return true +} diff --git a/core/route/base/http_route.go b/core/route/base/http_route.go new file mode 100644 index 00000000..a16828b2 --- /dev/null +++ b/core/route/base/http_route.go @@ -0,0 +1,16 @@ +package base + +type HTTPRoute struct { + Name string + Match []*HTTPMatchRequest + Route []*HTTPRouteDestination +} + +func (h *HTTPRoute) IsMatch(context *TrafficContext) bool { + for _, match := range h.Match { + if match.IsMatch(context) { + return true + } + } + return false +} diff --git a/core/route/base/http_route_destination.go b/core/route/base/http_route_destination.go new file mode 100644 index 00000000..2869a1eb --- /dev/null +++ b/core/route/base/http_route_destination.go @@ -0,0 +1,20 @@ +package base + +import "fmt" + +type HTTPRouteDestination struct { + Weight int + Destination *Destination + Headers Headers // TODO modifies headers +} + +func (H HTTPRouteDestination) String() string { + return fmt.Sprintf("{Weight: %v, Destination: %+v}\n", H.Weight, H.Destination) +} + +type Destination struct { + Host string + Subset string + Port uint32 + Fallback *HTTPRouteDestination +} diff --git a/core/route/base/instance.go b/core/route/base/instance.go new file mode 100644 index 00000000..4230497f --- /dev/null +++ b/core/route/base/instance.go @@ -0,0 +1,9 @@ +package base + +type Instance struct { + AppName string + Host string + Port int + Metadata map[string]string + TargetInstance interface{} +} diff --git a/core/route/base/string_match.go b/core/route/base/string_match.go new file mode 100644 index 00000000..0e56e4a8 --- /dev/null +++ b/core/route/base/string_match.go @@ -0,0 +1,25 @@ +package base + +import "regexp" + +type StringMatch struct { + Exact string + Prefix string + Regex string +} + +func (s *StringMatch) IsMatch(input string) bool { + if input == "" { + return false + } + + if s.Exact != "" { + return input == s.Exact + } else if s.Prefix != "" { + return len(input) >= len(s.Prefix) && input[:len(s.Prefix)] == s.Prefix + } else if s.Regex != "" { + matched, _ := regexp.MatchString(s.Regex, input) + return matched + } + return true +} diff --git a/core/route/base/traffic_context.go b/core/route/base/traffic_context.go new file mode 100644 index 00000000..286982b3 --- /dev/null +++ b/core/route/base/traffic_context.go @@ -0,0 +1,14 @@ +package base + +type TrafficContext struct { + Path string + Uri string + ServiceName string + Group string + Version string + MethodName string + ParamTypes []string + Args []interface{} + Headers map[string]string + Baggage map[string]string +} diff --git a/core/route/base/traffic_policy.go b/core/route/base/traffic_policy.go new file mode 100644 index 00000000..6e0bc991 --- /dev/null +++ b/core/route/base/traffic_policy.go @@ -0,0 +1,5 @@ +package base + +type TrafficPolicy struct { + LoadBalancer string +} diff --git a/core/route/base/traffic_router.go b/core/route/base/traffic_router.go new file mode 100644 index 00000000..68048070 --- /dev/null +++ b/core/route/base/traffic_router.go @@ -0,0 +1,11 @@ +package base + +type TrafficRouter struct { + Host []string + Http []*HTTPRoute +} + +type Fallback struct { + Host string + Subset string +} diff --git a/core/route/base/virtual_workload.go b/core/route/base/virtual_workload.go new file mode 100644 index 00000000..07736892 --- /dev/null +++ b/core/route/base/virtual_workload.go @@ -0,0 +1,12 @@ +package base + +type VirtualWorkload struct { + Host string + trafficPolicy *TrafficPolicy + Subsets []*Subset +} + +type Subset struct { + Name string + Labels map[string]string +} diff --git a/core/route/cluster_manager.go b/core/route/cluster_manager.go new file mode 100644 index 00000000..cff534bc --- /dev/null +++ b/core/route/cluster_manager.go @@ -0,0 +1,51 @@ +package route + +import ( + "github.com/alibaba/sentinel-golang/core/route/base" + "github.com/pkg/errors" +) + +type ClusterManager struct { + InstanceManager InstanceManager + RouterFilterList []RouterFilter + LoadBalancer LoadBalancer +} + +func NewClusterManager(instanceManager InstanceManager, routerFilters []RouterFilter, loadBalancer LoadBalancer) *ClusterManager { + return &ClusterManager{ + InstanceManager: instanceManager, + RouterFilterList: routerFilters, + LoadBalancer: loadBalancer, + } +} + +func (m *ClusterManager) Route(context *base.TrafficContext) ([]*base.Instance, error) { + instances := m.InstanceManager.GetInstances() + + var err error + for _, routerFilter := range m.RouterFilterList { + instances, err = routerFilter.Filter(instances, context) + if err != nil { + return nil, err + } + } + if len(instances) == 0 { + return nil, errors.New("no matching instances") + } + return instances, nil +} + +func (m *ClusterManager) GetOne(context *base.TrafficContext) (*base.Instance, error) { + instances, err := m.Route(context) + if err != nil { + return nil, err + } + if m.LoadBalancer == nil { + return instances[0], nil + } + instance, err := m.LoadBalancer.Select(instances, context) + if err != nil { + return nil, err + } + return instance, nil +} diff --git a/core/route/instance_manager.go b/core/route/instance_manager.go new file mode 100644 index 00000000..25a8a71e --- /dev/null +++ b/core/route/instance_manager.go @@ -0,0 +1,24 @@ +package route + +import "github.com/alibaba/sentinel-golang/core/route/base" + +type InstanceManager interface { + StoreInstances(instances []*base.Instance) + GetInstances() []*base.Instance +} + +type BasicInstanceManager struct { + Instances []*base.Instance +} + +func NewBasicInstanceManager() *BasicInstanceManager { + return &BasicInstanceManager{} +} + +func (b *BasicInstanceManager) StoreInstances(instances []*base.Instance) { + b.Instances = instances +} + +func (b *BasicInstanceManager) GetInstances() []*base.Instance { + return b.Instances +} diff --git a/core/route/load_balancer.go b/core/route/load_balancer.go new file mode 100644 index 00000000..9760e699 --- /dev/null +++ b/core/route/load_balancer.go @@ -0,0 +1,47 @@ +package route + +import ( + "github.com/alibaba/sentinel-golang/core/route/base" + "math/rand" + "sync" +) + +type LoadBalancer interface { + Select(instances []*base.Instance, context *base.TrafficContext) (*base.Instance, error) +} + +type RandomLoadBalancer struct { +} + +func NewRandomLoadBalancer() *RandomLoadBalancer { + return &RandomLoadBalancer{} +} + +func (r *RandomLoadBalancer) Select(instances []*base.Instance, context *base.TrafficContext) (*base.Instance, error) { + if len(instances) == 0 { + return nil, nil + } + + return instances[rand.Intn(len(instances))], nil +} + +type RoundRobinLoadBalancer struct { + idx int + mu sync.Mutex +} + +func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer { + return &RoundRobinLoadBalancer{idx: 0} +} + +func (r *RoundRobinLoadBalancer) Select(instances []*base.Instance, context *base.TrafficContext) (*base.Instance, error) { + if len(instances) == 0 { + return nil, nil + } + + r.mu.Lock() + defer r.mu.Unlock() + + r.idx = (r.idx + 1) % len(instances) + return instances[r.idx], nil +} diff --git a/core/route/route_test.go b/core/route/route_test.go new file mode 100644 index 00000000..f2355227 --- /dev/null +++ b/core/route/route_test.go @@ -0,0 +1,167 @@ +package route + +import ( + "fmt" + "github.com/alibaba/sentinel-golang/core/route/base" + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +var clusterManager *ClusterManager + +func TestMain(m *testing.M) { + trafficRouter := base.TrafficRouter{ + Host: []string{"test-provider"}, + Http: []*base.HTTPRoute{ + { + Name: "test-traffic-provider-rule-basic", + Match: []*base.HTTPMatchRequest{ + { + Headers: map[string]*base.StringMatch{ + "test-tag": {Exact: "basic-test"}, + }, + Method: &base.StringMatch{ + Exact: "hello", + }, + }, + }, + Route: []*base.HTTPRouteDestination{ + { + Weight: 1, + Destination: &base.Destination{ + Host: "test-provider", + Subset: "v1", + }, + }, + }, + }, { + Name: "test-traffic-provider-rule-fallback", + Match: []*base.HTTPMatchRequest{ + { + Headers: map[string]*base.StringMatch{ + "test-tag": {Exact: "fallback-test"}, + }, + Method: &base.StringMatch{ + Exact: "hello", + }, + }, + }, + Route: []*base.HTTPRouteDestination{ + { + Weight: 1, + Destination: &base.Destination{ + Host: "test-provider", + Subset: "v4", + Fallback: &base.HTTPRouteDestination{ + Destination: &base.Destination{ + Host: "test-provider", + Subset: "v3", + }, + }, + }, + }, + }, + }, + }, + } + + virtualWorkload := base.VirtualWorkload{ + Host: "test-provider", + Subsets: []*base.Subset{ + { + Name: "v1", + Labels: map[string]string{ + "instance-tag": "v1", + }, + }, { + Name: "v2", + Labels: map[string]string{ + "instance-tag": "v2", + }, + }, { + Name: "v3", + Labels: map[string]string{ + "instance-tag": "v3", + }, + }, + }, + } + + SetAppName("test-consumer") + SetTrafficRouterList([]*base.TrafficRouter{&trafficRouter}) + SetVirtualWorkloadList([]*base.VirtualWorkload{&virtualWorkload}) + + clusterManager = &ClusterManager{ + InstanceManager: NewBasicInstanceManager(), + LoadBalancer: NewRandomLoadBalancer(), + RouterFilterList: []RouterFilter{ + NewBasicRouterFilter(), + }, + } + + instanceList := []*base.Instance{ + { + AppName: "test-provider", + Host: "127.0.0.1", + Port: 80081, + Metadata: map[string]string{ + "instance-tag": "v1", + }, + }, { + AppName: "test-provider", + Host: "127.0.0.2", + Port: 80082, + Metadata: map[string]string{ + "instance-tag": "v2", + }, + }, { + AppName: "test-provider", + Host: "127.0.0.3", + Port: 80083, + Metadata: map[string]string{ + "instance-tag": "v3", + }, + }, + } + + clusterManager.InstanceManager.StoreInstances(instanceList) + + exitCode := m.Run() + + os.Exit(exitCode) +} + +func TestRouteBasic(t *testing.T) { + context := &base.TrafficContext{ + MethodName: "hello", + Headers: map[string]string{ + "test-tag": "basic-test", + }, + } + res, err := clusterManager.GetOne(context) + if err != nil { + fmt.Println(err) + t.Failed() + } + assert.Equal(t, "127.0.0.1", res.Host) + assert.Equal(t, 80081, res.Port) + assert.Equal(t, "v1", res.Metadata["instance-tag"]) +} + +func TestRouteFallback(t *testing.T) { + context := &base.TrafficContext{ + MethodName: "hello", + Headers: map[string]string{ + "test-tag": "fallback-test", + }, + } + res, err := clusterManager.GetOne(context) + if err != nil { + fmt.Println(err) + t.Failed() + } + assert.Equal(t, "127.0.0.3", res.Host) + assert.Equal(t, 80083, res.Port) + assert.Equal(t, "v3", res.Metadata["instance-tag"]) +} diff --git a/core/route/router_filter.go b/core/route/router_filter.go new file mode 100644 index 00000000..c76054c1 --- /dev/null +++ b/core/route/router_filter.go @@ -0,0 +1,131 @@ +package route + +import ( + "github.com/alibaba/sentinel-golang/core/route/base" + "math/rand" +) + +type RouterFilter interface { + Filter(instanceList []*base.Instance, context *base.TrafficContext) ([]*base.Instance, error) +} + +type BasicRouterFilter struct { +} + +func NewBasicRouterFilter() *BasicRouterFilter { + return &BasicRouterFilter{} +} + +func (b *BasicRouterFilter) Filter(instanceList []*base.Instance, context *base.TrafficContext) ([]*base.Instance, error) { + if len(instanceList) == 0 { + return instanceList, nil + } + + routeDestinationList := getRouteDestination(context) + + targets := make([]*base.Instance, 0) + if len(routeDestinationList) == 0 { + return targets, nil + } + appName := instanceList[0].AppName + subset := randomSelectDestination(appName, routeDestinationList, instanceList) + if subset == "" { + return targets, nil + } + return getSubsetInstances(appName, subset, instanceList), nil +} + +func getRouteDestination(context *base.TrafficContext) []*base.HTTPRouteDestination { + routerList := GetTrafficRouterList() + for _, router := range routerList { + routes := router.Http + if len(routes) == 0 { + continue + } + for _, route := range routes { + if route.IsMatch(context) { + return route.Route + } + } + } + return nil +} + +func randomSelectDestination(appName string, + routeDestination []*base.HTTPRouteDestination, instanceList []*base.Instance) string { + + totalWeight := 0 + for _, dest := range routeDestination { + if dest.Weight < 1 { + totalWeight += 1 + } else { + totalWeight += dest.Weight + } + } + target := rand.Intn(totalWeight + 1) + for _, dest := range routeDestination { + if dest.Weight < 1 { + target -= 1 + } else { + target -= dest.Weight + } + if target <= 0 { + result := getDestination(appName, dest.Destination, instanceList) + if result != "" { + return result + } + } + } + return "" +} + +func getDestination(appName string, destination *base.Destination, instanceList []*base.Instance) string { + subset := destination.Subset + + for { + result := getSubsetInstances(appName, subset, instanceList) + newResult := make([]*base.Instance, 0) + + newResult = append(newResult, result...) + if len(newResult) > 0 { + return subset + } + + // fallback + routeDestination := destination.Fallback + if routeDestination == nil || routeDestination.Destination == nil { + break + } + subset = routeDestination.Destination.Subset + } + return "" +} + +func getSubsetInstances(appName string, subset string, instanceList []*base.Instance) []*base.Instance { + virtualLoads := GetVirtualWorkloadList() + result := make([]*base.Instance, 0) + for _, virtualWorkload := range virtualLoads { + if virtualWorkload.Host != appName { + continue + } + for _, subsetRule := range virtualWorkload.Subsets { + if subsetRule.Name != subset { + continue + } + labels := subsetRule.Labels + for _, instance := range instanceList { + match := true + for key, value := range labels { + if value != instance.Metadata[key] { // TODO + match = false + break + } + } + if match { + result = append(result, instance) + } + } + } + } + return result +} diff --git a/core/route/traffic_router_rule_manager.go b/core/route/traffic_router_rule_manager.go new file mode 100644 index 00000000..bb330265 --- /dev/null +++ b/core/route/traffic_router_rule_manager.go @@ -0,0 +1,35 @@ +package route + +import "github.com/alibaba/sentinel-golang/core/route/base" + +type TrafficRoutingRuleGroup struct { + AppName string + TrafficRouterList []*base.TrafficRouter + VirtualWorkloadList []*base.VirtualWorkload +} + +var group = &TrafficRoutingRuleGroup{} + +func SetAppName(appName string) { + group.AppName = appName +} + +func SetTrafficRouterList(list []*base.TrafficRouter) { + group.TrafficRouterList = list +} + +func SetVirtualWorkloadList(list []*base.VirtualWorkload) { + group.VirtualWorkloadList = list +} + +func GetAppName() string { + return group.AppName +} + +func GetTrafficRouterList() []*base.TrafficRouter { + return group.TrafficRouterList +} + +func GetVirtualWorkloadList() []*base.VirtualWorkload { + return group.VirtualWorkloadList +} From 1d2a8ff93cf2dade7122bde7e626102a23cd7604 Mon Sep 17 00:00:00 2001 From: "saica.go" Date: Tue, 15 Aug 2023 03:13:34 +0800 Subject: [PATCH 2/4] feat: adaptation for gRPC --- pkg/adapters/grpc/traffic.go | 75 ++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 pkg/adapters/grpc/traffic.go diff --git a/pkg/adapters/grpc/traffic.go b/pkg/adapters/grpc/traffic.go new file mode 100644 index 00000000..f8019118 --- /dev/null +++ b/pkg/adapters/grpc/traffic.go @@ -0,0 +1,75 @@ +package grpc + +import ( + "context" + "errors" + "fmt" + "github.com/alibaba/sentinel-golang/core/route" + "github.com/alibaba/sentinel-golang/core/route/base" + "google.golang.org/grpc" + "net" + "strings" +) + +var ( + connToBaggage map[string]map[string]string = make(map[string]map[string]string) + cm *route.ClusterManager = nil +) + +type Baggage map[string]string + +func NewDialer(b Baggage) func(context.Context, string) (net.Conn, error) { + return func(ctx context.Context, addr string) (net.Conn, error) { + parts := strings.Split(addr, "/") + if len(parts) != 2 { + return nil, errors.New("invalid address format") + } + tc := &base.TrafficContext{ + ServiceName: parts[0], + MethodName: parts[1], + Headers: make(map[string]string), + } + + instance, err := cm.GetOne(tc) + + if err != nil { + return nil, err + } + if instance == nil { + return nil, errors.New("no matched provider") + } + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%v", instance.Host, instance.Port)) + if err != nil { + return nil, err + } + b = tc.Baggage + + return conn, nil + } +} + +func NewTrafficUnaryIntercepter(baggage Baggage) grpc.DialOption { + return grpc.WithUnaryInterceptor( + func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + newCtx := ctx + // TODO modify the request with baggage + return invoker(newCtx, method, req, reply, cc, opts...) + }) +} + +func NewTrafficStreamIntercepter(baggage Baggage) grpc.DialOption { + return grpc.WithStreamInterceptor( + func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + newCtx := ctx + // TODO modify the request with baggage + return streamer(newCtx, desc, cc, method, opts...) + }) +} + +func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + var b Baggage = make(map[string]string) + opts = append(opts, grpc.WithContextDialer(NewDialer(b))) + opts = append(opts, NewTrafficUnaryIntercepter(b)) + opts = append(opts, NewTrafficStreamIntercepter(b)) + return grpc.Dial(addr, opts...) +} From 6577ac6f6d351cac28fcd8456a8586ad8132d43b Mon Sep 17 00:00:00 2001 From: saicaca Date: Wed, 27 Sep 2023 20:49:04 +0800 Subject: [PATCH 3/4] feat: enhance the gRPC adapter --- pkg/adapters/grpc/traffic.go | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/pkg/adapters/grpc/traffic.go b/pkg/adapters/grpc/traffic.go index f8019118..24052963 100644 --- a/pkg/adapters/grpc/traffic.go +++ b/pkg/adapters/grpc/traffic.go @@ -2,6 +2,7 @@ package grpc import ( "context" + "encoding/json" "errors" "fmt" "github.com/alibaba/sentinel-golang/core/route" @@ -16,6 +17,8 @@ var ( cm *route.ClusterManager = nil ) +const BAGGAGE_KEY = "_sentinel_baggage" + type Baggage map[string]string func NewDialer(b Baggage) func(context.Context, string) (net.Conn, error) { @@ -24,10 +27,19 @@ func NewDialer(b Baggage) func(context.Context, string) (net.Conn, error) { if len(parts) != 2 { return nil, errors.New("invalid address format") } + + baggageStr := ctx.Value(BAGGAGE_KEY).(string) + var baggage Baggage + err := json.Unmarshal([]byte(baggageStr), &baggage) + if err != nil { + return nil, err + } + tc := &base.TrafficContext{ ServiceName: parts[0], MethodName: parts[1], Headers: make(map[string]string), + Baggage: baggage, } instance, err := cm.GetOne(tc) @@ -51,8 +63,11 @@ func NewDialer(b Baggage) func(context.Context, string) (net.Conn, error) { func NewTrafficUnaryIntercepter(baggage Baggage) grpc.DialOption { return grpc.WithUnaryInterceptor( func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - newCtx := ctx - // TODO modify the request with baggage + baggageStr, err := json.Marshal(baggage) + if err != nil { + return err + } + newCtx := context.WithValue(ctx, BAGGAGE_KEY, baggageStr) return invoker(newCtx, method, req, reply, cc, opts...) }) } @@ -60,12 +75,16 @@ func NewTrafficUnaryIntercepter(baggage Baggage) grpc.DialOption { func NewTrafficStreamIntercepter(baggage Baggage) grpc.DialOption { return grpc.WithStreamInterceptor( func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - newCtx := ctx - // TODO modify the request with baggage + baggageStr, err := json.Marshal(baggage) + if err != nil { + return nil, err + } + newCtx := context.WithValue(ctx, BAGGAGE_KEY, baggageStr) return streamer(newCtx, desc, cc, method, opts...) }) } +// Dial function provides a easy way to enable sentinel traffic routing. Just need to replace grpc.Dial with this function. func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { var b Baggage = make(map[string]string) opts = append(opts, grpc.WithContextDialer(NewDialer(b))) @@ -73,3 +92,10 @@ func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { opts = append(opts, NewTrafficStreamIntercepter(b)) return grpc.Dial(addr, opts...) } + +// GetDialerAndIntercepter provides another way to enable sentinel traffic routing when user don't want to replace grpc.Dial. +// User can use this function to get dialer and intercepter, and then use in grpc.Dial. +func GetDialerAndIntercepter() (grpc.DialOption, grpc.DialOption, grpc.DialOption) { + var b Baggage = make(map[string]string) + return grpc.WithContextDialer(NewDialer(b)), NewTrafficUnaryIntercepter(b), NewTrafficStreamIntercepter(b) +} From 35dd3d64f3300732ab39aaa81ce8bf61bfbcd75e Mon Sep 17 00:00:00 2001 From: saicaca Date: Wed, 27 Sep 2023 20:54:15 +0800 Subject: [PATCH 4/4] feat: add OpenSergo subscriber --- core/route/datasource/parser.go | 146 ++++++++++++++++++++++ core/route/datasource/parser_test.go | 176 +++++++++++++++++++++++++++ core/route/datasource/subscriber.go | 60 +++++++++ 3 files changed, 382 insertions(+) create mode 100644 core/route/datasource/parser.go create mode 100644 core/route/datasource/parser_test.go create mode 100644 core/route/datasource/subscriber.go diff --git a/core/route/datasource/parser.go b/core/route/datasource/parser.go new file mode 100644 index 00000000..b281dad7 --- /dev/null +++ b/core/route/datasource/parser.go @@ -0,0 +1,146 @@ +package datasource + +import ( + "github.com/alibaba/sentinel-golang/core/route/base" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "sort" + "strings" +) + +// resolveRouting parses envoy RouteConfiguration to TrafficRouters and VirtualWorkloads +func resolveRouting(configurations []routev3.RouteConfiguration) ([]*base.TrafficRouter, []*base.VirtualWorkload) { + var routerList []*base.TrafficRouter + virtualWorkloadMap := map[string]*base.VirtualWorkload{} + subsetMap := map[string]map[string]*base.Subset{} + for i := range configurations { + conf := &configurations[i] + virtualHosts := conf.GetVirtualHosts() + for _, virtualHost := range virtualHosts { + appName := strings.Split(virtualHost.GetName(), ".")[0] + router := &base.TrafficRouter{} + router.Host = append(router.Host, appName) + + for _, route := range virtualHost.GetRoutes() { + dest, subsets := convertRouteAction(route.GetRoute(), appName) + httpRoute := &base.HTTPRoute{ + Name: route.GetName(), + Match: []*base.HTTPMatchRequest{convertRouteMatch(route.GetMatch())}, + Route: dest, + } + router.Http = append(router.Http, httpRoute) + + // add VirtualWorkload + if _, ok := virtualWorkloadMap[appName]; !ok { + virtualWorkloadMap[appName] = &base.VirtualWorkload{ + Host: appName, + } + } + for subset := range subsets { + if _, ok := subsetMap[appName]; !ok { + subsetMap[appName] = map[string]*base.Subset{} + } + if _, ok := subsetMap[appName][subset]; !ok { + subsetMap[appName][subset] = &base.Subset{ + Name: subset, + } + } + } + } + routerList = append(routerList, router) + } + } + + // build VirtualWorkload list + var vwList []*base.VirtualWorkload + for vw, m := range subsetMap { + virtualWorkloadMap[vw].Subsets = make([]*base.Subset, 0, len(m)) + for _, s := range m { + virtualWorkloadMap[vw].Subsets = append(virtualWorkloadMap[vw].Subsets, s) + } + } + for _, vw := range virtualWorkloadMap { + vwList = append(vwList, vw) + } + sort.Slice(vwList, func(i, j int) bool { + return vwList[i].Host < vwList[j].Host + }) + for _, vw := range vwList { + sort.Slice(vw.Subsets, func(i, j int) bool { + return vw.Subsets[i].Name < vw.Subsets[j].Name + }) + } + return routerList, vwList +} + +func convertRouteMatch(match *routev3.RouteMatch) *base.HTTPMatchRequest { + mr := &base.HTTPMatchRequest{ + Headers: make(map[string]*base.StringMatch), + } + for _, m := range match.Headers { + mr.Headers[m.GetName()] = convertHeaderMatcher(m) + } + return mr +} + +func convertHeaderMatcher(matcher *routev3.HeaderMatcher) *base.StringMatch { + // supports PresentMatch, ExactMatch, PrefixMatch, RegexMatch for now + if matcher.GetPresentMatch() { + return &base.StringMatch{Regex: ".*"} + } + sm := matcher.GetStringMatch() + if sm == nil { + return nil + } + if sm.GetExact() != "" { + return &base.StringMatch{Exact: sm.GetExact()} + } + if sm.GetPrefix() != "" { + return &base.StringMatch{Prefix: sm.GetPrefix()} + } + if sm.GetSafeRegex() != nil && sm.GetSafeRegex().Regex != "" { + return &base.StringMatch{Regex: sm.GetSafeRegex().Regex} + } + return nil +} + +func convertRouteAction(dest *routev3.RouteAction, host string) ([]*base.HTTPRouteDestination, map[string]bool) { + // supports Cluster, WeightedClusters for now + if dest.GetCluster() != "" { + subset := getSubset(dest.GetCluster()) + return []*base.HTTPRouteDestination{ + { + Weight: 1, + Destination: &base.Destination{ + Host: host, + Subset: subset, + }, + }, + }, map[string]bool{subset: true} + } + if dest.GetWeightedClusters() != nil { + var destList []*base.HTTPRouteDestination + subsets := make(map[string]bool) + for _, cluster := range dest.GetWeightedClusters().Clusters { + subset := getSubset(cluster.GetName()) + subsets[subset] = true + destList = append(destList, &base.HTTPRouteDestination{ + Weight: int(cluster.GetWeight().GetValue()), + Destination: &base.Destination{ + Host: host, + Subset: subset, + }, + }) + } + return destList, subsets + } + return nil, nil +} + +func getSubset(cluster string) string { + version := "" + info := strings.Split(cluster, "|") + if len(info) >= 3 { + version = info[2] + } + return version +} diff --git a/core/route/datasource/parser_test.go b/core/route/datasource/parser_test.go new file mode 100644 index 00000000..a9896ffe --- /dev/null +++ b/core/route/datasource/parser_test.go @@ -0,0 +1,176 @@ +package datasource + +import ( + "encoding/json" + "fmt" + "github.com/alibaba/sentinel-golang/core/route/base" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + matcherv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" + "google.golang.org/protobuf/types/known/wrapperspb" + "reflect" + "testing" +) + +func TestResolveRouting(t *testing.T) { + // Mock a sample routev3.RouteConfiguration slice + configurations := []routev3.RouteConfiguration{{ + VirtualHosts: []*routev3.VirtualHost{ + { + Name: "Foo", + Routes: []*routev3.Route{ + { + Match: &routev3.RouteMatch{ + Headers: []*routev3.HeaderMatcher{ + { + Name: "match-key-1", + HeaderMatchSpecifier: &routev3.HeaderMatcher_StringMatch{ + StringMatch: &matcherv3.StringMatcher{ + MatchPattern: &matcherv3.StringMatcher_Exact{ + Exact: "match-value-1", + }, + }, + }, + }, + }, + }, + Action: &routev3.Route_Route{ + Route: &routev3.RouteAction{ + ClusterSpecifier: &routev3.RouteAction_Cluster{ + Cluster: "Foo|Bar|Sub", + }, + }, + }, + }, { + Match: &routev3.RouteMatch{ + Headers: []*routev3.HeaderMatcher{ + { + Name: "match-key-2", + HeaderMatchSpecifier: &routev3.HeaderMatcher_StringMatch{ + StringMatch: &matcherv3.StringMatcher{ + MatchPattern: &matcherv3.StringMatcher_Exact{ + Exact: "match-value-2", + }, + }, + }, + }, + }, + }, + Action: &routev3.Route_Route{ + Route: &routev3.RouteAction{ + ClusterSpecifier: &routev3.RouteAction_WeightedClusters{ + WeightedClusters: &routev3.WeightedCluster{ + Clusters: []*routev3.WeightedCluster_ClusterWeight{ + { + Name: "Foo|Bar|Sub1", + Weight: wrapperspb.UInt32(1), + }, { + Name: "Foo|Bar|Sub2", + Weight: wrapperspb.UInt32(2), + }, { + Name: "Foo|Bar|Sub3", + Weight: wrapperspb.UInt32(3), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }}, + } + + routerList, virtualWorkloads := resolveRouting(configurations) + + expectedRouterList := []*base.TrafficRouter{ + { + Host: []string{"Foo"}, + Http: []*base.HTTPRoute{ + { + Match: []*base.HTTPMatchRequest{ + { + Headers: map[string]*base.StringMatch{ + "match-key-1": { + Exact: "match-value-1", + }, + }, + }, + }, + Route: []*base.HTTPRouteDestination{ + { + Weight: 1, + Destination: &base.Destination{ + Host: "Foo", + Subset: "Sub", + }, + }, + }, + }, { + Match: []*base.HTTPMatchRequest{ + { + Headers: map[string]*base.StringMatch{ + "match-key-2": { + Exact: "match-value-2", + }, + }, + }, + }, + Route: []*base.HTTPRouteDestination{ + { + Weight: 1, + Destination: &base.Destination{ + Host: "Foo", + Subset: "Sub1", + }, + }, { + Weight: 2, + Destination: &base.Destination{ + Host: "Foo", + Subset: "Sub2", + }, + }, { + Weight: 3, + Destination: &base.Destination{ + Host: "Foo", + Subset: "Sub3", + }, + }, + }, + }, + }, + }, + } + + expectedVirtualWorkloads := []*base.VirtualWorkload{ + { + Host: "Foo", + Subsets: []*base.Subset{ + { + Name: "Sub", + }, { + Name: "Sub1", + }, { + Name: "Sub2", + }, { + Name: "Sub3", + }, + }, + }, + } + + if !reflect.DeepEqual(routerList, expectedRouterList) { + t.Errorf("Expected routerList %v, but got %v", toJSON(expectedRouterList), toJSON(routerList)) + } + if !reflect.DeepEqual(virtualWorkloads, expectedVirtualWorkloads) { + t.Errorf("Expected vwList %s, but got %s", toJSON(expectedVirtualWorkloads), toJSON(virtualWorkloads)) + } +} + +func toJSON(v interface{}) string { + jsonBytes, err := json.Marshal(v) + if err != nil { + fmt.Println("failed to parse:", err) + } + return string(jsonBytes) +} diff --git a/core/route/datasource/subscriber.go b/core/route/datasource/subscriber.go new file mode 100644 index 00000000..ee2bedf7 --- /dev/null +++ b/core/route/datasource/subscriber.go @@ -0,0 +1,60 @@ +package datasource + +import ( + "github.com/alibaba/sentinel-golang/core/route" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "github.com/opensergo/opensergo-go/pkg/api" + "github.com/opensergo/opensergo-go/pkg/client" + "github.com/opensergo/opensergo-go/pkg/common/logging" + "github.com/opensergo/opensergo-go/pkg/configkind" + "github.com/opensergo/opensergo-go/pkg/model" + "github.com/pkg/errors" +) + +type TrafficRouterSubscriber struct { + OnUpdate func(subscribeKey model.SubscribeKey, data interface{}) (bool, error) +} + +func (t TrafficRouterSubscriber) OnSubscribeDataUpdate(subscribeKey model.SubscribeKey, data interface{}) (bool, error) { + ok, err := t.OnUpdate(subscribeKey, data) + return ok, err +} + +func SubscribeOpenSergoTrafficConfig(host, namespace, app string, port uint32) error { + + // Set OpenSergo console logger (optional) + logging.NewConsoleLogger(logging.InfoLevel, logging.SeparateFormat, true) + + // Create an OpenSergoClient. + openSergoClient, err := client.NewOpenSergoClient(host, port) + if err != nil { + return err + } + + // Start the OpenSergoClient. + err = openSergoClient.Start() + if err != nil { + return err + } + + subscribeKey := model.NewSubscribeKey(namespace, app, configkind.ConfigKindTrafficRouterStrategy{}) + subscriber := TrafficRouterSubscriber{ + OnUpdate: func(subscribeKey model.SubscribeKey, data interface{}) (bool, error) { + messages, ok := data.([]routev3.RouteConfiguration) + if !ok { + return false, errors.New("failed to convert data to RouteConfiguration") + } + TRList, VWList := resolveRouting(messages) + route.SetTrafficRouterList(TRList) + route.SetVirtualWorkloadList(VWList) + return true, nil + }, + } + + err = openSergoClient.SubscribeConfig(*subscribeKey, api.WithSubscriber(subscriber)) + if err != nil { + return err + } + + return err +}