From 24f0620c9479a87b0c25d33bcd4d22c14c066692 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Thu, 3 Oct 2024 17:09:48 +0800 Subject: [PATCH] fix --- pkg/flags/flags.go | 4 +- pkg/helper/k8smeta/k8s_meta_manager.go | 2 +- pkg/helper/self_metrics_agent_constants.go | 44 +++++++++++++++ .../self_metrics_component_constants.go | 33 +++++++++++ ...ts.go => self_metrics_plugin_constants.go} | 56 +------------------ .../input/kubernetesmetav2/meta_collector.go | 25 ++++----- .../kubernetesmetav2/meta_collector_test.go | 8 +-- .../input/kubernetesmetav2/service_meta.go | 32 ++++++----- 8 files changed, 114 insertions(+), 90 deletions(-) create mode 100644 pkg/helper/self_metrics_agent_constants.go create mode 100644 pkg/helper/self_metrics_component_constants.go rename pkg/helper/{self_metrics_constants.go => self_metrics_plugin_constants.go} (61%) diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 3f77128921..2b7d47d43f 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -117,7 +117,7 @@ var ( DeployMode = flag.String("DEPLOY_MODE", DeployDaemonset, "alibaba log deploy mode, daemonset or statefulset or singleton") EnableKubernetesMeta = flag.Bool("ENABLE_KUBERNETES_META", false, "enable kubernetes meta") ClusterID = flag.String("GLOBAL_CLUSTER_ID", "", "cluster id") - ClusterMode = flag.String("GLOBAL_CLUSTER_MODE", "", "cluster mode") + ClusterType = flag.String("GLOBAL_CLUSTER_TYPE", "", "cluster type, supporting ack, one, asi and k8s") ) func init() { @@ -145,7 +145,7 @@ func init() { _ = util.InitFromEnvString("DEPLOY_MODE", DeployMode, *DeployMode) _ = util.InitFromEnvBool("ENABLE_KUBERNETES_META", EnableKubernetesMeta, *EnableKubernetesMeta) _ = util.InitFromEnvString("GLOBAL_CLUSTER_ID", ClusterID, *ClusterID) - _ = util.InitFromEnvString("GLOBAL_CLUSTER_MODE", ClusterMode, *ClusterMode) + _ = util.InitFromEnvString("GLOBAL_CLUSTER_TYPE", ClusterType, *ClusterType) if len(*DefaultRegion) == 0 { *DefaultRegion = util.GuessRegionByEndpoint(*LogServiceEndpoint, "cn-hangzhou") diff --git a/pkg/helper/k8smeta/k8s_meta_manager.go b/pkg/helper/k8smeta/k8s_meta_manager.go index 80a3f800f5..668d405b65 100644 --- a/pkg/helper/k8smeta/k8s_meta_manager.go +++ b/pkg/helper/k8smeta/k8s_meta_manager.go @@ -100,7 +100,7 @@ func (m *MetaManager) Init(configPath string) (err error) { m.addEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaAddEventTotal) m.updateEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaUpdateEventTotal) m.deleteEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaDeleteEventTotal) - m.cacheResourceGauge = helper.NewGaugeMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaCacheResourceSize) + m.cacheResourceGauge = helper.NewGaugeMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaCacheSize) m.queueSizeGauge = helper.NewGaugeMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaQueueSize) m.httpRequestCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaHTTPRequestTotal) m.httpAvgDelayMs = helper.NewAverageMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaHTTPAvgDelayMs) diff --git a/pkg/helper/self_metrics_agent_constants.go b/pkg/helper/self_metrics_agent_constants.go new file mode 100644 index 0000000000..fbff3634b9 --- /dev/null +++ b/pkg/helper/self_metrics_agent_constants.go @@ -0,0 +1,44 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "github.com/alibaba/ilogtail/pkg/pipeline" +) + +////////////////////////////////////////////////////////////////////////// +// agent +////////////////////////////////////////////////////////////////////////// + +// metric keys +const ( + MetricAgentMemoryGo = "agent_go_memory_used_mb" + MetricAgentGoRoutinesTotal = "agent_go_routines_total" +) + +func GetCommonLabels(context pipeline.Context, pluginMeta *pipeline.PluginMeta) []pipeline.LabelPair { + labels := make([]pipeline.LabelPair, 0) + labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyProject, Value: context.GetProject()}) + labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyLogstore, Value: context.GetLogstore()}) + labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPipelineName, Value: context.GetConfigName()}) + + if len(pluginMeta.PluginID) > 0 { + labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginID, Value: pluginMeta.PluginID}) + } + if len(pluginMeta.PluginType) > 0 { + labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginType, Value: pluginMeta.PluginType}) + } + return labels +} diff --git a/pkg/helper/self_metrics_component_constants.go b/pkg/helper/self_metrics_component_constants.go new file mode 100644 index 0000000000..1b8f6cfacd --- /dev/null +++ b/pkg/helper/self_metrics_component_constants.go @@ -0,0 +1,33 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +////////////////////////////////////////////////////////////////////////// +// component +////////////////////////////////////////////////////////////////////////// + +/********************************************************** +* k8s meta +**********************************************************/ +const ( + MetricComponentK8sMetaAddEventTotal = "component_k8s_meta_add_event_total" + MetricComponentK8sMetaUpdateEventTotal = "component_k8s_meta_update_event_total" + MetricComponentK8sMetaDeleteEventTotal = "component_k8s_meta_delete_event_total" + MetricComponentK8sMetaCacheSize = "component_k8s_meta_cache_size" + MetricComponentK8sMetaQueueSize = "component_k8s_meta_queue_size" + MetricComponentK8sMetaHTTPRequestTotal = "component_k8s_meta_http_request_total" + MetricComponentK8sMetaHTTPAvgDelayMs = "component_k8s_meta_avg_delay_ms" + MetricComponentK8sMetaHTTPMaxDelayMs = "component_k8s_meta_max_delay_ms" +) diff --git a/pkg/helper/self_metrics_constants.go b/pkg/helper/self_metrics_plugin_constants.go similarity index 61% rename from pkg/helper/self_metrics_constants.go rename to pkg/helper/self_metrics_plugin_constants.go index 907152acff..91e869fb63 100644 --- a/pkg/helper/self_metrics_constants.go +++ b/pkg/helper/self_metrics_plugin_constants.go @@ -14,38 +14,6 @@ package helper -import ( - "github.com/alibaba/ilogtail/pkg/pipeline" -) - -////////////////////////////////////////////////////////////////////////// -// agent -////////////////////////////////////////////////////////////////////////// - -// metric keys -const ( - MetricAgentMemoryGo = "agent_go_memory_used_mb" - MetricAgentGoRoutinesTotal = "agent_go_routines_total" -) - -////////////////////////////////////////////////////////////////////////// -// component -////////////////////////////////////////////////////////////////////////// - -/********************************************************** -* k8s meta -**********************************************************/ -const ( - MetricComponentK8sMetaAddEventTotal = "component_k8s_meta_add_event_total" - MetricComponentK8sMetaUpdateEventTotal = "component_k8s_meta_update_event_total" - MetricComponentK8sMetaDeleteEventTotal = "component_k8s_meta_delete_event_total" - MetricComponentK8sMetaCacheResourceSize = "component_k8s_meta_cache_resource_size" - MetricComponentK8sMetaQueueSize = "component_k8s_meta_queue_size" - MetricComponentK8sMetaHTTPRequestTotal = "component_k8s_meta_http_request_total" - MetricComponentK8sMetaHTTPAvgDelayMs = "component_k8s_meta_avg_delay_ms" - MetricComponentK8sMetaHTTPMaxDelayMs = "component_k8s_meta_max_delay_ms" -) - ////////////////////////////////////////////////////////////////////////// // plugin ////////////////////////////////////////////////////////////////////////// @@ -99,20 +67,13 @@ const ( /********************************************************** * service_mysql * service_rdb +* service_kubernetes_meta **********************************************************/ const ( MetricPluginCollectAvgCostTimeMs = "plugin_collect_avg_cost_time_ms" MetricPluginCollectTotal = "plugin_collect_total" ) -/********************************************************** -* service_kubernetes_meta -**********************************************************/ -const ( - MetricPluginCollectEntityTotal = "plugin_collect_entity_total" - MetricPluginCollectLinkTotal = "plugin_collect_link_total" -) - /********************************************************** * all processor(所有解析类的处理插件通用指标。Todo:目前统计还不全、不准确) **********************************************************/ @@ -131,18 +92,3 @@ const ( const ( PluginPairsPerLogTotal = "plugin_pairs_per_log_total" ) - -func GetCommonLabels(context pipeline.Context, pluginMeta *pipeline.PluginMeta) []pipeline.LabelPair { - labels := make([]pipeline.LabelPair, 0) - labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyProject, Value: context.GetProject()}) - labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyLogstore, Value: context.GetLogstore()}) - labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPipelineName, Value: context.GetConfigName()}) - - if len(pluginMeta.PluginID) > 0 { - labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginID, Value: pluginMeta.PluginID}) - } - if len(pluginMeta.PluginType) > 0 { - labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginType, Value: pluginMeta.PluginType}) - } - return labels -} diff --git a/plugins/input/kubernetesmetav2/meta_collector.go b/plugins/input/kubernetesmetav2/meta_collector.go index eb0146032a..6b579beaa3 100644 --- a/plugins/input/kubernetesmetav2/meta_collector.go +++ b/plugins/input/kubernetesmetav2/meta_collector.go @@ -13,6 +13,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/alibaba/ilogtail/pkg/flags" "github.com/alibaba/ilogtail/pkg/helper/k8smeta" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/models" @@ -329,23 +330,23 @@ func (m *metaCollector) sendInBackground() { case e := <-m.entityBuffer: entityGroup.Events = append(entityGroup.Events, e) if len(entityGroup.Events) >= 100 { - sendFunc(entityGroup) m.serviceK8sMeta.entityCount.Add(int64(len(entityGroup.Events))) + sendFunc(entityGroup) } case e := <-m.entityLinkBuffer: entityLinkGroup.Events = append(entityLinkGroup.Events, e) if len(entityLinkGroup.Events) >= 100 { - sendFunc(entityLinkGroup) m.serviceK8sMeta.linkCount.Add(int64(len(entityLinkGroup.Events))) + sendFunc(entityLinkGroup) } case <-time.After(3 * time.Second): if len(entityGroup.Events) > 0 { - sendFunc(entityGroup) m.serviceK8sMeta.entityCount.Add(int64(len(entityGroup.Events))) + sendFunc(entityGroup) } if len(entityLinkGroup.Events) > 0 { - sendFunc(entityLinkGroup) m.serviceK8sMeta.linkCount.Add(int64(len(entityLinkGroup.Events))) + sendFunc(entityLinkGroup) } case <-m.stopCh: return @@ -406,16 +407,14 @@ func (m *metaCollector) generateEntityClusterLink(entityEvent models.PipelineEve } func (m *metaCollector) genEntityTypeKey(kind string) string { - var prefix string - switch { - case kind == "": - prefix = m.serviceK8sMeta.domain + ".k8s" - case kind == clusterTypeName && m.serviceK8sMeta.domain == acsDomain: - prefix = m.serviceK8sMeta.domainWithClusterMode + "." - default: - prefix = m.serviceK8sMeta.domain + ".k8s." + // assert domain is initialized + if kind == "" { + return m.serviceK8sMeta.domain + ".k8s" + } + if kind == clusterTypeName && m.serviceK8sMeta.domain == acsDomain { + return m.serviceK8sMeta.domain + "." + *flags.ClusterType + "." + clusterTypeName } - return fmt.Sprintf("%s%s", prefix, strings.ToLower(kind)) + return m.serviceK8sMeta.domain + ".k8s." + strings.ToLower(kind) } func convertPipelineEvent2Log(event models.PipelineEvent) *protocol.Log { diff --git a/plugins/input/kubernetesmetav2/meta_collector_test.go b/plugins/input/kubernetesmetav2/meta_collector_test.go index 07ebc38ef8..c037581652 100644 --- a/plugins/input/kubernetesmetav2/meta_collector_test.go +++ b/plugins/input/kubernetesmetav2/meta_collector_test.go @@ -12,17 +12,17 @@ func TestGenEntityTypeKeyAcs(t *testing.T) { m := metaCollector{ serviceK8sMeta: &ServiceK8sMeta{}, } - *flags.ClusterMode = ackCluster + *flags.ClusterType = ackCluster m.serviceK8sMeta.initDomain() assert.Equal(t, "acs.k8s.pod", m.genEntityTypeKey("pod")) assert.Equal(t, "acs.ack.cluster", m.genEntityTypeKey("cluster")) - *flags.ClusterMode = oneCluster + *flags.ClusterType = oneCluster m.serviceK8sMeta.initDomain() assert.Equal(t, "acs.k8s.pod", m.genEntityTypeKey("pod")) assert.Equal(t, "acs.one.cluster", m.genEntityTypeKey("cluster")) - *flags.ClusterMode = asiCluster + *flags.ClusterType = asiCluster m.serviceK8sMeta.initDomain() assert.Equal(t, "acs.k8s.pod", m.genEntityTypeKey("pod")) assert.Equal(t, "acs.asi.cluster", m.genEntityTypeKey("cluster")) @@ -32,7 +32,7 @@ func TestGenEntityTypeKeyInfra(t *testing.T) { m := metaCollector{ serviceK8sMeta: &ServiceK8sMeta{}, } - *flags.ClusterMode = "k8s" + *flags.ClusterType = "k8s" m.serviceK8sMeta.initDomain() assert.Equal(t, "infra.k8s.pod", m.genEntityTypeKey("pod")) assert.Equal(t, "infra.k8s.cluster", m.genEntityTypeKey("cluster")) diff --git a/plugins/input/kubernetesmetav2/service_meta.go b/plugins/input/kubernetesmetav2/service_meta.go index 4478f7cd45..d745a72d11 100644 --- a/plugins/input/kubernetesmetav2/service_meta.go +++ b/plugins/input/kubernetesmetav2/service_meta.go @@ -1,13 +1,12 @@ package kubernetesmetav2 import ( - "fmt" - "github.com/alibaba/ilogtail/pkg/flags" "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/helper/k8smeta" "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/pipeline" + "github.com/alibaba/ilogtail/pkg/protocol" ) type ProcessFunc func(data *k8smeta.ObjectWrapper, method string) []models.PipelineEvent @@ -35,14 +34,13 @@ type ServiceK8sMeta struct { Ingress bool Container bool // other - context pipeline.Context - metaManager *k8smeta.MetaManager - collector pipeline.Collector - metaCollector *metaCollector - configName string - clusterID string - domain string - domainWithClusterMode string + context pipeline.Context + metaManager *k8smeta.MetaManager + collector pipeline.Collector + metaCollector *metaCollector + configName string + clusterID string + domain string // self metric entityCount pipeline.CounterMetric linkCount pipeline.CounterMetric @@ -57,8 +55,14 @@ func (s *ServiceK8sMeta) Init(context pipeline.Context) (int, error) { s.initDomain() metricRecord := s.context.GetMetricRecord() - s.entityCount = helper.NewCounterMetricAndRegister(metricRecord, helper.MetricPluginCollectEntityTotal) - s.linkCount = helper.NewCounterMetricAndRegister(metricRecord, helper.MetricPluginCollectLinkTotal) + s.entityCount = helper.NewCounterMetricAndRegister(metricRecord, helper.MetricPluginCollectTotal, &protocol.Log_Content{ + Key: "category", + Value: "entity", + }) + s.linkCount = helper.NewCounterMetricAndRegister(metricRecord, helper.MetricPluginCollectTotal, &protocol.Log_Content{ + Key: "category", + Value: "link", + }) return 0, nil } @@ -86,13 +90,11 @@ func (s *ServiceK8sMeta) Start(collector pipeline.Collector) error { } func (s *ServiceK8sMeta) initDomain() { - switch *flags.ClusterMode { + switch *flags.ClusterType { case ackCluster, oneCluster, asiCluster: s.domain = acsDomain - s.domainWithClusterMode = fmt.Sprintf("%s.%s", acsDomain, *flags.ClusterMode) default: s.domain = infraDomain - s.domainWithClusterMode = infraDomain + ".k8s" } }