Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
Takuka0311 committed Dec 3, 2024
1 parent a5c50f8 commit e0aa1c7
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 35 deletions.
4 changes: 0 additions & 4 deletions core/config/common_provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ void CommonConfigProvider::Stop() {
}
}

const std::map<std::string, std::string>& CommonConfigProvider::GetAllInernalPipelineConfigs() {
return mPipelineMaps;
}

void CommonConfigProvider::LoadConfigFile() {
error_code ec;
lock_guard<mutex> pipelineInfomaplock(mContinuousPipelineInfoMapMux);
Expand Down
2 changes: 0 additions & 2 deletions core/config/common_provider/CommonConfigProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class CommonConfigProvider : public ConfigProvider, ConfigFeedbackable {
void Init(const std::string& dir) override;
void Stop() override;

const std::map<std::string, std::string>& GetAllInernalPipelineConfigs();
void FeedbackContinuousPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) override;
void FeedbackInstanceConfigStatus(const std::string& name, ConfigFeedbackStatus status) override;
void FeedbackOnetimePipelineConfigStatus(const std::string& type,
Expand Down Expand Up @@ -131,7 +130,6 @@ class CommonConfigProvider : public ConfigProvider, ConfigFeedbackable {
std::vector<ConfigServerAddress> mConfigServerAddresses;
int mConfigServerAddressId = 0;
std::map<std::string, std::string> mConfigServerTags;
std::map<std::string, std::string> mPipelineMaps;

#ifdef APSARA_UNIT_TEST_MAIN
friend class CommonConfigProviderUnittest;
Expand Down
22 changes: 10 additions & 12 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
PipelineConfigDiff pDiff;
TaskConfigDiff tDiff;
unordered_set<string> configSet;
// inner configs
InsertInnerPipelines(pDiff, tDiff, configSet);
// configs from file
// builtin pipeline configs
InsertBuiltInPipelines(pDiff, tDiff, configSet);
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet);

for (const auto& name : mPipelineManager->GetAllConfigNames()) {
Expand Down Expand Up @@ -87,19 +87,14 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
return make_pair(std::move(pDiff), std::move(tDiff));
}

void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff,
void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
unordered_set<string>& configSet) {
#ifdef __ENTERPRISE__
const std::map<std::string, std::string>& innerPipelines
= EnterpriseConfigProvider::GetInstance()->GetAllInernalPipelineConfigs();
#else
const std::map<std::string, std::string>& innerPipelines
= CommonConfigProvider::GetInstance()->GetAllInernalPipelineConfigs();
#endif
const std::map<std::string, std::string>& builtInPipelines
= EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs();

// process
for (const auto& pipeline : innerPipelines) {
for (const auto& pipeline : builtInPipelines) {
const string& pipelineName = pipeline.first;
const string& pipleineDetail = pipeline.second;
if (configSet.find(pipelineName) != configSet.end()) {
Expand Down Expand Up @@ -173,6 +168,9 @@ void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff,
LOG_DEBUG(sLogger, ("existing inner config unchanged", "skip current object"));
}
}
#else
return;
#endif
}

void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
Expand Down
2 changes: 1 addition & 1 deletion core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PipelineConfigWatcher : public ConfigWatcher {
PipelineConfigWatcher();
~PipelineConfigWatcher() = default;

void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertBuiltInPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
bool CheckAddedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
Expand Down
14 changes: 6 additions & 8 deletions core/unittest/config/CommonConfigProviderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,19 +433,18 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() {

// 处理 pipelineconfig
auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
auto innerPipelinesCnt = CommonConfigProvider::GetInstance()->GetAllInernalPipelineConfigs().size();
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_EQUAL(1U + innerPipelinesCnt, pipelineConfigDiff.first.mAdded.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[innerPipelinesCnt].mName, "config1");
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1 + innerPipelinesCnt);
APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mAdded.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[0].mName, "config1");
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1);
APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr);
// 再次处理 pipelineconfig
pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_TRUE(pipelineConfigDiff.first.mAdded.empty());
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1 + innerPipelinesCnt);
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1);
APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr);


Expand Down Expand Up @@ -650,18 +649,17 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() {

// 处理pipelineConfigDiff
auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
auto innerPipelinesCnt = CommonConfigProvider::GetInstance()->GetAllInernalPipelineConfigs().size();
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mRemoved.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mRemoved[0], "config1");
APSARA_TEST_EQUAL(0U + innerPipelinesCnt, PipelineManager::GetInstance()->GetAllConfigNames().size());
APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size());
// 再次处理pipelineConfigDiff
pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_TRUE(pipelineConfigDiff.first.mRemoved.empty());
APSARA_TEST_EQUAL(0U + innerPipelinesCnt, PipelineManager::GetInstance()->GetAllConfigNames().size());
APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size());

APSARA_TEST_TRUE(provider.mInstanceConfigInfoMap.empty());
// 处理instanceConfigDiff
Expand Down
3 changes: 1 addition & 2 deletions core/unittest/config/ConfigUpdateUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,7 @@ class ConfigUpdateUnittest : public testing::Test {

void ConfigUpdateUnittest::OnStartUp() const {
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
auto innerPipelinesCnt = CommonConfigProvider::GetInstance()->GetAllInernalPipelineConfigs().size();
APSARA_TEST_EQUAL(innerPipelinesCnt, diff.first.mAdded.size());
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());

GenerateInitialConfigs();
Expand Down
9 changes: 3 additions & 6 deletions core/unittest/config/ConfigWatcherUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ const filesystem::path ConfigWatcherUnittest::instanceConfigDir = "./instance_co
void ConfigWatcherUnittest::InvalidConfigDirFound() const {
{
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
auto innerPipelinesCnt = CommonConfigProvider::GetInstance()->GetAllInernalPipelineConfigs().size();
APSARA_TEST_EQUAL(innerPipelinesCnt, diff.first.mAdded.size());
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());

{ ofstream fout("continuous_pipeline_config"); }
Expand Down Expand Up @@ -84,8 +83,7 @@ void ConfigWatcherUnittest::InvalidConfigFileFound() const {
fout << "[}";
}
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
auto innerPipelinesCnt = CommonConfigProvider::GetInstance()->GetAllInernalPipelineConfigs().size();
APSARA_TEST_EQUAL(innerPipelinesCnt, diff.first.mAdded.size());
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());
filesystem::remove_all(configDir);
}
Expand Down Expand Up @@ -134,9 +132,8 @@ void ConfigWatcherUnittest::DuplicateConfigs() const {
}
{ ofstream fout("dir2/config.json"); }
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
auto innerPipelinesCnt = CommonConfigProvider::GetInstance()->GetAllInernalPipelineConfigs().size();
APSARA_TEST_FALSE(diff.first.IsEmpty());
APSARA_TEST_EQUAL(1U + innerPipelinesCnt, diff.first.mAdded.size());
APSARA_TEST_EQUAL(1U, diff.first.mAdded.size());

filesystem::remove_all("dir1");
filesystem::remove_all("dir2");
Expand Down

0 comments on commit e0aa1c7

Please sign in to comment.