-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathload_test.go
124 lines (107 loc) · 2.99 KB
/
load_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package pipelines_test
import (
"sync/atomic"
"testing"
"github.com/mdw-go/pipelines/v2"
)
// TestLoad is a load test: it feeds totalItems empty structs through a
// pipeline made of four station groups and then checks that every station in
// the two multi-station groups handled at least one item. It is skipped when
// the -short flag is set.
func TestLoad(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}

	const totalItems = 10_000_000

	// newStations builds a group of n independent counting stations.
	newStations := func(n int) []pipelines.Station {
		var stations []pipelines.Station
		for range n {
			stations = append(stations, NewLoadTestStation())
		}
		return stations
	}
	wide := newStations(1024)
	narrow := newStations(8)

	// The producer closes the channel once every item has been sent.
	source := make(chan any)
	go func() {
		defer close(source)
		for range totalItems {
			source <- struct{}{}
		}
	}()

	relay := NewLoadTestStation()
	sink := NewLoadTestFinalStation(t, totalItems)

	// NOTE(review): SendViaSelect presumably hands items that cannot be
	// forwarded to the given callback (the final station counts them as
	// discarded) — confirm against the pipelines documentation.
	// NOTE(review): Listen is expected to block until the pipeline has
	// drained, since the station counters are read right after it returns.
	pipelines.New(source,
		pipelines.Options.Logger(&TLogger{T: t}),
		pipelines.Options.StationGroup(
			pipelines.GroupOptions.Stations(wide...),
			pipelines.GroupOptions.SendViaSelect(sink.backdoor),
		),
		pipelines.Options.StationGroup(
			pipelines.GroupOptions.Stations(narrow...),
			pipelines.GroupOptions.BufferedOutput(1000),
		),
		pipelines.Options.StationGroup(
			pipelines.GroupOptions.Stations(relay),
			pipelines.GroupOptions.BufferedOutput(1000),
		),
		pipelines.Options.StationGroup(
			pipelines.GroupOptions.Stations(sink),
		),
	).Listen()

	// Every station in the two multi-station groups must have seen work.
	for _, group := range [][]pipelines.Station{wide, narrow} {
		for _, station := range group {
			if station.(*LoadTestStation).count == 0 {
				t.Error("a fanned-out station handled 0 items")
			}
		}
	}
}
/////////////////////////////
// LoadTestStation is a pass-through station that counts the items it handles.
type LoadTestStation struct {
	// count is the number of Do calls made so far. It is a plain int with no
	// synchronization of its own.
	count int
}

// NewLoadTestStation returns a station whose counter starts at zero.
func NewLoadTestStation() *LoadTestStation {
	return new(LoadTestStation)
}

// Do records one handled item and forwards input, unmodified, to output.
func (this *LoadTestStation) Do(input any, output func(any)) {
	this.count++
	output(input)
}
//////////////////////////////////
// LoadTestFinalStation is the terminal station of the load test. It counts the
// items it processes, counts the items handed to its backdoor callback, and
// on Finalize verifies that the two counts add up to the expected total.
type LoadTestFinalStation struct {
	t              *testing.T    // test used for progress logs and failure reports
	backdoorCount  *atomic.Int64 // items received through backdoor (reported as discarded)
	processedCount *atomic.Int64 // items received through Do
	expectedCount  int64         // total number of items the test sends
}

// NewLoadTestFinalStation returns a final station with zeroed counters that
// expects expectedCount items in total (processed plus discarded).
func NewLoadTestFinalStation(t *testing.T, expectedCount int) *LoadTestFinalStation {
	return &LoadTestFinalStation{
		t:              t,
		backdoorCount:  new(atomic.Int64),
		processedCount: new(atomic.Int64),
		expectedCount:  int64(expectedCount),
	}
}

// Do counts one processed item and possibly logs progress. The item itself is
// ignored and nothing is emitted downstream.
func (this *LoadTestFinalStation) Do(_ any, _ func(any)) {
	this.progress(this.processedCount.Add(1))
}

// Finalize logs the final counts and fails the test when the processed and
// discarded items do not add up to the expected total. Errorf (rather than
// Logf) is used for the mismatch so that lost or duplicated items actually
// fail the test instead of only appearing in verbose output.
func (this *LoadTestFinalStation) Finalize(_ func(any)) {
	processed := this.processedCount.Load()
	backdoor := this.backdoorCount.Load()
	this.t.Logf("Station finished after processing %d items (%d items were discarded)", processed, backdoor)
	if total := processed + backdoor; total != this.expectedCount {
		this.t.Errorf("expected %d total items, got %d", this.expectedCount, total)
	}
}

// backdoor counts one item that bypassed Do; the item itself is dropped.
func (this *LoadTestFinalStation) backdoor(any) {
	this.backdoorCount.Add(1)
}

// progress logs a status line whenever the combined processed and backdoor
// count is a multiple of 100,000. actual is the processed count observed by
// the caller; the backdoor count is sampled here, so with concurrent backdoor
// calls some multiples may pass without a log line.
func (this *LoadTestFinalStation) progress(actual int64) {
	backdoor := this.backdoorCount.Load()
	if (actual+backdoor)%100_000 != 0 {
		return
	}
	// percent expresses part as a truncated whole percentage of expectedCount.
	percent := func(part int64) int {
		return int(float64(part) / float64(this.expectedCount) * 100)
	}
	this.t.Logf("progress: %d/%d (%%%d) backdoor: %d/%d (%%%d)",
		actual,
		this.expectedCount,
		percent(actual),
		backdoor,
		this.expectedCount,
		percent(backdoor),
	)
}