Skip to content

Commit

Permalink
Refactor self-monitor directory structure and remove status_profile (#1932)
Browse files Browse the repository at this point in the history

* init

* polish

* polish

* polish

* polish
  • Loading branch information
Takuka0311 authored Dec 3, 2024
1 parent 293891f commit 0254c5c
Show file tree
Hide file tree
Showing 41 changed files with 500 additions and 544 deletions.
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider.
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants monitor/profile_sender models
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants monitor/metric_models monitor/profile_sender models
config config/watcher constants
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
task_pipeline
Expand Down
17 changes: 0 additions & 17 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,22 +430,6 @@ string GetAgentLogName() {
}
}

// Returns the directory that holds agent snapshot files.
// In logtail mode this is the process execution dir followed by the
// logtail_snapshot_dir flag; otherwise it is the agent log dir followed by
// "snapshot".
string GetAgentSnapshotDir() {
    if (!BOOL_FLAG(logtail_mode)) {
        return GetAgentLogDir() + "snapshot";
    }
    return GetProcessExecutionDir() + STRING_FLAG(logtail_snapshot_dir);
}

// Returns the file name of the agent status log, which depends on whether
// the logtail_mode flag is set.
string GetAgentStatusLogName() {
    return BOOL_FLAG(logtail_mode) ? "ilogtail_status.LOG" : "loongcollector_status.LOG";
}

