Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Oct 3, 2024
1 parent 88e39cd commit 24f0620
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 90 deletions.
4 changes: 2 additions & 2 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var (
DeployMode = flag.String("DEPLOY_MODE", DeployDaemonset, "alibaba log deploy mode, daemonset or statefulset or singleton")
EnableKubernetesMeta = flag.Bool("ENABLE_KUBERNETES_META", false, "enable kubernetes meta")
ClusterID = flag.String("GLOBAL_CLUSTER_ID", "", "cluster id")
ClusterMode = flag.String("GLOBAL_CLUSTER_MODE", "", "cluster mode")
ClusterType = flag.String("GLOBAL_CLUSTER_TYPE", "", "cluster type, supporting ack, one, asi and k8s")
)

func init() {
Expand Down Expand Up @@ -145,7 +145,7 @@ func init() {
_ = util.InitFromEnvString("DEPLOY_MODE", DeployMode, *DeployMode)
_ = util.InitFromEnvBool("ENABLE_KUBERNETES_META", EnableKubernetesMeta, *EnableKubernetesMeta)
_ = util.InitFromEnvString("GLOBAL_CLUSTER_ID", ClusterID, *ClusterID)
_ = util.InitFromEnvString("GLOBAL_CLUSTER_MODE", ClusterMode, *ClusterMode)
_ = util.InitFromEnvString("GLOBAL_CLUSTER_TYPE", ClusterType, *ClusterType)

if len(*DefaultRegion) == 0 {
*DefaultRegion = util.GuessRegionByEndpoint(*LogServiceEndpoint, "cn-hangzhou")
Expand Down
2 changes: 1 addition & 1 deletion pkg/helper/k8smeta/k8s_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (m *MetaManager) Init(configPath string) (err error) {
m.addEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaAddEventTotal)
m.updateEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaUpdateEventTotal)
m.deleteEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaDeleteEventTotal)
m.cacheResourceGauge = helper.NewGaugeMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaCacheResourceSize)
m.cacheResourceGauge = helper.NewGaugeMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaCacheSize)
m.queueSizeGauge = helper.NewGaugeMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaQueueSize)
m.httpRequestCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaHTTPRequestTotal)
m.httpAvgDelayMs = helper.NewAverageMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaHTTPAvgDelayMs)
Expand Down
44 changes: 44 additions & 0 deletions pkg/helper/self_metrics_agent_constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

import (
"github.com/alibaba/ilogtail/pkg/pipeline"
)

//////////////////////////////////////////////////////////////////////////
// agent
//////////////////////////////////////////////////////////////////////////

// metric keys
const (
MetricAgentMemoryGo = "agent_go_memory_used_mb"
MetricAgentGoRoutinesTotal = "agent_go_routines_total"
)

func GetCommonLabels(context pipeline.Context, pluginMeta *pipeline.PluginMeta) []pipeline.LabelPair {
labels := make([]pipeline.LabelPair, 0)
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyProject, Value: context.GetProject()})
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyLogstore, Value: context.GetLogstore()})
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPipelineName, Value: context.GetConfigName()})

if len(pluginMeta.PluginID) > 0 {
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginID, Value: pluginMeta.PluginID})
}
if len(pluginMeta.PluginType) > 0 {
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginType, Value: pluginMeta.PluginType})
}
return labels
}
33 changes: 33 additions & 0 deletions pkg/helper/self_metrics_component_constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

//////////////////////////////////////////////////////////////////////////
// component
//////////////////////////////////////////////////////////////////////////

