Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support singleton input #1933

Merged
merged 16 commits into from
Dec 11, 2024
2 changes: 2 additions & 0 deletions core/config/ConfigDiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ using PipelineConfigDiff = ConfigDiff<PipelineConfig>;
using TaskConfigDiff = ConfigDiff<TaskConfig>;
using InstanceConfigDiff = ConfigDiff<InstanceConfig>;

enum ConfigDiffEnum { Added, Modified, Removed, Unchanged };

} // namespace logtail
15 changes: 14 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,19 @@ bool PipelineConfig::Parse() {
mRegion);
}
const string pluginType = it->asString();
if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
mSingletonInput = pluginType;
if (itr->size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input plugin is given when global singleton input plugin is used",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
}
if (i == 0) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoInput = true;
Expand Down Expand Up @@ -241,7 +254,7 @@ bool PipelineConfig::Parse() {
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input_file or input_container_stdio plugin is given",
"more than 1 input_file or input_container_stdio is given",
noModule,
mName,
mProject,
Expand Down
7 changes: 5 additions & 2 deletions core/config/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct PipelineConfig {
uint32_t mCreateTime = 0;
const Json::Value* mGlobal = nullptr;
std::vector<const Json::Value*> mInputs;
std::string mSingletonInput;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
std::vector<const Json::Value*> mProcessors;
std::vector<const Json::Value*> mAggregators;
std::vector<const Json::Value*> mFlushers;
Expand All @@ -49,7 +50,8 @@ struct PipelineConfig {
std::string mLogstore;
std::string mRegion;

PipelineConfig(const std::string& name, std::unique_ptr<Json::Value>&& detail) : mName(name), mDetail(std::move(detail)) {}
PipelineConfig(const std::string& name, std::unique_ptr<Json::Value>&& detail)
: mName(name), mDetail(std::move(detail)) {}

bool Parse();

Expand All @@ -67,7 +69,8 @@ struct PipelineConfig {
// bool IsProcessRunnerInvolved() const {
// // 长期过渡使用,待C++部分的时序聚合能力与Go持平后恢复下面的正式版
// return !(mHasGoInput && !mHasNativeProcessor);
// // return !(mHasGoInput && !mHasNativeProcessor && (mHasGoProcessor || (mHasGoFlusher && !mHasNativeFlusher)));
// // return !(mHasGoInput && !mHasNativeProcessor && (mHasGoProcessor || (mHasGoFlusher &&
// !mHasNativeFlusher)));
// }

bool HasGoPlugin() const { return mHasGoFlusher || mHasGoProcessor || mHasGoInput; }
Expand Down
165 changes: 150 additions & 15 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
PipelineConfigDiff pDiff;
TaskConfigDiff tDiff;
unordered_set<string> configSet;
SingletonConfigCache singletonCache;
// builtin pipeline configs
InsertBuiltInPipelines(pDiff, tDiff, configSet);
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet);
InsertBuiltInPipelines(pDiff, tDiff, configSet, singletonCache);
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet, singletonCache);

CheckSingletonInput(pDiff, singletonCache);
for (const auto& name : mPipelineManager->GetAllConfigNames()) {
if (configSet.find(name) == configSet.end()) {
pDiff.mRemoved.push_back(name);
Expand Down Expand Up @@ -88,8 +90,9 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
}

void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
unordered_set<string>& configSet) {
TaskConfigDiff& tDiff,
unordered_set<string>& configSet,
SingletonConfigCache& singletonCache) {
#ifdef __ENTERPRISE__
const std::map<std::string, std::string>& builtInPipelines
= EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs();
Expand Down Expand Up @@ -120,7 +123,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipelineName));
continue;
}
if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff)) {
if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else if (pipleineDetail != iter->second) {
Expand Down Expand Up @@ -161,7 +164,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
}
continue;
}
if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff)) {
if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else {
Expand All @@ -175,7 +178,8 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,

void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_set<std::string>& configSet) {
std::unordered_set<std::string>& configSet,
SingletonConfigCache& singletonCache) {
for (const auto& dir : mSourceDir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
Expand Down Expand Up @@ -231,7 +235,7 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName));
continue;
}
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff)) {
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else if (iter->second.first != size || iter->second.second != mTime) {
Expand Down Expand Up @@ -270,11 +274,12 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
}
continue;
}
if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff)) {
if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else {
LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object"));
CheckUnchangedConfig(configName, path, pDiff, tDiff, singletonCache);
}
}
}
Expand All @@ -283,7 +288,8 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff) {
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache) {
switch (GetConfigType(*configDetail)) {
case ConfigType::Pipeline: {
PipelineConfig config(configName, std::move(configDetail));
Expand All @@ -297,7 +303,7 @@ bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mAdded.push_back(std::move(config));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Added, pDiff, singletonCache);
LOG_INFO(sLogger,
("new config found and passed topology check", "prepare to build pipeline")("config", configName));
break;
Expand All @@ -322,7 +328,8 @@ bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff) {
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache) {
switch (GetConfigType(*configDetail)) {
case ConfigType::Pipeline: {
shared_ptr<Pipeline> p = mPipelineManager->FindConfigByName(configName);
Expand All @@ -341,10 +348,10 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mAdded.push_back(std::move(config));
LOG_INFO(sLogger,
("existing invalid config modified and passed topology check",
"prepare to build pipeline")("config", configName));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Added, pDiff, singletonCache);
} else if (*configDetail != p->GetConfig()) {
PipelineConfig config(configName, std::move(configDetail));
if (!config.Parse()) {
Expand All @@ -360,10 +367,10 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mModified.push_back(std::move(config));
LOG_INFO(sLogger,
("existing valid config modified and passed topology check",
"prepare to rebuild pipeline")("config", configName));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Modified, pDiff, singletonCache);
} else {
LOG_DEBUG(sLogger, ("existing valid config file modified, but no change found", "skip current object"));
}
Expand Down Expand Up @@ -412,4 +419,132 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
return true;
}

bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName,
const filesystem::path& path,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache) {
auto pipeline = mPipelineManager->FindConfigByName(configName);
auto task = mTaskPipelineManager->FindPipelineByName(configName).get();
if (task) {
return true;
} else if (pipeline) { // running pipeline in last config update
std::unique_ptr<Json::Value> configDetail = make_unique<Json::Value>();
PipelineConfig config(configName, std::move(configDetail));
config.mCreateTime = pipeline->GetContext().GetCreateTime();
config.mSingletonInput = pipeline->GetSingletonInput();
PushPipelineConfig(std::move(config), ConfigDiffEnum::Unchanged, pDiff, singletonCache);
} else { // low priority singleton input in last config update, sort it again
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
return false;
}
if (!IsConfigEnabled(configName, *detail)) {
LOG_INFO(sLogger, ("unchanged config found and disabled", "skip current object")("config", configName));
return false;
}
PipelineConfig config(configName, std::move(detail));
if (!config.Parse()) {
LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName));
AlarmManager::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM,
"new config found but invalid: skip current object, config: "
+ configName,
config.mProject,
config.mLogstore,
config.mRegion);
return false;
}
if (!config.mSingletonInput.empty()) {
singletonCache[config.mSingletonInput].push_back(
make_shared<PipelineConfigWithDiffInfo>(std::move(config), ConfigDiffEnum::Added));
}
}
return true;
}

