Skip to content

Commit

Permalink
resolve conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
Assassin718 committed Oct 6, 2024
2 parents b4584c1 + eb19fdf commit 011d8cd
Show file tree
Hide file tree
Showing 165 changed files with 2,049 additions and 1,870 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ your changes, such as:
- [public] [both] [updated] add a new feature

## [Unreleased]
- [inner] [both] [updated] Support SLS Metricstore output
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider.
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager logger go_pipeline monitor profile_sender models
application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models
config config/watcher
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
Expand Down
22 changes: 11 additions & 11 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <chrono>

#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"

using namespace std;

Expand All @@ -25,31 +25,31 @@ namespace logtail {
void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES);
mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES);
mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS);
mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_CNT);
mInItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_TOTAL);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_SIZE_BYTES);
mOutItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_TOTAL);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_SIZE_BYTES);
mTotalProcessMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_PROCESS_TIME_MS);
mDiscardedItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_TOTAL);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES);
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
if (mMetricsRecordRef != nullptr) {
mInItemsCnt->Add(1);
mInItemsTotal->Add(1);
mInItemSizeBytes->Add(input.size());
}

auto before = chrono::system_clock::now();
auto res = Compress(input, output, errorMsg);

if (mMetricsRecordRef != nullptr) {
mTotalDelayMs->Add(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - before).count());
mTotalProcessMs->Add(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - before).count());
if (res) {
mOutItemsCnt->Add(1);
mOutItemsTotal->Add(1);
mOutItemSizeBytes->Add(output.size());
} else {
mDiscardedItemsCnt->Add(1);
mDiscardedItemsTotal->Add(1);
mDiscardedItemSizeBytes->Add(input.size());
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/common/compression/Compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ class Compressor {

protected:
mutable MetricsRecordRef mMetricsRecordRef;
CounterPtr mInItemsCnt;
CounterPtr mInItemsTotal;
CounterPtr mInItemSizeBytes;
CounterPtr mOutItemsCnt;
CounterPtr mOutItemsTotal;
CounterPtr mOutItemSizeBytes;
CounterPtr mDiscardedItemsCnt;
CounterPtr mDiscardedItemsTotal;
CounterPtr mDiscardedItemSizeBytes;
CounterPtr mTotalDelayMs;
CounterPtr mTotalProcessMs;

private:
virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0;
Expand Down
10 changes: 5 additions & 5 deletions core/common/compression/CompressorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "common/compression/CompressorFactory.h"

#include "common/ParamExtractor.h"
#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "common/compression/LZ4Compressor.h"
#include "common/compression/ZstdCompressor.h"

Expand Down Expand Up @@ -61,10 +61,10 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
} else {
compressor = Create(defaultType);
}
compressor->SetMetricRecordRef({{METRIC_LABEL_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, "compressor"},
{METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId}});
compressor->SetMetricRecordRef({{METRIC_LABEL_KEY_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_KEY_PIPELINE_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, METRIC_LABEL_VALUE_COMPONENT_NAME_COMPRESSOR},
{METRIC_LABEL_KEY_FLUSHER_PLUGIN_ID, flusherId}});
return compressor;
}

Expand Down
2 changes: 1 addition & 1 deletion core/container_manager/ContainerDiscoveryOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct ContainerDiscoveryOptions {
bool Init(const Json::Value& config, const PipelineContext& ctx, const std::string& pluginType);
void GenerateContainerMetaFetchingGoPipeline(Json::Value& res,
const FileDiscoveryOptions* fileDiscovery = nullptr,
const PluginInstance::PluginMeta pluginMeta = {"0", "0", "0"}) const;
const PluginInstance::PluginMeta pluginMeta = {"0"}) const;
};

using ContainerDiscoveryConfig = std::pair<const ContainerDiscoveryOptions*, const PipelineContext*>;
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace logtail {

FileServer::FileServer() {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef,
{{METRIC_LABEL_KEY_RUNNER_NAME, "file_server"}});
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER}});
}

