From 90d364bfc1d42187eaddc600b1a6077fa1cfa4f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Tue, 30 Jan 2024 16:32:25 +0100 Subject: [PATCH 1/4] Rework fluent-bit-watcher and make use of the hot-reload mechanism MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reworks the fluent-bit-watcher to be "single-child-process" only. Since we can now use hot-reload, we don't need to account for potential fluent-bit restarts "under the hood" and can instead rely on a single process to live "forever". Whenever that one fluent-bit process exits, the watcher should exit as well and Kubernetes' `CrashLoopBackOff` will take care of the rest. That fundamental change in assumption allows us to vastly simplify the lifecycle in the watcher, which this does. It gets rid of all, sometimes unsafely shared, globals in favor of a single process being started, watched, kicked and eventually stopped. SIGTERM is still forwarded to the child process. There is no need for a SIGKILL timeout anymore since Kubernetes will eventually send a SIGKILL itself if the process isn't exiting as expected. 
Signed-off-by: Markus Thömmes --- cmd/fluent-watcher/fluentbit/main.go | 316 +++++++-------------------- go.mod | 1 + go.sum | 2 + 3 files changed, 77 insertions(+), 242 deletions(-) diff --git a/cmd/fluent-watcher/fluentbit/main.go b/cmd/fluent-watcher/fluentbit/main.go index 4d19fccc5..64f29ff26 100644 --- a/cmd/fluent-watcher/fluentbit/main.go +++ b/cmd/fluent-watcher/fluentbit/main.go @@ -3,11 +3,10 @@ package main import ( "context" "flag" - "math" + "fmt" "os" "os/exec" - "sync" - "sync/atomic" + "os/signal" "syscall" "time" @@ -15,7 +14,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/oklog/run" + "golang.org/x/sync/errgroup" ) const ( @@ -23,276 +22,109 @@ const ( defaultCfgPath = "/fluent-bit/etc/fluent-bit.conf" defaultWatchDir = "/fluent-bit/config" defaultPollInterval = 1 * time.Second - defaultFlbTimeout = 30 * time.Second - - MaxDelayTime = 5 * time.Minute - ResetTime = 10 * time.Minute -) - -var ( - logger log.Logger - cmd *exec.Cmd - flbTerminated chan bool - mutex sync.Mutex - restartTimes int32 - timerCtx context.Context - timerCancel context.CancelFunc ) -var configPath string -var externalPluginPath string -var binPath string -var watchPath string -var poll bool -var exitOnFailure bool -var pollInterval time.Duration -var flbTerminationTimeout time.Duration - func main() { + var configPath string + var externalPluginPath string + var binPath string + var watchPath string + var poll bool + var pollInterval time.Duration flag.StringVar(&binPath, "b", defaultBinPath, "The fluent bit binary path.") flag.StringVar(&configPath, "c", defaultCfgPath, "The config file path.") flag.StringVar(&externalPluginPath, "e", "", "Path to external plugin (shared lib)") - flag.BoolVar(&exitOnFailure, "exit-on-failure", false, "If fluentbit exits with failure, also exit the watcher.") flag.StringVar(&watchPath, "watch-path", defaultWatchDir, "The path to watch.") flag.BoolVar(&poll, "poll", false, "Use 
poll watcher instead of ionotify.") flag.DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "Poll interval if using poll watcher.") - flag.DurationVar(&flbTerminationTimeout, "flb-timeout", defaultFlbTimeout, "Time to wait for FluentBit to gracefully terminate before sending SIGKILL.") - flag.Parse() - logger = log.NewLogfmtLogger(os.Stdout) - logger = log.With(logger, "time", log.TimestampFormat(time.Now, time.RFC3339)) - - timerCtx, timerCancel = context.WithCancel(context.Background()) - - var g run.Group - { - // Termination handler. - g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM)) - } - { - // Watch the Fluent bit, if the Fluent bit not exists or stopped, restart it. - cancel := make(chan struct{}) - g.Add( - func() error { - - for { - select { - case <-cancel: - return nil - default: - } - - // Start fluent bit if it does not existed. - start() - // Wait for the fluent bit exit. - err := wait() - if exitOnFailure && err != nil { - _ = level.Error(logger).Log("msg", "Fluent bit exited with error; exiting watcher") - return err - } - - timerCtx, timerCancel = context.WithCancel(context.Background()) - - // After the fluent bit exit, fluent bit watcher restarts it with an exponential - // back-off delay (1s, 2s, 4s, ...), that is capped at five minutes. - backoff() - } - }, - func(err error) { - close(cancel) - stop() - resetTimer() - }, - ) - } - { - // Watch the config file, if the config file changed, stop Fluent bit. - watcher, err := newWatcher(poll, pollInterval) - if err != nil { - _ = level.Error(logger).Log("err", err) - return - } - - // Start watcher. 
- err = watcher.Add(watchPath) - if err != nil { - _ = level.Error(logger).Log("err", err) - return - } - - cancel := make(chan struct{}) - g.Add( - func() error { - - for { - select { - case <-cancel: - return nil - case event := <-watcher.Events(): - if !isValidEvent(event) { - continue - } - // After the config file changed, it should stop the fluent bit, - // and resets the restart backoff timer. - if cmd != nil { - _ = level.Info(logger).Log("msg", "Config file changed, stopping Fluent Bit") - stop() - resetTimer() - _ = level.Info(logger).Log("msg", "Config file changed, stopped Fluent Bit") - } - case <-watcher.Errors(): - _ = level.Error(logger).Log("msg", "Watcher stopped") - return nil - } - } - }, - func(err error) { - _ = watcher.Close() - close(cancel) - }, - ) - } - - if err := g.Run(); err != nil { - _ = level.Error(logger).Log("err", err) - os.Exit(1) - } - _ = level.Info(logger).Log("msg", "See you next time!") -} - -func newWatcher(poll bool, interval time.Duration) (filenotify.FileWatcher, error) { - var err error - var watcher filenotify.FileWatcher - - if poll { - watcher = filenotify.NewPollingWatcher(interval) - } else { - watcher, err = filenotify.New(interval) - } - - if err != nil { - return nil, err - } - - return watcher, nil -} - -// Inspired by https://github.com/jimmidyson/configmap-reload -func isValidEvent(event fsnotify.Event) bool { - return event.Op == fsnotify.Create || event.Op == fsnotify.Write -} - -func start() { - mutex.Lock() - defer mutex.Unlock() + signalCtx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() - if cmd != nil { - return - } + logger := log.NewLogfmtLogger(os.Stdout) + logger = log.With(logger, "time", log.TimestampFormat(time.Now, time.RFC3339)) + // First, launch the fluent-bit process. 
+ args := []string{"--enable-hot-reload", "-c", configPath} if externalPluginPath != "" { - cmd = exec.Command(binPath, "-c", configPath, "-e", externalPluginPath) - } else { - cmd = exec.Command(binPath, "-c", configPath) + args = append(args, "-e", externalPluginPath) } + cmd := exec.Command(binPath, args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - flbTerminated = make(chan bool, 1) if err := cmd.Start(); err != nil { - _ = level.Error(logger).Log("msg", "start Fluent bit error", "error", err) - cmd = nil - return + _ = level.Error(logger).Log("msg", "failed to start fluent-bit", "error", err) + os.Exit(1) } + _ = level.Info(logger).Log("msg", "fluent-bit started") - _ = level.Info(logger).Log("msg", "Fluent bit started") -} - -func wait() error { - mutex.Lock() - if cmd == nil { - mutex.Unlock() + grp, grpCtx := errgroup.WithContext(context.Background()) + grp.Go(func() error { + // Watch the process. If it exits, we want to crash immediately. + defer cancel() + if err := cmd.Wait(); err != nil { + return fmt.Errorf("failed to run fluent-bit: %w", err) + } return nil - } - mutex.Unlock() - - startTime := time.Now() - - err := cmd.Wait() - if err != nil { - _ = level.Error(logger).Log("msg", "Fluent bit exited", "error", err) - } - cmd = nil - flbTerminated <- true - - // Once the fluent bit has executed for 10 minutes without any problems, - // it should resets the restart backoff timer. - if time.Since(startTime) >= ResetTime { - atomic.StoreInt32(&restartTimes, 0) - } - - return err -} - -func backoff() { - - delayTime := time.Duration(math.Pow(2, float64(atomic.LoadInt32(&restartTimes)))) * time.Second - if delayTime >= MaxDelayTime { - delayTime = MaxDelayTime - } - - _ = level.Info(logger).Log("msg", "backoff", "delay", delayTime) + }) + grp.Go(func() error { + // Watch the config as it's loaded into the pod and trigger a config reload. 
+ var watcher filenotify.FileWatcher + if poll { + watcher = filenotify.NewPollingWatcher(pollInterval) + } else { + var err error + watcher, err = filenotify.NewEventWatcher() + if err != nil { + return fmt.Errorf("failed to open event watcher: %w", err) + } + } - startTime := time.Now() + if err := watcher.Add(watchPath); err != nil { + return fmt.Errorf("failed to watch path %q: %w", watchPath, err) + } - timer := time.NewTimer(delayTime) - defer timer.Stop() + for { + select { + case <-signalCtx.Done(): + return nil + case <-grpCtx.Done(): + return nil + case event := <-watcher.Events(): + if !isValidEvent(event) { + continue + } + _ = level.Info(logger).Log("msg", "Config file changed, reloading...") + if err := cmd.Process.Signal(syscall.SIGHUP); err != nil { + return fmt.Errorf("failed to reload config: %w", err) + } + case err := <-watcher.Errors(): + return fmt.Errorf("failed the watcher: %w", err) + } + } + }) select { - case <-timerCtx.Done(): - _ = level.Info(logger).Log("msg", "context cancel", "actual", time.Since(startTime), "expected", delayTime) - - atomic.StoreInt32(&restartTimes, 0) - - return - case <-timer.C: - _ = level.Info(logger).Log("msg", "backoff timer done", "actual", time.Since(startTime), "expected", delayTime) - - atomic.AddInt32(&restartTimes, 1) - - return + case <-signalCtx.Done(): + case <-grpCtx.Done(): } -} -func stop() { - - mutex.Lock() - defer mutex.Unlock() - - if cmd == nil || cmd.Process == nil { - _ = level.Info(logger).Log("msg", "Fluent Bit not running. No process to stop.") - return - } - - // Send SIGTERM, if fluent-bit doesn't terminate in the specified timeframe, send SIGKILL + // Always try to gracefully shut down fluent-bit. This will allow `cmd.Wait` above to finish + // and thus allow `grp.Wait` below to return. 
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { - _ = level.Info(logger).Log("msg", "Error while terminating FluentBit", "error", err) - } else { - _ = level.Info(logger).Log("msg", "Sent SIGTERM to FluentBit, waiting max "+flbTerminationTimeout.String()) + _ = level.Error(logger).Log("msg", "Failed to send SIGTERM to fluent-bit", "error", err) + // Do not exit on error here. The process might've died and that's okay. } - select { - case <-time.After(flbTerminationTimeout): - _ = level.Info(logger).Log("msg", "FluentBit failed to terminate gracefully, killing process") - cmd.Process.Kill() - <-flbTerminated - case <-flbTerminated: - _ = level.Info(logger).Log("msg", "FluentBit terminated successfully") + if err := grp.Wait(); err != nil { + _ = level.Error(logger).Log("msg", "Failure during the run time of fluent-bit", "error", err) + os.Exit(1) } } -func resetTimer() { - timerCancel() - atomic.StoreInt32(&restartTimes, 0) +// Inspired by https://github.com/jimmidyson/configmap-reload +func isValidEvent(event fsnotify.Event) bool { + return event.Op == fsnotify.Create || event.Op == fsnotify.Write } diff --git a/go.mod b/go.mod index b21b2c76f..178beed42 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/oklog/run v1.1.0 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.30.0 + golang.org/x/sync v0.6.0 k8s.io/api v0.26.3 k8s.io/apimachinery v0.27.4 k8s.io/client-go v0.26.3 diff --git a/go.sum b/go.sum index af3c1d1ee..7439d0cf9 100644 --- a/go.sum +++ b/go.sum @@ -408,6 +408,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.6.0 
h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From 3a41f7a1f152856c28a3a7463302a61fb39da357 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Wed, 31 Jan 2024 11:38:46 +0100 Subject: [PATCH 2/4] Silence errors if the process has already exited MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Markus Thömmes --- cmd/fluent-watcher/fluentbit/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/fluent-watcher/fluentbit/main.go b/cmd/fluent-watcher/fluentbit/main.go index 64f29ff26..7d489ed1d 100644 --- a/cmd/fluent-watcher/fluentbit/main.go +++ b/cmd/fluent-watcher/fluentbit/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "flag" "fmt" "os" @@ -113,7 +114,7 @@ func main() { // Always try to gracefully shut down fluent-bit. This will allow `cmd.Wait` above to finish // and thus allow `grp.Wait` below to return. - if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil && !errors.Is(err, os.ErrProcessDone) { _ = level.Error(logger).Log("msg", "Failed to send SIGTERM to fluent-bit", "error", err) // Do not exit on error here. The process might've died and that's okay. 
} From 051ca718442672c8b659229f0305c7d075e610c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Wed, 31 Jan 2024 11:43:57 +0100 Subject: [PATCH 3/4] Keep old flags and deprecate them for backwards compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Markus Thömmes --- cmd/fluent-watcher/fluentbit/main.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/cmd/fluent-watcher/fluentbit/main.go b/cmd/fluent-watcher/fluentbit/main.go index 7d489ed1d..712e6b6b5 100644 --- a/cmd/fluent-watcher/fluentbit/main.go +++ b/cmd/fluent-watcher/fluentbit/main.go @@ -32,13 +32,19 @@ func main() { var watchPath string var poll bool var pollInterval time.Duration - flag.StringVar(&binPath, "b", defaultBinPath, "The fluent bit binary path.") flag.StringVar(&configPath, "c", defaultCfgPath, "The config file path.") flag.StringVar(&externalPluginPath, "e", "", "Path to external plugin (shared lib)") flag.StringVar(&watchPath, "watch-path", defaultWatchDir, "The path to watch.") flag.BoolVar(&poll, "poll", false, "Use poll watcher instead of ionotify.") flag.DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "Poll interval if using poll watcher.") + + // Deprecated flags to be removed in one of the next releases. + var exitOnFailure bool + var flbTerminationTimeout time.Duration + flag.BoolVar(&exitOnFailure, "exit-on-failure", false, "Deprecated: This has no effect anymore.") + flag.DurationVar(&flbTerminationTimeout, "flb-timeout", 0, "Deprecated: This has no effect anymore.") + flag.Parse() signalCtx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -47,6 +53,13 @@ func main() { logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "time", log.TimestampFormat(time.Now, time.RFC3339)) + if exitOnFailure { + level.Warn(logger).Log("--exit-on-failure is deprecated. 
The process will exit no matter what if fluent-bit exits so this can safely be removed.") + } + if flbTerminationTimeout > 0 { + level.Warn(logger).Log("--flb-timeout is deprecated. Consider setting the terminationGracePeriod field on the `(Cluster)FluentBit` instance.") + } + // First, launch the fluent-bit process. args := []string{"--enable-hot-reload", "-c", configPath} if externalPluginPath != "" { From 468acdc159c08431b1c83af79a1c616c3b935284 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Wed, 31 Jan 2024 12:06:52 +0100 Subject: [PATCH 4/4] Fully qualify default parsers.conf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Markus Thömmes --- .../fluentbit/v1alpha2/clusterfluentbitconfig_types.go | 10 +++++++++- .../v1alpha2/clusterfluentbitconfig_types_test.go | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go index dc4183438..2b7c64721 100644 --- a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go +++ b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go @@ -170,7 +170,15 @@ func (s *Service) Params() *params.KVs { m.Insert("Log_Level", s.LogLevel) } if s.ParsersFile != "" { - m.Insert("Parsers_File", s.ParsersFile) + if s.ParsersFile == "parsers.conf" { + // For backwards compatibility, if the "usual" parsers.conf is + // configured, actually write the fully-qualified path in order + // to not break hot-reload. + // See https://github.com/fluent/fluent-bit/issues/8275. 
+ m.Insert("Parsers_File", "/fluent-bit/etc/parsers.conf") + } else { + m.Insert("Parsers_File", s.ParsersFile) + } } if s.Storage != nil { if s.Storage.Path != "" { diff --git a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go index c47bc9b8c..ea4b7fcb3 100644 --- a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go +++ b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go @@ -19,7 +19,7 @@ var expected = `[Service] Grace 30 Http_Server true Log_Level info - Parsers_File parsers.conf + Parsers_File /fluent-bit/etc/parsers.conf [Input] Name tail Alias input0_alias @@ -104,7 +104,7 @@ var expectedK8s = `[Service] Grace 30 Http_Server true Log_Level info - Parsers_File parsers.conf + Parsers_File /fluent-bit/etc/parsers.conf [Input] Name tail Path /var/log/containers/*.log