-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
145 lines (135 loc) · 3.71 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"fmt"
"image"
"math"
"sync"
"time"
)
// Ranges specifies the loop range that a particular worker
// will operate on. For example, if we wish to compute a julia set function and increase its cReal
// until it is 0.25 greater than its initial value (so as to animate the set),
// and we increment the cReal each time by some constant value such as 0.1 (to control the speed of the animation),
// and we use 5 workers, the first Ranges struct would look like,
// {
// beginRange: 0.
// endRange: 0.5
// increment: 0.1
// }
// and the worker would in turn produce 5 images.
type Ranges struct {
beginRange float64
endRange float64
increment float64
}
// CameraModifiers controls camera panning across the X and Y axis,
// as well as camera zoom.
type CameraModifiers struct {
zoom float64
moveX float64
moveY float64
}
// ImageProperties defines the size of the julia set image,
// which directly relates to the size of the
// matrices used within the computation.
type ImageProperties struct {
imageHeight int
imageWidth int
}
// InitialCondition specifies the cReal and cImaginary values
// that should be used.
type InitialCondition struct {
cReal float64
cImaginary float64
}
// Worker represents a julia set worker, capable of producing images
type Worker struct {
ID int
InitialCondition
Ranges
ImageProperties
CameraModifiers
imageNumberIndex int
index int // use zero value
amount int
ffmpegChan chan ImageInput
}
// WorkerPoolConstructor is a utility type for creating worker pools which
// have been properly initialized.
type WorkerPoolConstructor struct {
WorkerCount int
endRange float64
increment float64
InitialCondition
ImageProperties
CameraModifiers
}
func (w WorkerPoolConstructor) CreateWorkerPool(ffmpeg ...*ImageProcessor) []Worker {
var workers []Worker
totalItems := math.Ceil(w.endRange / w.increment)
workItemsPerWorker := int(totalItems) / w.WorkerCount
if len(ffmpeg) == 1 {
ffmpeg[0].ExpectedReceives = int(totalItems)
go ffmpeg[0].processVideo()
}
for i := 0; i < w.WorkerCount; i++ {
workerBeginOffset := float64(i) * (float64(workItemsPerWorker))
workerEndOffset := float64(i+1) * (float64(workItemsPerWorker))
worker := Worker{
ID: i,
InitialCondition: w.InitialCondition,
Ranges: Ranges{
beginRange: workerBeginOffset * w.increment,
endRange: workerEndOffset * w.increment,
increment: w.increment,
},
ImageProperties: w.ImageProperties,
CameraModifiers: w.CameraModifiers,
imageNumberIndex: int(workerBeginOffset),
index: 0,
amount: workItemsPerWorker,
}
if len(ffmpeg) == 1 {
worker.ffmpegChan = ffmpeg[0].Input
}
workers = append(workers, worker)
}
return workers
}
func StartWorkers(workers []Worker) {
start := time.Now()
fmt.Println("Starting Workers")
wg := &sync.WaitGroup{}
for _, w := range workers {
go func(w Worker, group *sync.WaitGroup) {
wg.Add(1)
w.CreateJuliaSetImage()
wg.Done()
}(w, wg)
}
time.Sleep(500 * time.Millisecond)
wg.Wait()
fmt.Println("Done. Total runtime = ", time.Since(start))
}
func (w *Worker) CreateJuliaSetImage() {
n := time.Now()
k := 0
for i := w.beginRange; i < w.endRange; i = i + w.increment {
fmt.Println("Worker ", w.ID, ": ", w.index, "/", w.amount, " start")
CreateJuliaSet(
image.NewRGBA(image.Rect(0, 0, w.imageWidth, w.imageHeight)),
w.cReal+i, // real constant
w.cImaginary,
w.zoom,
w.moveX,
w.moveY,
w.imageNumberIndex+k,
GetImageFilePath(w.imageNumberIndex+k),
w.ffmpegChan,
)
w.index++
k++
fmt.Println("Worker ", w.ID, ": ", w.index, "/", w.amount, " complete")
}
fmt.Println("ID: ", w.ID, " Total Runtime: ", time.Since(n), " DONE")
}