Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] buffered channel drain #34

Closed
wants to merge 16 commits into from
Closed
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 62 additions & 27 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package plugin
import "C"

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -16,6 +17,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -31,6 +33,7 @@ var (
unregister func()
cmt *cmetrics.Context
logger Logger
buflock sync.Mutex
)

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
Expand Down Expand Up @@ -133,47 +136,79 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
// we need to configure this part....
theChannel = make(chan Message, 300000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, what's this magic number? where does it comes from and what's the base for this math?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally derived the value experimentally. This number seemed to be the most optimal at the time of my initial testing. I'll see if I can find out what factors lead to that number, or if it even is all that optimal.

// do we need to buffer this part???
cbuf := make(chan Message, 16)
niedbalski marked this conversation as resolved.
Show resolved Hide resolved

go func() {
err = theInput.Collect(runCtx, theChannel)
err = theInput.Collect(runCtx, cbuf)
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
}()
go func(cbuf chan Message) {
t := time.NewTicker(1 * time.Second)
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
for {
buflock.Lock()
select {
case msg, ok := <-cbuf:
if !ok {
continue
}
buflock.Unlock()
theChannel <- msg
buflock.Lock()
case <-t.C:
buflock.Unlock()
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
buflock.Lock()
}
buflock.Unlock()
}
}(cbuf)
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_OK
}
buf := bytes.NewBuffer([]byte{})
buflock.Lock()

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
for loop := len(theChannel) > 0; loop; {
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_ERROR
}

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
buf.Grow(len(b))
buf.Write(b)
default:
loop = false
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
loop = false
}
}
buflock.Unlock()

if buf.Len() > 0 {
b := buf.Bytes()
cdata := C.CBytes(b)

*data = cdata
*csize = C.size_t(len(b))

// C.free(unsafe.Pointer(cdata))
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
default:
break
}

return input.FLB_OK
Expand Down