Skip to content

Commit

Permalink
Merge branch 'main' into feat/prom-curl-err-msg
Browse files Browse the repository at this point in the history
  • Loading branch information
catdogpandas committed Dec 3, 2024
2 parents 6e35124 + 0254c5c commit 0d7b61b
Show file tree
Hide file tree
Showing 80 changed files with 1,153 additions and 1,245 deletions.
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider.
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants monitor/profile_sender models
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants monitor/metric_models monitor/profile_sender models
config config/watcher constants
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
task_pipeline
Expand Down
17 changes: 0 additions & 17 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,22 +430,6 @@ string GetAgentLogName() {
}
}

string GetAgentSnapshotDir() {
if (BOOL_FLAG(logtail_mode)) {
return GetProcessExecutionDir() + STRING_FLAG(logtail_snapshot_dir);
} else {
return GetAgentLogDir() + "snapshot";
}
}

string GetAgentStatusLogName() {
if (BOOL_FLAG(logtail_mode)) {
return "ilogtail_status.LOG";
} else {
return "loongcollector_status.LOG";
}
}

string GetObserverEbpfHostPath() {
if (BOOL_FLAG(logtail_mode)) {
return STRING_FLAG(sls_observer_ebpf_host_path);
Expand Down Expand Up @@ -909,7 +893,6 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mSendRequestConcurrency = confJson["send_request_concurrency"].asInt();
else
mSendRequestConcurrency = INT32_FLAG(send_request_concurrency);
LogtailMonitor::GetInstance()->UpdateConstMetric("send_request_concurrency", mSendRequestConcurrency);

if (confJson.isMember("process_thread_count") && confJson["process_thread_count"].isInt())
mProcessThreadCount = confJson["process_thread_count"].asInt();
Expand Down
2 changes: 0 additions & 2 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ std::string GetLocalEventDataFileName();
std::string GetInotifyWatcherDirsDumpFileName();
std::string GetAgentLoggersPrefix();
std::string GetAgentLogName();
std::string GetAgentSnapshotDir();
std::string GetAgentStatusLogName();
std::string GetObserverEbpfHostPath();
std::string GetSendBufferFileNamePrefix();
std::string GetLegacyUserLocalConfigFilePath();
Expand Down
3 changes: 0 additions & 3 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ void Application::Init() {
const string& configIP = AppConfig::GetInstance()->GetConfigIP();
if (!configIP.empty()) {
LoongCollectorMonitor::mIpAddr = configIP;
LogtailMonitor::GetInstance()->UpdateConstMetric("logtail_ip", GetHostIp());
} else if (!interface.empty()) {
LoongCollectorMonitor::mIpAddr = GetHostIp(interface);
if (LoongCollectorMonitor::mIpAddr.empty()) {
Expand All @@ -151,7 +150,6 @@ void Application::Init() {
const string& configHostName = AppConfig::GetInstance()->GetConfigHostName();
if (!configHostName.empty()) {
LoongCollectorMonitor::mHostname = configHostName;
LogtailMonitor::GetInstance()->UpdateConstMetric("logtail_hostname", GetHostName());
}

GenerateInstanceId();
Expand Down Expand Up @@ -198,7 +196,6 @@ void Application::Init() {

void Application::Start() { // GCOVR_EXCL_START
LoongCollectorMonitor::mStartTime = GetTimeStamp(time(NULL), "%Y-%m-%d %H:%M:%S");
LogtailMonitor::GetInstance()->UpdateConstMetric("start_time", LoongCollectorMonitor::mStartTime);

#if defined(__ENTERPRISE__) && defined(_MSC_VER)
InitWindowsSignalObject();
Expand Down
4 changes: 4 additions & 0 deletions core/common/SafeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class SafeQueue {
return mQueue.size();
}

#ifdef APSARA_UNIT_TEST_MAIN
void Clear() { std::queue<T>().swap(mQueue); }
#endif

private:
std::queue<T> mQueue;
mutable std::mutex mMux;
Expand Down
75 changes: 41 additions & 34 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include "common/FileSystemUtil.h"
#include "config/ConfigUtil.h"
#include "config/common_provider/CommonConfigProvider.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "logger/Logger.h"
#include "monitor/Monitor.h"
#include "pipeline/PipelineManager.h"
Expand All @@ -37,9 +41,9 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
PipelineConfigDiff pDiff;
TaskConfigDiff tDiff;
unordered_set<string> configSet;
// inner configs
InsertInnerPipelines(pDiff, tDiff, configSet);
// configs from file
// builtin pipeline configs
InsertBuiltInPipelines(pDiff, tDiff, configSet);
// file pipeline configs
InsertPipelines(pDiff, tDiff, configSet);

for (const auto& name : mPipelineManager->GetAllConfigNames()) {
Expand Down Expand Up @@ -83,87 +87,90 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
return make_pair(std::move(pDiff), std::move(tDiff));
}

void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff,
void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
unordered_set<string>& configSet) {
std::map<std::string, std::string> innerPipelines;
// self-monitor metric
innerPipelines[LoongCollectorMonitor::GetInnerSelfMonitorMetricPipelineName()]
= LoongCollectorMonitor::GetInnerSelfMonitorMetricPipeline();
#ifdef __ENTERPRISE__
const std::map<std::string, std::string>& builtInPipelines
= EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs();

// process
for (const auto& pipeline : innerPipelines) {
if (configSet.find(pipeline.first) != configSet.end()) {
for (const auto& pipeline : builtInPipelines) {
const string& pipelineName = pipeline.first;
const string& pipleineDetail = pipeline.second;
if (configSet.find(pipelineName) != configSet.end()) {
LOG_WARNING(sLogger,
("more than 1 config with the same name is found", "skip current config")("inner pipeline",
pipeline.first));
pipelineName));
continue;
}
configSet.insert(pipeline.first);
configSet.insert(pipelineName);

string errorMsg;
auto iter = mInnerConfigMap.find(pipeline.first);
auto iter = mInnerConfigMap.find(pipelineName);
if (iter == mInnerConfigMap.end()) {
mInnerConfigMap[pipeline.first] = pipeline.second;
mInnerConfigMap[pipelineName] = pipleineDetail;
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) {
if (!ParseConfigDetail(pipleineDetail, ".json", *detail, errorMsg)) {
LOG_WARNING(sLogger,
("config format error", "skip current object")("error msg", errorMsg)("inner pipeline",
pipeline.first));
pipelineName));
continue;
}
if (!IsConfigEnabled(pipeline.first, *detail)) {
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipeline.first));
if (!IsConfigEnabled(pipelineName, *detail)) {
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipelineName));
continue;
}
if (!CheckAddedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) {
if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff)) {
continue;
}
} else if (pipeline.second != iter->second) {
mInnerConfigMap[pipeline.first] = pipeline.second;
} else if (pipleineDetail != iter->second) {
mInnerConfigMap[pipelineName] = pipleineDetail;
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!ParseConfigDetail(pipeline.second, ".json", *detail, errorMsg)) {
if (!ParseConfigDetail(pipleineDetail, ".json", *detail, errorMsg)) {
LOG_WARNING(sLogger,
("config format error", "skip current object")("error msg", errorMsg)("inner pipeline",
pipeline.first));
pipelineName));
continue;
}
if (!IsConfigEnabled(pipeline.first, *detail)) {
if (!IsConfigEnabled(pipelineName, *detail)) {
switch (GetConfigType(*detail)) {
case ConfigType::Pipeline:
if (mPipelineManager->FindConfigByName(pipeline.first)) {
pDiff.mRemoved.push_back(pipeline.first);
if (mPipelineManager->FindConfigByName(pipelineName)) {
pDiff.mRemoved.push_back(pipelineName);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running pipeline")("config", pipeline.first));
"prepare to stop current running pipeline")("config", pipelineName));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", pipeline.first));
"skip current object")("config", pipelineName));
}
break;
case ConfigType::Task:
if (mTaskPipelineManager->FindPipelineByName(pipeline.first)) {
tDiff.mRemoved.push_back(pipeline.first);
if (mTaskPipelineManager->FindPipelineByName(pipelineName)) {
tDiff.mRemoved.push_back(pipelineName);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running task")("config", pipeline.first));
"prepare to stop current running task")("config", pipelineName));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", pipeline.first));
"skip current object")("config", pipelineName));
}
break;
}
continue;
}
if (!CheckModifiedConfig(pipeline.first, std::move(detail), pDiff, tDiff)) {
if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff)) {
continue;
}
} else {
LOG_DEBUG(sLogger, ("existing inner config unchanged", "skip current object"));
}
}
#else
return;
#endif
}

