Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add raw event type #1856

Merged
merged 18 commits into from
Nov 15, 2024
29 changes: 29 additions & 0 deletions core/models/EventPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) {
return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt);
}

RawEvent* EventPool::AcquireRawEvent(PipelineEventGroup* ptr) {
if (mEnableLock) {
TransferPoolIfEmpty(mRawEventPool, mRawEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt);
}
return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt);
}

void EventPool::Release(vector<LogEvent*>&& obj) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolBakMux);
Expand Down Expand Up @@ -94,6 +103,15 @@ void EventPool::Release(vector<SpanEvent*>&& obj) {
}
}

void EventPool::Release(vector<RawEvent*>&& obj) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolBakMux);
mRawEventPoolBak.insert(mRawEventPoolBak.end(), obj.begin(), obj.end());
} else {
mRawEventPool.insert(mRawEventPool.end(), obj.begin(), obj.end());
}
}

template <class T>
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) {
if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits<size_t>::max()) {
Expand Down Expand Up @@ -131,10 +149,12 @@ void EventPool::CheckGC() {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span");
DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, &mPoolBakMux, "raw");
} else {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span");
DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, nullptr, "raw");
}
mLastGCTime = time(nullptr);
}
Expand All @@ -150,6 +170,9 @@ void EventPool::DestroyAllEventPool() {
for (auto& item : mSpanEventPool) {
delete item;
}
for (auto& item : mRawEventPool) {
delete item;
}
}

void EventPool::DestroyAllEventPoolBak() {
Expand All @@ -162,6 +185,9 @@ void EventPool::DestroyAllEventPoolBak() {
for (auto& item : mSpanEventPoolBak) {
delete item;
}
for (auto& item : mRawEventPoolBak) {
delete item;
}
}

#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -172,16 +198,19 @@ void EventPool::Clear() {
mLogEventPool.clear();
mMetricEventPool.clear();
mSpanEventPool.clear();
mRawEventPool.clear();
mMinUnusedLogEventsCnt = numeric_limits<size_t>::max();
mMinUnusedMetricEventsCnt = numeric_limits<size_t>::max();
mMinUnusedSpanEventsCnt = numeric_limits<size_t>::max();
mMinUnusedRawEventsCnt = numeric_limits<size_t>::max();
}
{
lock_guard<mutex> lock(mPoolBakMux);
DestroyAllEventPoolBak();
mLogEventPoolBak.clear();
mMetricEventPoolBak.clear();
mSpanEventPoolBak.clear();
mRawEventPoolBak.clear();
}
mLastGCTime = 0;
}
Expand Down
6 changes: 6 additions & 0 deletions core/models/EventPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "models/LogEvent.h"
#include "models/MetricEvent.h"
#include "models/RawEvent.h"
#include "models/SpanEvent.h"

