-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuv.py
198 lines (178 loc) · 6.36 KB
/
uv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from optparse import OptionParser, make_option
import os, pickle, platform, random, socket, sys, time, traceback, zlib
# Optional speed-up: enable the psyco specializing compiler when it happens to
# be installed.  This is purely best-effort, so ordinary failures to import or
# enable it are ignored.  Only ``Exception`` is caught (not a bare ``except``)
# so that Ctrl-C or SystemExit during start-up is not silently swallowed.
try:
    import psyco
    psyco.full()
except Exception:
    pass

# Version number of this worker; uv_run hands it to downloaded task code.
uv_ver = 1
class struct:
    """Empty class whose instances serve as plain attribute namespaces."""
    pass
# Verbosity threshold: a message is printed only if its level is <= this.
dbg_lvl = 3


def dbg_out(lvl, *s):
    """Print the concatenated str() of every item in *s* when lvl <= dbg_lvl."""
    if lvl > dbg_lvl:
        return
    print(''.join(map(str, s)))


def dmp(*s):
    """Emit a level-5 message (data dumps)."""
    dbg_out(5, *s)


def dbg(*s):
    """Emit a level-4 message (debug)."""
    dbg_out(4, *s)


def info(*s):
    """Emit a level-3 message (informational)."""
    dbg_out(3, *s)


def wrn(*s):
    """Emit a level-2 message (warning)."""
    dbg_out(2, *s)


def err(*s):
    """Emit a level-1 message (error)."""
    dbg_out(1, *s)
def so_read_line(so):
    """Read one newline-terminated line from socket *so*.

    Data is fetched one byte at a time so that nothing beyond the newline is
    consumed from the socket.

    Returns the line without its trailing newline.  If the peer closes the
    connection before a newline arrives, whatever was read so far is returned.
    If receiving fails with an ordinary exception, '' is returned (partial
    data is discarded); callers treat that as "nothing to read".
    """
    chars = []
    try:
        ch = so.recv(1)
        while ch and ch != '\n':
            # Collect pieces in a list; joining once avoids quadratic
            # string concatenation on long lines.
            chars.append(ch)
            ch = so.recv(1)
        return ''.join(chars)
    except Exception:
        # Best effort by design, but no longer a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are not swallowed here.
        return ''
def so_read_block(so):
    """Read one length-prefixed, zlib-compressed block from socket *so*.

    Wire format: a decimal byte count on its own line, followed by exactly
    that many bytes of zlib-compressed data.

    Returns the decompressed payload, or '' when the length line is empty,
    not a number, or not positive.

    Raises Exception("Socket dead") if the peer closes the connection before
    the whole payload has arrived; zlib.decompress may raise if the payload is
    not valid compressed data.
    """
    # read length
    header = so_read_line(so)
    try:
        lblk = int(header)
    except ValueError:
        lblk = 0
    # if nothing to read, quit (a negative count is treated the same way
    # instead of falling through to decompress an empty string)
    if lblk <= 0:
        return ''
    # read payload in pieces of at most 4096 bytes
    chunks = []
    received = 0
    while received < lblk:
        part = so.recv(min(lblk - received, 4096))
        if not part:
            raise Exception("Socket dead")
        chunks.append(part)
        received += len(part)
    s = zlib.decompress(''.join(chunks))
    dmp('=>', len(s), '\n', s)
    return s
def so_read_task(so):
    """Receive one task message from socket *so*.

    The block read from the socket is unpickled and must unpack into exactly
    two items, which are returned as the tuple (task_info, task).
    """
    # NOTE(security): pickle.loads runs on data received from the network;
    # a malicious peer can make unpickling execute arbitrary code.
    payload = so_read_block(so)
    task_info, task = pickle.loads(payload)
    return (task_info, task)
def so_write_block(so, r):
    """Send string *r* over socket *so* as one length-prefixed block.

    The payload is zlib-compressed and preceded by the compressed size as a
    decimal number on its own line.
    """
    dmp('<=', len(r), r)
    packed = zlib.compress(r)
    so.sendall(str(len(packed)) + '\n' + packed)
def so_write_task(so, s):
    """Pickle the two-item sequence *s* = (task_info, task) and send it on *so*."""
    task_info, body = s
    so_write_block(so, pickle.dumps((task_info, body)))