/**********************************************************
* k8s meta
**********************************************************/
const (
MetricComponentK8sMetaAddEventTotal = "component_k8s_meta_add_event_total"
MetricComponentK8sMetaUpdateEventTotal = "component_k8s_meta_update_event_total"
MetricComponentK8sMetaDeleteEventTotal = "component_k8s_meta_delete_event_total"
MetricComponentK8sMetaCacheSize = "component_k8s_meta_cache_size"
MetricComponentK8sMetaQueueSize = "component_k8s_meta_queue_size"
MetricComponentK8sMetaHTTPRequestTotal = "component_k8s_meta_http_request_total"
MetricComponentK8sMetaHTTPAvgDelayMs = "component_k8s_meta_avg_delay_ms"
MetricComponentK8sMetaHTTPMaxDelayMs = "component_k8s_meta_max_delay_ms"
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,6 @@

package helper

import (
"github.com/alibaba/ilogtail/pkg/pipeline"
)

//////////////////////////////////////////////////////////////////////////
// agent
//////////////////////////////////////////////////////////////////////////

// metric keys
const (
MetricAgentMemoryGo = "agent_go_memory_used_mb"
MetricAgentGoRoutinesTotal = "agent_go_routines_total"
)

//////////////////////////////////////////////////////////////////////////
// component
//////////////////////////////////////////////////////////////////////////

/**********************************************************
* k8s meta
**********************************************************/
const (
MetricComponentK8sMetaAddEventTotal = "component_k8s_meta_add_event_total"
MetricComponentK8sMetaUpdateEventTotal = "component_k8s_meta_update_event_total"
MetricComponentK8sMetaDeleteEventTotal = "component_k8s_meta_delete_event_total"
MetricComponentK8sMetaCacheResourceSize = "component_k8s_meta_cache_resource_size"
MetricComponentK8sMetaQueueSize = "component_k8s_meta_queue_size"
MetricComponentK8sMetaHTTPRequestTotal = "component_k8s_meta_http_request_total"
MetricComponentK8sMetaHTTPAvgDelayMs = "component_k8s_meta_avg_delay_ms"
MetricComponentK8sMetaHTTPMaxDelayMs = "component_k8s_meta_max_delay_ms"
)

//////////////////////////////////////////////////////////////////////////
// plugin
//////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -99,20 +67,13 @@ const (
/**********************************************************
* service_mysql
* service_rdb
* service_kubernetes_meta
**********************************************************/
const (
MetricPluginCollectAvgCostTimeMs = "plugin_collect_avg_cost_time_ms"
MetricPluginCollectTotal = "plugin_collect_total"
)

/**********************************************************
* service_kubernetes_meta
**********************************************************/
const (
MetricPluginCollectEntityTotal = "plugin_collect_entity_total"
MetricPluginCollectLinkTotal = "plugin_collect_link_total"
)

/**********************************************************
* all processor(所有解析类的处理插件通用指标。Todo:目前统计还不全、不准确)
**********************************************************/
Expand All @@ -131,18 +92,3 @@ const (
const (
PluginPairsPerLogTotal = "plugin_pairs_per_log_total"
)

func GetCommonLabels(context pipeline.Context, pluginMeta *pipeline.PluginMeta) []pipeline.LabelPair {
labels := make([]pipeline.LabelPair, 0)
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyProject, Value: context.GetProject()})
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyLogstore, Value: context.GetLogstore()})
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPipelineName, Value: context.GetConfigName()})

