diff --git a/core/models/EventPool.cpp b/core/models/EventPool.cpp index 0a9035af71..5e50091750 100644 --- a/core/models/EventPool.cpp +++ b/core/models/EventPool.cpp @@ -67,6 +67,15 @@ SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) { return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt); } +RawEvent* EventPool::AcquireRawEvent(PipelineEventGroup* ptr) { + if (mEnableLock) { + TransferPoolIfEmpty(mRawEventPool, mRawEventPoolBak); + lock_guard lock(mPoolMux); + return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt); + } + return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt); +} + void EventPool::Release(vector&& obj) { if (mEnableLock) { lock_guard lock(mPoolBakMux); @@ -94,6 +103,15 @@ void EventPool::Release(vector&& obj) { } } +void EventPool::Release(vector&& obj) { + if (mEnableLock) { + lock_guard lock(mPoolBakMux); + mRawEventPoolBak.insert(mRawEventPoolBak.end(), obj.begin(), obj.end()); + } else { + mRawEventPool.insert(mRawEventPool.end(), obj.begin(), obj.end()); + } +} + template void DoGC(vector& pool, vector& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) { if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits::max()) { @@ -131,10 +149,12 @@ void EventPool::CheckGC() { DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log"); DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric"); DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span"); + DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, &mPoolBakMux, "raw"); } else { DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log"); DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric"); DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span"); + DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, nullptr, "raw"); } mLastGCTime = time(nullptr); } @@ -150,6 +170,9 @@ void EventPool::DestroyAllEventPool() { for (auto& item : mSpanEventPool) { delete item; } + for (auto& item : mRawEventPool) { + delete item; + } } void EventPool::DestroyAllEventPoolBak() { @@ -162,6 +185,9 @@ void EventPool::DestroyAllEventPoolBak() { for (auto& item : mSpanEventPoolBak) { delete item; } + for (auto& item : mRawEventPoolBak) { + delete item; + } } #ifdef APSARA_UNIT_TEST_MAIN @@ -172,9 +198,11 @@ void EventPool::Clear() { mLogEventPool.clear(); mMetricEventPool.clear(); mSpanEventPool.clear(); + mRawEventPool.clear(); mMinUnusedLogEventsCnt = numeric_limits::max(); mMinUnusedMetricEventsCnt = numeric_limits::max(); mMinUnusedSpanEventsCnt = numeric_limits::max(); + mMinUnusedRawEventsCnt = numeric_limits::max(); } { lock_guard lock(mPoolBakMux); @@ -182,6 +210,7 @@ void EventPool::Clear() { mLogEventPoolBak.clear(); mMetricEventPoolBak.clear(); mSpanEventPoolBak.clear(); + mRawEventPoolBak.clear(); } mLastGCTime = 0; } diff --git a/core/models/EventPool.h b/core/models/EventPool.h index 4fd1f9f17d..33808e783c 100644 --- a/core/models/EventPool.h +++ b/core/models/EventPool.h @@ -24,6 +24,7 @@ #include "models/LogEvent.h" #include "models/MetricEvent.h" +#include "models/RawEvent.h" #include "models/SpanEvent.h" namespace logtail { @@ -39,9 +40,11 @@ class EventPool { LogEvent* AcquireLogEvent(PipelineEventGroup* ptr); MetricEvent* AcquireMetricEvent(PipelineEventGroup* ptr); SpanEvent* AcquireSpanEvent(PipelineEventGroup* ptr); + RawEvent* AcquireRawEvent(PipelineEventGroup* ptr); void Release(std::vector&& obj); void Release(std::vector&& obj); void Release(std::vector&& obj); + void Release(std::vector&& obj); void CheckGC(); #ifdef APSARA_UNIT_TEST_MAIN @@ -80,16 +83,19 @@ class EventPool { std::vector mLogEventPool; std::vector mMetricEventPool; std::vector mSpanEventPool; + std::vector mRawEventPool; // only meaningful when mEnableLock is true std::mutex mPoolBakMux; std::vector mLogEventPoolBak; std::vector mMetricEventPoolBak; std::vector mSpanEventPoolBak; + std::vector mRawEventPoolBak; size_t mMinUnusedLogEventsCnt = std::numeric_limits::max(); size_t mMinUnusedMetricEventsCnt = std::numeric_limits::max(); size_t mMinUnusedSpanEventsCnt = std::numeric_limits::max(); + size_t mMinUnusedRawEventsCnt = std::numeric_limits::max(); time_t mLastGCTime = 0; diff --git a/core/models/PipelineEvent.cpp b/core/models/PipelineEvent.cpp index 249b82d010..840228bb1b 100644 --- a/core/models/PipelineEvent.cpp +++ b/core/models/PipelineEvent.cpp @@ -28,17 +28,20 @@ StringView gEmptyStringView; const string& PipelineEventTypeToString(PipelineEvent::Type t) { switch (t) { case PipelineEvent::Type::LOG: - static string logname = "Log"; - return logname; + static string logName = "Log"; + return logName; case PipelineEvent::Type::METRIC: - static string metricname = "Metric"; - return metricname; + static string metricName = "Metric"; + return metricName; case PipelineEvent::Type::SPAN: - static string spanname = "Span"; - return spanname; + static string spanName = "Span"; + return spanName; + case PipelineEvent::Type::RAW: + static string rawName = "Raw"; + return rawName; default: - static string voidname = ""; - return voidname; + static string voidName = ""; + return voidName; } } diff --git a/core/models/PipelineEvent.h b/core/models/PipelineEvent.h index d25cc00184..e766a23689 100644 --- a/core/models/PipelineEvent.h +++ b/core/models/PipelineEvent.h @@ -36,7 +36,7 @@ class PipelineEventGroup; class PipelineEvent { public: - enum class Type { NONE, LOG, METRIC, SPAN }; + enum class Type { NONE, LOG, METRIC, SPAN, RAW }; virtual ~PipelineEvent() = default; diff --git a/core/models/PipelineEventGroup.cpp b/core/models/PipelineEventGroup.cpp index 35ebd35a0d..61562e483f 100644 --- a/core/models/PipelineEventGroup.cpp +++ b/core/models/PipelineEventGroup.cpp @@ -87,6 +87,9 @@ PipelineEventGroup::~PipelineEventGroup() { case PipelineEvent::Type::SPAN: DestroyEvents(std::move(mEvents)); break; + case PipelineEvent::Type::RAW: + DestroyEvents(std::move(mEvents)); + break; default: break; } @@ -159,6 +162,20 @@ unique_ptr PipelineEventGroup::CreateSpanEvent(bool fromPool, EventPo return unique_ptr(e); } +unique_ptr PipelineEventGroup::CreateRawEvent(bool fromPool, EventPool* pool) { + RawEvent* e = nullptr; + if (fromPool) { + if (pool) { + e = pool->AcquireRawEvent(this); + } else { + e = gThreadedEventPool.AcquireRawEvent(this); + } + } else { + e = new RawEvent(this); + } + return unique_ptr(e); +} + LogEvent* PipelineEventGroup::AddLogEvent(bool fromPool, EventPool* pool) { LogEvent* e = nullptr; if (fromPool) { @@ -204,6 +221,21 @@ SpanEvent* PipelineEventGroup::AddSpanEvent(bool fromPool, EventPool* pool) { return e; } +RawEvent* PipelineEventGroup::AddRawEvent(bool fromPool, EventPool* pool) { + RawEvent* e = nullptr; + if (fromPool) { + if (pool) { + e = pool->AcquireRawEvent(this); + } else { + e = gThreadedEventPool.AcquireRawEvent(this); + } + } else { + e = new RawEvent(this); + } + mEvents.emplace_back(e, fromPool, pool); + return e; +} + void PipelineEventGroup::SetMetadata(EventGroupMetaKey key, StringView val) { SetMetadataNoCopy(key, mSourceBuffer->CopyString(val)); } @@ -404,8 +436,10 @@ bool PipelineEventGroup::FromJson(const Json::Value& root) { AddLogEvent()->FromJson(event); } else if (event["type"].asInt() == static_cast(PipelineEvent::Type::METRIC)) { AddMetricEvent()->FromJson(event); - } else { + } else if (event["type"].asInt() == static_cast(PipelineEvent::Type::SPAN)) { AddSpanEvent()->FromJson(event); + } else { + AddRawEvent()->FromJson(event); } } } diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index 44ddd76981..5831a581d1 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -81,12 +81,14 @@ class PipelineEventGroup { std::unique_ptr CreateLogEvent(bool fromPool = false, EventPool* pool = nullptr); std::unique_ptr CreateMetricEvent(bool fromPool = false, EventPool* pool = nullptr); std::unique_ptr CreateSpanEvent(bool fromPool = false, EventPool* pool = nullptr); + std::unique_ptr CreateRawEvent(bool fromPool = false, EventPool* pool = nullptr); const EventsContainer& GetEvents() const { return mEvents; } EventsContainer& MutableEvents() { return mEvents; } LogEvent* AddLogEvent(bool fromPool = false, EventPool* pool = nullptr); MetricEvent* AddMetricEvent(bool fromPool = false, EventPool* pool = nullptr); SpanEvent* AddSpanEvent(bool fromPool = false, EventPool* pool = nullptr); + RawEvent* AddRawEvent(bool fromPool = false, EventPool* pool = nullptr); void SwapEvents(EventsContainer& other) { mEvents.swap(other); } void ReserveEvents(size_t size) { mEvents.reserve(size); } diff --git a/core/models/PipelineEventPtr.h b/core/models/PipelineEventPtr.h index ca0cea282d..99d64a6238 100644 --- a/core/models/PipelineEventPtr.h +++ b/core/models/PipelineEventPtr.h @@ -23,6 +23,7 @@ #include "models/MetricEvent.h" #include "models/PipelineEvent.h" #include "models/SpanEvent.h" +#include "models/RawEvent.h" namespace logtail { class EventPool; @@ -47,6 +48,9 @@ class PipelineEventPtr { if (typeid(T) == typeid(SpanEvent)) { return mData->GetType() == PipelineEvent::Type::SPAN; } + if (typeid(T) == typeid(RawEvent)) { + return mData->GetType() == PipelineEvent::Type::RAW; + } return false; } template diff --git a/core/models/RawEvent.cpp b/core/models/RawEvent.cpp new file mode 100644 index 0000000000..d95403dfdf --- /dev/null +++ b/core/models/RawEvent.cpp @@ -0,0 +1,76 @@ +/* + * Copyright 2023 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "models/RawEvent.h" + +using namespace std; + +namespace logtail { + +RawEvent::RawEvent(PipelineEventGroup* ptr) : PipelineEvent(Type::RAW, ptr) { +} + +unique_ptr RawEvent::Copy() const { + return make_unique(*this); +} + +void RawEvent::Reset() { + PipelineEvent::Reset(); + mContent.clear(); +} + +void RawEvent::SetContent(const std::string& content) { + SetContentNoCopy(GetSourceBuffer()->CopyString(content)); +} + +void RawEvent::SetContentNoCopy(StringView content) { + mContent = content; +} + +void RawEvent::SetContentNoCopy(const StringBuffer& content) { + mContent = StringView(content.data, content.size); +} + +size_t RawEvent::DataSize() const { + return PipelineEvent::DataSize() + mContent.size(); +} + +#ifdef APSARA_UNIT_TEST_MAIN +Json::Value RawEvent::ToJson(bool enableEventMeta) const { + Json::Value root; + root["type"] = static_cast(GetType()); + root["timestamp"] = GetTimestamp(); + if (GetTimestampNanosecond()) { + root["timestampNanosecond"] = static_cast(GetTimestampNanosecond().value()); + } + root["content"] = mContent.to_string(); + return root; +} + +bool RawEvent::FromJson(const Json::Value& root) { + if (root.isMember("timestampNanosecond")) { + SetTimestamp(root["timestamp"].asInt64(), root["timestampNanosecond"].asInt64()); + } else { + SetTimestamp(root["timestamp"].asInt64()); + } + if (root.isMember("content")) { + SetContent(root["content"].asString()); + } + return true; +} +#endif + +} // namespace logtail diff --git a/core/models/RawEvent.h b/core/models/RawEvent.h new file mode 100644 index 0000000000..0e5af870cb --- /dev/null +++ b/core/models/RawEvent.h @@ -0,0 +1,49 @@ +/* + * Copyright 2023 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "models/PipelineEvent.h" + +namespace logtail { + +class RawEvent : public PipelineEvent { + friend class PipelineEventGroup; + friend class EventPool; + +public: + std::unique_ptr Copy() const override; + void Reset() override; + + StringView GetContent() const { return mContent; } + void SetContent(const std::string& content); + void SetContentNoCopy(StringView content); + void SetContentNoCopy(const StringBuffer& content); + + size_t DataSize() const override; + +#ifdef APSARA_UNIT_TEST_MAIN + Json::Value ToJson(bool enableEventMeta = false) const override; + bool FromJson(const Json::Value&) override; +#endif + +private: + RawEvent(PipelineEventGroup* ptr); + + StringView mContent; +}; + +} // namespace logtail diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index af9bfb4ac0..fcaf358343 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -73,6 +73,8 @@ bool Pipeline::Init(PipelineConfig&& config) { mContext.SetCreateTime(config.mCreateTime); mContext.SetPipeline(*this); mContext.SetIsFirstProcessorJsonFlag(config.mIsFirstProcessorJson); + mContext.SetHasNativeProcessorsFlag(config.mHasNativeProcessor); + mContext.SetIsFlushingThroughGoPipelineFlag(config.IsFlushingThroughGoPipelineExisted()); // for special treatment below const InputFile* inputFile = nullptr; diff --git a/core/pipeline/PipelineContext.h b/core/pipeline/PipelineContext.h index e7aa2085ac..7bbafa7b0a 100644 --- a/core/pipeline/PipelineContext.h +++ b/core/pipeline/PipelineContext.h @@ -76,6 +76,7 @@ class PipelineContext { QueueKey GetLogstoreKey() const; const FlusherSLS* GetSLSInfo() const { return mSLSInfo; } void SetSLSInfo(const FlusherSLS* flusherSLS) { mSLSInfo = flusherSLS; } + bool RequiringJsonReader() const { return mRequiringJsonReader; } void SetRequiringJsonReaderFlag(bool flag) { mRequiringJsonReader = flag; } bool IsFirstProcessorApsara() const { return mIsFirstProcessorApsara; } @@ -84,6 +85,10 @@ class PipelineContext { void SetIsFirstProcessorJsonFlag(bool flag) { mIsFirstProcessorJson = flag; } bool IsExactlyOnceEnabled() const {return mEnableExactlyOnce; } void SetExactlyOnceFlag(bool flag) { mEnableExactlyOnce = flag; } + bool HasNativeProcessors() const { return mHasNativeProcessors; } + void SetHasNativeProcessorsFlag(bool flag) { mHasNativeProcessors = flag; } + bool IsFlushingThroughGoPipeline() const { return mIsFlushingThroughGoPipeline; } + void SetIsFlushingThroughGoPipelineFlag(bool flag) { mIsFlushingThroughGoPipeline = flag; } ProcessProfile& GetProcessProfile() const { return mProcessProfile; } // LogFileProfiler& GetProfiler() { return *mProfiler; } @@ -99,12 +104,14 @@ class PipelineContext { QueueKey mProcessQueueKey = -1; Pipeline* mPipeline = nullptr; - // special members for compatability const FlusherSLS* mSLSInfo = nullptr; + // for input_file only bool mRequiringJsonReader = false; bool mIsFirstProcessorApsara = false; bool mIsFirstProcessorJson = false; bool mEnableExactlyOnce = false; + bool mHasNativeProcessors = false; + bool mIsFlushingThroughGoPipeline = false; mutable ProcessProfile mProcessProfile; // LogFileProfiler* mProfiler = LogFileProfiler::GetInstance(); diff --git a/core/pipeline/batch/BatchedEvents.cpp b/core/pipeline/batch/BatchedEvents.cpp index 43a63ea9e4..c2c951244e 100644 --- a/core/pipeline/batch/BatchedEvents.cpp +++ b/core/pipeline/batch/BatchedEvents.cpp @@ -66,6 +66,9 @@ BatchedEvents::~BatchedEvents() { case PipelineEvent::Type::SPAN: DestroyEvents(std::move(mEvents)); break; + case PipelineEvent::Type::RAW: + DestroyEvents(std::move(mEvents)); + break; default: break; } diff --git a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index a6b8d7b803..9ffb6b1541 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -109,6 +109,16 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri break; case PipelineEvent::Type::SPAN: break; + case PipelineEvent::Type::RAW: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + if (e.GetContent().empty()) { + continue; + } + size_t contentSZ = GetLogContentSize(DEFAULT_CONTENT_KEY.size(), e.GetContent().size()); + logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]); + } + break; default: break; } @@ -138,33 +148,44 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri switch (eventType) { case PipelineEvent::Type::LOG: for (size_t i = 0; i < group.mEvents.size(); ++i) { - const auto& logEvent = group.mEvents[i].Cast(); + const auto& e = group.mEvents[i].Cast(); serializer.StartToAddLog(logSZ[i]); - serializer.AddLogTime(logEvent.GetTimestamp()); - for (const auto& kv : logEvent) { + serializer.AddLogTime(e.GetTimestamp()); + for (const auto& kv : e) { serializer.AddLogContent(kv.first, kv.second); } - if (enableNs && logEvent.GetTimestampNanosecond()) { - serializer.AddLogTimeNs(logEvent.GetTimestampNanosecond().value()); + if (enableNs && e.GetTimestampNanosecond()) { + serializer.AddLogTimeNs(e.GetTimestampNanosecond().value()); } } break; case PipelineEvent::Type::METRIC: for (size_t i = 0; i < group.mEvents.size(); ++i) { - const auto& metricEvent = group.mEvents[i].Cast(); - if (metricEvent.Is()) { + const auto& e = group.mEvents[i].Cast(); + if (e.Is()) { continue; } serializer.StartToAddLog(logSZ[i]); - serializer.AddLogTime(metricEvent.GetTimestamp()); - serializer.AddLogContentMetricLabel(metricEvent, metricEventContentCache[i].second); - serializer.AddLogContentMetricTimeNano(metricEvent); + serializer.AddLogTime(e.GetTimestamp()); + serializer.AddLogContentMetricLabel(e, metricEventContentCache[i].second); + serializer.AddLogContentMetricTimeNano(e); serializer.AddLogContent(METRIC_RESERVED_KEY_VALUE, metricEventContentCache[i].first); - serializer.AddLogContent(METRIC_RESERVED_KEY_NAME, metricEvent.GetName()); + serializer.AddLogContent(METRIC_RESERVED_KEY_NAME, e.GetName()); } break; case PipelineEvent::Type::SPAN: break; + case PipelineEvent::Type::RAW: + for (size_t i = 0; i < group.mEvents.size(); ++i) { + const auto& e = group.mEvents[i].Cast(); + serializer.StartToAddLog(logSZ[i]); + serializer.AddLogTime(e.GetTimestamp()); + serializer.AddLogContent(DEFAULT_CONTENT_KEY, e.GetContent()); + if (enableNs && e.GetTimestampNanosecond()) { + serializer.AddLogTimeNs(e.GetTimestampNanosecond().value()); + } + } + break; default: break; } diff --git a/core/plugin/input/InputFile.cpp b/core/plugin/input/InputFile.cpp index b8186797b8..b786ecc86e 100644 --- a/core/plugin/input/InputFile.cpp +++ b/core/plugin/input/InputFile.cpp @@ -227,6 +227,9 @@ bool InputFile::CreateInnerProcessors() { ProcessorSplitLogStringNative::sName, mContext->GetPipeline().GenNextPluginMeta(false)); detail["AppendingLogPositionMeta"] = Json::Value(mFileReader.mAppendingLogPositionMeta); } + detail["EnableRawContent"] + = Json::Value(!mContext->HasNativeProcessors() && !mContext->IsExactlyOnceEnabled() + && !mContext->IsFlushingThroughGoPipeline() && !mFileReader.mAppendingLogPositionMeta); if (!processor->Init(detail, *mContext)) { // should not happen return false; diff --git a/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp b/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp index ad61db1a19..8f8424050a 100644 --- a/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp +++ b/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp @@ -68,6 +68,19 @@ bool ProcessorSplitLogStringNative::Init(const Json::Value& config) { mContext->GetRegion()); } + // EnableRawContent + if (!GetOptionalBoolParam(config, "EnableRawContent", mEnableRawContent, errorMsg)) { + PARAM_WARNING_DEFAULT(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + mEnableRawContent, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } + mSplitLines = &(GetContext().GetProcessProfile().splitLines); return true; @@ -133,22 +146,29 @@ void ProcessorSplitLogStringNative::ProcessEvent(PipelineEventGroup& logGroup, size_t begin = 0; while (begin < sourceVal.size()) { - std::unique_ptr targetEvent = logGroup.CreateLogEvent(true); StringView content = GetNextLine(sourceVal, begin); - targetEvent->SetContentNoCopy(StringView(sourceKey.data, sourceKey.size), content); - targetEvent->SetTimestamp( - sourceEvent.GetTimestamp(), - sourceEvent.GetTimestampNanosecond()); // it is easy to forget other fields, better solution? - auto const offset = sourceEvent.GetPosition().first + (content.data() - sourceVal.data()); - auto const length = begin + content.size() == sourceVal.size() - ? sourceEvent.GetPosition().second - (content.data() - sourceVal.data()) - : content.size() + 1; - targetEvent->SetPosition(offset, length); - if (mAppendingLogPositionMeta) { - StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); - targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); + if (mEnableRawContent) { + std::unique_ptr targetEvent = logGroup.CreateRawEvent(true); + targetEvent->SetContentNoCopy(content); + targetEvent->SetTimestamp(sourceEvent.GetTimestamp(), sourceEvent.GetTimestampNanosecond()); + newEvents.emplace_back(std::move(targetEvent), true, nullptr); + } else { + std::unique_ptr targetEvent = logGroup.CreateLogEvent(true); + targetEvent->SetContentNoCopy(StringView(sourceKey.data, sourceKey.size), content); + targetEvent->SetTimestamp( + sourceEvent.GetTimestamp(), + sourceEvent.GetTimestampNanosecond()); // it is easy to forget other fields, better solution? + auto const offset = sourceEvent.GetPosition().first + (content.data() - sourceVal.data()); + auto const length = begin + content.size() == sourceVal.size() + ? sourceEvent.GetPosition().second - (content.data() - sourceVal.data()) + : content.size() + 1; + targetEvent->SetPosition(offset, length); + if (mAppendingLogPositionMeta) { + StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); + targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); + } + newEvents.emplace_back(std::move(targetEvent), true, nullptr); } - newEvents.emplace_back(std::move(targetEvent), true, nullptr); begin += content.size() + 1; } } diff --git a/core/plugin/processor/inner/ProcessorSplitLogStringNative.h b/core/plugin/processor/inner/ProcessorSplitLogStringNative.h index 54c3b039d6..0bec830b23 100644 --- a/core/plugin/processor/inner/ProcessorSplitLogStringNative.h +++ b/core/plugin/processor/inner/ProcessorSplitLogStringNative.h @@ -31,6 +31,7 @@ class ProcessorSplitLogStringNative : public Processor { std::string mSourceKey = DEFAULT_CONTENT_KEY; char mSplitChar = '\n'; bool mAppendingLogPositionMeta = false; + bool mEnableRawContent = false; const std::string& Name() const override { return sName; } bool Init(const Json::Value& config) override; diff --git a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp index bb09a535cb..9afb6b57dd 100644 --- a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp +++ b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp @@ -20,8 +20,8 @@ #include #include "app_config/AppConfig.h" -#include "constants/Constants.h" #include "common/ParamExtractor.h" +#include "constants/Constants.h" #include "logger/Logger.h" #include "models/LogEvent.h" #include "monitor/metric_constants/MetricConstants.h" @@ -64,6 +64,19 @@ bool ProcessorSplitMultilineLogStringNative::Init(const Json::Value& config) { mContext->GetRegion()); } + // EnableRawContent + if (!GetOptionalBoolParam(config, "EnableRawContent", mEnableRawContent, errorMsg)) { + PARAM_WARNING_DEFAULT(mContext->GetLogger(), + mContext->GetAlarm(), + errorMsg, + mEnableRawContent, + sName, + mContext->GetConfigName(), + mContext->GetProjectName(), + mContext->GetLogstoreName(), + mContext->GetRegion()); + } + mMatchedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_MATCHED_EVENTS_TOTAL); mMatchedLinesTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_MATCHED_LINES_TOTAL); mUnmatchedLinesTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_UNMATCHED_LINES_TOTAL); @@ -303,21 +316,28 @@ void ProcessorSplitMultilineLogStringNative::CreateNewEvent(const StringView& co const LogEvent& sourceEvent, PipelineEventGroup& logGroup, EventsContainer& newEvents) { - StringView sourceVal = sourceEvent.GetContent(mSourceKey); - std::unique_ptr targetEvent = logGroup.CreateLogEvent(true); - targetEvent->SetContentNoCopy(StringView(sourceKey.data, sourceKey.size), content); - targetEvent->SetTimestamp( - sourceEvent.GetTimestamp(), - sourceEvent.GetTimestampNanosecond()); // it is easy to forget other fields, better solution? - auto const offset = sourceEvent.GetPosition().first + (content.data() - sourceVal.data()); - auto const length - = isLastLog ? sourceEvent.GetPosition().second - (content.data() - sourceVal.data()) : content.size() + 1; - targetEvent->SetPosition(offset, length); - if (mAppendingLogPositionMeta) { - StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); - targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); + if (mEnableRawContent) { + std::unique_ptr targetEvent = logGroup.CreateRawEvent(true); + targetEvent->SetContentNoCopy(content); + targetEvent->SetTimestamp(sourceEvent.GetTimestamp(), sourceEvent.GetTimestampNanosecond()); + newEvents.emplace_back(std::move(targetEvent), true, nullptr); + } else { + StringView sourceVal = sourceEvent.GetContent(mSourceKey); + std::unique_ptr targetEvent = logGroup.CreateLogEvent(true); + targetEvent->SetContentNoCopy(StringView(sourceKey.data, sourceKey.size), content); + targetEvent->SetTimestamp( + sourceEvent.GetTimestamp(), + sourceEvent.GetTimestampNanosecond()); // it is easy to forget other fields, better solution? + auto const offset = sourceEvent.GetPosition().first + (content.data() - sourceVal.data()); + auto const length + = isLastLog ? sourceEvent.GetPosition().second - (content.data() - sourceVal.data()) : content.size() + 1; + targetEvent->SetPosition(offset, length); + if (mAppendingLogPositionMeta) { + StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); + targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); + } + newEvents.emplace_back(std::move(targetEvent), true, nullptr); } - newEvents.emplace_back(std::move(targetEvent), true, nullptr); } void ProcessorSplitMultilineLogStringNative::HandleUnmatchLogs(const StringView& sourceVal, diff --git a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h index 5b6e37c42c..f32de10618 100644 --- a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h +++ b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h @@ -33,6 +33,7 @@ class ProcessorSplitMultilineLogStringNative : public Processor { std::string mSourceKey = DEFAULT_CONTENT_KEY; MultilineOptions mMultiline; bool mAppendingLogPositionMeta = false; + bool mEnableRawContent = false; const std::string& Name() const override { return sName; } bool Init(const Json::Value& config) override; diff --git a/core/unittest/input/InputFileUnittest.cpp b/core/unittest/input/InputFileUnittest.cpp index 17b334831b..057dff17e2 100644 --- a/core/unittest/input/InputFileUnittest.cpp +++ b/core/unittest/input/InputFileUnittest.cpp @@ -21,10 +21,10 @@ #include "app_config/AppConfig.h" #include "common/JsonUtil.h" #include "file_server/FileServer.h" -#include "plugin/input/InputFile.h" #include "pipeline/Pipeline.h" #include "pipeline/PipelineContext.h" #include "pipeline/plugin/PluginRegistry.h" +#include "plugin/input/InputFile.h" #include "plugin/processor/inner/ProcessorSplitLogStringNative.h" #include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" #include "plugin/processor/inner/ProcessorTagNative.h" @@ -258,6 +258,7 @@ void InputFileUnittest::TestCreateInnerProcessors() { APSARA_TEST_EQUAL(DEFAULT_CONTENT_KEY, plugin->mSourceKey); APSARA_TEST_EQUAL('\n', plugin->mSplitChar); APSARA_TEST_TRUE(plugin->mAppendingLogPositionMeta); + APSARA_TEST_FALSE(plugin->mEnableRawContent); APSARA_TEST_EQUAL(ProcessorTagNative::sName, input->mInnerProcessors[1]->Name()); } { @@ -293,6 +294,7 @@ void InputFileUnittest::TestCreateInnerProcessors() { APSARA_TEST_EQUAL(MultilineOptions::UnmatchedContentTreatment::DISCARD, plugin->mMultiline.mUnmatchedContentTreatment); APSARA_TEST_TRUE(plugin->mAppendingLogPositionMeta); + APSARA_TEST_FALSE(plugin->mEnableRawContent); APSARA_TEST_EQUAL(ProcessorTagNative::sName, input->mInnerProcessors[1]->Name()); } { @@ -317,6 +319,7 @@ void InputFileUnittest::TestCreateInnerProcessors() { APSARA_TEST_EQUAL(DEFAULT_CONTENT_KEY, plugin->mSourceKey); APSARA_TEST_EQUAL('\0', plugin->mSplitChar); APSARA_TEST_TRUE(plugin->mAppendingLogPositionMeta); + APSARA_TEST_FALSE(plugin->mEnableRawContent); APSARA_TEST_EQUAL(ProcessorTagNative::sName, input->mInnerProcessors[1]->Name()); ctx.SetIsFirstProcessorJsonFlag(false); } @@ -345,9 +348,92 @@ void InputFileUnittest::TestCreateInnerProcessors() { APSARA_TEST_EQUAL(DEFAULT_CONTENT_KEY, plugin->mSourceKey); APSARA_TEST_EQUAL('\0', plugin->mSplitChar); APSARA_TEST_TRUE(plugin->mAppendingLogPositionMeta); + APSARA_TEST_FALSE(plugin->mEnableRawContent); APSARA_TEST_EQUAL(ProcessorTagNative::sName, input->mInnerProcessors[1]->Name()); ctx.SetIsFirstProcessorJsonFlag(false); } + { + // disable raw content: has native processor + ctx.SetHasNativeProcessorsFlag(true); + configStr = R"( + { + "Type": "input_file", + "FilePaths": [] + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + configJson["FilePaths"].append(Json::Value(filePath.string())); + input.reset(new InputFile()); + input->SetContext(ctx); + input->SetMetricsRecordRef(InputFile::sName, "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(2U, input->mInnerProcessors.size()); + APSARA_TEST_EQUAL(ProcessorSplitLogStringNative::sName, input->mInnerProcessors[0]->Name()); + auto plugin = static_cast(input->mInnerProcessors[0]->mPlugin.get()); + APSARA_TEST_FALSE(plugin->mEnableRawContent); + ctx.SetHasNativeProcessorsFlag(false); + } + { + // disable raw content: exactly once + ctx.SetExactlyOnceFlag(true); + configStr = R"( + { + "Type": "input_file", + "FilePaths": [] + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + configJson["FilePaths"].append(Json::Value(filePath.string())); + input.reset(new InputFile()); + input->SetContext(ctx); + input->SetMetricsRecordRef(InputFile::sName, "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(2U, input->mInnerProcessors.size()); + APSARA_TEST_EQUAL(ProcessorSplitLogStringNative::sName, input->mInnerProcessors[0]->Name()); + auto plugin = static_cast(input->mInnerProcessors[0]->mPlugin.get()); + APSARA_TEST_FALSE(plugin->mEnableRawContent); + ctx.SetExactlyOnceFlag(false); + } + { + // disable raw content: flushing through go pipeline + ctx.SetIsFlushingThroughGoPipelineFlag(true); + configStr = R"( + { + "Type": "input_file", + "FilePaths": [] + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + configJson["FilePaths"].append(Json::Value(filePath.string())); + input.reset(new InputFile()); + input->SetContext(ctx); + input->SetMetricsRecordRef(InputFile::sName, "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(2U, input->mInnerProcessors.size()); + APSARA_TEST_EQUAL(ProcessorSplitLogStringNative::sName, input->mInnerProcessors[0]->Name()); + auto plugin = static_cast(input->mInnerProcessors[0]->mPlugin.get()); + APSARA_TEST_FALSE(plugin->mEnableRawContent); + ctx.SetIsFlushingThroughGoPipelineFlag(false); + } + { + // enable raw content + configStr = R"( + { + "Type": "input_file", + "FilePaths": [] + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + configJson["FilePaths"].append(Json::Value(filePath.string())); + input.reset(new InputFile()); + input->SetContext(ctx); + input->SetMetricsRecordRef(InputFile::sName, "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(2U, input->mInnerProcessors.size()); + APSARA_TEST_EQUAL(ProcessorSplitLogStringNative::sName, input->mInnerProcessors[0]->Name()); + auto plugin = static_cast(input->mInnerProcessors[0]->mPlugin.get()); + APSARA_TEST_TRUE(plugin->mEnableRawContent); + } } void InputFileUnittest::OnPipelineUpdate() { diff --git a/core/unittest/models/CMakeLists.txt b/core/unittest/models/CMakeLists.txt index baec30366f..1ec6afe838 100644 --- a/core/unittest/models/CMakeLists.txt +++ b/core/unittest/models/CMakeLists.txt @@ -30,6 +30,9 @@ target_link_libraries(metric_event_unittest ${UT_BASE_TARGET}) add_executable(span_event_unittest SpanEventUnittest.cpp) target_link_libraries(span_event_unittest ${UT_BASE_TARGET}) +add_executable(raw_event_unittest RawEventUnittest.cpp) +target_link_libraries(raw_event_unittest ${UT_BASE_TARGET}) + add_executable(pipeline_event_ptr_unittest PipelineEventPtrUnittest.cpp) target_link_libraries(pipeline_event_ptr_unittest ${UT_BASE_TARGET}) @@ -45,6 +48,7 @@ gtest_discover_tests(log_event_unittest) gtest_discover_tests(metric_value_unittest) gtest_discover_tests(metric_event_unittest) gtest_discover_tests(span_event_unittest) +gtest_discover_tests(raw_event_unittest) gtest_discover_tests(pipeline_event_ptr_unittest) gtest_discover_tests(pipeline_event_group_unittest) gtest_discover_tests(event_pool_unittest) diff --git a/core/unittest/models/EventPoolUnittest.cpp b/core/unittest/models/EventPoolUnittest.cpp index 3cc5689d28..21f580b9b9 100644 --- a/core/unittest/models/EventPoolUnittest.cpp +++ b/core/unittest/models/EventPoolUnittest.cpp @@ -75,6 +75,19 @@ void EventPoolUnittest::TestNoLock() { APSARA_TEST_EQUAL(0U, pool.mMinUnusedSpanEventsCnt); APSARA_TEST_EQUAL(&g, e->GetPipelineEventGroupPtr()); } + { + auto e = pool.AcquireRawEvent(mGroup.get()); + pool.Release({e}); + APSARA_TEST_EQUAL(1U, pool.mRawEventPool.size()); + APSARA_TEST_EQUAL(e, pool.mRawEventPool[0]); + APSARA_TEST_EQUAL(mGroup.get(), e->GetPipelineEventGroupPtr()); + + PipelineEventGroup g(make_shared()); + e = pool.AcquireRawEvent(&g); + APSARA_TEST_EQUAL(0U, pool.mRawEventPool.size()); + APSARA_TEST_EQUAL(0U, pool.mMinUnusedRawEventsCnt); + APSARA_TEST_EQUAL(&g, e->GetPipelineEventGroupPtr()); + } } void EventPoolUnittest::TestLock() { @@ -134,6 +147,24 @@ void EventPoolUnittest::TestLock() { APSARA_TEST_EQUAL(0U, pool.mSpanEventPoolBak.size()); APSARA_TEST_EQUAL(1U, pool.mSpanEventPool.size()); } + { + auto e = pool.AcquireRawEvent(mGroup.get()); + auto e1 = pool.AcquireRawEvent(mGroup.get()); + pool.Release({e}); + APSARA_TEST_EQUAL(1U, pool.mRawEventPoolBak.size()); + APSARA_TEST_EQUAL(e, pool.mRawEventPoolBak[0]); + APSARA_TEST_EQUAL(mGroup.get(), e->GetPipelineEventGroupPtr()); + + e = pool.AcquireRawEvent(mGroup.get()); + APSARA_TEST_EQUAL(0U, pool.mRawEventPoolBak.size()); + APSARA_TEST_EQUAL(0U, pool.mRawEventPool.size()); + APSARA_TEST_EQUAL(mGroup.get(), e->GetPipelineEventGroupPtr()); + + pool.Release(vector{e, e1}); + pool.AcquireRawEvent(mGroup.get()); + APSARA_TEST_EQUAL(0U, pool.mRawEventPoolBak.size()); + APSARA_TEST_EQUAL(1U, pool.mRawEventPool.size()); + } } void EventPoolUnittest::TestGC() { diff --git a/core/unittest/models/PipelineEventGroupUnittest.cpp b/core/unittest/models/PipelineEventGroupUnittest.cpp index f2a4b41946..3a308faca5 100644 --- a/core/unittest/models/PipelineEventGroupUnittest.cpp +++ b/core/unittest/models/PipelineEventGroupUnittest.cpp @@ -59,25 +59,31 @@ void PipelineEventGroupUnittest::TestCreateEvent() { auto logEvent = mEventGroup->CreateLogEvent(); auto metricEvent = mEventGroup->CreateMetricEvent(); auto spanEvent = mEventGroup->CreateSpanEvent(); + auto rawEvent = mEventGroup->CreateRawEvent(); APSARA_TEST_EQUAL(mEventGroup.get(), logEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), metricEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), spanEvent->mPipelineEventGroupPtr); + APSARA_TEST_EQUAL(mEventGroup.get(), rawEvent->mPipelineEventGroupPtr); } { auto logEvent = mEventGroup->CreateLogEvent(true); auto metricEvent = mEventGroup->CreateMetricEvent(true); auto spanEvent = mEventGroup->CreateSpanEvent(true); + auto rawEvent = mEventGroup->CreateRawEvent(true); APSARA_TEST_EQUAL(mEventGroup.get(), logEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), metricEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), spanEvent->mPipelineEventGroupPtr); + APSARA_TEST_EQUAL(mEventGroup.get(), rawEvent->mPipelineEventGroupPtr); } { auto logEvent = mEventGroup->CreateLogEvent(true, &mPool); auto metricEvent = mEventGroup->CreateMetricEvent(true, &mPool); auto spanEvent = mEventGroup->CreateSpanEvent(true, &mPool); + auto rawEvent = mEventGroup->CreateRawEvent(true, &mPool); APSARA_TEST_EQUAL(mEventGroup.get(), logEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), metricEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), spanEvent->mPipelineEventGroupPtr); + APSARA_TEST_EQUAL(mEventGroup.get(), rawEvent->mPipelineEventGroupPtr); } } @@ -86,25 +92,31 @@ void PipelineEventGroupUnittest::TestAddEvent() { auto logEvent = mEventGroup->AddLogEvent(); auto metricEvent = mEventGroup->AddMetricEvent(); auto spanEvent = mEventGroup->AddSpanEvent(); + auto rawEvent = mEventGroup->AddRawEvent(); APSARA_TEST_EQUAL(mEventGroup.get(), logEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), metricEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), spanEvent->mPipelineEventGroupPtr); + APSARA_TEST_EQUAL(mEventGroup.get(), rawEvent->mPipelineEventGroupPtr); } { auto logEvent = mEventGroup->AddLogEvent(true); auto metricEvent = mEventGroup->AddMetricEvent(true); auto spanEvent = mEventGroup->AddSpanEvent(true); + auto rawEvent = mEventGroup->AddRawEvent(true); APSARA_TEST_EQUAL(mEventGroup.get(), logEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), metricEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), spanEvent->mPipelineEventGroupPtr); + APSARA_TEST_EQUAL(mEventGroup.get(), rawEvent->mPipelineEventGroupPtr); } { auto logEvent = mEventGroup->AddLogEvent(true, &mPool); auto metricEvent = mEventGroup->AddMetricEvent(true, &mPool); auto spanEvent = mEventGroup->AddSpanEvent(true, &mPool); + auto rawEvent = mEventGroup->AddRawEvent(true, &mPool); APSARA_TEST_EQUAL(mEventGroup.get(), logEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), metricEvent->mPipelineEventGroupPtr); APSARA_TEST_EQUAL(mEventGroup.get(), spanEvent->mPipelineEventGroupPtr); + APSARA_TEST_EQUAL(mEventGroup.get(), rawEvent->mPipelineEventGroupPtr); } } @@ -112,9 +124,10 @@ void PipelineEventGroupUnittest::TestSwapEvents() { mEventGroup->AddLogEvent(); mEventGroup->AddMetricEvent(); mEventGroup->AddSpanEvent(); + mEventGroup->AddRawEvent(); EventsContainer eventContainer; mEventGroup->SwapEvents(eventContainer); - APSARA_TEST_EQUAL_FATAL(3U, eventContainer.size()); + APSARA_TEST_EQUAL_FATAL(4U, eventContainer.size()); APSARA_TEST_EQUAL_FATAL(0U, mEventGroup->GetEvents().size()); } @@ -122,6 +135,7 @@ void PipelineEventGroupUnittest::TestDestructor() { LogEvent* log = nullptr; MetricEvent* metric = nullptr; SpanEvent* span = nullptr; + RawEvent* raw = nullptr; { PipelineEventGroup g(make_shared()); log = g.AddLogEvent(true); @@ -146,6 +160,14 @@ void PipelineEventGroupUnittest::TestDestructor() { APSARA_TEST_EQUAL(1U, gThreadedEventPool.mSpanEventPool.size()); APSARA_TEST_EQUAL(span, gThreadedEventPool.mSpanEventPool.back()); APSARA_TEST_EQUAL(0, span->GetTimestamp()); + { + PipelineEventGroup g(make_shared()); + raw = g.AddRawEvent(true); + raw->SetTimestamp(1234567890); + } + APSARA_TEST_EQUAL(1U, gThreadedEventPool.mRawEventPool.size()); + APSARA_TEST_EQUAL(raw, gThreadedEventPool.mRawEventPool.back()); + APSARA_TEST_EQUAL(0, raw->GetTimestamp()); { PipelineEventGroup g(make_shared()); log = g.AddLogEvent(true, &mPool); @@ -170,6 +192,14 @@ void PipelineEventGroupUnittest::TestDestructor() { APSARA_TEST_EQUAL(1U, mPool.mSpanEventPoolBak.size()); APSARA_TEST_EQUAL(span, mPool.mSpanEventPoolBak.back()); APSARA_TEST_EQUAL(0, span->GetTimestamp()); + { + PipelineEventGroup g(make_shared()); + raw = g.AddRawEvent(true, &mPool); + raw->SetTimestamp(1234567890); + } + APSARA_TEST_EQUAL(1U, mPool.mRawEventPoolBak.size()); + APSARA_TEST_EQUAL(raw, mPool.mRawEventPoolBak.back()); + APSARA_TEST_EQUAL(0, raw->GetTimestamp()); } void PipelineEventGroupUnittest::TestReserveEvents() { diff --git a/core/unittest/models/PipelineEventPtrUnittest.cpp b/core/unittest/models/PipelineEventPtrUnittest.cpp index 9748e78191..cfdd125c4b 100644 --- a/core/unittest/models/PipelineEventPtrUnittest.cpp +++ b/core/unittest/models/PipelineEventPtrUnittest.cpp @@ -42,15 +42,26 @@ void PipelineEventPtrUnittest::TestIs() { PipelineEventPtr logEventPtr(mEventGroup->CreateLogEvent(), false, nullptr); PipelineEventPtr metricEventPtr(mEventGroup->CreateMetricEvent(), false, nullptr); PipelineEventPtr spanEventPtr(mEventGroup->CreateSpanEvent(), false, nullptr); + PipelineEventPtr rawEventPtr(mEventGroup->CreateRawEvent(), false, nullptr); APSARA_TEST_TRUE_FATAL(logEventPtr.Is()); APSARA_TEST_FALSE_FATAL(logEventPtr.Is()); APSARA_TEST_FALSE_FATAL(logEventPtr.Is()); + APSARA_TEST_FALSE_FATAL(logEventPtr.Is()); + APSARA_TEST_FALSE_FATAL(metricEventPtr.Is()); APSARA_TEST_TRUE_FATAL(metricEventPtr.Is()); APSARA_TEST_FALSE_FATAL(metricEventPtr.Is()); + APSARA_TEST_FALSE_FATAL(metricEventPtr.Is()); + APSARA_TEST_FALSE_FATAL(spanEventPtr.Is()); APSARA_TEST_FALSE_FATAL(spanEventPtr.Is()); APSARA_TEST_TRUE_FATAL(spanEventPtr.Is()); + APSARA_TEST_FALSE_FATAL(spanEventPtr.Is()); + + APSARA_TEST_FALSE_FATAL(rawEventPtr.Is()); + APSARA_TEST_FALSE_FATAL(rawEventPtr.Is()); + APSARA_TEST_FALSE_FATAL(rawEventPtr.Is()); + APSARA_TEST_TRUE_FATAL(rawEventPtr.Is()); } void PipelineEventPtrUnittest::TestGet() { @@ -78,6 +89,7 @@ void PipelineEventPtrUnittest::TestCopy() { mEventGroup->AddLogEvent(); mEventGroup->AddMetricEvent(); mEventGroup->AddSpanEvent(); + mEventGroup->AddRawEvent(); { auto& event = mEventGroup->MutableEvents()[0]; event->SetTimestamp(12345678901); @@ -102,6 +114,14 @@ void PipelineEventPtrUnittest::TestCopy() { APSARA_TEST_FALSE(res.IsFromEventPool()); APSARA_TEST_EQUAL(nullptr, res.GetEventPool()); } + { + auto& event = mEventGroup->MutableEvents()[3]; + event->SetTimestamp(12345678901); + auto res = event.Copy(); + APSARA_TEST_NOT_EQUAL(event.Get(), res.Get()); + APSARA_TEST_FALSE(res.IsFromEventPool()); + APSARA_TEST_EQUAL(nullptr, res.GetEventPool()); + } } UNIT_TEST_CASE(PipelineEventPtrUnittest, TestIs) diff --git a/core/unittest/models/PipelineEventUnittest.cpp b/core/unittest/models/PipelineEventUnittest.cpp index d0b6b40283..0e2d07bdce 100644 --- a/core/unittest/models/PipelineEventUnittest.cpp +++ b/core/unittest/models/PipelineEventUnittest.cpp @@ -17,6 +17,7 @@ #include "models/LogEvent.h" #include "models/MetricEvent.h" #include "models/PipelineEvent.h" +#include "models/RawEvent.h" #include "models/SpanEvent.h" #include "unittest/Unittest.h" @@ -41,9 +42,11 @@ void PipelineEventUnittest::TestGetType() { std::unique_ptr logEvent = mEventGroup->CreateLogEvent(); std::unique_ptr metricEvent = mEventGroup->CreateMetricEvent(); std::unique_ptr spanEvent = mEventGroup->CreateSpanEvent(); + std::unique_ptr rawEvent = mEventGroup->CreateRawEvent(); APSARA_TEST_STREQ_FATAL("Log", PipelineEventTypeToString(logEvent->GetType()).c_str()); APSARA_TEST_STREQ_FATAL("Metric", PipelineEventTypeToString(metricEvent->GetType()).c_str()); APSARA_TEST_STREQ_FATAL("Span", PipelineEventTypeToString(spanEvent->GetType()).c_str()); + APSARA_TEST_STREQ_FATAL("Raw", PipelineEventTypeToString(rawEvent->GetType()).c_str()); } UNIT_TEST_CASE(PipelineEventUnittest, TestGetType) diff --git a/core/unittest/models/RawEventUnittest.cpp b/core/unittest/models/RawEventUnittest.cpp new file mode 100644 index 0000000000..59d9f55e60 --- /dev/null +++ b/core/unittest/models/RawEventUnittest.cpp @@ -0,0 +1,113 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "common/JsonUtil.h" +#include "models/RawEvent.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class RawEventUnittest : public ::testing::Test { +public: + void TestTimestampOp(); + void TestSetContent(); + void TestSize(); + void TestReset(); + void TestFromJson(); + void TestToJson(); + +protected: + void SetUp() override { + mSourceBuffer.reset(new SourceBuffer); + mEventGroup.reset(new PipelineEventGroup(mSourceBuffer)); + mRawEvent = mEventGroup->CreateRawEvent(); + } + +private: + shared_ptr mSourceBuffer; + unique_ptr mEventGroup; + unique_ptr mRawEvent; +}; + +void RawEventUnittest::TestTimestampOp() { + mRawEvent->SetTimestamp(13800000000); + APSARA_TEST_EQUAL(13800000000, mRawEvent->GetTimestamp()); +} + +void RawEventUnittest::TestSetContent() { + mRawEvent->SetContent(string("content")); + APSARA_TEST_EQUAL("content", mRawEvent->GetContent().to_string()); +} + +void RawEventUnittest::TestSize() { + size_t basicSize = sizeof(time_t) + sizeof(long); + mRawEvent->SetContent(string("content")); + APSARA_TEST_EQUAL(basicSize + 7U, mRawEvent->DataSize()); +} + +void RawEventUnittest::TestReset() { + mRawEvent->SetTimestamp(12345678901); + mRawEvent->SetContent(string("content")); + mRawEvent->Reset(); + APSARA_TEST_EQUAL(0, mRawEvent->GetTimestamp()); + APSARA_TEST_FALSE(mRawEvent->GetTimestampNanosecond().has_value()); + APSARA_TEST_TRUE(mRawEvent->GetContent().empty()); +} + +void RawEventUnittest::TestFromJson() { + Json::Value eventJson; + string eventStr = R"({ + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "content": "hello, world!" + })"; + string errorMsg; + ParseJsonTable(eventStr, eventJson, errorMsg); + mRawEvent->FromJson(eventJson); + + APSARA_TEST_EQUAL(12345678901, mRawEvent->GetTimestamp()); + APSARA_TEST_EQUAL(0L, mRawEvent->GetTimestampNanosecond().value()); + APSARA_TEST_EQUAL("hello, world!", mRawEvent->GetContent().to_string()); +} + +void RawEventUnittest::TestToJson() { + mRawEvent->SetTimestamp(12345678901, 0); + mRawEvent->SetContent("hello, world!"); + Json::Value res = mRawEvent->ToJson(); + + Json::Value eventJson; + string eventStr = R"({ + "content": "hello, world!", + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type": 4 + })"; + string errorMsg; + ParseJsonTable(eventStr, eventJson, errorMsg); + + APSARA_TEST_TRUE(eventJson == res); +} + +UNIT_TEST_CASE(RawEventUnittest, TestTimestampOp) +UNIT_TEST_CASE(RawEventUnittest, TestSetContent) +UNIT_TEST_CASE(RawEventUnittest, TestSize) +UNIT_TEST_CASE(RawEventUnittest, TestReset) +UNIT_TEST_CASE(RawEventUnittest, TestFromJson) +UNIT_TEST_CASE(RawEventUnittest, TestToJson) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/processor/ProcessorSplitLogStringNativeUnittest.cpp b/core/unittest/processor/ProcessorSplitLogStringNativeUnittest.cpp index 1b52d5814a..a762cc9290 100644 --- a/core/unittest/processor/ProcessorSplitLogStringNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorSplitLogStringNativeUnittest.cpp @@ -31,6 +31,7 @@ class ProcessorSplitLogStringNativeUnittest : public ::testing::Test { void TestInit(); void TestProcessJson(); void TestProcessCommon(); + void TestEnableRawContent(); PipelineContext mContext; }; @@ -38,6 +39,7 @@ class ProcessorSplitLogStringNativeUnittest : public ::testing::Test { UNIT_TEST_CASE(ProcessorSplitLogStringNativeUnittest, TestInit) UNIT_TEST_CASE(ProcessorSplitLogStringNativeUnittest, TestProcessJson) UNIT_TEST_CASE(ProcessorSplitLogStringNativeUnittest, TestProcessCommon) +UNIT_TEST_CASE(ProcessorSplitLogStringNativeUnittest, TestEnableRawContent) PluginInstance::PluginMeta getPluginMeta(){ PluginInstance::PluginMeta pluginMeta{"1"}; @@ -224,6 +226,81 @@ void ProcessorSplitLogStringNativeUnittest::TestProcessCommon() { APSARA_TEST_EQUAL_FATAL(4, processor.GetContext().GetProcessProfile().splitLines); } +void ProcessorSplitLogStringNativeUnittest::TestEnableRawContent() { + // make config + Json::Value config; + config["AppendingLogPositionMeta"] = false; + config["EnableRawContent"] = true; + // make events + auto sourceBuffer = std::make_shared(); + PipelineEventGroup eventGroup(sourceBuffer); + std::string inJson = R"({ + "events" : + [ + { + "contents" : + { + "content" : "line1\nline2" + }, + "fileOffset": 1, + "rawSize": 12, + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 1 + }, + { + "contents" : + { + "content" : "line3\nline4" + }, + "fileOffset": 0, + "rawSize": 11, + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 1 + } + ] + })"; + eventGroup.FromJsonString(inJson); + // run function + ProcessorSplitLogStringNative processor; + processor.SetContext(mContext); + APSARA_TEST_TRUE_FATAL(processor.Init(config)); + processor.Process(eventGroup); + // judge result + std::string expectJson = R"({ + "events" : + [ + { + "content" : "line1", + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 4 + }, + { + "content" : "line2", + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 4 + }, + { + "content" : "line3", + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 4 + }, + { + "content" : "line4", + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 4 + } + ] + })"; + std::string outJson = eventGroup.ToJsonString(true); + APSARA_TEST_STREQ_FATAL(CompactJson(expectJson).c_str(), CompactJson(outJson).c_str()); +} + } // namespace logtail -UNIT_TEST_MAIN \ No newline at end of file +UNIT_TEST_MAIN diff --git a/core/unittest/processor/ProcessorSplitMultilineLogStringNativeUnittest.cpp b/core/unittest/processor/ProcessorSplitMultilineLogStringNativeUnittest.cpp index 4ff926e136..0c1af0a9c3 100644 --- a/core/unittest/processor/ProcessorSplitMultilineLogStringNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorSplitMultilineLogStringNativeUnittest.cpp @@ -13,9 +13,9 @@ // limitations under the License. #include -#include "constants/Constants.h" #include "common/JsonUtil.h" #include "config/PipelineConfig.h" +#include "constants/Constants.h" #include "models/LogEvent.h" #include "plugin/processor/inner/ProcessorSplitLogStringNative.h" #include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" @@ -39,6 +39,7 @@ class ProcessorSplitMultilineLogDisacardUnmatchUnittest : public ::testing::Test void TestLogSplitWithBegin(); void TestLogSplitWithContinueEnd(); void TestLogSplitWithEnd(); + void TestEnableRawEvent(); protected: void SetUp() override { mContext.SetConfigName("project##config_0"); } @@ -51,7 +52,7 @@ UNIT_TEST_CASE(ProcessorSplitMultilineLogDisacardUnmatchUnittest, TestLogSplitWi UNIT_TEST_CASE(ProcessorSplitMultilineLogDisacardUnmatchUnittest, TestLogSplitWithBeginEnd) UNIT_TEST_CASE(ProcessorSplitMultilineLogDisacardUnmatchUnittest, TestLogSplitWithBegin) UNIT_TEST_CASE(ProcessorSplitMultilineLogDisacardUnmatchUnittest, TestLogSplitWithContinueEnd) -UNIT_TEST_CASE(ProcessorSplitMultilineLogDisacardUnmatchUnittest, TestLogSplitWithEnd) +UNIT_TEST_CASE(ProcessorSplitMultilineLogDisacardUnmatchUnittest, TestEnableRawEvent) void ProcessorSplitMultilineLogDisacardUnmatchUnittest::TestLogSplitWithBeginContinue() { // make config @@ -790,8 +791,7 @@ void ProcessorSplitMultilineLogDisacardUnmatchUnittest::TestLogSplitWithBegin() "contents" : { "content" : ")" - << LOG_UNMATCH << R"(\n)" << LOG_BEGIN_STRING << R"(\n)" << LOG_UNMATCH << R"(\n)" << LOG_UNMATCH - << R"(" + << LOG_UNMATCH << R"(\n)" << LOG_BEGIN_STRING << R"(\n)" << LOG_UNMATCH << R"(\n)" << LOG_UNMATCH << R"(" }, "timestamp" : 12345678901, "timestampNanosecond" : 0, @@ -825,11 +825,9 @@ void ProcessorSplitMultilineLogDisacardUnmatchUnittest::TestLogSplitWithBegin() } // metric - APSARA_TEST_EQUAL_FATAL(1 + 0 + 2 + 1 + 1, - ProcessorSplitMultilineLogStringNative.mMatchedEventsTotal->GetValue()); + APSARA_TEST_EQUAL_FATAL(1 + 0 + 2 + 1 + 1, ProcessorSplitMultilineLogStringNative.mMatchedEventsTotal->GetValue()); APSARA_TEST_EQUAL_FATAL(1 + 0 + 2 + 2 + 3, ProcessorSplitMultilineLogStringNative.mMatchedLinesTotal->GetValue()); - APSARA_TEST_EQUAL_FATAL(1 + 1 + 0 + 0 + 1, - ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); + APSARA_TEST_EQUAL_FATAL(1 + 1 + 0 + 0 + 1, ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); } void ProcessorSplitMultilineLogDisacardUnmatchUnittest::TestLogSplitWithContinueEnd() { @@ -1029,11 +1027,9 @@ void ProcessorSplitMultilineLogDisacardUnmatchUnittest::TestLogSplitWithContinue } // metric - APSARA_TEST_EQUAL_FATAL(0 + 0 + 1 + 0 + 1, - ProcessorSplitMultilineLogStringNative.mMatchedEventsTotal->GetValue()); + APSARA_TEST_EQUAL_FATAL(0 + 0 + 1 + 0 + 1, ProcessorSplitMultilineLogStringNative.mMatchedEventsTotal->GetValue()); APSARA_TEST_EQUAL_FATAL(0 + 0 + 3 + 0 + 1, ProcessorSplitMultilineLogStringNative.mMatchedLinesTotal->GetValue()); - APSARA_TEST_EQUAL_FATAL(1 + 2 + 0 + 1 + 0, - ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); + APSARA_TEST_EQUAL_FATAL(1 + 2 + 0 + 1 + 0, ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); } void ProcessorSplitMultilineLogDisacardUnmatchUnittest::TestLogSplitWithEnd() { @@ -1178,6 +1174,60 @@ void ProcessorSplitMultilineLogDisacardUnmatchUnittest::TestLogSplitWithEnd() { APSARA_TEST_EQUAL_FATAL(0 + 1 + 1, ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); } +void ProcessorSplitMultilineLogDisacardUnmatchUnittest::TestEnableRawEvent() { + // make config + Json::Value config; + config["StartPattern"] = LOG_BEGIN_REGEX; + config["SplitType"] = "regex"; + config["UnmatchedContentTreatment"] = "discard"; + config["EnableRawContent"] = true; + + // ProcessorSplitMultilineLogStringNative + ProcessorSplitMultilineLogStringNative ProcessorSplitMultilineLogStringNative; + ProcessorSplitMultilineLogStringNative.SetContext(mContext); + ProcessorSplitMultilineLogStringNative.SetMetricsRecordRef(ProcessorSplitMultilineLogStringNative::sName, "1"); + APSARA_TEST_TRUE_FATAL(ProcessorSplitMultilineLogStringNative.Init(config)); + + auto sourceBuffer = std::make_shared(); + PipelineEventGroup eventGroup(sourceBuffer); + std::stringstream inJson; + inJson << R"({ + "events" : + [ + { + "contents" : + { + "content" : ")" + << LOG_UNMATCH << R"(\n)" << LOG_BEGIN_STRING << R"(" + }, + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 1 + } + ] + })"; + eventGroup.FromJsonString(inJson.str()); + + // run test function + ProcessorSplitMultilineLogStringNative.Process(eventGroup); + // judge result + std::stringstream expectJson; + expectJson << R"({ + "events" : + [ + { + "content" : ")" + << LOG_BEGIN_STRING << R"(", + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 4 + } + ] + })"; + std::string outJson = eventGroup.ToJsonString(); + APSARA_TEST_STREQ(CompactJson(expectJson.str()).c_str(), CompactJson(outJson).c_str()); +} + class ProcessorSplitMultilineLogKeepUnmatchUnittest : public ::testing::Test { public: void SetUp() override { mContext.SetConfigName("project##config_0"); } @@ -1189,11 +1239,11 @@ class ProcessorSplitMultilineLogKeepUnmatchUnittest : public ::testing::Test { PipelineContext mContext; }; -UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithBeginContinue); -UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithBeginEnd); -UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithBegin); -UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithContinueEnd); -UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithEnd); +UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithBeginContinue) +UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithBeginEnd) +UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithBegin) +UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithContinueEnd) +UNIT_TEST_CASE(ProcessorSplitMultilineLogKeepUnmatchUnittest, TestLogSplitWithEnd) void ProcessorSplitMultilineLogKeepUnmatchUnittest::TestLogSplitWithBeginContinue() { // make config @@ -2213,11 +2263,9 @@ void ProcessorSplitMultilineLogKeepUnmatchUnittest::TestLogSplitWithBegin() { } // metric - APSARA_TEST_EQUAL_FATAL(1 + 0 + 2 + 1 + 1, - ProcessorSplitMultilineLogStringNative.mMatchedEventsTotal->GetValue()); + APSARA_TEST_EQUAL_FATAL(1 + 0 + 2 + 1 + 1, ProcessorSplitMultilineLogStringNative.mMatchedEventsTotal->GetValue()); APSARA_TEST_EQUAL_FATAL(1 + 0 + 2 + 2 + 2, ProcessorSplitMultilineLogStringNative.mMatchedLinesTotal->GetValue()); - APSARA_TEST_EQUAL_FATAL(1 + 1 + 0 + 0 + 0, - ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); + APSARA_TEST_EQUAL_FATAL(1 + 1 + 0 + 0 + 0, ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); } void ProcessorSplitMultilineLogKeepUnmatchUnittest::TestLogSplitWithContinueEnd() { @@ -2475,11 +2523,9 @@ void ProcessorSplitMultilineLogKeepUnmatchUnittest::TestLogSplitWithContinueEnd( } // metric - APSARA_TEST_EQUAL_FATAL(0 + 0 + 1 + 0 + 1, - ProcessorSplitMultilineLogStringNative.mMatchedEventsTotal->GetValue()); + APSARA_TEST_EQUAL_FATAL(0 + 0 + 1 + 0 + 1, ProcessorSplitMultilineLogStringNative.mMatchedEventsTotal->GetValue()); APSARA_TEST_EQUAL_FATAL(0 + 0 + 3 + 0 + 1, ProcessorSplitMultilineLogStringNative.mMatchedLinesTotal->GetValue()); - APSARA_TEST_EQUAL_FATAL(1 + 2 + 0 + 1 + 0, - ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); + APSARA_TEST_EQUAL_FATAL(1 + 2 + 0 + 1 + 0, ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); } void ProcessorSplitMultilineLogKeepUnmatchUnittest::TestLogSplitWithEnd() { @@ -2653,6 +2699,7 @@ void ProcessorSplitMultilineLogKeepUnmatchUnittest::TestLogSplitWithEnd() { APSARA_TEST_EQUAL_FATAL(1 + 0 + 2, ProcessorSplitMultilineLogStringNative.mMatchedLinesTotal->GetValue()); APSARA_TEST_EQUAL_FATAL(0 + 1 + 1, ProcessorSplitMultilineLogStringNative.mUnmatchedLinesTotal->GetValue()); } + } // namespace logtail UNIT_TEST_MAIN diff --git a/core/unittest/serializer/SLSSerializerUnittest.cpp b/core/unittest/serializer/SLSSerializerUnittest.cpp index 837e48ff8b..603f95f953 100644 --- a/core/unittest/serializer/SLSSerializerUnittest.cpp +++ b/core/unittest/serializer/SLSSerializerUnittest.cpp @@ -40,6 +40,7 @@ class SLSSerializerUnittest : public ::testing::Test { BatchedEvents CreateBatchedLogEvents(bool enableNanosecond, bool emptyContent); BatchedEvents CreateBatchedMetricEvents(bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag); + BatchedEvents CreateBatchedRawEvents(bool enableNanosecond, bool emptyContent); static unique_ptr sFlusher; @@ -215,6 +216,55 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { { // span } + { + // raw + { + // nano second disabled, and set + string res, errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false, false), res, errorMsg)); + sls_logs::LogGroup logGroup; + APSARA_TEST_TRUE(logGroup.ParseFromString(res)); + APSARA_TEST_EQUAL(1, logGroup.logs_size()); + APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); + APSARA_TEST_STREQ("content", logGroup.logs(0).contents(0).key().c_str()); + APSARA_TEST_STREQ("value", logGroup.logs(0).contents(0).value().c_str()); + APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); + APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); + APSARA_TEST_EQUAL(1, logGroup.logtags_size()); + APSARA_TEST_STREQ("__pack_id__", logGroup.logtags(0).key().c_str()); + APSARA_TEST_STREQ("pack_id", logGroup.logtags(0).value().c_str()); + APSARA_TEST_STREQ("machine_uuid", logGroup.machineuuid().c_str()); + APSARA_TEST_STREQ("source", logGroup.source().c_str()); + APSARA_TEST_STREQ("topic", logGroup.topic().c_str()); + } + { + // nano second enabled, and set + const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; + string res, errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(true, false), res, errorMsg)); + sls_logs::LogGroup logGroup; + APSARA_TEST_TRUE(logGroup.ParseFromString(res)); + APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); + APSARA_TEST_EQUAL(1U, logGroup.logs(0).time_ns()); + const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false; + } + { + // nano second enabled, not set + const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; + string res, errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedRawEvents(false, false), res, errorMsg)); + sls_logs::LogGroup logGroup; + APSARA_TEST_TRUE(logGroup.ParseFromString(res)); + APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); + APSARA_TEST_FALSE(logGroup.logs(0).has_time_ns()); + const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = false; + } + { + // empty log content + string res, errorMsg; + APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedRawEvents(false, true), res, errorMsg)); + } + } { // log group exceed size limit INT32_FLAG(max_send_log_group_size) = 0; @@ -315,6 +365,32 @@ BatchedEvents SLSSerializerUnittest::CreateBatchedMetricEvents(bool enableNanose return batch; } +BatchedEvents SLSSerializerUnittest::CreateBatchedRawEvents(bool enableNanosecond, bool emptyContent) { + PipelineEventGroup group(make_shared()); + group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); + group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); + group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); + group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + RawEvent* e = group.AddRawEvent(); + if (!emptyContent) { + e->SetContent(string("value")); + } + if (enableNanosecond) { + e->SetTimestamp(1234567890, 1); + } else { + e->SetTimestamp(1234567890); + } + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + return batch; +} + UNIT_TEST_CASE(SLSSerializerUnittest, TestSerializeEventGroup) UNIT_TEST_CASE(SLSSerializerUnittest, TestSerializeEventGroupList)