diff --git a/core/go_pipeline/LogtailPlugin.cpp b/core/go_pipeline/LogtailPlugin.cpp index 64ddce85d3..fe2a841a27 100644 --- a/core/go_pipeline/LogtailPlugin.cpp +++ b/core/go_pipeline/LogtailPlugin.cpp @@ -78,11 +78,6 @@ LogtailPlugin::LogtailPlugin() { mPluginCfg["HostIP"] = LoongCollectorMonitor::mIpAddr; mPluginCfg["Hostname"] = LoongCollectorMonitor::mHostname; mPluginCfg["EnableSlsMetricsFormat"] = BOOL_FLAG(enable_sls_metrics_format); - if (!STRING_FLAG(ALIYUN_LOG_FILE_TAGS).empty()) { - mPluginCfg["FileTagsPath"] = GetFileTagsDir(); - mPluginCfg["FileTagsInterval"] = INT32_FLAG(file_tags_update_interval); - } - mPluginCfg["AgentHostID"] = STRING_FLAG(agent_host_id); } LogtailPlugin::~LogtailPlugin() { diff --git a/pkg/config/global_config.go b/pkg/config/global_config.go index 54f46dc900..c260dfd06e 100644 --- a/pkg/config/global_config.go +++ b/pkg/config/global_config.go @@ -50,12 +50,9 @@ type GlobalConfig struct { // Checkpoint file name of loongcollector plugin. LoongCollectorGoCheckPointFile string // Network identification from loongcollector. - HostIP string - Hostname string - DelayStopSec int - FileTagsPath string - FileTagsInterval int - AgentHostID string + HostIP string + Hostname string + DelayStopSec int EnableTimestampNanosecond bool UsingOldContentTag bool diff --git a/plugin_main/plugin_export.go b/plugin_main/plugin_export.go index 5ee3f1d10b..9a2650c7ca 100644 --- a/plugin_main/plugin_export.go +++ b/plugin_main/plugin_export.go @@ -338,7 +338,6 @@ func initPluginBase(cfgStr string) int { initOnce.Do(func() { LoadGlobalConfig(cfgStr) InitHTTPServer() - pluginmanager.InitFileConfig(&config.LoongcollectorGlobalConfig) setGCPercentForSlowStart() logger.Info(context.Background(), "init plugin base, version", config.BaseVersion) if *flags.DeployMode == flags.DeploySingleton && *flags.EnableKubernetesMeta { diff --git a/pluginmanager/file_config.go b/pluginmanager/file_config.go deleted file mode 100644 index 1dcec8e4e7..0000000000 --- a/pluginmanager/file_config.go +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2025 iLogtail Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pluginmanager - -import ( - "context" - "encoding/json" - "io/ioutil" - "math" - "os" - "path/filepath" - "sync/atomic" - "time" - - "github.com/alibaba/ilogtail/pkg/config" - "github.com/alibaba/ilogtail/pkg/logger" -) - -const ( - InstanceIdentityFilename = "instance_identity" -) - -var fileConfig *FileConfig - -type DoubleBuffer struct { - buffer []interface{} - bufferIndex atomic.Int32 -} - -func NewDoubleBuffer() *DoubleBuffer { - return &DoubleBuffer{ - buffer: make([]interface{}, 2), - bufferIndex: atomic.Int32{}, - } -} - -func (db *DoubleBuffer) Get() interface{} { - return db.buffer[db.bufferIndex.Load()] -} - -func (db *DoubleBuffer) Swap(newBuffer interface{}) { - db.buffer[1-db.bufferIndex.Load()] = newBuffer - db.bufferIndex.Store(1 - db.bufferIndex.Load()) -} - -type InstanceIdentity struct { - InstanceID string `json:"instance-id"` - OwnerAccountID string `json:"owner-account-id"` - RegionID string `json:"region-id"` - RandomHostID string `json:"random-host"` - ECSAssistMachineID string `json:"ecs-assist-machine-id"` - GFlagHostID string `json:"-"` -} - -type FileConfig struct { - fileTagsPath string - fileTagsInterval int - fileTagsBuffer *DoubleBuffer - - instanceIdentityPath string - instanceIdentityBuffer *DoubleBuffer - - fileConfigStopCh chan struct{} -} - -func InitFileConfig(config *config.GlobalConfig) { - fileConfig = &FileConfig{ - fileTagsPath: config.FileTagsPath, - fileTagsInterval: config.FileTagsInterval, - fileTagsBuffer: NewDoubleBuffer(), - instanceIdentityPath: filepath.Join(config.LoongCollectorDataDir, InstanceIdentityFilename), - instanceIdentityBuffer: NewDoubleBuffer(), - fileConfigStopCh: make(chan struct{}), - } - - go fileConfig.loadLoop(config.AgentHostID) -} - -func StopFileConfig() { - close(fileConfig.fileConfigStopCh) -} - -func (fc *FileConfig) GetFileTags() map[string]interface{} { - result, ok := fc.fileTagsBuffer.Get().(map[string]interface{}) - if !ok { - return nil - } - return result -} - -func (fc *FileConfig) GetInstanceIdentity() *InstanceIdentity { - result, ok := fc.instanceIdentityBuffer.Get().(InstanceIdentity) - if !ok { - return nil - } - return &result -} - -func (fc *FileConfig) loadLoop(gFlagHostID string) { - lastUpdateInstanceIdentity := time.Now() - interval := 1 - for { - select { - case <-fc.fileConfigStopCh: - return - case <-time.After(time.Duration(math.Min(float64(fc.fileTagsInterval), float64(interval))) * time.Second): - if fileConfig.fileTagsPath != "" { - data, err := ReadFile(fc.fileTagsPath) - if err == nil { - var fileTags map[string]interface{} - err = json.Unmarshal(data, &fileTags) - if err != nil { - logger.Error(context.Background(), "LOAD_FILE_CONFIG_ALARM", "unmarshal file failed", err) - } else { - fc.fileTagsBuffer.Swap(fileTags) - } - } - } - if time.Since(lastUpdateInstanceIdentity) > time.Duration(interval)*time.Second { - data, err := ReadFile(fc.instanceIdentityPath) - var instanceIdentity InstanceIdentity - if err == nil { - err = json.Unmarshal(data, &instanceIdentity) - if err != nil { - logger.Error(context.Background(), "LOAD_FILE_CONFIG_ALARM", "unmarshal file failed", err) - } - } - instanceIdentity.GFlagHostID = gFlagHostID - oldInstanceIdentity := fc.instanceIdentityBuffer.Get() - if oldInstanceIdentity == nil || instanceIdentity.InstanceID != oldInstanceIdentity.(InstanceIdentity).InstanceID { - fc.instanceIdentityBuffer.Swap(instanceIdentity) - } - if instanceIdentity.InstanceID != "" { - interval = int(math.Min(float64(interval*2), 3600*24)) - } - lastUpdateInstanceIdentity = time.Now() - } - } - } -} - -func ReadFile(path string) ([]byte, error) { - _, err := os.Stat(path) - if os.IsNotExist(err) { - return nil, err - } - file, err := os.Open(path) //nolint:gosec - if err != nil { - logger.Error(context.Background(), "LOAD_FILE_CONFIG_ALARM", "open file failed", err) - return nil, err - } - defer file.Close() //nolint:gosec - data, err := ioutil.ReadAll(file) - if err != nil { - logger.Error(context.Background(), "LOAD_FILE_CONFIG_ALARM", "read file failed", err) - return nil, err - } - return data, err -} diff --git a/pluginmanager/file_config_test.go b/pluginmanager/file_config_test.go deleted file mode 100644 index 1822a1d7b4..0000000000 --- a/pluginmanager/file_config_test.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2025 iLogtail Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pluginmanager - -import ( - "os" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/alibaba/ilogtail/pkg/config" -) - -func TestDoubleBuffer(t *testing.T) { - db := NewDoubleBuffer() - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - for i := 0; i < 100; i++ { - db.Swap(2) - time.Sleep(10 * time.Millisecond) - db.Swap(1) - } - wg.Done() - }() - for i := 0; i < 100; i++ { - db.Get() - } - wg.Wait() - assert.Equal(t, db.buffer[0], 1) - assert.Equal(t, db.buffer[1], 2) -} - -func TestFileConfig(t *testing.T) { - globalConfig := config.GlobalConfig{ - FileTagsPath: "test.json", - FileTagsInterval: 1, - } - InitFileConfig(&globalConfig) - testJSON := []byte(`{ - "test1": "test value1" - }`) - os.WriteFile("test.json", testJSON, 0644) - time.Sleep(2 * time.Second) - buffer := fileConfig.GetFileTags() - assert.Equal(t, buffer["test1"].(string), "test value1") - testJSON = []byte(`{ - "test2": "test value2" - }`) - os.WriteFile("test.json", testJSON, 0644) - time.Sleep(2 * time.Second) - buffer = fileConfig.GetFileTags() - assert.Equal(t, buffer["test2"].(string), "test value2") - - os.Remove("test.json") - StopFileConfig() -} - -func TestInstanceIdentity(t *testing.T) { - globalConfig := config.GlobalConfig{ - FileTagsPath: "test.json", - FileTagsInterval: 1, - AgentHostID: "test", - LoongCollectorDataDir: ".", - } - InitFileConfig(&globalConfig) - testJSON := []byte(`{ - "instance-id": "test1", - "owner-account-id": "test2", - "region-id": "test3", - "random-host": "test4", - "ecs-assist-machine-id": "test5" - }`) - os.WriteFile(fileConfig.instanceIdentityPath, testJSON, 0644) - time.Sleep(2 * time.Second) - identity := fileConfig.GetInstanceIdentity() - assert.Equal(t, identity.InstanceID, "test1") - assert.Equal(t, identity.OwnerAccountID, "test2") - assert.Equal(t, identity.RegionID, "test3") - assert.Equal(t, identity.RandomHostID, "test4") - assert.Equal(t, identity.ECSAssistMachineID, "test5") - assert.Equal(t, identity.GFlagHostID, "test") - - testJSON = []byte(`{ - "instance-id": "test6", - "owner-account-id": "test7", - "region-id": "test8", - "random-host": "test9", - "ecs-assist-machine-id": "test10" - }`) - os.WriteFile(fileConfig.instanceIdentityPath, testJSON, 0644) - time.Sleep(2 * time.Second) - identity = fileConfig.GetInstanceIdentity() - assert.Equal(t, identity.InstanceID, "test6") - assert.Equal(t, identity.OwnerAccountID, "test7") - assert.Equal(t, identity.RegionID, "test8") - assert.Equal(t, identity.RandomHostID, "test9") - assert.Equal(t, identity.ECSAssistMachineID, "test10") - assert.Equal(t, identity.GFlagHostID, "test") - - os.Remove(InstanceIdentityFilename) - StopFileConfig() -} diff --git a/pluginmanager/plugin_manager.go b/pluginmanager/plugin_manager.go index e948e7abd9..0b9133afe6 100644 --- a/pluginmanager/plugin_manager.go +++ b/pluginmanager/plugin_manager.go @@ -203,7 +203,6 @@ func StopBuiltInModulesConfig() { ContainerConfig = nil } CheckPointManager.Stop() - StopFileConfig() } // Stop stop the given config. ConfigName is with suffix. diff --git a/pluginmanager/processor_tag.go b/pluginmanager/processor_tag.go index db9009b510..61ab452ee9 100644 --- a/pluginmanager/processor_tag.go +++ b/pluginmanager/processor_tag.go @@ -30,14 +30,6 @@ const ( TagKeyCloudProvider ) -const ( - hostNameDefaultTagKey = "__hostname__" - hostIPDefaultTagKey = "__host_ip__" - hostIDDefaultTagKey = "__host_id__" - cloudProviderDefaultTagKey = "__cloud_provider__" - defaultConfigTagKeyValue = "__default__" -) - // Processor interface cannot meet the requirements of tag processing, so we need to create a special ProcessorTag struct type ProcessorTag struct { pipelineMetaTagKey map[TagKey]string @@ -69,12 +61,7 @@ func (p *ProcessorTag) ProcessV1(logCtx *pipeline.LogWithContext) { tagsMap[tag.Key] = tag.Value } } - // file tags p.addAllConfigurableTags(tagsMap) - fileTags := fileConfig.GetFileTags() - for k, v := range fileTags { - tagsMap[k] = v.(string) - } // env tags for i := 0; i < len(helper.EnvTags); i += 2 { if len(p.agentEnvMetaTagKey) == 0 && p.appendingAllEnvMetaTag { @@ -102,11 +89,6 @@ func (p *ProcessorTag) ProcessV2(in *models.PipelineGroupEvents) { for k, v := range tagsMap { in.Group.Tags.Add(k, v) } - fileTags := fileConfig.GetFileTags() - // file tags - for k, v := range fileTags { - in.Group.Tags.Add(k, v.(string)) - } // env tags for i := 0; i < len(helper.EnvTags); i += 2 { if len(p.agentEnvMetaTagKey) == 0 && p.appendingAllEnvMetaTag { @@ -136,19 +118,6 @@ func (p *ProcessorTag) parseDefaultAddedTag(configKey string, tagKey TagKey, def } } -func (p *ProcessorTag) parseOptionalTag(configKey string, tagKey TagKey, defaultKey string, config map[string]string) { - if customKey, ok := config[configKey]; ok { - if customKey != "" { - if customKey == defaultConfigTagKeyValue { - p.pipelineMetaTagKey[tagKey] = defaultKey - } else { - p.pipelineMetaTagKey[tagKey] = customKey - } - } - // empty value means delete - } -} - func (p *ProcessorTag) addTag(tagKey TagKey, value string, tags map[string]string) { if key, ok := p.pipelineMetaTagKey[tagKey]; ok { if key != "" { diff --git a/pluginmanager/processor_tag_helper.go b/pluginmanager/processor_tag_helper.go index 5e9a06e7ec..391f138ac8 100644 --- a/pluginmanager/processor_tag_helper.go +++ b/pluginmanager/processor_tag_helper.go @@ -18,29 +18,19 @@ package pluginmanager import "github.com/alibaba/ilogtail/pkg/util" +const ( + hostNameDefaultTagKey = "__hostname__" + hostIPDefaultTagKey = "__host_ip__" + defaultConfigTagKeyValue = "__default__" +) + func (p *ProcessorTag) parseAllConfigurableTags(pipelineMetaTagKey map[string]string) { p.parseDefaultAddedTag("HOST_NAME", TagKeyHostName, hostNameDefaultTagKey, pipelineMetaTagKey) p.parseDefaultAddedTag("HOST_IP", TagKeyHostIP, hostIPDefaultTagKey, pipelineMetaTagKey) - p.parseOptionalTag("HOST_ID", TagKeyHostID, hostIDDefaultTagKey, pipelineMetaTagKey) - p.parseOptionalTag("CLOUD_PROVIDER", TagKeyCloudProvider, cloudProviderDefaultTagKey, pipelineMetaTagKey) } // should keep same with C++ ProcessorTagNative::Process func (p *ProcessorTag) addAllConfigurableTags(tagsMap map[string]string) { p.addTag(TagKeyHostName, util.GetHostName(), tagsMap) p.addTag(TagKeyHostIP, util.GetIPAddress(), tagsMap) - instanceIdentity := fileConfig.GetInstanceIdentity() - if instanceIdentity != nil { - switch { - case instanceIdentity.InstanceID != "": - p.addTag(TagKeyHostID, instanceIdentity.InstanceID, tagsMap) - case instanceIdentity.ECSAssistMachineID != "": - p.addTag(TagKeyHostID, instanceIdentity.ECSAssistMachineID, tagsMap) - case instanceIdentity.RandomHostID != "": - p.addTag(TagKeyHostID, instanceIdentity.RandomHostID, tagsMap) - case instanceIdentity.GFlagHostID != "": - p.addTag(TagKeyHostID, instanceIdentity.GFlagHostID, tagsMap) - } - p.addTag(TagKeyCloudProvider, "infra", tagsMap) - } }