# Minimum number of seconds between two periodic registry saves in uv_run
# (compared against differences of time.time()).
REGISTRY_SAVE_INTERVAL = 600
# File the registry is pickled to and loaded from; a relative path, so it is
# resolved against the current working directory.
REGISTRY_FILE_NAME = 'uv.reg'
# Registry key under which main() stores this worker's random numeric id.
REG_UV_ID = 'uv.id'
# XXX for now just assume pwd is the right place for the registry file
def reg_save(registry):
    """Pickle *registry* into REGISTRY_FILE_NAME.

    Ordinary failures never propagate: the error is reported through err(),
    followed by a pickled dump of the registry so its content is not lost
    (or by a second error message if even that dump fails).
    """
    try:
        reg_file = open(REGISTRY_FILE_NAME, 'w')
        try:
            pickle.dump(registry, reg_file)
        finally:
            # Close explicitly so the data is flushed to disk right away
            # instead of whenever the file object gets garbage collected.
            reg_file.close()
    except Exception:
        inst = sys.exc_info()[1]
        err("Failed to save registry file '%s': %s\nRegistry dump follows:" % (REGISTRY_FILE_NAME, inst))
        try:
            err(pickle.dumps(registry))
        except Exception:
            inst = sys.exc_info()[1]
            err("Failed to dump registry: %s" % (inst))
def reg_load():
    """Load and return the registry pickled in REGISTRY_FILE_NAME.

    If the file cannot be opened or unpickled, the failure is reported
    through err() and an empty dict is returned instead.
    """
    try:
        reg_file = open(REGISTRY_FILE_NAME, 'r')
        try:
            return pickle.load(reg_file)
        finally:
            # Always release the file handle, even when unpickling fails.
            reg_file.close()
    except Exception:
        inst = sys.exc_info()[1]
        err("Failed to load registry file '%s': %s" % (REGISTRY_FILE_NAME, inst))
        return {}
def cpus_nof_detect():
    """Return the number of CPUs on this machine, or 1 if it cannot be found.

    Detection method by platform:
      * Darwin  -- output of ``sysctl -n hw.ncpu``
      * Windows -- the NUMBER_OF_PROCESSORS environment variable
      * other systems providing os.sysconf -- SC_NPROCESSORS_ONLN

    A missing, unparsable or non-positive value yields the default of 1
    instead of raising.
    """
    count = 0
    try:
        system = platform.system()
        if system == 'Darwin':  # Mac
            # os.popen only opens the child's stdout; the pipe is closed
            # explicitly so no descriptor is left dangling.
            pipe = os.popen("sysctl -n hw.ncpu")
            try:
                count = int(pipe.read())
            finally:
                pipe.close()
        elif system == 'Windows':  # Windows
            count = int(os.environ.get("NUMBER_OF_PROCESSORS", 0))
        elif hasattr(os, "sysconf"):  # Unix
            if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
                count = int(os.sysconf("SC_NPROCESSORS_ONLN"))
    except (ValueError, EnvironmentError):
        # Garbage output or a failing system call: use the default below.
        count = 0
    if count > 0:
        return count
    # default to 1 if can't find anything useful
    return 1
