Skip to content

Commit

Permalink
feat: discard the old configuration - 2
Browse files Browse the repository at this point in the history
  • Loading branch information
SongZhen0704 authored and rvql committed Jan 23, 2025
1 parent fb11d81 commit 9a4d108
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 1,051 deletions.
143 changes: 0 additions & 143 deletions message/trident.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,12 @@ service Synchronizer {
rpc Push(SyncRequest) returns (stream SyncResponse) {}
rpc AnalyzerSync(SyncRequest) returns (SyncResponse) {}
rpc Upgrade(UpgradeRequest) returns (stream UpgradeResponse) {}
rpc Query(NtpRequest) returns (NtpResponse) {}
rpc GenesisSync(GenesisSyncRequest) returns (GenesisSyncResponse) {}
rpc KubernetesAPISync(KubernetesAPISyncRequest) returns (KubernetesAPISyncResponse) {}
rpc GetKubernetesClusterID(KubernetesClusterIDRequest) returns (KubernetesClusterIDResponse) {}
rpc GPIDSync(GPIDSyncRequest) returns (GPIDSyncResponse) {}
rpc ShareGPIDLocalData(ShareGPIDSyncRequests) returns (ShareGPIDSyncRequests) {}
rpc Plugin(PluginRequest) returns (stream PluginResponse) {}
rpc GetPrometheusLabelIDs(PrometheusLabelRequest) returns (PrometheusLabelResponse) {}
rpc GetPrometheusTargets(PrometheusTargetRequest) returns (PrometheusTargetResponse) {}
rpc GetUniversalTagNameMaps(UniversalTagNameMapsRequest) returns (UniversalTagNameMapsResponse) {}
// because gRPC cannot be initiated by server, the req/resp of this rpc is reversed
rpc RemoteExecute(stream RemoteExecResponse) returns (stream RemoteExecRequest) {}
rpc GetOrgIDs(OrgIDsRequest) returns (OrgIDsResponse) {}
}

Expand Down Expand Up @@ -628,43 +622,12 @@ message UpgradeResponse {
optional string k8s_image = 6; // When k8s_image is not empty, ignore content
}

message NtpRequest {
optional string ctrl_ip = 1; // 请求端的控制口IP
optional bytes request = 10; // 数据
}

message NtpResponse {
optional bytes response = 1; // 数据
}

message PluginConfig {
optional uint32 update_time = 1 [default = 0]; // latest epoch of all configured plugins
repeated string wasm_plugins = 2;
repeated string so_plugins = 3;
}

enum PluginType {
WASM = 1;
SO = 2;
}

message PluginRequest {
optional string ctrl_ip = 1;
optional string ctrl_mac = 2;
optional PluginType plugin_type = 3;
optional string plugin_name = 4;
optional string team_id = 5; // agent team identity
}

message PluginResponse {
optional Status status = 1; // 调用是否成功
optional bytes content = 2; // 数据
optional string md5 = 3; // 文件MD5
optional uint64 total_len = 4; // 数据总长
optional uint32 pkt_count = 5; // 包总个数
optional uint32 update_time = 6 [default = 0]; // plugin update epoch
}

