-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregator_test.go
96 lines (72 loc) · 1.33 KB
/
aggregator_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package aggregator
import (
"testing"
"time"
"sync/atomic"
)
//test stop
func TestStopAggregator(t *testing.T){
ag := NewAggregator(2, 10, 20)
ag.SetErrorHandler(func(msg string) {
t.Log(msg)
})
ag.StopHandle()
if !ag.IsStopped(){
t.Error("stop failed")
}
}
//test work
func TestWork(t *testing.T){
type Iv struct{
j int
}
var addCount uint32 = 0
ag := NewAggregator(2, 10, 20)
ag.SetErrorHandler(func(msg string) {
t.Log(msg)
})
ag.SetTickFrequency(time.Millisecond * 100)
ag.SetHandler(func(v []interface{})error{
var list []*Iv
for _,v1 := range v{
var v2 *Iv
var ok bool
v2,ok = v1.(*Iv)
if !ok {
continue
}
list = append(list, v2)
atomic.AddUint32(&addCount,1)
}
t.Logf("%+v", list)
var list2 []int
for _,v1 := range list {
list2 = append(list2, v1.j)
}
t.Logf("%+v", list2)
return nil
})
ag.Start()
addTimes := 100
for i := 0; i < addTimes;i++{
var iv Iv
iv.j = i
go func( iv *Iv){
ag.Receive(iv)
time.Sleep(time.Millisecond * 5)
}(&iv)
}
time.Sleep(time.Second * 1)
ag.Stop()
ag.Receive("test")
if uint32(addTimes) != addCount {
t.Fatalf("addTimes:%d != addCount:%d\n", addTimes, addCount)
}
//do left
for i:=0; i< addTimes;i++ {
var iv Iv
iv.j = i
ag.aggregatorData = append(ag.aggregatorData, &iv)
}
ag.handleLeft()
}