Skip to content

Commit

Permalink
fix: judge file server changed
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 17, 2024
1 parent 7695ae8 commit 0c409e7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
36 changes: 22 additions & 14 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ PipelineManager::PipelineManager()
: mInputRunners({
PrometheusInputRunner::GetInstance(),
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance(),
ebpf::eBPFServer::GetInstance(),
#endif
}) {
}
Expand All @@ -51,16 +51,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
#ifndef APSARA_UNIT_TEST_MAIN
// 过渡使用
static bool isFileServerStarted = false;
bool isFileServerInputChanged = false;
for (const auto& name : diff.mRemoved) {
isFileServerInputChanged = CheckIfFileServerUpdated(mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]);
}
for (const auto& config : diff.mModified) {
isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
for (const auto& config : diff.mAdded) {
isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
bool isFileServerInputChanged = CheckIfFileServerUpdated(diff);

#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
Expand Down Expand Up @@ -238,9 +229,26 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_map<string, unorder
}
}

bool PipelineManager::CheckIfFileServerUpdated(const Json::Value& config) {
string inputType = config["Type"].asString();
return inputType == "input_file" || inputType == "input_container_stdio";
bool PipelineManager::CheckIfFileServerUpdated(PipelineConfigDiff& diff) {
for (const auto& name : diff.mRemoved) {
string inputType = mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
for (const auto& config : diff.mModified) {
string inputType = (*config.mInputs[0])["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
for (const auto& config : diff.mAdded) {
string inputType = (*config.mInputs[0])["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
return false;
}

} // namespace logtail
2 changes: 1 addition & 1 deletion core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class PipelineManager {
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& statistics);
void FlushAllBatch();
// TODO: 长期过渡使用
bool CheckIfFileServerUpdated(const Json::Value& config);
bool CheckIfFileServerUpdated(PipelineConfigDiff& diff);

std::unordered_map<std::string, std::shared_ptr<Pipeline>> mPipelineNameEntityMap;
mutable SpinLock mPluginCntMapLock;
Expand Down

0 comments on commit 0c409e7

Please sign in to comment.