Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport 2.1]:fix go pipeline stop hang caused by improper component… #1958 #1964

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 24 additions & 21 deletions pluginmanager/config_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package pluginmanager

import (
"context"
"strings"
"testing"
"time"

"github.com/stretchr/testify/suite"

"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/plugins/flusher/checker"

"github.com/stretchr/testify/suite"
)

var updateConfigName = "update_mock_block"
Expand Down Expand Up @@ -67,16 +66,17 @@ func (s *configUpdateTestSuite) TestConfigUpdate() {
s.NoError(HoldOn(false))
s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
err := LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName))
s.True(strings.Contains(err.Error(), "failed to create config because timeout stop has happened on it"))
s.NoError(err)
// s.True(strings.Contains(err.Error(), "failed to create config because timeout stop has happened on it"))
s.NoError(LoadMockConfig(noblockUpdateConfigName, noblockUpdateConfigName, noblockUpdateConfigName, GetTestConfig(noblockUpdateConfigName)))
s.NoError(Resume())
s.Nil(LogtailConfig[updateConfigName], "the stopping config only allow to load same config when stopped")
s.NotNil(LogtailConfig[updateConfigName])
s.NotNil(LogtailConfig[noblockUpdateConfigName])

// unblock old config
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(10000, checkFlusher.GetLogCount())
s.Equal(0, checkFlusher.GetLogCount())
// this magic number(10000) must exceed number of logs can be hold in processor channel(LogsChan) + aggregator buffer(defaultLogGroup) + flusher channel(LogGroupsChan)
s.Equal(20000, GetConfigFlushers(LogtailConfig[noblockUpdateConfigName].PluginRunner)[0].(*checker.FlusherChecker).GetLogCount())
}
Expand All @@ -88,19 +88,22 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() {
s.True(ok)

s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
// load block config
for i := 0; i < 5; i++ {
s.NoError(HoldOn(false))
err := LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName))
s.True(strings.Contains(err.Error(), "failed to create config because timeout stop has happened on it"))
s.NoError(Resume())
s.Nil(LogtailConfig[updateConfigName], "the stopping config only allow to load same config when stopped")
}
s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(checkFlusher.GetLogCount(), 10000)

/*
// load block config
for i := 0; i < 5; i++ {
s.NoError(HoldOn(false))
err := LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName))
s.NoError(err)
//s.True(strings.Contains(err.Error(), "failed to create config because timeout stop has happened on it"))
s.NoError(Resume())
s.NotNil(LogtailConfig[updateConfigName])
//s.Nil(LogtailConfig[updateConfigName], "the stopping config only allow to load same config when stopped")
}
s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(checkFlusher.GetLogCount(), 0)
*/
// load normal config
for i := 0; i < 5; i++ {
s.NoError(HoldOn(false))
Expand All @@ -123,7 +126,7 @@ func (s *configUpdateTestSuite) TestConfigUpdateName() {
defer func() {
checkFlusher.Block = false
time.Sleep(time.Second * 5)
s.Equal(checkFlusher.GetLogCount(), 10000)
s.Equal(checkFlusher.GetLogCount(), 0)
}()
s.True(ok)

Expand Down Expand Up @@ -168,7 +171,7 @@ func (s *configUpdateTestSuite) TestHoldOnExitTimeout() {
s.Equal(0, checkFlusher.GetLogCount())
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(10000, checkFlusher.GetLogCount())
s.Equal(0, checkFlusher.GetLogCount())
time.Sleep(time.Second * 10)
s.NoError(Resume())
}
4 changes: 2 additions & 2 deletions pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type LogstoreConfig struct {
ConfigName string
ConfigNameWithSuffix string
LogstoreKey int64
FlushOutFlag bool
FlushOutFlag atomic.Bool
// Each LogstoreConfig can have its independent GlobalConfig if the "global" field
// is offered in configuration, see build-in AlarmConfig.
GlobalConfig *config.GlobalConfig
Expand Down Expand Up @@ -143,7 +143,7 @@ func (p *LogstoreStatistics) Init(context pipeline.Context) {
// 4. Start inputs (including metrics and services), just like aggregator, each input
// has its own goroutine.
func (lc *LogstoreConfig) Start() {
lc.FlushOutFlag = false
lc.FlushOutFlag.Store(false)
logger.Info(lc.Context.GetRuntimeContext(), "config start", "begin")

lc.pauseChan = make(chan struct{}, 1)
Expand Down
4 changes: 2 additions & 2 deletions pluginmanager/plugin_runner_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (p *pluginv1Runner) runFlusherInternal(cc *pipeline.AsyncControl) {
}
break
}
if !p.LogstoreConfig.FlushOutFlag {
if !p.LogstoreConfig.FlushOutFlag.Load() {
time.Sleep(time.Duration(10) * time.Millisecond)
continue
}
Expand All @@ -377,6 +377,7 @@ func (p *pluginv1Runner) Stop(exit bool) error {
for _, flusher := range p.FlusherPlugins {
flusher.Flusher.SetUrgent(exit)
}
p.LogstoreConfig.FlushOutFlag.Store(true)
for _, service := range p.ServicePlugins {
_ = service.Stop()
}
Expand All @@ -389,7 +390,6 @@ func (p *pluginv1Runner) Stop(exit bool) error {
p.AggregateControl.WaitCancel()
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done")

p.LogstoreConfig.FlushOutFlag = true
p.FlushControl.WaitCancel()

if exit && p.FlushOutStore.Len() > 0 {
Expand Down
5 changes: 3 additions & 2 deletions pluginmanager/plugin_runner_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (p *pluginv2Runner) runFlusherInternal(cc *pipeline.AsyncControl) {
}
break
}
if !p.LogstoreConfig.FlushOutFlag {
if !p.LogstoreConfig.FlushOutFlag.Load() {
time.Sleep(time.Duration(10) * time.Millisecond)
continue
}
Expand All @@ -400,6 +400,8 @@ func (p *pluginv2Runner) Stop(exit bool) error {
for _, flusher := range p.FlusherPlugins {
flusher.Flusher.SetUrgent(exit)
}
p.LogstoreConfig.FlushOutFlag.Store(true)

for _, serviceInput := range p.ServicePlugins {
_ = serviceInput.Input.Stop()
}
Expand All @@ -412,7 +414,6 @@ func (p *pluginv2Runner) Stop(exit bool) error {
p.AggregateControl.WaitCancel()
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done")

p.LogstoreConfig.FlushOutFlag = true
p.FlushControl.WaitCancel()

if exit && p.FlushOutStore.Len() > 0 {
Expand Down
105 changes: 0 additions & 105 deletions test/case/core/block_holdon_resume/ilogtail-e2e.yaml

This file was deleted.

Loading
Loading