if len(pluginMeta.PluginID) > 0 {
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginID, Value: pluginMeta.PluginID})
}
if len(pluginMeta.PluginType) > 0 {
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginType, Value: pluginMeta.PluginType})
}
return labels
}
25 changes: 12 additions & 13 deletions plugins/input/kubernetesmetav2/meta_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/alibaba/ilogtail/pkg/flags"
"github.com/alibaba/ilogtail/pkg/helper/k8smeta"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/models"
Expand Down Expand Up @@ -329,23 +330,23 @@ func (m *metaCollector) sendInBackground() {
case e := <-m.entityBuffer:
entityGroup.Events = append(entityGroup.Events, e)
if len(entityGroup.Events) >= 100 {
sendFunc(entityGroup)
m.serviceK8sMeta.entityCount.Add(int64(len(entityGroup.Events)))
sendFunc(entityGroup)
}
case e := <-m.entityLinkBuffer:
entityLinkGroup.Events = append(entityLinkGroup.Events, e)
if len(entityLinkGroup.Events) >= 100 {
sendFunc(entityLinkGroup)
m.serviceK8sMeta.linkCount.Add(int64(len(entityLinkGroup.Events)))
sendFunc(entityLinkGroup)
}
case <-time.After(3 * time.Second):
if len(entityGroup.Events) > 0 {
sendFunc(entityGroup)
m.serviceK8sMeta.entityCount.Add(int64(len(entityGroup.Events)))
sendFunc(entityGroup)
}
if len(entityLinkGroup.Events) > 0 {
sendFunc(entityLinkGroup)
m.serviceK8sMeta.linkCount.Add(int64(len(entityLinkGroup.Events)))
sendFunc(entityLinkGroup)
}
case <-m.stopCh:
return
Expand Down Expand Up @@ -406,16 +407,14 @@ func (m *metaCollector) generateEntityClusterLink(entityEvent models.PipelineEve
}

func (m *metaCollector) genEntityTypeKey(kind string) string {
var prefix string
switch {
case kind == "":
prefix = m.serviceK8sMeta.domain + ".k8s"
case kind == clusterTypeName && m.serviceK8sMeta.domain == acsDomain:
prefix = m.serviceK8sMeta.domainWithClusterMode + "."
default:
prefix = m.serviceK8sMeta.domain + ".k8s."
// assert domain is initialized
if kind == "" {
return m.serviceK8sMeta.domain + ".k8s"
}
if kind == clusterTypeName && m.serviceK8sMeta.domain == acsDomain {
return m.serviceK8sMeta.domain + "." + *flags.ClusterType + "." + clusterTypeName
}
return fmt.Sprintf("%s%s", prefix, strings.ToLower(kind))
return m.serviceK8sMeta.domain + ".k8s." + strings.ToLower(kind)
}

func convertPipelineEvent2Log(event models.PipelineEvent) *protocol.Log {
Expand Down
8 changes: 4 additions & 4 deletions plugins/input/kubernetesmetav2/meta_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ func TestGenEntityTypeKeyAcs(t *testing.T) {
m := metaCollector{
serviceK8sMeta: &ServiceK8sMeta{},
}
*flags.ClusterMode = ackCluster
*flags.ClusterType = ackCluster
m.serviceK8sMeta.initDomain()
assert.Equal(t, "acs.k8s.pod", m.genEntityTypeKey("pod"))
assert.Equal(t, "acs.ack.cluster", m.genEntityTypeKey("cluster"))

*flags.ClusterMode = oneCluster
*flags.ClusterType = oneCluster
m.serviceK8sMeta.initDomain()
assert.Equal(t, "acs.k8s.pod", m.genEntityTypeKey("pod"))
assert.Equal(t, "acs.one.cluster", m.genEntityTypeKey("cluster"))

*flags.ClusterMode = asiCluster
*flags.ClusterType = asiCluster
m.serviceK8sMeta.initDomain()
assert.Equal(t, "acs.k8s.pod", m.genEntityTypeKey("pod"))
assert.Equal(t, "acs.asi.cluster", m.genEntityTypeKey("cluster"))
Expand All @@ -32,7 +32,7 @@ func TestGenEntityTypeKeyInfra(t *testing.T) {
m := metaCollector{
serviceK8sMeta: &ServiceK8sMeta{},
}
*flags.ClusterMode = "k8s"
*flags.ClusterType = "k8s"
m.serviceK8sMeta.initDomain()
assert.Equal(t, "infra.k8s.pod", m.genEntityTypeKey("pod"))
assert.Equal(t, "infra.k8s.cluster", m.genEntityTypeKey("cluster"))
Expand Down
32 changes: 17 additions & 15 deletions plugins/input/kubernetesmetav2/service_meta.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package kubernetesmetav2

import (
"fmt"

"github.com/alibaba/ilogtail/pkg/flags"
"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/helper/k8smeta"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
)

type ProcessFunc func(data *k8smeta.ObjectWrapper, method string) []models.PipelineEvent
Expand Down Expand Up @@ -35,14 +34,13 @@ type ServiceK8sMeta struct {
Ingress bool
Container bool
// other
context pipeline.Context
metaManager *k8smeta.MetaManager
collector pipeline.Collector
metaCollector *metaCollector
configName string
clusterID string
domain string
domainWithClusterMode string
context pipeline.Context
metaManager *k8smeta.MetaManager
collector pipeline.Collector
metaCollector *metaCollector
configName string
clusterID string
domain string
// self metric
entityCount pipeline.CounterMetric
linkCount pipeline.CounterMetric
Expand All @@ -57,8 +55,14 @@ func (s *ServiceK8sMeta) Init(context pipeline.Context) (int, error) {
s.initDomain()

metricRecord := s.context.GetMetricRecord()
s.entityCount = helper.NewCounterMetricAndRegister(metricRecord, helper.MetricPluginCollectEntityTotal)
s.linkCount = helper.NewCounterMetricAndRegister(metricRecord, helper.MetricPluginCollectLinkTotal)
s.entityCount = helper.NewCounterMetricAndRegister(metricRecord, helper.MetricPluginCollectTotal, &protocol.Log_Content{
Key: "category",
Value: "entity",
})
s.linkCount = helper.NewCounterMetricAndRegister(metricRecord, helper.MetricPluginCollectTotal, &protocol.Log_Content{
Key: "category",
Value: "link",
})
return 0, nil
}

Expand Down Expand Up @@ -86,13 +90,11 @@ func (s *ServiceK8sMeta) Start(collector pipeline.Collector) error {
}

func (s *ServiceK8sMeta) initDomain() {
switch *flags.ClusterMode {
switch *flags.ClusterType {
case ackCluster, oneCluster, asiCluster:
s.domain = acsDomain
s.domainWithClusterMode = fmt.Sprintf("%s.%s", acsDomain, *flags.ClusterMode)
default:
s.domain = infraDomain
s.domainWithClusterMode = infraDomain + ".k8s"
}
}

Expand Down

0 comments on commit 24f0620

Please sign in to comment.