Skip to content

Commit

Permalink
Merge pull request #44 from calyptia/revert-43-feat-buffered-channel-…
Browse files Browse the repository at this point in the history
…drain-next

Revert "[FEATURE] buffered channel drain next"
  • Loading branch information
niedbalski authored Sep 15, 2023
2 parents 5e6bd90 + 4842625 commit dece716
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 211 deletions.
123 changes: 40 additions & 83 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package plugin
import "C"

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -17,7 +16,6 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -29,19 +27,10 @@ import (
"github.com/calyptia/plugin/output"
)

const (
// maxBufferedMessages is the number of messages that will be buffered
// between each fluent-bit interval (approx 1 second).
defaultMaxBufferedMessages = 300000
// collectInterval is set to the interval present before in core-fluent-bit.
collectInterval = 1000 * time.Nanosecond
)

var (
unregister func()
cmt *cmetrics.Context
logger Logger
maxBufferedMessages = defaultMaxBufferedMessages
unregister func()
cmt *cmetrics.Context
logger Logger
)

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
Expand Down Expand Up @@ -73,7 +62,7 @@ func FLBPluginRegister(def unsafe.Pointer) int {
}

// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
// here all the plugin context should be initialized and any data or flag required for
// here all the plugin context should be initialised and any data or flag required for
// plugins to execute the collect or flush callback.
//
//export FLBPluginInit
Expand Down Expand Up @@ -105,12 +94,6 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

err = theInput.Init(ctx, fbit)
if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" {
maxbuffered, err := strconv.Atoi(maxbuffered)
if err != nil {
maxBufferedMessages = maxbuffered
}
}
} else {
conf := &flbOutputConfigLoader{ptr: ptr}
cmt, err = output.FLBPluginGetCMetricsContext(ptr)
Expand All @@ -134,7 +117,7 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialized, the plugin implementation is responsible for handling the incoming data and the context
// initialised, the plugin implementation is responsible for handling the incoming data and the context
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
//
Expand All @@ -147,75 +130,50 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
return input.FLB_RETRY
}

var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message, maxBufferedMessages)

// We use a timer instead of a Ticker so that it is not
// rescheduled during a cancel(). We start the timer at 0
// so the first interval gets executed immediately.
t := time.NewTimer(0)

go func(t *time.Timer, theChannel chan<- Message) {
for {
select {
case <-t.C:
err := theInput.Collect(runCtx, theChannel)
if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
t.Reset(collectInterval)
case <-runCtx.Done():
t.Stop()
once = sync.Once{}
close(theChannel)
return
}
}
}(t, theChannel)
theChannel = make(chan Message)
go func() {
err = theInput.Collect(runCtx, theChannel)
}()
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

buf := bytes.NewBuffer([]byte{})

for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- {
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_ERROR
}
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_OK
}

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
buf.Grow(len(b))
buf.Write(b)
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
loop = 0
default:
loop = 0
t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
}

if buf.Len() > 0 {
b := buf.Bytes()
cdata := C.CBytes(b)

*data = cdata
if csize != nil {
*csize = C.size_t(len(b))
*csize = C.size_t(len(b))

// C.free(unsafe.Pointer(cdata))
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
default:
break
}

return input.FLB_OK
Expand All @@ -233,8 +191,7 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
// plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation.
//
//export FLBPluginFlush
// TODO: refactor into smaller functions.
//nolint:funlen //ignore length requirement for this function
//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions.
func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
initWG.Wait()

Expand Down
134 changes: 6 additions & 128 deletions cshared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,160 +2,38 @@ package plugin

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
)

type testPluginInputCallbackCtrlC struct{}
type testPluginInputCallback struct{}

func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error {
func (t testPluginInputCallback) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error {
func (t testPluginInputCallback) Collect(ctx context.Context, ch chan<- Message) error {
return nil
}

func init() {
initWG.Done()
registerWG.Done()
}

func TestInputCallbackCtrlC(t *testing.T) {
theInput = testPluginInputCallbackCtrlC{}
theInput = testPluginInputCallback{}
cdone := make(chan bool)
timeout := time.NewTimer(1 * time.Second)
ptr := unsafe.Pointer(nil)

go func() {
FLBPluginInputCallback(&ptr, nil)
cdone <- true
}()

select {
case <-cdone:
runCancel()
case <-timeout.C:
t.Fail()
}
}

var testPluginInputCallbackInfiniteFuncs atomic.Int64

type testPluginInputCallbackInfinite struct{}

func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error {
testPluginInputCallbackInfiniteFuncs.Add(1)
for {
select {
default:
ch <- Message{
Time: time.Now(),
Record: map[string]string{
"Foo": "BAR",
},
}
// for tests to correctly pass our infinite loop needs
// to return once the context has been finished.
case <-ctx.Done():
return nil
}
}
}

// TestInputCallbackInfinite is a test for the main method most plugins
// use where they do not return from the first invocation of collect.
func TestInputCallbackInfinite(t *testing.T) {
theInput = testPluginInputCallbackInfinite{}
cdone := make(chan bool)
timeout := time.NewTimer(10 * time.Second)
ptr := unsafe.Pointer(nil)

go func() {
for {
FLBPluginInputCallback(&ptr, nil)
time.Sleep(1 * time.Second)

if ptr != nil {
cdone <- true
}
}
}()

select {
case <-cdone:
runCancel()
// Test the assumption that only a single goroutine is
// ingesting records.
if testPluginInputCallbackInfiniteFuncs.Load() != 1 {
t.Fail()
}
return
case <-timeout.C:
runCancel()
t.Fail()
}
t.Fail()
}

type testInputCallbackInfiniteConcurrent struct{}

var concurrentWait sync.WaitGroup

func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error {
for i := 0; i < 64; i++ {
go func(ch chan<- Message, id int) {
ch <- Message{
Time: time.Now(),
Record: map[string]string{
"ID": fmt.Sprintf("%d", id),
},
}
concurrentWait.Done()
}(ch, i)
}
// for tests to correctly pass our infinite loop needs
// to return once the context has been finished.
for {
select {
case <-ctx.Done():
return nil
}
}
}

// TestInputCallbackInfiniteConcurrent is meant to make sure we do not
// break anythin with respect to concurrent ingest.
func TestInputCallbackInfiniteConcurrent(t *testing.T) {
theInput = testInputCallbackInfiniteConcurrent{}
cdone := make(chan bool)
timeout := time.NewTimer(10 * time.Second)
ptr := unsafe.Pointer(nil)
initWG.Done()
registerWG.Done()

concurrentWait.Add(64)
go func() {
FLBPluginInputCallback(&ptr, nil)
concurrentWait.Wait()
cdone <- true
}()

select {
case <-cdone:
runCancel()
case <-timeout.C:
runCancel()
t.Fail()
}
}

0 comments on commit dece716

Please sign in to comment.