Skip to content

Commit

Permalink
add UsingOldContentTag. When UsingOldContentTag is set to false, the …
Browse files Browse the repository at this point in the history
…tag is now put into the context during cgo. (#1169)

* add mUsingOldContentTag

* add changelog

* add extractTagsToLogTags

* add logGroupPoolSize

* set aggregator_context as default aggregator

* Add protoc description

* add AddToQueueWithRetry

* add LogGroupWithSize

* add Test_extractTagsToLogTags

* delete nowLogGroupSizeMap

* make lint

* fix genernate.sh

* set aggregator_context as aggregator_default

* add WITHOUTGDB into e2e

* fix doc

* logGroupPoolSize = 0 when reset

* change LogGroupWithSize to *LogGroupWithSize

* add tag_kv_regex

* addToQueueWithRetry

* add using_old_content_tag

* fix

* modify changelog
  • Loading branch information
quzard authored Oct 25, 2023
1 parent 8e9ab4f commit 8df501f
Show file tree
Hide file tree
Showing 27 changed files with 424 additions and 59 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
BUILD_LOGTAIL_UT: OFF
ENABLE_COMPATIBLE_MODE: ON
ENABLE_STATIC_LINK_CRT: ON
WITHOUTGDB: ON
run: make dist && scripts/check_glibc.sh

- name: Build Docker
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/e2e-core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ jobs:
sudo chmod +x /usr/local/bin/docker-compose
- name: E2E Core Structure Test
env:
BUILD_LOGTAIL_UT: OFF
WITHOUTGDB: ON
run: make e2e-core

result:
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ jobs:
run: docker --version

- name: E2E Test
env:
BUILD_LOGTAIL_UT: OFF
WITHOUTGDB: ON
run: make e2e

- name: UnitTest E2e Engine
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ your changes, such as:
- [public] [both] [updated] add a new feature

## [Unreleased]

- [public] [both] [added] add UsingOldContentTag. When UsingOldContentTag is set to false, the Tag is now placed in the Meta instead of Logs during cgo.
1 change: 1 addition & 0 deletions core/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Config {
uint16_t mSearchCheckpointDirDepth = 0; // Max directory depth when search checkpoint.

bool mEnableTimestampNanosecond = false;
bool mUsingOldContentTag = false;
// Deprecated
bool mEnablePreciseTimestamp = false;
std::string mPreciseTimestampKey;
Expand Down
6 changes: 6 additions & 0 deletions core/config/UserLogConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ void UserLogConfigParser::ParseAdvancedConfig(const Json::Value& originalVal, Co
cfg.mAdvancedConfig.mSpecifiedYear = static_cast<int32_t>(val.asUInt());
}
}
// using_old_content_tag
{
if (advancedVal.isMember("using_old_content_tag") && advancedVal["using_old_content_tag"].isBool()) {
cfg.mAdvancedConfig.mUsingOldContentTag = GetBoolValue(advancedVal, "using_old_content_tag");
}
}
// precise_timestamp
{
if (advancedVal.isMember("enable_timestamp_nanosecond") && advancedVal["enable_timestamp_nanosecond"].isBool()) {
Expand Down
6 changes: 6 additions & 0 deletions core/config_manager/ConfigManagerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,11 +872,17 @@ void ConfigManagerBase::LoadSingleUserConfig(const std::string& logName, const J
SetNotFoundJsonMember(pluginConfigJson["global"],
"EnableTimestampNanosecond",
config->mAdvancedConfig.mEnableTimestampNanosecond);
SetNotFoundJsonMember(pluginConfigJson["global"],
"UsingOldContentTag",
config->mAdvancedConfig.mUsingOldContentTag);
} else {
Json::Value pluginGlobalConfigJson;
SetNotFoundJsonMember(pluginGlobalConfigJson,
"EnableTimestampNanosecond",
config->mAdvancedConfig.mEnableTimestampNanosecond);
SetNotFoundJsonMember(pluginGlobalConfigJson,
"UsingOldContentTag",
config->mAdvancedConfig.mUsingOldContentTag);
pluginConfigJson["global"] = pluginGlobalConfigJson;
}
config->mPluginConfig = pluginConfigJson.toStyledString();
Expand Down
1 change: 1 addition & 0 deletions core/config_manager/ConfigYamlToJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ ConfigYamlToJson::ConfigYamlToJson() {
mFileAdvancedConfigMap["DirBlackList"] = "dir_blacklist";
mFileAdvancedConfigMap["FilepathBlackList"] = "filepath_blacklist";
mFileAdvancedConfigMap["EnableTimestampNanosecond"] = "enable_timestamp_nanosecond";
mFileAdvancedConfigMap["UsingOldContentTag"] = "using_old_content_tag";
mFileAdvancedConfigMap["EnablePreciseTimestamp"] = "enable_precise_timestamp";
mFileAdvancedConfigMap["PreciseTimestampKey"] = "precise_timestamp_key";
mFileAdvancedConfigMap["PreciseTimestampUnit"] = "precise_timestamp_unit";
Expand Down
2 changes: 1 addition & 1 deletion docs/cn/getting-started/how-to-use-prometheus-fetcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ iLogtail Prometheus 采集的Metrics 数据与日志同样遵循[iLogtail 的传

## E2E 快速上手
目前iLogtail 已经集成了prometheus 的E2E测试,可以在iLogtail 的根路径快速进行上手验证。
测试命令:_TEST_SCOPE=input_prometheus TEST_DEBUG=true make e2e(开启DEBUG 选项可以查看传输数据明细)_
测试命令:TEST_SCOPE=input_prometheus TEST_DEBUG=true make e2e(开启DEBUG 选项可以查看传输数据明细)_
```
TEST_DEBUG=true TEST_PROFILE=false ./scripts/e2e.sh behavior input_prometheus
=========================================
Expand Down
1 change: 1 addition & 0 deletions pkg/config/global_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type GlobalConfig struct {
DelayStopSec int

EnableTimestampNanosecond bool
UsingOldContentTag bool
EnableContainerdUpperDirDetect bool
EnableSlsMetricsFormat bool
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/protocol/proto/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# How to generate pb

cd ~
wget https://ghproxy.com/https://github.com/gogo/protobuf/archive/refs/tags/v1.3.2.tar.gz
tar xzf v1.3.2.tar.gz
mkdir -p ${GOPATH}/src/github.com/gogo
mv protobuf-1.3.2 ${GOPATH}/src/github.com/gogo/protobuf

cd ~
wget https://ghproxy.com/https://github.com/protocolbuffers/protobuf/releases/download/v3.14.0/protoc-3.14.0-linux-x86_64.zip
unzip protoc-3.14.0-linux-x86_64.zip
mv bin/protoc /usr/local/bin/
mv include/google/ /usr/local/include/

go install github.com/gogo/protobuf/protoc-gen-gogofaster@latest

bash pkg/protocol/proto/genernate.sh
1 change: 1 addition & 0 deletions pkg/protocol/proto/genernate.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ if [ -d "${PROTO_GEN_HOME}" ]; then
rm -rf "${PROTO_GEN_HOME}"
fi
mkdir "${PROTO_GEN_HOME}"
export PATH=$PATH:$GOPATH/bin

protoc -I="${PROTO_HOME}" \
-I="${GOPATH}/src" \
Expand Down
72 changes: 62 additions & 10 deletions pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,41 @@ func extractTags(rawTags []byte, log *protocol.Log) {
}
}

// extractTagsToLogTags extracts tags from rawTags and append them into []*protocol.LogTag.
// Rule: k1~=~v1^^^k2~=~v2
// rawTags
func extractTagsToLogTags(rawTags []byte) []*protocol.LogTag {
logTags := []*protocol.LogTag{}
defaultPrefixIndex := 0
for len(rawTags) != 0 {
idx := bytes.Index(rawTags, tagDelimiter)
var part []byte
if idx < 0 {
part = rawTags
rawTags = rawTags[len(rawTags):]
} else {
part = rawTags[:idx]
rawTags = rawTags[idx+len(tagDelimiter):]
}
if len(part) > 0 {
pos := bytes.Index(part, tagSeparator)
if pos > 0 {
logTags = append(logTags, &protocol.LogTag{
Key: string(part[:pos]),
Value: string(part[pos+len(tagSeparator):]),
})
} else {
logTags = append(logTags, &protocol.LogTag{
Key: defaultTagPrefix + strconv.Itoa(defaultPrefixIndex),
Value: string(part),
})
}
defaultPrefixIndex++
}
}
return logTags
}

// ProcessRawLogV2 ...
// V1 -> V2: enable topic field, and use tags field to pass more tags.
// unsafe parameter: rawLog,packID and tags
Expand All @@ -263,8 +298,14 @@ func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic st
if len(topic) > 0 {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic})
}
extractTags(tags, log)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
// When UsingOldContentTag is set to false, the tag is now put into the context during cgo.
if !lc.GlobalConfig.UsingOldContentTag {
logTags := extractTagsToLogTags(tags)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logTags}})
} else {
extractTags(tags, log)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
return 0
}

Expand All @@ -279,8 +320,14 @@ func (lc *LogstoreConfig) ProcessLog(logByte []byte, packID string, topic string
if len(topic) > 0 {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic})
}
extractTags(tags, log)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
// When UsingOldContentTag is set to false, the tag is now put into the context during cgo.
if !lc.GlobalConfig.UsingOldContentTag {
logTags := extractTagsToLogTags(tags)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logTags}})
} else {
extractTags(tags, log)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
return 0
}

Expand All @@ -298,13 +345,18 @@ func (lc *LogstoreConfig) ProcessLogGroup(logByte []byte, packID string) int {
if len(topic) > 0 {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic})
}
for _, tag := range logGroup.LogTags {
log.Contents = append(log.Contents, &protocol.Log_Content{
Key: tagPrefix + tag.GetKey(),
Value: tag.GetValue(),
})
// When UsingOldContentTag is set to false, the tag is now put into the context during cgo.
if !lc.GlobalConfig.UsingOldContentTag {
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logGroup.LogTags}})
} else {
for _, tag := range logGroup.LogTags {
log.Contents = append(log.Contents, &protocol.Log_Content{
Key: tagPrefix + tag.GetKey(),
Value: tag.GetValue(),
})
}
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
return 0
}
Expand Down
20 changes: 19 additions & 1 deletion pluginmanager/logstore_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/alibaba/ilogtail/pkg/config"
global_config "github.com/alibaba/ilogtail/pkg/config"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
Expand Down Expand Up @@ -414,6 +415,22 @@ func Test_extractTags(t *testing.T) {
assert.Equal(t, l.Contents[0].Value, "k2")
}

func Test_extractTagsToLogTags(t *testing.T) {
logTag := extractTagsToLogTags([]byte("k1~=~v1^^^k2~=~v2"))
assert.Equal(t, logTag[0].Key, "k1")
assert.Equal(t, logTag[0].Value, "v1")
assert.Equal(t, logTag[1].Key, "k2")
assert.Equal(t, logTag[1].Value, "v2")

logTag = extractTagsToLogTags([]byte("^^^k2~=~v2"))
assert.Equal(t, logTag[0].Key, "k2")
assert.Equal(t, logTag[0].Value, "v2")

logTag = extractTagsToLogTags([]byte("^^^k2"))
assert.Equal(t, logTag[0].Key, "__tag__:__prefix__0")
assert.Equal(t, logTag[0].Value, "k2")
}

func TestLogstoreConfig_ProcessRawLogV2(t *testing.T) {
rawLogs := []byte("12345")
topic := "topic"
Expand All @@ -423,7 +440,8 @@ func TestLogstoreConfig_ProcessRawLogV2(t *testing.T) {
l.PluginRunner = &pluginv1Runner{
LogsChan: make(chan *pipeline.LogWithContext, 10),
}

l.GlobalConfig = &config.LogtailGlobalConfig
l.GlobalConfig.UsingOldContentTag = true
{
assert.Equal(t, 0, l.ProcessRawLogV2(rawLogs, "", topic, tags))
assert.Equal(t, 1, len(l.PluginRunner.(*pluginv1Runner).LogsChan))
Expand Down
1 change: 1 addition & 0 deletions pluginmanager/plugin_runner_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func (p *pluginv2Runner) Stop(exit bool) error {
return nil
}

// TODO: Design the ReceiveRawLogV2, which is passed in a PipelineGroupEvents not pipeline.LogWithContext, and tags should be added in the PipelineGroupEvents.
func (p *pluginv2Runner) ReceiveRawLog(in *pipeline.LogWithContext) {
md := models.NewMetadata()
if in.Context != nil {
Expand Down
4 changes: 2 additions & 2 deletions plugins/aggregator/aggregator_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package aggregator

import (
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/plugins/aggregator/baseagg"
"github.com/alibaba/ilogtail/plugins/aggregator/context"
)

func init() {
pipeline.Aggregators["aggregator_default"] = func() pipeline.Aggregator {
return baseagg.NewAggregatorBase()
return context.NewAggregatorContext()
}
}
Loading

0 comments on commit 8df501f

Please sign in to comment.