Skip to content

Commit

Permalink
Rework fluent-bit-watcher and make use of the hot-reload mechanism
Browse files Browse the repository at this point in the history
This reworks the fluent-bit-watcher to be "single-child-process" only. Since we can now use hot-reload, we don't need to account for potential fluent-bit restarts "under the hood" and can instead rely on a single process to live "forever". Whenever that one fluent-bit process exits, the watcher should exit as well, and Kubernetes' `CrashLoopBackOff` takes care of the rest.

That fundamental change in assumption allows us to vastly simplify the lifecycle in the watcher, which is what this commit does. It gets rid of all the globals, some of which were shared unsafely, in favor of a single process being started, watched, kicked and eventually stopped.

SIGTERM is still forwarded to the child process. There is no need for a SIGKILL timeout anymore since Kubernetes will eventually send a SIGKILL itself if the process isn't exiting as expected.

Signed-off-by: Markus Thömmes <[email protected]>
  • Loading branch information
markusthoemmes committed Jan 30, 2024
1 parent 02e3d56 commit 4c1fb3a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 242 deletions.
316 changes: 74 additions & 242 deletions cmd/fluent-watcher/fluentbit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,296 +3,128 @@ package main
import (
"context"
"flag"
"math"
"fmt"
"os"
"os/exec"
"sync"
"sync/atomic"
"os/signal"
"syscall"
"time"

"github.com/fluent/fluent-operator/v2/pkg/filenotify"
"github.com/fsnotify/fsnotify"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
"golang.org/x/sync/errgroup"
)

const (
defaultBinPath = "/fluent-bit/bin/fluent-bit"
defaultCfgPath = "/fluent-bit/etc/fluent-bit.conf"
defaultWatchDir = "/fluent-bit/config"
defaultPollInterval = 1 * time.Second
defaultFlbTimeout = 30 * time.Second

MaxDelayTime = 5 * time.Minute
ResetTime = 10 * time.Minute
)

var (
logger log.Logger
cmd *exec.Cmd
flbTerminated chan bool
mutex sync.Mutex
restartTimes int32
timerCtx context.Context
timerCancel context.CancelFunc
)

var configPath string
var externalPluginPath string
var binPath string
var watchPath string
var poll bool
var exitOnFailure bool
var pollInterval time.Duration
var flbTerminationTimeout time.Duration

func main() {
var configPath string
var externalPluginPath string
var binPath string
var watchPath string
var poll bool
var pollInterval time.Duration

flag.StringVar(&binPath, "b", defaultBinPath, "The fluent bit binary path.")
flag.StringVar(&configPath, "c", defaultCfgPath, "The config file path.")
flag.StringVar(&externalPluginPath, "e", "", "Path to external plugin (shared lib)")
flag.BoolVar(&exitOnFailure, "exit-on-failure", false, "If fluentbit exits with failure, also exit the watcher.")
flag.StringVar(&watchPath, "watch-path", defaultWatchDir, "The path to watch.")
flag.BoolVar(&poll, "poll", false, "Use poll watcher instead of ionotify.")
flag.DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "Poll interval if using poll watcher.")
flag.DurationVar(&flbTerminationTimeout, "flb-timeout", defaultFlbTimeout, "Time to wait for FluentBit to gracefully terminate before sending SIGKILL.")

flag.Parse()

logger = log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "time", log.TimestampFormat(time.Now, time.RFC3339))

timerCtx, timerCancel = context.WithCancel(context.Background())

