Skip to content

Commit

Permalink
fix: fix go pipeline stop hang caused by improper component stop order (#1914)
Browse files Browse the repository at this point in the history

* fix go pipeline stop hang caused by improper component stop order
  • Loading branch information
henryzhx8 authored Nov 26, 2024
1 parent 3d216eb commit aab3058
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
7 changes: 3 additions & 4 deletions pluginmanager/config_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func (s *configUpdateTestSuite) TestConfigUpdate() {
// unblock old config
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(10000, checkFlusher.GetLogCount())
// this magic number (10000) must exceed the number of logs that can be held in the processor channel (LogsChan) + aggregator buffer (defaultLogGroup) + flusher channel (LogGroupsChan)
s.Equal(0, checkFlusher.GetLogCount())
LogtailConfigLock.RLock()
s.Equal(20000, GetConfigFlushers(LogtailConfig[noblockUpdateConfigName].PluginRunner)[0].(*checker.FlusherChecker).GetLogCount())
LogtailConfigLock.RUnlock()
Expand All @@ -104,7 +103,7 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() {
s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(checkFlusher.GetLogCount(), 10000)
s.Equal(checkFlusher.GetLogCount(), 0)

// load normal config
for i := 0; i < 3; i++ {
Expand Down Expand Up @@ -183,5 +182,5 @@ func (s *configUpdateTestSuite) TestStopAllExitTimeout() {
s.Equal(0, checkFlusher.GetLogCount())
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(10000, checkFlusher.GetLogCount())
s.Equal(0, checkFlusher.GetLogCount())
}
3 changes: 2 additions & 1 deletion pluginmanager/plugin_runner_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ func (p *pluginv1Runner) Stop(exit bool) error {
for _, flusher := range p.FlusherPlugins {
flusher.Flusher.SetUrgent(exit)
}
p.LogstoreConfig.FlushOutFlag.Store(true)

for _, service := range p.ServicePlugins {
_ = service.Stop()
}
Expand All @@ -395,7 +397,6 @@ func (p *pluginv1Runner) Stop(exit bool) error {
p.AggregateControl.WaitCancel()
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done")

p.LogstoreConfig.FlushOutFlag.Store(true)
p.FlushControl.WaitCancel()

if exit && p.FlushOutStore.Len() > 0 {
Expand Down
3 changes: 2 additions & 1 deletion pluginmanager/plugin_runner_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ func (p *pluginv2Runner) Stop(exit bool) error {
for _, flusher := range p.FlusherPlugins {
flusher.Flusher.SetUrgent(exit)
}
p.LogstoreConfig.FlushOutFlag.Store(true)

for _, serviceInput := range p.ServicePlugins {
_ = serviceInput.Input.Stop()
}
Expand All @@ -409,7 +411,6 @@ func (p *pluginv2Runner) Stop(exit bool) error {
p.AggregateControl.WaitCancel()
logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done")

p.LogstoreConfig.FlushOutFlag.Store(true)
p.FlushControl.WaitCancel()

if exit && p.FlushOutStore.Len() > 0 {
Expand Down

0 comments on commit aab3058

Please sign in to comment.