void PipelineConfigWatcher::PushPipelineConfig(PipelineConfig&& config,
ConfigDiffEnum diffEnum,
PipelineConfigDiff& pDiff,
SingletonConfigCache& singletonCache) {
// singleton input
if (!config.mSingletonInput.empty()) {
if (diffEnum == ConfigDiffEnum::Added || diffEnum == ConfigDiffEnum::Modified
|| diffEnum == ConfigDiffEnum::Unchanged) {
singletonCache[config.mSingletonInput].push_back(
make_shared<PipelineConfigWithDiffInfo>(std::move(config), diffEnum));
} else {
LOG_ERROR(sLogger, ("should not reach here", "invalid diff enum")("diff", diffEnum));
}
return;
}
// no singleton input
switch (diffEnum) {
case ConfigDiffEnum::Added:
pDiff.mAdded.push_back(std::move(config));
break;
case ConfigDiffEnum::Modified:
pDiff.mModified.push_back(std::move(config));
break;
default:
break;
}
}

void PipelineConfigWatcher::CheckSingletonInput(PipelineConfigDiff& pDiff, SingletonConfigCache& singletonCache) {
for (auto& [name, configs] : singletonCache) {
std::sort(configs.begin(),
configs.end(),
[](const std::shared_ptr<PipelineConfigWithDiffInfo>& a,
const std::shared_ptr<PipelineConfigWithDiffInfo>& b) -> bool {
if (a->config.mCreateTime == b->config.mCreateTime) {
return a->config.mName < b->config.mName;
}
return a->config.mCreateTime < b->config.mCreateTime;
});
for (size_t i = 0; i < configs.size(); ++i) {
const auto& diffEnum = configs[i]->diffEnum;
const auto& configName = configs[i]->config.mName;
if (i == 0) {
switch (diffEnum) {
// greatest priority config
case ConfigDiffEnum::Added:
LOG_INFO(sLogger,
("new config with singleton input found and passed topology check",
"prepare to build pipeline")("config", configName));
pDiff.mAdded.push_back(std::move(configs[0]->config));
break;
case ConfigDiffEnum::Modified:
LOG_INFO(sLogger,
("existing config with singleton input modified and passed topology check",
"prepare to build pipeline")("config", configName));
pDiff.mModified.push_back(std::move(configs[0]->config));
break;
default:
break;
}
} else {
// other low priority configs
switch (diffEnum) {
case ConfigDiffEnum::Modified:
LOG_WARNING(sLogger,
("global singleton plugin found, but another older config or smaller name config "
"already exists",
"skip current object")("config", configName));
pDiff.mRemoved.push_back(configName);
break;
case ConfigDiffEnum::Unchanged:
LOG_WARNING(sLogger,
("existing valid config with global singleton plugin, but another older config or "
"smaller name config found",
"prepare to stop current running pipeline")("config", configName));
pDiff.mRemoved.push_back(configName);
break;
default:
break;
}
}
}
}
}

} // namespace logtail
Loading
Loading