From ad9870ee05bd9edcfd8a0e1b7d39d3f94bad800a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=84=E9=A3=8F?= Date: Tue, 26 Nov 2024 09:25:04 +0000 Subject: [PATCH] inner pipeline --- core/config/watcher/ConfigWatcher.cpp | 1 + core/config/watcher/ConfigWatcher.h | 1 + core/config/watcher/PipelineConfigWatcher.cpp | 88 ++++++++++++++++++- core/config/watcher/PipelineConfigWatcher.h | 3 + core/monitor/Monitor.cpp | 47 ++++++++++ core/monitor/Monitor.h | 5 ++ 6 files changed, 144 insertions(+), 1 deletion(-) diff --git a/core/config/watcher/ConfigWatcher.cpp b/core/config/watcher/ConfigWatcher.cpp index fae78869b0..dd29d2ff63 100644 --- a/core/config/watcher/ConfigWatcher.cpp +++ b/core/config/watcher/ConfigWatcher.cpp @@ -29,6 +29,7 @@ void ConfigWatcher::AddSource(const string& dir, mutex* mux) { void ConfigWatcher::ClearEnvironment() { mSourceDir.clear(); mFileInfoMap.clear(); + mInnerConfigMap.clear(); } #endif diff --git a/core/config/watcher/ConfigWatcher.h b/core/config/watcher/ConfigWatcher.h index 0b43d18cbb..b95a0f611d 100644 --- a/core/config/watcher/ConfigWatcher.h +++ b/core/config/watcher/ConfigWatcher.h @@ -43,6 +43,7 @@ class ConfigWatcher { std::vector mSourceDir; std::map mDirMutexMap; std::map> mFileInfoMap; + std::map mInnerConfigMap; }; } // namespace logtail diff --git a/core/config/watcher/PipelineConfigWatcher.cpp b/core/config/watcher/PipelineConfigWatcher.cpp index 40118cbc47..4199611f7b 100644 --- a/core/config/watcher/PipelineConfigWatcher.cpp +++ b/core/config/watcher/PipelineConfigWatcher.cpp @@ -15,11 +15,11 @@ #include "config/watcher/PipelineConfigWatcher.h" #include -#include #include "common/FileSystemUtil.h" #include "config/ConfigUtil.h" #include "logger/Logger.h" +#include "monitor/Monitor.h" #include "pipeline/PipelineManager.h" #include "task_pipeline/TaskPipelineManager.h" @@ -37,6 +37,9 @@ pair PipelineConfigWatcher::CheckConfigDiff( PipelineConfigDiff pDiff; TaskConfigDiff tDiff; unordered_set configSet; + // inner configs + InsertInnerPipelines(pDiff, tDiff, configSet); + // configs from file for (const auto& dir : mSourceDir) { error_code ec; filesystem::file_status s = filesystem::status(dir, ec); @@ -180,6 +183,89 @@ pair PipelineConfigWatcher::CheckConfigDiff( return make_pair(std::move(pDiff), std::move(tDiff)); } +void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff, + TaskConfigDiff& tDiff, + unordered_set& configSet) { + std::map innerPipelines; + // self-monitor metric + innerPipelines[LoongCollectorMonitor::GetInnerSelfMonitorMetricPipelineName()] + = LoongCollectorMonitor::GetInnerSelfMonitorMetricPipeline(); + + // process + for (const auto& pipeline : innerPipelines) { + if (configSet.find(pipeline.first) != configSet.end()) { + LOG_WARNING(sLogger, + ("more than 1 config with the same name is found", "skip current config")("inner pipeline", + pipeline.first)); + continue; + } + configSet.insert(pipeline.first); + + string errorMsg; + auto iter = mInnerConfigMap.find(pipeline.first); + if (iter == mInnerConfigMap.end()) { + mInnerConfigMap[pipeline.first] = pipeline.second; + unique_ptr detail = make_unique(); + if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) { + LOG_WARNING(sLogger, + ("config format error", "skip current object")("error msg", errorMsg)("inner pipeline", + pipeline.first)); + continue; + } + if (!IsConfigEnabled(pipeline.first, *detail)) { + LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipeline.first)); + continue; + } + if (!CheckAddedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) { + continue; + } + } else if (pipeline.second != iter->second) { + mInnerConfigMap[pipeline.first] = pipeline.second; + unique_ptr detail = make_unique(); + if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) { + LOG_WARNING(sLogger, + ("config format error", "skip current object")("error msg", errorMsg)("inner pipeline", + pipeline.first)); + continue; + } + if (!IsConfigEnabled(pipeline.first, *detail)) { + switch (GetConfigType(*detail)) { + case ConfigType::Pipeline: + if (mPipelineManager->FindConfigByName(pipeline.first)) { + pDiff.mRemoved.push_back(pipeline.first); + LOG_INFO(sLogger, + ("existing valid config modified and disabled", + "prepare to stop current running pipeline")("config", pipeline.first)); + } else { + LOG_INFO(sLogger, + ("existing invalid config modified and disabled", + "skip current object")("config", pipeline.first)); + } + break; + case ConfigType::Task: + if (mTaskPipelineManager->FindPipelineByName(pipeline.first)) { + tDiff.mRemoved.push_back(pipeline.first); + LOG_INFO(sLogger, + ("existing valid config modified and disabled", + "prepare to stop current running task")("config", pipeline.first)); + } else { + LOG_INFO(sLogger, + ("existing invalid config modified and disabled", + "skip current object")("config", pipeline.first)); + } + break; + } + continue; + } + if (!CheckModifiedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) { + continue; + } + } else { + LOG_DEBUG(sLogger, ("existing inner config unchanged", "skip current object")); + } + } +} + bool PipelineConfigWatcher::CheckAddedConfig(const string& configName, unique_ptr&& configDetail, PipelineConfigDiff& pDiff, diff --git a/core/config/watcher/PipelineConfigWatcher.h b/core/config/watcher/PipelineConfigWatcher.h index d1f77967fe..081c2e2044 100644 --- a/core/config/watcher/PipelineConfigWatcher.h +++ b/core/config/watcher/PipelineConfigWatcher.h @@ -16,6 +16,8 @@ #pragma once +#include + #include "config/ConfigDiff.h" #include "config/watcher/ConfigWatcher.h" @@ -44,6 +46,7 @@ class PipelineConfigWatcher : public ConfigWatcher { PipelineConfigWatcher(); ~PipelineConfigWatcher() = default; + void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); bool CheckAddedConfig(const std::string& configName, std::unique_ptr&& configDetail, PipelineConfigDiff& pDiff, diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index 5d9f40e74e..4e8d47938f 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -759,4 +759,51 @@ void LoongCollectorMonitor::Stop() { } +const string LoongCollectorMonitor::GetInnerSelfMonitorMetricPipeline() { +#ifdef __ENTERPRISE__ + static string pipeline = ""; +#else + static string pipeline = R"( + { + "inputs": [ + { + "Type": "input_self_monitor_metric", + "Agent": { + "Enable": false, + "Interval": 1 + }, + "Runner": { + "Enable": false, + "Interval": 1 + }, + "Pipeline": { + "Enable": true, + "Interval": 1 + }, + "PluginSource": { + "Enable": true, + "Interval": 10 + }, + "Plugin": { + "Enable": false, + "Interval": 10 + }, + "Component": { + "Enable": false, + "Interval": 10 + } + } + ], + "flushers": [ + { + "Type": "flusher_local_file", + "FileName": "./log/self_metrics.log" + } + ] + } + )"; +#endif + return pipeline; +} + } // namespace logtail diff --git a/core/monitor/Monitor.h b/core/monitor/Monitor.h index c66a47218e..372407d8f1 100644 --- a/core/monitor/Monitor.h +++ b/core/monitor/Monitor.h @@ -192,6 +192,11 @@ class LoongCollectorMonitor { void Init(); void Stop(); + static const std::string GetInnerSelfMonitorAlarmPipelineName() { return ""; } + static const std::string GetInnerSelfMonitorAlarmPipeline() { return ""; } + static const std::string GetInnerSelfMonitorMetricPipelineName() { return "inner-self-monitor-metric-pipeline"; } + static const std::string GetInnerSelfMonitorMetricPipeline(); + void SetAgentCpu(double cpu) { mAgentCpu->Set(cpu); } void SetAgentMemory(uint64_t mem) { mAgentMemory->Set(mem); } void SetAgentGoMemory(uint64_t mem) { mAgentGoMemory->Set(mem); }