From f16bef3495ec8e7b50eb620377ac6867fcceb3f2 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 4 Oct 2023 17:09:03 -0300 Subject: [PATCH] test: add latency test. Signed-off-by: Phillip Whelan --- cshared.go | 14 +++++++ cshared_test.go | 98 +++++++++++++++++++++++++++++++++++++++++++++++ output/decoder.go | 11 ++++++ 3 files changed, 123 insertions(+) diff --git a/cshared.go b/cshared.go index d465677..ee6aba5 100644 --- a/cshared.go +++ b/cshared.go @@ -139,6 +139,20 @@ func flbPluginReset() { close(theChannel) } +func testFLBPluginInputCallback() ([]byte, error) { + data := unsafe.Pointer(nil) + var csize C.size_t + + FLBPluginInputCallback(&data, &csize) + + if data == nil { + return []byte{}, nil + } + + defer C.free(data) + return C.GoBytes(data, C.int(csize)), nil +} + // FLBPluginInputCallback this method gets invoked by the fluent-bit // runtime, once the plugin has been initialized, the plugin // implementation is responsible for handling the incoming data and the diff --git a/cshared_test.go b/cshared_test.go index 947a86b..74c4354 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" "unsafe" + + "github.com/calyptia/plugin/output" ) type testPluginInputCallbackCtrlC struct{} @@ -173,6 +175,100 @@ func TestInputCallbackInfinite(t *testing.T) { } } +type testPluginInputCallbackLatency struct{} + +func (t testPluginInputCallbackLatency) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackLatency) Collect(ctx context.Context, ch chan<- Message) error { + tick := time.NewTimer(time.Second * 1) + for { + select { + case <-tick.C: + for i := 0; i < 128; i++ { + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + } + tick.Reset(time.Second * 1) + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfiniteLatency is a test of the latency between +// messages. +func TestInputCallbackLatency(t *testing.T) { + defer flbPluginReset() + + theInput = testPluginInputCallbackLatency{} + cdone := make(chan bool) + cmsg := make(chan []byte) + + go func() { + t := time.NewTicker(collectInterval) + for { + select { + case <-cdone: + return + case <-t.C: + buf, _ := testFLBPluginInputCallback() + if len(buf) > 0 { + cmsg <- buf + } + } + } + }() + + timeout := time.NewTimer(5 * time.Second) + msgs := 0 + + for { + select { + case buf := <-cmsg: + dec := output.NewByteDecoder(buf) + if dec == nil { + t.Fatal("dec is nil") + } + + for { + ret, timestamp, _ := output.GetRecord(dec) + if ret == -1 { + break + } + if ret < 0 { + t.Fatalf("ret is negative: %d", ret) + } + + msgs++ + + ts, ok := timestamp.(output.FLBTime) + if !ok { + t.Fatal() + } + + if time.Since(ts.Time) > time.Millisecond*5 { + t.Errorf("latency too high: %fms", + float64(time.Since(ts.Time)/time.Millisecond)) + } + } + case <-timeout.C: + timeout.Stop() + runCancel() + + if msgs < 128 { + t.Fatalf("too few messages: %d", msgs) + } + return + } + } +} + type testInputCallbackInfiniteConcurrent struct{} var concurrentWait sync.WaitGroup @@ -182,6 +278,8 @@ func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Flu } func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error { + defer flbPluginReset() + for i := 0; i < 64; i++ { go func(ch chan<- Message, id int) { ch <- Message{ diff --git a/output/decoder.go b/output/decoder.go index d1b438f..aa9e508 100644 --- a/output/decoder.go +++ b/output/decoder.go @@ -69,6 +69,17 @@ func NewDecoder(data unsafe.Pointer, length int) *FLBDecoder { return dec } +func NewByteDecoder(b []byte) *FLBDecoder { + dec := new(FLBDecoder) + dec.handle = new(codec.MsgpackHandle) + // TODO: handle error. + _ = dec.handle.SetBytesExt(reflect.TypeOf(FLBTime{}), 0, &FLBTime{}) + + dec.mpdec = codec.NewDecoderBytes(b, dec.handle) + + return dec +} + func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{}) { var check error var m interface{}