diff --git a/cshared_test.go b/cshared_test.go index 76f2fe7..9f46343 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -40,10 +40,17 @@ func TestInputCallbackCtrlC(t *testing.T) { ptr := unsafe.Pointer(nil) - go func() { + go func(test *testing.T) { + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + test.Fail() + return + } + FLBPluginInputCallback(&ptr, nil) cdone <- true - }() + }(t) select { case <-cdone: @@ -86,7 +93,13 @@ func TestInputCallbackDangle(t *testing.T) { cdone := make(chan bool) ptr := unsafe.Pointer(nil) - go func() { + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + t.Fail() + } + + go func(test *testing.T) { t := time.NewTicker(collectInterval) defer t.Stop() @@ -99,7 +112,7 @@ func TestInputCallbackDangle(t *testing.T) { return } } - }() + }(t) timeout := time.NewTimer(5 * time.Second) @@ -156,10 +169,17 @@ func TestInputCallbackInfinite(t *testing.T) { cshutdown := make(chan bool) ptr := unsafe.Pointer(nil) - go func() { + go func(test *testing.T) { t := time.NewTicker(collectInterval) defer t.Stop() + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + test.Fail() + return + } + for { select { case <-t.C: @@ -172,7 +192,7 @@ func TestInputCallbackInfinite(t *testing.T) { return } } - }() + }(t) timeout := time.NewTimer(10 * time.Second) defer timeout.Stop() @@ -237,10 +257,17 @@ func TestInputCallbackLatency(t *testing.T) { cstarted := make(chan bool) cmsg := make(chan []byte) - go func() { + go func(test *testing.T) { t := time.NewTicker(collectInterval) defer t.Stop() + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + test.Fail() + return + } + buf, _ := testFLBPluginInputCallback() if len(buf) > 0 { cmsg <- buf @@ -259,7 +286,7 @@ func TestInputCallbackLatency(t *testing.T) { } } } - }() + }(t) <-cstarted fmt.Println("---- started") @@ -363,6 +390,12 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) { concurrentWait.Add(64) + // prepare channel for input explicitly. + err := prepareInputCollector() + if err != nil { + t.Fail() + } + go func(cstarted chan bool) { ticker := time.NewTicker(time.Second * 1) defer ticker.Stop()