Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 10, 2024
1 parent 3abe1ce commit 8559f64
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 10 deletions.
1 change: 1 addition & 0 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ bool PipelineConfig::Parse() {
mRegion);
}
const string pluginType = it->asString();
// when input is singleton, there should only one input to simpify config load transaction
if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) {
mSingletonInput = pluginType;
if (itr->size() > 1) {
Expand Down
2 changes: 1 addition & 1 deletion core/config/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct PipelineConfig {
uint32_t mCreateTime = 0;
const Json::Value* mGlobal = nullptr;
std::vector<const Json::Value*> mInputs;
std::string mSingletonInput;
std::optional<std::string> mSingletonInput;
std::vector<const Json::Value*> mProcessors;
std::vector<const Json::Value*> mAggregators;
std::vector<const Json::Value*> mFlushers;
Expand Down
8 changes: 4 additions & 4 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,8 @@ bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName,
config.mRegion);
return false;
}
if (!config.mSingletonInput.empty()) {
singletonCache[config.mSingletonInput].push_back(
if (config.mSingletonInput) {
singletonCache[config.mSingletonInput.value()].push_back(
make_shared<PipelineConfigWithDiffInfo>(std::move(config), ConfigDiffEnum::Added));
}
}
Expand All @@ -467,10 +467,10 @@ void PipelineConfigWatcher::PushPipelineConfig(PipelineConfig&& config,
PipelineConfigDiff& pDiff,
SingletonConfigCache& singletonCache) {
// singleton input
if (!config.mSingletonInput.empty()) {
if (config.mSingletonInput) {
if (diffEnum == ConfigDiffEnum::Added || diffEnum == ConfigDiffEnum::Modified
|| diffEnum == ConfigDiffEnum::Unchanged) {
singletonCache[config.mSingletonInput].push_back(
singletonCache[config.mSingletonInput.value()].push_back(
make_shared<PipelineConfigWithDiffInfo>(std::move(config), diffEnum));
} else {
LOG_ERROR(sLogger, ("should not reach here", "invalid diff enum")("diff", diffEnum));
Expand Down
4 changes: 2 additions & 2 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Pipeline {
const std::string& Name() const { return mName; }
PipelineContext& GetContext() const { return mContext; }
const Json::Value& GetConfig() const { return *mConfig; }
const std::string& GetSingletonInput() const { return mSingletonInput; }
const std::optional<std::string>& GetSingletonInput() const { return mSingletonInput; }
const std::vector<std::unique_ptr<FlusherInstance>>& GetFlushers() const { return mFlushers; }
bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); }
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& GetPluginStatistics() const {
Expand Down Expand Up @@ -101,7 +101,7 @@ class Pipeline {
mutable PipelineContext mContext;
std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>> mPluginCntMap;
std::unique_ptr<Json::Value> mConfig;
std::string mSingletonInput;
std::optional<std::string> mSingletonInput;
std::atomic_uint16_t mPluginID;
std::atomic_int16_t mInProcessCnt;

Expand Down
4 changes: 2 additions & 2 deletions core/pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ void PluginRegistry::RegisterInputCreator(PluginCreator* creator, bool isSinglet
RegisterCreator(INPUT_PLUGIN, creator, isSingleton);
}

void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator, bool isSingleton) {
RegisterCreator(PROCESSOR_PLUGIN, creator, isSingleton);
void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator) {
RegisterCreator(PROCESSOR_PLUGIN, creator, false);
}

void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator, bool isSingleton) {
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/plugin/PluginRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class PluginRegistry {
void LoadStaticPlugins();
void LoadDynamicPlugins(const std::set<std::string>& plugins);
void RegisterInputCreator(PluginCreator* creator, bool isSingleton = false);
void RegisterProcessorCreator(PluginCreator* creator, bool isSingleton = false);
void RegisterProcessorCreator(PluginCreator* creator);
void RegisterFlusherCreator(PluginCreator* creator, bool isSingleton = false);
PluginCreator* LoadProcessorPlugin(DynamicLibLoader& loader, const std::string pluginType);
void RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton);
Expand Down

0 comments on commit 8559f64

Please sign in to comment.