Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make hot reloadable #51

Merged
merged 7 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 169 additions & 60 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ var (
maxBufferedMessages = defaultMaxBufferedMessages
)

//export FLBPluginPreRegister
func FLBPluginPreRegister(hotReloading C.int) int {
if hotReloading == C.int(1) {
initWG.Add(1)
registerWG.Add(1)
}

return input.FLB_OK
}

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
// can be provided.
//
Expand Down Expand Up @@ -72,6 +82,24 @@ func FLBPluginRegister(def unsafe.Pointer) int {
return out
}

func cleanup() int {
if unregister != nil {
unregister()
unregister = nil
}

if runCancel != nil {
runCancel()
runCancel = nil
}

if theChannel != nil {
defer close(theChannel)
pwhelan marked this conversation as resolved.
Show resolved Hide resolved
}

return input.FLB_OK
}

// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
// here all the plugin context should be initialized and any data or flag required for
// plugins to execute the collect or flush callback.
Expand All @@ -80,8 +108,6 @@ func FLBPluginRegister(def unsafe.Pointer) int {
func FLBPluginInit(ptr unsafe.Pointer) int {
defer initWG.Done()

registerWG.Wait()

if theInput == nil && theOutput == nil {
fmt.Fprintf(os.Stderr, "no input or output registered\n")
return input.FLB_RETRY
Expand Down Expand Up @@ -167,6 +193,118 @@ var theInputLock sync.Mutex
//
// This function will invoke Collect only once to preserve backward
// compatible behavior. There are unit tests to enforce this behavior.
func prepareInputCollector() (err error) {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message, maxBufferedMessages)

theInputLock.Lock()

go func(theChannel chan<- Message) {
defer theInputLock.Unlock()

go func(theChannel chan<- Message) {
err = theInput.Collect(runCtx, theChannel)
}(theChannel)

for {
select {
case <-runCtx.Done():
log.Printf("goroutine will be stopping: name=%q\n", theName)
return
}
}

if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
}(theChannel)

return err
}

// FLBPluginInputPreRun this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialised, the plugin invoked only once before executing the input callbacks.
//
//export FLBPluginInputPreRun
func FLBPluginInputPreRun(useHotReload C.int) int {
registerWG.Wait()

var err error
err = prepareInputCollector()

if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

return input.FLB_OK
}


// FLBPluginInputPause this method gets invoked by the fluent-bit runtime, once the plugin has been
// paused, the plugin invoked this method and entering paused state.
//
//export FLBPluginInputPause
func FLBPluginInputPause() {
if runCancel != nil {
runCancel()
runCancel = nil
}

if theChannel != nil {
close(theChannel)
theChannel = nil
}
}

// FLBPluginInputResume this method gets invoked by the fluent-bit runtime, once the plugin has been
// resumeed, the plugin invoked this method and re-running state.
//
//export FLBPluginInputResume
func FLBPluginInputResume() {
var err error
err = prepareInputCollector()

if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
}
}

//export FLBPluginOutputPreRun
func FLBPluginOutputPreRun(useHotReload C.int) int {
registerWG.Wait()

var err error
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
go func(runCtx context.Context) {
go func(runCtx context.Context) {
err = theOutput.Flush(runCtx, theChannel)
}(runCtx)

for {
select {
case <-runCtx.Done():
log.Printf("goroutine will be stopping: name=%q\n", theName)
return
}
}

}(runCtx)

if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return output.FLB_ERROR
}

return output.FLB_OK
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialised, the plugin implementation is responsible for handling the incoming data and the context
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
//
//export FLBPluginInputCallback
func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
Expand All @@ -177,23 +315,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
return input.FLB_RETRY
}

once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message, maxBufferedMessages)

theInputLock.Lock()

go func(theChannel chan<- Message) {
defer theInputLock.Unlock()

err := theInput.Collect(runCtx, theChannel)
if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
}(theChannel)
})

buf := bytes.NewBuffer([]byte{})

for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- {
Expand Down Expand Up @@ -261,18 +382,6 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
}

var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
go func() {
err = theOutput.Flush(runCtx, theChannel)
}()
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return output.FLB_ERROR
}

