Skip to content

Commit

Permalink
add raw event type (#1856)
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 authored Nov 15, 2024
1 parent 8252616 commit a9a529d
Show file tree
Hide file tree
Showing 28 changed files with 847 additions and 79 deletions.
29 changes: 29 additions & 0 deletions core/models/EventPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) {
return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt);
}

RawEvent* EventPool::AcquireRawEvent(PipelineEventGroup* ptr) {
if (mEnableLock) {
TransferPoolIfEmpty(mRawEventPool, mRawEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt);
}
return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt);
}

void EventPool::Release(vector<LogEvent*>&& obj) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolBakMux);
Expand Down Expand Up @@ -94,6 +103,15 @@ void EventPool::Release(vector<SpanEvent*>&& obj) {
}
}

void EventPool::Release(vector<RawEvent*>&& obj) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolBakMux);
mRawEventPoolBak.insert(mRawEventPoolBak.end(), obj.begin(), obj.end());
} else {
mRawEventPool.insert(mRawEventPool.end(), obj.begin(), obj.end());
}
}

template <class T>
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) {
if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits<size_t>::max()) {
Expand Down Expand Up @@ -131,10 +149,12 @@ void EventPool::CheckGC() {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span");
DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, &mPoolBakMux, "raw");
} else {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span");
DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, nullptr, "raw");
}
mLastGCTime = time(nullptr);
}
Expand All @@ -150,6 +170,9 @@ void EventPool::DestroyAllEventPool() {
for (auto& item : mSpanEventPool) {
delete item;
}
for (auto& item : mRawEventPool) {
delete item;
}
}

void EventPool::DestroyAllEventPoolBak() {
Expand All @@ -162,6 +185,9 @@ void EventPool::DestroyAllEventPoolBak() {
for (auto& item : mSpanEventPoolBak) {
delete item;
}
for (auto& item : mRawEventPoolBak) {
delete item;
}
}

#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -172,16 +198,19 @@ void EventPool::Clear() {
mLogEventPool.clear();
mMetricEventPool.clear();
mSpanEventPool.clear();
mRawEventPool.clear();
mMinUnusedLogEventsCnt = numeric_limits<size_t>::max();
mMinUnusedMetricEventsCnt = numeric_limits<size_t>::max();
mMinUnusedSpanEventsCnt = numeric_limits<size_t>::max();
mMinUnusedRawEventsCnt = numeric_limits<size_t>::max();
}
{
lock_guard<mutex> lock(mPoolBakMux);
DestroyAllEventPoolBak();
mLogEventPoolBak.clear();
mMetricEventPoolBak.clear();
mSpanEventPoolBak.clear();
mRawEventPoolBak.clear();
}
mLastGCTime = 0;
}
Expand Down
6 changes: 6 additions & 0 deletions core/models/EventPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "models/LogEvent.h"
#include "models/MetricEvent.h"
#include "models/RawEvent.h"
#include "models/SpanEvent.h"

