Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: disable nanosecond timestamp when only EnableTimestampNanosecond is enabled #1550

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/common/TimeUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,11 @@ void SetLogTime(sls_logs::Log* log, time_t second) {
log->set_time(second);
}

void SetLogTimeWithNano(sls_logs::Log* log, time_t second, long nanosecond) {
// Usage: set nanosecond first, and then discard at LogProcess@ProcessBufferLegacy
void SetLogTimeWithNano(sls_logs::Log* log, time_t second, std::optional<uint32_t> nanosecond) {
log->set_time(second);
log->set_time_ns(nanosecond);
if (nanosecond) {
log->set_time_ns(nanosecond.value());
}
}

LogtailTime GetCurrentLogtailTime() {
Expand Down
3 changes: 2 additions & 1 deletion core/common/TimeUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once
#include <ctime>
#include <optional>
#include <string>
#include <thread>

Expand Down Expand Up @@ -83,7 +84,7 @@ uint64_t GetPreciseTimestampFromLogtailTime(LogtailTime logTime, const PreciseTi

void SetLogTime(sls_logs::Log* log, time_t second);

void SetLogTimeWithNano(sls_logs::Log* log, time_t second, long nanosecond);
void SetLogTimeWithNano(sls_logs::Log* log, time_t second, std::optional<uint32_t> nanosecond);

LogtailTime GetCurrentLogtailTime();

Expand Down
9 changes: 6 additions & 3 deletions core/event/BlockEventManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

#include "BlockEventManager.h"
#include "processor/daemon/LogProcess.h"

#include "common/HashUtil.h"
#include "common/StringTools.h"
#include "polling/PollingEventQueue.h"
#include "processor/daemon/LogProcess.h"
#include "queue/ProcessQueueManager.h"

DEFINE_FLAG_INT32(max_block_event_timeout, "max block event timeout, seconds", 3);

Expand Down Expand Up @@ -48,8 +50,9 @@ void BlockedEventManager::UpdateBlockEvent(const LogstoreFeedBackKey& logstoreKe
.append(pEvent->GetConfigName());
hashKey = HashSignatureString(key.c_str(), key.size());
}
// LOG_DEBUG(sLogger, ("Add block event ", pEvent->GetSource())(pEvent->GetObject(),
// pEvent->GetInode())(pEvent->GetConfigName(), hashKey));
LOG_DEBUG(sLogger,
("Add block event ", pEvent->GetSource())(pEvent->GetObject(),
pEvent->GetInode())(pEvent->GetConfigName(), hashKey));
ScopedSpinLock lock(mLock);
mBlockEventMap[hashKey].Update(logstoreKey, pEvent, curTime);
}
Expand Down
11 changes: 9 additions & 2 deletions core/event/BlockEventManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
#pragma once
#include <string>
#include <unordered_map>
#include "common/LogstoreFeedbackQueue.h"

#include "Event.h"
#include "common/FeedbackInterface.h"
#include "common/Flags.h"
#include "common/Lock.h"
#include "Event.h"
#include "queue/FeedbackQueueKey.h"

DECLARE_FLAG_INT32(max_block_event_timeout);

Expand Down Expand Up @@ -89,6 +91,11 @@ class BlockedEventManager : public LogstoreFeedBackInterface {

std::unordered_map<int64_t, BlockedEvent> mBlockEventMap;
SpinLock mLock;

private:
#ifdef APSARA_UNIT_TEST_MAIN
friend class ForceReadUnittest;
#endif
};

} // namespace logtail
1 change: 1 addition & 0 deletions core/event_handler/EventHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class ModifyHandler : public EventHandler {
friend class EventDispatcherTest;
friend class SenderUnittest;
friend class ModifyHandlerUnittest;
friend class ForceReadUnittest;
#endif
};

Expand Down
8 changes: 7 additions & 1 deletion core/models/LogEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ Json::Value LogEvent::ToJson() const {
Json::Value root;
root["type"] = static_cast<int>(GetType());
root["timestamp"] = GetTimestamp();
root["timestampNanosecond"] = GetTimestampNanosecond();
if (GetTimestampNanosecond()) {
root["timestampNanosecond"] = static_cast<int32_t>(GetTimestampNanosecond().value());
}
if (enableEventMeta) {
root["fileOffset"] = GetPosition().first;
root["rawSize"] = GetPosition().second;
}
if (!Empty()) {
Json::Value contents;
for (const auto& content : *this) {
Expand Down
12 changes: 11 additions & 1 deletion core/models/MetricEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,17 @@ Json::Value MetricEvent::ToJson() const {
Json::Value root;
root["type"] = static_cast<int>(GetType());
root["timestamp"] = GetTimestamp();
root["timestampNanosecond"] = GetTimestampNanosecond();
if (GetTimestampNanosecond()) {
root["timestampNanosecond"] = static_cast<int32_t>(GetTimestampNanosecond().value());
}
root["name"] = mName.to_string();
root["value"] = MetricValueToJson(mValue);
if (!mTags.mInner.empty()) {
Json::Value& tags = root["tags"];
for (const auto& tag : mTags.mInner) {
tags[tag.first.to_string()] = tag.second.to_string();
}
}
return root;
}

Expand Down
21 changes: 13 additions & 8 deletions core/models/PipelineEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ctime>
#include <map>
#include <memory>
#include <optional>
#include <string>

#include "models/StringView.h"
Expand All @@ -38,12 +39,16 @@ class PipelineEvent {
virtual ~PipelineEvent() = default;

Type GetType() const { return mType; }
time_t GetTimestamp() const { return timestamp; }
long GetTimestampNanosecond() const { return timestampNanosecond; }
void SetTimestamp(time_t t) { timestamp = t; }
void SetTimestamp(time_t t, long ns) {
timestamp = t;
timestampNanosecond = ns; // Only nanosecond part
time_t GetTimestamp() const { return mTimestamp; }
std::optional<uint32_t> GetTimestampNanosecond() const { return mTimestampNanosecond; }
void SetTimestamp(time_t t) { mTimestamp = t; }
void SetTimestamp(time_t t, uint32_t ns) {
mTimestamp = t;
mTimestampNanosecond = ns; // Only nanosecond part
}
void SetTimestamp(time_t t, std::optional<uint32_t> ns) {
mTimestamp = t;
mTimestampNanosecond = ns; // Only nanosecond part
}
void ResetPipelineEventGroup(PipelineEventGroup* ptr) { mPipelineEventGroupPtr = ptr; }
std::shared_ptr<SourceBuffer>& GetSourceBuffer();
Expand All @@ -61,8 +66,8 @@ class PipelineEvent {
PipelineEvent(Type type, PipelineEventGroup* ptr);

Type mType = Type::NONE;
time_t timestamp = 0;
long timestampNanosecond = 0;
time_t mTimestamp = 0;
std::optional<uint32_t> mTimestampNanosecond;
PipelineEventGroup* mPipelineEventGroupPtr = nullptr;
};

Expand Down
165 changes: 161 additions & 4 deletions core/models/SpanEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,174 @@ namespace logtail {
SpanEvent::SpanEvent(PipelineEventGroup* ptr) : PipelineEvent(Type::SPAN, ptr) {
}

uint64_t SpanEvent::EventsSizeBytes() {
// TODO
return 0;
void SpanEvent::SetTraceId(const string& traceId) {
const StringBuffer& b = GetSourceBuffer()->CopyString(traceId);
mTraceId = StringView(b.data, b.size);
}

void SpanEvent::SetSpanId(const string& spanId) {
const StringBuffer& b = GetSourceBuffer()->CopyString(spanId);
mSpanId = StringView(b.data, b.size);
}

void SpanEvent::SetTraceState(const string& traceState) {
const StringBuffer& b = GetSourceBuffer()->CopyString(traceState);
mTraceState = StringView(b.data, b.size);
}

void SpanEvent::SetParentSpanId(const string& parentSpanId) {
const StringBuffer& b = GetSourceBuffer()->CopyString(parentSpanId);
mParentSpanId = StringView(b.data, b.size);
}

void SpanEvent::SetName(const string& name) {
const StringBuffer& b = GetSourceBuffer()->CopyString(name);
mName = StringView(b.data, b.size);
}

StringView SpanEvent::GetTag(StringView key) const {
auto it = mTags.mInner.find(key);
if (it != mTags.mInner.end()) {
return it->second;
}
return gEmptyStringView;
}

bool SpanEvent::HasTag(StringView key) const {
return mTags.mInner.find(key) != mTags.mInner.end();
}

void SpanEvent::SetTag(StringView key, StringView val) {
SetTagNoCopy(GetSourceBuffer()->CopyString(key), GetSourceBuffer()->CopyString(val));
}

void SpanEvent::SetTag(const string& key, const string& val) {
SetTagNoCopy(GetSourceBuffer()->CopyString(key), GetSourceBuffer()->CopyString(val));
}

void SpanEvent::SetTagNoCopy(const StringBuffer& key, const StringBuffer& val) {
SetTagNoCopy(StringView(key.data, key.size), StringView(val.data, val.size));
}

void SpanEvent::SetTagNoCopy(StringView key, StringView val) {
mTags.Insert(key, val);
}

void SpanEvent::DelTag(StringView key) {
mTags.Erase(key);
}

SpanEvent::InnerEvent* SpanEvent::AddEvent() {
SpanEvent::InnerEvent e(this);
mEvents.emplace_back(std::move(e));
return &mEvents.back();
}

SpanEvent::SpanLink* SpanEvent::AddLink() {
SpanEvent::SpanLink l(this);
mLinks.emplace_back(std::move(l));
return &mLinks.back();
}

StringView SpanEvent::GetScopeTag(StringView key) const {
auto it = mScopeTags.mInner.find(key);
if (it != mScopeTags.mInner.end()) {
return it->second;
}
return gEmptyStringView;
}

bool SpanEvent::HasScopeTag(StringView key) const {
return mScopeTags.mInner.find(key) != mScopeTags.mInner.end();
}

void SpanEvent::SetScopeTag(StringView key, StringView val) {
SetScopeTagNoCopy(GetSourceBuffer()->CopyString(key), GetSourceBuffer()->CopyString(val));
}

void SpanEvent::SetScopeTag(const string& key, const string& val) {
SetScopeTagNoCopy(GetSourceBuffer()->CopyString(key), GetSourceBuffer()->CopyString(val));
}

void SpanEvent::SetScopeTagNoCopy(const StringBuffer& key, const StringBuffer& val) {
SetScopeTagNoCopy(StringView(key.data, key.size), StringView(val.data, val.size));
}

void SpanEvent::SetScopeTagNoCopy(StringView key, StringView val) {
mScopeTags.Insert(key, val);
}

void SpanEvent::DelScopeTag(StringView key) {
mScopeTags.Erase(key);
}

size_t SpanEvent::DataSize() const {
// TODO: this is not O(1), however, these two fields are not frequently used, so it can thought of O(1)
size_t eventsSize = sizeof(decltype(mEvents));
for (const auto& item : mEvents) {
eventsSize += item.DataSize();
}
size_t linksSize = sizeof(decltype(mLinks));
for (const auto& item : mLinks) {
linksSize += item.DataSize();
}
// TODO: for enum, it seems more reasonable to use actual string size instead of size of enum
return PipelineEvent::DataSize() + mTraceId.size() + mSpanId.size() + mTraceState.size() + mParentSpanId.size()
+ mName.size() + sizeof(decltype(mKind)) + sizeof(decltype(mStartTimeNs)) + sizeof(decltype(mEndTimeNs))
+ mTags.DataSize() + eventsSize + linksSize + sizeof(decltype(mStatus)) + mScopeTags.DataSize();
}

#ifdef APSARA_UNIT_TEST_MAIN
Json::Value SpanEvent::ToJson() const {
Json::Value root;
root["type"] = static_cast<int>(GetType());
root["timestamp"] = GetTimestamp();
root["timestampNanosecond"] = GetTimestampNanosecond();
if (GetTimestampNanosecond()) {
root["timestampNanosecond"] = static_cast<int32_t>(GetTimestampNanosecond().value());
}
root["traceId"] = mTraceId.to_string();
root["spanId"] = mSpanId.to_string();
if (!mTraceState.empty()) {
root["traceState"] = mTraceState.to_string();
}
if (!mParentSpanId.empty()) {
root["parentSpanId"] = mParentSpanId.to_string();
}
root["name"] = mName.to_string();
if (mKind != Kind::Unspecified) {
root["kind"] = static_cast<int>(mKind);
}
// must be int since jsoncpp will take integral as int during parse, which
// will lead to inequality on json comparison
root["startTimeNs"] = static_cast<int64_t>(mStartTimeNs);
root["endTimeNs"] = static_cast<int64_t>(mEndTimeNs);
if (!mTags.mInner.empty()) {
Json::Value& tags = root["tags"];
for (const auto& tag : mTags.mInner) {
tags[tag.first.to_string()] = tag.second.to_string();
}
}
if (!mEvents.empty()) {
Json::Value& events = root["events"];
for (const auto& event : mEvents) {
events.append(event.ToJson());
}
}
if (!mLinks.empty()) {
Json::Value& links = root["links"];
for (const auto& link : mLinks) {
links.append(link.ToJson());
}
}
if (mStatus != StatusCode::Unset) {
root["status"] = static_cast<int>(mStatus);
}
if (!mScopeTags.mInner.empty()) {
Json::Value& tags = root["scopeTags"];
for (const auto& tag : mScopeTags.mInner) {
tags[tag.first.to_string()] = tag.second.to_string();
}
}
return root;
}

Expand Down
2 changes: 0 additions & 2 deletions core/processor/ProcessorSplitLogStringNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ void ProcessorSplitLogStringNative::Process(PipelineEventGroup& logGroup) {
}
*mSplitLines = newEvents.size();
logGroup.SwapEvents(newEvents);

return;
}

bool ProcessorSplitLogStringNative::IsSupportedEvent(const PipelineEventPtr& e) const {
Expand Down
1 change: 1 addition & 0 deletions core/processor/ProcessorSplitMultilineLogStringNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ void ProcessorSplitMultilineLogStringNative::Process(PipelineEventGroup& logGrou
}
mProcMatchedLinesCnt->Add(inputLines - unmatchLines);
mProcUnmatchedLinesCnt->Add(unmatchLines);
*mSplitLines = newEvents.size();
logGroup.SwapEvents(newEvents);
}

Expand Down
8 changes: 8 additions & 0 deletions core/processor/daemon/LogProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,14 @@ void* LogProcess::ProcessLoop(int32_t threadNo) {
#endif
{
ReadLock lock(mAccessProcessThreadRWL);

std::unique_ptr<ProcessQueueItem> item;
std::string configName;
if (!ProcessQueueManager::GetInstance()->PopItem(threadNo, item, configName)) {
ProcessQueueManager::GetInstance()->Wait(100);
continue;
}

mThreadFlags[threadNo] = true;
s_processCount++;
uint64_t readBytes = logBuffer->rawBuffer.size() + 1; // may not be accurate if input is not utf8
Expand Down
Loading
Loading