diff --git a/message/trident.proto b/message/trident.proto index 8b0bc35178f..996c6034b7e 100644 --- a/message/trident.proto +++ b/message/trident.proto @@ -10,18 +10,12 @@ service Synchronizer { rpc Push(SyncRequest) returns (stream SyncResponse) {} rpc AnalyzerSync(SyncRequest) returns (SyncResponse) {} rpc Upgrade(UpgradeRequest) returns (stream UpgradeResponse) {} - rpc Query(NtpRequest) returns (NtpResponse) {} rpc GenesisSync(GenesisSyncRequest) returns (GenesisSyncResponse) {} rpc KubernetesAPISync(KubernetesAPISyncRequest) returns (KubernetesAPISyncResponse) {} - rpc GetKubernetesClusterID(KubernetesClusterIDRequest) returns (KubernetesClusterIDResponse) {} - rpc GPIDSync(GPIDSyncRequest) returns (GPIDSyncResponse) {} - rpc ShareGPIDLocalData(ShareGPIDSyncRequests) returns (ShareGPIDSyncRequests) {} - rpc Plugin(PluginRequest) returns (stream PluginResponse) {} rpc GetPrometheusLabelIDs(PrometheusLabelRequest) returns (PrometheusLabelResponse) {} rpc GetPrometheusTargets(PrometheusTargetRequest) returns (PrometheusTargetResponse) {} rpc GetUniversalTagNameMaps(UniversalTagNameMapsRequest) returns (UniversalTagNameMapsResponse) {} // because gRPC cannot be initiated by server, the req/resp of this rpc is reversed - rpc RemoteExecute(stream RemoteExecResponse) returns (stream RemoteExecRequest) {} rpc GetOrgIDs(OrgIDsRequest) returns (OrgIDsResponse) {} } @@ -628,43 +622,12 @@ message UpgradeResponse { optional string k8s_image = 6; // When k8s_image is not empty, ignore content } -message NtpRequest { - optional string ctrl_ip = 1; // 请求端的控制口IP - optional bytes request = 10; // 数据 -} - -message NtpResponse { - optional bytes response = 1; // 数据 -} - message PluginConfig { optional uint32 update_time = 1 [default = 0]; // latest epoch of all configured plugins repeated string wasm_plugins = 2; repeated string so_plugins = 3; } -enum PluginType { - WASM = 1; - SO = 2; -} - -message PluginRequest { - optional string ctrl_ip = 1; - optional string ctrl_mac = 2; - optional PluginType plugin_type = 3; - optional string plugin_name = 4; - optional string team_id = 5; // agent team identity -} - -message PluginResponse { - optional Status status = 1; // 调用是否成功 - optional bytes content = 2; // 数据 - optional string md5 = 3; // 文件MD5 - optional uint64 total_len = 4; // 数据总长 - optional uint32 pkt_count = 5; // 包总个数 - optional uint32 update_time = 6 [default = 0]; // plugin update epoch -} - message GenesisPlatformData { repeated Ip ips = 7; @@ -772,17 +735,6 @@ message KubernetesAPISyncResponse { optional uint64 version = 1; } -message KubernetesClusterIDRequest { - optional string ca_md5 = 1; // md5 of /run/secrets/[kubernetes.io/serviceaccount/ca.crt - optional string team_id = 2; // agent team identity - optional string kubernetes_cluster_name = 46; // get that from deepflow-agent.yaml -} - -message KubernetesClusterIDResponse { - optional string error_msg = 1; - optional string cluster_id = 2; -} - message Pcap { optional uint64 flow_id = 1; optional uint64 start_time = 2; // ns @@ -1059,101 +1011,6 @@ message UniversalTagNameMapsResponse { repeated IdNameMap vtap_map = 14; } -message AgentId { - optional string ip = 1; - optional string mac = 2; - optional string team_id = 3; -} - -enum OutputFormat { - TEXT = 0; - BINARY = 1; -} - -enum CommandType { - LINUX = 0; - KUBERNETES = 1; -} - -enum ParamType { - PF_TEXT = 0; - PF_BOOLEAN = 1; -} - -message CommandParam { - optional string name = 1; - optional string regex = 2; - optional bool required = 3; - optional ParamType param_type = 4; - optional string description = 5; -} - -message RemoteCommand { - optional string cmd = 2; - optional OutputFormat output_format = 4; - optional string ident = 6; - repeated CommandParam params = 7; - optional string type_name = 8; - - optional uint32 id = 1; // deprecated, use `ident` instead - repeated string param_names = 3; // deprecated, use `params` instead - optional CommandType cmd_type = 5; // deprecated, use `type_name` instead -} - -message LinuxNamespace { - optional uint64 id = 1; - optional string ns_type = 2; - optional string user = 3; - optional uint32 pid = 4; - optional string cmd = 5; -} - -message CommandResult { - optional int32 errno = 1; - optional bytes content = 2; - // will only be populated in the last segment - // also used as end of result - optional string md5 = 3; - optional uint64 total_len = 4; - optional uint32 pkt_count = 5; -} - -enum ExecutionType { - LIST_COMMAND = 0; - LIST_NAMESPACE = 1; - RUN_COMMAND = 2; -} - -message Parameter { - optional string key = 1; - optional string value = 2; // accepts [A-Za-z0-9-_] -} - -// message from server to agent -message RemoteExecRequest { - optional uint64 request_id = 1; - - optional ExecutionType exec_type = 2; - repeated Parameter params = 4; // parameters to use in commands - optional uint32 linux_ns_pid = 5; // execute command in agent namespace if null - optional uint32 batch_len = 6 [default = 1048576]; // batch len of command execution results, min 1024 - optional string command_ident = 7; - - optional uint32 command_id = 3; // deprecated, use `command_ident` instead -} - -// message from agent to server -message RemoteExecResponse { - optional AgentId agent_id = 1; - optional uint64 request_id = 2; - - optional string errmsg = 3; - - repeated RemoteCommand commands = 4; - repeated LinuxNamespace linux_namespaces = 5; - optional CommandResult command_result = 6; -} - message OrgIDsRequest {} message OrgIDsResponse { diff --git a/server/controller/controller/controller.go b/server/controller/controller/controller.go index f9fd466ce0d..9ad12a40cd3 100644 --- a/server/controller/controller/controller.go +++ b/server/controller/controller/controller.go @@ -47,7 +47,6 @@ import ( "github.com/deepflowio/deepflow/server/controller/tagrecorder" "github.com/deepflowio/deepflow/server/controller/trisolaris" _ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/grpc/agentdebug" - _ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/grpc/debug" _ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/grpc/healthcheck" _ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/http/cache" _ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/http/upgrade" diff --git a/server/controller/grpc/synchronizer/service.go b/server/controller/grpc/synchronizer/service.go index 2b7a316985b..3bf73f4dd4f 100644 --- a/server/controller/grpc/synchronizer/service.go +++ b/server/controller/grpc/synchronizer/service.go @@ -32,14 +32,10 @@ import ( ) type service struct { - vTapEvent *trisolaris.VTapEvent - tsdbEvent *trisolaris.TSDBEvent - ntpEvent *trisolaris.NTPEvent - upgradeEvent *trisolaris.UpgradeEvent - kubernetesClusterIDEvent *trisolaris.KubernetesClusterIDEvent - processInfoEvent *trisolaris.ProcessInfoEvent - pluginEvent *trisolaris.PluginEvent - prometheusEvent *prometheus.SynchronizerEvent + vTapEvent *trisolaris.VTapEvent + tsdbEvent *trisolaris.TSDBEvent + upgradeEvent *trisolaris.UpgradeEvent + prometheusEvent *prometheus.SynchronizerEvent } func init() { @@ -48,13 +44,10 @@ func init() { func newService() *service { return &service{ - vTapEvent: trisolaris.NewVTapEvent(), - tsdbEvent: trisolaris.NewTSDBEvent(), - ntpEvent: trisolaris.NewNTPEvent(), - upgradeEvent: trisolaris.NewUpgradeEvent(), - processInfoEvent: trisolaris.NewprocessInfoEvent(), - pluginEvent: trisolaris.NewPluginEvent(), - prometheusEvent: prometheus.NewSynchronizerEvent(), + vTapEvent: trisolaris.NewVTapEvent(), + tsdbEvent: trisolaris.NewTSDBEvent(), + upgradeEvent: trisolaris.NewUpgradeEvent(), + prometheusEvent: prometheus.NewSynchronizerEvent(), } } @@ -97,22 +90,6 @@ func (s *service) Upgrade(r *api.UpgradeRequest, in api.Synchronizer_UpgradeServ return s.upgradeEvent.Upgrade(r, in) } -func (s *service) Query(ctx context.Context, in *api.NtpRequest) (*api.NtpResponse, error) { - startTime := time.Now() - defer func() { - statsd.AddGrpcCostStatsd(statsd.Query, int(time.Now().Sub(startTime).Milliseconds())) - }() - return s.ntpEvent.Query(ctx, in) -} - -func (s *service) GetKubernetesClusterID(ctx context.Context, in *api.KubernetesClusterIDRequest) (*api.KubernetesClusterIDResponse, error) { - startTime := time.Now() - defer func() { - statsd.AddGrpcCostStatsd(statsd.GetKubernetesClusterID, int(time.Now().Sub(startTime).Milliseconds())) - }() - return s.kubernetesClusterIDEvent.GetKubernetesClusterID(ctx, in) -} - func (s *service) GenesisSync(ctx context.Context, in *api.GenesisSyncRequest) (*api.GenesisSyncResponse, error) { startTime := time.Now() defer func() { @@ -129,18 +106,6 @@ func (s *service) KubernetesAPISync(ctx context.Context, in *api.KubernetesAPISy return genesis.GenesisService.Synchronizer.KubernetesAPISync(ctx, in) } -func (s *service) GPIDSync(ctx context.Context, in *api.GPIDSyncRequest) (*api.GPIDSyncResponse, error) { - startTime := time.Now() - defer func() { - statsd.AddGrpcCostStatsd(statsd.GPIDSync, int(time.Now().Sub(startTime).Milliseconds())) - }() - return s.processInfoEvent.GPIDSync(ctx, in) -} - -func (s *service) ShareGPIDLocalData(ctx context.Context, in *api.ShareGPIDSyncRequests) (*api.ShareGPIDSyncRequests, error) { - return s.processInfoEvent.ShareGPIDLocalData(ctx, in) -} - func (s *service) GetPrometheusLabelIDs(ctx context.Context, in *api.PrometheusLabelRequest) (*api.PrometheusLabelResponse, error) { startTime := time.Now() defer func() { @@ -160,18 +125,10 @@ func (s *service) GetPrometheusTargets(ctx context.Context, in *api.PrometheusTa // return resp, err } -func (s *service) Plugin(r *api.PluginRequest, in api.Synchronizer_PluginServer) error { - return s.pluginEvent.Plugin(r, in) -} - func (s *service) GetUniversalTagNameMaps(ctx context.Context, in *api.UniversalTagNameMapsRequest) (*api.UniversalTagNameMapsResponse, error) { return s.tsdbEvent.GetUniversalTagNameMaps(ctx, in) } -func (s *service) RemoteExecute(in api.Synchronizer_RemoteExecuteServer) error { - return s.vTapEvent.RemoteExecute(in) -} - func (s *service) GetOrgIDs(ctx context.Context, in *api.OrgIDsRequest) (*api.OrgIDsResponse, error) { return s.tsdbEvent.GetOrgIDs(ctx, in) } diff --git a/server/controller/http/router/vtap/agent_cmd.go b/server/controller/http/router/vtap/agent_cmd.go index 32d6f7615a7..67f794d26c9 100644 --- a/server/controller/http/router/vtap/agent_cmd.go +++ b/server/controller/http/router/vtap/agent_cmd.go @@ -29,7 +29,7 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" - "github.com/deepflowio/deepflow/message/trident" + agentmessage "github.com/deepflowio/deepflow/message/agent" "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/controller/config" "github.com/deepflowio/deepflow/server/controller/db/metadb" @@ -208,7 +208,7 @@ func (a *AgentCMD) getCMDAndNamespaceHandler() gin.HandlerFunc { userType, _ := c.Get(common.HEADER_KEY_X_USER_TYPE) if !(userType == common.USER_TYPE_SUPER_ADMIN || userType == common.USER_TYPE_ADMIN) { - var cmds []*trident.RemoteCommand + var cmds []*agentmessage.RemoteCommand for _, item := range data.RemoteCommand { _, ok1 := profileCommandMap[*item.Cmd] _, ok2 := probeCommandMap[*item.Cmd] @@ -220,7 +220,7 @@ func (a *AgentCMD) getCMDAndNamespaceHandler() gin.HandlerFunc { } if filterCommandMap, ok := agentCommandMap[AgentCommandType(c.Query("type"))]; ok { - var cmds []*trident.RemoteCommand + var cmds []*agentmessage.RemoteCommand for _, item := range data.RemoteCommand { if item.Cmd == nil { continue @@ -271,8 +271,8 @@ func (a *AgentCMD) cmdRunHandler() gin.HandlerFunc { } } - agentReq := trident.RemoteExecRequest{ - ExecType: trident.ExecutionType_RUN_COMMAND.Enum(), + agentReq := agentmessage.RemoteExecRequest{ + ExecType: agentmessage.ExecutionType_RUN_COMMAND.Enum(), // CommandId: req.CommandId, // deprecated CommandIdent: req.CommandIdent, LinuxNsPid: req.LinuxNsPid, @@ -296,7 +296,7 @@ func (a *AgentCMD) cmdRunHandler() gin.HandlerFunc { return } - if req.OutputFormat.String() == trident.OutputFormat_TEXT.String() { + if req.OutputFormat.String() == agentmessage.OutputFormat_TEXT.String() { JsonResponse(c, content, nil) return } diff --git a/server/controller/http/service/vtap/agent_cmd.go b/server/controller/http/service/vtap/agent_cmd.go index 1efb9a2bfde..ccd3a66b0ee 100644 --- a/server/controller/http/service/vtap/agent_cmd.go +++ b/server/controller/http/service/vtap/agent_cmd.go @@ -23,25 +23,25 @@ import ( "sync" "time" - "github.com/deepflowio/deepflow/message/trident" + agentmessage "github.com/deepflowio/deepflow/message/agent" ctrlcommon "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/controller/db/metadb" metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" ) type RemoteExecReq struct { - trident.RemoteExecRequest + agentmessage.RemoteExecRequest - OutputFormat *trident.OutputFormat `json:"output_format"` // 0: "TEXT", 1: "BINARY" - OutputFilename string `json:"output_filename"` - CMD string `json:"cmd" binding:"required"` + OutputFormat *agentmessage.OutputFormat `json:"output_format"` // 0: "TEXT", 1: "BINARY" + OutputFilename string `json:"output_filename"` + CMD string `json:"cmd" binding:"required"` } type RemoteExecResp struct { - Content string `json:"content,omitempty"` // RUN_COMMAND - ErrorMessage string `json:"-"` - RemoteCommand []*trident.RemoteCommand `json:"remote_commands,omitempty"` // LIST_COMMAND - LinuxNamespace []*trident.LinuxNamespace `json:"linux_namespaces,omitempty"` // LIST_NAMESPACE + Content string `json:"content,omitempty"` // RUN_COMMAND + ErrorMessage string `json:"-"` + RemoteCommand []*agentmessage.RemoteCommand `json:"remote_commands,omitempty"` // LIST_COMMAND + LinuxNamespace []*agentmessage.LinuxNamespace `json:"linux_namespaces,omitempty"` // LIST_NAMESPACE } var ( @@ -85,7 +85,7 @@ func AddToCMDManagerIfNotExist(key string, requestID uint64) *CMDManager { log.Infof("add agent(key:%s) to cmd manager", key) agentCMDManager[key] = &CMDManager{ requestID: requestID, - ExecCH: make(chan *trident.RemoteExecRequest, 1), + ExecCH: make(chan *agentmessage.RemoteExecRequest, 1), requestIDToResp: make(map[uint64]*CMDResp), } @@ -126,7 +126,7 @@ func RemoveAllFromCMDManager(key string) { type CMDManager struct { requestID uint64 - ExecCH chan *trident.RemoteExecRequest + ExecCH chan *agentmessage.RemoteExecRequest requestIDToResp map[uint64]*CMDResp } @@ -188,7 +188,7 @@ func GetRequestID(key string) uint64 { return 0 } -func AppendCommands(key string, requestID uint64, data []*trident.RemoteCommand) { +func AppendCommands(key string, requestID uint64, data []*agentmessage.RemoteCommand) { if manager, ok := agentCMDManager[key]; ok { if resp, ok := manager.requestIDToResp[requestID]; ok { resp.data.RemoteCommand = append(resp.data.RemoteCommand, data...) @@ -196,7 +196,7 @@ func AppendCommands(key string, requestID uint64, data []*trident.RemoteCommand) } } -func InitCommands(key string, requestID uint64, data []*trident.RemoteCommand) { +func InitCommands(key string, requestID uint64, data []*agentmessage.RemoteCommand) { if manager, ok := agentCMDManager[key]; ok { if resp, ok := manager.requestIDToResp[requestID]; ok { resp.data.RemoteCommand = data @@ -204,7 +204,7 @@ func InitCommands(key string, requestID uint64, data []*trident.RemoteCommand) { } } -func AppendNamespaces(key string, requestID uint64, data []*trident.LinuxNamespace) { +func AppendNamespaces(key string, requestID uint64, data []*agentmessage.LinuxNamespace) { if manager, ok := agentCMDManager[key]; ok { if resp, ok := manager.requestIDToResp[requestID]; ok { resp.data.LinuxNamespace = append(resp.data.LinuxNamespace, data...) @@ -212,7 +212,7 @@ func AppendNamespaces(key string, requestID uint64, data []*trident.LinuxNamespa } } -func InitNamespaces(key string, requestID uint64, data []*trident.LinuxNamespace) { +func InitNamespaces(key string, requestID uint64, data []*agentmessage.LinuxNamespace) { if manager, ok := agentCMDManager[key]; ok { if resp, ok := manager.requestIDToResp[requestID]; ok { resp.data.LinuxNamespace = data @@ -258,7 +258,7 @@ func GetContent(key string, requestID uint64) string { return "" } -func GetCommands(key string, requestID uint64) []*trident.RemoteCommand { +func GetCommands(key string, requestID uint64) []*agentmessage.RemoteCommand { agentCMDMutex.RLock() defer agentCMDMutex.RUnlock() if manager, ok := agentCMDManager[key]; ok { @@ -269,7 +269,7 @@ func GetCommands(key string, requestID uint64) []*trident.RemoteCommand { return nil } -func GetCommandsWithoutLock(key string, requestID uint64) []*trident.RemoteCommand { +func GetCommandsWithoutLock(key string, requestID uint64) []*agentmessage.RemoteCommand { if manager, ok := agentCMDManager[key]; ok { if resp, ok := manager.requestIDToResp[requestID]; ok { return resp.data.RemoteCommand @@ -278,7 +278,7 @@ func GetCommandsWithoutLock(key string, requestID uint64) []*trident.RemoteComma return nil } -func GetNamespaces(key string, requestID uint64) []*trident.LinuxNamespace { +func GetNamespaces(key string, requestID uint64) []*agentmessage.LinuxNamespace { agentCMDMutex.RLock() defer agentCMDMutex.RUnlock() if manager, ok := agentCMDManager[key]; ok { @@ -289,7 +289,7 @@ func GetNamespaces(key string, requestID uint64) []*trident.LinuxNamespace { return nil } -func GetNamespacesWithoutLock(key string, requestID uint64) []*trident.LinuxNamespace { +func GetNamespacesWithoutLock(key string, requestID uint64) []*agentmessage.LinuxNamespace { if manager, ok := agentCMDManager[key]; ok { if resp, ok := manager.requestIDToResp[requestID]; ok { return resp.data.LinuxNamespace @@ -319,9 +319,9 @@ func GetCMDAndNamespace(timeout, orgID, agentID int) (*RemoteExecResp, error) { } defer RemoveAgentCMDResp(key, requestID) - cmdReq := &trident.RemoteExecRequest{ + cmdReq := &agentmessage.RemoteExecRequest{ RequestId: &requestID, - ExecType: trident.ExecutionType_LIST_COMMAND.Enum(), + ExecType: agentmessage.ExecutionType_LIST_COMMAND.Enum(), } manager.ExecCH <- cmdReq @@ -337,7 +337,10 @@ func GetCMDAndNamespace(timeout, orgID, agentID int) (*RemoteExecResp, error) { return nil, fmt.Errorf("%sagent(key: %s, name: %s) command manager is lost", key, agent.Name) } resp.RemoteCommand = GetCommands(key, requestID) - namespaceReq := &trident.RemoteExecRequest{RequestId: &requestID, ExecType: trident.ExecutionType_LIST_NAMESPACE.Enum()} + namespaceReq := &agentmessage.RemoteExecRequest{ + RequestId: &requestID, + ExecType: agentmessage.ExecutionType_LIST_NAMESPACE.Enum(), + } manager.ExecCH <- namespaceReq case _, ok := <-cmdResp.LinuxNamespaceDoneCH: if !ok { @@ -366,7 +369,7 @@ func GetCMDAndNamespace(timeout, orgID, agentID int) (*RemoteExecResp, error) { } } -func RunAgentCMD(timeout, orgID, agentID int, req *trident.RemoteExecRequest, CMD string) (string, error) { +func RunAgentCMD(timeout, orgID, agentID int, req *agentmessage.RemoteExecRequest, CMD string) (string, error) { serverLog := fmt.Sprintf("The deepflow-server is unable to execute the `%s` command."+ " Detailed error information is as follows:\n\n", CMD) dbInfo, err := metadb.GetDB(orgID) diff --git a/server/controller/trisolaris/services/grpc/debug/agent_cache.go b/server/controller/trisolaris/services/grpc/debug/agent_cache.go deleted file mode 100644 index f62938c8d54..00000000000 --- a/server/controller/trisolaris/services/grpc/debug/agent_cache.go +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package synchronize - -import ( - "encoding/json" - "fmt" - - "github.com/deepflowio/deepflow/server/controller/trisolaris/vtap" -) - -type AgentCacheDebug struct { - RawData string `json:"DATA"` -} - -func (a *AgentCacheDebug) Marshal() []byte { - if a == nil { - return nil - } - v, err := json.Marshal(*a) - if err != nil { - log.Error(err) - return nil - } - - return v -} - -func NewAgentCacheDebug(vtapCache *vtap.VTapCache) *AgentCacheDebug { - return &AgentCacheDebug{ - RawData: fmt.Sprintf("%s", vtapCache), - } -} diff --git a/server/controller/trisolaris/services/grpc/debug/service.go b/server/controller/trisolaris/services/grpc/debug/service.go deleted file mode 100644 index f41977748d8..00000000000 --- a/server/controller/trisolaris/services/grpc/debug/service.go +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package synchronize - -import ( - api "github.com/deepflowio/deepflow/message/trident" - context "golang.org/x/net/context" - "google.golang.org/grpc" - - grpcserver "github.com/deepflowio/deepflow/server/controller/grpc" - "github.com/deepflowio/deepflow/server/controller/trisolaris" - "github.com/deepflowio/deepflow/server/libs/logger" -) - -var log = logger.MustGetLogger("trisolaris.debug") - -type service struct{} - -func init() { - grpcserver.Add(newService()) -} - -func newService() *service { - return &service{} -} - -func (s *service) Register(gs *grpc.Server) error { - api.RegisterDebugServer(gs, s) - return nil -} - -func (s *service) DebugGPIDGlobalData(ctx context.Context, in *api.GPIDSyncRequest) (*api.GPIDGlobalData, error) { - teamID := in.GetTeamId() - orgID := trisolaris.GetOrgIDByTeamID(teamID) - log.Infof("receive DebugGPIDGlobalLocalData about vtap(ctrl_ip: %s, ctrl_mac: %s, team_id: %s, org_id: %d)", - in.GetCtrlIp(), in.GetCtrlMac(), teamID, orgID) - - processInfo := trisolaris.GetORGVTapInfo(orgID).GetProcessInfo() - if processInfo == nil { - return &api.GPIDGlobalData{}, nil - } - entries := processInfo.GetGlobalEntries() - return &api.GPIDGlobalData{ - Entries: entries, - }, nil -} - -func (s *service) DebugGPIDVTapData(ctx context.Context, in *api.GPIDSyncRequest) (*api.GPIDVTapData, error) { - teamID := in.GetTeamId() - orgID := trisolaris.GetOrgIDByTeamID(teamID) - vtapCacheKey := in.GetCtrlIp() + "-" + in.GetCtrlMac() - vtapCache := trisolaris.GetORGVTapInfo(orgID).GetVTapCache(vtapCacheKey) - if vtapCache == nil { - log.Info("not found vtap(ctrl_ip: %s, ctrl_mac: %s, team_id: %s, org_id: %d) cache", - in.GetCtrlIp(), in.GetCtrlMac(), teamID, orgID) - return &api.GPIDVTapData{}, nil - } - log.Infof("receive DebugGPIDVTapLocalData about vtap(ctrl_ip: %s, ctrl_mac: %s, id: %d, team_id: %s, org_id: %d)", - in.GetCtrlIp(), in.GetCtrlMac(), vtapCache.GetVTapID(), teamID, orgID) - processInfo := trisolaris.GetORGVTapInfo(orgID).GetProcessInfo() - if processInfo == nil { - return &api.GPIDVTapData{}, nil - } - req, updateTime := processInfo.GetAgentGPIDReq(uint32(vtapCache.GetVTapID())) - return &api.GPIDVTapData{ - UpdateTime: &updateTime, - SyncRequest: req, - }, nil -} - -func (s *service) DebugRealGlobalData(ctx context.Context, in *api.GPIDSyncRequest) (*api.RealGlobalData, error) { - teamID := in.GetTeamId() - orgID := trisolaris.GetOrgIDByTeamID(teamID) - processInfo := trisolaris.GetORGVTapInfo(orgID).GetProcessInfo() - if processInfo == nil { - return &api.RealGlobalData{}, nil - } - return &api.RealGlobalData{ - Entries: processInfo.GetRealGlobalData(), - }, nil -} - -func (s *service) DebugRIPToVIP(ctx context.Context, in *api.GPIDSyncRequest) (*api.RVData, error) { - teamID := in.GetTeamId() - orgID := trisolaris.GetOrgIDByTeamID(teamID) - processInfo := trisolaris.GetORGVTapInfo(orgID).GetProcessInfo() - if processInfo == nil { - return &api.RVData{}, nil - } - return &api.RVData{ - Entries: processInfo.GetRVData(), - }, nil -} - -func (s *service) DebugAgentCache(ctx context.Context, in *api.AgentCacheRequest) (*api.AgentCacheResponse, error) { - teamID := in.GetTeamId() - orgID := trisolaris.GetOrgIDByTeamID(teamID) - vtapCacheKey := in.GetCtrlIp() + "-" + in.GetCtrlMac() - vtapCache := trisolaris.GetORGVTapInfo(orgID).GetVTapCache(vtapCacheKey) - if vtapCache == nil { - log.Infof("not found vtap(ctrl_ip: %s, ctrl_mac: %s, team_id: %s, org_id: %d) cache", - in.GetCtrlIp(), in.GetCtrlMac(), teamID, orgID) - return &api.AgentCacheResponse{}, nil - } - agentCacheDebug := NewAgentCacheDebug(vtapCache) - return &api.AgentCacheResponse{ - Content: agentCacheDebug.Marshal(), - }, nil -} diff --git a/server/controller/trisolaris/services/grpc/synchronize/kubernetes.go b/server/controller/trisolaris/services/grpc/synchronize/kubernetes.go deleted file mode 100644 index 0ee708447ea..00000000000 --- a/server/controller/trisolaris/services/grpc/synchronize/kubernetes.go +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package synchronize - -import ( - "fmt" - - api "github.com/deepflowio/deepflow/message/trident" - context "golang.org/x/net/context" - - "github.com/deepflowio/deepflow/server/controller/common" - "github.com/deepflowio/deepflow/server/controller/trisolaris" -) - -type KubernetesClusterIDEvent struct { -} - -func NewKubernetesClusterIDEvent() *KubernetesClusterIDEvent { - return &KubernetesClusterIDEvent{} -} - -func (k *KubernetesClusterIDEvent) GetKubernetesClusterID(ctx context.Context, in *api.KubernetesClusterIDRequest) (*api.KubernetesClusterIDResponse, error) { - remote := getRemote(ctx) - log.Infof("call me from ip: %s to get kubernetes cluster_id", remote) - log.Debugf("ca_md5: %#v", in.GetCaMd5()) - - clusterID, err := common.GenerateKuberneteClusterIDByMD5(in.GetCaMd5()) - if err != nil { - errorMsg := err.Error() - log.Error(errorMsg) - return &api.KubernetesClusterIDResponse{ErrorMsg: &errorMsg}, nil - } - if !trisolaris.GetConfig().DomainAutoRegister { - return &api.KubernetesClusterIDResponse{ClusterId: &clusterID}, nil - } - - // cache clusterID & create kubernetes domain - kubernetesInfo := trisolaris.GetGKubernetesInfo(in.GetTeamId()) - if kubernetesInfo == nil { - errorMsg := fmt.Sprintf("failed to get kubernetes info for team_id: %d", in.GetTeamId()) - log.Error(errorMsg) - return &api.KubernetesClusterIDResponse{ErrorMsg: &errorMsg}, nil - } - kubernetesInfo.CacheClusterID(in.GetTeamId(), clusterID, in.GetKubernetesClusterName()) - - log.Infof("response kubernetes cluster_id: %s to ip: %s", clusterID, remote) - log.Debugf("ca_md5: %#v", in.GetCaMd5()) - return &api.KubernetesClusterIDResponse{ClusterId: &clusterID}, nil -} diff --git a/server/controller/trisolaris/services/grpc/synchronize/ntp.go b/server/controller/trisolaris/services/grpc/synchronize/ntp.go deleted file mode 100644 index 861a8a19d07..00000000000 --- a/server/controller/trisolaris/services/grpc/synchronize/ntp.go +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package synchronize - -import ( - "net" - "strconv" - "time" - - api "github.com/deepflowio/deepflow/message/trident" - context "golang.org/x/net/context" - - "github.com/deepflowio/deepflow/server/controller/trisolaris" -) - -type NTPEvent struct{} - -func NewNTPEvent() *NTPEvent { - return &NTPEvent{} -} - -var EmptyNtpResponse = &api.NtpResponse{} - -func (e *NTPEvent) Query(ctx context.Context, in *api.NtpRequest) (*api.NtpResponse, error) { - log.Infof("request ntp proxcy from ip: %s", in.GetCtrlIp()) - config := trisolaris.GetConfig() - addr := net.JoinHostPort(config.Chrony.Host, strconv.Itoa(int(config.Chrony.Port))) - udpAddr, err := net.ResolveUDPAddr("udp", addr) - if err != nil { - log.Error(err) - return EmptyNtpResponse, nil - } - conn, err := net.DialUDP("udp", nil, udpAddr) - if err != nil { - log.Error(err) - return EmptyNtpResponse, nil - } - defer conn.Close() - if err = conn.SetDeadline(time.Now().Add(time.Duration(config.Chrony.Timeout) * time.Second)); err != nil { - log.Error(err) - return EmptyNtpResponse, nil - } - request := in.GetRequest() - if request == nil { - log.Errorf("ntp query no request data from ip: %s", in.GetCtrlIp()) - return EmptyNtpResponse, nil - } - _, err = conn.Write(request) - if err != nil { - log.Error("send ntp request failed", err) - return EmptyNtpResponse, nil - } - data := make([]byte, 4096) - n, remoterAddr, err := conn.ReadFromUDP(data) - if err != nil { - log.Error("receive ntp response failed", remoterAddr, err) - return EmptyNtpResponse, nil - } - log.Debug("receive ntp response", remoterAddr, n) - return &api.NtpResponse{ - Response: data[:n], - }, nil -} diff --git a/server/controller/trisolaris/services/grpc/synchronize/plugin.go b/server/controller/trisolaris/services/grpc/synchronize/plugin.go deleted file mode 100644 index 4ce368111f5..00000000000 --- a/server/controller/trisolaris/services/grpc/synchronize/plugin.go +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package synchronize - -import ( - "crypto/md5" - "fmt" - "math" - - "github.com/golang/protobuf/proto" - - api "github.com/deepflowio/deepflow/message/trident" - "github.com/deepflowio/deepflow/server/controller/db/metadb" - metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model" - "github.com/deepflowio/deepflow/server/controller/trisolaris" - "github.com/deepflowio/deepflow/server/controller/trisolaris/dbmgr" - "github.com/deepflowio/deepflow/server/libs/logger" -) - -type PluginEvent struct{} - -type PluginData struct { - content []byte - totalLen uint64 - pktCount uint32 - md5Sum string - step uint64 - updateTime uint32 -} - -func NewPluginEvent() *PluginEvent { - return &PluginEvent{} -} - -func (p *PluginEvent) GetPluginData(r *api.PluginRequest, orgID int) (*PluginData, error) { - if r.GetPluginType() == 0 || r.GetPluginName() == "" { - return nil, fmt.Errorf("the plugin request data type(%d) or name(%s) is empty", - r.GetPluginType(), r.GetPluginName()) - } - db, err := metadb.GetDB(orgID) - if err != nil { - return nil, fmt.Errorf("get db failed") - } - pluginDbMgr := dbmgr.DBMgr[metadbmodel.Plugin](db.DB) - plugin, err := pluginDbMgr.GetByOption( - pluginDbMgr.WithName(r.GetPluginName()), - pluginDbMgr.WithType(int(r.GetPluginType())), - ) - if err != nil { - return nil, fmt.Errorf("get plugin(type=%s, name=%s) from db failed, %s", - r.GetPluginType(), r.GetPluginName(), err) - } - content := plugin.Image - totalLen := uint64(len(content)) - step := uint64(1024 * 1024) - pktCount := uint32(math.Ceil(float64(totalLen) / float64(step))) - md5Sum := fmt.Sprintf("%x", md5.Sum(content)) - return &PluginData{ - content: content, - totalLen: totalLen, - pktCount: pktCount, - md5Sum: md5Sum, - step: step, - updateTime: uint32(plugin.UpdatedAt.Unix()), - }, err -} -func sendPluginFailed(in api.Synchronizer_PluginServer) error { - response := &api.PluginResponse{ - Status: &STATUS_FAILED, - } - err := in.Send(response) - if err != nil { - log.Error(err) - } - return err -} - -func (p *PluginEvent) Plugin(r *api.PluginRequest, in api.Synchronizer_PluginServer) error { - vtapCacheKey := r.GetCtrlIp() + "-" + r.GetCtrlMac() - teamID := r.GetTeamId() - orgID := trisolaris.GetOrgIDByTeamID(teamID) - vtapCache := trisolaris.GetORGVTapInfo(orgID).GetVTapCache(vtapCacheKey) - if vtapCache == nil { - log.Errorf("agent(%s team_id=%s ) cache not found", vtapCacheKey, teamID, logger.NewORGPrefix(orgID)) - return sendPluginFailed(in) - } - log.Infof("receive agent(%s team_id=%s) plugin request", vtapCacheKey, teamID, logger.NewORGPrefix(orgID)) - - pluginData, err := p.GetPluginData(r, orgID) - if err != nil { - log.Error(err, logger.NewORGPrefix(orgID)) - return sendPluginFailed(in) - } - for start := uint64(0); start < pluginData.totalLen; start += pluginData.step { - end := start + pluginData.step - if end > pluginData.totalLen { - end = pluginData.totalLen - } - response := &api.PluginResponse{ - Status: &STATUS_SUCCESS, - Content: pluginData.content[start:end], - Md5: proto.String(pluginData.md5Sum), - PktCount: proto.Uint32(pluginData.pktCount), - TotalLen: proto.Uint64(pluginData.totalLen), - UpdateTime: proto.Uint32(pluginData.updateTime), - } - err = in.Send(response) - if err != nil { - log.Errorf("send agent(%s team_id=%s) plugin data faild, err:%s", vtapCacheKey, teamID, err, logger.NewORGPrefix(orgID)) - break - } - } - log.Infof("sending plugin data to agent(%s team_id=%s) completed", vtapCacheKey, teamID, logger.NewORGPrefix(orgID)) - return err -} diff --git a/server/controller/trisolaris/services/grpc/synchronize/process_info.go b/server/controller/trisolaris/services/grpc/synchronize/process_info.go deleted file mode 100644 index 6bab1ac2b5f..00000000000 --- a/server/controller/trisolaris/services/grpc/synchronize/process_info.go +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package synchronize - -import ( - context "golang.org/x/net/context" - - api "github.com/deepflowio/deepflow/message/trident" - "github.com/deepflowio/deepflow/server/controller/grpc/statsd" - "github.com/deepflowio/deepflow/server/controller/trisolaris" - "github.com/deepflowio/deepflow/server/libs/logger" -) - -var EmptyGPIDResponse = &api.GPIDSyncResponse{} - -type ProcessInfoEvent struct{} - -func NewprocessInfoEvent() *ProcessInfoEvent { - return &ProcessInfoEvent{} -} - -func (e *ProcessInfoEvent) GPIDSync(ctx context.Context, in *api.GPIDSyncRequest) (*api.GPIDSyncResponse, error) { - orgID := trisolaris.GetOrgIDByTeamID(in.GetTeamId()) - gVTapInfo := trisolaris.GetORGVTapInfo(orgID) - if gVTapInfo == nil { - return EmptyGPIDResponse, nil - } - processInfo := gVTapInfo.GetProcessInfo() - if in.GetVtapId() == 0 { - vtapCacheKey := in.GetCtrlIp() + "-" + in.GetCtrlMac() - vtapCache := gVTapInfo.GetVTapCache(vtapCacheKey) - if vtapCache != nil { - log.Infof("receive debug gpid sync data by vtap(ctrl_ip: %s, ctrl_mac: %s vtap_id: %d team_id: %s)", - in.GetCtrlIp(), in.GetCtrlMac(), vtapCache.GetVTapID(), in.GetTeamId(), logger.NewORGPrefix(orgID)) - return processInfo.GetGPIDResponseByVtapID(vtapCache.GetVTapID()), nil - } - log.Infof("receive invalid gpid sync data from vtap(ctrl_ip: %s, ctrl_mac: %s team_id: %s), because vtap_id=%d(vtap is not registered)", - in.GetCtrlIp(), in.GetCtrlMac(), in.GetTeamId(), in.GetVtapId(), logger.NewORGPrefix(orgID)) - - return EmptyGPIDResponse, nil - } - - statsd.AddGPIDReceiveCounter(uint64(len(in.GetEntries()))) - - log.Infof("receive gpid sync data from vtap(ctrl_ip: %s, ctrl_mac: %s, vtap_id: %d, team_id: %s) data_len: %d", - in.GetCtrlIp(), in.GetCtrlMac(), in.GetVtapId(), in.GetTeamId(), len(in.GetEntries()), logger.NewORGPrefix(orgID)) - processInfo.UpdateAgentGPIDReq(in) - resp := processInfo.GetGPIDResponseByReq(in) - log.Infof("send gpid response data(len=%d) to vtap(ctrl_ip: %s, ctrl_mac: %s, vtap_id: %d, team_id: %s)", - len(resp.GetEntries()), in.GetCtrlIp(), in.GetCtrlMac(), in.GetVtapId(), in.GetTeamId(), logger.NewORGPrefix(orgID)) - statsd.AddGPIDSendCounter(uint64(len(resp.GetEntries()))) - return resp, nil -} - -func (e *ProcessInfoEvent) ShareGPIDLocalData(ctx context.Context, in *api.ShareGPIDSyncRequests) (*api.ShareGPIDSyncRequests, error) { - log.Infof("receive gpid sync data from server(%s)", in.GetServerIp(), logger.NewORGPrefix(int(in.GetOrgId()))) - processInfo := trisolaris.GetORGVTapInfo(int(in.GetOrgId())).GetProcessInfo() - if processInfo == nil { - return &api.ShareGPIDSyncRequests{}, nil - } - processInfo.UpdateGPIDReqFromShare(in) - shareData := processInfo.GetGPIDShareReqs() - if shareData == nil { - shareData = &api.ShareGPIDSyncRequests{} - } - return shareData, nil -} diff --git a/server/controller/trisolaris/services/grpc/synchronize/remote_execute.go b/server/controller/trisolaris/services/grpc/synchronize/remote_execute.go deleted file mode 100644 index 5db764b34e3..00000000000 --- a/server/controller/trisolaris/services/grpc/synchronize/remote_execute.go +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package synchronize - -import ( - "context" - "encoding/json" - "fmt" - "io" - "runtime" - "sync" - "time" - - api "github.com/deepflowio/deepflow/message/trident" - service "github.com/deepflowio/deepflow/server/controller/http/service/vtap" - "google.golang.org/protobuf/proto" -) - -const ( - CMD_INACTIVITY_TIMEOUT = 1 * time.Minute -) - -func (e *VTapEvent) RemoteExecute(stream api.Synchronizer_RemoteExecuteServer) error { - key := "" - isFisrtRecv := false - var wg sync.WaitGroup - wg.Add(1) - defer func() { - wg.Wait() - service.RemoveAllFromCMDManager(key) - }() - - var manager *service.CMDManager - initDone := make(chan struct{}) - - ctx, cancel := context.WithCancel(stream.Context()) - defer cancel() - - errCH := make(chan error, 1) - - go func() { - defer func() { - log.Infof("agent(key: %s) remote exec stream receive goroutine done", key) - wg.Done() - if r := recover(); r != nil { - buf := make([]byte, 2048) - n := runtime.Stack(buf, false) - errMsg := fmt.Sprintf("recovered in RemoteExecute: %s", buf[:n]) - log.Errorf(errMsg) - errCH <- fmt.Errorf(errMsg) - } - }() - - inactivityTimer := time.NewTimer(CMD_INACTIVITY_TIMEOUT) - defer inactivityTimer.Stop() - - for { - select { - case <-ctx.Done(): - log.Infof("context done, agent(key: %s), context err: %v", key, ctx.Err()) - return - case <-inactivityTimer.C: - errMsg := fmt.Errorf("no message received for %vs, closing connection for agent(key: %s)", - CMD_INACTIVITY_TIMEOUT.Seconds(), key) - log.Error(errMsg) - errCH <- errMsg - return - default: - resp, err := stream.Recv() - // Handle any errors that occur during stream reception - // if server restart, an io.EOF error may be received - if err == io.EOF { - log.Errorf("agent(key: %s) command stream error: %v", key, err) - errCH <- err - return - } - // Attempt to stop the inactivity timer - if !inactivityTimer.Stop() { - // If the timer has already expired, drain the channel - <-inactivityTimer.C - } - // Reset the inactivity timer to its original duration - inactivityTimer.Reset(CMD_INACTIVITY_TIMEOUT) - - if resp == nil { - continue - } - log.Debugf("agent command response: %s", resp.String()) - if resp.AgentId == nil { - log.Warningf("recevie agent info from remote command is nil") - continue - } - key = resp.AgentId.GetIp() + "-" + resp.AgentId.GetMac() - if !isFisrtRecv { - isFisrtRecv = true - log.Infof("agent(key: %s) call RemoteExecute", key) - } - if manager == nil { - log.Infof("agent(key: %s) remote exec map not found, add to cmd manager", key) - manager = service.AddToCMDManagerIfNotExist(key, uint64(1)) - initDone <- struct{}{} - } - - service.AgentCommandLock() - manager = service.GetAgentCMDManagerWithoutLock(key) - if manager == nil { - log.Errorf("agent(key: %s) remote exec map not found", key) - service.AgentCommandUnlock() - continue - } - - // heartbeat - if resp.CommandResult == nil && resp.LinuxNamespaces == nil && - resp.Commands == nil && resp.Errmsg == nil { - log.Infof("agent heart beat command response: %s", resp.String()) - manager.ExecCH <- &api.RemoteExecRequest{RequestId: proto.Uint64(0)} - service.AgentCommandUnlock() - continue - } - - if err != nil { - err := fmt.Errorf("agent(key: %s) command stream error: %v", key, err) - log.Error(err) - service.AgentCommandUnlock() - continue - } - - handleResponse(resp) - service.AgentCommandUnlock() - } - } - }() - - <-initDone - log.Infof("agent(key: %s) init done", key) - if manager == nil { - err := fmt.Errorf("get agent(key: %s) remote exec manager nil", key) - log.Error(err) - return err - } - for { - select { - case <-ctx.Done(): - log.Infof("context done, agent(key: %s), context err: %v", key, ctx.Err()) - return ctx.Err() - case err := <-errCH: - log.Error(err) - return err - case req, ok := <-manager.ExecCH: - if !ok { - err := fmt.Errorf("agent(key: %s) exec channel is closed", key) - log.Error(err) - return err - } - b, _ := json.Marshal(req) - log.Infof("agent(key: %s) request: %s", key, string(b)) - if err := stream.Send(req); err != nil { - log.Errorf("send cmd to agent error: %s, req: %#v", err.Error(), req) - return err - } - } - } -} - -func handleResponse(resp *api.RemoteExecResponse) { - key := resp.AgentId.GetIp() + "-" + resp.AgentId.GetMac() - if resp.RequestId == nil { - log.Errorf("agent(key: %s) command resp request id not found", key, resp.RequestId) - return - } - cmdResp := service.GetAgentCMDRespWithoutLock(key, *resp.RequestId) - if cmdResp == nil { - log.Errorf("agent(key: %s, request id: %v) remote exec map not found", key, resp.RequestId) - return - } - - b, _ := json.Marshal(resp) - log.Infof("agent(key: %s) resp: %s", key, string(b)) - - switch { - case resp.Errmsg != nil: - log.Errorf("agent(key: %s) run command error: %s", - key, *resp.Errmsg) - service.AppendErrorMessage(key, *resp.RequestId, resp.Errmsg) - - result := resp.CommandResult - // get commands and linux namespace error - if result == nil { - cmdResp.ExecDoneCH <- struct{}{} - return - } - if result.Content == nil { - cmdResp.ExecDoneCH <- struct{}{} - return - } - - // run command error and handle content - if result.Content != nil { - service.AppendContent(key, *resp.RequestId, result.Content) - } - if result.Md5 != nil { - cmdResp.ExecDoneCH <- struct{}{} - return - } - return - case len(resp.LinuxNamespaces) > 0: - if len(service.GetNamespacesWithoutLock(key, *resp.RequestId)) > 0 { - service.InitNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) - } else { - service.AppendNamespaces(key, *resp.RequestId, resp.LinuxNamespaces) - } - cmdResp.LinuxNamespaceDoneCH <- struct{}{} - case len(resp.Commands) > 0: - if len(service.GetCommandsWithoutLock(key, *resp.RequestId)) > 0 { - service.InitCommands(key, *resp.RequestId, resp.Commands) - } else { - service.AppendCommands(key, *resp.RequestId, resp.Commands) - } - cmdResp.RemoteCMDDoneCH <- struct{}{} - default: - result := resp.CommandResult - if resp.CommandResult == nil { - return - } - - if result.Content != nil { - service.AppendContent(key, *resp.RequestId, result.Content) - } - if result.Md5 != nil { - cmdResp.ExecDoneCH <- struct{}{} - return - } - } -} diff --git a/server/controller/trisolaris/vtap/process_info.go b/server/controller/trisolaris/vtap/process_info.go index 6921312907c..78f92ed0ae0 100644 --- a/server/controller/trisolaris/vtap/process_info.go +++ b/server/controller/trisolaris/vtap/process_info.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc" "gorm.io/gorm" - "github.com/deepflowio/deepflow/message/trident" + agentmessage "github.com/deepflowio/deepflow/message/agent" . "github.com/deepflowio/deepflow/server/controller/common" models "github.com/deepflowio/deepflow/server/controller/db/metadb/model" "github.com/deepflowio/deepflow/server/controller/trisolaris/config" @@ -146,12 +146,12 @@ func getAgentIdPortNetnsIndex(value uint64) (agentId, port, netnsIndex uint32) { return } -func convertProto(proto string) trident.ServiceProtocol { +func convertProto(proto string) agentmessage.ServiceProtocol { switch proto { case TCP_PROTO_STR: - return trident.ServiceProtocol_TCP_SERVICE + return agentmessage.ServiceProtocol_TCP_SERVICE case UDP_PROTO_STR: - return trident.ServiceProtocol_UDP_SERVICE + return agentmessage.ServiceProtocol_UDP_SERVICE } return 0 @@ -192,14 +192,14 @@ func isLoopbackIP(ip uint32) bool { return false } -func (d EntryData) getAggregateMap(entry *trident.GPIDSyncEntry) *U128IDMap { +func (d EntryData) getAggregateMap(entry *agentmessage.GPIDSyncEntry) *U128IDMap { protocol := entry.GetProtocol() ip := entry.GetIpv4_1() serviceIndex := MAX_SERVICE_TYPE switch { - case protocol == trident.ServiceProtocol_TCP_SERVICE: + case protocol == agentmessage.ServiceProtocol_TCP_SERVICE: serviceIndex = TCPService - case protocol == trident.ServiceProtocol_UDP_SERVICE: + case protocol == agentmessage.ServiceProtocol_UDP_SERVICE: serviceIndex = UDPService } if serviceIndex == MAX_SERVICE_TYPE { @@ -214,7 +214,7 @@ func (d EntryData) getAggregateMap(entry *trident.GPIDSyncEntry) *U128IDMap { return d[serviceIndex][ipIndex] } -func (d EntryData) addData(agentId uint32, entry *trident.GPIDSyncEntry, p *ProcessInfo) { +func (d EntryData) addData(agentId uint32, entry *agentmessage.GPIDSyncEntry, p *ProcessInfo) { aggregateMap := d.getAggregateMap(entry) if aggregateMap == nil { return @@ -245,7 +245,7 @@ func (d EntryData) addData(agentId uint32, entry *trident.GPIDSyncEntry, p *Proc } } -func (d EntryData) getData(agentId uint32, entry *trident.GPIDSyncEntry, p *ProcessInfo) *PidPair { +func (d EntryData) getData(agentId uint32, entry *agentmessage.GPIDSyncEntry, p *ProcessInfo) *PidPair { aggregateMap := d.getAggregateMap(entry) if aggregateMap == nil { return nil @@ -258,16 +258,16 @@ func (d EntryData) getData(agentId uint32, entry *trident.GPIDSyncEntry, p *Proc return value.(*PidPair) } -func (e EntryData) getGPIDGlobalData(p *ProcessInfo) []*trident.GlobalGPIDEntry { +func (e EntryData) getGPIDGlobalData(p *ProcessInfo) []*agentmessage.GlobalGPIDEntry { - allData := []*trident.GlobalGPIDEntry{} + allData := []*agentmessage.GlobalGPIDEntry{} for _, serviceIndex := range serviceTypes { - var protocol trident.ServiceProtocol + var protocol agentmessage.ServiceProtocol switch serviceIndex { case TCPService: - protocol = trident.ServiceProtocol_TCP_SERVICE + protocol = agentmessage.ServiceProtocol_TCP_SERVICE case UDPService: - protocol = trident.ServiceProtocol_UDP_SERVICE + protocol = agentmessage.ServiceProtocol_UDP_SERVICE } if serviceIndex >= MAX_SERVICE_TYPE { break @@ -295,7 +295,7 @@ func (e EntryData) getGPIDGlobalData(p *ProcessInfo) []*trident.GlobalGPIDEntry pid0, agentId0, pid1, agentId1 := realValue.getData() gpid0 := p.agentIdAndPIDToGPID.getData(agentId0, pid0) gpid1 := p.agentIdAndPIDToGPID.getData(agentId1, pid1) - entry := &trident.GlobalGPIDEntry{ + entry := &agentmessage.GlobalGPIDEntry{ Protocol: &protocol, AgentId_1: &agentId1, EpcId_1: &epcId1, @@ -321,17 +321,17 @@ func (e EntryData) getGPIDGlobalData(p *ProcessInfo) []*trident.GlobalGPIDEntry type CacheReq struct { updateTime time.Time - req *trident.GPIDSyncRequest + req *agentmessage.GPIDSyncRequest } -func NewCacheReq(req *trident.GPIDSyncRequest) *CacheReq { +func NewCacheReq(req *agentmessage.GPIDSyncRequest) *CacheReq { return &CacheReq{ updateTime: time.Now(), req: req, } } -func (c *CacheReq) getReq() *trident.GPIDSyncRequest { +func (c *CacheReq) getReq() *agentmessage.GPIDSyncRequest { if c == nil { return nil } @@ -377,12 +377,12 @@ func (r *AgentIDToReq) getSetIntKeys() mapset.Set { return keys } -func (r *AgentIDToReq) updateReq(req *trident.GPIDSyncRequest) { +func (r *AgentIDToReq) updateReq(req *agentmessage.GPIDSyncRequest) { if req == nil { return } r.Lock() - r.idToReq[req.GetVtapId()] = NewCacheReq(req) + r.idToReq[req.GetAgentId()] = NewCacheReq(req) r.Unlock() } @@ -392,7 +392,7 @@ func (r *AgentIDToReq) updateCacheReq(cacheReq *CacheReq) { } r.Lock() - r.idToReq[cacheReq.req.GetVtapId()] = cacheReq + r.idToReq[cacheReq.req.GetAgentId()] = cacheReq r.Unlock() } @@ -403,7 +403,7 @@ func (r *AgentIDToReq) getCacheReq(agentId uint32) *CacheReq { return cacheReq } -func (r *AgentIDToReq) getReq(agentId uint32) *trident.GPIDSyncRequest { +func (r *AgentIDToReq) getReq(agentId uint32) *agentmessage.GPIDSyncRequest { r.RLock() cacheReq := r.idToReq[agentId] r.RUnlock() @@ -474,12 +474,12 @@ func NewRVData() RVData { return rvData } -func (r RVData) getRVmap(protocol trident.ServiceProtocol) RipToVipMap { +func (r RVData) getRVmap(protocol agentmessage.ServiceProtocol) RipToVipMap { serviceIndex := MAX_SERVICE_TYPE switch { - case protocol == trident.ServiceProtocol_TCP_SERVICE: + case protocol == agentmessage.ServiceProtocol_TCP_SERVICE: serviceIndex = TCPService - case protocol == trident.ServiceProtocol_UDP_SERVICE: + case protocol == agentmessage.ServiceProtocol_UDP_SERVICE: serviceIndex = UDPService } if serviceIndex == MAX_SERVICE_TYPE { @@ -489,7 +489,7 @@ func (r RVData) getRVmap(protocol trident.ServiceProtocol) RipToVipMap { return r[serviceIndex] } -func (r RVData) addData(epcId, rIp, rPort, vIp, vPort uint32, protocol trident.ServiceProtocol) { +func (r RVData) addData(epcId, rIp, rPort, vIp, vPort uint32, protocol agentmessage.ServiceProtocol) { rvMap := r.getRVmap(protocol) if rvMap == nil { return @@ -497,7 +497,7 @@ func (r RVData) addData(epcId, rIp, rPort, vIp, vPort uint32, protocol trident.S rvMap.addData(epcId, rIp, rPort, vIp, vPort) } -func (r RVData) getvIp(rEpcId, rIp, rPort uint32, protocol trident.ServiceProtocol) (vIp, vport uint32) { +func (r RVData) getvIp(rEpcId, rIp, rPort uint32, protocol agentmessage.ServiceProtocol) (vIp, vport uint32) { rvMap := r.getRVmap(protocol) if rvMap == nil { return @@ -506,15 +506,15 @@ func (r RVData) getvIp(rEpcId, rIp, rPort uint32, protocol trident.ServiceProtoc return } -func (r RVData) getDebugData() []*trident.RipToVip { - allData := []*trident.RipToVip{} +func (r RVData) getDebugData() []*agentmessage.RipToVip { + allData := []*agentmessage.RipToVip{} for _, serviceIndex := range serviceTypes { - var protocol trident.ServiceProtocol + var protocol agentmessage.ServiceProtocol switch serviceIndex { case TCPService: - protocol = trident.ServiceProtocol_TCP_SERVICE + protocol = agentmessage.ServiceProtocol_TCP_SERVICE case UDPService: - protocol = trident.ServiceProtocol_UDP_SERVICE + protocol = agentmessage.ServiceProtocol_UDP_SERVICE } if serviceIndex >= MAX_SERVICE_TYPE { break @@ -522,7 +522,7 @@ func (r RVData) getDebugData() []*trident.RipToVip { for key, value := range r[serviceIndex] { epcId, rport, rip := getEpcIdPortIP(key) _, vport, vIp := getEpcIdPortIP(value) - entry := &trident.RipToVip{ + entry := &agentmessage.RipToVip{ Protocol: &protocol, EpcId: &epcId, RIpv4: &rip, @@ -585,8 +585,8 @@ func NewProcessInfo(db *gorm.DB, cfg *config.Config, orgID int) *ProcessInfo { } } -func (p *ProcessInfo) GetRealGlobalData() []*trident.RealClientToRealServer { - data := make([]*trident.RealClientToRealServer, 0, p.realClientToRealServer.Size()) +func (p *ProcessInfo) GetRealGlobalData() []*agentmessage.RealClientToRealServer { + data := make([]*agentmessage.RealClientToRealServer, 0, p.realClientToRealServer.Size()) for keyValue := range p.realClientToRealServer.Iter() { key0, key1, value := keyValue.GetData() @@ -597,7 +597,7 @@ func (p *ProcessInfo) GetRealGlobalData() []*trident.RealClientToRealServer { continue } agentIdReal, epcIdReal, portReal, ipReal, pidReal := realValue.getData() - etnry := &trident.RealClientToRealServer{ + etnry := &agentmessage.RealClientToRealServer{ EpcId_1: &epcId1, Ipv4_1: &ip1, Port_1: &port1, @@ -616,11 +616,11 @@ func (p *ProcessInfo) GetRealGlobalData() []*trident.RealClientToRealServer { return data } -func (p *ProcessInfo) GetRVData() []*trident.RipToVip { +func (p *ProcessInfo) GetRVData() []*agentmessage.RipToVip { return p.rvData.getDebugData() } -func (p *ProcessInfo) getKey(agentId uint32, entry *trident.GPIDSyncEntry) (key0, key1 uint64) { +func (p *ProcessInfo) getKey(agentId uint32, entry *agentmessage.GPIDSyncEntry) (key0, key1 uint64) { if isLoopbackIP(entry.GetIpv4_1()) { netnsIndex := entry.GetNetnsIdx() key0 = generateLoopbackKey(agentId, entry.GetPort_0(), netnsIndex) @@ -630,7 +630,7 @@ func (p *ProcessInfo) getKey(agentId uint32, entry *trident.GPIDSyncEntry) (key0 // server // If there is a real client, use the real client ip/port instead of the client ip/port // Use the server ip/port to query the load balancing RIP>vIp mapping table on the controller and convert it to vIp/vport - if entry.GetPid_1() > 0 && entry.GetIpv4Real() > 0 && entry.GetRoleReal() == trident.RoleType_ROLE_CLIENT { + if entry.GetPid_1() > 0 && entry.GetIpv4Real() > 0 && entry.GetRoleReal() == agentmessage.RoleType_ROLE_CLIENT { key0 = generateEPKey(entry.GetEpcIdReal(), entry.GetPortReal(), entry.GetIpv4Real()) rEpcId, rPort, rIpv4 := entry.GetEpcId_1(), entry.GetPort_1(), entry.GetIpv4_1() vIpv4, vPort := p.rvData.getvIp(rEpcId, rIpv4, rPort, entry.GetProtocol()) @@ -646,8 +646,8 @@ func (p *ProcessInfo) getKey(agentId uint32, entry *trident.GPIDSyncEntry) (key0 return } -func (p *ProcessInfo) addRealData(agentId uint32, entry *trident.GPIDSyncEntry, toRS *U128IDMap) { - if entry.GetPid_1() > 0 && entry.GetIpv4Real() > 0 && entry.GetRoleReal() == trident.RoleType_ROLE_CLIENT { +func (p *ProcessInfo) addRealData(agentId uint32, entry *agentmessage.GPIDSyncEntry, toRS *U128IDMap) { + if entry.GetPid_1() > 0 && entry.GetIpv4Real() > 0 && entry.GetRoleReal() == agentmessage.RoleType_ROLE_CLIENT { key0, key1 := p.getKey(agentId, entry) value := &RealServerData{ epcIdReal: entry.GetEpcId_1(), @@ -660,7 +660,7 @@ func (p *ProcessInfo) addRealData(agentId uint32, entry *trident.GPIDSyncEntry, } } -func (p *ProcessInfo) getRealData(agentId uint32, entry *trident.GPIDSyncEntry) *RealServerData { +func (p *ProcessInfo) getRealData(agentId uint32, entry *agentmessage.GPIDSyncEntry) *RealServerData { key0, key1 := p.getKey(agentId, entry) realData, ok := p.realClientToRealServer.Get(key0, key1) if ok { @@ -670,11 +670,11 @@ func (p *ProcessInfo) getRealData(agentId uint32, entry *trident.GPIDSyncEntry) return nil } -func (p *ProcessInfo) UpdateAgentGPIDReq(req *trident.GPIDSyncRequest) { +func (p *ProcessInfo) UpdateAgentGPIDReq(req *agentmessage.GPIDSyncRequest) { p.sendGPIDReq.updateReq(req) } -func (p *ProcessInfo) GetAgentGPIDReq(agentId uint32) (*trident.GPIDSyncRequest, uint32) { +func (p *ProcessInfo) GetAgentGPIDReq(agentId uint32) (*agentmessage.GPIDSyncRequest, uint32) { cacheReq := p.sendGPIDReq.getCacheReq(agentId) if cacheReq == nil { localReq := p.agentIdToLocalGPIDReq.getCacheReq(agentId) @@ -697,21 +697,21 @@ func (p *ProcessInfo) GetAgentGPIDReq(agentId uint32) (*trident.GPIDSyncRequest, return cacheReq.getReq(), uint32(cacheReq.getUpdateTime()) } -func (p *ProcessInfo) UpdateGPIDReqFromShare(shareReq *trident.ShareGPIDSyncRequests) { +func (p *ProcessInfo) UpdateGPIDReqFromShare(shareReq *agentmessage.ShareGPIDSyncRequests) { for _, req := range shareReq.GetSyncRequests() { p.agentIdToShareGPIDReq.updateReq(req) } } -func (p *ProcessInfo) GetGPIDShareReqs() *trident.ShareGPIDSyncRequests { +func (p *ProcessInfo) GetGPIDShareReqs() *agentmessage.ShareGPIDSyncRequests { reqs := p.sendGPIDReq.getAllReqAndClear() - shareSyncReqs := make([]*trident.GPIDSyncRequest, 0, len(reqs)) + shareSyncReqs := make([]*agentmessage.GPIDSyncRequest, 0, len(reqs)) for _, req := range reqs { p.agentIdToLocalGPIDReq.updateCacheReq(req) shareSyncReqs = append(shareSyncReqs, req.getReq()) } if len(shareSyncReqs) > 0 { - return &trident.ShareGPIDSyncRequests{ + return &agentmessage.ShareGPIDSyncRequests{ ServerIp: proto.String(p.config.NodeIP), SyncRequests: shareSyncReqs, OrgId: proto.Uint32(uint32(p.ORGID)), @@ -728,7 +728,7 @@ func (p *ProcessInfo) updateRealClientToRealServer(data *U128IDMap) { p.realClientToRealServer = data } -func (p *ProcessInfo) GetGlobalEntries() []*trident.GlobalGPIDEntry { +func (p *ProcessInfo) GetGlobalEntries() []*agentmessage.GlobalGPIDEntry { return p.globalLocalEntries.getGPIDGlobalData(p) } @@ -858,26 +858,26 @@ func (p *ProcessInfo) getRIPToVIPFromDB() { p.rvData = rvData } -func (p *ProcessInfo) GetGPIDResponseByVtapID(agentId uint32) *trident.GPIDSyncResponse { +func (p *ProcessInfo) GetGPIDResponseByVtapID(agentId uint32) *agentmessage.GPIDSyncResponse { req, _ := p.GetAgentGPIDReq(agentId) return p.GetGPIDResponseByReq(req) } -func (p *ProcessInfo) GetGPIDResponseByReq(req *trident.GPIDSyncRequest) *trident.GPIDSyncResponse { +func (p *ProcessInfo) GetGPIDResponseByReq(req *agentmessage.GPIDSyncRequest) *agentmessage.GPIDSyncResponse { if req == nil { - return &trident.GPIDSyncResponse{} + return &agentmessage.GPIDSyncResponse{} } entries := req.GetEntries() if len(entries) == 0 { - return &trident.GPIDSyncResponse{} + return &agentmessage.GPIDSyncResponse{} } - agentId := req.GetVtapId() - responseEntries := make([]*trident.GPIDSyncEntry, 0, len(entries)) + agentId := req.GetAgentId() + responseEntries := make([]*agentmessage.GPIDSyncEntry, 0, len(entries)) for _, entry := range entries { netnsIndex := entry.GetNetnsIdx() roleReal := entry.GetRoleReal() protocol := entry.GetProtocol() - responseEntry := &trident.GPIDSyncEntry{ + responseEntry := &agentmessage.GPIDSyncEntry{ Protocol: &protocol, RoleReal: &roleReal, EpcId_1: proto.Uint32(entry.GetEpcId_1()), @@ -925,7 +925,7 @@ func (p *ProcessInfo) GetGPIDResponseByReq(req *trident.GPIDSyncRequest) *triden realServerData := p.getRealData(agentId, entry) if realServerData != nil { agentIdReal, epcIdReal, portReal, ipv4Real, pidReal := realServerData.getData() - role := trident.RoleType_ROLE_SERVER + role := agentmessage.RoleType_ROLE_SERVER responseEntry.EpcIdReal = &epcIdReal responseEntry.Ipv4Real = &ipv4Real responseEntry.PortReal = &portReal @@ -939,7 +939,7 @@ func (p *ProcessInfo) GetGPIDResponseByReq(req *trident.GPIDSyncRequest) *triden responseEntry.PidReal = &gpidReal responseEntries = append(responseEntries, responseEntry) } - return &trident.GPIDSyncResponse{Entries: responseEntries} + return &agentmessage.GPIDSyncResponse{Entries: responseEntries} } func (p *ProcessInfo) DeleteAgentExpiredData(dbAgentIDs mapset.Set) { @@ -1032,7 +1032,7 @@ func (p *ProcessInfo) sendLocalShareEntryData() { log.Infof(p.Logf("server(%s) send local share req data to server(%s)", p.config.NodeIP, conn.Target())) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - client := trident.NewSynchronizerClient(conn) + client := agentmessage.NewSynchronizerClient(conn) response, err := client.ShareGPIDLocalData(ctx, shareReqs) if err != nil { log.Error(err)