Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 5, 2024
1 parent 0c41858 commit dccfddf
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 30 deletions.
1 change: 0 additions & 1 deletion core/config/ConfigUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ bool ParseConfigDetail(const std::string& content,
Json::Value& detail,
std::string& errorMsg);
bool IsConfigEnabled(const std::string& name, const Json::Value& detail);
void GetAllInputTypes(const Json::Value& detail, std::vector<std::string>& inputTypes);
ConfigType GetConfigType(const Json::Value& detail);

} // namespace logtail
14 changes: 12 additions & 2 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ bool PipelineConfig::Parse() {
const string pluginType = it->asString();
if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) {
mSingletonInput = pluginType;
if (itr->size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input plugin is given when global singleton input plugin is used",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
}
if (i == 0) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
Expand Down Expand Up @@ -241,10 +251,10 @@ bool PipelineConfig::Parse() {
}
}
// TODO: remove these special restrictions
if ((hasFileInput || !mSingletonInput.empty()) && (*mDetail)["inputs"].size() > 1) {
if ((hasFileInput) && (*mDetail)["inputs"].size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input_file or input_container_stdio or global singleton plugin is given",
"more than 1 input_file or input_container_stdio is given",
noModule,
mName,
mProject,
Expand Down
78 changes: 51 additions & 27 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
}
} else {
LOG_DEBUG(sLogger, ("existing inner config unchanged", "skip current object"));
CheckUnchangedConfig(pipelineName, path, pDiff, tDiff, singletonCache);
}
}
#else
Expand Down Expand Up @@ -419,22 +420,21 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
return true;
}


bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName,
const filesystem::path& path,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache) {
auto pipeline = mPipelineManager->FindConfigByName(configName);
auto task = mTaskPipelineManager->FindPipelineByName(configName).get();
if (pipeline) {
if (task) {
return true;
} else if (pipeline) { // running pipeline in last config update
std::unique_ptr<Json::Value> configDetail = make_unique<Json::Value>();
PipelineConfig config(configName, std::move(configDetail));
config.mCreateTime = pipeline->GetContext().GetCreateTime();
config.mSingletonInput = pipeline->GetSingletonInput();
PushPipelineConfig(std::move(config), ConfigDiffEnum::Unchanged, pDiff, singletonCache);
} else if (task) {
return true;
} else { // low priority singleton input in last config update, sort it again
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
Expand All @@ -444,9 +444,21 @@ bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName,
LOG_INFO(sLogger, ("unchanged config found and disabled", "skip current object")("config", configName));
return false;
}
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) {
PipelineConfig config(configName, std::move(detail));
if (!config.Parse()) {
LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName));
AlarmManager::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM,
"new config found but invalid: skip current object, config: "
+ configName,
config.mProject,
config.mLogstore,
config.mRegion);
return false;
}
if (!config.mSingletonInput.empty()) {
singletonCache[config.mSingletonInput].push_back(
make_shared<PipelineConfigWithDiffInfo>(std::move(config), ConfigDiffEnum::Added));
}
}
return true;
}
Expand Down Expand Up @@ -494,30 +506,42 @@ void PipelineConfigWatcher::CheckSingletonInput(PipelineConfigDiff& pDiff, Singl
const auto& diffEnum = configs[i]->diffEnum;
const auto& configName = configs[i]->config.mName;
if (i == 0) {
if (diffEnum == ConfigDiffEnum::Added) {
LOG_INFO(sLogger,
("new config with singleton input found and passed topology check",
"prepare to build pipeline")("config", configName));
pDiff.mAdded.push_back(std::move(configs[0]->config));
} else if (diffEnum == ConfigDiffEnum::Modified) {
LOG_INFO(sLogger,
("existing config with singleton input modified and passed topology check",
"prepare to build pipeline")("config", configName));
pDiff.mModified.push_back(std::move(configs[0]->config));
switch (diffEnum) {
// greatest priority config
case ConfigDiffEnum::Added:
LOG_INFO(sLogger,
("new config with singleton input found and passed topology check",
"prepare to build pipeline")("config", configName));
pDiff.mAdded.push_back(std::move(configs[0]->config));
break;
case ConfigDiffEnum::Modified:
LOG_INFO(sLogger,
("existing config with singleton input modified and passed topology check",
"prepare to build pipeline")("config", configName));
pDiff.mModified.push_back(std::move(configs[0]->config));
break;
default:
break;
}
} else {
if (diffEnum == ConfigDiffEnum::Modified) {
LOG_WARNING(sLogger,
("global singleton plugin found, but another older config or smaller name config "
"already exists",
"skip current object")("config", configName));
pDiff.mRemoved.push_back(configName);
} else if (diffEnum == ConfigDiffEnum::Unchanged) {
LOG_WARNING(sLogger,
("existing valid config with global singleton plugin, but another older config or "
"smaller name config found",
"prepare to stop current running pipeline")("config", configName));
pDiff.mRemoved.push_back(configName);
// other low priority configs
switch (diffEnum) {
case ConfigDiffEnum::Modified:
LOG_WARNING(sLogger,
("global singleton plugin found, but another older config or smaller name config "
"already exists",
"skip current object")("config", configName));
pDiff.mRemoved.push_back(configName);
break;
case ConfigDiffEnum::Unchanged:
LOG_WARNING(sLogger,
("existing valid config with global singleton plugin, but another older config or "
"smaller name config found",
"prepare to stop current running pipeline")("config", configName));
pDiff.mRemoved.push_back(configName);
break;
default:
break;
}
}
}
Expand Down

0 comments on commit dccfddf

Please sign in to comment.