Skip to content

Commit

Permalink
remove ProcessProfile
Browse files Browse the repository at this point in the history
  • Loading branch information
Takuka0311 committed Dec 17, 2024
1 parent 3906305 commit eebbeb7
Show file tree
Hide file tree
Showing 24 changed files with 26 additions and 131 deletions.
17 changes: 0 additions & 17 deletions core/pipeline/PipelineContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,6 @@ namespace logtail {
class Pipeline;
class FlusherSLS;

// for compatiblity with shennong profile
struct ProcessProfile {
int readBytes = 0;
int skipBytes = 0;
int feedLines = 0;
int splitLines = 0;
int parseFailures = 0;
int regexMatchFailures = 0;
int parseTimeFailures = 0;
int historyFailures = 0;
int logGroupSize = 0;

void Reset() { memset(this, 0, sizeof(ProcessProfile)); }
};

class PipelineContext {
public:
PipelineContext() {}
Expand Down Expand Up @@ -89,7 +74,6 @@ class PipelineContext {
bool IsFlushingThroughGoPipeline() const { return mIsFlushingThroughGoPipeline; }
void SetIsFlushingThroughGoPipelineFlag(bool flag) { mIsFlushingThroughGoPipeline = flag; }

ProcessProfile& GetProcessProfile() const { return mProcessProfile; }
const Logger::logger& GetLogger() const { return mLogger; }
AlarmManager& GetAlarm() const { return *mAlarm; };

Expand All @@ -111,7 +95,6 @@ class PipelineContext {
bool mHasNativeProcessors = false;
bool mIsFlushingThroughGoPipeline = false;

mutable ProcessProfile mProcessProfile;
Logger::logger mLogger = sLogger;
AlarmManager* mAlarm = AlarmManager::GetInstance();
};
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ProcessorInstance : public PluginInstance {
friend class ProcessorParseDelimiterNativeUnittest;
friend class ProcessorFilterNativeUnittest;
friend class ProcessorDesensitizeNativeUnittest;
friend class ProcessorSplitLogStringNativeUnittest;
friend class InputFileUnittest;
friend class InputPrometheusUnittest;
friend class PipelineUnittest;
Expand Down
7 changes: 0 additions & 7 deletions core/plugin/processor/ProcessorParseApsaraNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ bool ProcessorParseApsaraNative::Init(const Json::Value& config) {
return false;
}

mLogGroupSize = &(GetContext().GetProcessProfile().logGroupSize);
mParseFailures = &(GetContext().GetProcessProfile().parseFailures);
mHistoryFailures = &(GetContext().GetProcessProfile().historyFailures);

mDiscardedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_DISCARDED_EVENTS_TOTAL);
mOutFailedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_FAILED_EVENTS_TOTAL);
mOutKeyNotFoundEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_KEY_NOT_FOUND_EVENTS_TOTAL);
Expand Down Expand Up @@ -159,7 +155,6 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
GetContext().GetLogstoreName(),
GetContext().GetRegion());
mOutFailedEventsTotal->Add(1);
++(*mParseFailures);
sourceEvent.DelContent(mSourceKey);
if (mCommonParserOptions.ShouldAddSourceContent(false)) {
AddLog(mCommonParserOptions.mRenamedSourceKey, buffer, sourceEvent, false);
Expand Down Expand Up @@ -195,7 +190,6 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
GetContext().GetLogstoreName(),
GetContext().GetRegion());
}
++(*mHistoryFailures);
mHistoryFailureTotal->Add(1);
mDiscardedEventsTotal->Add(1);
return false;
Expand Down Expand Up @@ -473,7 +467,6 @@ void ProcessorParseApsaraNative::AddLog(const StringView& key,
return;
}
targetEvent.AppendContentNoCopy(key, value);
*mLogGroupSize += key.size() + value.size() + 5;
}

