Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] buffered channel drain #34

Closed
wants to merge 16 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 111 additions & 37 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package plugin
import "C"

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -16,6 +17,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -31,6 +33,20 @@ var (
unregister func()
cmt *cmetrics.Context
logger Logger
buflock sync.Mutex
)

const (
// collectInterval is equal to the interval originally used in the high
// frequency patch in fluent-bit.
collectInterval = time.Nanosecond * 1000
// maxBufferedMessages is the number of messages that will be buffered
// between each fluent-bit interval (approx 1 second).
maxBufferedMessages = 300000
// maxConcurrentChannels is the number of channels that will be buffered
// for incoming messages to the message buffer. These messages will be
// ingested as quickly as possible until the buffer is full.
maxConcurrentChannels = 16
)

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
Expand Down Expand Up @@ -62,7 +78,7 @@ func FLBPluginRegister(def unsafe.Pointer) int {
}

// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
// here all the plugin context should be initialised and any data or flag required for
// here all the plugin context should be initialized and any data or flag required for
// plugins to execute the collect or flush callback.
//
//export FLBPluginInit
Expand Down Expand Up @@ -117,7 +133,7 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialised, the plugin implementation is responsible for handling the incoming data and the context
// initialized, the plugin implementation is responsible for handling the incoming data and the context
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
//
Expand All @@ -130,50 +146,107 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
return input.FLB_RETRY
}

var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
go func() {
err = theInput.Collect(runCtx, theChannel)
}()

theChannel = make(chan Message, maxBufferedMessages)
cbuf := make(chan Message, maxConcurrentChannels)

// Most plugins expect Collect to be invoked once and then takes over the
// input thread by running in an infinite loop. Here we simulate this
// behavior and also simulate the original behavior for those plugins that
// do not hold on to the thread.
go func(runCtx context.Context) {
t := time.NewTicker(collectInterval)
defer t.Stop()

for {
select {
case <-runCtx.Done():
return
case <-t.C:
if err := theInput.Collect(runCtx, cbuf); err != nil {
fmt.Fprintf(os.Stderr, "Error collecting input: %s\n", err.Error())
}
}
}
}(runCtx)

// Limit submits to a single full buffer for each second. This limits
// the amount of locking when invoking the fluent-bit API.
go func(cbuf chan Message) {
t := time.NewTicker(1 * time.Second)
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
defer t.Stop()

// Use a mutex lock for the buffer to avoid filling the buffer more than
// once per period (1s). We also use the mutex lock to avoid infinitely
// filling the buffer while it is being flushed to fluent-bit.
for {
buflock.Lock()
select {
case msg, ok := <-cbuf:
if !ok {
continue
}
buflock.Unlock()
theChannel <- msg
buflock.Lock()
case <-t.C:
buflock.Unlock()
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
buflock.Lock()
case <-runCtx.Done():
buflock.Unlock()
return
}
buflock.Unlock()
}
}(cbuf)
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_OK
}
buf := bytes.NewBuffer([]byte{})

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
// Here we read all the messages produced in the internal buffer submit them
// once for each period invocation. We lock the buffer so no new messages
// arrive while draining the buffer.
buflock.Lock()
for loop := len(theChannel) > 0; loop; {
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_ERROR
}

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
buf.Grow(len(b))
buf.Write(b)
default:
// when there are no more messages explicitly mark the loop be terminated.
loop = false
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
loop = false
}
}
buflock.Unlock()

if buf.Len() > 0 {
b := buf.Bytes()
cdata := C.CBytes(b)

*data = cdata
*csize = C.size_t(len(b))

// C.free(unsafe.Pointer(cdata))
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
default:
break
}

return input.FLB_OK
Expand All @@ -191,7 +264,8 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
// plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation.
//
//export FLBPluginFlush
//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions.
// TODO: refactor into smaller functions.
//nolint:funlen //ignore length requirement for this function
func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
initWG.Wait()

Expand Down