Skip to content

Commit

Permalink
feat: support singleton input (#1933)
Browse files Browse the repository at this point in the history
* feat: support singleton input

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix
  • Loading branch information
Abingcbc authored Dec 11, 2024
1 parent c4a0666 commit fc6820f
Show file tree
Hide file tree
Showing 13 changed files with 1,770 additions and 79 deletions.
2 changes: 2 additions & 0 deletions core/config/ConfigDiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ using PipelineConfigDiff = ConfigDiff<PipelineConfig>;
using TaskConfigDiff = ConfigDiff<TaskConfig>;
using InstanceConfigDiff = ConfigDiff<InstanceConfig>;

enum ConfigDiffEnum { Added, Modified, Removed, Unchanged };

} // namespace logtail
16 changes: 15 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ bool PipelineConfig::Parse() {
mRegion);
}
const string pluginType = it->asString();
// when input is singleton, there should only one input to simpify config load transaction
if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) {
mSingletonInput = pluginType;
if (itr->size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input plugin is given when global singleton input plugin is used",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
}
if (i == 0) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoInput = true;
Expand Down Expand Up @@ -241,7 +255,7 @@ bool PipelineConfig::Parse() {
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input_file or input_container_stdio plugin is given",
"more than 1 input_file or input_container_stdio is given",
noModule,
mName,
mProject,
Expand Down
7 changes: 5 additions & 2 deletions core/config/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct PipelineConfig {
uint32_t mCreateTime = 0;
const Json::Value* mGlobal = nullptr;
std::vector<const Json::Value*> mInputs;
std::optional<std::string> mSingletonInput;
std::vector<const Json::Value*> mProcessors;
std::vector<const Json::Value*> mAggregators;
std::vector<const Json::Value*> mFlushers;
Expand All @@ -49,7 +50,8 @@ struct PipelineConfig {
std::string mLogstore;
std::string mRegion;

PipelineConfig(const std::string& name, std::unique_ptr<Json::Value>&& detail) : mName(name), mDetail(std::move(detail)) {}
PipelineConfig(const std::string& name, std::unique_ptr<Json::Value>&& detail)
: mName(name), mDetail(std::move(detail)) {}

bool Parse();

Expand All @@ -67,7 +69,8 @@ struct PipelineConfig {
// bool IsProcessRunnerInvolved() const {
// // 长期过渡使用,待C++部分的时序聚合能力与Go持平后恢复下面的正式版
// return !(mHasGoInput && !mHasNativeProcessor);
// // return !(mHasGoInput && !mHasNativeProcessor && (mHasGoProcessor || (mHasGoFlusher && !mHasNativeFlusher)));
// // return !(mHasGoInput && !mHasNativeProcessor && (mHasGoProcessor || (mHasGoFlusher &&
// !mHasNativeFlusher)));
// }

bool HasGoPlugin() const { return mHasGoFlusher || mHasGoProcessor || mHasGoInput; }
Expand Down
165 changes: 150 additions & 15 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
PipelineConfigDiff pDiff;
TaskConfigDiff tDiff;
unordered_set<string> configSet;
SingletonConfigCache singletonCache;
// builtin pipeline configs
InsertBuiltInPipelines(pDiff, tDiff, configSet);
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet);
InsertBuiltInPipelines(pDiff, tDiff, configSet, singletonCache);
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet, singletonCache);

CheckSingletonInput(pDiff, singletonCache);
for (const auto& name : mPipelineManager->GetAllConfigNames()) {
if (configSet.find(name) == configSet.end()) {
pDiff.mRemoved.push_back(name);
Expand Down Expand Up @@ -88,8 +90,9 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
}

void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
unordered_set<string>& configSet) {
TaskConfigDiff& tDiff,
unordered_set<string>& configSet,
SingletonConfigCache& singletonCache) {
#ifdef __ENTERPRISE__
const std::map<std::string, std::string>& builtInPipelines
= EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs();
Expand Down Expand Up @@ -120,7 +123,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipelineName));
continue;
}
if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff)) {
if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else if (pipleineDetail != iter->second) {
Expand Down Expand Up @@ -161,7 +164,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
}
continue;
}
if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff)) {
if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else {
Expand All @@ -175,7 +178,8 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,

void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_set<std::string>& configSet) {
std::unordered_set<std::string>& configSet,
SingletonConfigCache& singletonCache) {
for (const auto& dir : mSourceDir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
Expand Down Expand Up @@ -231,7 +235,7 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName));
continue;
}
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff)) {
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else if (iter->second.first != size || iter->second.second != mTime) {
Expand Down Expand Up @@ -270,11 +274,12 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
}
continue;
}
if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff)) {
if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else {
LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object"));
CheckUnchangedConfig(configName, path, pDiff, tDiff, singletonCache);
}
}
}
Expand All @@ -283,7 +288,8 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff) {
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache) {
switch (GetConfigType(*configDetail)) {
case ConfigType::Pipeline: {
PipelineConfig config(configName, std::move(configDetail));
Expand All @@ -297,7 +303,7 @@ bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mAdded.push_back(std::move(config));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Added, pDiff, singletonCache);
LOG_INFO(sLogger,
("new config found and passed topology check", "prepare to build pipeline")("config", configName));
break;
Expand All @@ -322,7 +328,8 @@ bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff) {
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache) {
switch (GetConfigType(*configDetail)) {
case ConfigType::Pipeline: {
shared_ptr<Pipeline> p = mPipelineManager->FindConfigByName(configName);
Expand All @@ -341,10 +348,10 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mAdded.push_back(std::move(config));
LOG_INFO(sLogger,
("existing invalid config modified and passed topology check",
"prepare to build pipeline")("config", configName));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Added, pDiff, singletonCache);
} else if (*configDetail != p->GetConfig()) {
PipelineConfig config(configName, std::move(configDetail));
if (!config.Parse()) {
Expand All @@ -360,10 +367,10 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mModified.push_back(std::move(config));
LOG_INFO(sLogger,
("existing valid config modified and passed topology check",
"prepare to rebuild pipeline")("config", configName));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Modified, pDiff, singletonCache);
} else {
LOG_DEBUG(sLogger, ("existing valid config file modified, but no change found", "skip current object"));
}
Expand Down Expand Up @@ -412,4 +419,132 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
return true;
}

bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName,
const filesystem::path& path,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache) {
auto pipeline = mPipelineManager->FindConfigByName(configName);
auto task = mTaskPipelineManager->FindPipelineByName(configName).get();
if (task) {
return true;
} else if (pipeline) { // running pipeline in last config update
std::unique_ptr<Json::Value> configDetail = make_unique<Json::Value>();
PipelineConfig config(configName, std::move(configDetail));
config.mCreateTime = pipeline->GetContext().GetCreateTime();
config.mSingletonInput = pipeline->GetSingletonInput();
PushPipelineConfig(std::move(config), ConfigDiffEnum::Unchanged, pDiff, singletonCache);
} else { // low priority singleton input in last config update, sort it again
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
return false;
}
if (!IsConfigEnabled(configName, *detail)) {
LOG_INFO(sLogger, ("unchanged config found and disabled", "skip current object")("config", configName));
return false;
}
PipelineConfig config(configName, std::move(detail));
if (!config.Parse()) {
LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName));
AlarmManager::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM,
"new config found but invalid: skip current object, config: "
+ configName,
config.mProject,
config.mLogstore,
config.mRegion);
return false;
}
if (config.mSingletonInput) {
singletonCache[config.mSingletonInput.value()].push_back(
make_shared<PipelineConfigWithDiffInfo>(std::move(config), ConfigDiffEnum::Added));
}
}
return true;
}

