Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: judge file server changed #1970

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class LogInput : public LogRunnable {
friend class FuxiSceneUnittest;
friend class ConfigMatchUnittest;
friend class FuseFileUnittest;
friend class PipelineUpdateUnittest;

void CleanEnviroments();
#endif
Expand Down
12 changes: 10 additions & 2 deletions core/monitor/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,16 @@ class LoongCollectorMonitor {
void SetAgentMemory(uint64_t mem) { mAgentMemory->Set(mem); }
void SetAgentGoMemory(uint64_t mem) { mAgentGoMemory->Set(mem); }
void SetAgentGoRoutinesTotal(uint64_t total) { mAgentGoRoutinesTotal->Set(total); }
void SetAgentOpenFdTotal(uint64_t total) { mAgentOpenFdTotal->Set(total); }
void SetAgentConfigTotal(uint64_t total) { mAgentConfigTotal->Set(total); }
void SetAgentOpenFdTotal(uint64_t total) {
#ifndef APSARA_UNIT_TEST_MAIN
mAgentOpenFdTotal->Set(total);
#endif
}
void SetAgentConfigTotal(uint64_t total) {
#ifndef APSARA_UNIT_TEST_MAIN
mAgentConfigTotal->Set(total);
#endif
}

static std::string mHostname;
static std::string mIpAddr;
Expand Down
42 changes: 25 additions & 17 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,28 @@ PipelineManager::PipelineManager()
: mInputRunners({
PrometheusInputRunner::GetInstance(),
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance(),
ebpf::eBPFServer::GetInstance(),
#endif
}) {
}

static shared_ptr<Pipeline> sEmptyPipeline;

void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
#ifndef APSARA_UNIT_TEST_MAIN
// 过渡使用
static bool isFileServerStarted = false;
bool isFileServerInputChanged = false;
for (const auto& name : diff.mRemoved) {
isFileServerInputChanged = CheckIfFileServerUpdated(mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]);
}
for (const auto& config : diff.mModified) {
isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
for (const auto& config : diff.mAdded) {
isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
bool isFileServerInputChanged = CheckIfFileServerUpdated(diff);

#ifndef APSARA_UNIT_TEST_MAIN
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
ShennongManager::GetInstance()->Pause();
}
#endif
#endif
if (isFileServerStarted && isFileServerInputChanged) {
FileServer::GetInstance()->Pause();
}
#endif

for (const auto& name : diff.mRemoved) {
auto iter = mPipelineNameEntityMap.find(name);
Expand Down Expand Up @@ -134,7 +125,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackStatus::APPLIED);
}

#ifndef APSARA_UNIT_TEST_MAIN
// 在Flusher改造完成前,先不执行如下步骤,不会造成太大影响
// Sender::CleanUnusedAk();

Expand All @@ -147,6 +137,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
}
}

#ifndef APSARA_UNIT_TEST_MAIN
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
ShennongManager::GetInstance()->Resume();
Expand Down Expand Up @@ -238,9 +229,26 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_map<string, unorder
}
}

