Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support singleton input #1933

Merged
merged 16 commits into from
Dec 11, 2024
2 changes: 2 additions & 0 deletions core/config/ConfigDiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ using PipelineConfigDiff = ConfigDiff<PipelineConfig>;
using TaskConfigDiff = ConfigDiff<TaskConfig>;
using InstanceConfigDiff = ConfigDiff<InstanceConfig>;

// Kind of change associated with a config during diffing.
// Kept as an unscoped enum with the default underlying values (0..3).
enum ConfigDiffEnum {
    Added,
    Modified,
    Removed,
    Unchanged,
};

} // namespace logtail
16 changes: 16 additions & 0 deletions core/config/ConfigUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ bool IsConfigEnabled(const string& name, const Json::Value& detail) {
return true;
}

// Appends the "Type" string of every entry of the top-level "inputs" array in
// `detail` to `inputTypes`, in array order.
//
// Nothing is appended when `detail` is not a JSON object, when "inputs" is
// missing or is not an array. Individual entries that are not JSON objects, or
// whose "Type" member is missing or not a string, are skipped.
//
// `inputTypes` is only appended to; existing elements are left untouched.
void GetAllInputTypes(const Json::Value& detail, std::vector<string>& inputTypes) {
    // Json::Value::find() is only valid on object (or null) values, so reject
    // anything else up front instead of tripping JsonCpp's internal assertion.
    if (!detail.isObject()) {
        return;
    }
    const char* key = "inputs";
    const Json::Value* inputs = detail.find(key, key + strlen(key));
    if (inputs == nullptr || !inputs->isArray()) {
        return;
    }
    for (const Json::Value& input : *inputs) {
        // A malformed entry such as a bare string or number cannot carry a "Type" member.
        if (!input.isObject()) {
            continue;
        }
        const char* typeKey = "Type";
        const Json::Value* type = input.find(typeKey, typeKey + strlen(typeKey));
        if (type == nullptr || !type->isString()) {
            continue;
        }
        inputTypes.push_back(type->asString());
    }
}

// Classifies a config by its content: a config that has a "task" member is a
// task config, everything else is treated as a pipeline config.
ConfigType GetConfigType(const Json::Value& detail) {
    if (detail.isMember("task")) {
        return ConfigType::Task;
    }
    return ConfigType::Pipeline;
}
Expand Down
1 change: 1 addition & 0 deletions core/config/ConfigUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ bool ParseConfigDetail(const std::string& content,
Json::Value& detail,
std::string& errorMsg);
bool IsConfigEnabled(const std::string& name, const Json::Value& detail);
void GetAllInputTypes(const Json::Value& detail, std::vector<std::string>& inputTypes);
ConfigType GetConfigType(const Json::Value& detail);

} // namespace logtail
7 changes: 5 additions & 2 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ bool PipelineConfig::Parse() {
mRegion);
}
const string pluginType = it->asString();
if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
mSingletonInput = pluginType;
}
if (i == 0) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoInput = true;
Expand Down Expand Up @@ -238,10 +241,10 @@ bool PipelineConfig::Parse() {
}
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
if ((hasFileInput || !mSingletonInput.empty()) && (*mDetail)["inputs"].size() > 1) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 input_file or input_container_stdio plugin is given",
"more than 1 input_file or input_container_stdio or global singleton plugin is given",
noModule,
mName,
mProject,
Expand Down
3 changes: 2 additions & 1 deletion core/config/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct PipelineConfig {
uint32_t mCreateTime = 0;
const Json::Value* mGlobal = nullptr;
std::vector<const Json::Value*> mInputs;
std::string mSingletonInput;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
std::vector<const Json::Value*> mProcessors;
std::vector<const Json::Value*> mAggregators;
std::vector<const Json::Value*> mFlushers;
Expand Down Expand Up @@ -67,7 +68,7 @@ struct PipelineConfig {
// bool IsProcessRunnerInvolved() const {
// // 长期过渡使用,待C++部分的时序聚合能力与Go持平后恢复下面的正式版
// return !(mHasGoInput && !mHasNativeProcessor);
// // return !(mHasGoInput && !mHasNativeProcessor && (mHasGoProcessor || (mHasGoFlusher && !mHasNativeFlusher)));
// // return !(mHasGoInput && !mHasNativeProcessor && (mHasGoProcessor || (mHasGoFlusher && !mHasNativeFlusher)));
// }

bool HasGoPlugin() const { return mHasGoFlusher || mHasGoProcessor || mHasGoInput; }
Expand Down
126 changes: 99 additions & 27 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
PipelineConfigDiff pDiff;
TaskConfigDiff tDiff;
unordered_set<string> configSet;
SingletonConfigCache singletonCache;
// builtin pipeline configs
InsertBuiltInPipelines(pDiff, tDiff, configSet);
InsertBuiltInPipelines(pDiff, tDiff, configSet, singletonCache);
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet);
InsertPipelines(pDiff, tDiff, configSet, singletonCache);

for (const auto& name : mPipelineManager->GetAllConfigNames()) {
if (configSet.find(name) == configSet.end()) {
Expand Down Expand Up @@ -89,7 +90,8 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(

void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
unordered_set<string>& configSet) {
unordered_set<string>& configSet,
SingletonConfigCache& singletonCache) {
#ifdef __ENTERPRISE__
const std::map<std::string, std::string>& builtInPipelines
= EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs();
Expand Down Expand Up @@ -120,7 +122,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipelineName));
continue;
}
if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff)) {
if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else if (pipleineDetail != iter->second) {
Expand Down Expand Up @@ -161,7 +163,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
}
continue;
}
if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff)) {
if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else {
Expand All @@ -175,7 +177,8 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,

void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_set<std::string>& configSet) {
std::unordered_set<std::string>& configSet,
SingletonConfigCache& singletonCache) {
for (const auto& dir : mSourceDir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
Expand Down Expand Up @@ -231,7 +234,7 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName));
continue;
}
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff)) {
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else if (iter->second.first != size || iter->second.second != mTime) {
Expand Down Expand Up @@ -270,20 +273,35 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
}
continue;
}
if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff)) {
if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) {
continue;
}
} else {
LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object"));
}
}
}
for (const auto& [name, config] : singletonCache) {
if (config->diffEnum == ConfigDiffEnum::Added) {
pDiff.mAdded.push_back(std::move(config->config));
LOG_INFO(sLogger,
("new config found and passed topology check", "prepare to build pipeline")("config",
config->config.mName));
} else {
pDiff.mModified.push_back(std::move(config->config));
LOG_INFO(sLogger,
("existing invalid config modified and passed topology check",
"prepare to build pipeline")("config", config->config.mName));
}
}
}

bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff) {
bool PipelineConfigWatcher::CheckAddedConfig(
const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_map<std::string, std::shared_ptr<PipelineConfigWithDiffInfo>>& singletonCache) {
switch (GetConfigType(*configDetail)) {
case ConfigType::Pipeline: {
PipelineConfig config(configName, std::move(configDetail));
Expand All @@ -297,9 +315,7 @@ bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mAdded.push_back(std::move(config));
LOG_INFO(sLogger,
("new config found and passed topology check", "prepare to build pipeline")("config", configName));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Added, pDiff, singletonCache);
break;
}
case ConfigType::Task: {
Expand All @@ -319,10 +335,12 @@ bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
return true;
}

bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff) {
bool PipelineConfigWatcher::CheckModifiedConfig(
const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_map<std::string, std::shared_ptr<PipelineConfigWithDiffInfo>>& singletonCache) {
switch (GetConfigType(*configDetail)) {
case ConfigType::Pipeline: {
shared_ptr<Pipeline> p = mPipelineManager->FindConfigByName(configName);
Expand All @@ -341,10 +359,7 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mAdded.push_back(std::move(config));
LOG_INFO(sLogger,
("existing invalid config modified and passed topology check",
"prepare to build pipeline")("config", configName));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Modified, pDiff, singletonCache);
} else if (*configDetail != p->GetConfig()) {
PipelineConfig config(configName, std::move(configDetail));
if (!config.Parse()) {
Expand All @@ -360,10 +375,7 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
config.mRegion);
return false;
}
pDiff.mModified.push_back(std::move(config));
LOG_INFO(sLogger,
("existing valid config modified and passed topology check",
"prepare to rebuild pipeline")("config", configName));
PushPipelineConfig(std::move(config), ConfigDiffEnum::Modified, pDiff, singletonCache);
} else {
LOG_DEBUG(sLogger, ("existing valid config file modified, but no change found", "skip current object"));
}
Expand Down Expand Up @@ -412,4 +424,64 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
return true;
}


// Routes a parsed pipeline config either into the singleton cache or directly
// into the pipeline diff.
//
// Configs with a non-empty mSingletonInput (Added/Modified only) are not put
// into `pDiff` here. They are stored in `singletonCache`, keyed by the
// singleton input type, so that at most one config per type is kept:
//   - if the cached config has a smaller mCreateTime, or the same mCreateTime
//     and a smaller mName, the incoming config is dropped with a warning;
//   - otherwise the incoming config replaces the cached one, and if the
//     pipeline manager already knows the replaced config by name, that name is
//     appended to `pDiff.mRemoved`.
//
// Configs without a singleton input are appended to `pDiff.mAdded` or
// `pDiff.mModified` according to `diffEnum`; other enum values are ignored.
//
// `config` is consumed (moved from) whenever it is stored.
void PipelineConfigWatcher::PushPipelineConfig(PipelineConfig&& config,
                                               ConfigDiffEnum diffEnum,
                                               PipelineConfigDiff& pDiff,
                                               SingletonConfigCache& singletonCache) {
    // singleton input
    if (!config.mSingletonInput.empty()) {
        if (diffEnum == ConfigDiffEnum::Added || diffEnum == ConfigDiffEnum::Modified) {
            auto it = singletonCache.find(config.mSingletonInput);
            if (it != singletonCache.end()) {
                // The cached config wins when it is older, or equally old with a smaller name.
                if (it->second->config.mCreateTime < config.mCreateTime
                    || (it->second->config.mCreateTime == config.mCreateTime
                        && it->second->config.mName < config.mName)) {
                    LOG_WARNING(sLogger,
                                ("global singleton plugin found, but another older config or smaller name config "
                                 "already exists",
                                 "skip current object")("config", config.mName)("inputType", config.mSingletonInput));
                    return;
                }
                // The incoming config wins; the cached one is displaced.
                if (mPipelineManager->FindConfigByName(it->second->config.mName)) {
                    pDiff.mRemoved.push_back(it->second->config.mName);
                    LOG_WARNING(
                        sLogger,
                        ("existing valid config with global singleton plugin, but another older config or smaller "
                         "name config found",
                         "prepare to stop current running pipeline")("config", it->second->config.mName));
                } else {
                    LOG_WARNING(sLogger,
                                ("global singleton plugin found, but another older config or smaller name config "
                                 "already exists",
                                 "skip current object")("config", it->second->config.mName)("inputType",
                                                                                            config.mSingletonInput));
                }
            }
            auto pipelineConfig = make_shared<PipelineConfigWithDiffInfo>(std::move(config), diffEnum);
            // `config` has been moved from; read the key from the stored copy.
            singletonCache[pipelineConfig->config.mSingletonInput] = pipelineConfig;
            return;
        } else {
            LOG_ERROR(sLogger, ("should not reach here", "invalid diff enum")("diff", diffEnum));
        }
    }
    // no singleton input
    switch (diffEnum) {
        case ConfigDiffEnum::Added:
            // Log before moving: after push_back(std::move(config)) the name is in a moved-from state.
            LOG_INFO(
                sLogger,
                ("new config found and passed topology check", "prepare to build pipeline")("config", config.mName));
            pDiff.mAdded.push_back(std::move(config));
            break;
        case ConfigDiffEnum::Modified:
            // Log before moving, for the same reason as above.
            LOG_INFO(sLogger,
                     ("existing invalid config modified and passed topology check",
                      "prepare to build pipeline")("config", config.mName));
            pDiff.mModified.push_back(std::move(config));
            break;
        default:
            break;
    }
}

} // namespace logtail
35 changes: 31 additions & 4 deletions core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#pragma once