def uv_run(host, port, uv_registry):
    """Worker loop: connect to the task server and process what it sends.

    Each message read from the server unpacks into (task_info, task), where
    task_info is (task_base, task_globals, task_args):
      * task         -- source code to compile and execute, or '' to keep
                        using the previously received code
      * task_globals -- value exposed to the task code as ``job_globals``,
                        or '' to keep the previous one
      * task_args    -- sequence of arguments; ``job_worker`` from the task
                        code is called once per argument and the list of
                        results is sent back together with task_base

    Any ordinary exception is reported, the connection is dropped and, after
    a random 1-10 second pause, a new connection is attempted.  The registry
    is saved through reg_save at most once per REGISTRY_SAVE_INTERVAL seconds.

    host, port   -- address of the task server
    uv_registry  -- registry object, exposed to task code as ``uv_registry``

    NOTE(security): the task source received from the network is compiled
    and executed as-is, and messages are unpickled; only connect to a server
    you fully trust.
    """
    job_globals = ''
    last_time = 0
    while 1:
        # Bound before the try block so the clean-up code below never hits an
        # unbound name when creating the socket itself fails.
        so = None
        try:
            so = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            so.connect((host, port))
            while 1:
                if time.time() - last_time > REGISTRY_SAVE_INTERVAL:
                    reg_save(uv_registry)
                    last_time = time.time()
                task_info, task = so_read_task(so)
                task_base, task_globals, task_args = task_info
                # only compile task code if it is given, otherwise
                # use previous code
                if task != '':
                    dbg('compiling new task code')
                    task_obj = compile(task, 'posdo', 'exec')
                    # execute payload inside a fresh namespace object
                    task_inst = struct()
                    namespace = task_inst.__dict__
                    exec(task_obj, namespace)
                    namespace['uv_registry'] = uv_registry  # registry
                    namespace['uv_ver'] = uv_ver  # to support upgrade
                    namespace['so'] = so  # XXX to support upgrade
                # only evaluate globals if given
                if task_globals != '':
                    dbg('assigning new globals')
                    dmp(task_globals)
                    job_globals = task_globals
                    task_inst.__dict__['job_globals'] = job_globals
                dmp(task_args)
                # Zero length args end this session: control leaves the inner
                # loop, the connection is closed and a new one is opened.
                # NOTE(review): the original comment called this a "signal to
                # exit" -- confirm whether the worker should return instead.
                if len(task_args) == 0:
                    break
                task_results = []
                for arg in task_args:
                    dmp('arg = ', arg)
                    task_results.append(task_inst.job_worker(arg))
                # return result
                so_write_task(so, ((task_base, task_results), ''))
        except Exception:
            inst = sys.exc_info()[1]
            err('Exception: %s' % (inst))
            traceback.print_exc()
            if so is not None:
                so.close()
            # sleep a little before hammering the server
            time.sleep(random.randint(1, 10))
            if time.time() - last_time > REGISTRY_SAVE_INTERVAL:
                reg_save(uv_registry)
                last_time = time.time()
        # Closing twice is harmless; this covers the path without an error.
        if so is not None:
            so.close()
def main():
    """Parse the command line, prepare the registry and start the workers.

    Options:
      -a/--addr  server host name (default 'localhost')
      -p/--port  server port (default 6666)
      -c/--cpus  number of worker instances; when omitted or 0 the number of
                 CPUs reported by cpus_nof_detect() is used

    One worker runs in the current process and each additional one in a
    process created with os.fork().  On platforms without os.fork a warning
    is printed and a single worker is run.  The registry is saved once more
    when uv_run returns or fails with an ordinary exception.
    """
    option_list = [
        make_option('-a', '--addr', type='string', default='localhost', dest='host_addr'),
        make_option('-p', '--port', type='int', default=6666, dest='host_port'),
        make_option('-c', '--cpus', type='int', default=None, dest='cpus_nof'),
    ]
    parser = OptionParser(option_list=option_list)
    (options, args) = parser.parse_args()
    host = options.host_addr
    port = options.host_port
    # XXX registry is shared between all UVs on this machine. This will cause race
    # conditions, especially considering the forking below.
    registry = reg_load()
    if REG_UV_ID not in registry:
        # First start on this machine: pick a random id and persist it.
        random.seed(time.time())
        registry[REG_UV_ID] = random.randint(0, 18446462598732840960)
        reg_save(registry)
    # Get number of CPUs and launch as many UVs
    cpus_nof = options.cpus_nof
    if not cpus_nof:
        cpus_nof = cpus_nof_detect()
    if cpus_nof > 1 and not hasattr(os, 'fork'):
        # e.g. Windows: calling os.fork there would raise AttributeError.
        wrn('os.fork is not available on this platform; running a single instance')
        cpus_nof = 1
    info('Running %d instances' % (cpus_nof))
    for i in range(cpus_nof - 1):
        pid = os.fork()
        # A child must not fork further; it goes straight to work.
        if pid == 0:
            break
    try:
        uv_run(host, port, registry)
    except Exception:
        inst = sys.exc_info()[1]
        err('Exception: %s' % (inst))
        traceback.print_exc()
    reg_save(registry)
# Start the worker.  The call is not guarded by an ``if __name__`` check, so
# it also runs when this module is imported rather than executed as a script.
main()