bool ProcessorParseApsaraNative::IsSupportedEvent(const PipelineEventPtr& e) const {
Expand Down
4 changes: 0 additions & 4 deletions core/plugin/processor/ProcessorParseApsaraNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ class ProcessorParseApsaraNative : public Processor {

int32_t mLogTimeZoneOffsetSecond = 0;

int* mLogGroupSize = nullptr;
int* mParseFailures = nullptr;
int* mHistoryFailures = nullptr;

CounterPtr mDiscardedEventsTotal;
CounterPtr mOutFailedEventsTotal;
CounterPtr mOutKeyNotFoundEventsTotal;
Expand Down
7 changes: 0 additions & 7 deletions core/plugin/processor/ProcessorParseDelimiterNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,6 @@ bool ProcessorParseDelimiterNative::Init(const Json::Value& config) {
return false;
}

mParseFailures = &(GetContext().GetProcessProfile().parseFailures);
mLogGroupSize = &(GetContext().GetProcessProfile().logGroupSize);

mDiscardedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_DISCARDED_EVENTS_TOTAL);
mOutFailedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_FAILED_EVENTS_TOTAL);
mOutKeyNotFoundEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_KEY_NOT_FOUND_EVENTS_TOTAL);
Expand Down Expand Up @@ -297,7 +294,6 @@ bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, Pipe
GetContext().GetProjectName(),
GetContext().GetLogstoreName(),
GetContext().GetRegion());
++(*mParseFailures);
parseSuccess = false;
}
} else {
Expand All @@ -307,7 +303,6 @@ bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, Pipe
GetContext().GetProjectName(),
GetContext().GetLogstoreName(),
GetContext().GetRegion());
++(*mParseFailures);
parseSuccess = false;
}
} else {
Expand All @@ -319,7 +314,6 @@ bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, Pipe
LOG_WARNING(sLogger,
("parse delimiter log fail", "no column keys defined")("project", GetContext().GetProjectName())(
"logstore", GetContext().GetLogstoreName())("file", logPath));
++(*mParseFailures);
parseSuccess = false;
}

Expand Down Expand Up @@ -417,7 +411,6 @@ void ProcessorParseDelimiterNative::AddLog(const StringView& key,
return;
}
targetEvent.SetContentNoCopy(key, value);
*mLogGroupSize += key.size() + value.size() + 5;
}

bool ProcessorParseDelimiterNative::IsSupportedEvent(const PipelineEventPtr& e) const {
Expand Down
3 changes: 0 additions & 3 deletions core/plugin/processor/ProcessorParseDelimiterNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ class ProcessorParseDelimiterNative : public Processor {
bool mSourceKeyOverwritten = false;
std::unique_ptr<DelimiterModeFsmParser> mDelimiterModeFsmParserPtr;

int* mLogGroupSize = nullptr;
int* mParseFailures = nullptr;

CounterPtr mDiscardedEventsTotal;
CounterPtr mOutFailedEventsTotal;
CounterPtr mOutKeyNotFoundEventsTotal;
Expand Down
6 changes: 0 additions & 6 deletions core/plugin/processor/ProcessorParseJsonNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ bool ProcessorParseJsonNative::Init(const Json::Value& config) {
return false;
}

mParseFailures = &(GetContext().GetProcessProfile().parseFailures);
mLogGroupSize = &(GetContext().GetProcessProfile().logGroupSize);

mDiscardedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_DISCARDED_EVENTS_TOTAL);
mOutFailedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_FAILED_EVENTS_TOTAL);
mOutKeyNotFoundEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_KEY_NOT_FOUND_EVENTS_TOTAL);
Expand Down Expand Up @@ -135,7 +132,6 @@ bool ProcessorParseJsonNative::JsonLogLineParser(LogEvent& sourceEvent,
GetContext().GetLogstoreName(),
GetContext().GetRegion());
}
++(*mParseFailures);
mOutFailedEventsTotal->Add(1);
parseSuccess = false;
} else if (!doc.IsObject()) {
Expand All @@ -149,7 +145,6 @@ bool ProcessorParseJsonNative::JsonLogLineParser(LogEvent& sourceEvent,
GetContext().GetLogstoreName(),
GetContext().GetRegion());
}
++(*mParseFailures);
mOutFailedEventsTotal->Add(1);
parseSuccess = false;
}
Expand Down Expand Up @@ -209,7 +204,6 @@ void ProcessorParseJsonNative::AddLog(const StringView& key,
return;
}
targetEvent.SetContentNoCopy(key, value);
*mLogGroupSize += key.size() + value.size() + 5;
}

