-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpool_test.go
129 lines (110 loc) · 3.37 KB
/
pool_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package pipeline
import (
"context"
"testing"
"time"
)
// TestFixedWorkerPool verifies that a fixed pool actually runs one worker per
// input concurrently: with as many inputs as workers, every task instance must
// be able to reach a common sync point at the same time.
func TestFixedWorkerPool(t *testing.T) {
	const numWorkers = 10

	startedCh := make(chan struct{})
	releaseCh := make(chan struct{})

	// Each task invocation announces that it is running and then blocks
	// until the test releases all workers simultaneously.
	blockingTask := TaskFunc(func(_ context.Context, _ Data, _ TaskParams) (Data, error) {
		startedCh <- struct{}{}
		<-releaseCh
		return nil, nil
	})

	src := &sourceStub{data: stringDataValues(numWorkers)}
	p := NewPipeline(FixedPool("", blockingTask, numWorkers))

	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		if err := p.Execute(context.TODO(), src, nil); err != nil {
			t.Errorf("Error executing the Pipeline: %v", err)
		}
	}()

	// Every input must be handled by a distinct worker in parallel, so all
	// of them should be able to check in at the sync point.
	for i := 0; i < numWorkers; i++ {
		select {
		case <-startedCh:
		case <-time.After(10 * time.Second):
			t.Errorf("timed out waiting for worker %d to reach sync point", i)
		}
	}

	// Unblock every worker at once and wait for the pipeline to drain.
	close(releaseCh)
	select {
	case <-doneCh:
	case <-time.After(10 * time.Second):
		t.Errorf("timed out waiting for pipeline to complete")
	}
}
// TestDynamicWorkerPool verifies that a dynamic pool scales up to its maximum
// worker count: with twice as many inputs as the cap, the first batch of tasks
// must all be running concurrently before any of them is released.
func TestDynamicWorkerPool(t *testing.T) {
	const maxWorkers = 5

	// Buffered so a worker can check in without the test having to be
	// actively receiving at that exact moment.
	startedCh := make(chan struct{}, maxWorkers)
	releaseCh := make(chan struct{})

	// Each task invocation announces that it is running and then blocks
	// until the test releases all workers simultaneously.
	blockingTask := TaskFunc(func(_ context.Context, _ Data, _ TaskParams) (Data, error) {
		startedCh <- struct{}{}
		<-releaseCh
		return nil, nil
	})

	src := &sourceStub{data: stringDataValues(maxWorkers * 2)}
	p := NewPipeline(DynamicPool("", blockingTask, maxWorkers))

	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		if err := p.Execute(context.TODO(), src, nil); err != nil {
			t.Errorf("Error executing the Pipeline: %v", err)
		}
	}()

	// Seeing maxWorkers check-ins proves the pool scaled up to its cap.
	for i := 0; i < maxWorkers; i++ {
		select {
		case <-startedCh:
		case <-time.After(10 * time.Second):
			t.Errorf("timed out waiting for worker %d to reach sync point", i)
		}
	}

	// Unblock every worker; the remaining inputs are then processed and
	// the pipeline should run to completion.
	close(releaseCh)
	select {
	case <-doneCh:
	case <-time.After(10 * time.Second):
		t.Errorf("timed out waiting for pipeline to complete")
	}
}
// BenchmarkFixedPool measures the end-to-end cost of constructing and
// executing a single-worker fixed-pool pipeline over one data element.
// Pipeline construction is deliberately inside the loop so setup cost is
// part of what is measured.
func BenchmarkFixedPool(b *testing.B) {
	for n := 0; n < b.N; n++ {
		pipe := NewPipeline(FixedPool("", makePassthroughTask(), 1))
		source := &sourceStub{data: []Data{&stringData{val: "benchmark"}}}
		_ = pipe.Execute(context.TODO(), source, new(sinkStub))
	}
}
// BenchmarkFixedPoolDataElements measures the per-element throughput of a
// 100-worker fixed pool by pushing b.N elements through a single pipeline run.
func BenchmarkFixedPoolDataElements(b *testing.B) {
	sink := new(sinkStub)
	src := &sourceStub{data: stringDataValues(b.N)}
	p := NewPipeline(FixedPool("", makePassthroughTask(), 100))
	// The benchmark timer is already running on entry, so StartTimer here
	// was a no-op and the (O(b.N)) setup above was being measured too.
	// ResetTimer discards that setup time so only Execute is timed.
	b.ResetTimer()
	_ = p.Execute(context.TODO(), src, sink)
	b.StopTimer()
}
// BenchmarkDynamicPool measures the end-to-end cost of constructing and
// executing a dynamic-pool pipeline (cap 1) over one data element.
// Pipeline construction is deliberately inside the loop so setup cost is
// part of what is measured.
func BenchmarkDynamicPool(b *testing.B) {
	for n := 0; n < b.N; n++ {
		pipe := NewPipeline(DynamicPool("", makePassthroughTask(), 1))
		source := &sourceStub{data: []Data{&stringData{val: "benchmark"}}}
		_ = pipe.Execute(context.TODO(), source, new(sinkStub))
	}
}
// BenchmarkDynamicPoolDataElements measures the per-element throughput of a
// dynamic pool (cap 1000) by pushing b.N elements through a single pipeline run.
func BenchmarkDynamicPoolDataElements(b *testing.B) {
	sink := new(sinkStub)
	src := &sourceStub{data: stringDataValues(b.N)}
	p := NewPipeline(DynamicPool("", makePassthroughTask(), 1000))
	// The benchmark timer is already running on entry, so StartTimer here
	// was a no-op and the (O(b.N)) setup above was being measured too.
	// ResetTimer discards that setup time so only Execute is timed.
	b.ResetTimer()
	_ = p.Execute(context.TODO(), src, sink)
	b.StopTimer()
}