Skip to content

Commit

Permalink
fix: judge file server changed (#1970)
Browse files Browse the repository at this point in the history
* fix: judge file server changed

* ut

* fix
  • Loading branch information
Abingcbc authored Dec 18, 2024
1 parent 1d02270 commit c5be143
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 20 deletions.
1 change: 1 addition & 0 deletions core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class LogInput : public LogRunnable {
friend class FuxiSceneUnittest;
friend class ConfigMatchUnittest;
friend class FuseFileUnittest;
friend class PipelineUpdateUnittest;

void CleanEnviroments();
#endif
Expand Down
12 changes: 10 additions & 2 deletions core/monitor/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,16 @@ class LoongCollectorMonitor {
void SetAgentMemory(uint64_t mem) { mAgentMemory->Set(mem); }
void SetAgentGoMemory(uint64_t mem) { mAgentGoMemory->Set(mem); }
void SetAgentGoRoutinesTotal(uint64_t total) { mAgentGoRoutinesTotal->Set(total); }
void SetAgentOpenFdTotal(uint64_t total) { mAgentOpenFdTotal->Set(total); }
void SetAgentConfigTotal(uint64_t total) { mAgentConfigTotal->Set(total); }
void SetAgentOpenFdTotal(uint64_t total) {
#ifndef APSARA_UNIT_TEST_MAIN
mAgentOpenFdTotal->Set(total);
#endif
}
void SetAgentConfigTotal(uint64_t total) {
#ifndef APSARA_UNIT_TEST_MAIN
mAgentConfigTotal->Set(total);
#endif
}

static std::string mHostname;
static std::string mIpAddr;
Expand Down
42 changes: 25 additions & 17 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,28 @@ PipelineManager::PipelineManager()
: mInputRunners({
PrometheusInputRunner::GetInstance(),
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance(),
ebpf::eBPFServer::GetInstance(),
#endif
}) {
}

static shared_ptr<Pipeline> sEmptyPipeline;

void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
#ifndef APSARA_UNIT_TEST_MAIN
// 过渡使用
static bool isFileServerStarted = false;
bool isFileServerInputChanged = false;
for (const auto& name : diff.mRemoved) {
isFileServerInputChanged = CheckIfFileServerUpdated(mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]);
}
for (const auto& config : diff.mModified) {
isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
for (const auto& config : diff.mAdded) {
isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
bool isFileServerInputChanged = CheckIfFileServerUpdated(diff);

#ifndef APSARA_UNIT_TEST_MAIN
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
ShennongManager::GetInstance()->Pause();
}
#endif
#endif
if (isFileServerStarted && isFileServerInputChanged) {
FileServer::GetInstance()->Pause();
}
#endif

for (const auto& name : diff.mRemoved) {
auto iter = mPipelineNameEntityMap.find(name);
Expand Down Expand Up @@ -134,7 +125,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackStatus::APPLIED);
}

#ifndef APSARA_UNIT_TEST_MAIN
// 在Flusher改造完成前,先不执行如下步骤,不会造成太大影响
// Sender::CleanUnusedAk();

Expand All @@ -147,6 +137,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
}
}

#ifndef APSARA_UNIT_TEST_MAIN
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
ShennongManager::GetInstance()->Resume();
Expand Down Expand Up @@ -238,9 +229,26 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_map<string, unorder
}
}