bool ProcessorParseJsonNative::IsSupportedEvent(const PipelineEventPtr& e) const {
Expand Down
3 changes: 0 additions & 3 deletions core/plugin/processor/ProcessorParseJsonNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ class ProcessorParseJsonNative : public Processor {
bool ProcessEvent(const StringView& logPath, PipelineEventPtr& e);
static std::string RapidjsonValueToString(const rapidjson::Value& value);

int* mParseFailures = nullptr;
int* mLogGroupSize = nullptr;

CounterPtr mDiscardedEventsTotal;
CounterPtr mOutFailedEventsTotal;
CounterPtr mOutKeyNotFoundEventsTotal;
Expand Down
9 changes: 0 additions & 9 deletions core/plugin/processor/ProcessorParseRegexNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ bool ProcessorParseRegexNative::Init(const Json::Value& config) {
return false;
}

mParseFailures = &(GetContext().GetProcessProfile().parseFailures);
mRegexMatchFailures = &(GetContext().GetProcessProfile().regexMatchFailures);
mLogGroupSize = &(GetContext().GetProcessProfile().logGroupSize);

mDiscardedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_DISCARDED_EVENTS_TOTAL);
mOutFailedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_FAILED_EVENTS_TOTAL);
mOutKeyNotFoundEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_KEY_NOT_FOUND_EVENTS_TOTAL);
Expand Down Expand Up @@ -178,7 +174,6 @@ void ProcessorParseRegexNative::AddLog(const StringView& key,
return;
}
targetEvent.SetContentNoCopy(key, value);
*mLogGroupSize += key.size() + value.size() + 5;
}

bool ProcessorParseRegexNative::RegexLogLineParser(LogEvent& sourceEvent,
Expand Down Expand Up @@ -218,8 +213,6 @@ bool ProcessorParseRegexNative::RegexLogLineParser(LogEvent& sourceEvent,
GetContext().GetRegion());
}
}
++(*mRegexMatchFailures);
++(*mParseFailures);
mOutFailedEventsTotal->Add(1);
parseSuccess = false;
} else if (what.size() <= keys.size()) {
Expand All @@ -237,8 +230,6 @@ bool ProcessorParseRegexNative::RegexLogLineParser(LogEvent& sourceEvent,
GetContext().GetLogstoreName(),
GetContext().GetRegion());
}
++(*mRegexMatchFailures);
++(*mParseFailures);
parseSuccess = false;
}
if (!parseSuccess) {
Expand Down
4 changes: 0 additions & 4 deletions core/plugin/processor/ProcessorParseRegexNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ class ProcessorParseRegexNative : public Processor {
bool mIsWholeLineMode = false;
boost::regex mReg;

int* mParseFailures = nullptr;
int* mRegexMatchFailures = nullptr;
int* mLogGroupSize = nullptr;

CounterPtr mDiscardedEventsTotal;
CounterPtr mOutFailedEventsTotal;
CounterPtr mOutKeyNotFoundEventsTotal;
Expand Down
5 changes: 0 additions & 5 deletions core/plugin/processor/ProcessorParseTimestampNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ bool ProcessorParseTimestampNative::Init(const Json::Value& config) {
mContext->GetRegion());
}

mParseTimeFailures = &(GetContext().GetProcessProfile().parseTimeFailures);
mHistoryFailures = &(GetContext().GetProcessProfile().historyFailures);

mDiscardedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_DISCARDED_EVENTS_TOTAL);
mOutFailedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_FAILED_EVENTS_TOTAL);
mOutKeyNotFoundEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_OUT_KEY_NOT_FOUND_EVENTS_TOTAL);
Expand Down Expand Up @@ -164,7 +161,6 @@ bool ProcessorParseTimestampNative::ProcessEvent(StringView logPath,
GetContext().GetLogstoreName(),
GetContext().GetRegion());
}
++(*mHistoryFailures);
mHistoryFailureTotal->Add(1);
mDiscardedEventsTotal->Add(1);
return false;
Expand Down Expand Up @@ -221,7 +217,6 @@ bool ProcessorParseTimestampNative::ParseLogTime(const StringView& curTimeStr, /
GetContext().GetLogstoreName(),
GetContext().GetRegion());
}
++(*mParseTimeFailures);
return false;
}

