From 52e6398c394539f9ed6068f65397c148ae2128fd Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 5 Jan 2023 18:42:51 -0300 Subject: [PATCH 01/22] cshared: use a buffered channel and cat multiple records into a single buffer per input call. Signed-off-by: Phillip Whelan --- cshared.go | 65 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/cshared.go b/cshared.go index afa215d..ff2a87b 100644 --- a/cshared.go +++ b/cshared.go @@ -6,6 +6,7 @@ package plugin import "C" import ( + "bytes" "context" "errors" "fmt" @@ -133,7 +134,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { var err error once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message) + theChannel = make(chan Message, 16) go func() { err = theInput.Collect(runCtx, theChannel) }() @@ -143,39 +144,45 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_ERROR } - select { - case msg, ok := <-theChannel: - if !ok { - return input.FLB_OK - } + buf := bytes.NewBuffer([]byte{}) - t := input.FLBTime{Time: msg.Time} - b, err := input.NewEncoder().Encode([]any{t, msg.Record}) - if err != nil { - fmt.Fprintf(os.Stderr, "encode: %s\n", err) - return input.FLB_ERROR - } - - cdata := C.CBytes(b) - - *data = cdata - *csize = C.size_t(len(b)) + for loop := true; loop; { + select { + case msg, ok := <-theChannel: + if !ok { + return input.FLB_ERROR + } - // C.free(unsafe.Pointer(cdata)) - case <-runCtx.Done(): - err := runCtx.Err() - if err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR + t := input.FLBTime{Time: msg.Time} + b, err := input.NewEncoder().Encode([]any{t, msg.Record}) + if err != nil { + fmt.Fprintf(os.Stderr, "encode: %s\n", err) + return input.FLB_ERROR + } + buf.Grow(len(b)) + buf.Write(b) + default: + loop = false + case <-runCtx.Done(): + err := runCtx.Err() + if err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + return input.FLB_ERROR + } + // enforce a runtime gc, to prevent the thread finalizer on + // fluent-bit to kick in before any remaining data has not been GC'ed + // causing a sigsegv. + defer runtime.GC() + loop = false } - // enforce a runtime gc, to prevent the thread finalizer on - // fluent-bit to kick in before any remaining data has not been GC'ed - // causing a sigsegv. - defer runtime.GC() - default: - break } + b := buf.Bytes() + cdata := C.CBytes(b) + + *data = cdata + *csize = C.size_t(len(b)) + return input.FLB_OK } From 00bdd0c0e249fdd6af1a706e8983c75989fa2464 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 11 Jan 2023 15:55:59 -0300 Subject: [PATCH 02/22] cshared: increase channel buffer size. Signed-off-by: Phillip Whelan --- cshared.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cshared.go b/cshared.go index ff2a87b..64e9126 100644 --- a/cshared.go +++ b/cshared.go @@ -134,7 +134,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { var err error once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message, 16) + theChannel = make(chan Message, 256) go func() { err = theInput.Collect(runCtx, theChannel) }() From c2b676b5a96d109225801e0bea049c0acba1aebd Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 12 Jan 2023 16:51:51 -0300 Subject: [PATCH 03/22] cshared: use an intermediate buffer that gets flushed on each input callback. Signed-off-by: Phillip Whelan --- cshared.go | 45 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/cshared.go b/cshared.go index 64e9126..b97fe5d 100644 --- a/cshared.go +++ b/cshared.go @@ -17,6 +17,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "unsafe" @@ -32,6 +33,7 @@ var ( unregister func() cmt *cmetrics.Context logger Logger + buflock sync.Mutex ) // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description @@ -134,10 +136,34 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { var err error once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message, 256) + // we need to configure this part.... + theChannel = make(chan Message, 300000) + // do we need to buffer this part??? + cbuf := make(chan Message, 16) + go func() { - err = theInput.Collect(runCtx, theChannel) + err = theInput.Collect(runCtx, cbuf) }() + go func(cbuf chan Message) { + t := time.NewTicker(1 * time.Second) + for { + buflock.Lock() + select { + case msg, ok := <-cbuf: + if !ok { + continue + } + buflock.Unlock() + theChannel <- msg + buflock.Lock() + case <-t.C: + buflock.Unlock() + buflock.Lock() + default: + } + buflock.Unlock() + } + }(cbuf) }) if err != nil { fmt.Fprintf(os.Stderr, "run: %s\n", err) @@ -145,8 +171,9 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { } buf := bytes.NewBuffer([]byte{}) + buflock.Lock() - for loop := true; loop; { + for loop := len(theChannel) > 0; loop; { select { case msg, ok := <-theChannel: if !ok { @@ -176,12 +203,14 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { loop = false } } + buflock.Unlock() - b := buf.Bytes() - cdata := C.CBytes(b) - - *data = cdata - *csize = C.size_t(len(b)) + if buf.Len() > 0 { + b := buf.Bytes() + cdata := C.CBytes(b) + *data = cdata + *csize = C.size_t(len(b)) + } return input.FLB_OK } From 0cba01410f6edb9988eecfa6e3579fec252c078b Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 8 Sep 2023 17:07:20 -0300 Subject: [PATCH 04/22] fix for 100% cpu utilization. Signed-off-by: Phillip Whelan --- cshared.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cshared.go b/cshared.go index b97fe5d..b72fe0c 100644 --- a/cshared.go +++ b/cshared.go @@ -159,7 +159,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { case <-t.C: buflock.Unlock() buflock.Lock() - default: } buflock.Unlock() } From b7c5437a8f32ed76c36960f63a8175cec95127f4 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 11:26:56 -0300 Subject: [PATCH 05/22] cshared: defer the stopping of the main loop timer. Signed-off-by: Phillip Whelan --- cshared.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cshared.go b/cshared.go index b72fe0c..0b9157b 100644 --- a/cshared.go +++ b/cshared.go @@ -146,6 +146,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { }() go func(cbuf chan Message) { t := time.NewTicker(1 * time.Second) + defer t.Stop() for { buflock.Lock() select { From 8c3db6e89f6177e41c595d1540a54bcbd5e80721 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 11:30:09 -0300 Subject: [PATCH 06/22] cshared: log Collect error. Signed-off-by: Phillip Whelan --- cshared.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cshared.go b/cshared.go index 0b9157b..cda8add 100644 --- a/cshared.go +++ b/cshared.go @@ -142,7 +142,9 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { cbuf := make(chan Message, 16) go func() { - err = theInput.Collect(runCtx, cbuf) + if err := theInput.Collect(runCtx, cbuf); err != nil { + fmt.Fprintf("Error collecting input: %s\n", err) + } }() go func(cbuf chan Message) { t := time.NewTicker(1 * time.Second) From 52bc5c8e70563f51bd73fcab01b9170b54a76f41 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 13:22:58 -0300 Subject: [PATCH 07/22] cshared: emulate the old collect interval. Signed-off-by: Phillip Whelan --- cshared.go | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/cshared.go b/cshared.go index cda8add..63085b7 100644 --- a/cshared.go +++ b/cshared.go @@ -141,11 +141,26 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { // do we need to buffer this part??? cbuf := make(chan Message, 16) - go func() { - if err := theInput.Collect(runCtx, cbuf); err != nil { - fmt.Fprintf("Error collecting input: %s\n", err) + // Most plugins expect Collect to be invoked once and then takes over the + // input thread by running in an infinite loop. Here we simulate this + // behaviour and also simulate the original behaviour for those plugins that + // do not hold on to the thread. + go func(runCtx context.Context) { + t := time.NewTicker(1000 * time.Nanosecond) + defer t.Stop() + + for { + select { + case <-runCtx.Done(): + return + case <-t.C: + if err := theInput.Collect(runCtx, cbuf); err != nil { + fmt.Fprintf("Error collecting input: %s\n", err) + } + } } - }() + }(runCtx) + go func(cbuf chan Message) { t := time.NewTicker(1 * time.Second) defer t.Stop() From a9fcbb8febc47d8e155f6e3dcbf7c466c0a5cb47 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 13:23:42 -0300 Subject: [PATCH 08/22] cshared: comment the use of the buffer lock and why it is being used. Signed-off-by: Phillip Whelan --- cshared.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/cshared.go b/cshared.go index 63085b7..9745f19 100644 --- a/cshared.go +++ b/cshared.go @@ -161,9 +161,15 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { } }(runCtx) + // Limit submits to a single full buffer for each second. This limits + // the amount of locking when invoking the fluent-bit API. go func(cbuf chan Message) { t := time.NewTicker(1 * time.Second) defer t.Stop() + + // Use a mutex lock for the buffer to avoid filling the buffer more than + // once per period (1s). We also use the mutex lock to avoid infinitely + // filling the buffer while it is being flushed to fluent-bit. for { buflock.Lock() select { @@ -188,8 +194,11 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { } buf := bytes.NewBuffer([]byte{}) - buflock.Lock() + // Here we read all the messages produced in the internal buffer submit them + // once for each period invocation. We lock the buffer so no new messages + // arrive while draining the buffer. + buflock.Lock() for loop := len(theChannel) > 0; loop; { select { case msg, ok := <-theChannel: @@ -206,6 +215,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { buf.Grow(len(b)) buf.Write(b) default: + // when there are no more messages explicitly mark the loop be terminated. loop = false case <-runCtx.Done(): err := runCtx.Err() From 48922b54ba420bee0d5913cf986f0b5b8339e150 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 13:24:03 -0300 Subject: [PATCH 09/22] cshared: exit buffer loop when runCtx is done. Signed-off-by: Phillip Whelan --- cshared.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cshared.go b/cshared.go index 9745f19..6433b73 100644 --- a/cshared.go +++ b/cshared.go @@ -183,6 +183,9 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { case <-t.C: buflock.Unlock() buflock.Lock() + case <-runCtx.Done(): + buflock.Unlock() + return } buflock.Unlock() } From dbff4a81b8055333bd9c8e95573cfc769d096a54 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 13:29:26 -0300 Subject: [PATCH 10/22] cshared: fix minor misspellings. Signed-off-by: Phillip Whelan --- cshared.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cshared.go b/cshared.go index 6433b73..9515c22 100644 --- a/cshared.go +++ b/cshared.go @@ -65,7 +65,7 @@ func FLBPluginRegister(def unsafe.Pointer) int { } // FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase. -// here all the plugin context should be initialised and any data or flag required for +// here all the plugin context should be initialized and any data or flag required for // plugins to execute the collect or flush callback. // //export FLBPluginInit @@ -120,7 +120,7 @@ func FLBPluginInit(ptr unsafe.Pointer) int { } // FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been -// initialised, the plugin implementation is responsible for handling the incoming data and the context +// initialized, the plugin implementation is responsible for handling the incoming data and the context // that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit // will not execute further callbacks. // @@ -143,7 +143,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { // Most plugins expect Collect to be invoked once and then takes over the // input thread by running in an infinite loop. Here we simulate this - // behaviour and also simulate the original behaviour for those plugins that + // behavior and also simulate the original behavior for those plugins that // do not hold on to the thread. go func(runCtx context.Context) { t := time.NewTicker(1000 * time.Nanosecond) @@ -155,7 +155,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return case <-t.C: if err := theInput.Collect(runCtx, cbuf); err != nil { - fmt.Fprintf("Error collecting input: %s\n", err) + fmt.Fprintf(os.Stderr, "Error collecting input: %s\n", err) } } } From 17975aa57c9e694c676a655a4532fbc3961f4e23 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 13:51:40 -0300 Subject: [PATCH 11/22] cshared: fix 'cannot use err (variable of type error) as string'. Signed-off-by: Phillip Whelan --- cshared.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cshared.go b/cshared.go index 9515c22..070d538 100644 --- a/cshared.go +++ b/cshared.go @@ -155,7 +155,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return case <-t.C: if err := theInput.Collect(runCtx, cbuf); err != nil { - fmt.Fprintf(os.Stderr, "Error collecting input: %s\n", err) + fmt.Fprintf(os.Stderr, "Error collecting input: %s\n", err.Error()) } } } From 8dbb0baa84da0640d29ebf18a6001f6fca2bb984 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 13:53:39 -0300 Subject: [PATCH 12/22] cshared: define the collect interval as a const. Signed-off-by: Phillip Whelan --- cshared.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cshared.go b/cshared.go index 070d538..9c0a66d 100644 --- a/cshared.go +++ b/cshared.go @@ -36,6 +36,11 @@ var ( buflock sync.Mutex ) +const ( + collectInterval = time.Nanosecond * 1000 + +) + // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description // can be provided. // @@ -146,7 +151,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { // behavior and also simulate the original behavior for those plugins that // do not hold on to the thread. go func(runCtx context.Context) { - t := time.NewTicker(1000 * time.Nanosecond) + t := time.NewTicker(collectInterval) defer t.Stop() for { From ff41f316efdaedd74e056a0adaeae841caddc937 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 14:02:53 -0300 Subject: [PATCH 13/22] cshared: remove unused declaration and code. Signed-off-by: Phillip Whelan --- cshared.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cshared.go b/cshared.go index 9c0a66d..a1ebf58 100644 --- a/cshared.go +++ b/cshared.go @@ -138,7 +138,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_RETRY } - var err error once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) // we need to configure this part.... @@ -196,10 +195,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { } }(cbuf) }) - if err != nil { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR - } buf := bytes.NewBuffer([]byte{}) From a7af12acaa4a76043833cabc45a232a84d462f55 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 14:03:17 -0300 Subject: [PATCH 14/22] cshared: fix nolintlint (remove unused nolint directives). Signed-off-by: Phillip Whelan --- cshared.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cshared.go b/cshared.go index a1ebf58..1d2d444 100644 --- a/cshared.go +++ b/cshared.go @@ -257,7 +257,8 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int { // plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation. // //export FLBPluginFlush -//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions. +// TODO: refactor into smaller functions. +//nolint:funlen //ignore length requirement for this function func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { initWG.Wait() From af57b6cd56ec74b2732f0968e6fcda6d80a6c873 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 14:04:15 -0300 Subject: [PATCH 15/22] cshared: move mnd for the message buffer size to a const. Signed-off-by: Phillip Whelan --- cshared.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cshared.go b/cshared.go index 1d2d444..20d843e 100644 --- a/cshared.go +++ b/cshared.go @@ -38,7 +38,7 @@ var ( const ( collectInterval = time.Nanosecond * 1000 - + maxBufferedMessages = 300000 ) // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description @@ -141,7 +141,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) // we need to configure this part.... - theChannel = make(chan Message, 300000) + theChannel = make(chan Message, maxBufferedMessages) // do we need to buffer this part??? cbuf := make(chan Message, 16) From e9110a3d368826b4970102c1c47406348c730742 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 12 Sep 2023 14:11:39 -0300 Subject: [PATCH 16/22] cshared: comment and set as consts all magic numbers. Signed-off-by: Phillip Whelan --- cshared.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/cshared.go b/cshared.go index 20d843e..d498206 100644 --- a/cshared.go +++ b/cshared.go @@ -37,8 +37,16 @@ var ( ) const ( + // collectInterval is equal to the interval originally used in the high + // frequency patch in fluent-bit. collectInterval = time.Nanosecond * 1000 + // maxBufferedMessages is the number of messages that will be buffered + // between each fluent-bit interval (approx 1 second). maxBufferedMessages = 300000 + // maxConcurrentChannels is the number of channels that will be buffered + // for incoming messages to the message buffer. These messages will be + // ingested as quickly as possible until the buffer is full. + maxConcurrentChannels = 16 ) // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description @@ -140,10 +148,9 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - // we need to configure this part.... + theChannel = make(chan Message, maxBufferedMessages) - // do we need to buffer this part??? - cbuf := make(chan Message, 16) + cbuf := make(chan Message, maxConcurrentChannels) // Most plugins expect Collect to be invoked once and then takes over the // input thread by running in an infinite loop. Here we simulate this From e31c026b0cbd2a483387e51c60508dd69d1383b8 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 13 Sep 2023 16:20:04 -0300 Subject: [PATCH 17/22] cshared: refactor buffering to remove the use of the cbuf intermediate channel. Signed-off-by: Phillip Whelan --- cshared.go | 113 +++++++++++++++++------------------------------------ 1 file changed, 35 insertions(+), 78 deletions(-) diff --git a/cshared.go b/cshared.go index d498206..d101262 100644 --- a/cshared.go +++ b/cshared.go @@ -17,7 +17,6 @@ import ( "runtime" "strconv" "strings" - "sync" "time" "unsafe" @@ -29,24 +28,17 @@ import ( "github.com/calyptia/plugin/output" ) -var ( - unregister func() - cmt *cmetrics.Context - logger Logger - buflock sync.Mutex -) - const ( - // collectInterval is equal to the interval originally used in the high - // frequency patch in fluent-bit. - collectInterval = time.Nanosecond * 1000 // maxBufferedMessages is the number of messages that will be buffered // between each fluent-bit interval (approx 1 second). - maxBufferedMessages = 300000 - // maxConcurrentChannels is the number of channels that will be buffered - // for incoming messages to the message buffer. These messages will be - // ingested as quickly as possible until the buffer is full. - maxConcurrentChannels = 16 + defaultMaxBufferedMessages = 300000 +) + +var ( + unregister func() + cmt *cmetrics.Context + logger Logger + maxBufferedMessages = defaultMaxBufferedMessages ) // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description @@ -110,6 +102,12 @@ func FLBPluginInit(ptr unsafe.Pointer) int { } err = theInput.Init(ctx, fbit) + if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" { + maxbuffered, err := strconv.Atoi(maxbuffered) + if err != nil { + maxBufferedMessages = maxbuffered + } + } } else { conf := &flbOutputConfigLoader{ptr: ptr} cmt, err = output.FLBPluginGetCMetricsContext(ptr) @@ -132,6 +130,13 @@ func FLBPluginInit(ptr unsafe.Pointer) int { return input.FLB_OK } +func min(a, b int) int { + if a < b { + return a + } + return b +} + // FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been // initialized, the plugin implementation is responsible for handling the incoming data and the context // that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit @@ -148,68 +153,20 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message, maxBufferedMessages) - cbuf := make(chan Message, maxConcurrentChannels) - - // Most plugins expect Collect to be invoked once and then takes over the - // input thread by running in an infinite loop. Here we simulate this - // behavior and also simulate the original behavior for those plugins that - // do not hold on to the thread. - go func(runCtx context.Context) { - t := time.NewTicker(collectInterval) - defer t.Stop() - - for { - select { - case <-runCtx.Done(): - return - case <-t.C: - if err := theInput.Collect(runCtx, cbuf); err != nil { - fmt.Fprintf(os.Stderr, "Error collecting input: %s\n", err.Error()) - } - } - } - }(runCtx) - - // Limit submits to a single full buffer for each second. This limits - // the amount of locking when invoking the fluent-bit API. - go func(cbuf chan Message) { - t := time.NewTicker(1 * time.Second) - defer t.Stop() - - // Use a mutex lock for the buffer to avoid filling the buffer more than - // once per period (1s). We also use the mutex lock to avoid infinitely - // filling the buffer while it is being flushed to fluent-bit. - for { - buflock.Lock() - select { - case msg, ok := <-cbuf: - if !ok { - continue - } - buflock.Unlock() - theChannel <- msg - buflock.Lock() - case <-t.C: - buflock.Unlock() - buflock.Lock() - case <-runCtx.Done(): - buflock.Unlock() - return - } - buflock.Unlock() + + go func(theChannel chan<- Message) { + err := theInput.Collect(runCtx, theChannel) + if err != nil { + fmt.Fprintf(os.Stderr, + "collect error: %s\n", err.Error()) } - }(cbuf) + }(theChannel) }) buf := bytes.NewBuffer([]byte{}) - // Here we read all the messages produced in the internal buffer submit them - // once for each period invocation. We lock the buffer so no new messages - // arrive while draining the buffer. - buflock.Lock() - for loop := len(theChannel) > 0; loop; { + for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- { select { case msg, ok := <-theChannel: if !ok { @@ -224,9 +181,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { } buf.Grow(len(b)) buf.Write(b) - default: - // when there are no more messages explicitly mark the loop be terminated. - loop = false case <-runCtx.Done(): err := runCtx.Err() if err != nil && !errors.Is(err, context.Canceled) { @@ -237,16 +191,19 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { // fluent-bit to kick in before any remaining data has not been GC'ed // causing a sigsegv. defer runtime.GC() - loop = false + loop = 0 + default: + loop = 0 } } - buflock.Unlock() if buf.Len() > 0 { b := buf.Bytes() cdata := C.CBytes(b) *data = cdata - *csize = C.size_t(len(b)) + if csize != nil { + *csize = C.size_t(len(b)) + } } return input.FLB_OK From 63f32608f212166b31b6016ad87f97590e0d5485 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 13 Sep 2023 16:20:52 -0300 Subject: [PATCH 18/22] cshared: add tests for collect. Signed-off-by: Phillip Whelan --- cshared_test.go | 119 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 113 insertions(+), 6 deletions(-) diff --git a/cshared_test.go b/cshared_test.go index 1ffdcac..df5ed67 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -2,37 +2,144 @@ package plugin import ( "context" + "fmt" + "sync" + "sync/atomic" "testing" "time" "unsafe" ) -type testPluginInputCallback struct{} +type testPluginInputCallbackCtrlC struct{} -func (t testPluginInputCallback) Init(ctx context.Context, fbit *Fluentbit) error { +func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error { return nil } -func (t testPluginInputCallback) Collect(ctx context.Context, ch chan<- Message) error { +func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error { return nil } +func init() { + initWG.Done() + registerWG.Done() +} + func TestInputCallbackCtrlC(t *testing.T) { - theInput = testPluginInputCallback{} + theInput = testPluginInputCallbackCtrlC{} cdone := make(chan bool) timeout := time.NewTimer(1 * time.Second) ptr := unsafe.Pointer(nil) - initWG.Done() - registerWG.Done() + go func() { + FLBPluginInputCallback(&ptr, nil) + cdone <- true + }() + + select { + case <-cdone: + runCancel() + case <-timeout.C: + t.Fail() + } +} + +var testPluginInputCallbackInfiniteFuncs atomic.Int64 + +type testPluginInputCallbackInfinite struct{} + +func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error { + go func(ch chan<- Message) { + testPluginInputCallbackInfiniteFuncs.Add(1) + for { + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + } + }(ch) + return nil +} + +// TestInputCallbackInfinite is a test for the main method most plugins +// use where they do not return from the first invocation of collect. +func TestInputCallbackInfinite(t *testing.T) { + theInput = testPluginInputCallbackInfinite{} + cdone := make(chan bool) + timeout := time.NewTimer(10 * time.Second) + ptr := unsafe.Pointer(nil) + + go func() { + for { + FLBPluginInputCallback(&ptr, nil) + time.Sleep(1 * time.Second) + + if ptr != nil { + cdone <- true + } + } + }() + + select { + case <-cdone: + runCancel() + // Test the assumption that only a single goroutine is + // ingesting records. + if testPluginInputCallbackInfiniteFuncs.Load() != 1 { + t.Fail() + } + case <-timeout.C: + t.Fail() + } +} + +type testInputCallbackInfiniteConcurrent struct{} + +var concurrentWait sync.WaitGroup + +func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error { + for i := 0; i < 64; i++ { + go func(ch chan<- Message, id int) { + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "ID": fmt.Sprintf("%d", id), + }, + } + concurrentWait.Done() + }(ch, i) + } + return nil +} + +// TestInputCallbackInfiniteConcurrent is meant to make sure we do not +// break anythin with respect to concurrent ingest. +func TestInputCallbackInfiniteConcurrent(t *testing.T) { + theInput = testInputCallbackInfiniteConcurrent{} + cdone := make(chan bool) + timeout := time.NewTimer(10 * time.Second) + ptr := unsafe.Pointer(nil) + concurrentWait.Add(64) go func() { FLBPluginInputCallback(&ptr, nil) + concurrentWait.Wait() cdone <- true }() select { case <-cdone: + runCancel() case <-timeout.C: t.Fail() } From bf14d040e2d350eb53bca19506a9ffcdaadb8ea6 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 14 Sep 2023 11:28:46 -0300 Subject: [PATCH 19/22] cshared: add fixes for collect interval handling. Signed-off-by: Phillip Whelan --- cshared.go | 36 ++++++++++++++++++++++++++++-------- cshared_test.go | 27 +++++++++++++++++++++------ 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/cshared.go b/cshared.go index d101262..58d0abd 100644 --- a/cshared.go +++ b/cshared.go @@ -32,6 +32,8 @@ const ( // maxBufferedMessages is the number of messages that will be buffered // between each fluent-bit interval (approx 1 second). defaultMaxBufferedMessages = 300000 + // collectInterval is set to the interval present before in core-fluent-bit. + collectInterval = 1000 * time.Nanosecond ) var ( @@ -151,18 +153,36 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_RETRY } - once.Do(func() { + if runCtx == nil { + fmt.Println("EXEC ONCE") + runCtx, runCancel = context.WithCancel(context.Background()) theChannel = make(chan Message, maxBufferedMessages) - go func(theChannel chan<- Message) { - err := theInput.Collect(runCtx, theChannel) - if err != nil { - fmt.Fprintf(os.Stderr, - "collect error: %s\n", err.Error()) + // We use a timer instead of a Ticker so that it is not + // rescheduled during a cancel(). + t := time.NewTimer(collectInterval) + + go func(t *time.Timer, theChannel chan<- Message) { + for { + select { + case <-t.C: + err := theInput.Collect(runCtx, theChannel) + if err != nil { + fmt.Fprintf(os.Stderr, + "collect error: %s\n", err.Error()) + } + t.Reset(collectInterval) + case <-runCtx.Done(): + t.Stop() + runCtx = nil + runCancel = nil + close(theChannel) + return + } } - }(theChannel) - }) + }(t, theChannel) + } buf := bytes.NewBuffer([]byte{}) diff --git a/cshared_test.go b/cshared_test.go index df5ed67..0569abe 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -53,18 +53,22 @@ func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentb } func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error { - go func(ch chan<- Message) { - testPluginInputCallbackInfiniteFuncs.Add(1) - for { + testPluginInputCallbackInfiniteFuncs.Add(1) + for { + select { + default: ch <- Message{ Time: time.Now(), Record: map[string]string{ "Foo": "BAR", }, } + // for tests to correctly pass our infinite loop needs + // to return once the context has been finished. + case <-ctx.Done(): + return nil } - }(ch) - return nil + } } // TestInputCallbackInfinite is a test for the main method most plugins @@ -94,9 +98,12 @@ func TestInputCallbackInfinite(t *testing.T) { if testPluginInputCallbackInfiniteFuncs.Load() != 1 { t.Fail() } + return case <-timeout.C: + runCancel() t.Fail() } + t.Fail() } type testInputCallbackInfiniteConcurrent struct{} @@ -119,7 +126,14 @@ func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch cha concurrentWait.Done() }(ch, i) } - return nil + // for tests to correctly pass our infinite loop needs + // to return once the context has been finished. + for { + select { + case <-ctx.Done(): + return nil + } + } } // TestInputCallbackInfiniteConcurrent is meant to make sure we do not @@ -141,6 +155,7 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) { case <-cdone: runCancel() case <-timeout.C: + runCancel() t.Fail() } } From e9506f1c2a200f1051241ed2f8aa89ce7b570ff2 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 14 Sep 2023 11:35:22 -0300 Subject: [PATCH 20/22] cshared: remove implementation of min since it was introduced in 1.21. Signed-off-by: Phillip Whelan --- cshared.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/cshared.go b/cshared.go index 58d0abd..865b046 100644 --- a/cshared.go +++ b/cshared.go @@ -132,13 +132,6 @@ func FLBPluginInit(ptr unsafe.Pointer) int { return input.FLB_OK } -func min(a, b int) int { - if a < b { - return a - } - return b -} - // FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been // initialized, the plugin implementation is responsible for handling the incoming data and the context // that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit @@ -154,8 +147,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { } if runCtx == nil { - fmt.Println("EXEC ONCE") - runCtx, runCancel = context.WithCancel(context.Background()) theChannel = make(chan Message, maxBufferedMessages) From 52c78961fdf7c985ee28f57abed8110a74d6ff85 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 14 Sep 2023 11:44:25 -0300 Subject: [PATCH 21/22] cshared: reimplement initialization done in input callback using once. Signed-off-by: Phillip Whelan --- cshared.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cshared.go b/cshared.go index 865b046..ae1ec72 100644 --- a/cshared.go +++ b/cshared.go @@ -17,6 +17,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "unsafe" @@ -146,7 +147,7 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_RETRY } - if runCtx == nil { + once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) theChannel = make(chan Message, maxBufferedMessages) @@ -166,14 +167,13 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { t.Reset(collectInterval) case <-runCtx.Done(): t.Stop() - runCtx = nil - runCancel = nil + once = sync.Once{} close(theChannel) return } } }(t, theChannel) - } + }) buf := bytes.NewBuffer([]byte{}) From e24ea7e6b06f2c4def67b4f41b071dde7610dc26 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 14 Sep 2023 11:45:07 -0300 Subject: [PATCH 22/22] cshared: initialize input timer to 0 so it executes immediately on start. Signed-off-by: Phillip Whelan --- cshared.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cshared.go b/cshared.go index ae1ec72..596da68 100644 --- a/cshared.go +++ b/cshared.go @@ -152,8 +152,9 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { theChannel = make(chan Message, maxBufferedMessages) // We use a timer instead of a Ticker so that it is not - // rescheduled during a cancel(). - t := time.NewTimer(collectInterval) + // rescheduled during a cancel(). We start the timer at 0 + // so the first interval gets executed immediately. + t := time.NewTimer(0) go func(t *time.Timer, theChannel chan<- Message) { for {