bool PipelineManager::CheckIfFileServerUpdated(const Json::Value& config) {
string inputType = config["Type"].asString();
return inputType == "input_file" || inputType == "input_container_stdio";
bool PipelineManager::CheckIfFileServerUpdated(PipelineConfigDiff& diff) {
for (const auto& name : diff.mRemoved) {
string inputType = mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
for (const auto& config : diff.mModified) {
string inputType = (*config.mInputs[0])["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
for (const auto& config : diff.mAdded) {
string inputType = (*config.mInputs[0])["Type"].asString();
if (inputType == "input_file" || inputType == "input_container_stdio") {
return true;
}
}
return false;
}

} // namespace logtail
2 changes: 1 addition & 1 deletion core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class PipelineManager {
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& statistics);
void FlushAllBatch();
// TODO: 长期过渡使用
bool CheckIfFileServerUpdated(const Json::Value& config);
bool CheckIfFileServerUpdated(PipelineConfigDiff& diff);

std::unordered_map<std::string, std::shared_ptr<Pipeline>> mPipelineNameEntityMap;
mutable SpinLock mPluginCntMapLock;
Expand Down
5 changes: 5 additions & 0 deletions core/unittest/config/CommonConfigProviderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#endif
#include "config/watcher/InstanceConfigWatcher.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "file_server/FileServer.h"
#include "gmock/gmock.h"
#include "monitor/Monitor.h"
#include "pipeline/PipelineManager.h"
Expand Down Expand Up @@ -72,6 +73,10 @@ class CommonConfigProviderUnittest : public ::testing::Test {
return true;
}

static void SetUpTestCase() {}

static void TearDownTestCase() { FileServer::GetInstance()->Stop(); }

// 在每个测试用例开始前的设置
void SetUp() override {
if (BOOL_FLAG(logtail_mode)) {
Expand Down
2 changes: 2 additions & 0 deletions core/unittest/config/ConfigUpdateUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "config/watcher/PipelineConfigWatcher.h"
#include "file_server/FileServer.h"
#include "pipeline/Pipeline.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/plugin/PluginRegistry.h"
Expand Down Expand Up @@ -57,6 +58,7 @@ class ConfigUpdateUnittest : public testing::Test {
static void TearDownTestCase() {
PluginRegistry::GetInstance()->UnloadPlugins();
TaskRegistry::GetInstance()->UnloadPlugins();
FileServer::GetInstance()->Stop();
}

void SetUp() override {
Expand Down
3 changes: 3 additions & 0 deletions core/unittest/config/PipelineManagerMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class PipelineMock : public Pipeline {
mContext.SetCreateTime(config.mCreateTime);
return (*mConfig)["valid"].asBool();
}

bool Start() { return true; }
void Stop(bool isRemoving) {}
};

class PipelineManagerMock : public PipelineManager {
Expand Down
4 changes: 4 additions & 0 deletions core/unittest/pipeline/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ target_link_libraries(pipeline_manager_unittest ${UT_BASE_TARGET})
add_executable(concurrency_limiter_unittest ConcurrencyLimiterUnittest.cpp)
target_link_libraries(concurrency_limiter_unittest ${UT_BASE_TARGET})

add_executable(pipeline_update_unittest PipelineUpdateUnittest.cpp)
target_link_libraries(pipeline_update_unittest ${UT_BASE_TARGET})

include(GoogleTest)
gtest_discover_tests(global_config_unittest)
gtest_discover_tests(pipeline_unittest)
gtest_discover_tests(pipeline_manager_unittest)
gtest_discover_tests(concurrency_limiter_unittest)
gtest_discover_tests(pipeline_update_unittest)

118 changes: 118 additions & 0 deletions core/unittest/pipeline/PipelineUpdateUnittest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <filesystem>
#include <fstream>
#include <memory>
#include <string>
#include <vector>

#include "common/JsonUtil.h"
#include "config/PipelineConfig.h"
#include "file_server/FileServer.h"
#include "file_server/event_handler/LogInput.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "unittest/Unittest.h"
#include "unittest/config/PipelineManagerMock.h"

using namespace std;

namespace logtail {

class PipelineUpdateUnittest : public testing::Test {
public:
void TestFileServerStart() const;

protected:
static void SetUpTestCase() { PluginRegistry::GetInstance()->LoadPlugins(); }

static void TearDownTestCase() {
PluginRegistry::GetInstance()->UnloadPlugins();
FileServer::GetInstance()->Stop();
}

void SetUp() override {}

void TearDown() override {}

private:
Json::Value GeneratePipelineConfigJson(const string& inputConfig,
const string& processorConfig,
const string& flusherConfig) const {
Json::Value json;
string errorMsg;
ParseJsonTable(R"(
{
"valid": true,
"inputs": [)"
+ inputConfig + R"(],
"processors": [)"
+ processorConfig + R"(],
"flushers": [)"
+ flusherConfig + R"(]
})",
json,
errorMsg);
return json;
}
string nativeInputConfig = R"(
{
"Type": "input_file"
})";
string nativeProcessorConfig = R"(
{
"Type": "processor_parse_regex_native"
})";
string nativeFlusherConfig = R"(
{
"Type": "flusher_sls"
})";
string goInputConfig = R"(
{
"Type": "input_docker_stdout"
})";
string goProcessorConfig = R"(
{
"Type": "processor_regex"
})";
string goFlusherConfig = R"(
{
"Type": "flusher_stdout"
})";
};

void PipelineUpdateUnittest::TestFileServerStart() const {
Json::Value nativePipelineConfigJson
= GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig);
Json::Value goPipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig);
auto pipelineManager = PipelineManagerMock::GetInstance();
PipelineConfigDiff diff;
PipelineConfig nativePipelineConfigObj
= PipelineConfig("test1", make_unique<Json::Value>(nativePipelineConfigJson));
nativePipelineConfigObj.Parse();
diff.mAdded.push_back(std::move(nativePipelineConfigObj));
PipelineConfig goPipelineConfigObj = PipelineConfig("test2", make_unique<Json::Value>(goPipelineConfigJson));
goPipelineConfigObj.Parse();
diff.mAdded.push_back(std::move(goPipelineConfigObj));

pipelineManager->UpdatePipelines(diff);
APSARA_TEST_EQUAL_FATAL(2U, pipelineManager->GetAllPipelines().size());
APSARA_TEST_EQUAL_FATAL(false, LogInput::GetInstance()->mInteruptFlag);
}

UNIT_TEST_CASE(PipelineUpdateUnittest, TestFileServerStart)

} // namespace logtail

UNIT_TEST_MAIN

0 comments on commit c5be143

Please sign in to comment.