Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] buffered channel drain next #43

Merged
merged 22 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
52e6398
cshared: use a buffered channel and cat multiple records into a singl…
pwhelan Jan 5, 2023
00bdd0c
cshared: increase channel buffer size.
pwhelan Jan 11, 2023
c2b676b
cshared: use an intermediate buffer that gets flushed on each input c…
pwhelan Jan 12, 2023
0cba014
fix for 100% cpu utilization.
pwhelan Sep 8, 2023
b7c5437
cshared: defer the stopping of the main loop timer.
pwhelan Sep 12, 2023
8c3db6e
cshared: log Collect error.
pwhelan Sep 12, 2023
52bc5c8
cshared: emulate the old collect interval.
pwhelan Sep 12, 2023
a9fcbb8
cshared: comment the use of the buffer lock and why it is being used.
pwhelan Sep 12, 2023
48922b5
cshared: exit buffer loop when runCtx is done.
pwhelan Sep 12, 2023
dbff4a8
cshared: fix minor misspellings.
pwhelan Sep 12, 2023
17975aa
cshared: fix 'cannot use err (variable of type error) as string'.
pwhelan Sep 12, 2023
8dbb0ba
cshared: define the collect interval as a const.
pwhelan Sep 12, 2023
ff41f31
cshared: remove unused declaration and code.
pwhelan Sep 12, 2023
a7af12a
cshared: fix nolintlint (remove unused nolint directives).
pwhelan Sep 12, 2023
af57b6c
cshared: move mnd for the message buffer size to a const.
pwhelan Sep 12, 2023
e9110a3
cshared: comment and set as consts all magic numbers.
pwhelan Sep 12, 2023
e31c026
cshared: refactor buffering to remove the use of the cbuf intermediat…
pwhelan Sep 13, 2023
63f3260
cshared: add tests for collect.
pwhelan Sep 13, 2023
bf14d04
cshared: add fixes for collect interval handling.
pwhelan Sep 14, 2023
e9506f1
cshared: remove implementation of min since it was introduced in 1.21.
pwhelan Sep 14, 2023
52c7896
cshared: reimplement initialization done in input callback using once.
pwhelan Sep 14, 2023
e24ea7e
cshared: initialize input timer to 0 so it executes immediately on st…
pwhelan Sep 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 83 additions & 40 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package plugin
import "C"

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -16,6 +17,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -27,10 +29,19 @@ import (
"github.com/calyptia/plugin/output"
)

const (
// maxBufferedMessages is the number of messages that will be buffered
// between each fluent-bit interval (approx 1 second).
defaultMaxBufferedMessages = 300000
// collectInterval is set to the interval present before in core-fluent-bit.
collectInterval = 1000 * time.Nanosecond
)

var (
unregister func()
cmt *cmetrics.Context
logger Logger
unregister func()
cmt *cmetrics.Context
logger Logger
maxBufferedMessages = defaultMaxBufferedMessages
)

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
Expand Down Expand Up @@ -62,7 +73,7 @@ func FLBPluginRegister(def unsafe.Pointer) int {
}

// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
// here all the plugin context should be initialised and any data or flag required for
// here all the plugin context should be initialized and any data or flag required for
// plugins to execute the collect or flush callback.
//
//export FLBPluginInit
Expand Down Expand Up @@ -94,6 +105,12 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

err = theInput.Init(ctx, fbit)
if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" {
maxbuffered, err := strconv.Atoi(maxbuffered)
if err != nil {
maxBufferedMessages = maxbuffered
}
}
} else {
conf := &flbOutputConfigLoader{ptr: ptr}
cmt, err = output.FLBPluginGetCMetricsContext(ptr)
Expand All @@ -117,7 +134,7 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialised, the plugin implementation is responsible for handling the incoming data and the context
// initialized, the plugin implementation is responsible for handling the incoming data and the context
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
//
Expand All @@ -130,50 +147,75 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
return input.FLB_RETRY
}

var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
go func() {
err = theInput.Collect(runCtx, theChannel)
}()
theChannel = make(chan Message, maxBufferedMessages)

// We use a timer instead of a Ticker so that it is not
// rescheduled during a cancel(). We start the timer at 0
// so the first interval gets executed immediately.
t := time.NewTimer(0)

