diff --git a/cli/ctl/agent.go b/cli/ctl/agent.go index 7bdca274ba0..d196a7163c5 100644 --- a/cli/ctl/agent.go +++ b/cli/ctl/agent.go @@ -43,6 +43,8 @@ type RebalanceType string const RebalanceTypeNull RebalanceType = "null" +var isDebug, isAggIP bool + func RegisterAgentCommand() *cobra.Command { agent := &cobra.Command{ Use: "agent", @@ -94,6 +96,7 @@ func RegisterAgentCommand() *cobra.Command { } var typeStr string + rebalanceCmd := &cobra.Command{ Use: "rebalance", Short: "rebalance controller or analyzer", @@ -117,6 +120,8 @@ deepflow-ctl agent rebalance --type=analyzer`, }, } rebalanceCmd.Flags().StringVarP(&typeStr, "type", "t", "", "request type controller/analyzer") + rebalanceCmd.Flags().BoolVarP(&isDebug, "debug", "d", false, "enable debug output") + rebalanceCmd.Flags().BoolVarP(&isAggIP, "agg-ip", "", false, "aggregation based on analyzer ip, type only support analzyer") agent.AddCommand(list) agent.AddCommand(delete) @@ -478,6 +483,9 @@ func upgadeAgent(cmd *cobra.Command, args []string) { } func rebalance(cmd *cobra.Command, rebalanceType RebalanceType, typeVal string) error { + if isDebug { + return rebalanceDebug(cmd, typeVal) + } isBalance, err := ifNeedRebalance(cmd, typeVal) if err != nil { return err @@ -501,6 +509,64 @@ func rebalance(cmd *cobra.Command, rebalanceType RebalanceType, typeVal string) return nil } +func rebalanceDebug(cmd *cobra.Command, typeVal string) error { + server := common.GetServerInfo(cmd) + url := fmt.Sprintf("http://%s:%d/v1/rebalance-vtap/?type=%s&is_debug=true", server.IP, server.Port, typeVal) + resp, err := common.CURLPerform("POST", url, nil, "", + []common.HTTPOption{common.WithTimeout(common.GetTimeout(cmd)), common.WithORGID(common.GetORGID(cmd))}...) + if err != nil { + return err + } + data := resp.Get("DATA") + + if data.Get("TRAFFIC_AZ") == nil || + (data.Get("TRAFFIC_AZ") != nil && len(data.Get("TRAFFIC_AZ").MustArray()) == 0) { + return nil + } + fmt.Println() + + t := table.New() + t.SetHeader([]string{"REGION", "AZ", "ANALYZER_IP", "ANALYZER_STATE", "AZ_TRAFFIC", "AGENT_COUNT", "ANALYZER_TRAFFIC"}) + tableItems := [][]string{} + for i := range data.Get("TRAFFIC_AZ").MustArray() { + d := data.Get("TRAFFIC_AZ").GetIndex(i) + tableItems = append(tableItems, []string{ + d.Get("REGION").MustString(), + d.Get("AZ").MustString(), + d.Get("ANALYZER_IP").MustString(), + strconv.Itoa(int(d.Get("ANALYZER_STATE").MustInt64())), + strconv.Itoa(int(d.Get("AZ_TRAFFIC").MustInt64())), + strconv.Itoa(int(d.Get("AGENT_COUNT").MustInt())), + strconv.Itoa(int(d.Get("ANALYZER_TRAFFIC").MustInt64())), + }) + } + t.AppendBulk(tableItems) + t.Render() + + if isAggIP { + fmt.Println() + + t1 := table.New() + t1.SetHeader([]string{"REGION", "AZ", "ANALYZER_IP", "ANALYZER_STATE", "AGENT_COUNT", "ANALYZER_TRAFFIC"}) + tableItems = [][]string{} + for i := range data.Get("TRAFFIC_ANALYZER").MustArray() { + d := data.Get("TRAFFIC_ANALYZER").GetIndex(i) + tableItems = append(tableItems, []string{ + d.Get("REGION").MustString(), + d.Get("AZ").MustString(), + d.Get("ANALYZER_IP").MustString(), + strconv.Itoa(int(d.Get("ANALYZER_STATE").MustInt64())), + strconv.Itoa(int(d.Get("AGENT_COUNT").MustInt())), + strconv.Itoa(int(d.Get("ANALYZER_TRAFFIC").MustInt64())), + }) + } + t1.AppendBulk(tableItems) + t1.Render() + } + + return nil +} + func ifNeedRebalance(cmd *cobra.Command, typeStr string) (bool, error) { server := common.GetServerInfo(cmd) url := fmt.Sprintf("http://%s:%d/v1/rebalance-vtap/?check=false&type=%s", server.IP, server.Port, typeStr) diff --git a/server/controller/http/router/common/controller.go b/server/controller/http/router/common/controller.go index 1ccfdee667c..6234a386243 100644 --- a/server/controller/http/router/common/controller.go +++ b/server/controller/http/router/common/controller.go @@ -19,17 +19,23 @@ package common import ( "fmt" "net/http" + "strconv" "strings" "github.com/gin-gonic/gin" + "github.com/op/go-logging" ) +var log = logging.MustGetLogger("common/controller") + func ForwardMasterController(c *gin.Context, masterControllerName string, port int) { requestHosts := strings.Split(c.Request.Host, ":") if len(requestHosts) > 1 { - c.Request.Host = strings.Replace( - c.Request.Host, requestHosts[0], masterControllerName, 1, - ) + if requestHosts[1] != strconv.Itoa(port) { + c.Request.Host = fmt.Sprintf("%s:%d", masterControllerName, port) + } else { + c.Request.Host = strings.Replace(c.Request.Host, requestHosts[0], masterControllerName, 1) + } } else { c.Request.Host = fmt.Sprintf("%s:%d", masterControllerName, port) } diff --git a/server/controller/http/router/vtap.go b/server/controller/http/router/vtap.go index 20c94f5158b..bb408d6fa2d 100644 --- a/server/controller/http/router/vtap.go +++ b/server/controller/http/router/vtap.go @@ -267,6 +267,9 @@ func rebalanceVtap(cfg *config.ControllerConfig) gin.HandlerFunc { if value, ok := c.GetQuery("check"); ok { args["check"] = (strings.ToLower(value) == "true") } + if isDebug, ok := c.GetQuery("is_debug"); ok { + args["is_debug"] = (strings.ToLower(isDebug) == "true") + } if value, ok := c.GetQuery("type"); ok { args["type"] = value if args["type"] != "controller" && args["type"] != "analyzer" { diff --git a/server/controller/http/service/rebalance/analyzer.go b/server/controller/http/service/rebalance/analyzer.go index 95361012efd..c182503b661 100644 --- a/server/controller/http/service/rebalance/analyzer.go +++ b/server/controller/http/service/rebalance/analyzer.go @@ -27,6 +27,7 @@ type DB interface { } type DBInfo struct { + Regions []mysql.Region AZs []mysql.AZ Analyzers []mysql.Analyzer AZAnalyzerConns []mysql.AZAnalyzerConnection @@ -38,12 +39,21 @@ type DBInfo struct { } type AnalyzerInfo struct { - onlyWeight bool - dbInfo *DBInfo - regionToVTapNameToTraffic map[string]map[string]int64 + onlyWeight bool - db DB - query Querier + dbInfo *DBInfo + db DB + query Querier + + RebalanceData +} + +type RebalanceData struct { + RegionToVTapNameToTraffic map[string]map[string]int64 `json:"RegionToVTapNameToTraffic"` + RegionToAZLcuuids map[string][]string `json:"RegionToAZLcuuids"` + AZToRegion map[string]string `json:"AZToRegion"` + AZToVTaps map[string][]*mysql.VTap `json:"AZToVTaps"` + AZToAnalyzers map[string][]*mysql.Analyzer `json:"AZToAnalyzers"` } func NewAnalyzerInfo(onlyWeight bool) *AnalyzerInfo { @@ -57,6 +67,9 @@ func NewAnalyzerInfo(onlyWeight bool) *AnalyzerInfo { } func (r *DBInfo) Get(db *mysql.DB) error { + if err := db.Find(&r.Regions).Error; err != nil { + return err + } if err := db.Find(&r.AZs).Error; err != nil { return err } diff --git a/server/controller/http/service/rebalance/traffic.go b/server/controller/http/service/rebalance/traffic.go index d9394895621..d8f0ea7494d 100644 --- a/server/controller/http/service/rebalance/traffic.go +++ b/server/controller/http/service/rebalance/traffic.go @@ -42,56 +42,17 @@ import ( var log = logging.MustGetLogger("service.rebalance") func (r *AnalyzerInfo) RebalanceAnalyzerByTraffic(db *mysql.DB, ifCheckout bool, dataDuration int) (*model.VTapRebalanceResult, error) { - // In automatic balancing, data is not obtained when ifCheckout = false. - if len(r.dbInfo.Analyzers) == 0 { - err := r.dbInfo.Get(db) - if err != nil { - return nil, err - } - } - info := r.dbInfo - if len(info.VTaps) == 0 || len(info.Analyzers) == 0 { - return nil, nil - } - - regionToAZLcuuids := make(map[string][]string) - azToRegion := make(map[string]string, len(info.AZs)) - for _, az := range info.AZs { - regionToAZLcuuids[az.Region] = append(regionToAZLcuuids[az.Region], az.Lcuuid) - azToRegion[az.Lcuuid] = az.Region - } - azToVTaps := make(map[string][]*mysql.VTap) - allVTapNameToID := make(map[string]int, len(info.VTaps)) - allVTapIDToVTap := make(map[int]*mysql.VTap, len(info.VTaps)) - for i, vtap := range info.VTaps { - azToVTaps[vtap.AZ] = append(azToVTaps[vtap.AZ], &info.VTaps[i]) - allVTapNameToID[vtap.Name] = vtap.ID - allVTapIDToVTap[vtap.ID] = &vtap - } - ipToAnalyzer := make(map[string]*mysql.Analyzer) - for i, analyzer := range info.Analyzers { - ipToAnalyzer[analyzer.IP] = &info.Analyzers[i] - } - azToAnalyzers := GetAZToAnalyzers(info.AZAnalyzerConns, regionToAZLcuuids, ipToAnalyzer) - - if r.regionToVTapNameToTraffic == nil { - regionToVTapNameToTraffic, err := r.getVTapTraffic(db, dataDuration, regionToAZLcuuids) - if err != nil { - return nil, fmt.Errorf("get traffic data failed: %v", err) - } - for region, vtapNameToTraffic := range regionToVTapNameToTraffic { - log.Infof("ORGID-%d DATABASE-%s region(%s) agent traffic: %#v", db.ORGID, db.Name, region, vtapNameToTraffic) - } - r.regionToVTapNameToTraffic = regionToVTapNameToTraffic + if err := r.generateRebalanceData(db, dataDuration); err != nil { + return nil, err } response := &model.VTapRebalanceResult{} - for _, az := range info.AZs { - azVTaps, ok := azToVTaps[az.Lcuuid] + for _, az := range r.dbInfo.AZs { + azVTaps, ok := r.AZToVTaps[az.Lcuuid] if !ok { continue } - azAnalyzers, ok := azToAnalyzers[az.Lcuuid] + azAnalyzers, ok := r.AZToAnalyzers[az.Lcuuid] if !ok { continue } @@ -103,7 +64,7 @@ func (r *AnalyzerInfo) RebalanceAnalyzerByTraffic(db *mysql.DB, ifCheckout bool, vTapIDToTraffic[vtap.ID] = 0 vTapIDToVTap[vtap.ID] = vtap } - for vtapName, traffic := range r.regionToVTapNameToTraffic[az.Region] { + for vtapName, traffic := range r.RegionToVTapNameToTraffic[az.Region] { vtapID, ok := vtapNameToID[vtapName] if !ok { continue @@ -160,6 +121,13 @@ func (r *AnalyzerInfo) RebalanceAnalyzerByTraffic(db *mysql.DB, ifCheckout bool, r.updateCounter(db, vTapIDToVTap, vtapNameToID, vTapIDToChangeInfo) } + + allVTapNameToID := make(map[string]int, len(r.dbInfo.VTaps)) + allVTapIDToVTap := make(map[int]*mysql.VTap, len(r.dbInfo.VTaps)) + for _, vtap := range r.dbInfo.VTaps { + allVTapNameToID[vtap.Name] = vtap.ID + allVTapIDToVTap[vtap.ID] = &vtap + } vtapCounter := statsd.GetVTapCounter() for name, counter := range vtapCounter.GetVtapNameCounter(db.ORGID) { vtapID, ok := allVTapNameToID[name] @@ -184,6 +152,177 @@ func (r *AnalyzerInfo) RebalanceAnalyzerByTraffic(db *mysql.DB, ifCheckout bool, return response, nil } +func (r *AnalyzerInfo) RebalanceAnalyzerByTrafficDebug(db *mysql.DB, dataDuration int) (map[string]interface{}, error) { + data := make(map[string]interface{}) + if err := r.generateRebalanceData(db, dataDuration); err != nil { + return nil, err + } + + data = r.trafficAZDebug(data) + data = r.trafficAnalyzerDebug(db.ORGID, data) + return data, nil +} + +func (r *AnalyzerInfo) trafficAZDebug(data map[string]interface{}) map[string]interface{} { + var trafficResult []map[string]interface{} + regionToName := make(map[string]string, len(r.dbInfo.Regions)) + for _, region := range r.dbInfo.Regions { + regionToName[region.Lcuuid] = region.Name + } + for _, az := range r.dbInfo.AZs { + azVTaps, ok := r.AZToVTaps[az.Lcuuid] + if !ok { + continue + } + azAnalyzers, ok := r.AZToAnalyzers[az.Lcuuid] + if !ok { + continue + } + analyzerIPToVTaps := make(map[string][]mysql.VTap, len(azAnalyzers)) + for _, vtap := range azVTaps { + analyzerIPToVTaps[vtap.AnalyzerIP] = append(analyzerIPToVTaps[vtap.AnalyzerIP], *vtap) + } + vTapNameToTraffic := make(map[string]int64) + for vtapName, traffic := range r.RegionToVTapNameToTraffic[az.Region] { + vTapNameToTraffic[vtapName] = traffic + } + + results := make([]map[string]interface{}, len(azAnalyzers)) + azTraffic := int64(0) + for i, analyzer := range azAnalyzers { + results[i] = map[string]interface{}{ + "REGION": fmt.Sprintf("%s(%s)", az.Region, regionToName[az.Region]), + "AZ": fmt.Sprintf("%s(%s)", az.Lcuuid, az.Name), + "ANALYZER_IP": analyzer.IP, + "ANALYZER_STATE": analyzer.State, + "AGENT_COUNT": len(analyzerIPToVTaps[analyzer.IP]), + } + trafficData := int64(0) + for _, vtap := range analyzerIPToVTaps[analyzer.IP] { + trafficData += vTapNameToTraffic[vtap.Name] + azTraffic += vTapNameToTraffic[vtap.Name] + } + results[i]["ANALYZER_TRAFFIC"] = trafficData + } + for _, result := range results { + result["AZ_TRAFFIC"] = azTraffic + } + trafficResult = append(trafficResult, results...) + } + data["TRAFFIC_AZ"] = trafficResult + + return data +} + +func (r *AnalyzerInfo) trafficAnalyzerDebug(orgID int, data map[string]interface{}) map[string]interface{} { + regionToName := make(map[string]string, len(r.dbInfo.Regions)) + for _, region := range r.dbInfo.Regions { + regionToName[region.Lcuuid] = region.Name + } + lcuuidToAz := make(map[string]*mysql.AZ) + for i, az := range r.dbInfo.AZs { + lcuuidToAz[az.Lcuuid] = &r.dbInfo.AZs[i] + } + ipToAzAnalyzerCon := make(map[string][]*mysql.AZAnalyzerConnection) + for i, conn := range r.dbInfo.AZAnalyzerConns { + ipToAzAnalyzerCon[conn.AnalyzerIP] = append( + ipToAzAnalyzerCon[conn.AnalyzerIP], + &r.dbInfo.AZAnalyzerConns[i], + ) + } + analyzerIPToVTaps := make(map[string][]mysql.VTap, len(r.dbInfo.VTaps)) + for _, vtap := range r.dbInfo.VTaps { + analyzerIPToVTaps[vtap.AnalyzerIP] = append(analyzerIPToVTaps[vtap.AnalyzerIP], vtap) + } + vtapNameToTraffic := make(map[string]int64) + for _, item := range r.RegionToVTapNameToTraffic { + for vtapName, traffic := range item { + vtapNameToTraffic[vtapName] = traffic + } + } + + results := make([]map[string]interface{}, len(r.dbInfo.Analyzers)) + for i, analyzer := range r.dbInfo.Analyzers { + results[i] = map[string]interface{}{ + "ANALYZER_IP": analyzer.IP, + "ANALYZER_STATE": analyzer.State, + "AGENT_COUNT": len(analyzerIPToVTaps[analyzer.IP]), + } + + trafficData := int64(0) + for _, vtap := range analyzerIPToVTaps[analyzer.IP] { + trafficData += vtapNameToTraffic[vtap.Name] + } + results[i]["ANALYZER_TRAFFIC"] = trafficData + + azConns, ok := ipToAzAnalyzerCon[analyzer.IP] + if ok && len(azConns) > 0 { + if regionName, ok := regionToName[azConns[0].Region]; ok { + results[i]["REGION"] = fmt.Sprintf("%s(%s)", azConns[0].Region, regionName) + } + } + var azStr string + for _, azConn := range azConns { + if azConn.AZ == "ALL" { + azStr = "ALL" + break + } else { + if azStr == "" { + azStr = fmt.Sprintf("%s(%s)", azConn.AZ, lcuuidToAz[azConn.AZ].Name) + } else { + azStr += fmt.Sprintf(", %s(%s)", azConn.AZ, lcuuidToAz[azConn.AZ].Name) + } + } + } + results[i]["AZ"] = azStr + } + data["TRAFFIC_ANALYZER"] = results + return data +} + +func (r *AnalyzerInfo) generateRebalanceData(db *mysql.DB, dataDuration int) error { + // In automatic balancing, data is not obtained when ifCheckout = false. + if len(r.dbInfo.Analyzers) == 0 { + err := r.dbInfo.Get(db) + if err != nil { + return err + } + } + info := r.dbInfo + if len(info.VTaps) == 0 || len(info.Analyzers) == 0 { + return nil + } + + r.RegionToAZLcuuids = make(map[string][]string) + r.AZToRegion = make(map[string]string, len(info.AZs)) + for _, az := range info.AZs { + r.RegionToAZLcuuids[az.Region] = append(r.RegionToAZLcuuids[az.Region], az.Lcuuid) + r.AZToRegion[az.Lcuuid] = az.Region + } + r.AZToVTaps = make(map[string][]*mysql.VTap) + for i, vtap := range info.VTaps { + r.AZToVTaps[vtap.AZ] = append(r.AZToVTaps[vtap.AZ], &info.VTaps[i]) + } + ipToAnalyzer := make(map[string]*mysql.Analyzer) + for i, analyzer := range info.Analyzers { + ipToAnalyzer[analyzer.IP] = &info.Analyzers[i] + } + r.AZToAnalyzers = GetAZToAnalyzers(info.AZAnalyzerConns, r.RegionToAZLcuuids, ipToAnalyzer) + + if r.RegionToVTapNameToTraffic == nil { + regionToVTapNameToTraffic, err := r.getVTapTraffic(db, dataDuration, r.RegionToAZLcuuids) + if err != nil { + return fmt.Errorf("get traffic data failed: %v", err) + } + for region, vtapNameToTraffic := range r.RegionToVTapNameToTraffic { + log.Infof("ORGID-%d DATABASE-%s region(%s) agent traffic: %#v", db.ORGID, db.Name, region, vtapNameToTraffic) + } + r.RegionToVTapNameToTraffic = regionToVTapNameToTraffic + } + r.dbInfo.AZs = r.dbInfo.AZs + return nil +} + type AZInfo struct { lcuuid string vTapIDToTraffic map[int]int64 diff --git a/server/controller/http/service/vtap.go b/server/controller/http/service/vtap.go index 3ef3f5b3086..1c6b5cbeed4 100644 --- a/server/controller/http/service/vtap.go +++ b/server/controller/http/service/vtap.go @@ -840,7 +840,7 @@ func vtapAnalyzerRebalance(db *mysql.DB, azs []mysql.AZ, ifCheck bool) (*model.V return response, nil } -func VTapRebalance(db *mysql.DB, args map[string]interface{}, cfg monitorconf.IngesterLoadBalancingStrategy) (*model.VTapRebalanceResult, error) { +func VTapRebalance(db *mysql.DB, args map[string]interface{}, cfg monitorconf.IngesterLoadBalancingStrategy) (interface{}, error) { var azs []mysql.AZ hostType := "controller" @@ -848,6 +848,13 @@ func VTapRebalance(db *mysql.DB, args map[string]interface{}, cfg monitorconf.In hostType = argsType.(string) } + if _, ok := args["is_debug"]; ok && hostType == "controller" { + return nil, errors.New("rebalance agent debug only support analyzer type") + } + if _, ok := args["is_debug"]; ok && cfg.Algorithm == common.ANALYZER_ALLOC_BY_AGENT_COUNT { + return nil, errors.New("rebalance agent debug algorithm only support by-ingested-data") + } + ifCheck := false if argsCheck, ok := args["check"]; ok { ifCheck = argsCheck.(bool) @@ -858,6 +865,9 @@ func VTapRebalance(db *mysql.DB, args map[string]interface{}, cfg monitorconf.In return vtapControllerRebalance(db, azs, ifCheck) } else { if cfg.Algorithm == common.ANALYZER_ALLOC_BY_INGESTED_DATA { + if _, ok := args["is_debug"]; ok { + return rebalance.NewAnalyzerInfo(false).RebalanceAnalyzerByTrafficDebug(db, cfg.DataDuration) + } return rebalance.NewAnalyzerInfo(false).RebalanceAnalyzerByTraffic(db, ifCheck, cfg.DataDuration) } else if cfg.Algorithm == common.ANALYZER_ALLOC_BY_AGENT_COUNT { result, err := vtapAnalyzerRebalance(db, azs, ifCheck)