void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
Expand Down
2 changes: 1 addition & 1 deletion core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PipelineConfigWatcher : public ConfigWatcher {
PipelineConfigWatcher();
~PipelineConfigWatcher() = default;

void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertBuiltInPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
bool CheckAddedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
Expand Down
4 changes: 2 additions & 2 deletions core/ebpf/SelfMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <atomic>

#include "ebpf/include/export.h"
#include "monitor/PluginMetricManager.h"
#include "monitor/metric_models/ReentrantMetricsRecord.h"
#include "common/Lock.h"
#include "monitor/MetricTypes.h"
#include "monitor/metric_models/MetricTypes.h"
#include "monitor/metric_constants/MetricConstants.h"

namespace logtail {
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/eBPFServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "ebpf/include/export.h"
#include "common/LogtailCommonFlags.h"
#include "common/MachineInfoUtil.h"
#include "monitor/PluginMetricManager.h"
#include "monitor/metric_models/ReentrantMetricsRecord.h"
#include "common/Lock.h"

DEFINE_FLAG_INT64(kernel_min_version_for_ebpf,
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/eBPFServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "ebpf/handler/AbstractHandler.h"
#include "ebpf/handler/ObserveHandler.h"
#include "ebpf/handler/SecurityHandler.h"
#include "monitor/MetricTypes.h"
#include "monitor/metric_models/MetricTypes.h"
#include "ebpf/SelfMonitor.h"

namespace logtail {
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/handler/AbstractHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <mutex>

#include "pipeline/PipelineContext.h"
#include "monitor/MetricTypes.h"
#include "monitor/metric_models/MetricTypes.h"
#include "monitor/MetricManager.h"

namespace logtail{
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/FileServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "file_server/MultilineOptions.h"
#include "file_server/reader/FileReaderOptions.h"
#include "monitor/MetricManager.h"
#include "monitor/PluginMetricManager.h"
#include "monitor/metric_models/ReentrantMetricsRecord.h"
#include "pipeline/PipelineContext.h"


Expand Down
15 changes: 2 additions & 13 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,21 +349,10 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {
}

void LogInput::UpdateCriticalMetric(int32_t curTime) {
LogtailMonitor::GetInstance()->UpdateMetric("last_read_event_time",
GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S"));
mLastRunTime->Set(mLastReadEventTime.load());

LogtailMonitor::GetInstance()->UpdateMetric("event_tps",
1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
int32_t openFdTotal = GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize();
LogtailMonitor::GetInstance()->UpdateMetric("open_fd", openFdTotal);
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mRegisterdHandlersTotal->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
mEventProcessCount = 0;
}

Expand Down
12 changes: 3 additions & 9 deletions core/file_server/polling/PollingDirFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,11 @@ void PollingDirFile::PollingIteration() {
}
sort(sortedConfigs.begin(), sortedConfigs.end(), FileDiscoveryOptions::CompareByPathLength);

size_t configTotal = nameConfigMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("config_count", configTotal);
LoongCollectorMonitor::GetInstance()->SetAgentConfigTotal(configTotal);
LoongCollectorMonitor::GetInstance()->SetAgentConfigTotal(nameConfigMap.size());
{
ScopedSpinLock lock(mCacheLock);
size_t pollingDirCacheSize = mDirCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize);
mPollingDirCacheSize->Set(pollingDirCacheSize);
size_t pollingFileCacheSize = mFileCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize);
mPollingFileCacheSize->Set(pollingFileCacheSize);
mPollingDirCacheSize->Set(mDirCacheMap.size());
mPollingFileCacheSize->Set(mFileCacheMap.size());
}

// Iterate all normal configs, make sure stat count will not exceed limit.
Expand Down
4 changes: 1 addition & 3 deletions core/file_server/polling/PollingModify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ void PollingModify::PollingIteration() {
vector<SplitedFilePath> deletedFileVec;
vector<Event*> pollingEventVec;
int32_t statCount = 0;
size_t pollingModifySizeTotal = mModifyCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal);
mPollingModifySize->Set(pollingModifySizeTotal);
mPollingModifySize->Set(mModifyCacheMap.size());
for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) {
if (!mRuningFlag || mHoldOnFlag)
break;
Expand Down
8 changes: 0 additions & 8 deletions core/logger/Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,6 @@ void Logger::LoadAllDefaultConfigs(std::map<std::string, LoggerConfig>& loggerCf
LoadDefaultConfig(loggerCfgs, sinkCfgs);

loggerCfgs.insert({GetAgentLoggersPrefix(), LoggerConfig{"AsyncFileSink", level::info}});
loggerCfgs.insert({GetAgentLoggersPrefix() + "/status", LoggerConfig{"AsyncFileSinkStatus", level::info}});

std::string dirPath = GetAgentSnapshotDir();
if (!Mkdir(dirPath)) {
LogMsg(std::string("Create snapshot dir error ") + dirPath + ", error" + ErrnoToString(GetErrno()));
}
sinkCfgs.insert(
{"AsyncFileSinkStatus", SinkConfig{"AsyncFile", 61, 1, 1, dirPath + PATH_SEPARATOR + GetAgentStatusLogName()}});
}

void Logger::EnsureSnapshotDirExist(std::map<std::string, SinkConfig>& sinkCfgs) {
Expand Down
Loading

0 comments on commit 0d7b61b

Please sign in to comment.