#include <filesystem>
#include <memory>
#include <unordered_map>
#include <unordered_set>

#include "config/ConfigDiff.h"
Expand All @@ -26,6 +29,14 @@ namespace logtail {
class PipelineManager;
class TaskPipelineManager;

// Bundles a pipeline config with the diff kind it was recorded under.
struct PipelineConfigWithDiffInfo {
    // Takes ownership of `cfg` by moving it into the stored member.
    PipelineConfigWithDiffInfo(PipelineConfig&& cfg, ConfigDiffEnum diff)
        : config(std::move(cfg)), diffEnum(diff) {}

    PipelineConfig config;
    ConfigDiffEnum diffEnum;
};
// Map from a singleton input plugin type to the config currently held for that type.
using SingletonConfigCache = std::unordered_map<std::string, std::shared_ptr<PipelineConfigWithDiffInfo>>;

class PipelineConfigWatcher : public ConfigWatcher {
public:
PipelineConfigWatcher(const PipelineConfigWatcher&) = delete;
Expand All @@ -46,19 +57,35 @@ class PipelineConfigWatcher : public ConfigWatcher {
PipelineConfigWatcher();
~PipelineConfigWatcher() = default;

void InsertBuiltInPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_set<std::string>& configSet,
SingletonConfigCache& singletonCache);
void InsertPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_set<std::string>& configSet,
SingletonConfigCache& singletonCache);
bool CheckAddedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff);
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache);
bool CheckModifiedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff);
TaskConfigDiff& tDiff,
SingletonConfigCache& singletonCache);
void PushPipelineConfig(PipelineConfig&& config,
ConfigDiffEnum diffEnum,
PipelineConfigDiff& pDiff,
SingletonConfigCache& singletonCache);

const PipelineManager* mPipelineManager = nullptr;
const TaskPipelineManager* mTaskPipelineManager = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineConfigWatcherUnittest;
#endif
};

} // namespace logtail
Loading
Loading