Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
Takuka0311 committed Dec 6, 2024
1 parent 2dae778 commit b12fbda
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 72 deletions.
85 changes: 32 additions & 53 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,75 +208,57 @@ void FlusherSLS::SetDefaultRegion(const string& region) {
sDefaultRegion = region;
}

mutex FlusherSLS::sProjectRefCntMapLock;
mutex FlusherSLS::sProjectRegionMapLock;
unordered_map<string, int32_t> FlusherSLS::sProjectRefCntMap;
mutex FlusherSLS::sRegionRefCntMapLock;
unordered_map<string, int32_t> FlusherSLS::sRegionRefCntMap;
mutex FlusherSLS::sProjectRegionMapLock;
unordered_map<string, string> FlusherSLS::sProjectRegionMap;


string FlusherSLS::GetAllProjects() {
string result;
lock_guard<mutex> lock(sProjectRefCntMapLock);
lock_guard<mutex> lock(sProjectRegionMapLock);
for (auto iter = sProjectRefCntMap.cbegin(); iter != sProjectRefCntMap.cend(); ++iter) {
result.append(iter->first).append(" ");
}
return result;
}

void FlusherSLS::IncreaseProjectReferenceCnt(const string& project) {
lock_guard<mutex> lock(sProjectRefCntMapLock);
++sProjectRefCntMap[project];
}

void FlusherSLS::DecreaseProjectReferenceCnt(const string& project) {
lock_guard<mutex> lock(sProjectRefCntMapLock);
auto iter = sProjectRefCntMap.find(project);
if (iter == sProjectRefCntMap.end()) {
// should not happen
return;
}
if (--iter->second == 0) {
sProjectRefCntMap.erase(iter);
}
}

bool FlusherSLS::IsRegionContainingConfig(const string& region) {
lock_guard<mutex> lock(sRegionRefCntMapLock);
lock_guard<mutex> lock(sProjectRegionMapLock);
return sRegionRefCntMap.find(region) != sRegionRefCntMap.end();
}

void FlusherSLS::IncreaseRegionReferenceCnt(const string& region) {
lock_guard<mutex> lock(sRegionRefCntMapLock);
++sRegionRefCntMap[region];
}

void FlusherSLS::DecreaseRegionReferenceCnt(const string& region) {
lock_guard<mutex> lock(sRegionRefCntMapLock);
auto iter = sRegionRefCntMap.find(region);
if (iter == sRegionRefCntMap.end()) {
// should not happen
return;
}
if (--iter->second == 0) {
sRegionRefCntMap.erase(iter);
}
}

std::string FlusherSLS::GetProjectRegion(const std::string& project) {
lock_guard<mutex> lock(sProjectRefCntMapLock);
return sProjectRegionMap[project];
lock_guard<mutex> lock(sProjectRegionMapLock);
auto iter = sProjectRegionMap.find(project);
if (iter == sProjectRegionMap.end()) {
return "";
}
return iter->second;
}

void FlusherSLS::SetProjectRegion(const std::string& project, const std::string& region) {
lock_guard<mutex> lock(sProjectRefCntMapLock);
void FlusherSLS::IncreaseProjectRegionReferenceCnt(const string& project, const string& region) {
lock_guard<mutex> lock(sProjectRegionMapLock);
++sProjectRefCntMap[project];
++sRegionRefCntMap[region];
sProjectRegionMap[project] = region;
}

void FlusherSLS::RemoveProjectRegion(const std::string& project) {
lock_guard<mutex> lock(sProjectRefCntMapLock);
sProjectRegionMap.erase(project);
void FlusherSLS::DecreaseProjectRegionReferenceCnt(const string& project, const string& region) {
lock_guard<mutex> lock(sProjectRegionMapLock);
auto projectRefCnt = sProjectRefCntMap.find(project);
if (projectRefCnt != sProjectRefCntMap.end()) {
if (--projectRefCnt->second == 0) {
sProjectRefCntMap.erase(projectRefCnt);
sProjectRegionMap.erase(project);
}
}

auto regionRefCnt = sRegionRefCntMap.find(region);
if (regionRefCnt != sRegionRefCntMap.end()) {
if (--regionRefCnt->second == 0) {
sRegionRefCntMap.erase(regionRefCnt);
}
}
}

mutex FlusherSLS::sRegionStatusLock;
Expand Down Expand Up @@ -558,23 +540,20 @@ bool FlusherSLS::Start() {
Flusher::Start();
InitResource();

IncreaseProjectReferenceCnt(mProject);
IncreaseRegionReferenceCnt(mRegion);
SetProjectRegion(mProject, mRegion);
IncreaseProjectRegionReferenceCnt(mProject, mRegion);
SLSClientManager::GetInstance()->IncreaseAliuidReferenceCntForRegion(mRegion, mAliuid);
return true;
}

bool FlusherSLS::Stop(bool isPipelineRemoving) {
Flusher::Stop(isPipelineRemoving);

DecreaseProjectReferenceCnt(mProject);
DecreaseRegionReferenceCnt(mRegion);
RemoveProjectRegion(mProject);
DecreaseProjectRegionReferenceCnt(mProject, mRegion);
SLSClientManager::GetInstance()->DecreaseAliuidReferenceCntForRegion(mRegion, mAliuid);
return true;
}


bool FlusherSLS::Send(PipelineEventGroup&& g) {
if (g.IsReplay()) {
return SerializeAndPush(std::move(g));
Expand Down
12 changes: 3 additions & 9 deletions core/plugin/flusher/sls/FlusherSLS.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,8 @@ class FlusherSLS : public HttpFlusher {

static void InitResource();

static void IncreaseProjectReferenceCnt(const std::string& project);
static void DecreaseProjectReferenceCnt(const std::string& project);
static void IncreaseRegionReferenceCnt(const std::string& region);
static void DecreaseRegionReferenceCnt(const std::string& region);
static void SetProjectRegion(const std::string& project, const std::string& region);
static void RemoveProjectRegion(const std::string& project);
static void IncreaseProjectRegionReferenceCnt(const std::string& project, const std::string& region);
static void DecreaseProjectRegionReferenceCnt(const std::string& project, const std::string& region);

static std::mutex sMux;
static std::unordered_map<std::string, std::weak_ptr<ConcurrencyLimiter>> sProjectConcurrencyLimiterMap;
Expand All @@ -106,11 +102,9 @@ class FlusherSLS : public HttpFlusher {
static std::mutex sDefaultRegionLock;
static std::string sDefaultRegion;

static std::mutex sProjectRefCntMapLock;
static std::mutex sProjectRegionMapLock;
static std::unordered_map<std::string, int32_t> sProjectRefCntMap;
static std::mutex sRegionRefCntMapLock;
static std::unordered_map<std::string, int32_t> sRegionRefCntMap;
static std::mutex sProjectRegionMapLock;
static std::unordered_map<std::string, std::string> sProjectRegionMap;

// TODO: should be moved to enterprise config provider
Expand Down
23 changes: 17 additions & 6 deletions core/unittest/config/CommonConfigProviderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "config/ConfigDiff.h"
#include "config/InstanceConfigManager.h"
#include "config/common_provider/CommonConfigProvider.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "config/watcher/InstanceConfigWatcher.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "gmock/gmock.h"
Expand Down Expand Up @@ -433,18 +436,22 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() {

// 处理 pipelineconfig
auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mAdded.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[0].mName, "config1");
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1);
APSARA_TEST_EQUAL(1U + builtinPipelineCnt, pipelineConfigDiff.first.mAdded.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[builtinPipelineCnt].mName, "config1");
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1U + builtinPipelineCnt);
APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr);
// 再次处理 pipelineconfig
pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_TRUE(pipelineConfigDiff.first.mAdded.empty());
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1);
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1U + builtinPipelineCnt);
APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr);


