Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: judge file server changed #1970

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ PipelineManager::PipelineManager()
: mInputRunners({
PrometheusInputRunner::GetInstance(),
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance(),
ebpf::eBPFServer::GetInstance(),
#endif
}) {
}
Expand All @@ -51,16 +51,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
#ifndef APSARA_UNIT_TEST_MAIN
// 过渡使用
static bool isFileServerStarted = false;
bool isFileServerInputChanged = false;
for (const auto& name : diff.mRemoved) {
isFileServerInputChanged = CheckIfFileServerUpdated(mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]);
}
for (const auto& config : diff.mModified) {
isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
for (const auto& config : diff.mAdded) {
isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
bool isFileServerInputChanged = CheckIfFileServerUpdated(diff);

#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
Expand Down Expand Up @@ -238,9 +229,26 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_map<string, unorder
}
}

bool PipelineManager::CheckIfFileServerUpdated(const Json::Value& config) {
string inputType = config["Type"].asString();
return inputType == "input_file" || inputType == "input_container_stdio";
bool PipelineManager::CheckIfFileServerUpdated(PipelineConfigDiff& diff) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

既然已经出现了之前测试不到的情况,是不是应该针对性的补充用例?

for (const auto& name : diff.mRemoved) {
string inputType = mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
for (const auto& config : diff.mModified) {
string inputType = (*config.mInputs[0])["Type"].asString();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load配置的地方是否能够保证[0]的合法性?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

前面配置解析会检验input至少包含一个,保证这里加载流水线一定是合法的

if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
for (const auto& config : diff.mAdded) {
string inputType = (*config.mInputs[0])["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
return false;
}

} // namespace logtail
2 changes: 1 addition & 1 deletion core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class PipelineManager {
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& statistics);
void FlushAllBatch();
// TODO: 长期过渡使用
bool CheckIfFileServerUpdated(const Json::Value& config);
bool CheckIfFileServerUpdated(PipelineConfigDiff& diff);

std::unordered_map<std::string, std::shared_ptr<Pipeline>> mPipelineNameEntityMap;
mutable SpinLock mPluginCntMapLock;
Expand Down
Loading