void PipelineConfigWatcher::PushPipelineConfig(PipelineConfig&& config,
ConfigDiffEnum diffEnum,
PipelineConfigDiff& pDiff,
SingletonConfigCache& singletonCache) {
// singleton input
if (config.mSingletonInput) {
if (diffEnum == ConfigDiffEnum::Added || diffEnum == ConfigDiffEnum::Modified
|| diffEnum == ConfigDiffEnum::Unchanged) {
singletonCache[config.mSingletonInput.value()].push_back(
make_shared<PipelineConfigWithDiffInfo>(std::move(config), diffEnum));
} else {
LOG_ERROR(sLogger, ("should not reach here", "invalid diff enum")("diff", diffEnum));
}
return;
}
// no singleton input
switch (diffEnum) {
case ConfigDiffEnum::Added:
pDiff.mAdded.push_back(std::move(config));
break;
case ConfigDiffEnum::Modified:
pDiff.mModified.push_back(std::move(config));
break;
default:
break;
}
}

void PipelineConfigWatcher::CheckSingletonInput(PipelineConfigDiff& pDiff, SingletonConfigCache& singletonCache) {
for (auto& [name, configs] : singletonCache) {
std::sort(configs.begin(),
configs.end(),
[](const std::shared_ptr<PipelineConfigWithDiffInfo>& a,
const std::shared_ptr<PipelineConfigWithDiffInfo>& b) -> bool {
if (a->config.mCreateTime == b->config.mCreateTime) {
return a->config.mName < b->config.mName;
}
return a->config.mCreateTime < b->config.mCreateTime;
});
for (size_t i = 0; i < configs.size(); ++i) {
const auto& diffEnum = configs[i]->diffEnum;
const auto& configName = configs[i]->config.mName;
if (i == 0) {
switch (diffEnum) {
// greatest priority config
case ConfigDiffEnum::Added:
LOG_INFO(sLogger,
("new config with singleton input found and passed topology check",
"prepare to build pipeline")("config", configName));
pDiff.mAdded.push_back(std::move(configs[0]->config));
break;
case ConfigDiffEnum::Modified:
LOG_INFO(sLogger,
("existing config with singleton input modified and passed topology check",
"prepare to build pipeline")("config", configName));
pDiff.mModified.push_back(std::move(configs[0]->config));
break;
default:
break;
}
} else {
// other low priority configs
switch (diffEnum) {
case ConfigDiffEnum::Modified:
LOG_WARNING(sLogger,
("global singleton plugin found, but another older config or smaller name config "
"already exists",
"skip current object")("config", configName));
pDiff.mRemoved.push_back(configName);
break;
case ConfigDiffEnum::Unchanged:
LOG_WARNING(sLogger,
("existing valid config with global singleton plugin, but another older config or "
"smaller name config found",
"prepare to stop current running pipeline")("config", configName));
pDiff.mRemoved.push_back(configName);
break;
default:
break;
}
}
}
}
}

} // namespace logtail
Loading

0 comments on commit fc6820f

Please sign in to comment.