diff --git a/cshared.go b/cshared.go index afa215d..596da68 100644 --- a/cshared.go +++ b/cshared.go @@ -6,6 +6,7 @@ package plugin import "C" import ( + "bytes" "context" "errors" "fmt" @@ -16,6 +17,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "unsafe" @@ -27,10 +29,19 @@ import ( "github.com/calyptia/plugin/output" ) +const ( + // maxBufferedMessages is the number of messages that will be buffered + // between each fluent-bit interval (approx 1 second). + defaultMaxBufferedMessages = 300000 + // collectInterval is set to the interval present before in core-fluent-bit. + collectInterval = 1000 * time.Nanosecond +) + var ( - unregister func() - cmt *cmetrics.Context - logger Logger + unregister func() + cmt *cmetrics.Context + logger Logger + maxBufferedMessages = defaultMaxBufferedMessages ) // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description @@ -62,7 +73,7 @@ func FLBPluginRegister(def unsafe.Pointer) int { } // FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase. -// here all the plugin context should be initialised and any data or flag required for +// here all the plugin context should be initialized and any data or flag required for // plugins to execute the collect or flush callback. // //export FLBPluginInit @@ -94,6 +105,12 @@ func FLBPluginInit(ptr unsafe.Pointer) int { } err = theInput.Init(ctx, fbit) + if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" { + maxbuffered, err := strconv.Atoi(maxbuffered) + if err != nil { + maxBufferedMessages = maxbuffered + } + } } else { conf := &flbOutputConfigLoader{ptr: ptr} cmt, err = output.FLBPluginGetCMetricsContext(ptr) @@ -117,7 +134,7 @@ func FLBPluginInit(ptr unsafe.Pointer) int { } // FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been -// initialised, the plugin implementation is responsible for handling the incoming data and the context +// initialized, the plugin implementation is responsible for handling the incoming data and the context // that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit // will not execute further callbacks. // @@ -130,50 +147,75 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_RETRY } - var err error once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message) - go func() { - err = theInput.Collect(runCtx, theChannel) - }() + theChannel = make(chan Message, maxBufferedMessages) + + // We use a timer instead of a Ticker so that it is not + // rescheduled during a cancel(). We start the timer at 0 + // so the first interval gets executed immediately. + t := time.NewTimer(0) + + go func(t *time.Timer, theChannel chan<- Message) { + for { + select { + case <-t.C: + err := theInput.Collect(runCtx, theChannel) + if err != nil { + fmt.Fprintf(os.Stderr, + "collect error: %s\n", err.Error()) + } + t.Reset(collectInterval) + case <-runCtx.Done(): + t.Stop() + once = sync.Once{} + close(theChannel) + return + } + } + }(t, theChannel) }) - if err != nil { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR - } - select { - case msg, ok := <-theChannel: - if !ok { - return input.FLB_OK - } + buf := bytes.NewBuffer([]byte{}) - t := input.FLBTime{Time: msg.Time} - b, err := input.NewEncoder().Encode([]any{t, msg.Record}) - if err != nil { - fmt.Fprintf(os.Stderr, "encode: %s\n", err) - return input.FLB_ERROR + for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- { + select { + case msg, ok := <-theChannel: + if !ok { + return input.FLB_ERROR + } + + t := input.FLBTime{Time: msg.Time} + b, err := input.NewEncoder().Encode([]any{t, msg.Record}) + if err != nil { + fmt.Fprintf(os.Stderr, "encode: %s\n", err) + return input.FLB_ERROR + } + buf.Grow(len(b)) + buf.Write(b) + case <-runCtx.Done(): + err := runCtx.Err() + if err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + return input.FLB_ERROR + } + // enforce a runtime gc, to prevent the thread finalizer on + // fluent-bit to kick in before any remaining data has not been GC'ed + // causing a sigsegv. + defer runtime.GC() + loop = 0 + default: + loop = 0 } + } + if buf.Len() > 0 { + b := buf.Bytes() cdata := C.CBytes(b) - *data = cdata - *csize = C.size_t(len(b)) - - // C.free(unsafe.Pointer(cdata)) - case <-runCtx.Done(): - err := runCtx.Err() - if err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR + if csize != nil { + *csize = C.size_t(len(b)) } - // enforce a runtime gc, to prevent the thread finalizer on - // fluent-bit to kick in before any remaining data has not been GC'ed - // causing a sigsegv. - defer runtime.GC() - default: - break } return input.FLB_OK @@ -191,7 +233,8 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int { // plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation. // //export FLBPluginFlush -//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions. +// TODO: refactor into smaller functions. +//nolint:funlen //ignore length requirement for this function func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { initWG.Wait() diff --git a/cshared_test.go b/cshared_test.go index 1ffdcac..0569abe 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -2,38 +2,160 @@ package plugin import ( "context" + "fmt" + "sync" + "sync/atomic" "testing" "time" "unsafe" ) -type testPluginInputCallback struct{} +type testPluginInputCallbackCtrlC struct{} -func (t testPluginInputCallback) Init(ctx context.Context, fbit *Fluentbit) error { +func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error { return nil } -func (t testPluginInputCallback) Collect(ctx context.Context, ch chan<- Message) error { +func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error { return nil } +func init() { + initWG.Done() + registerWG.Done() +} + func TestInputCallbackCtrlC(t *testing.T) { - theInput = testPluginInputCallback{} + theInput = testPluginInputCallbackCtrlC{} cdone := make(chan bool) timeout := time.NewTimer(1 * time.Second) ptr := unsafe.Pointer(nil) - initWG.Done() - registerWG.Done() + go func() { + FLBPluginInputCallback(&ptr, nil) + cdone <- true + }() + + select { + case <-cdone: + runCancel() + case <-timeout.C: + t.Fail() + } +} + +var testPluginInputCallbackInfiniteFuncs atomic.Int64 + +type testPluginInputCallbackInfinite struct{} + +func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error { + testPluginInputCallbackInfiniteFuncs.Add(1) + for { + select { + default: + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + // for tests to correctly pass our infinite loop needs + // to return once the context has been finished. + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfinite is a test for the main method most plugins +// use where they do not return from the first invocation of collect. +func TestInputCallbackInfinite(t *testing.T) { + theInput = testPluginInputCallbackInfinite{} + cdone := make(chan bool) + timeout := time.NewTimer(10 * time.Second) + ptr := unsafe.Pointer(nil) + + go func() { + for { + FLBPluginInputCallback(&ptr, nil) + time.Sleep(1 * time.Second) + + if ptr != nil { + cdone <- true + } + } + }() + + select { + case <-cdone: + runCancel() + // Test the assumption that only a single goroutine is + // ingesting records. + if testPluginInputCallbackInfiniteFuncs.Load() != 1 { + t.Fail() + } + return + case <-timeout.C: + runCancel() + t.Fail() + } + t.Fail() +} + +type testInputCallbackInfiniteConcurrent struct{} + +var concurrentWait sync.WaitGroup + +func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error { + for i := 0; i < 64; i++ { + go func(ch chan<- Message, id int) { + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "ID": fmt.Sprintf("%d", id), + }, + } + concurrentWait.Done() + }(ch, i) + } + // for tests to correctly pass our infinite loop needs + // to return once the context has been finished. + for { + select { + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfiniteConcurrent is meant to make sure we do not +// break anythin with respect to concurrent ingest. +func TestInputCallbackInfiniteConcurrent(t *testing.T) { + theInput = testInputCallbackInfiniteConcurrent{} + cdone := make(chan bool) + timeout := time.NewTimer(10 * time.Second) + ptr := unsafe.Pointer(nil) + concurrentWait.Add(64) go func() { FLBPluginInputCallback(&ptr, nil) + concurrentWait.Wait() cdone <- true }() select { case <-cdone: + runCancel() case <-timeout.C: + runCancel() t.Fail() } }