Skip to content

Commit

Permalink
support custom protobuf serializer for logGroup to improve send perfo…
Browse files Browse the repository at this point in the history
…rmance (#1845)
  • Loading branch information
henryzhx8 authored Nov 4, 2024
1 parent 3d56d3b commit 3d26daf
Show file tree
Hide file tree
Showing 18 changed files with 755 additions and 1,637 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-core-ut.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
run: make unittest_core

- name: Unit Test Coverage
run: docker build -t unittest_coverage -f ./docker/Dockerfile_coverage . && docker run -v $(pwd):$(pwd) unittest_coverage bash -c "cd $(pwd)/core && gcovr --gcov-ignore-errors=no_working_dir_found --root . --json coverage.json --json-summary-pretty --json-summary summary.json -e \".*sdk.*\" -e \".*logger.*\" -e \".*unittest.*\" -e \".*config_server.*\" -e \".*go_pipeline.*\" -e \".*application.*\" -e \".*protobuf.*\" -e \".*runner.*\""
run: docker build -t unittest_coverage -f ./docker/Dockerfile_coverage . && docker run -v $(pwd):$(pwd) unittest_coverage bash -c "cd $(pwd)/core && gcovr --gcov-ignore-errors=no_working_dir_found --root . --json coverage.json --json-summary-pretty --json-summary summary.json -e \".*\.pb\.cc\" -e \".*\.pb\.h\" -e \".*unittest.*\" -e \".*sdk.*\" -e \".*logger.*\" -e \".*config_server.*\" -e \".*go_pipeline.*\" -e \".*application.*\" -e \".*runner.*\""

- name: Setup Python3.10
uses: actions/setup-python@v5
Expand Down
1 change: 0 additions & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ endif()
# remove several files in go_pipeline
list(REMOVE_ITEM FRAMEWORK_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/go_pipeline/LogtailPluginAdapter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/go_pipeline/LogtailPluginAdapter.h)


# add provider
add_subdirectory("${PROVIDER_PATH}" "${CMAKE_BINARY_DIR}/provider")

Expand Down
4 changes: 4 additions & 0 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ bool Pipeline::Send(vector<PipelineEventGroup>&& groupList) {
auto before = chrono::system_clock::now();
bool allSucceeded = true;
for (auto& group : groupList) {
if (group.GetEvents().empty()) {
LOG_DEBUG(sLogger, ("empty event group", "discard")("config", mName));
continue;
}
auto res = mRouter.Route(group);
for (auto& item : res) {
if (item.first >= mFlushers.size()) {
Expand Down
197 changes: 117 additions & 80 deletions core/pipeline/serializer/SLSSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,13 @@

#include "pipeline/serializer/SLSSerializer.h"

#include "application/Application.h"
#include "common/Flags.h"
#include "common/TimeUtil.h"
#include "common/compression/CompressType.h"
#include "plugin/flusher/sls/FlusherSLS.h"

#include "protobuf/sls/LogGroupSerializer.h"

DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024);

const std::string METRIC_RESERVED_KEY_NAME = "__name__";
const std::string METRIC_RESERVED_KEY_LABELS = "__labels__";
const std::string METRIC_RESERVED_KEY_VALUE = "__value__";
const std::string METRIC_RESERVED_KEY_TIME_NANO = "__time_nano__";

const std::string METRIC_LABELS_SEPARATOR = "|";
const std::string METRIC_LABELS_KEY_VALUE_SEPARATOR = "#$#";

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -61,88 +51,135 @@ bool Serializer<vector<CompressedLogGroup>>::DoSerialize(vector<CompressedLogGro
}

bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
sls_logs::LogGroup logGroup;
for (const auto& e : group.mEvents) {
if (e.Is<LogEvent>()) {
const auto& logEvent = e.Cast<LogEvent>();
auto log = logGroup.add_logs();
for (const auto& kv : logEvent) {
auto contPtr = log->add_contents();
contPtr->set_key(kv.first.to_string());
contPtr->set_value(kv.second.to_string());
}
log->set_time(logEvent.GetTimestamp());
if (mFlusher->GetContext().GetGlobalConfig().mEnableTimestampNanosecond
&& logEvent.GetTimestampNanosecond()) {
log->set_time_ns(logEvent.GetTimestampNanosecond().value());
}
} else if (e.Is<MetricEvent>()) {
const auto& metricEvent = e.Cast<MetricEvent>();
if (metricEvent.Is<std::monostate>()) {
continue;
}
auto log = logGroup.add_logs();
std::ostringstream oss;
// set __labels__
bool hasPrev = false;
for (auto it = metricEvent.TagsBegin(); it != metricEvent.TagsEnd(); ++it) {
if (hasPrev) {
oss << METRIC_LABELS_SEPARATOR;
if (group.mEvents.empty()) {
errorMsg = "empty event group";
return false;
}

PipelineEvent::Type eventType = group.mEvents[0]->GetType();
if (eventType == PipelineEvent::Type::NONE) {
// should not happen
errorMsg = "unsupported event type in event group";
return false;
}

bool enableNs = mFlusher->GetContext().GetGlobalConfig().mEnableTimestampNanosecond;

// caculate serialized logGroup size first, where some critical results can be cached
vector<size_t> logSZ(group.mEvents.size());
vector<pair<string, size_t>> metricEventContentCache(group.mEvents.size());
size_t logGroupSZ = 0;
switch (eventType) {
case PipelineEvent::Type::LOG:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<LogEvent>();
if (e.Empty()) {
continue;
}
hasPrev = true;
oss << it->first << METRIC_LABELS_KEY_VALUE_SEPARATOR << it->second;
}
auto logPtr = log->add_contents();
logPtr->set_key(METRIC_RESERVED_KEY_LABELS);
logPtr->set_value(oss.str());
// set time, no need to set nanosecond for metric
log->set_time(metricEvent.GetTimestamp());
// set __time_nano__
logPtr = log->add_contents();
logPtr->set_key(METRIC_RESERVED_KEY_TIME_NANO);
if (metricEvent.GetTimestampNanosecond()) {
logPtr->set_value(std::to_string(metricEvent.GetTimestamp())
+ NumberToDigitString(metricEvent.GetTimestampNanosecond().value(), 9));
} else {
logPtr->set_value(std::to_string(metricEvent.GetTimestamp()));
size_t contentSZ = 0;
for (const auto& kv : e) {
contentSZ += GetLogContentSize(kv.first.size(), kv.second.size());
}
logGroupSZ += GetLogSize(contentSZ, enableNs && e.GetTimestampNanosecond(), logSZ[i]);
}
// set __value__
if (metricEvent.Is<UntypedSingleValue>()) {
double value = metricEvent.GetValue<UntypedSingleValue>()->mValue;
logPtr = log->add_contents();
logPtr->set_key(METRIC_RESERVED_KEY_VALUE);
logPtr->set_value(std::to_string(value));
break;
case PipelineEvent::Type::METRIC:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<MetricEvent>();
if (e.Is<UntypedSingleValue>()) {
metricEventContentCache[i].first = to_string(e.GetValue<UntypedSingleValue>()->mValue);
} else {
// should not happen
LOG_ERROR(sLogger,
("unexpected error",
"invalid metric event type")("config", mFlusher->GetContext().GetConfigName()));
continue;
}
metricEventContentCache[i].second = GetMetricLabelSize(e);

size_t contentSZ = 0;
contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_NAME.size(), e.GetName().size());
contentSZ
+= GetLogContentSize(METRIC_RESERVED_KEY_VALUE.size(), metricEventContentCache[i].first.size());
contentSZ
+= GetLogContentSize(METRIC_RESERVED_KEY_TIME_NANO.size(), e.GetTimestampNanosecond() ? 19U : 10U);
contentSZ += GetLogContentSize(METRIC_RESERVED_KEY_LABELS.size(), metricEventContentCache[i].second);
logGroupSZ += GetLogSize(contentSZ, false, logSZ[i]);
}
// set __name__
logPtr = log->add_contents();
logPtr->set_key(METRIC_RESERVED_KEY_NAME);
logPtr->set_value(metricEvent.GetName().to_string());
break;
case PipelineEvent::Type::SPAN:
break;
default:
break;
}
if (logGroupSZ == 0) {
errorMsg = "all empty logs";
return false;
}

// loggroup.category is deprecated, no need to set
for (const auto& tag : group.mTags.mInner) {
if (tag.first == LOG_RESERVED_KEY_TOPIC || tag.first == LOG_RESERVED_KEY_SOURCE
|| tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
logGroupSZ += GetStringSize(tag.second.size());
} else {
errorMsg = "unsupported event type in event group";
return false;
logGroupSZ += GetLogTagSize(tag.first.size(), tag.second.size());
}
}

if (static_cast<int32_t>(logGroupSZ) > INT32_FLAG(max_send_log_group_size)) {
errorMsg = "log group exceeds size limit\tgroup size: " + ToString(logGroupSZ)
+ "\tsize limit: " + ToString(INT32_FLAG(max_send_log_group_size));
return false;
}

thread_local LogGroupSerializer serializer;
serializer.Prepare(logGroupSZ);
switch (eventType) {
case PipelineEvent::Type::LOG:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& logEvent = group.mEvents[i].Cast<LogEvent>();
serializer.StartToAddLog(logSZ[i]);
serializer.AddLogTime(logEvent.GetTimestamp());
for (const auto& kv : logEvent) {
serializer.AddLogContent(kv.first, kv.second);
}
if (enableNs && logEvent.GetTimestampNanosecond()) {
serializer.AddLogTimeNs(logEvent.GetTimestampNanosecond().value());
}
}
break;
case PipelineEvent::Type::METRIC:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& metricEvent = group.mEvents[i].Cast<MetricEvent>();
if (metricEvent.Is<std::monostate>()) {
continue;
}
serializer.StartToAddLog(logSZ[i]);
serializer.AddLogTime(metricEvent.GetTimestamp());
serializer.AddLogContentMetricLabel(metricEvent, metricEventContentCache[i].second);
serializer.AddLogContentMetricTimeNano(metricEvent);
serializer.AddLogContent(METRIC_RESERVED_KEY_VALUE, metricEventContentCache[i].first);
serializer.AddLogContent(METRIC_RESERVED_KEY_NAME, metricEvent.GetName());
}
break;
case PipelineEvent::Type::SPAN:
break;
default:
break;
}
for (const auto& tag : group.mTags.mInner) {
if (tag.first == LOG_RESERVED_KEY_TOPIC) {
logGroup.set_topic(tag.second.to_string());
serializer.AddTopic(tag.second);
} else if (tag.first == LOG_RESERVED_KEY_SOURCE) {
logGroup.set_source(tag.second.to_string());
serializer.AddSource(tag.second);
} else if (tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
logGroup.set_machineuuid(tag.second.to_string());
serializer.AddMachineUUID(tag.second);
} else {
auto logTag = logGroup.add_logtags();
logTag->set_key(tag.first.to_string());
logTag->set_value(tag.second.to_string());
serializer.AddLogTag(tag.first, tag.second);
}
}
// loggroup.category is deprecated, no need to set
size_t size = logGroup.ByteSizeLong();
if (static_cast<int32_t>(size) > INT32_FLAG(max_send_log_group_size)) {
errorMsg = "log group exceeds size limit\tgroup size: " + ToString(size)
+ "\tsize limit: " + ToString(INT32_FLAG(max_send_log_group_size));
return false;
}
logGroup.SerializeToString(&res);
res = std::move(serializer.GetResult());
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions core/plugin/flusher/sls/FlusherSLS.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ namespace logtail {

class FlusherSLS : public HttpFlusher {
public:

static std::shared_ptr<ConcurrencyLimiter> GetLogstoreConcurrencyLimiter(const std::string& project, const std::string& logstore);
static std::shared_ptr<ConcurrencyLimiter> GetLogstoreConcurrencyLimiter(const std::string& project,
const std::string& logstore);
static std::shared_ptr<ConcurrencyLimiter> GetProjectConcurrencyLimiter(const std::string& project);
static std::shared_ptr<ConcurrencyLimiter> GetRegionConcurrencyLimiter(const std::string& region);
static void ClearInvalidConcurrencyLimiters();
Expand Down
Loading

0 comments on commit 3d26daf

Please sign in to comment.