string GetObserverEbpfHostPath() {
if (BOOL_FLAG(logtail_mode)) {
return STRING_FLAG(sls_observer_ebpf_host_path);
Expand Down Expand Up @@ -909,7 +893,6 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mSendRequestConcurrency = confJson["send_request_concurrency"].asInt();
else
mSendRequestConcurrency = INT32_FLAG(send_request_concurrency);
LogtailMonitor::GetInstance()->UpdateConstMetric("send_request_concurrency", mSendRequestConcurrency);

if (confJson.isMember("process_thread_count") && confJson["process_thread_count"].isInt())
mProcessThreadCount = confJson["process_thread_count"].asInt();
Expand Down
2 changes: 0 additions & 2 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ std::string GetLocalEventDataFileName();
std::string GetInotifyWatcherDirsDumpFileName();
std::string GetAgentLoggersPrefix();
std::string GetAgentLogName();
std::string GetAgentSnapshotDir();
std::string GetAgentStatusLogName();
std::string GetObserverEbpfHostPath();
std::string GetSendBufferFileNamePrefix();
std::string GetLegacyUserLocalConfigFilePath();
Expand Down
3 changes: 0 additions & 3 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ void Application::Init() {
const string& configIP = AppConfig::GetInstance()->GetConfigIP();
if (!configIP.empty()) {
LoongCollectorMonitor::mIpAddr = configIP;
LogtailMonitor::GetInstance()->UpdateConstMetric("logtail_ip", GetHostIp());
} else if (!interface.empty()) {
LoongCollectorMonitor::mIpAddr = GetHostIp(interface);
if (LoongCollectorMonitor::mIpAddr.empty()) {
Expand All @@ -151,7 +150,6 @@ void Application::Init() {
const string& configHostName = AppConfig::GetInstance()->GetConfigHostName();
if (!configHostName.empty()) {
LoongCollectorMonitor::mHostname = configHostName;
LogtailMonitor::GetInstance()->UpdateConstMetric("logtail_hostname", GetHostName());
}

GenerateInstanceId();
Expand Down Expand Up @@ -198,7 +196,6 @@ void Application::Init() {

void Application::Start() { // GCOVR_EXCL_START
LoongCollectorMonitor::mStartTime = GetTimeStamp(time(NULL), "%Y-%m-%d %H:%M:%S");
LogtailMonitor::GetInstance()->UpdateConstMetric("start_time", LoongCollectorMonitor::mStartTime);

#if defined(__ENTERPRISE__) && defined(_MSC_VER)
InitWindowsSignalObject();
Expand Down
4 changes: 2 additions & 2 deletions core/ebpf/SelfMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <atomic>

#include "ebpf/include/export.h"
#include "monitor/PluginMetricManager.h"
#include "monitor/metric_models/ReentrantMetricsRecord.h"
#include "common/Lock.h"
#include "monitor/MetricTypes.h"
#include "monitor/metric_models/MetricTypes.h"
#include "monitor/metric_constants/MetricConstants.h"

namespace logtail {
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/eBPFServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "ebpf/include/export.h"
#include "common/LogtailCommonFlags.h"
#include "common/MachineInfoUtil.h"
#include "monitor/PluginMetricManager.h"
#include "monitor/metric_models/ReentrantMetricsRecord.h"
#include "common/Lock.h"

DEFINE_FLAG_INT64(kernel_min_version_for_ebpf,
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/eBPFServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "ebpf/handler/AbstractHandler.h"
#include "ebpf/handler/ObserveHandler.h"
#include "ebpf/handler/SecurityHandler.h"
#include "monitor/MetricTypes.h"
#include "monitor/metric_models/MetricTypes.h"
#include "ebpf/SelfMonitor.h"

namespace logtail {
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/handler/AbstractHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <mutex>

#include "pipeline/PipelineContext.h"
#include "monitor/MetricTypes.h"
#include "monitor/metric_models/MetricTypes.h"
#include "monitor/MetricManager.h"

namespace logtail{
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/FileServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "file_server/MultilineOptions.h"
#include "file_server/reader/FileReaderOptions.h"
#include "monitor/MetricManager.h"
#include "monitor/PluginMetricManager.h"
#include "monitor/metric_models/ReentrantMetricsRecord.h"
#include "pipeline/PipelineContext.h"


Expand Down
15 changes: 2 additions & 13 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,21 +349,10 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {
}

// Publishes the file-input self-monitoring values (last read-event time,
// event rate, open fd count, registered handler count, reader count and the
// multi-config switch) and then resets the processed-event counter.
// NOTE(review): the open-fd, handler-count and reader-count values are each
// published through two code paths below; this looks like pre- and post-change
// lines of a diff shown together — confirm which set is current.
void LogInput::UpdateCriticalMetric(int32_t curTime) {
LogtailMonitor::GetInstance()->UpdateMetric("last_read_event_time",
GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S"));
mLastRunTime->Set(mLastReadEventTime.load());

// Events per second since the previous update.
// NOTE(review): the divisor is (curTime - mLastUpdateMetricTime); confirm the
// caller never invokes this twice with the same timestamp.
LogtailMonitor::GetInstance()->UpdateMetric("event_tps",
1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
int32_t openFdTotal = GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize();
LogtailMonitor::GetInstance()->UpdateMetric("open_fd", openFdTotal);
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mRegisterdHandlersTotal->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
// Start a fresh counting window for the next event_tps computation.
mEventProcessCount = 0;
}

Expand Down
12 changes: 3 additions & 9 deletions core/file_server/polling/PollingDirFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,11 @@ void PollingDirFile::PollingIteration() {
}
sort(sortedConfigs.begin(), sortedConfigs.end(), FileDiscoveryOptions::CompareByPathLength);

size_t configTotal = nameConfigMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("config_count", configTotal);
LoongCollectorMonitor::GetInstance()->SetAgentConfigTotal(configTotal);
LoongCollectorMonitor::GetInstance()->SetAgentConfigTotal(nameConfigMap.size());
{
ScopedSpinLock lock(mCacheLock);
size_t pollingDirCacheSize = mDirCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize);
mPollingDirCacheSize->Set(pollingDirCacheSize);
size_t pollingFileCacheSize = mFileCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize);
mPollingFileCacheSize->Set(pollingFileCacheSize);
mPollingDirCacheSize->Set(mDirCacheMap.size());
mPollingFileCacheSize->Set(mFileCacheMap.size());
}

// Iterate all normal configs, make sure stat count will not exceed limit.
Expand Down
4 changes: 1 addition & 3 deletions core/file_server/polling/PollingModify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ void PollingModify::PollingIteration() {
vector<SplitedFilePath> deletedFileVec;
vector<Event*> pollingEventVec;
int32_t statCount = 0;
size_t pollingModifySizeTotal = mModifyCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal);
mPollingModifySize->Set(pollingModifySizeTotal);
mPollingModifySize->Set(mModifyCacheMap.size());
for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) {
if (!mRuningFlag || mHoldOnFlag)
break;
Expand Down
8 changes: 0 additions & 8 deletions core/logger/Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,6 @@ void Logger::LoadAllDefaultConfigs(std::map<std::string, LoggerConfig>& loggerCf
LoadDefaultConfig(loggerCfgs, sinkCfgs);

loggerCfgs.insert({GetAgentLoggersPrefix(), LoggerConfig{"AsyncFileSink", level::info}});
loggerCfgs.insert({GetAgentLoggersPrefix() + "/status", LoggerConfig{"AsyncFileSinkStatus", level::info}});

std::string dirPath = GetAgentSnapshotDir();
if (!Mkdir(dirPath)) {
LogMsg(std::string("Create snapshot dir error ") + dirPath + ", error" + ErrnoToString(GetErrno()));
}
sinkCfgs.insert(
{"AsyncFileSinkStatus", SinkConfig{"AsyncFile", 61, 1, 1, dirPath + PATH_SEPARATOR + GetAgentStatusLogName()}});
}

void Logger::EnsureSnapshotDirExist(std::map<std::string, SinkConfig>& sinkCfgs) {
Expand Down
158 changes: 0 additions & 158 deletions core/monitor/MetricManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

#include "Monitor.h"
#include "app_config/AppConfig.h"
#include "common/HashUtil.h"
#include "common/JsonUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "go_pipeline/LogtailPlugin.h"
#include "logger/Logger.h"
#include "provider/Provider.h"
Expand All @@ -37,159 +33,6 @@ const string METRIC_KEY_LABEL = "label";
const string METRIC_TOPIC_TYPE = "loongcollector_metric";
const string METRIC_EXPORT_TYPE_GO = "direct";
const string METRIC_EXPORT_TYPE_CPP = "cpp_provided";
const string METRIC_GO_KEY_LABELS = "labels";
const string METRIC_GO_KEY_COUNTERS = "counters";
const string METRIC_GO_KEY_GAUGES = "gauges";

// Default constructor: performs no work beyond default member initialisation.
SelfMonitorMetricEvent::SelfMonitorMetricEvent() = default;

// Builds an event from a C++ metrics record: copies its category, its static
// labels, the current value of every dynamic label (obtained by invoking the
// stored callable), and the current value of each counter, time counter,
// int gauge and double gauge. Finishes by computing the event key.
SelfMonitorMetricEvent::SelfMonitorMetricEvent(MetricsRecord* metricRecord) {
    // category
    mCategory = metricRecord->GetCategory();

    // labels
    const auto& staticLabels = metricRecord->GetLabels();
    for (const auto& label : *staticLabels) {
        mLabels[label.first] = label.second;
    }
    const auto& dynamicLabels = metricRecord->GetDynamicLabels();
    for (const auto& label : *dynamicLabels) {
        // Dynamic labels hold a callable; evaluate it now to take a snapshot.
        mLabels[label.first] = label.second();
    }

    // counters
    for (const auto& counter : metricRecord->GetCounters()) {
        mCounters[counter->GetName()] = counter->GetValue();
    }
    for (const auto& timeCounter : metricRecord->GetTimeCounters()) {
        mCounters[timeCounter->GetName()] = timeCounter->GetValue();
    }

    // gauges
    for (const auto& intGauge : metricRecord->GetIntGauges()) {
        mGauges[intGauge->GetName()] = intGauge->GetValue();
    }
    for (const auto& doubleGauge : metricRecord->GetDoubleGauges()) {
        mGauges[doubleGauge->GetName()] = doubleGauge->GetValue();
    }

    CreateKey();
}

// Builds an event from a Go-side metric record.
//
// metricRecord maps the keys METRIC_GO_KEY_LABELS, METRIC_GO_KEY_COUNTERS and
// METRIC_GO_KEY_GAUGES to JSON text. All three keys are looked up with
// std::map::at, so a missing key throws std::out_of_range. The result of
// ParseJsonTable is not checked here; a table that fails to parse is simply
// iterated as whatever ParseJsonTable left in the output value.
//
// The "metric_category" label becomes the event category (and is removed from
// the labels); when it is absent the category is METRIC_CATEGORY_UNKNOWN and
// an error is logged. Only string-valued labels are kept.
//
// Counter values are taken exactly when they are unsigned integers; other
// numeric values and numeric strings are converted from double with clamping
// to [0, UINT64_MAX]; unparsable strings become 0.
SelfMonitorMetricEvent::SelfMonitorMetricEvent(const std::map<std::string, std::string>& metricRecord) {
    Json::Value labels, counters, gauges;
    string errMsg;
    ParseJsonTable(metricRecord.at(METRIC_GO_KEY_LABELS), labels, errMsg);
    ParseJsonTable(metricRecord.at(METRIC_GO_KEY_COUNTERS), counters, errMsg);
    ParseJsonTable(metricRecord.at(METRIC_GO_KEY_GAUGES), gauges, errMsg);

    // category
    if (labels.isMember("metric_category")) {
        mCategory = labels["metric_category"].asString();
        labels.removeMember("metric_category");
    } else {
        mCategory = MetricCategory::METRIC_CATEGORY_UNKNOWN;
        LOG_ERROR(sLogger, ("parse go metric", "labels")("err", "metric_category not found"));
    }

    // labels
    for (Json::Value::const_iterator itr = labels.begin(); itr != labels.end(); ++itr) {
        if (itr->isString()) {
            mLabels[itr.key().asString()] = itr->asString();
        }
    }

    // Converting a negative, NaN or too-large double to uint64_t is undefined
    // behaviour, so clamp into the representable range first.
    const auto toCounter = [](double value) -> uint64_t {
        if (!(value > 0)) { // also catches NaN
            return 0;
        }
        if (value >= 18446744073709551616.0) { // 2^64
            return ~static_cast<uint64_t>(0);
        }
        return static_cast<uint64_t>(value);
    };

    // counters
    for (Json::Value::const_iterator itr = counters.begin(); itr != counters.end(); ++itr) {
        // The checks must be exclusive: Json::Value::isDouble() is also true
        // for integral values, and routing an exact uint64 through double
        // would lose precision above 2^53.
        if (itr->isUInt64()) {
            mCounters[itr.key().asString()] = itr->asUInt64();
        } else if (itr->isDouble()) {
            mCounters[itr.key().asString()] = toCounter(itr->asDouble());
        } else if (itr->isString()) {
            try {
                mCounters[itr.key().asString()] = toCounter(std::stod(itr->asString()));
            } catch (...) {
                // Not a number: report the counter as 0.
                mCounters[itr.key().asString()] = 0;
            }
        }
    }

    // gauges
    for (Json::Value::const_iterator itr = gauges.begin(); itr != gauges.end(); ++itr) {
        if (itr->isDouble()) {
            mGauges[itr.key().asString()] = itr->asDouble();
        } else if (itr->isString()) {
            try {
                double value = std::stod(itr->asString());
                mGauges[itr.key().asString()] = value;
            } catch (...) {
                // Not a number: report the gauge as 0.
                mGauges[itr.key().asString()] = 0;
            }
        }
    }

    CreateKey();
}

void SelfMonitorMetricEvent::CreateKey() {
string key = "category:" + mCategory;
for (auto label : mLabels) {
key += (";" + label.first + ":" + label.second);
}
mKey = HashString(key);
mUpdatedFlag = true;
}

// Sets the number of ShouldSend() ticks between sends and restarts the
// tick count from zero.
void SelfMonitorMetricEvent::SetInterval(size_t interval) {
mLastSendInterval = 0;
mSendInterval = interval;
}

// Folds another event into this one:
//  - if the send intervals differ, adopts the other event's interval and
//    restarts the tick count;
//  - counters are accumulated (added to an existing entry, inserted otherwise);
//  - gauges are overwritten with the other event's values.
// The event is marked as updated afterwards.
void SelfMonitorMetricEvent::Merge(SelfMonitorMetricEvent& event) {
    if (mSendInterval != event.mSendInterval) {
        mSendInterval = event.mSendInterval;
        mLastSendInterval = 0;
    }

    for (const auto& incoming : event.mCounters) {
        auto existing = mCounters.find(incoming.first);
        if (existing == mCounters.end()) {
            mCounters[incoming.first] = incoming.second;
        } else {
            existing->second += incoming.second;
        }
    }

    for (const auto& incoming : event.mGauges) {
        mGauges[incoming.first] = incoming.second;
    }

    mUpdatedFlag = true;
}

// Advances the tick count by one and reports whether the event is due:
// the tick count has reached the send interval and the event has been
// updated since it was last read.
bool SelfMonitorMetricEvent::ShouldSend() {
    ++mLastSendInterval;
    const bool intervalReached = mLastSendInterval >= mSendInterval;
    return intervalReached && mUpdatedFlag;
}

// Reports whether the event can be dropped: the tick count has reached the
// send interval and nothing has updated the event since it was last read.
bool SelfMonitorMetricEvent::ShouldDelete() {
    if (mLastSendInterval < mSendInterval) {
        return false;
    }
    return !mUpdatedFlag;
}

// Writes this event into *metricEventPtr: current time as timestamp, every
// label as a tag, the category as the metric name, and all counters and
// gauges as untyped multi-double values. Counters are reset to 0 once
// exported; gauges keep their values. Finally the tick count and the updated
// flag are cleared.
void SelfMonitorMetricEvent::ReadAsMetricEvent(MetricEvent* metricEventPtr) {
    // time
    metricEventPtr->SetTimestamp(GetCurrentLogtailTime().tv_sec);

    // __tag__
    for (const auto& label : mLabels) {
        metricEventPtr->SetTag(label.first, label.second);
    }

    // name
    metricEventPtr->SetName(mCategory);

    // values
    metricEventPtr->SetValue({});
    for (auto& counter : mCounters) {
        metricEventPtr->MutableValue<UntypedMultiDoubleValues>()->SetValue(counter.first, counter.second);
        // Counters are deltas: start accumulating from zero again after export.
        counter.second = 0;
    }
    for (const auto& gauge : mGauges) {
        metricEventPtr->MutableValue<UntypedMultiDoubleValues>()->SetValue(gauge.first, gauge.second);
    }

    // set flags
    mLastSendInterval = 0;
    mUpdatedFlag = false;
}

WriteMetrics::~WriteMetrics() {
Clear();
Expand Down Expand Up @@ -391,7 +234,6 @@ void ReadMetrics::UpdateGoCppProvidedMetrics(vector<map<string, string>>& metric
if (metric.first == METRIC_AGENT_GO_ROUTINES_TOTAL) {
LoongCollectorMonitor::GetInstance()->SetAgentGoRoutinesTotal(stoi(metric.second));
}
LogtailMonitor::GetInstance()->UpdateMetric(metric.first, metric.second);
}
}
}
Expand Down
Loading

0 comments on commit 0254c5c

Please sign in to comment.