message GenesisPlatformData {
repeated Ip ips = 7;

Expand Down Expand Up @@ -772,17 +735,6 @@ message KubernetesAPISyncResponse {
optional uint64 version = 1;
}

message KubernetesClusterIDRequest {
optional string ca_md5 = 1; // md5 of /run/secrets/[kubernetes.io/serviceaccount/ca.crt
optional string team_id = 2; // agent team identity
optional string kubernetes_cluster_name = 46; // get that from deepflow-agent.yaml
}

message KubernetesClusterIDResponse {
optional string error_msg = 1;
optional string cluster_id = 2;
}

message Pcap {
optional uint64 flow_id = 1;
optional uint64 start_time = 2; // ns
Expand Down Expand Up @@ -1059,101 +1011,6 @@ message UniversalTagNameMapsResponse {
repeated IdNameMap vtap_map = 14;
}

message AgentId {
optional string ip = 1;
optional string mac = 2;
optional string team_id = 3;
}

enum OutputFormat {
TEXT = 0;
BINARY = 1;
}

enum CommandType {
LINUX = 0;
KUBERNETES = 1;
}

enum ParamType {
PF_TEXT = 0;
PF_BOOLEAN = 1;
}

message CommandParam {
optional string name = 1;
optional string regex = 2;
optional bool required = 3;
optional ParamType param_type = 4;
optional string description = 5;
}

message RemoteCommand {
optional string cmd = 2;
optional OutputFormat output_format = 4;
optional string ident = 6;
repeated CommandParam params = 7;
optional string type_name = 8;

optional uint32 id = 1; // deprecated, use `ident` instead
repeated string param_names = 3; // deprecated, use `params` instead
optional CommandType cmd_type = 5; // deprecated, use `type_name` instead
}

message LinuxNamespace {
optional uint64 id = 1;
optional string ns_type = 2;
optional string user = 3;
optional uint32 pid = 4;
optional string cmd = 5;
}

message CommandResult {
optional int32 errno = 1;
optional bytes content = 2;
// will only be populated in the last segment
// also used as end of result
optional string md5 = 3;
optional uint64 total_len = 4;
optional uint32 pkt_count = 5;
}

enum ExecutionType {
LIST_COMMAND = 0;
LIST_NAMESPACE = 1;
RUN_COMMAND = 2;
}

message Parameter {
optional string key = 1;
optional string value = 2; // accepts [A-Za-z0-9-_]
}

// message from server to agent
message RemoteExecRequest {
optional uint64 request_id = 1;

optional ExecutionType exec_type = 2;
repeated Parameter params = 4; // parameters to use in commands
optional uint32 linux_ns_pid = 5; // execute command in agent namespace if null
optional uint32 batch_len = 6 [default = 1048576]; // batch len of command execution results, min 1024
optional string command_ident = 7;

optional uint32 command_id = 3; // deprecated, use `command_ident` instead
}

// message from agent to server
message RemoteExecResponse {
optional AgentId agent_id = 1;
optional uint64 request_id = 2;

optional string errmsg = 3;

repeated RemoteCommand commands = 4;
repeated LinuxNamespace linux_namespaces = 5;
optional CommandResult command_result = 6;
}

message OrgIDsRequest {}

message OrgIDsResponse {
Expand Down
1 change: 0 additions & 1 deletion server/controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/deepflowio/deepflow/server/controller/tagrecorder"
"github.com/deepflowio/deepflow/server/controller/trisolaris"
_ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/grpc/agentdebug"
_ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/grpc/debug"
_ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/grpc/healthcheck"
_ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/http/cache"
_ "github.com/deepflowio/deepflow/server/controller/trisolaris/services/http/upgrade"
Expand Down
59 changes: 8 additions & 51 deletions server/controller/grpc/synchronizer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,10 @@ import (
)

type service struct {
vTapEvent *trisolaris.VTapEvent
tsdbEvent *trisolaris.TSDBEvent
ntpEvent *trisolaris.NTPEvent
upgradeEvent *trisolaris.UpgradeEvent
kubernetesClusterIDEvent *trisolaris.KubernetesClusterIDEvent
processInfoEvent *trisolaris.ProcessInfoEvent
pluginEvent *trisolaris.PluginEvent
prometheusEvent *prometheus.SynchronizerEvent
vTapEvent *trisolaris.VTapEvent
tsdbEvent *trisolaris.TSDBEvent
upgradeEvent *trisolaris.UpgradeEvent
prometheusEvent *prometheus.SynchronizerEvent
}

func init() {
Expand All @@ -48,13 +44,10 @@ func init() {

func newService() *service {
return &service{
vTapEvent: trisolaris.NewVTapEvent(),
tsdbEvent: trisolaris.NewTSDBEvent(),
ntpEvent: trisolaris.NewNTPEvent(),
upgradeEvent: trisolaris.NewUpgradeEvent(),
processInfoEvent: trisolaris.NewprocessInfoEvent(),
pluginEvent: trisolaris.NewPluginEvent(),
prometheusEvent: prometheus.NewSynchronizerEvent(),
vTapEvent: trisolaris.NewVTapEvent(),
tsdbEvent: trisolaris.NewTSDBEvent(),
upgradeEvent: trisolaris.NewUpgradeEvent(),
prometheusEvent: prometheus.NewSynchronizerEvent(),
}
}

Expand Down Expand Up @@ -97,22 +90,6 @@ func (s *service) Upgrade(r *api.UpgradeRequest, in api.Synchronizer_UpgradeServ
return s.upgradeEvent.Upgrade(r, in)
}

func (s *service) Query(ctx context.Context, in *api.NtpRequest) (*api.NtpResponse, error) {
startTime := time.Now()
defer func() {
statsd.AddGrpcCostStatsd(statsd.Query, int(time.Now().Sub(startTime).Milliseconds()))
}()
return s.ntpEvent.Query(ctx, in)
}

func (s *service) GetKubernetesClusterID(ctx context.Context, in *api.KubernetesClusterIDRequest) (*api.KubernetesClusterIDResponse, error) {
startTime := time.Now()
defer func() {
statsd.AddGrpcCostStatsd(statsd.GetKubernetesClusterID, int(time.Now().Sub(startTime).Milliseconds()))
}()
return s.kubernetesClusterIDEvent.GetKubernetesClusterID(ctx, in)
}

func (s *service) GenesisSync(ctx context.Context, in *api.GenesisSyncRequest) (*api.GenesisSyncResponse, error) {
startTime := time.Now()
defer func() {
Expand All @@ -129,18 +106,6 @@ func (s *service) KubernetesAPISync(ctx context.Context, in *api.KubernetesAPISy
return genesis.GenesisService.Synchronizer.KubernetesAPISync(ctx, in)
}

func (s *service) GPIDSync(ctx context.Context, in *api.GPIDSyncRequest) (*api.GPIDSyncResponse, error) {
startTime := time.Now()
defer func() {
statsd.AddGrpcCostStatsd(statsd.GPIDSync, int(time.Now().Sub(startTime).Milliseconds()))
}()
return s.processInfoEvent.GPIDSync(ctx, in)
}

func (s *service) ShareGPIDLocalData(ctx context.Context, in *api.ShareGPIDSyncRequests) (*api.ShareGPIDSyncRequests, error) {
return s.processInfoEvent.ShareGPIDLocalData(ctx, in)
}

func (s *service) GetPrometheusLabelIDs(ctx context.Context, in *api.PrometheusLabelRequest) (*api.PrometheusLabelResponse, error) {
startTime := time.Now()
defer func() {
Expand All @@ -160,18 +125,10 @@ func (s *service) GetPrometheusTargets(ctx context.Context, in *api.PrometheusTa
// return resp, err
}

func (s *service) Plugin(r *api.PluginRequest, in api.Synchronizer_PluginServer) error {
return s.pluginEvent.Plugin(r, in)
}

func (s *service) GetUniversalTagNameMaps(ctx context.Context, in *api.UniversalTagNameMapsRequest) (*api.UniversalTagNameMapsResponse, error) {
return s.tsdbEvent.GetUniversalTagNameMaps(ctx, in)
}

func (s *service) RemoteExecute(in api.Synchronizer_RemoteExecuteServer) error {
return s.vTapEvent.RemoteExecute(in)
}

func (s *service) GetOrgIDs(ctx context.Context, in *api.OrgIDsRequest) (*api.OrgIDsResponse, error) {
return s.tsdbEvent.GetOrgIDs(ctx, in)
}
12 changes: 6 additions & 6 deletions server/controller/http/router/vtap/agent_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"

"github.com/deepflowio/deepflow/message/trident"
agentmessage "github.com/deepflowio/deepflow/message/agent"
"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/config"
"github.com/deepflowio/deepflow/server/controller/db/metadb"
Expand Down Expand Up @@ -208,7 +208,7 @@ func (a *AgentCMD) getCMDAndNamespaceHandler() gin.HandlerFunc {

userType, _ := c.Get(common.HEADER_KEY_X_USER_TYPE)
if !(userType == common.USER_TYPE_SUPER_ADMIN || userType == common.USER_TYPE_ADMIN) {
var cmds []*trident.RemoteCommand
var cmds []*agentmessage.RemoteCommand
for _, item := range data.RemoteCommand {
_, ok1 := profileCommandMap[*item.Cmd]
_, ok2 := probeCommandMap[*item.Cmd]
Expand All @@ -220,7 +220,7 @@ func (a *AgentCMD) getCMDAndNamespaceHandler() gin.HandlerFunc {
}

if filterCommandMap, ok := agentCommandMap[AgentCommandType(c.Query("type"))]; ok {
var cmds []*trident.RemoteCommand
var cmds []*agentmessage.RemoteCommand
for _, item := range data.RemoteCommand {
if item.Cmd == nil {
continue
Expand Down Expand Up @@ -271,8 +271,8 @@ func (a *AgentCMD) cmdRunHandler() gin.HandlerFunc {
}
}

agentReq := trident.RemoteExecRequest{
ExecType: trident.ExecutionType_RUN_COMMAND.Enum(),
agentReq := agentmessage.RemoteExecRequest{
ExecType: agentmessage.ExecutionType_RUN_COMMAND.Enum(),
// CommandId: req.CommandId, // deprecated
CommandIdent: req.CommandIdent,
LinuxNsPid: req.LinuxNsPid,
Expand All @@ -296,7 +296,7 @@ func (a *AgentCMD) cmdRunHandler() gin.HandlerFunc {
return
}

if req.OutputFormat.String() == trident.OutputFormat_TEXT.String() {
if req.OutputFormat.String() == agentmessage.OutputFormat_TEXT.String() {
JsonResponse(c, content, nil)
return
}
Expand Down
Loading

0 comments on commit 9a4d108

Please sign in to comment.