namespace logtail {
Expand All @@ -39,9 +40,11 @@ class EventPool {
LogEvent* AcquireLogEvent(PipelineEventGroup* ptr);
MetricEvent* AcquireMetricEvent(PipelineEventGroup* ptr);
SpanEvent* AcquireSpanEvent(PipelineEventGroup* ptr);
RawEvent* AcquireRawEvent(PipelineEventGroup* ptr);
void Release(std::vector<LogEvent*>&& obj);
void Release(std::vector<MetricEvent*>&& obj);
void Release(std::vector<SpanEvent*>&& obj);
void Release(std::vector<RawEvent*>&& obj);
void CheckGC();

#ifdef APSARA_UNIT_TEST_MAIN
Expand Down Expand Up @@ -80,16 +83,19 @@ class EventPool {
std::vector<LogEvent*> mLogEventPool;
std::vector<MetricEvent*> mMetricEventPool;
std::vector<SpanEvent*> mSpanEventPool;
std::vector<RawEvent*> mRawEventPool;

// only meaningful when mEnableLock is true
std::mutex mPoolBakMux;
std::vector<LogEvent*> mLogEventPoolBak;
std::vector<MetricEvent*> mMetricEventPoolBak;
std::vector<SpanEvent*> mSpanEventPoolBak;
std::vector<RawEvent*> mRawEventPoolBak;

size_t mMinUnusedLogEventsCnt = std::numeric_limits<size_t>::max();
size_t mMinUnusedMetricEventsCnt = std::numeric_limits<size_t>::max();
size_t mMinUnusedSpanEventsCnt = std::numeric_limits<size_t>::max();
size_t mMinUnusedRawEventsCnt = std::numeric_limits<size_t>::max();

time_t mLastGCTime = 0;

Expand Down
19 changes: 11 additions & 8 deletions core/models/PipelineEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ StringView gEmptyStringView;
const string& PipelineEventTypeToString(PipelineEvent::Type t) {
switch (t) {
case PipelineEvent::Type::LOG:
static string logname = "Log";
return logname;
static string logName = "Log";
return logName;
case PipelineEvent::Type::METRIC:
static string metricname = "Metric";
return metricname;
static string metricName = "Metric";
return metricName;
case PipelineEvent::Type::SPAN:
static string spanname = "Span";
return spanname;
static string spanName = "Span";
return spanName;
case PipelineEvent::Type::RAW:
static string rawName = "Raw";
return rawName;
default:
static string voidname = "";
return voidname;
static string voidName = "";
return voidName;
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/models/PipelineEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class PipelineEventGroup;

class PipelineEvent {
public:
enum class Type { NONE, LOG, METRIC, SPAN };
enum class Type { NONE, LOG, METRIC, SPAN, RAW };

virtual ~PipelineEvent() = default;

Expand Down
36 changes: 35 additions & 1 deletion core/models/PipelineEventGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ PipelineEventGroup::~PipelineEventGroup() {
case PipelineEvent::Type::SPAN:
DestroyEvents<SpanEvent>(std::move(mEvents));
break;
case PipelineEvent::Type::RAW:
DestroyEvents<RawEvent>(std::move(mEvents));
break;
default:
break;
}
Expand Down Expand Up @@ -159,6 +162,20 @@ unique_ptr<SpanEvent> PipelineEventGroup::CreateSpanEvent(bool fromPool, EventPo
return unique_ptr<SpanEvent>(e);
}

unique_ptr<RawEvent> PipelineEventGroup::CreateRawEvent(bool fromPool, EventPool* pool) {
RawEvent* e = nullptr;
if (fromPool) {
if (pool) {
e = pool->AcquireRawEvent(this);
} else {
e = gThreadedEventPool.AcquireRawEvent(this);
}
} else {
e = new RawEvent(this);
}
return unique_ptr<RawEvent>(e);
}

LogEvent* PipelineEventGroup::AddLogEvent(bool fromPool, EventPool* pool) {
LogEvent* e = nullptr;
if (fromPool) {
Expand Down Expand Up @@ -204,6 +221,21 @@ SpanEvent* PipelineEventGroup::AddSpanEvent(bool fromPool, EventPool* pool) {
return e;
}

RawEvent* PipelineEventGroup::AddRawEvent(bool fromPool, EventPool* pool) {
RawEvent* e = nullptr;
if (fromPool) {
if (pool) {
e = pool->AcquireRawEvent(this);
} else {
e = gThreadedEventPool.AcquireRawEvent(this);
}
} else {
e = new RawEvent(this);
}
mEvents.emplace_back(e, fromPool, pool);
return e;
}

void PipelineEventGroup::SetMetadata(EventGroupMetaKey key, StringView val) {
SetMetadataNoCopy(key, mSourceBuffer->CopyString(val));
}
Expand Down Expand Up @@ -404,8 +436,10 @@ bool PipelineEventGroup::FromJson(const Json::Value& root) {
AddLogEvent()->FromJson(event);
} else if (event["type"].asInt() == static_cast<int>(PipelineEvent::Type::METRIC)) {
AddMetricEvent()->FromJson(event);
} else {
} else if (event["type"].asInt() == static_cast<int>(PipelineEvent::Type::SPAN)) {
AddSpanEvent()->FromJson(event);
} else {
AddRawEvent()->FromJson(event);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ class PipelineEventGroup {
std::unique_ptr<LogEvent> CreateLogEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<MetricEvent> CreateMetricEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<SpanEvent> CreateSpanEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<RawEvent> CreateRawEvent(bool fromPool = false, EventPool* pool = nullptr);

const EventsContainer& GetEvents() const { return mEvents; }
EventsContainer& MutableEvents() { return mEvents; }
LogEvent* AddLogEvent(bool fromPool = false, EventPool* pool = nullptr);
MetricEvent* AddMetricEvent(bool fromPool = false, EventPool* pool = nullptr);
SpanEvent* AddSpanEvent(bool fromPool = false, EventPool* pool = nullptr);
RawEvent* AddRawEvent(bool fromPool = false, EventPool* pool = nullptr);
void SwapEvents(EventsContainer& other) { mEvents.swap(other); }
void ReserveEvents(size_t size) { mEvents.reserve(size); }

Expand Down
4 changes: 4 additions & 0 deletions core/models/PipelineEventPtr.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "models/MetricEvent.h"
#include "models/PipelineEvent.h"
#include "models/SpanEvent.h"
#include "models/RawEvent.h"

namespace logtail {
class EventPool;
Expand All @@ -47,6 +48,9 @@ class PipelineEventPtr {
if (typeid(T) == typeid(SpanEvent)) {
return mData->GetType() == PipelineEvent::Type::SPAN;
}
if (typeid(T) == typeid(RawEvent)) {
return mData->GetType() == PipelineEvent::Type::RAW;
}
return false;
}
template <typename T>
Expand Down
76 changes: 76 additions & 0 deletions core/models/RawEvent.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "models/RawEvent.h"

using namespace std;

namespace logtail {

RawEvent::RawEvent(PipelineEventGroup* ptr) : PipelineEvent(Type::RAW, ptr) {
}

unique_ptr<PipelineEvent> RawEvent::Copy() const {
return make_unique<RawEvent>(*this);
}

void RawEvent::Reset() {
PipelineEvent::Reset();
mContent.clear();
}

void RawEvent::SetContent(const std::string& content) {
SetContentNoCopy(GetSourceBuffer()->CopyString(content));
}

void RawEvent::SetContentNoCopy(StringView content) {
mContent = content;
}

void RawEvent::SetContentNoCopy(const StringBuffer& content) {
mContent = StringView(content.data, content.size);
}

size_t RawEvent::DataSize() const {
return PipelineEvent::DataSize() + mContent.size();
}

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value RawEvent::ToJson(bool enableEventMeta) const {
Json::Value root;
root["type"] = static_cast<int>(GetType());
root["timestamp"] = GetTimestamp();
if (GetTimestampNanosecond()) {
root["timestampNanosecond"] = static_cast<int32_t>(GetTimestampNanosecond().value());
}
root["content"] = mContent.to_string();
return root;
}

bool RawEvent::FromJson(const Json::Value& root) {
if (root.isMember("timestampNanosecond")) {
SetTimestamp(root["timestamp"].asInt64(), root["timestampNanosecond"].asInt64());
} else {
SetTimestamp(root["timestamp"].asInt64());
}
if (root.isMember("content")) {
SetContent(root["content"].asString());
}
return true;
}
#endif

} // namespace logtail
49 changes: 49 additions & 0 deletions core/models/RawEvent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "models/PipelineEvent.h"

namespace logtail {

class RawEvent : public PipelineEvent {
friend class PipelineEventGroup;
friend class EventPool;

public:
std::unique_ptr<PipelineEvent> Copy() const override;
void Reset() override;

StringView GetContent() const { return mContent; }
void SetContent(const std::string& content);
void SetContentNoCopy(StringView content);
void SetContentNoCopy(const StringBuffer& content);

size_t DataSize() const override;

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value ToJson(bool enableEventMeta = false) const override;
bool FromJson(const Json::Value&) override;
#endif

private:
RawEvent(PipelineEventGroup* ptr);

StringView mContent;
};

} // namespace logtail
2 changes: 2 additions & 0 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ bool Pipeline::Init(PipelineConfig&& config) {
mContext.SetCreateTime(config.mCreateTime);
mContext.SetPipeline(*this);
mContext.SetIsFirstProcessorJsonFlag(config.mIsFirstProcessorJson);
mContext.SetHasNativeProcessorsFlag(config.mHasNativeProcessor);
mContext.SetIsFlushingThroughGoPipelineFlag(config.IsFlushingThroughGoPipelineExisted());

// for special treatment below
const InputFile* inputFile = nullptr;
Expand Down
Loading
Loading