Skip to content

Commit

Permalink
Merge pull request #51 from calyptia/cosmo0920-make-hot-reloadable
Browse files Browse the repository at this point in the history
Make hot reloadable
  • Loading branch information
pwhelan authored Oct 23, 2023
2 parents 4c8336c + 4463036 commit aa19413
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 121 deletions.
229 changes: 169 additions & 60 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ var (
maxBufferedMessages = defaultMaxBufferedMessages
)

//export FLBPluginPreRegister
func FLBPluginPreRegister(hotReloading C.int) int {
if hotReloading == C.int(1) {
initWG.Add(1)
registerWG.Add(1)
}

return input.FLB_OK
}

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
// can be provided.
//
Expand Down Expand Up @@ -72,6 +82,24 @@ func FLBPluginRegister(def unsafe.Pointer) int {
return out
}

func cleanup() int {
if unregister != nil {
unregister()
unregister = nil
}

if runCancel != nil {
runCancel()
runCancel = nil
}

if theChannel != nil {
defer close(theChannel)
}

return input.FLB_OK
}

// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
// here all the plugin context should be initialized and any data or flag required for
// plugins to execute the collect or flush callback.
Expand All @@ -80,8 +108,6 @@ func FLBPluginRegister(def unsafe.Pointer) int {
func FLBPluginInit(ptr unsafe.Pointer) int {
defer initWG.Done()

registerWG.Wait()

if theInput == nil && theOutput == nil {
fmt.Fprintf(os.Stderr, "no input or output registered\n")
return input.FLB_RETRY
Expand Down Expand Up @@ -167,6 +193,118 @@ var theInputLock sync.Mutex
//
// This function will invoke Collect only once to preserve backward
// compatible behavior. There are unit tests to enforce this behavior.
func prepareInputCollector() (err error) {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message, maxBufferedMessages)

theInputLock.Lock()

go func(theChannel chan<- Message) {
defer theInputLock.Unlock()

go func(theChannel chan<- Message) {
err = theInput.Collect(runCtx, theChannel)
}(theChannel)

for {
select {
case <-runCtx.Done():
log.Printf("goroutine will be stopping: name=%q\n", theName)
return
}
}

if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
}(theChannel)

return err
}

// FLBPluginInputPreRun this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialised, the plugin invoked only once before executing the input callbacks.
//
//export FLBPluginInputPreRun
func FLBPluginInputPreRun(useHotReload C.int) int {
registerWG.Wait()

var err error
err = prepareInputCollector()

if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

return input.FLB_OK
}


// FLBPluginInputPause this method gets invoked by the fluent-bit runtime, once the plugin has been
// paused, the plugin invoked this method and entering paused state.
//
//export FLBPluginInputPause
func FLBPluginInputPause() {
if runCancel != nil {
runCancel()
runCancel = nil
}

if theChannel != nil {
close(theChannel)
theChannel = nil
}
}

// FLBPluginInputResume this method gets invoked by the fluent-bit runtime, once the plugin has been
// resumeed, the plugin invoked this method and re-running state.
//
//export FLBPluginInputResume
func FLBPluginInputResume() {
var err error
err = prepareInputCollector()

if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
}
}

//export FLBPluginOutputPreRun
func FLBPluginOutputPreRun(useHotReload C.int) int {
registerWG.Wait()

var err error
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
go func(runCtx context.Context) {
go func(runCtx context.Context) {
err = theOutput.Flush(runCtx, theChannel)
}(runCtx)

for {
select {
case <-runCtx.Done():
log.Printf("goroutine will be stopping: name=%q\n", theName)
return
}
}

}(runCtx)

if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return output.FLB_ERROR
}

return output.FLB_OK
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialised, the plugin implementation is responsible for handling the incoming data and the context
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
//
//export FLBPluginInputCallback
func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
Expand All @@ -177,23 +315,6 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
return input.FLB_RETRY
}

once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message, maxBufferedMessages)

theInputLock.Lock()

go func(theChannel chan<- Message) {
defer theInputLock.Unlock()

err := theInput.Collect(runCtx, theChannel)
if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
}(theChannel)
})

buf := bytes.NewBuffer([]byte{})

for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- {
Expand Down Expand Up @@ -261,18 +382,6 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
}

