Skip to content

Commit

Permalink
cshared: use an intermediate buffer that gets flushed on each input c…
Browse files Browse the repository at this point in the history
…allback.

Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed May 5, 2023
1 parent b7c5cad commit d0d4cf5
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"reflect"
"runtime"
"sync"
"time"
"unsafe"

Expand All @@ -29,6 +30,7 @@ import (
var unregister func()
var cmt *cmetrics.Context
var logger Logger
var buflock sync.Mutex

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
// can be provided.
Expand Down Expand Up @@ -130,19 +132,44 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message, 256)
// we need to configure this part....
theChannel = make(chan Message, 300000)
// do we need to buffer this part???
cbuf := make(chan Message, 16)

go func() {
err = theInput.Collect(runCtx, theChannel)
err = theInput.Collect(runCtx, cbuf)
}()
go func(cbuf chan Message) {
t := time.NewTicker(1 * time.Second)
for {
buflock.Lock()
select {
case msg, ok := <-cbuf:
if !ok {
continue
}
buflock.Unlock()
theChannel <- msg
buflock.Lock()
case <-t.C:
buflock.Unlock()
buflock.Lock()
default:
}
buflock.Unlock()
}
}(cbuf)
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

buf := bytes.NewBuffer([]byte{})
buflock.Lock()

for loop := true; loop; {
for loop := len(theChannel) > 0; loop; {
select {
case msg, ok := <-theChannel:
if !ok {
Expand Down Expand Up @@ -172,12 +199,14 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
loop = false
}
}
buflock.Unlock()

b := buf.Bytes()
cdata := C.CBytes(b)

*data = cdata
*csize = C.size_t(len(b))
if buf.Len() > 0 {
b := buf.Bytes()
cdata := C.CBytes(b)
*data = cdata
*csize = C.size_t(len(b))
}

return input.FLB_OK
}
Expand Down

0 comments on commit d0d4cf5

Please sign in to comment.