From dccfddff6d2dd04ed97671d0dc5d212c95a13ea4 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Thu, 5 Dec 2024 14:55:35 +0800 Subject: [PATCH] fix --- core/config/ConfigUtil.h | 1 - core/config/PipelineConfig.cpp | 14 +++- core/config/watcher/PipelineConfigWatcher.cpp | 78 ++++++++++++------- 3 files changed, 63 insertions(+), 30 deletions(-) diff --git a/core/config/ConfigUtil.h b/core/config/ConfigUtil.h index 1d1c708b42..c5fd7d0ae2 100644 --- a/core/config/ConfigUtil.h +++ b/core/config/ConfigUtil.h @@ -31,7 +31,6 @@ bool ParseConfigDetail(const std::string& content, Json::Value& detail, std::string& errorMsg); bool IsConfigEnabled(const std::string& name, const Json::Value& detail); -void GetAllInputTypes(const Json::Value& detail, std::vector& inputTypes); ConfigType GetConfigType(const Json::Value& detail); } // namespace logtail diff --git a/core/config/PipelineConfig.cpp b/core/config/PipelineConfig.cpp index 1d86a97927..223ad46778 100644 --- a/core/config/PipelineConfig.cpp +++ b/core/config/PipelineConfig.cpp @@ -193,6 +193,16 @@ bool PipelineConfig::Parse() { const string pluginType = it->asString(); if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) { mSingletonInput = pluginType; + if (itr->size() > 1) { + PARAM_ERROR_RETURN(sLogger, + alarm, + "more than 1 input plugin is given when global singleton input plugin is used", + noModule, + mName, + mProject, + mLogstore, + mRegion); + } } if (i == 0) { if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { @@ -241,10 +251,10 @@ bool PipelineConfig::Parse() { } } // TODO: remove these special restrictions - if ((hasFileInput || !mSingletonInput.empty()) && (*mDetail)["inputs"].size() > 1) { + if ((hasFileInput) && (*mDetail)["inputs"].size() > 1) { PARAM_ERROR_RETURN(sLogger, alarm, - "more than 1 input_file or input_container_stdio or global singleton plugin is given", + "more than 1 input_file or input_container_stdio is given", noModule, mName, mProject, diff --git a/core/config/watcher/PipelineConfigWatcher.cpp b/core/config/watcher/PipelineConfigWatcher.cpp index 13740e4401..5946ad5c42 100644 --- a/core/config/watcher/PipelineConfigWatcher.cpp +++ b/core/config/watcher/PipelineConfigWatcher.cpp @@ -169,6 +169,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff, } } else { LOG_DEBUG(sLogger, ("existing inner config unchanged", "skip current object")); + CheckUnchangedConfig(pipelineName, path, pDiff, tDiff, singletonCache); } } #else @@ -419,7 +420,6 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName, return true; } - bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName, const filesystem::path& path, PipelineConfigDiff& pDiff, @@ -427,14 +427,14 @@ bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName, SingletonConfigCache& singletonCache) { auto pipeline = mPipelineManager->FindConfigByName(configName); auto task = mTaskPipelineManager->FindPipelineByName(configName).get(); - if (pipeline) { + if (task) { + return true; + } else if (pipeline) { // running pipeline in last config update std::unique_ptr configDetail = make_unique(); PipelineConfig config(configName, std::move(configDetail)); config.mCreateTime = pipeline->GetContext().GetCreateTime(); config.mSingletonInput = pipeline->GetSingletonInput(); PushPipelineConfig(std::move(config), ConfigDiffEnum::Unchanged, pDiff, singletonCache); - } else if (task) { - return true; } else { // low priority singleton input in last config update, sort it again unique_ptr detail = make_unique(); if (!LoadConfigDetailFromFile(path, *detail)) { @@ -444,9 +444,21 @@ bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName, LOG_INFO(sLogger, ("unchanged config found and disabled", "skip current object")("config", configName)); return false; } - if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) { + PipelineConfig config(configName, std::move(detail)); + if (!config.Parse()) { + LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName)); + AlarmManager::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM, + "new config found but invalid: skip current object, config: " + + configName, + config.mProject, + config.mLogstore, + config.mRegion); return false; } + if (!config.mSingletonInput.empty()) { + singletonCache[config.mSingletonInput].push_back( + make_shared(std::move(config), ConfigDiffEnum::Added)); + } } return true; } @@ -494,30 +506,42 @@ void PipelineConfigWatcher::CheckSingletonInput(PipelineConfigDiff& pDiff, Singl const auto& diffEnum = configs[i]->diffEnum; const auto& configName = configs[i]->config.mName; if (i == 0) { - if (diffEnum == ConfigDiffEnum::Added) { - LOG_INFO(sLogger, - ("new config with singleton input found and passed topology check", - "prepare to build pipeline")("config", configName)); - pDiff.mAdded.push_back(std::move(configs[0]->config)); - } else if (diffEnum == ConfigDiffEnum::Modified) { - LOG_INFO(sLogger, - ("existing config with singleton input modified and passed topology check", - "prepare to build pipeline")("config", configName)); - pDiff.mModified.push_back(std::move(configs[0]->config)); + switch (diffEnum) { + // greatest priority config + case ConfigDiffEnum::Added: + LOG_INFO(sLogger, + ("new config with singleton input found and passed topology check", + "prepare to build pipeline")("config", configName)); + pDiff.mAdded.push_back(std::move(configs[0]->config)); + break; + case ConfigDiffEnum::Modified: + LOG_INFO(sLogger, + ("existing config with singleton input modified and passed topology check", + "prepare to build pipeline")("config", configName)); + pDiff.mModified.push_back(std::move(configs[0]->config)); + break; + default: + break; } } else { - if (diffEnum == ConfigDiffEnum::Modified) { - LOG_WARNING(sLogger, - ("global singleton plugin found, but another older config or smaller name config " - "already exists", - "skip current object")("config", configName)); - pDiff.mRemoved.push_back(configName); - } else if (diffEnum == ConfigDiffEnum::Unchanged) { - LOG_WARNING(sLogger, - ("existing valid config with global singleton plugin, but another older config or " - "smaller name config found", - "prepare to stop current running pipeline")("config", configName)); - pDiff.mRemoved.push_back(configName); + // other low priority configs + switch (diffEnum) { + case ConfigDiffEnum::Modified: + LOG_WARNING(sLogger, + ("global singleton plugin found, but another older config or smaller name config " + "already exists", + "skip current object")("config", configName)); + pDiff.mRemoved.push_back(configName); + break; + case ConfigDiffEnum::Unchanged: + LOG_WARNING(sLogger, + ("existing valid config with global singleton plugin, but another older config or " + "smaller name config found", + "prepare to stop current running pipeline")("config", configName)); + pDiff.mRemoved.push_back(configName); + break; + default: + break; } } }