var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message)
go func() {
err = theOutput.Flush(runCtx, theChannel)
}()
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return output.FLB_ERROR
}

select {
case <-runCtx.Done():
err = runCtx.Err()
Expand Down Expand Up @@ -319,24 +428,29 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
return output.FLB_ERROR
}

if d := len(entry); d != 2 {
fmt.Fprintf(os.Stderr, "unexpected entry length: %d\n", d)
slice := reflect.ValueOf(entry)
if slice.Kind() != reflect.Slice || slice.Len() < 2 {
fmt.Fprintf(os.Stderr, "unexpected entry length: %d\n", slice.Len())
return output.FLB_ERROR
}

ft, ok := entry[0].(bigEndianTime)
if !ok {
var t time.Time
ts := slice.Index(0).Interface()
switch ft := ts.(type) {
case bigEndianTime:
t = time.Time(ft)
case []interface{}:
s := reflect.ValueOf(ft)
st := s.Index(0).Interface()
ty := st.(bigEndianTime)
t = time.Time(ty)
default:
fmt.Fprintf(os.Stderr, "unexpected entry time type: %T\n", entry[0])
return output.FLB_ERROR
}

t := time.Time(ft)

recVal, ok := entry[1].(map[any]any)
if !ok {
fmt.Fprintf(os.Stderr, "unexpected entry record type: %T\n", entry[1])
return output.FLB_ERROR
}
data := slice.Index(1)
recVal := data.Interface().(map[interface{}]interface{})

var rec map[string]string
if d := len(recVal); d != 0 {
Expand All @@ -348,13 +462,22 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
return output.FLB_ERROR
}

val, ok := v.([]uint8)
if !ok {
var val string
switch tv := v.(type) {
case []uint8:
val = string(tv)
case uint64:
val = strconv.FormatUint(tv, 10)
case int64:
val = strconv.FormatInt(tv, 10)
case bool:
val = strconv.FormatBool(tv)
default:
fmt.Fprintf(os.Stderr, "unexpected record value type: %T\n", v)
return output.FLB_ERROR
}

rec[key] = string(val)
rec[key] = val
}
}

Expand All @@ -374,21 +497,7 @@ func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
//
//export FLBPluginExit
func FLBPluginExit() int {
log.Printf("calling FLBPluginExit(): name=%q\n", theName)

if unregister != nil {
unregister()
}

if runCancel != nil {
runCancel()
}

if theChannel != nil {
defer close(theChannel)
}

return input.FLB_OK
return cleanup()
}

type flbInputConfigLoader struct {
Expand Down
33 changes: 33 additions & 0 deletions cshared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func TestInputCallbackCtrlC(t *testing.T) {

ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
return
}

go func() {
FLBPluginInputCallback(&ptr, nil)
cdone <- true
Expand Down Expand Up @@ -86,6 +93,12 @@ func TestInputCallbackDangle(t *testing.T) {
cdone := make(chan bool)
ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
}

go func() {
t := time.NewTicker(collectInterval)
defer t.Stop()
Expand Down Expand Up @@ -156,6 +169,13 @@ func TestInputCallbackInfinite(t *testing.T) {
cshutdown := make(chan bool)
ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
return
}

go func() {
t := time.NewTicker(collectInterval)
defer t.Stop()
Expand Down Expand Up @@ -237,6 +257,13 @@ func TestInputCallbackLatency(t *testing.T) {
cstarted := make(chan bool)
cmsg := make(chan []byte)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
return
}

go func() {
t := time.NewTicker(collectInterval)
defer t.Stop()
Expand Down Expand Up @@ -363,6 +390,12 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {

concurrentWait.Add(64)

// prepare channel for input explicitly.
err := prepareInputCollector()
if err != nil {
t.Fail()
}

go func(cstarted chan bool) {
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
Expand Down
9 changes: 7 additions & 2 deletions examples/out_gstdout/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ module github.com/fluent/fluent-bit-go/examples/gstdout

go 1.21.0

require github.com/fluent/fluent-bit-go v0.0.0-20200420155746-e125cab17963
require github.com/calyptia/plugin v0.1.6

replace github.com/fluent/fluent-bit-go => ../..
require (
github.com/calyptia/cmetrics-go v0.1.7 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
)

replace github.com/calyptia/plugin => ../..
Loading

0 comments on commit aa19413

Please sign in to comment.