bool PipelineManager::CheckIfFileServerUpdated(const Json::Value& config) {
string inputType = config["Type"].asString();
return inputType == "input_file" || inputType == "input_container_stdio";
bool PipelineManager::CheckIfFileServerUpdated(PipelineConfigDiff& diff) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

既然已经出现了之前测试不到的情况,是不是应该针对性的补充用例?

for (const auto& name : diff.mRemoved) {
string inputType = mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
for (const auto& config : diff.mModified) {
string inputType = (*config.mInputs[0])["Type"].asString();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load配置的地方是否能够保证[0]的合法性?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

前面配置解析会检验input至少包含一个,保证这里加载流水线一定是合法的

if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
for (const auto& config : diff.mAdded) {
string inputType = (*config.mInputs[0])["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
return false;
}

} // namespace logtail
2 changes: 1 addition & 1 deletion core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class PipelineManager {
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& statistics);
void FlushAllBatch();
// TODO: 长期过渡使用
bool CheckIfFileServerUpdated(const Json::Value& config);
bool CheckIfFileServerUpdated(PipelineConfigDiff& diff);

std::unordered_map<std::string, std::shared_ptr<Pipeline>> mPipelineNameEntityMap;
mutable SpinLock mPluginCntMapLock;
Expand Down
5 changes: 5 additions & 0 deletions core/unittest/config/CommonConfigProviderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#endif
#include "config/watcher/InstanceConfigWatcher.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "file_server/FileServer.h"
#include "gmock/gmock.h"
#include "monitor/Monitor.h"
#include "pipeline/PipelineManager.h"
Expand Down Expand Up @@ -72,6 +73,10 @@ class CommonConfigProviderUnittest : public ::testing::Test {
return true;
}

static void SetUpTestCase() {}

static void TearDownTestCase() { FileServer::GetInstance()->Stop(); }

// 在每个测试用例开始前的设置
void SetUp() override {
if (BOOL_FLAG(logtail_mode)) {
Expand Down
2 changes: 2 additions & 0 deletions core/unittest/config/ConfigUpdateUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "config/watcher/PipelineConfigWatcher.h"
#include "file_server/FileServer.h"
#include "pipeline/Pipeline.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/plugin/PluginRegistry.h"
Expand Down Expand Up @@ -57,6 +58,7 @@ class ConfigUpdateUnittest : public testing::Test {
static void TearDownTestCase() {
PluginRegistry::GetInstance()->UnloadPlugins();
TaskRegistry::GetInstance()->UnloadPlugins();
FileServer::GetInstance()->Stop();
}

void SetUp() override {
Expand Down
3 changes: 3 additions & 0 deletions core/unittest/config/PipelineManagerMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class PipelineMock : public Pipeline {
mContext.SetCreateTime(config.mCreateTime);
return (*mConfig)["valid"].asBool();
}

bool Start() { return true; }
void Stop(bool isRemoving) {}
};

class PipelineManagerMock : public PipelineManager {
Expand Down
4 changes: 4 additions & 0 deletions core/unittest/pipeline/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ target_link_libraries(pipeline_manager_unittest ${UT_BASE_TARGET})
add_executable(concurrency_limiter_unittest ConcurrencyLimiterUnittest.cpp)
target_link_libraries(concurrency_limiter_unittest ${UT_BASE_TARGET})

add_executable(pipeline_update_unittest PipelineUpdateUnittest.cpp)
target_link_libraries(pipeline_update_unittest ${UT_BASE_TARGET})

include(GoogleTest)
gtest_discover_tests(global_config_unittest)
gtest_discover_tests(pipeline_unittest)
gtest_discover_tests(pipeline_manager_unittest)
gtest_discover_tests(concurrency_limiter_unittest)
gtest_discover_tests(pipeline_update_unittest)

118 changes: 118 additions & 0 deletions core/unittest/pipeline/PipelineUpdateUnittest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <filesystem>
#include <fstream>
#include <memory>
#include <string>
#include <vector>

#include "common/JsonUtil.h"
#include "config/PipelineConfig.h"
#include "file_server/FileServer.h"
#include "file_server/event_handler/LogInput.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "unittest/Unittest.h"
#include "unittest/config/PipelineManagerMock.h"

using namespace std;

namespace logtail {

class PipelineUpdateUnittest : public testing::Test {
public:
void TestFileServerStart() const;

protected:
static void SetUpTestCase() { PluginRegistry::GetInstance()->LoadPlugins(); }

static void TearDownTestCase() {
PluginRegistry::GetInstance()->UnloadPlugins();
FileServer::GetInstance()->Stop();
}

void SetUp() override {}

void TearDown() override {}

private:
Json::Value GeneratePipelineConfigJson(const string& inputConfig,
const string& processorConfig,
const string& flusherConfig) const {
Json::Value json;
string errorMsg;
ParseJsonTable(R"(
{
"valid": true,
"inputs": [)"
+ inputConfig + R"(],
"processors": [)"
+ processorConfig + R"(],
"flushers": [)"
+ flusherConfig + R"(]
})",
json,
errorMsg);
return json;
}
string nativeInputConfig = R"(
{
"Type": "input_file"
})";
string nativeProcessorConfig = R"(
{
"Type": "processor_parse_regex_native"
})";
string nativeFlusherConfig = R"(
{
"Type": "flusher_sls"
})";
string goInputConfig = R"(
{
"Type": "input_docker_stdout"
})";
string goProcessorConfig = R"(
{
"Type": "processor_regex"
})";
string goFlusherConfig = R"(
{
"Type": "flusher_stdout"
})";
};

void PipelineUpdateUnittest::TestFileServerStart() const {
Json::Value nativePipelineConfigJson
= GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig);
Json::Value goPipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig);
auto pipelineManager = PipelineManagerMock::GetInstance();
PipelineConfigDiff diff;
PipelineConfig nativePipelineConfigObj
= PipelineConfig("test1", make_unique<Json::Value>(nativePipelineConfigJson));
nativePipelineConfigObj.Parse();
diff.mAdded.push_back(std::move(nativePipelineConfigObj));
PipelineConfig goPipelineConfigObj = PipelineConfig("test2", make_unique<Json::Value>(goPipelineConfigJson));
goPipelineConfigObj.Parse();
diff.mAdded.push_back(std::move(goPipelineConfigObj));

pipelineManager->UpdatePipelines(diff);
APSARA_TEST_EQUAL_FATAL(2U, pipelineManager->GetAllPipelines().size());
APSARA_TEST_EQUAL_FATAL(false, LogInput::GetInstance()->mInteruptFlag);
}

UNIT_TEST_CASE(PipelineUpdateUnittest, TestFileServerStart)

} // namespace logtail

UNIT_TEST_MAIN
Loading