var g run.Group
{
// Termination handler.
g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM))
}
{
// Watch the Fluent bit, if the Fluent bit not exists or stopped, restart it.
cancel := make(chan struct{})
g.Add(
func() error {

for {
select {
case <-cancel:
return nil
default:
}

// Start fluent bit if it does not existed.
start()
// Wait for the fluent bit exit.
err := wait()
if exitOnFailure && err != nil {
_ = level.Error(logger).Log("msg", "Fluent bit exited with error; exiting watcher")
return err
}

timerCtx, timerCancel = context.WithCancel(context.Background())

// After the fluent bit exit, fluent bit watcher restarts it with an exponential
// back-off delay (1s, 2s, 4s, ...), that is capped at five minutes.
backoff()
}
},
func(err error) {
close(cancel)
stop()
resetTimer()
},
)
}
{
// Watch the config file, if the config file changed, stop Fluent bit.
watcher, err := newWatcher(poll, pollInterval)
if err != nil {
_ = level.Error(logger).Log("err", err)
return
}

// Start watcher.
err = watcher.Add(watchPath)
if err != nil {
_ = level.Error(logger).Log("err", err)
return
}

cancel := make(chan struct{})
g.Add(
func() error {

for {
select {
case <-cancel:
return nil
case event := <-watcher.Events():
if !isValidEvent(event) {
continue
}
// After the config file changed, it should stop the fluent bit,
// and resets the restart backoff timer.
if cmd != nil {
_ = level.Info(logger).Log("msg", "Config file changed, stopping Fluent Bit")
stop()
resetTimer()
_ = level.Info(logger).Log("msg", "Config file changed, stopped Fluent Bit")
}
case <-watcher.Errors():
_ = level.Error(logger).Log("msg", "Watcher stopped")
return nil
}
}
},
func(err error) {
_ = watcher.Close()
close(cancel)
},
)
}

if err := g.Run(); err != nil {
_ = level.Error(logger).Log("err", err)
os.Exit(1)
}
_ = level.Info(logger).Log("msg", "See you next time!")
}

// newWatcher selects and constructs the file watcher implementation.
//
// When poll is true it returns filenotify.NewPollingWatcher(interval) with a
// nil error. Otherwise it returns whatever filenotify.New(interval) produced;
// if that call reports an error, the error is returned together with a nil
// watcher.
func newWatcher(poll bool, interval time.Duration) (filenotify.FileWatcher, error) {
	// The polling variant has no error path, so it can be returned directly.
	if poll {
		return filenotify.NewPollingWatcher(interval), nil
	}

	w, err := filenotify.New(interval)
	if err != nil {
		// Never hand back a watcher alongside an error.
		return nil, err
	}
	return w, nil
}

// isValidEvent reports whether event.Op equals fsnotify.Create or
// fsnotify.Write; every other value yields false.
//
// NOTE(review): Op is compared with ==, so a value that is neither exactly
// Create nor exactly Write is rejected — confirm that is the intended filter.
//
// Inspired by https://github.com/jimmidyson/configmap-reload
func isValidEvent(event fsnotify.Event) bool {
	op := event.Op
	if op == fsnotify.Create {
		return true
	}
	return op == fsnotify.Write
}

func start() {
mutex.Lock()
defer mutex.Unlock()
signalCtx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

if cmd != nil {
return
}
logger := log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "time", log.TimestampFormat(time.Now, time.RFC3339))

// First, launch the fluent-bit process.
args := []string{"--enable-hot-reload", "-c", configPath}
if externalPluginPath != "" {
cmd = exec.Command(binPath, "-c", configPath, "-e", externalPluginPath)
} else {
cmd = exec.Command(binPath, "-c", configPath)
args = append(args, "-e", externalPluginPath)
}
cmd := exec.Command(binPath, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
flbTerminated = make(chan bool, 1)
if err := cmd.Start(); err != nil {
_ = level.Error(logger).Log("msg", "start Fluent bit error", "error", err)
cmd = nil
return
_ = level.Error(logger).Log("msg", "failed to start fluent-bit", "error", err)
os.Exit(1)
}
_ = level.Info(logger).Log("msg", "fluent-bit started")

_ = level.Info(logger).Log("msg", "Fluent bit started")
}

