Skip to content

Commit

Permalink
test: add latency test.
Browse files Browse the repository at this point in the history
Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed Oct 4, 2023
1 parent e2ec446 commit f16bef3
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 0 deletions.
14 changes: 14 additions & 0 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ func flbPluginReset() {
close(theChannel)
}

func testFLBPluginInputCallback() ([]byte, error) {
data := unsafe.Pointer(nil)
var csize C.size_t

FLBPluginInputCallback(&data, &csize)

if data == nil {
return []byte{}, nil
}

defer C.free(data)
return C.GoBytes(data, C.int(csize)), nil
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit
// runtime, once the plugin has been initialized, the plugin
// implementation is responsible for handling the incoming data and the
Expand Down
98 changes: 98 additions & 0 deletions cshared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"
"unsafe"

"github.com/calyptia/plugin/output"
)

type testPluginInputCallbackCtrlC struct{}
Expand Down Expand Up @@ -173,6 +175,100 @@ func TestInputCallbackInfinite(t *testing.T) {
}
}

type testPluginInputCallbackLatency struct{}

func (t testPluginInputCallbackLatency) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testPluginInputCallbackLatency) Collect(ctx context.Context, ch chan<- Message) error {
tick := time.NewTimer(time.Second * 1)
for {
select {
case <-tick.C:
for i := 0; i < 128; i++ {
ch <- Message{
Time: time.Now(),
Record: map[string]string{
"Foo": "BAR",
},
}
}
tick.Reset(time.Second * 1)
case <-ctx.Done():
return nil
}
}
}

// TestInputCallbackInfiniteLatency is a test of the latency between
// messages.
func TestInputCallbackLatency(t *testing.T) {
defer flbPluginReset()

theInput = testPluginInputCallbackLatency{}
cdone := make(chan bool)
cmsg := make(chan []byte)

go func() {
t := time.NewTicker(collectInterval)
for {
select {
case <-cdone:
return
case <-t.C:
buf, _ := testFLBPluginInputCallback()
if len(buf) > 0 {
cmsg <- buf
}
}
}
}()

timeout := time.NewTimer(5 * time.Second)
msgs := 0

for {
select {
case buf := <-cmsg:
dec := output.NewByteDecoder(buf)
if dec == nil {
t.Fatal("dec is nil")
}

for {
ret, timestamp, _ := output.GetRecord(dec)
if ret == -1 {
break
}
if ret < 0 {
t.Fatalf("ret is negative: %d", ret)
}

msgs++

ts, ok := timestamp.(output.FLBTime)
if !ok {
t.Fatal()
}

if time.Since(ts.Time) > time.Millisecond*5 {
t.Errorf("latency too high: %fms",
float64(time.Since(ts.Time)/time.Millisecond))
}
}
case <-timeout.C:
timeout.Stop()
runCancel()

if msgs < 128 {
t.Fatalf("too few messages: %d", msgs)
}
return
}
}
}

type testInputCallbackInfiniteConcurrent struct{}

var concurrentWait sync.WaitGroup
Expand All @@ -182,6 +278,8 @@ func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Flu
}

func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error {
defer flbPluginReset()

for i := 0; i < 64; i++ {
go func(ch chan<- Message, id int) {
ch <- Message{
Expand Down
11 changes: 11 additions & 0 deletions output/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ func NewDecoder(data unsafe.Pointer, length int) *FLBDecoder {
return dec
}

func NewByteDecoder(b []byte) *FLBDecoder {
dec := new(FLBDecoder)
dec.handle = new(codec.MsgpackHandle)
// TODO: handle error.
_ = dec.handle.SetBytesExt(reflect.TypeOf(FLBTime{}), 0, &FLBTime{})

dec.mpdec = codec.NewDecoderBytes(b, dec.handle)

return dec
}

func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{}) {
var check error
var m interface{}
Expand Down

0 comments on commit f16bef3

Please sign in to comment.