-
Notifications
You must be signed in to change notification settings - Fork 12
/
scheduler.py
85 lines (73 loc) · 3.25 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import dataLoader, gpuCalc, dataSaver
from multiprocessing import Process, Pipe, active_children
from time import sleep, time
"""
scheduler.py
Starts and manages processes which load data, do raster calculations on GPU,
and save data back to disk.
copyright : (C) 2016 by Alex Feurst, Charles Kazer, William Hoffman
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
#NOTE: USAGE: scheduler.py input output_1 func_1 output_2 func_2 ... output_n func_n
def run(inputFile, outputFiles, functions, disk_rows = 15):
start = time()
# create input and output pipes
inputPipe = Pipe()
outputPipes = []
for i in range(len(outputFiles)):
outputPipes.append(Pipe())
loader = dataLoader.dataLoader(inputFile, inputPipe[0], disk_rows)
loader.start()
header = loader.getHeaderInfo()
calc = gpuCalc.GPUCalculator(header, inputPipe[1], map((lambda x: x[0]), outputPipes), functions)
calc.start()
savers = []
for i in range(len(outputFiles)):
savers.append(dataSaver.dataSaver(outputFiles[i], header, outputPipes[i][1], disk_rows))
# start all threads
for i in range(len(outputFiles)):
savers[i].start()
# join all threads
try:
while active_children():
if loader.exitcode != None and loader.exitcode != 0:
print "Error encountered in data loader, ending tasks"
calc.stop()
for saver in savers:
saver.stop()
break
if calc.exitcode != None and calc.exitcode != 0:
loader.stop()
for saver in savers:
saver.stop()
print "Error encountered in GPU calculater, ending tasks"
break
sleep(1)
total = time() - start
print "Total time: %d mins, %f secs" % (total / 60, total % 60)
except: # if anything crashes stop the rest of threads
if loader.exitcode != None:
loader.stop()
if calc.exitcode != None:
calc.stop()
for saver in savers:
if saver.exitcode != None:
saver.stop()
if __name__ == '__main__':
#If run from the command line, parse arguments.
from sys import argv
outFiles = []
funcs = []
disk_rows = 15 # ~15 appears to be optimal number of rows to read at a time for any file
for i in range(2,len(argv), 2):
outFiles.append(argv[i])
funcs.append(argv[i+1].lower())
run(argv[1], outFiles, funcs, disk_rows)