func wait() error {
mutex.Lock()
if cmd == nil {
mutex.Unlock()
grp, grpCtx := errgroup.WithContext(context.Background())
grp.Go(func() error {
// Watch the process. If it exits, we want to crash immediately.
defer cancel()
if err := cmd.Wait(); err != nil {
return fmt.Errorf("failed to run fluent-bit: %w", err)
}
return nil
}
mutex.Unlock()

startTime := time.Now()

err := cmd.Wait()
if err != nil {
_ = level.Error(logger).Log("msg", "Fluent bit exited", "error", err)
}
cmd = nil
flbTerminated <- true

// Once the fluent bit has executed for 10 minutes without any problems,
// it should resets the restart backoff timer.
if time.Since(startTime) >= ResetTime {
atomic.StoreInt32(&restartTimes, 0)
}

return err
}

func backoff() {

delayTime := time.Duration(math.Pow(2, float64(atomic.LoadInt32(&restartTimes)))) * time.Second
if delayTime >= MaxDelayTime {
delayTime = MaxDelayTime
}

_ = level.Info(logger).Log("msg", "backoff", "delay", delayTime)
})
grp.Go(func() error {
// Watch the config as it's loaded into the pod and trigger a config reload.
var watcher filenotify.FileWatcher
if poll {
watcher = filenotify.NewPollingWatcher(pollInterval)
} else {
var err error
watcher, err = filenotify.NewEventWatcher()
if err != nil {
return fmt.Errorf("failed to open event watcher: %w", err)
}
}

startTime := time.Now()
if err := watcher.Add(watchPath); err != nil {
return fmt.Errorf("failed to watch path %q: %w", watchPath, err)
}

timer := time.NewTimer(delayTime)
defer timer.Stop()
for {
select {
case <-signalCtx.Done():
return nil
case <-grpCtx.Done():
return nil
case event := <-watcher.Events():
if !isValidEvent(event) {
continue
}
_ = level.Info(logger).Log("msg", "Config file changed, reloading...")
if err := cmd.Process.Signal(syscall.SIGUSR1); err != nil {
return fmt.Errorf("failed to reload config: %w", err)
}
case err := <-watcher.Errors():
return fmt.Errorf("failed the watcher: %w", err)
}
}
})

select {
case <-timerCtx.Done():
_ = level.Info(logger).Log("msg", "context cancel", "actual", time.Since(startTime), "expected", delayTime)

atomic.StoreInt32(&restartTimes, 0)

return
case <-timer.C:
_ = level.Info(logger).Log("msg", "backoff timer done", "actual", time.Since(startTime), "expected", delayTime)

atomic.AddInt32(&restartTimes, 1)

return
case <-signalCtx.Done():
case <-grpCtx.Done():
}
}

func stop() {

mutex.Lock()
defer mutex.Unlock()

if cmd == nil || cmd.Process == nil {
_ = level.Info(logger).Log("msg", "Fluent Bit not running. No process to stop.")
return
}

// Send SIGTERM, if fluent-bit doesn't terminate in the specified timeframe, send SIGKILL
// Always try to gracefully shut down fluent-bit. This will allow `cmd.Wait` above to finish
// and thus allow `grp.Wait` below to return.
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
_ = level.Info(logger).Log("msg", "Error while terminating FluentBit", "error", err)
} else {
_ = level.Info(logger).Log("msg", "Sent SIGTERM to FluentBit, waiting max "+flbTerminationTimeout.String())
_ = level.Error(logger).Log("msg", "Failed to send SIGTERM to fluent-bit", "error", err)
// Do not exit on error here. The process might've died and that's okay.
}

select {
case <-time.After(flbTerminationTimeout):
_ = level.Info(logger).Log("msg", "FluentBit failed to terminate gracefully, killing process")
cmd.Process.Kill()
<-flbTerminated
case <-flbTerminated:
_ = level.Info(logger).Log("msg", "FluentBit terminated successfully")
if err := grp.Wait(); err != nil {
_ = level.Error(logger).Log("msg", "Failure during the run time of fluent-bit", "error", err)
os.Exit(1)
}
}

func resetTimer() {
timerCancel()
atomic.StoreInt32(&restartTimes, 0)
// isValidEvent reports whether event.Op equals fsnotify.Create or
// fsnotify.Write; every other value yields false.
//
// NOTE(review): Op is compared with ==, so a value that is neither exactly
// Create nor exactly Write is rejected — confirm that is the intended filter.
//
// Inspired by https://github.com/jimmidyson/configmap-reload
func isValidEvent(event fsnotify.Event) bool {
	op := event.Op
	if op == fsnotify.Create {
		return true
	}
	return op == fsnotify.Write
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/oklog/run v1.1.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.30.0
golang.org/x/sync v0.6.0
k8s.io/api v0.26.3
k8s.io/apimachinery v0.27.4
k8s.io/client-go v0.26.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down

0 comments on commit 4c1fb3a

Please sign in to comment.