Expand Down Expand Up @@ -649,17 +656,21 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() {

// 处理pipelineConfigDiff
auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mRemoved.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mRemoved[0], "config1");
APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size());
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, PipelineManager::GetInstance()->GetAllConfigNames().size());
// 再次处理pipelineConfigDiff
pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_TRUE(pipelineConfigDiff.first.mRemoved.empty());
APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size());
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, PipelineManager::GetInstance()->GetAllConfigNames().size());

APSARA_TEST_TRUE(provider.mInstanceConfigInfoMap.empty());
// 处理instanceConfigDiff
Expand Down
9 changes: 8 additions & 1 deletion core/unittest/config/ConfigUpdateUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

#include "config/PipelineConfig.h"
#include "config/common_provider/CommonConfigProvider.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "config/watcher/PipelineConfigWatcher.h"
#include "pipeline/Pipeline.h"
#include "pipeline/PipelineManager.h"
Expand Down Expand Up @@ -268,7 +271,11 @@ class ConfigUpdateUnittest : public testing::Test {

void ConfigUpdateUnittest::OnStartUp() const {
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());

GenerateInitialConfigs();
Expand Down
21 changes: 18 additions & 3 deletions core/unittest/config/ConfigWatcherUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include "config/ConfigDiff.h"
#include "config/common_provider/CommonConfigProvider.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "config/watcher/InstanceConfigWatcher.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "pipeline/plugin/PluginRegistry.h"
Expand Down Expand Up @@ -51,7 +54,11 @@ const filesystem::path ConfigWatcherUnittest::instanceConfigDir = "./instance_co
void ConfigWatcherUnittest::InvalidConfigDirFound() const {
{
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());

{ ofstream fout("continuous_pipeline_config"); }
Expand Down Expand Up @@ -83,7 +90,11 @@ void ConfigWatcherUnittest::InvalidConfigFileFound() const {
fout << "[}";
}
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());
filesystem::remove_all(configDir);
}
Expand Down Expand Up @@ -132,8 +143,12 @@ void ConfigWatcherUnittest::DuplicateConfigs() const {
}
{ ofstream fout("dir2/config.json"); }
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
APSARA_TEST_FALSE(diff.first.IsEmpty());
APSARA_TEST_EQUAL(1U, diff.first.mAdded.size());
APSARA_TEST_EQUAL(1U + builtinPipelineCnt, diff.first.mAdded.size());

filesystem::remove_all("dir1");
filesystem::remove_all("dir2");
Expand Down
3 changes: 3 additions & 0 deletions core/unittest/flusher/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ target_link_libraries(pack_id_manager_unittest ${UT_BASE_TARGET})
if (ENABLE_ENTERPRISE)
add_executable(enterprise_sls_client_manager_unittest EnterpriseSLSClientManagerUnittest.cpp)
target_link_libraries(enterprise_sls_client_manager_unittest ${UT_BASE_TARGET})
add_executable(enterprise_flusher_sls_monitor_unittest EnterpriseFlusherSLSMonitorUnittest.cpp)
target_link_libraries(enterprise_flusher_sls_monitor_unittest ${UT_BASE_TARGET})
endif ()

include(GoogleTest)
gtest_discover_tests(flusher_sls_unittest)
gtest_discover_tests(pack_id_manager_unittest)
if (ENABLE_ENTERPRISE)
gtest_discover_tests(enterprise_sls_client_manager_unittest)
gtest_discover_tests(enterprise_flusher_sls_monitor_unittest)
endif ()

0 comments on commit b12fbda

Please sign in to comment.