select {
case <-runCtx.Done():
err = runCtx.Err()
Expand Down Expand Up @@ -319,24 +428,29 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
return output.FLB_ERROR
}

if d := len(entry); d != 2 {
fmt.Fprintf(os.Stderr, "unexpected entry length: %d\n", d)
slice := reflect.ValueOf(entry)
if slice.Kind() != reflect.Slice || slice.Len() < 2 {
fmt.Fprintf(os.Stderr, "unexpected entry length: %d\n", slice.Len())
return output.FLB_ERROR
}

ft, ok := entry[0].(bigEndianTime)
if !ok {
var t time.Time
ts := slice.Index(0).Interface()
switch ft := ts.(type) {
case bigEndianTime:
t = time.Time(ft)
case []interface{}:
s := reflect.ValueOf(ft)
st := s.Index(0).Interface()
ty := st.(bigEndianTime)
t = time.Time(ty)
default:
fmt.Fprintf(os.Stderr, "unexpected entry time type: %T\n", entry[0])
return output.FLB_ERROR
}

t := time.Time(ft)

recVal, ok := entry[1].(map[any]any)
if !ok {
fmt.Fprintf(os.Stderr, "unexpected entry record type: %T\n", entry[1])
return output.FLB_ERROR
}
data := slice.Index(1)
recVal := data.Interface().(map[interface{}]interface{})

var rec map[string]string
if d := len(recVal); d != 0 {
Expand All @@ -348,13 +462,22 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
return output.FLB_ERROR
}

val, ok := v.([]uint8)
if !ok {
var val string
switch tv := v.(type) {
case []uint8:
val = string(tv)
case uint64:
val = strconv.FormatUint(tv, 10)
case int64:
val = strconv.FormatInt(tv, 10)
case bool:
val = strconv.FormatBool(tv)
default:
fmt.Fprintf(os.Stderr, "unexpected record value type: %T\n", v)
return output.FLB_ERROR
}

rec[key] = string(val)
rec[key] = val
}
}

Expand All @@ -374,21 +497,7 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
//
//export FLBPluginExit
func FLBPluginExit() int {
log.Printf("calling FLBPluginExit(): name=%q\n", theName)

if unregister != nil {
unregister()
}

if runCancel != nil {
runCancel()
}

if theChannel != nil {
defer close(theChannel)
}

return input.FLB_OK
return cleanup()
}

type flbInputConfigLoader struct {
Expand Down
33 changes: 33 additions & 0 deletions cshared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func TestInputCallbackCtrlC(t *testing.T) {

ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
return
}

go func() {
FLBPluginInputCallback(&ptr, nil)
cdone <- true
Expand Down Expand Up @@ -86,6 +93,12 @@ func TestInputCallbackDangle(t *testing.T) {
cdone := make(chan bool)
ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
}

go func() {
t := time.NewTicker(collectInterval)
defer t.Stop()
Expand Down Expand Up @@ -156,6 +169,13 @@ func TestInputCallbackInfinite(t *testing.T) {
cshutdown := make(chan bool)
ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
return
}

go func() {
t := time.NewTicker(collectInterval)
defer t.Stop()
Expand Down Expand Up @@ -237,6 +257,13 @@ func TestInputCallbackLatency(t *testing.T) {
cstarted := make(chan bool)
cmsg := make(chan []byte)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
return
}

go func() {
t := time.NewTicker(collectInterval)
defer t.Stop()
Expand Down Expand Up @@ -363,6 +390,12 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {

concurrentWait.Add(64)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
}

go func(cstarted chan bool) {
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
Expand Down
9 changes: 7 additions & 2 deletions examples/out_gstdout/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ module github.com/fluent/fluent-bit-go/examples/gstdout

go 1.21.0

require github.com/fluent/fluent-bit-go v0.0.0-20200420155746-e125cab17963
require github.com/calyptia/plugin v0.1.6

replace github.com/fluent/fluent-bit-go => ../..
require (
github.com/calyptia/cmetrics-go v0.1.7 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
)

replace github.com/calyptia/plugin => ../..
Loading