From cd2786ac040f74ee6135d505095ffd06c6820cbc Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 6 Oct 2023 19:04:14 +0900 Subject: [PATCH] cshared: Make backward compatible and interruptable goroutines Signed-off-by: Hiroshi Hatake --- cshared.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cshared.go b/cshared.go index 3c5a2b2..951d3e0 100644 --- a/cshared.go +++ b/cshared.go @@ -201,13 +201,16 @@ func prepareInputCollector() (err error) { go func(theChannel chan<- Message) { defer theInputLock.Unlock() + + go func(theChannel chan<- Message) { + err = theInput.Collect(runCtx, theChannel) + }(theChannel) + for { select { case <-runCtx.Done(): log.Printf("goroutine will be stopping: name=%q\n", theName) return - default: - err = theInput.Collect(runCtx, theChannel) } } @@ -276,15 +279,18 @@ func FLBPluginOutputPreRun(useHotReload C.int) int { runCtx, runCancel = context.WithCancel(context.Background()) theChannel = make(chan Message) go func(runCtx context.Context) { + go func(runCtx context.Context) { + err = theOutput.Flush(runCtx, theChannel) + }(runCtx) + for { select { case <-runCtx.Done(): log.Printf("goroutine will be stopping: name=%q\n", theName) return - default: - err = theOutput.Flush(runCtx, theChannel) } } + }(runCtx) if err != nil {