Expand Down
3 changes: 0 additions & 3 deletions core/plugin/processor/ProcessorParseTimestampNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ class ProcessorParseTimestampNative : public Processor {

int32_t mLogTimeZoneOffsetSecond = 0;

int* mParseTimeFailures = nullptr;
int* mHistoryFailures = nullptr;

CounterPtr mDiscardedEventsTotal;
CounterPtr mOutFailedEventsTotal;
CounterPtr mOutKeyNotFoundEventsTotal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ bool ProcessorMergeMultilineLogNative::Init(const Json::Value& config) {
mContext->GetRegion());
}

mSplitLines = &(GetContext().GetProcessProfile().splitLines);

mMergedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_MERGED_EVENTS_TOTAL);
mUnmatchedEventsTotal
= GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_UNMATCHED_EVENTS_TOTAL);
Expand All @@ -93,7 +91,6 @@ void ProcessorMergeMultilineLogNative::Process(PipelineEventGroup& logGroup) {
MergeLogsByFlag(logGroup);
logGroup.DelMetadata(EventGroupMetaKey::HAS_PART_LOG);
}
*mSplitLines = logGroup.GetEvents().size();
}

bool ProcessorMergeMultilineLogNative::IsSupportedEvent(const PipelineEventPtr& e) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ class ProcessorMergeMultilineLogNative : public Processor {
CounterPtr mUnmatchedEventsTotal; // 未成功合并的日志条数
// CounterPtr mProcUnmatchedEventsBytes; // 未成功合并的日志字节数

int* mSplitLines = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorMergeMultilineLogNativeUnittest;
friend class ProcessorMergeMultilineLogDisacardUnmatchUnittest;
Expand Down
3 changes: 0 additions & 3 deletions core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ bool ProcessorSplitLogStringNative::Init(const Json::Value& config) {
mContext->GetRegion());
}

mSplitLines = &(GetContext().GetProcessProfile().splitLines);

return true;
}

Expand All @@ -94,7 +92,6 @@ void ProcessorSplitLogStringNative::Process(PipelineEventGroup& logGroup) {
for (PipelineEventPtr& e : logGroup.MutableEvents()) {
ProcessEvent(logGroup, std::move(e), newEvents);
}
*mSplitLines = newEvents.size();
logGroup.SwapEvents(newEvents);
}

Expand Down
2 changes: 0 additions & 2 deletions core/plugin/processor/inner/ProcessorSplitLogStringNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ class ProcessorSplitLogStringNative : public Processor {
void ProcessEvent(PipelineEventGroup& logGroup, PipelineEventPtr&& e, EventsContainer& newEvents);
StringView GetNextLine(StringView log, size_t begin);

int* mSplitLines = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorRegexStringNativeUnittest;
friend class ProcessorParseDelimiterNativeUnittest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ bool ProcessorSplitMultilineLogStringNative::Init(const Json::Value& config) {
mMatchedLinesTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_MATCHED_LINES_TOTAL);
mUnmatchedLinesTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_UNMATCHED_LINES_TOTAL);

mSplitLines = &(mContext->GetProcessProfile().splitLines);

return true;
}

Expand All @@ -105,7 +103,6 @@ void ProcessorSplitMultilineLogStringNative::Process(PipelineEventGroup& logGrou
}
mMatchedLinesTotal->Add(inputLines - unmatchLines);
mUnmatchedLinesTotal->Add(unmatchLines);
*mSplitLines = newEvents.size();
logGroup.SwapEvents(newEvents);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ class ProcessorSplitMultilineLogStringNative : public Processor {
int* unmatchLines);
StringView GetNextLine(StringView log, size_t begin);

int* mSplitLines = nullptr;

CounterPtr mMatchedEventsTotal;
CounterPtr mMatchedLinesTotal;
CounterPtr mUnmatchedLinesTotal;
Expand Down
Loading

0 comments on commit eebbeb7

Please sign in to comment.