From 799bab14dead244530b144275c4dd7884cbe42d2 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake <hatake@calyptia.com> Date: Tue, 26 Sep 2023 17:55:00 +0900 Subject: [PATCH 1/7] cshared: plugin: Handle hot reloading Aligned lifecycles for channels and goroutines on hot-reloading * Added pre_run callbacks for Golang input and output plugins to ensure execute once before entering the running state * Binded resume/pause callbacks for Input Golang plugins * Added pre_exit callbacks for Golang input and output plugins to ensure aligning lifecycles for Golang's channels and goroutines Signed-off-by: Hiroshi Hatake <hatake@calyptia.com> --- cshared.go | 173 +++++++++++++++++++++++++++++++++++++++-------------- plugin.go | 1 + 2 files changed, 128 insertions(+), 46 deletions(-) diff --git a/cshared.go b/cshared.go index 68bf910..8564676 100644 --- a/cshared.go +++ b/cshared.go @@ -44,6 +44,16 @@ var ( maxBufferedMessages = defaultMaxBufferedMessages ) +//export FLBPluginPreRegister +func FLBPluginPreRegister(hotReloading C.int) int { + if hotReloading == C.int(1) { + initWG.Add(1) + registerWG.Add(1) + } + + return input.FLB_OK +} + // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description // can be provided. // @@ -72,6 +82,24 @@ func FLBPluginRegister(def unsafe.Pointer) int { return out } +func cleanup() int { + if unregister != nil { + unregister() + unregister = nil + } + + if runCancel != nil { + runCancel() + runCancel = nil + } + + if theChannel != nil { + defer close(theChannel) + } + + return input.FLB_OK +} + // FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase. // here all the plugin context should be initialized and any data or flag required for // plugins to execute the collect or flush callback. @@ -80,8 +108,6 @@ func FLBPluginRegister(def unsafe.Pointer) int { func FLBPluginInit(ptr unsafe.Pointer) int { defer initWG.Done() - registerWG.Wait() - if theInput == nil && theOutput == nil { fmt.Fprintf(os.Stderr, "no input or output registered\n") return input.FLB_RETRY @@ -167,6 +193,104 @@ var theInputLock sync.Mutex // // This function will invoke Collect only once to preserve backward // compatible behavior. There are unit tests to enforce this behavior. +func prepareInputCollector() (err error) { + runCtx, runCancel = context.WithCancel(context.Background()) + theChannel = make(chan Message, maxBufferedMessages) + + theInputLock.Lock() + + go func(theChannel chan<- Message) { + defer theInputLock.Unlock() + + err := theInput.Collect(runCtx, theChannel) + if err != nil { + fmt.Fprintf(os.Stderr, + "collect error: %s\n", err.Error()) + } + }(theChannel) + + return err +} + +// FLBPluginInputPreRun this method gets invoked by the fluent-bit runtime, once the plugin has been +// initialised, the plugin invoked only once before executing the input callbacks. +// +//export FLBPluginInputPreRun +func FLBPluginInputPreRun(useHotReload C.int) int { + registerWG.Wait() + + var err error + err = prepareInputCollector() + + if err != nil { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + return input.FLB_ERROR + } + + return input.FLB_OK +} + + +// FLBPluginInputPause this method gets invoked by the fluent-bit runtime, once the plugin has been +// paused, the plugin invoked this method and entering paused state. +// +//export FLBPluginInputPause +func FLBPluginInputPause() { + if runCancel != nil { + runCancel() + runCancel = nil + } + + if theChannel != nil { + close(theChannel) + theChannel = nil + } +} + +// FLBPluginInputResume this method gets invoked by the fluent-bit runtime, once the plugin has been +// resumeed, the plugin invoked this method and re-running state. +// +//export FLBPluginInputResume +func FLBPluginInputResume() { + var err error + err = prepareInputCollector() + + if err != nil { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + } +} + +//export FLBPluginOutputPreRun +func FLBPluginOutputPreRun(useHotReload C.int) int { + registerWG.Wait() + + var err error + runCtx, runCancel = context.WithCancel(context.Background()) + theChannel = make(chan Message) + go func(runCtx context.Context) { + for { + select { + case <-runCtx.Done(): + log.Printf("goroutine will be stopping: name=%q\n", theName) + return + default: + err = theOutput.Flush(runCtx, theChannel) + } + } + }(runCtx) + + if err != nil { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + return output.FLB_ERROR + } + + return output.FLB_OK +} + +// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been +// initialised, the plugin implementation is responsible for handling the incoming data and the context +// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit +// will not execute further callbacks. // //export FLBPluginInputCallback func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { @@ -177,23 +301,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_RETRY } - once.Do(func() { - runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message, maxBufferedMessages) - - theInputLock.Lock() - - go func(theChannel chan<- Message) { - defer theInputLock.Unlock() - - err := theInput.Collect(runCtx, theChannel) - if err != nil { - fmt.Fprintf(os.Stderr, - "collect error: %s\n", err.Error()) - } - }(theChannel) - }) - buf := bytes.NewBuffer([]byte{}) for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- { @@ -261,18 +368,6 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { } var err error - once.Do(func() { - runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message) - go func() { - err = theOutput.Flush(runCtx, theChannel) - }() - }) - if err != nil { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return output.FLB_ERROR - } - select { case <-runCtx.Done(): err = runCtx.Err() @@ -374,21 +469,7 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { // //export FLBPluginExit func FLBPluginExit() int { - log.Printf("calling FLBPluginExit(): name=%q\n", theName) - - if unregister != nil { - unregister() - } - - if runCancel != nil { - runCancel() - } - - if theChannel != nil { - defer close(theChannel) - } - - return input.FLB_OK + return cleanup() } type flbInputConfigLoader struct { diff --git a/plugin.go b/plugin.go index 6f6c581..a9e16c8 100644 --- a/plugin.go +++ b/plugin.go @@ -33,6 +33,7 @@ var ( func init() { registerWG.Add(1) initWG.Add(1) + theChannel = nil } type Fluentbit struct { From 9a462e68005daf01ddda46f9415c8df600c128a9 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake <hatake@calyptia.com> Date: Mon, 2 Oct 2023 15:25:13 +0900 Subject: [PATCH 2/7] out_stdout: Migrate to use calyptia/plugin API Signed-off-by: Hiroshi Hatake <hatake@calyptia.com> --- examples/out_gstdout/out_gstdout.go | 96 +++++++++++------------------ 1 file changed, 37 insertions(+), 59 deletions(-) diff --git a/examples/out_gstdout/out_gstdout.go b/examples/out_gstdout/out_gstdout.go index 4ddc854..de37386 100644 --- a/examples/out_gstdout/out_gstdout.go +++ b/examples/out_gstdout/out_gstdout.go @@ -1,82 +1,60 @@ package main import ( - "C" + "context" "fmt" - "time" - "unsafe" + "reflect" - "github.com/calyptia/plugin/output" + "github.com/calyptia/plugin" + "github.com/calyptia/plugin/metric" ) -//export FLBPluginRegister -func FLBPluginRegister(def unsafe.Pointer) int { - return output.FLBPluginRegister(def, "gstdout", "Stdout GO!") +func init() { + plugin.RegisterOutput("gstdout", "StdOut GO!", &gstdoutPlugin{}) } -// (fluentbit will call this) -// plugin (context) pointer to fluentbit context (state/ c code) -// -//export FLBPluginInit -func FLBPluginInit(plugin unsafe.Pointer) int { - // Example to retrieve an optional configuration parameter - param := output.FLBPluginConfigKey(plugin, "param") - fmt.Printf("[flb-go] plugin parameter = '%s'\n", param) - return output.FLB_OK +type gstdoutPlugin struct { + param string + flushCounter metric.Counter + log plugin.Logger } -//export FLBPluginFlush -func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { - var count int - var ret int - var ts interface{} - var record map[interface{}]interface{} +func (plug *gstdoutPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error { + plug.flushCounter = fbit.Metrics.NewCounter("flush_total", "Total number of flushes", "gstdout") + plug.param = fbit.Conf.String("param") + plug.log = fbit.Logger - // Create Fluent Bit decoder - dec := output.NewDecoder(data, int(length)) + return nil +} +func (plug gstdoutPlugin) Flush(ctx context.Context, ch <-chan plugin.Message) error { // Iterate Records - count = 0 - for { - // Extract Record - ret, ts, record = output.GetRecord(dec) - if ret != 0 { - break - } + count := 0 - var timestamp time.Time - switch t := ts.(type) { - case output.FLBTime: - timestamp = ts.(output.FLBTime).Time - case uint64: - timestamp = time.Unix(int64(t), 0) - default: - fmt.Println("time provided invalid, defaulting to now.") - timestamp = time.Now() - } + for msg := range ch { + plug.flushCounter.Add(1) + plug.log.Debug("[gstdout] operation proceeded") // Print record keys and values - fmt.Printf("[%d] %s: [%s, {", count, C.GoString(tag), - timestamp.String()) - for k, v := range record { - fmt.Printf("\"%s\": %v, ", k, v) + fmt.Printf("[%d] %s: [%d.%d, {", count, msg.Tag(), + msg.Time.Unix(), msg.Time.Nanosecond()) + rec := reflect.ValueOf(msg.Record) + if rec.Kind() == reflect.Map { + keyCount := 0 + for _, key := range rec.MapKeys() { + if keyCount > 0 { + fmt.Printf(", ") + } + strct := rec.MapIndex(key) + fmt.Printf("\"%s\":\"%v\"", key.Interface(), strct.Interface()) + keyCount++ + } } - fmt.Printf("}\n") + fmt.Printf("}]\n") count++ } - // Return options: - // - // output.FLB_OK = data have been processed. - // output.FLB_ERROR = unrecoverable error, do not try this again. - // output.FLB_RETRY = retry to flush later. - return output.FLB_OK + return nil } -//export FLBPluginExit -func FLBPluginExit() int { - return output.FLB_OK -} - -func main() { -} +func main() {} From 4df6533ba5e307afb3e2c340505e46b226aefd33 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake <hatake@calyptia.com> Date: Tue, 3 Oct 2023 16:15:48 +0900 Subject: [PATCH 3/7] cshared: Handle event format of Fluent Bit V2 Signed-off-by: Hiroshi Hatake <hatake@calyptia.com> --- cshared.go | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/cshared.go b/cshared.go index 8564676..35373c0 100644 --- a/cshared.go +++ b/cshared.go @@ -414,24 +414,29 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { return output.FLB_ERROR } - if d := len(entry); d != 2 { - fmt.Fprintf(os.Stderr, "unexpected entry length: %d\n", d) + slice := reflect.ValueOf(entry) + if slice.Kind() != reflect.Slice || slice.Len() < 2 { + fmt.Fprintf(os.Stderr, "unexpected entry length: %d\n", slice.Len()) return output.FLB_ERROR } - ft, ok := entry[0].(bigEndianTime) - if !ok { + var t time.Time + ts := slice.Index(0).Interface() + switch ft := ts.(type) { + case bigEndianTime: + t = time.Time(ft) + case []interface{}: + s := reflect.ValueOf(ft) + st := s.Index(0).Interface() + ty := st.(bigEndianTime) + t = time.Time(ty) + default: fmt.Fprintf(os.Stderr, "unexpected entry time type: %T\n", entry[0]) return output.FLB_ERROR } - t := time.Time(ft) - - recVal, ok := entry[1].(map[any]any) - if !ok { - fmt.Fprintf(os.Stderr, "unexpected entry record type: %T\n", entry[1]) - return output.FLB_ERROR - } + data := slice.Index(1) + recVal := data.Interface().(map[interface{}]interface{}) var rec map[string]string if d := len(recVal); d != 0 { @@ -443,13 +448,22 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { return output.FLB_ERROR } - val, ok := v.([]uint8) - if !ok { + var val string + switch tv := v.(type) { + case []uint8: + val = string(tv) + case uint64: + val = strconv.FormatUint(tv, 10) + case int64: + val = strconv.FormatInt(tv, 10) + case bool: + val = strconv.FormatBool(tv) + default: fmt.Fprintf(os.Stderr, "unexpected record value type: %T\n", v) return output.FLB_ERROR } - rec[key] = string(val) + rec[key] = val } } From 2f9fc7b43c1f5c161e2baab5df739bae3aa8ba43 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake <hatake@calyptia.com> Date: Wed, 4 Oct 2023 17:11:53 +0900 Subject: [PATCH 4/7] cshared: test: Fix failing cshared tests Signed-off-by: Hiroshi Hatake <hatake@calyptia.com> --- cshared_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/cshared_test.go b/cshared_test.go index 76f2fe7..a4d0156 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -40,6 +40,13 @@ func TestInputCallbackCtrlC(t *testing.T) { ptr := unsafe.Pointer(nil) + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + t.Fail() + return + } + go func() { FLBPluginInputCallback(&ptr, nil) cdone <- true @@ -86,6 +93,12 @@ func TestInputCallbackDangle(t *testing.T) { cdone := make(chan bool) ptr := unsafe.Pointer(nil) + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + t.Fail() + } + go func() { t := time.NewTicker(collectInterval) defer t.Stop() @@ -156,6 +169,13 @@ func TestInputCallbackInfinite(t *testing.T) { cshutdown := make(chan bool) ptr := unsafe.Pointer(nil) + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + t.Fail() + return + } + go func() { t := time.NewTicker(collectInterval) defer t.Stop() @@ -237,6 +257,13 @@ func TestInputCallbackLatency(t *testing.T) { cstarted := make(chan bool) cmsg := make(chan []byte) + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + t.Fail() + return + } + go func() { t := time.NewTicker(collectInterval) defer t.Stop() @@ -363,6 +390,12 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) { concurrentWait.Add(64) + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + t.Fail() + } + go func(cstarted chan bool) { ticker := time.NewTicker(time.Second * 1) defer ticker.Stop() From 24e9ea013ffe3530470e22c9fe9847548a91ba5c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake <hatake@calyptia.com> Date: Fri, 6 Oct 2023 17:52:17 +0900 Subject: [PATCH 5/7] cshared: Make goroutine for Collect to be able to exit Signed-off-by: Hiroshi Hatake <hatake@calyptia.com> --- cshared.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cshared.go b/cshared.go index 35373c0..3c5a2b2 100644 --- a/cshared.go +++ b/cshared.go @@ -201,8 +201,16 @@ func prepareInputCollector() (err error) { go func(theChannel chan<- Message) { defer theInputLock.Unlock() + for { + select { + case <-runCtx.Done(): + log.Printf("goroutine will be stopping: name=%q\n", theName) + return + default: + err = theInput.Collect(runCtx, theChannel) + } + } - err := theInput.Collect(runCtx, theChannel) if err != nil { fmt.Fprintf(os.Stderr, "collect error: %s\n", err.Error()) From 81c0547aa59213823175d2e37c34bf7003f84ef1 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake <hatake@calyptia.com> Date: Fri, 6 Oct 2023 19:04:14 +0900 Subject: [PATCH 6/7] cshared: Make backward compatible and interruptable goroutines Signed-off-by: Hiroshi Hatake <hatake@calyptia.com> --- cshared.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cshared.go b/cshared.go index 3c5a2b2..951d3e0 100644 --- a/cshared.go +++ b/cshared.go @@ -201,13 +201,16 @@ func prepareInputCollector() (err error) { go func(theChannel chan<- Message) { defer theInputLock.Unlock() + + go func(theChannel chan<- Message) { + err = theInput.Collect(runCtx, theChannel) + }(theChannel) + for { select { case <-runCtx.Done(): log.Printf("goroutine will be stopping: name=%q\n", theName) return - default: - err = theInput.Collect(runCtx, theChannel) } } @@ -276,15 +279,18 @@ func FLBPluginOutputPreRun(useHotReload C.int) int { runCtx, runCancel = context.WithCancel(context.Background()) theChannel = make(chan Message) go func(runCtx context.Context) { + go func(runCtx context.Context) { + err = theOutput.Flush(runCtx, theChannel) + }(runCtx) + for { select { case <-runCtx.Done(): log.Printf("goroutine will be stopping: name=%q\n", theName) return - default: - err = theOutput.Flush(runCtx, theChannel) } } + }(runCtx) if err != nil { From 44630365671b2343720f2b3b11fcb6d34078e1e0 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake <hatake@calyptia.com> Date: Fri, 6 Oct 2023 19:09:35 +0900 Subject: [PATCH 7/7] examples: out_gstdout: Update dependencies Signed-off-by: Hiroshi Hatake <hatake@calyptia.com> --- examples/out_gstdout/go.mod | 9 +++++++-- examples/out_gstdout/go.sum | 4 ++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/examples/out_gstdout/go.mod b/examples/out_gstdout/go.mod index 4829621..e5635fe 100644 --- a/examples/out_gstdout/go.mod +++ b/examples/out_gstdout/go.mod @@ -2,6 +2,11 @@ module github.com/fluent/fluent-bit-go/examples/gstdout go 1.21.0 -require github.com/fluent/fluent-bit-go v0.0.0-20200420155746-e125cab17963 +require github.com/calyptia/plugin v0.1.6 -replace github.com/fluent/fluent-bit-go => ../.. +require ( + github.com/calyptia/cmetrics-go v0.1.7 // indirect + github.com/ugorji/go/codec v1.2.11 // indirect +) + +replace github.com/calyptia/plugin => ../.. diff --git a/examples/out_gstdout/go.sum b/examples/out_gstdout/go.sum index d612ace..297fc41 100644 --- a/examples/out_gstdout/go.sum +++ b/examples/out_gstdout/go.sum @@ -119,7 +119,10 @@ github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx2 github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0/go.mod h1:J4Y6YJm0qTWB9aFziB7cPeSyc6dOZFyJdteSeybVpXQ= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/cactus/go-statsd-client/statsd v0.0.0-20191106001114-12b4e2b38748/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI= +github.com/calyptia/cmetrics-go v0.1.7 h1:A4kEFuFqVuWzytIbbey9KivHi0GQVjOkE2JJkdRbQ2U= github.com/calyptia/cmetrics-go v0.1.7/go.mod h1:K1IEPgICDtD4mJW7RVhfG4BkCywnjCdYZwbKs0jSw/U= +github.com/calyptia/plugin v1.1.1 h1:vOQvSKTXsAlNQ+/+VA1xDbhOwqKf/IeRBs0zMdOhT2k= +github.com/calyptia/plugin v1.1.1/go.mod h1:V5LqfR3UJ7G+NMf80Hm3VUgSocDJQJ7OuXlgorCw++M= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= @@ -714,6 +717,7 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=