// 启动文件服务,包括加载配置、处理检查点、注册事件等
Expand Down
8 changes: 4 additions & 4 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ void LogInput::Start() {

mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
mAgentOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL);
mRegisterdHandlersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_CNT);
mActiveReadersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_CNT);
mRegisterdHandlersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
mActiveReadersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);

new Thread([this]() { ProcessLoop(); });
Expand Down Expand Up @@ -353,9 +353,9 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) {
mAgentOpenFdTotal->Set(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mRegisterdHandlersCnt->Set(handlerCount);
mRegisterdHandlersTotal->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
mActiveReadersCnt->Set(CheckPointManager::Instance()->GetReaderCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
mEventProcessCount = 0;
}
Expand Down
4 changes: 2 additions & 2 deletions core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ class LogInput : public LogRunnable {

IntGaugePtr mLastRunTime;
IntGaugePtr mAgentOpenFdTotal;
IntGaugePtr mRegisterdHandlersCnt;
IntGaugePtr mActiveReadersCnt;
IntGaugePtr mRegisterdHandlersTotal;
IntGaugePtr mActiveReadersTotal;
IntGaugePtr mEnableFileIncludedByMultiConfigs;

std::atomic_int mLastReadEventTime{0};
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/polling/PollingDirFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "file_server/polling/PollingModify.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"

// Control the check frequency to call ClearUnavailableFileAndDir.
DEFINE_FLAG_INT32(check_not_exist_file_dir_round, "clear not exist file dir cache, round", 20);
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/polling/PollingModify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "file_server/event/Event.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"

using namespace std;

Expand Down
26 changes: 14 additions & 12 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKeyManager.h"
Expand Down Expand Up @@ -203,20 +203,21 @@ LogFileReader::LogFileReader(const std::string& hostLogPathDir,

void LogFileReader::SetMetrics() {
mMetricInited = false;
mMetricLabels = {{METRIC_LABEL_FILE_NAME, GetConvertedPath()},
{METRIC_LABEL_FILE_DEV, std::to_string(GetDevInode().dev)},
{METRIC_LABEL_FILE_INODE, std::to_string(GetDevInode().inode)}};
mMetricLabels = {{METRIC_LABEL_KEY_FILE_NAME, GetConvertedPath()},
{METRIC_LABEL_KEY_FILE_DEV, std::to_string(GetDevInode().dev)},
{METRIC_LABEL_KEY_FILE_INODE, std::to_string(GetDevInode().inode)}};
mMetricsRecordRef = FileServer::GetInstance()->GetOrCreateReentrantMetricsRecordRef(GetConfigName(), mMetricLabels);
if (mMetricsRecordRef == nullptr) {
LOG_ERROR(sLogger,
("failed to init metrics", "cannot get config's metricRecordRef")("config name", GetConfigName()));
return;
}

mInputRecordsSizeBytesCounter = mMetricsRecordRef->GetCounter(METRIC_INPUT_RECORDS_SIZE_BYTES);
mInputReadTotalCounter = mMetricsRecordRef->GetCounter(METRIC_INPUT_READ_TOTAL);
mInputFileSizeBytesGauge = mMetricsRecordRef->GetIntGauge(METRIC_INPUT_FILE_SIZE_BYTES);
mInputFileOffsetBytesGauge = mMetricsRecordRef->GetIntGauge(METRIC_INPUT_FILE_OFFSET_BYTES);
mOutEventsTotal = mMetricsRecordRef->GetCounter(METRIC_PLUGIN_OUT_EVENTS_TOTAL);
mOutEventGroupsTotal = mMetricsRecordRef->GetCounter(METRIC_PLUGIN_OUT_EVENT_GROUPS_TOTAL);
mOutSizeBytes = mMetricsRecordRef->GetCounter(METRIC_PLUGIN_OUT_SIZE_BYTES);
mSourceSizeBytes = mMetricsRecordRef->GetIntGauge(METRIC_PLUGIN_SOURCE_SIZE_BYTES);
mSourceReadOffsetBytes = mMetricsRecordRef->GetIntGauge(METRIC_PLUGIN_SOURCE_READ_OFFSET_BYTES);
mMetricInited = true;
}

Expand Down Expand Up @@ -2133,10 +2134,11 @@ std::unique_ptr<Event> LogFileReader::CreateFlushTimeoutEvent() {

void LogFileReader::ReportMetrics(uint64_t readSize) {
if (mMetricInited) {
mInputReadTotalCounter->Add(1);
mInputRecordsSizeBytesCounter->Add(readSize);
mInputFileOffsetBytesGauge->Set(GetLastFilePos());
mInputFileSizeBytesGauge->Set(GetFileSize());
mOutEventsTotal->Add(1);
mOutEventGroupsTotal->Add(1);
mOutSizeBytes->Add(readSize);
mSourceReadOffsetBytes->Set(GetLastFilePos());
mSourceSizeBytes->Set(GetFileSize());
}
}

Expand Down
9 changes: 5 additions & 4 deletions core/file_server/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,11 @@ class LogFileReader {
MetricLabels mMetricLabels;
bool mMetricInited;
ReentrantMetricsRecordRef mMetricsRecordRef;
CounterPtr mInputRecordsSizeBytesCounter;
CounterPtr mInputReadTotalCounter;
IntGaugePtr mInputFileSizeBytesGauge;
IntGaugePtr mInputFileOffsetBytesGauge;
CounterPtr mOutEventsTotal;
CounterPtr mOutEventGroupsTotal;
CounterPtr mOutSizeBytes;
IntGaugePtr mSourceSizeBytes;
IntGaugePtr mSourceReadOffsetBytes;

private:
bool mHasReadContainerBom = false;
Expand Down
13 changes: 9 additions & 4 deletions core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ using namespace sls_logs;

namespace logtail {

const std::string LABEL_PREFIX = "label.";
const std::string VALUE_PREFIX = "value.";

MetricsRecord::MetricsRecord(MetricLabelsPtr labels, DynamicMetricLabelsPtr dynamicLabels)
: mLabels(labels), mDynamicLabels(dynamicLabels), mDeleted(false) {
}
Expand Down Expand Up @@ -325,15 +328,17 @@ ReadMetrics::~ReadMetrics() {
Clear();
}

void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const {
void ReadMetrics::ReadAsLogGroup(const std::string& regionFieldName,
const std::string& defaultRegion,
std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const {
ReadLock lock(mReadWriteLock);
MetricsRecord* tmp = mHead;
while (tmp) {
Log* logPtr = nullptr;

for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
if (METRIC_FIELD_REGION == pair.first) {
if (regionFieldName == pair.first) {
std::map<std::string, sls_logs::LogGroup*>::iterator iter;
std::string region = pair.second;
iter = logGroupMap.find(region);
Expand All @@ -349,14 +354,14 @@ void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& log
}
if (!logPtr) {
std::map<std::string, sls_logs::LogGroup*>::iterator iter;
iter = logGroupMap.find(METRIC_REGION_DEFAULT);
iter = logGroupMap.find(defaultRegion);
if (iter != logGroupMap.end()) {
sls_logs::LogGroup* logGroup = iter->second;
logPtr = logGroup->add_logs();
} else {
sls_logs::LogGroup* logGroup = new sls_logs::LogGroup();
logPtr = logGroup->add_logs();
logGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(METRIC_REGION_DEFAULT, logGroup));
logGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(defaultRegion, logGroup));
}
}
auto now = GetCurrentLogtailTime();
Expand Down
4 changes: 3 additions & 1 deletion core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ class ReadMetrics {
static ReadMetrics* ptr = new ReadMetrics();
return ptr;
}
void ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const;
void ReadAsLogGroup(const std::string& regionFieldName,
const std::string& defaultRegion,
std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const;
void ReadAsFileBuffer(std::string& metricsContent) const;
void UpdateMetrics();

Expand Down
Loading

0 comments on commit 011d8cd

Please sign in to comment.