From dff16fae3aa3753b2f914d0e5ca7f793849b1f21 Mon Sep 17 00:00:00 2001 From: kangxiang Date: Tue, 21 May 2024 17:19:23 +0800 Subject: [PATCH] [Controller] genesis vinterface add team id --- cli/ctl/genesis.go | 3 +- cli/go.mod | 4 +- cli/go.sum | 8 +- message/controller.proto | 1 + .../db/mysql/migration/rawsql/init.sql | 1 + .../mysql/migration/rawsql/issu/6.5.1.35.sql | 35 ++++++ .../controller/db/mysql/migration/version.go | 2 +- server/controller/genesis/common/utils.go | 14 ++- server/controller/genesis/datatypes.go | 1 + server/controller/genesis/genesis.go | 1 + server/controller/genesis/grpc_server.go | 104 ++++++++++-------- server/controller/genesis/updater.go | 2 + server/controller/model/model.go | 1 + 13 files changed, 122 insertions(+), 55 deletions(-) create mode 100644 server/controller/db/mysql/migration/rawsql/issu/6.5.1.35.sql diff --git a/cli/ctl/genesis.go b/cli/ctl/genesis.go index e4530eb4f61..1ac2b7be945 100644 --- a/cli/ctl/genesis.go +++ b/cli/ctl/genesis.go @@ -325,7 +325,7 @@ func tableVip(response *simplejson.Json, table *tablewriter.Table) { } func tableVinterface(response *simplejson.Json, table *tablewriter.Table) { - table.SetHeader([]string{"MAC", "NAME", "TAP_MAC", "TAP_NAME", "IF_TYPE", "DEVICE_TYPE", "DEVICE_NAME", "HOST_IP", "AGENT_ID", "CLUSTER_ID", "NETNS_ID", "IP"}) + table.SetHeader([]string{"MAC", "NAME", "TAP_MAC", "TAP_NAME", "IF_TYPE", "DEVICE_TYPE", "DEVICE_NAME", "HOST_IP", "AGENT_ID", "CLUSTER_ID", "NETNS_ID", "TEAM_ID", "IP"}) tableItems := [][]string{} for i := range response.Get("DATA").MustArray() { @@ -351,6 +351,7 @@ func tableVinterface(response *simplejson.Json, table *tablewriter.Table) { tableItem = append(tableItem, strconv.Itoa(data.Get("VTAP_ID").MustInt())) tableItem = append(tableItem, data.Get("KUBERNETES_CLUSTER_ID").MustString()) tableItem = append(tableItem, strconv.Itoa(data.Get("NETNS_ID").MustInt())) + tableItem = append(tableItem, strconv.Itoa(data.Get("TEAM_ID").MustInt())) tableItem = append(tableItem, ip) tableItems = append(tableItems, tableItem) } diff --git a/cli/go.mod b/cli/go.mod index 68044c9d065..31e23d30037 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -4,8 +4,8 @@ go 1.18 require ( github.com/bitly/go-simplejson v0.5.0 - github.com/deepflowio/deepflow/message v0.0.0-20240507083143-eaca2bed10f2 - github.com/deepflowio/deepflow/server v0.0.0-20240423024840-ece29545d0ac + github.com/deepflowio/deepflow/message v0.0.0-20240508092310-e45a3d549f9b + github.com/deepflowio/deepflow/server v0.0.0-20240521151831-c9de50d1a803 github.com/golang/protobuf v1.5.4 github.com/mattn/go-runewidth v0.0.14 github.com/olekukonko/tablewriter v0.0.5 diff --git a/cli/go.sum b/cli/go.sum index 764112d7386..785aa001cce 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -75,10 +75,10 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/deepflowio/deepflow/message v0.0.0-20240507083143-eaca2bed10f2 h1:RaWabWp/uS3/uDCwsK7arzJ7/m5x4LeXC6EXpnl6ylI= -github.com/deepflowio/deepflow/message v0.0.0-20240507083143-eaca2bed10f2/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= -github.com/deepflowio/deepflow/server v0.0.0-20240423024840-ece29545d0ac h1:5gysrosvC5kwK0gwH9R/1dbgeyJRKcQj371eB1tXqCk= -github.com/deepflowio/deepflow/server v0.0.0-20240423024840-ece29545d0ac/go.mod h1:EMICsEChD3sF/62DhAsGJ/uDUEJDqEMcZjtanDH+C2o= +github.com/deepflowio/deepflow/message v0.0.0-20240508092310-e45a3d549f9b h1:dDrPqgS+JP/2tc8zIIO58Xm6ulv8nOnezEpo2wgPSEc= +github.com/deepflowio/deepflow/message v0.0.0-20240508092310-e45a3d549f9b/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= +github.com/deepflowio/deepflow/server v0.0.0-20240521151831-c9de50d1a803 h1:Dh1tTAiMOWrhOVp6idJydvV+hHCZgcfBoGi9tU2mI60= +github.com/deepflowio/deepflow/server v0.0.0-20240521151831-c9de50d1a803/go.mod h1:C4Tk88hF3MBM+MexSqbbwI+6UxA9Yo0OJede6ab4pTE= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= diff --git a/message/controller.proto b/message/controller.proto index 74fe27dfcd2..87b6fa41f9c 100644 --- a/message/controller.proto +++ b/message/controller.proto @@ -134,6 +134,7 @@ message GenesisSyncVinterface { optional string last_seen = 14; optional uint32 netns_id = 15; optional string if_type = 16; + optional uint32 team_id = 17; } message GenesisSyncProcess { diff --git a/server/controller/db/mysql/migration/rawsql/init.sql b/server/controller/db/mysql/migration/rawsql/init.sql index 07ee58485a0..a4b9464f64a 100644 --- a/server/controller/db/mysql/migration/rawsql/init.sql +++ b/server/controller/db/mysql/migration/rawsql/init.sql @@ -1861,6 +1861,7 @@ CREATE TABLE IF NOT EXISTS genesis_vinterface ( last_seen DATETIME, vtap_id INTEGER, kubernetes_cluster_id CHAR(64), + team_id INTEGER DEFAULT 1, PRIMARY KEY (`lcuuid`,`vtap_id`, `node_ip`) ) ENGINE=innodb DEFAULT CHARSET=utf8mb4; TRUNCATE TABLE genesis_vinterface; diff --git a/server/controller/db/mysql/migration/rawsql/issu/6.5.1.35.sql b/server/controller/db/mysql/migration/rawsql/issu/6.5.1.35.sql new file mode 100644 index 00000000000..df8e8673333 --- /dev/null +++ b/server/controller/db/mysql/migration/rawsql/issu/6.5.1.35.sql @@ -0,0 +1,35 @@ +-- modify start, add upgrade sql +DROP PROCEDURE IF EXISTS AddColumnIfNotExists; + +CREATE PROCEDURE AddColumnIfNotExists( + IN tableName VARCHAR(255), + IN colName VARCHAR(255), + IN afterCol VARCHAR(255) +) +BEGIN + DECLARE column_count INT; + + -- 检查列是否存在 + SELECT COUNT(*) + INTO column_count + FROM information_schema.columns + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = tableName + AND column_name = colName; + + -- 如果列不存在,则添加列 + IF column_count = 0 THEN + SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN ', colName, ' INTEGER DEFAULT 1 AFTER ', afterCol); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; +END; + +CALL AddColumnIfNotExists('genesis_vinterface', 'team_id', 'kubernetes_cluster_id'); + +DROP PROCEDURE AddColumnIfNotExists; + +-- update db_version to latest, remeber update DB_VERSION_EXPECT in migrate/init.go +UPDATE db_version SET version='6.5.1.35'; +-- modify end diff --git a/server/controller/db/mysql/migration/version.go b/server/controller/db/mysql/migration/version.go index badd326e8f2..f2ba6ad2ce6 100644 --- a/server/controller/db/mysql/migration/version.go +++ b/server/controller/db/mysql/migration/version.go @@ -18,5 +18,5 @@ package migration const ( DB_VERSION_TABLE = "db_version" - DB_VERSION_EXPECTED = "6.5.1.34" + DB_VERSION_EXPECTED = "6.5.1.35" ) diff --git a/server/controller/genesis/common/utils.go b/server/controller/genesis/common/utils.go index 93e6d635edf..c8a1bbffd1d 100644 --- a/server/controller/genesis/common/utils.go +++ b/server/controller/genesis/common/utils.go @@ -45,6 +45,11 @@ import ( var log = logging.MustGetLogger("genesis.common") +type TeamInfo struct { + OrgID int + TeamId int +} + type VifInfo struct { MaskLen uint32 Address string @@ -499,8 +504,8 @@ func RequestGet(url string, timeout int, queryStrings map[string]string) error { return nil } -func GetTeamIDToOrgID() (map[string]int, error) { - teamIDToOrgID := map[string]int{} +func GetTeamShortLcuuidToInfo() (map[string]TeamInfo, error) { + teamIDToOrgID := map[string]TeamInfo{} orgIDs, err := mysql.GetORGIDs() if err != nil { return teamIDToOrgID, err @@ -518,7 +523,10 @@ func GetTeamIDToOrgID() (map[string]int, error) { continue } for _, team := range teams { - teamIDToOrgID[team.ShortLcuuid] = orgID + teamIDToOrgID[team.ShortLcuuid] = TeamInfo{ + OrgID: orgID, + TeamId: team.TeamID, + } } } return teamIDToOrgID, nil diff --git a/server/controller/genesis/datatypes.go b/server/controller/genesis/datatypes.go index bdbfbe78597..60aba70951f 100644 --- a/server/controller/genesis/datatypes.go +++ b/server/controller/genesis/datatypes.go @@ -33,6 +33,7 @@ import ( type VIFRPCMessage struct { orgID int msgType int + teamID uint32 vtapID uint32 peer string k8sClusterID string diff --git a/server/controller/genesis/genesis.go b/server/controller/genesis/genesis.go index 2fda2d2deef..6e9540af741 100644 --- a/server/controller/genesis/genesis.go +++ b/server/controller/genesis/genesis.go @@ -417,6 +417,7 @@ func (g *Genesis) GetGenesisSyncResponse(orgID int) (GenesisSyncDataResponse, er HostIP: v.GetHostIp(), KubernetesClusterID: v.GetKubernetesClusterId(), NodeIP: v.GetNodeIp(), + TeamID: v.GetTeamId(), LastSeen: vpLastSeen, }) } diff --git a/server/controller/genesis/grpc_server.go b/server/controller/genesis/grpc_server.go index 5f7978d3b59..3020af7deda 100644 --- a/server/controller/genesis/grpc_server.go +++ b/server/controller/genesis/grpc_server.go @@ -48,8 +48,9 @@ func isInterestedHost(tType tridentcommon.TridentType) bool { type TridentStats struct { OrgID int + TeamID int VtapID uint32 - TeamID string + TeamShortLcuuid string IP string Proxy string K8sVersion uint64 @@ -70,7 +71,7 @@ type SynchronizerServer struct { k8sQueue queue.QueueWriter prometheusQueue queue.QueueWriter genesisSyncQueue queue.QueueWriter - teamIDToOrgID sync.Map + teamShortLcuuidToInfo sync.Map clusterIDToVersion sync.Map prometheusClusterIDToVersion sync.Map vtapToVersion sync.Map @@ -128,29 +129,32 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G return &trident.GenesisSyncResponse{Version: &version}, nil } - var orgID int - teamID := request.GetTeamId() - if teamID == "" { + var orgID, teamID int + teamShortLcuuid := request.GetTeamId() + if teamShortLcuuid == "" { orgID = mysqlcommon.DEFAULT_ORG_ID + teamID = mysqlcommon.DEFAULT_TEAM_ID } else { - oID, ok := g.teamIDToOrgID.Load(teamID) - if !ok { - teamIDToOrgID, err := common.GetTeamIDToOrgID() + t, ok := g.teamShortLcuuidToInfo.Load(teamShortLcuuid) + if ok { + orgID = t.(common.TeamInfo).OrgID + teamID = t.(common.TeamInfo).TeamId + } else { + teamShortLcuuidToInfo, err := common.GetTeamShortLcuuidToInfo() if err != nil { - log.Errorf("genesis sync from %s team_id %s vtap get org id failed: %s", remote, teamID, err.Error()) + log.Errorf("genesis sync from %s team_id %s vtap get team info failed: %s", remote, teamShortLcuuid, err.Error()) return &trident.GenesisSyncResponse{Version: &version}, nil } - oID, ok := teamIDToOrgID[teamID] + teamInfo, ok := teamShortLcuuidToInfo[teamShortLcuuid] if !ok { - log.Errorf("genesis sync from %s team_id %s not found organization", remote, teamID) + log.Errorf("genesis sync from %s team_id %s not found team info", remote, teamShortLcuuid) return &trident.GenesisSyncResponse{Version: &version}, nil } - orgID = oID - for k, v := range teamIDToOrgID { - g.teamIDToOrgID.Store(k, v) + orgID = teamInfo.OrgID + teamID = teamInfo.TeamId + for k, v := range teamShortLcuuidToInfo { + g.teamShortLcuuidToInfo.Store(k, v) } - } else { - orgID = oID.(int) } } vtap := fmt.Sprintf("%d-%d", orgID, vtapID) @@ -187,6 +191,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G peer: remote, vtapID: vtapID, orgID: orgID, + teamID: uint32(teamID), msgType: common.TYPE_RENEW, message: request, }, @@ -200,6 +205,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G peer: remote, vtapID: vtapID, orgID: orgID, + teamID: uint32(teamID), k8sClusterID: k8sClusterID, msgType: common.TYPE_UPDATE, message: request, @@ -222,6 +228,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G stats.SyncTridentType = tType stats.SyncLastSeen = time.Now() stats.K8sClusterID = k8sClusterID + stats.TeamShortLcuuid = teamShortLcuuid stats.GenesisSyncProcessDataOperation = request.GetProcessData() stats.GenesisSyncDataOperation = platformData g.tridentStatsMap.Store(vtap, stats) @@ -255,29 +262,32 @@ func (g *SynchronizerServer) KubernetesAPISync(ctx context.Context, request *tri } entries := request.GetEntries() - var orgID int - teamID := request.GetTeamId() - if teamID == "" { + var orgID, teamID int + teamShortLcuuid := request.GetTeamId() + if teamShortLcuuid == "" { orgID = mysqlcommon.DEFAULT_ORG_ID + teamID = mysqlcommon.DEFAULT_TEAM_ID } else { - oID, ok := g.teamIDToOrgID.Load(teamID) - if !ok { - teamIDToOrgID, err := common.GetTeamIDToOrgID() + t, ok := g.teamShortLcuuidToInfo.Load(teamShortLcuuid) + if ok { + orgID = t.(common.TeamInfo).OrgID + teamID = t.(common.TeamInfo).TeamId + } else { + teamShortLcuuidToInfo, err := common.GetTeamShortLcuuidToInfo() if err != nil { - log.Errorf("kubernetes api sync from %s team_id %s vtap get org id failed: %s", remote, teamID, err.Error()) + log.Errorf("kubernetes api sync from %s team_id %s vtap get team info failed: %s", remote, teamShortLcuuid, err.Error()) return &trident.KubernetesAPISyncResponse{}, nil } - oID, ok := teamIDToOrgID[teamID] + teamInfo, ok := teamShortLcuuidToInfo[teamShortLcuuid] if !ok { - log.Errorf("kubernetes api sync from %s team_id %s not found organization", remote, teamID) + log.Errorf("kubernetes api sync %s team_id %s not found team info", remote, teamShortLcuuid) return &trident.KubernetesAPISyncResponse{}, nil } - orgID = oID - for k, v := range teamIDToOrgID { - g.teamIDToOrgID.Store(k, v) + orgID = teamInfo.OrgID + teamID = teamInfo.TeamId + for k, v := range teamShortLcuuidToInfo { + g.teamShortLcuuidToInfo.Store(k, v) } - } else { - orgID = oID.(int) } } vtap := fmt.Sprintf("%d-%d", orgID, vtapID) @@ -296,6 +306,7 @@ func (g *SynchronizerServer) KubernetesAPISync(ctx context.Context, request *tri stats.K8sClusterID = clusterID stats.K8sLastSeen = time.Now() stats.K8sVersion = version + stats.TeamShortLcuuid = teamShortLcuuid g.tridentStatsMap.Store(vtap, stats) now := time.Now() if vtapID != 0 { @@ -377,29 +388,32 @@ func (g *SynchronizerServer) PrometheusAPISync(ctx context.Context, request *tri } entries := request.GetEntries() - var orgID int - teamID := request.GetTeamId() - if teamID == "" { + var orgID, teamID int + teamShortLcuuid := request.GetTeamId() + if teamShortLcuuid == "" { orgID = mysqlcommon.DEFAULT_ORG_ID + teamID = mysqlcommon.DEFAULT_TEAM_ID } else { - oID, ok := g.teamIDToOrgID.Load(teamID) - if !ok { - teamIDToOrgID, err := common.GetTeamIDToOrgID() + t, ok := g.teamShortLcuuidToInfo.Load(teamShortLcuuid) + if ok { + orgID = t.(common.TeamInfo).OrgID + teamID = t.(common.TeamInfo).TeamId + } else { + teamShortLcuuidToInfo, err := common.GetTeamShortLcuuidToInfo() if err != nil { - log.Errorf("prometheus api sync from %s team_id %s vtap get org id failed: %s", remote, teamID, err.Error()) + log.Errorf("prometheus api sync from %s team_id %s vtap get team info failed: %s", remote, teamShortLcuuid, err.Error()) return &trident.PrometheusAPISyncResponse{}, nil } - oID, ok := teamIDToOrgID[teamID] + teamInfo, ok := teamShortLcuuidToInfo[teamShortLcuuid] if !ok { - log.Errorf("prometheus api sync from %s team_id %s not found organization", remote, teamID) + log.Errorf("prometheus api sync %s team_id %s not found team info", remote, teamShortLcuuid) return &trident.PrometheusAPISyncResponse{}, nil } - orgID = oID - for k, v := range teamIDToOrgID { - g.teamIDToOrgID.Store(k, v) + orgID = teamInfo.OrgID + teamID = teamInfo.TeamId + for k, v := range teamShortLcuuidToInfo { + g.teamShortLcuuidToInfo.Store(k, v) } - } else { - orgID = oID.(int) } } vtap := fmt.Sprintf("%d-%d", orgID, vtapID) @@ -416,6 +430,7 @@ func (g *SynchronizerServer) PrometheusAPISync(ctx context.Context, request *tri stats.TeamID = teamID stats.VtapID = vtapID stats.PrometheusClusterID = clusterID + stats.TeamShortLcuuid = teamShortLcuuid stats.PrometheusLastSeen = time.Now() stats.PrometheusVersion = version g.tridentStatsMap.Store(vtap, stats) @@ -647,6 +662,7 @@ func (g *SynchronizerServer) GenesisSharingSync(ctx context.Context, request *co HostIp: &vData.HostIP, KubernetesClusterId: &vData.KubernetesClusterID, NodeIp: &vData.NodeIP, + TeamId: &vData.TeamID, LastSeen: &vLastSeen, } gSyncVinterfaces = append(gSyncVinterfaces, gVinterface) diff --git a/server/controller/genesis/updater.go b/server/controller/genesis/updater.go index 2e42132690e..8a419800f1f 100644 --- a/server/controller/genesis/updater.go +++ b/server/controller/genesis/updater.go @@ -220,6 +220,7 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(info VIFRPCMessage, peer str vIF.LastSeen = epoch vIF.VtapID = vtapID vIF.KubernetesClusterID = k8sClusterID + vIF.TeamID = info.teamID VIFs = append(VIFs, vIF) } } @@ -315,6 +316,7 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(info VIFRPCMessage, peer str vIF.LastSeen = epoch vIF.VtapID = vtapID vIF.KubernetesClusterID = k8sClusterID + vIF.TeamID = info.teamID VIFs = append(VIFs, vIF) } return VIFs diff --git a/server/controller/model/model.go b/server/controller/model/model.go index 0c7224cc31a..464ab9f37f1 100644 --- a/server/controller/model/model.go +++ b/server/controller/model/model.go @@ -593,6 +593,7 @@ func (GenesisPort) TableName() string { } type GenesisVinterface struct { + TeamID uint32 `gorm:"column:team_id;type:int;default:1" json:"TEAM_ID"` NetnsID uint32 `gorm:"column:netns_id;type:int unsigned;default:0" json:"NETNS_ID"` VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"` Lcuuid string `gorm:"primaryKey;column:lcuuid;type:char(64)" json:"LCUUID"`