Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Jan 17, 2025
1 parent 94da5d7 commit a52f25f
Show file tree
Hide file tree
Showing 53 changed files with 468 additions and 252 deletions.
76 changes: 39 additions & 37 deletions core/common/ParamExtractor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include "boost/regex.hpp"

#include "TagConstants.h"
#include "constants/TagConstants.h"

using namespace std;

Expand Down Expand Up @@ -196,30 +196,12 @@ bool IsValidMap(const Json::Value& config, const string& key, string& errorMsg)
return true;
}

void ParseTagKey(const Json::Value* config,
const string& configField,
TagKey tagKey,
unordered_map<TagKey, string>& tagKeyMap,
const PipelineContext& context,
const std::string& pluginType,
bool defaultAdded) {
string customTagKey;
if (defaultAdded) {
customTagKey = ParseDefaultAddedTag(config, configField, GetDefaultTagKeyString(tagKey), context, pluginType);
} else {
customTagKey = ParseOptionalTag(config, configField, GetDefaultTagKeyString(tagKey), context, pluginType);
}
if (!customTagKey.empty()) {
tagKeyMap[tagKey] = customTagKey;
}
}


string ParseDefaultAddedTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const PipelineContext& context,
const string& pluginType) {
void ParseDefaultAddedTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const PipelineContext& context,
const string& pluginType,
string& outputKey) {
string errorMsg;
string customTagKey = DEFAULT_CONFIG_TAG_KEY_VALUE;
if (config && config->isMember(configField)) {
Expand All @@ -235,22 +217,23 @@ string ParseDefaultAddedTag(const Json::Value* config,
context.GetRegion());
}
if (customTagKey == DEFAULT_CONFIG_TAG_KEY_VALUE) {
return defaultTagKeyValue;
outputKey = defaultTagKeyValue;
}
return customTagKey;
outputKey = customTagKey;
}
return defaultTagKeyValue;
outputKey = defaultTagKeyValue;
}

string ParseOptionalTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const PipelineContext& context,
const string& pluginType) {
void ParseOptionalTag(const Json::Value* config,
const string& configField,
const string& defaultTagKeyValue,
const PipelineContext& context,
const string& pluginType,
string& outputKey) {
string errorMsg;
string customTagKey;
if (config && config->isMember(configField)) {
if (!GetOptionalStringParam(*config, configField, customTagKey, errorMsg)) {
if (!GetOptionalStringParam(*config, "Tags." + configField, customTagKey, errorMsg)) {
PARAM_WARNING_DEFAULT(context.GetLogger(),
context.GetAlarm(),
errorMsg,
Expand All @@ -262,11 +245,30 @@ string ParseOptionalTag(const Json::Value* config,
context.GetRegion());
}
if (customTagKey == DEFAULT_CONFIG_TAG_KEY_VALUE) {
return defaultTagKeyValue;
outputKey = defaultTagKeyValue;
}
return customTagKey;
outputKey = customTagKey;
}
outputKey = "";
}

// if there is no tag config, config maybe nullptr, will act as default (default added or optional)
void ParseTagKey(const Json::Value* config,
const string& configField,
TagKey tagKey,
unordered_map<TagKey, string>& tagKeyMap,
const PipelineContext& context,
const std::string& pluginType,
bool defaultAdded) {
string customTagKey;
if (defaultAdded) {
ParseDefaultAddedTag(config, configField, GetDefaultTagKeyString(tagKey), context, pluginType, customTagKey);
} else {
ParseOptionalTag(config, configField, GetDefaultTagKeyString(tagKey), context, pluginType, customTagKey);
}
if (!customTagKey.empty()) {
tagKeyMap[tagKey] = customTagKey;
}
return "";
}

} // namespace logtail
12 changes: 0 additions & 12 deletions core/common/ParamExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,4 @@ void ParseTagKey(const Json::Value* config,
const PipelineContext& context,
const std::string& pluginType,
bool defaultAdded);

std::string ParseDefaultAddedTag(const Json::Value* config,
const std::string& configField,
const std::string& defaultTagKeyValue,
const PipelineContext& context,
const std::string& pluginType);

std::string ParseOptionalTag(const Json::Value* config,
const std::string& configField,
const std::string& defaultTagKeyValue,
const PipelineContext& context,
const std::string& pluginType);
} // namespace logtail
2 changes: 2 additions & 0 deletions core/config/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ struct PipelineConfig {
return mHasGoFlusher || ShouldNativeFlusherConnectedByGoPipeline();
}

bool ShouldAddNativeTagProcessor() const { return mHasNativeProcessor || (mHasNativeInput && !mHasGoProcessor); }

// bool IsProcessRunnerInvolved() const {
// // 长期过渡使用,待C++部分的时序聚合能力与Go持平后恢复下面的正式版
// return !(mHasGoInput && !mHasNativeProcessor);
Expand Down
40 changes: 24 additions & 16 deletions core/constants/TagConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,39 @@

#include "constants/TagConstants.h"

#include <unordered_map>

using namespace std;

namespace logtail {

const string& GetDefaultTagKeyString(TagKey key) {
static const vector<string> TagKeyDefaultValue = {
DEFAULT_LOG_TAG_FILE_OFFSET,
DEFAULT_LOG_TAG_FILE_INODE,
DEFAULT_LOG_TAG_FILE_PATH,
DEFAULT_LOG_TAG_NAMESPACE,
DEFAULT_LOG_TAG_POD_NAME,
DEFAULT_LOG_TAG_POD_UID,
DEFAULT_LOG_TAG_CONTAINER_NAME,
DEFAULT_LOG_TAG_CONTAINER_IP,
DEFAULT_LOG_TAG_IMAGE_NAME,
DEFAULT_LOG_TAG_HOST_NAME,
DEFAULT_LOG_TAG_HOST_ID,
DEFAULT_LOG_TAG_CLOUD_PROVIDER,
static const unordered_map<TagKey, string> TagKeyDefaultValue = {
{TagKey::FILE_OFFSET_KEY, DEFAULT_LOG_TAG_FILE_OFFSET},
{TagKey::FILE_INODE_TAG_KEY, DEFAULT_LOG_TAG_FILE_INODE},
{TagKey::FILE_PATH_TAG_KEY, DEFAULT_LOG_TAG_FILE_PATH},
{TagKey::K8S_NAMESPACE_TAG_KEY, DEFAULT_LOG_TAG_NAMESPACE},
{TagKey::K8S_POD_NAME_TAG_KEY, DEFAULT_LOG_TAG_POD_NAME},
{TagKey::K8S_POD_UID_TAG_KEY, DEFAULT_LOG_TAG_POD_UID},
{TagKey::CONTAINER_NAME_TAG_KEY, DEFAULT_LOG_TAG_CONTAINER_NAME},
{TagKey::CONTAINER_IP_TAG_KEY, DEFAULT_LOG_TAG_CONTAINER_IP},
{TagKey::CONTAINER_IMAGE_NAME_TAG_KEY, DEFAULT_LOG_TAG_IMAGE_NAME},
{TagKey::HOST_NAME_TAG_KEY, DEFAULT_LOG_TAG_HOST_NAME},
{TagKey::HOST_ID_TAG_KEY, DEFAULT_LOG_TAG_HOST_ID},
{TagKey::CLOUD_PROVIDER_TAG_KEY, DEFAULT_LOG_TAG_CLOUD_PROVIDER},
#ifndef __ENTERPRISE__
DEFAULT_LOG_TAG_HOST_IP,
{TagKey::HOST_IP_TAG_KEY, DEFAULT_LOG_TAG_HOST_IP},
#else
DEFAULT_LOG_TAG_USER_DEFINED_ID,
{TagKey::AGENT_TAG_TAG_KEY, DEFAULT_LOG_TAG_USER_DEFINED_ID},
#endif
};
return TagKeyDefaultValue[key];
static const string unknown = "unknown_tag_key";
auto iter = TagKeyDefaultValue.find(key);
if (iter != TagKeyDefaultValue.end()) {
return iter->second;
} else {
return unknown;
}
}

////////////////////////// COMMON ////////////////////////
Expand Down
10 changes: 5 additions & 5 deletions core/constants/TagConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ enum TagKey : int {
CONTAINER_NAME_TAG_KEY,
CONTAINER_IP_TAG_KEY,
CONTAINER_IMAGE_NAME_TAG_KEY,
HOST_NAME,
HOST_ID,
CLOUD_PROVIDER,
HOST_NAME_TAG_KEY,
HOST_ID_TAG_KEY,
CLOUD_PROVIDER_TAG_KEY,
#ifndef __ENTERPRISE__
HOST_IP,
HOST_IP_TAG_KEY,
#else
AGENT_TAG,
AGENT_TAG_TAG_KEY,
#endif
};

Expand Down
8 changes: 4 additions & 4 deletions core/file_server/FileTagOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ StringView FileTagOptions::GetFileTagKeyName(TagKey key) const {
}

bool FileTagOptions::EnableLogPositionMeta() {
bool enableFileOffset
= mFileTags.find(TagKey::FILE_OFFSET_KEY) != mFileTags.end() && !mFileTags[TagKey::FILE_OFFSET_KEY].empty();
bool enableFileInode = mFileTags.find(TagKey::FILE_INODE_TAG_KEY) != mFileTags.end()
&& !mFileTags[TagKey::FILE_INODE_TAG_KEY].empty();
auto offsetIter = mFileTags.find(TagKey::FILE_OFFSET_KEY);
bool enableFileOffset = offsetIter != mFileTags.end() && !offsetIter->second.empty();
auto inodeIter = mFileTags.find(TagKey::FILE_INODE_TAG_KEY);
bool enableFileInode = inodeIter != mFileTags.end() && !inodeIter->second.empty();
return enableFileOffset || enableFileInode;
}

Expand Down
2 changes: 1 addition & 1 deletion core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2502,7 +2502,7 @@ void LogFileReader::SetEventGroupMetaAndTag(PipelineEventGroup& group) {
}
}

std::string topic = GetTopicName();
const auto& topic = GetTopicName();
if (!topic.empty()) {
StringBuffer b = group.GetSourceBuffer()->CopyString(topic);
group.SetTagNoCopy(LOG_RESERVED_KEY_TOPIC, StringView(b.data, b.size));
Expand Down
2 changes: 1 addition & 1 deletion core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ LogtailPlugin::LogtailPlugin() {
mPluginCfg["Hostname"] = LoongCollectorMonitor::mHostname;
mPluginCfg["EnableContainerdUpperDirDetect"] = BOOL_FLAG(enable_containerd_upper_dir_detect);
mPluginCfg["EnableSlsMetricsFormat"] = BOOL_FLAG(enable_sls_metrics_format);
mPluginCfg["LogFileTagsPath"] = STRING_FLAG(ALIYUN_LOG_FILE_TAGS);
mPluginCfg["FileTagsPath"] = STRING_FLAG(ALIYUN_LOG_FILE_TAGS);
mPluginCfg["MachineUUID"] = Application::GetInstance()->GetUUID();
}

Expand Down
13 changes: 7 additions & 6 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

#include "json/value.h"

#include "ProcessorInstance.h"
#include "app_config/AppConfig.h"
#include "common/Flags.h"
#include "common/ParamExtractor.h"
Expand Down Expand Up @@ -231,11 +230,9 @@ bool Pipeline::Init(PipelineConfig&& config) {
CopyNativeGlobalParamToGoPipeline(mGoPipelineWithInput);
CopyNativeGlobalParamToGoPipeline(mGoPipelineWithoutInput);

// only add native tag processor when there is only native input, otherwise will use go tag processor
if (!mInputs.empty() && !HasGoPipelineWithInput()) {
if (config.ShouldAddNativeTagProcessor()) {
unique_ptr<ProcessorInstance> processor
= PluginRegistry::GetInstance()->CreateProcessor(ProcessorTagNative::sName, GenNextPluginMeta(false));
mInnerProcessorLine.emplace_back(std::move(processor));
Json::Value detail;
if (config.mGlobal) {
detail = *config.mGlobal;
Expand All @@ -244,7 +241,11 @@ bool Pipeline::Init(PipelineConfig&& config) {
// should not happen
return false;
}
mInnerProcessorLine.emplace_back(std::move(processor));
mPipelineInnerProcessor.emplace_back(std::move(processor));
} else {
// processor tag requires tags as input, so it is a special processor, cannot add as plugin
mGoPipelineWithInput["global"]["EnableProcessorTag"] = true;
mGoPipelineWithoutInput["global"]["EnableProcessorTag"] = true;
}

// mandatory override global.DefaultLogQueueSize in Go pipeline when input_file and Go processing coexist.
Expand Down Expand Up @@ -396,7 +397,7 @@ void Pipeline::Process(vector<PipelineEventGroup>& logGroupList, size_t inputInd
for (auto& p : mInputs[inputIndex]->GetInnerProcessors()) {
p->Process(logGroupList);
}
for (auto& p : mInnerProcessorLine) {
for (auto& p : mPipelineInnerProcessor) {
p->Process(logGroupList);
}
for (auto& p : mProcessorLine) {
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class Pipeline {

std::string mName;
std::vector<std::unique_ptr<InputInstance>> mInputs;
std::vector<std::unique_ptr<ProcessorInstance>> mInnerProcessorLine;
std::vector<std::unique_ptr<ProcessorInstance>> mPipelineInnerProcessor;
std::vector<std::unique_ptr<ProcessorInstance>> mProcessorLine;
std::vector<std::unique_ptr<FlusherInstance>> mFlushers;
Router mRouter;
Expand Down
42 changes: 23 additions & 19 deletions core/plugin/processor/inner/ProcessorTagNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ bool ProcessorTagNative::Init(const Json::Value& config) {
tagConfig = nullptr;
}
}
ParseTagKey(tagConfig, "HOST_NAME", TagKey::HOST_NAME, mPipelineMetaTagKey, *mContext, sName, true);
ParseTagKey(tagConfig, "HOST_ID", TagKey::HOST_ID, mPipelineMetaTagKey, *mContext, sName, true);
ParseTagKey(tagConfig, "CLOUD_PROVIDER", TagKey::CLOUD_PROVIDER, mPipelineMetaTagKey, *mContext, sName, true);
ParseTagKey(tagConfig, "HOST_NAME", TagKey::HOST_NAME_TAG_KEY, mPipelineMetaTagKey, *mContext, sName, true);
ParseTagKey(tagConfig, "HOST_ID", TagKey::HOST_ID_TAG_KEY, mPipelineMetaTagKey, *mContext, sName, true);
ParseTagKey(
tagConfig, "CLOUD_PROVIDER", TagKey::CLOUD_PROVIDER_TAG_KEY, mPipelineMetaTagKey, *mContext, sName, true);

#ifdef __ENTERPRISE__
ParseTagKey(tagConfig, "AGENT_TAG", TagKey::AGENT_TAG, mPipelineMetaTagKey, *mContext, sName, true);
ParseTagKey(tagConfig, "AGENT_TAG", TagKey::AGENT_TAG_TAG_KEY, mPipelineMetaTagKey, *mContext, sName, true);
#else
ParseTagKey(tagConfig, "HOST_IP", TagKey::HOST_IP, mPipelineMetaTagKey, *mContext, sName, true);
ParseTagKey(tagConfig, "HOST_IP", TagKey::HOST_IP_TAG_KEY, mPipelineMetaTagKey, *mContext, sName, true);
#endif

#ifdef __ENTERPRISE__
Expand Down Expand Up @@ -98,23 +99,23 @@ void ProcessorTagNative::Process(PipelineEventGroup& logGroup) {
return;
}

addTag(logGroup, TagKey::HOST_NAME, LoongCollectorMonitor::GetInstance()->mHostname);
AddTag(logGroup, TagKey::HOST_NAME_TAG_KEY, LoongCollectorMonitor::GetInstance()->mHostname);
auto entity = InstanceIdentity::Instance()->GetEntity();
if (entity != nullptr) {
addTag(logGroup, TagKey::HOST_ID, entity->GetHostID());
AddTag(logGroup, TagKey::HOST_ID_TAG_KEY, entity->GetHostID());
#ifdef __ENTERPRISE__
ECSMeta meta = entity->GetECSMeta();
const string cloudProvider
= meta.GetInstanceID().empty() ? DEFAULT_VALUE_DOMAIN_INFRA : DEFAULT_VALUE_DOMAIN_ACS;
#else
const string cloudProvider = DEFAULT_VALUE_DOMAIN_INFRA;
#endif
addTag(logGroup, TagKey::CLOUD_PROVIDER, cloudProvider);
AddTag(logGroup, TagKey::CLOUD_PROVIDER_TAG_KEY, cloudProvider);
}
#ifdef __ENTERPRISE__
addTag(logGroup, TagKey::AGENT_TAG, EnterpriseConfigProvider::GetInstance()->GetUserDefinedIdSet());
AddTag(logGroup, TagKey::AGENT_TAG_TAG_KEY, EnterpriseConfigProvider::GetInstance()->GetUserDefinedIdSet());
#else
addTag(logGroup, TagKey::HOST_IP, LoongCollectorMonitor::GetInstance()->mIpAddr);
AddTag(logGroup, TagKey::HOST_IP_TAG_KEY, LoongCollectorMonitor::GetInstance()->mIpAddr);
#endif

if (!STRING_FLAG(ALIYUN_LOG_FILE_TAGS).empty()) {
Expand All @@ -129,17 +130,20 @@ void ProcessorTagNative::Process(PipelineEventGroup& logGroup) {
if (!sEnvTags.empty()) {
for (size_t i = 0; i < sEnvTags.size(); ++i) {
#ifdef __ENTERPRISE__
if (mAppendingAllEnvMetaTag) {
if (mAgentEnvMetaTagKey.empty() && AppendingAllEnvMetaTag) {
logGroup.SetTagNoCopy(sEnvTags[i].key(), sEnvTags[i].value());
} else {
auto envTagKey = sEnvTags[i].key();
if (mAgentEnvMetaTagKey.find(envTagKey) != mAgentEnvMetaTagKey.end()) {
if (!mAgentEnvMetaTagKey[envTagKey].empty()) {
logGroup.SetTagNoCopy(mAgentEnvMetaTagKey[envTagKey], sEnvTags[i].value());
auto iter = mAgentEnvMetaTagKey.find(envTagKey);
if (iter != mAgentEnvMetaTagKey.end()) {
if (!iter->second.empty()) {
logGroup.SetTagNoCopy(iter->second, sEnvTags[i].value());
}
}
continue;
}
#endif
#else
logGroup.SetTagNoCopy(sEnvTags[i].key(), sEnvTags[i].value());
#endif
}
}

Expand All @@ -151,19 +155,19 @@ bool ProcessorTagNative::IsSupportedEvent(const PipelineEventPtr& /*e*/) const {
return true;
}

void ProcessorTagNative::addTag(PipelineEventGroup& logGroup, TagKey tagKey, const string& value) const {
auto sb = logGroup.GetSourceBuffer()->CopyString(value);
void ProcessorTagNative::AddTag(PipelineEventGroup& logGroup, TagKey tagKey, const string& value) const {
auto it = mPipelineMetaTagKey.find(tagKey);
if (it != mPipelineMetaTagKey.end()) {
if (!it->second.empty()) {
auto sb = logGroup.GetSourceBuffer()->CopyString(value);
logGroup.SetTagNoCopy(it->second, StringView(sb.data, sb.size));
}
// empty value means delete
}
}


void ProcessorTagNative::addTag(PipelineEventGroup& logGroup, TagKey tagKey, StringView value) const {
void ProcessorTagNative::AddTag(PipelineEventGroup& logGroup, TagKey tagKey, StringView value) const {
auto it = mPipelineMetaTagKey.find(tagKey);
if (it != mPipelineMetaTagKey.end()) {
if (!it->second.empty()) {
Expand Down
Loading

0 comments on commit a52f25f

Please sign in to comment.