namespace logtail {
Expand All @@ -39,9 +40,11 @@ class EventPool {
LogEvent* AcquireLogEvent(PipelineEventGroup* ptr);
MetricEvent* AcquireMetricEvent(PipelineEventGroup* ptr);
SpanEvent* AcquireSpanEvent(PipelineEventGroup* ptr);
RawEvent* AcquireRawEvent(PipelineEventGroup* ptr);
void Release(std::vector<LogEvent*>&& obj);
void Release(std::vector<MetricEvent*>&& obj);
void Release(std::vector<SpanEvent*>&& obj);
void Release(std::vector<RawEvent*>&& obj);
void CheckGC();

#ifdef APSARA_UNIT_TEST_MAIN
Expand Down Expand Up @@ -80,16 +83,19 @@ class EventPool {
std::vector<LogEvent*> mLogEventPool;
std::vector<MetricEvent*> mMetricEventPool;
std::vector<SpanEvent*> mSpanEventPool;
std::vector<RawEvent*> mRawEventPool;

// only meaningful when mEnableLock is true
std::mutex mPoolBakMux;
std::vector<LogEvent*> mLogEventPoolBak;
std::vector<MetricEvent*> mMetricEventPoolBak;
std::vector<SpanEvent*> mSpanEventPoolBak;
std::vector<RawEvent*> mRawEventPoolBak;

size_t mMinUnusedLogEventsCnt = std::numeric_limits<size_t>::max();
size_t mMinUnusedMetricEventsCnt = std::numeric_limits<size_t>::max();
size_t mMinUnusedSpanEventsCnt = std::numeric_limits<size_t>::max();
size_t mMinUnusedRawEventsCnt = std::numeric_limits<size_t>::max();

time_t mLastGCTime = 0;

Expand Down
19 changes: 11 additions & 8 deletions core/models/PipelineEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ StringView gEmptyStringView;
const string& PipelineEventTypeToString(PipelineEvent::Type t) {
switch (t) {
case PipelineEvent::Type::LOG:
static string logname = "Log";
return logname;
static string logName = "Log";
return logName;
case PipelineEvent::Type::METRIC:
static string metricname = "Metric";
return metricname;
static string metricName = "Metric";
return metricName;
case PipelineEvent::Type::SPAN:
static string spanname = "Span";
return spanname;
static string spanName = "Span";
return spanName;
case PipelineEvent::Type::RAW:
static string rawName = "Raw";
return rawName;
default:
static string voidname = "";
return voidname;
static string voidName = "";
return voidName;
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/models/PipelineEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class PipelineEventGroup;

class PipelineEvent {
public:
enum class Type { NONE, LOG, METRIC, SPAN };
enum class Type { NONE, LOG, METRIC, SPAN, RAW };

virtual ~PipelineEvent() = default;

Expand Down
36 changes: 35 additions & 1 deletion core/models/PipelineEventGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ PipelineEventGroup::~PipelineEventGroup() {
case PipelineEvent::Type::SPAN:
DestroyEvents<SpanEvent>(std::move(mEvents));
break;
case PipelineEvent::Type::RAW:
DestroyEvents<RawEvent>(std::move(mEvents));
break;
default:
break;
}
Expand Down Expand Up @@ -159,6 +162,20 @@ unique_ptr<SpanEvent> PipelineEventGroup::CreateSpanEvent(bool fromPool, EventPo
return unique_ptr<SpanEvent>(e);
}

unique_ptr<RawEvent> PipelineEventGroup::CreateRawEvent(bool fromPool, EventPool* pool) {
RawEvent* e = nullptr;
if (fromPool) {
if (pool) {
e = pool->AcquireRawEvent(this);
} else {
e = gThreadedEventPool.AcquireRawEvent(this);
}
} else {
e = new RawEvent(this);
}
return unique_ptr<RawEvent>(e);
}

LogEvent* PipelineEventGroup::AddLogEvent(bool fromPool, EventPool* pool) {
LogEvent* e = nullptr;
if (fromPool) {
Expand Down Expand Up @@ -204,6 +221,21 @@ SpanEvent* PipelineEventGroup::AddSpanEvent(bool fromPool, EventPool* pool) {
return e;
}

RawEvent* PipelineEventGroup::AddRawEvent(bool fromPool, EventPool* pool) {
RawEvent* e = nullptr;
if (fromPool) {
if (pool) {
e = pool->AcquireRawEvent(this);
} else {
e = gThreadedEventPool.AcquireRawEvent(this);
}
} else {
e = new RawEvent(this);
}
mEvents.emplace_back(e, fromPool, pool);
return e;
}

void PipelineEventGroup::SetMetadata(EventGroupMetaKey key, StringView val) {
SetMetadataNoCopy(key, mSourceBuffer->CopyString(val));
}
Expand Down Expand Up @@ -404,8 +436,10 @@ bool PipelineEventGroup::FromJson(const Json::Value& root) {
AddLogEvent()->FromJson(event);
} else if (event["type"].asInt() == static_cast<int>(PipelineEvent::Type::METRIC)) {
AddMetricEvent()->FromJson(event);
} else {
} else if (event["type"].asInt() == static_cast<int>(PipelineEvent::Type::SPAN)) {
AddSpanEvent()->FromJson(event);
} else {
AddRawEvent()->FromJson(event);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ class PipelineEventGroup {
std::unique_ptr<LogEvent> CreateLogEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<MetricEvent> CreateMetricEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<SpanEvent> CreateSpanEvent(bool fromPool = false, EventPool* pool = nullptr);
std::unique_ptr<RawEvent> CreateRawEvent(bool fromPool = false, EventPool* pool = nullptr);

const EventsContainer& GetEvents() const { return mEvents; }
EventsContainer& MutableEvents() { return mEvents; }
LogEvent* AddLogEvent(bool fromPool = false, EventPool* pool = nullptr);
MetricEvent* AddMetricEvent(bool fromPool = false, EventPool* pool = nullptr);
SpanEvent* AddSpanEvent(bool fromPool = false, EventPool* pool = nullptr);
RawEvent* AddRawEvent(bool fromPool = false, EventPool* pool = nullptr);
void SwapEvents(EventsContainer& other) { mEvents.swap(other); }
void ReserveEvents(size_t size) { mEvents.reserve(size); }

Expand Down
4 changes: 4 additions & 0 deletions core/models/PipelineEventPtr.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "models/MetricEvent.h"
#include "models/PipelineEvent.h"
#include "models/SpanEvent.h"
#include "models/RawEvent.h"

namespace logtail {
class EventPool;
Expand All @@ -47,6 +48,9 @@ class PipelineEventPtr {
if (typeid(T) == typeid(SpanEvent)) {
return mData->GetType() == PipelineEvent::Type::SPAN;
}
if (typeid(T) == typeid(RawEvent)) {
return mData->GetType() == PipelineEvent::Type::RAW;
}
return false;
}
template <typename T>
Expand Down
76 changes: 76 additions & 0 deletions core/models/RawEvent.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "models/RawEvent.h"

using namespace std;

namespace logtail {

RawEvent::RawEvent(PipelineEventGroup* ptr) : PipelineEvent(Type::RAW, ptr) {
}

unique_ptr<PipelineEvent> RawEvent::Copy() const {
return make_unique<RawEvent>(*this);
}

void RawEvent::Reset() {
PipelineEvent::Reset();
mContent.clear();
}

void RawEvent::SetContent(const std::string& content) {
SetContentNoCopy(GetSourceBuffer()->CopyString(content));
}

void RawEvent::SetContentNoCopy(StringView content) {
mContent = content;
}

void RawEvent::SetContentNoCopy(const StringBuffer& content) {
mContent = StringView(content.data, content.size);
}

size_t RawEvent::DataSize() const {
return PipelineEvent::DataSize() + mContent.size();
}

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value RawEvent::ToJson(bool enableEventMeta) const {
Json::Value root;
root["type"] = static_cast<int>(GetType());
root["timestamp"] = GetTimestamp();
if (GetTimestampNanosecond()) {
root["timestampNanosecond"] = static_cast<int32_t>(GetTimestampNanosecond().value());
}
root["content"] = mContent.to_string();
return root;
}

bool RawEvent::FromJson(const Json::Value& root) {
if (root.isMember("timestampNanosecond")) {
SetTimestamp(root["timestamp"].asInt64(), root["timestampNanosecond"].asInt64());
} else {
SetTimestamp(root["timestamp"].asInt64());
}
if (root.isMember("content")) {
SetContent(root["content"].asString());
}
return true;
}
#endif

} // namespace logtail
49 changes: 49 additions & 0 deletions core/models/RawEvent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "models/PipelineEvent.h"

namespace logtail {

class RawEvent : public PipelineEvent {
friend class PipelineEventGroup;
friend class EventPool;

public:
std::unique_ptr<PipelineEvent> Copy() const override;
void Reset() override;

StringView GetContent() const { return mContent; }
void SetContent(const std::string& content);
void SetContentNoCopy(StringView content);
void SetContentNoCopy(const StringBuffer& content);

size_t DataSize() const override;

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value ToJson(bool enableEventMeta = false) const override;
bool FromJson(const Json::Value&) override;
#endif

private:
RawEvent(PipelineEventGroup* ptr);

StringView mContent;
};

} // namespace logtail
2 changes: 2 additions & 0 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ bool Pipeline::Init(PipelineConfig&& config) {
mContext.SetCreateTime(config.mCreateTime);
mContext.SetPipeline(*this);
mContext.SetIsFirstProcessorJsonFlag(config.mIsFirstProcessorJson);
mContext.SetHasNativeProcessorsFlag(config.mHasNativeProcessor);
mContext.SetIsFlushingThroughGoPipelineFlag(config.IsFlushingThroughGoPipelineExisted());

// for special treatment below
const InputFile* inputFile = nullptr;
Expand Down
Loading

0 comments on commit a9a529d

Please sign in to comment.