go func(t *time.Timer, theChannel chan<- Message) {
for {
select {
case <-t.C:
err := theInput.Collect(runCtx, theChannel)
if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
t.Reset(collectInterval)
case <-runCtx.Done():
t.Stop()
once = sync.Once{}
close(theChannel)
return
}
}
}(t, theChannel)
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_OK
}
buf := bytes.NewBuffer([]byte{})

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- {
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_ERROR
}

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
buf.Grow(len(b))
buf.Write(b)
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
loop = 0
default:
loop = 0
}
}

if buf.Len() > 0 {
b := buf.Bytes()
cdata := C.CBytes(b)

*data = cdata
*csize = C.size_t(len(b))

// C.free(unsafe.Pointer(cdata))
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
if csize != nil {
*csize = C.size_t(len(b))
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
default:
break
}

return input.FLB_OK
Expand All @@ -191,7 +233,8 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
// plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation.
//
//export FLBPluginFlush
//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions.
// TODO: refactor into smaller functions.
//nolint:funlen //ignore length requirement for this function
func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
initWG.Wait()

Expand Down
134 changes: 128 additions & 6 deletions cshared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,160 @@ package plugin

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
)

type testPluginInputCallback struct{}
type testPluginInputCallbackCtrlC struct{}

func (t testPluginInputCallback) Init(ctx context.Context, fbit *Fluentbit) error {
func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testPluginInputCallback) Collect(ctx context.Context, ch chan<- Message) error {
func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error {
return nil
}

func init() {
initWG.Done()
registerWG.Done()
}

func TestInputCallbackCtrlC(t *testing.T) {
theInput = testPluginInputCallback{}
theInput = testPluginInputCallbackCtrlC{}
cdone := make(chan bool)
timeout := time.NewTimer(1 * time.Second)
ptr := unsafe.Pointer(nil)

initWG.Done()
registerWG.Done()
go func() {
FLBPluginInputCallback(&ptr, nil)
cdone <- true
}()

select {
case <-cdone:
runCancel()
case <-timeout.C:
t.Fail()
}
}

var testPluginInputCallbackInfiniteFuncs atomic.Int64

type testPluginInputCallbackInfinite struct{}

func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error {
testPluginInputCallbackInfiniteFuncs.Add(1)
for {
select {
default:
ch <- Message{
Time: time.Now(),
Record: map[string]string{
"Foo": "BAR",
},
}
// for tests to correctly pass our infinite loop needs
// to return once the context has been finished.
case <-ctx.Done():
return nil
}
}
}

// TestInputCallbackInfinite is a test for the main method most plugins
// use where they do not return from the first invocation of collect.
func TestInputCallbackInfinite(t *testing.T) {
theInput = testPluginInputCallbackInfinite{}
cdone := make(chan bool)
timeout := time.NewTimer(10 * time.Second)
ptr := unsafe.Pointer(nil)

go func() {
for {
FLBPluginInputCallback(&ptr, nil)
time.Sleep(1 * time.Second)

if ptr != nil {
cdone <- true
}
}
}()

select {
case <-cdone:
runCancel()
// Test the assumption that only a single goroutine is
// ingesting records.
if testPluginInputCallbackInfiniteFuncs.Load() != 1 {
t.Fail()
}
return
case <-timeout.C:
runCancel()
t.Fail()
}
t.Fail()
}

type testInputCallbackInfiniteConcurrent struct{}

var concurrentWait sync.WaitGroup

func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error {
for i := 0; i < 64; i++ {
go func(ch chan<- Message, id int) {
ch <- Message{
Time: time.Now(),
Record: map[string]string{
"ID": fmt.Sprintf("%d", id),
},
}
concurrentWait.Done()
}(ch, i)
}
// for tests to correctly pass our infinite loop needs
// to return once the context has been finished.
for {
select {
case <-ctx.Done():
return nil
}
}
}

// TestInputCallbackInfiniteConcurrent is meant to make sure we do not
// break anythin with respect to concurrent ingest.
func TestInputCallbackInfiniteConcurrent(t *testing.T) {
theInput = testInputCallbackInfiniteConcurrent{}
cdone := make(chan bool)
timeout := time.NewTimer(10 * time.Second)
ptr := unsafe.Pointer(nil)

concurrentWait.Add(64)
go func() {
FLBPluginInputCallback(&ptr, nil)
concurrentWait.Wait()
cdone <- true
}()

select {
case <-cdone:
runCancel()
case <-timeout.C:
runCancel()
t.Fail()
}
}