Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] buffered channel drain single collect #52

Merged
merged 7 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,25 @@ jobs:

- name: Unit tests
run: |
go test -v -covermode=atomic -coverprofile=coverage.out ./...
# go test -v -covermode=atomic -coverprofile=coverage.out ./...
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackCtrlC\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackDangle\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackInfinite\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackInfiniteLatency\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackInfiniteConcurrent\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestPlugin\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
-run \^TestInputCallbackInfiniteConcurrent\$ ./
go test -v -covermode=atomic -coverprofile=coverage.out \
./configloader/
go test -v -covermode=atomic -coverprofile=coverage.out \
./output/

- name: Upload coverage to Codecov
if: ${{ github.event_name != 'pull_request' }}
Expand Down
146 changes: 103 additions & 43 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package plugin
import "C"

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -16,6 +17,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -27,10 +29,19 @@ import (
"github.com/calyptia/plugin/output"
)

const (
// maxBufferedMessages is the number of messages that will be buffered
// between each fluent-bit interval (approx 1 second).
defaultMaxBufferedMessages = 300000
// collectInterval is set to the interval present before in core-fluent-bit.
collectInterval = 1000 * time.Nanosecond
)

var (
unregister func()
cmt *cmetrics.Context
logger Logger
unregister func()
cmt *cmetrics.Context
logger Logger
maxBufferedMessages = defaultMaxBufferedMessages
)

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
Expand Down Expand Up @@ -62,7 +73,7 @@ func FLBPluginRegister(def unsafe.Pointer) int {
}

// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
// here all the plugin context should be initialised and any data or flag required for
// here all the plugin context should be initialized and any data or flag required for
// plugins to execute the collect or flush callback.
//
//export FLBPluginInit
Expand Down Expand Up @@ -94,6 +105,12 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

err = theInput.Init(ctx, fbit)
if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" {
maxbuffered, err := strconv.Atoi(maxbuffered)
if err != nil {
maxBufferedMessages = maxbuffered
}
}
} else {
conf := &flbOutputConfigLoader{ptr: ptr}
cmt, err = output.FLBPluginGetCMetricsContext(ptr)
Expand All @@ -116,10 +133,40 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
return input.FLB_OK
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialised, the plugin implementation is responsible for handling the incoming data and the context
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
// flbPluginReset is meant to reset the plugin between tests.
func flbPluginReset() {
theInputLock.Lock()
defer theInputLock.Unlock()

once = sync.Once{}
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
close(theChannel)
theInput = nil
}

func testFLBPluginInputCallback() ([]byte, error) {
data := unsafe.Pointer(nil)
var csize C.size_t

FLBPluginInputCallback(&data, &csize)

if data == nil {
return []byte{}, nil
}

defer C.free(data)
return C.GoBytes(data, C.int(csize)), nil
}

// Lock used to synchronize access to theInput variable.
var theInputLock sync.Mutex

// FLBPluginInputCallback this method gets invoked by the fluent-bit
// runtime, once the plugin has been initialized, the plugin
// implementation is responsible for handling the incoming data and the
// context that gets past
//
// This function will invoke Collect only once to preserve backward
// compatible behavior. There are unit tests to enforce this behavior.
//
//export FLBPluginInputCallback
func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
Expand All @@ -130,50 +177,63 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
return input.FLB_RETRY
}

var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
go func() {
err = theInput.Collect(runCtx, theChannel)
}()
theChannel = make(chan Message, maxBufferedMessages)

theInputLock.Lock()

go func(theChannel chan<- Message) {
defer theInputLock.Unlock()

err := theInput.Collect(runCtx, theChannel)
if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
}(theChannel)
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_OK
}
buf := bytes.NewBuffer([]byte{})

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- {
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_ERROR
}

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
buf.Grow(len(b))
buf.Write(b)
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
loop = 0
default:
loop = 0
}
}

if buf.Len() > 0 {
b := buf.Bytes()
cdata := C.CBytes(b)

*data = cdata
*csize = C.size_t(len(b))

// C.free(unsafe.Pointer(cdata))
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
if csize != nil {
*csize = C.size_t(len(b))
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
default:
break
}

return input.FLB_OK
Expand All @@ -191,7 +251,7 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
// plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation.
//
//export FLBPluginFlush
//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions.
//nolint:funlen //ignore length requirement for this function
func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
initWG.Wait()

Expand Down
Loading