Skip to content

Commit

Permalink
inner pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Takuka0311 committed Nov 26, 2024
1 parent dac8341 commit ad9870e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 1 deletion.
1 change: 1 addition & 0 deletions core/config/watcher/ConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ void ConfigWatcher::AddSource(const string& dir, mutex* mux) {
void ConfigWatcher::ClearEnvironment() {
mSourceDir.clear();
mFileInfoMap.clear();
mInnerConfigMap.clear();
}
#endif

Expand Down
1 change: 1 addition & 0 deletions core/config/watcher/ConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ConfigWatcher {
std::vector<std::filesystem::path> mSourceDir;
std::map<std::string, std::mutex*> mDirMutexMap;
std::map<std::string, std::pair<uintmax_t, std::filesystem::file_time_type>> mFileInfoMap;
std::map<std::string, std::string> mInnerConfigMap;
};

} // namespace logtail
88 changes: 87 additions & 1 deletion core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
#include "config/watcher/PipelineConfigWatcher.h"

#include <memory>
#include <unordered_set>

#include "common/FileSystemUtil.h"
#include "config/ConfigUtil.h"
#include "logger/Logger.h"
#include "monitor/Monitor.h"
#include "pipeline/PipelineManager.h"
#include "task_pipeline/TaskPipelineManager.h"

Expand All @@ -37,6 +37,9 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
PipelineConfigDiff pDiff;
TaskConfigDiff tDiff;
unordered_set<string> configSet;
// inner configs
InsertInnerPipelines(pDiff, tDiff, configSet);
// configs from file
for (const auto& dir : mSourceDir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
Expand Down Expand Up @@ -180,6 +183,89 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
return make_pair(std::move(pDiff), std::move(tDiff));
}

void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
unordered_set<string>& configSet) {
std::map<std::string, std::string> innerPipelines;
// self-monitor metric
innerPipelines[LoongCollectorMonitor::GetInnerSelfMonitorMetricPipelineName()]
= LoongCollectorMonitor::GetInnerSelfMonitorMetricPipeline();

// process
for (const auto& pipeline : innerPipelines) {
if (configSet.find(pipeline.first) != configSet.end()) {
LOG_WARNING(sLogger,
("more than 1 config with the same name is found", "skip current config")("inner pipeline",
pipeline.first));
continue;
}
configSet.insert(pipeline.first);

string errorMsg;
auto iter = mInnerConfigMap.find(pipeline.first);
if (iter == mInnerConfigMap.end()) {
mInnerConfigMap[pipeline.first] = pipeline.second;
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) {
LOG_WARNING(sLogger,
("config format error", "skip current object")("error msg", errorMsg)("inner pipeline",
pipeline.first));
continue;
}
if (!IsConfigEnabled(pipeline.first, *detail)) {
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipeline.first));
continue;
}
if (!CheckAddedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) {
continue;
}
} else if (pipeline.second != iter->second) {
mInnerConfigMap[pipeline.first] = pipeline.second;
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) {
LOG_WARNING(sLogger,
("config format error", "skip current object")("error msg", errorMsg)("inner pipeline",
pipeline.first));
continue;
}
if (!IsConfigEnabled(pipeline.first, *detail)) {
switch (GetConfigType(*detail)) {
case ConfigType::Pipeline:
if (mPipelineManager->FindConfigByName(pipeline.first)) {
pDiff.mRemoved.push_back(pipeline.first);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running pipeline")("config", pipeline.first));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", pipeline.first));
}
break;
case ConfigType::Task:
if (mTaskPipelineManager->FindPipelineByName(pipeline.first)) {
tDiff.mRemoved.push_back(pipeline.first);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running task")("config", pipeline.first));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", pipeline.first));
}
break;
}
continue;
}
if (!CheckModifiedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) {
continue;
}
} else {
LOG_DEBUG(sLogger, ("existing inner config unchanged", "skip current object"));
}
}
}

bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
Expand Down
3 changes: 3 additions & 0 deletions core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <unordered_set>

#include "config/ConfigDiff.h"
#include "config/watcher/ConfigWatcher.h"

Expand Down Expand Up @@ -44,6 +46,7 @@ class PipelineConfigWatcher : public ConfigWatcher {
PipelineConfigWatcher();
~PipelineConfigWatcher() = default;

void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
bool CheckAddedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
Expand Down
47 changes: 47 additions & 0 deletions core/monitor/Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -759,4 +759,51 @@ void LoongCollectorMonitor::Stop() {

}

const string LoongCollectorMonitor::GetInnerSelfMonitorMetricPipeline() {
#ifdef __ENTERPRISE__
static string pipeline = "";
#else
static string pipeline = R"(
{
"inputs": [
{
"Type": "input_self_monitor_metric",
"Agent": {
"Enable": false,
"Interval": 1
},
"Runner": {
"Enable": false,
"Interval": 1
},
"Pipeline": {
"Enable": true,
"Interval": 1
},
"PluginSource": {
"Enable": true,
"Interval": 10
},
"Plugin": {
"Enable": false,
"Interval": 10
},
"Component": {
"Enable": false,
"Interval": 10
}
}
],
"flushers": [
{
"Type": "flusher_local_file",
"FileName": "./log/self_metrics.log"
}
]
}
)";
#endif
return pipeline;
}

} // namespace logtail
5 changes: 5 additions & 0 deletions core/monitor/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ class LoongCollectorMonitor {
void Init();
void Stop();

static const std::string GetInnerSelfMonitorAlarmPipelineName() { return ""; }
static const std::string GetInnerSelfMonitorAlarmPipeline() { return ""; }
static const std::string GetInnerSelfMonitorMetricPipelineName() { return "inner-self-monitor-metric-pipeline"; }
static const std::string GetInnerSelfMonitorMetricPipeline();

void SetAgentCpu(double cpu) { mAgentCpu->Set(cpu); }
void SetAgentMemory(uint64_t mem) { mAgentMemory->Set(mem); }
void SetAgentGoMemory(uint64_t mem) { mAgentGoMemory->Set(mem); }
Expand Down

0 comments on commit ad9870e

Please sign in to comment.