-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflock_multi
executable file
·206 lines (182 loc) · 5.15 KB
/
flock_multi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python
#
# Author: Martin Langhoff <[email protected]>
# License: GPLv2
#
from __future__ import with_statement
import os, sys
import traceback
import getopt
import fcntl
import re
import random
import subprocess
import time
def help():
    """Return the multi-line usage text for flock_multi as a single string."""
    usage_lines = (
        "Usage:\n",
        " flock_multi [-h] [-v] [-E 200] [-T 201] [-s 20] [-w 60m ] heavy 4 heavyscript \n",
        "Notes: \n",
        " -w accepts m and h suffixes\n",
    )
    return "".join(usage_lines)
def arg_to_secs(a, exitcode=200):
    """Convert a duration argument into a whole number of seconds.

    Accepted forms:
      * an integer, returned as-is
      * a string of digits, taken as seconds ("90" -> 90)
      * digits followed by "m", taken as minutes ("60m" -> 3600)
      * digits followed by "h", taken as hours ("2h" -> 7200)

    Any other string writes an error to stderr and exits the process with
    `exitcode`.  The default of 200 matches the default conflict exit code
    set in main(); the previous code referenced main()'s local `flockerr`
    here, which is not visible in this scope and raised NameError.
    """
    # Local import: `numbers` is not among the file's top-level imports.
    # numbers.Integral covers int on Python 3 and int/long on Python 2.
    import numbers

    if isinstance(a, numbers.Integral):
        return int(a)

    # (pattern, multiplier) pairs, tried in order.
    units = (
        (r'(\d+)$', 1),
        (r'(\d+)m$', 60),
        (r'(\d+)h$', 60 * 60),
    )
    for pattern, multiplier in units:
        m = re.match(pattern, a)
        if m:
            return int(m.group(1)) * multiplier

    sys.stderr.write("ERROR: timeout parameter not an integer!\n")
    sys.exit(exitcode)
def maybe_timeout(timeout, exitcode):
    """Exit the process with `exitcode` if the deadline `timeout` has passed.

    `timeout` is compared against time.time(), so it is an absolute epoch
    timestamp.  A value that is not greater than zero means no deadline is
    set and the function returns without doing anything.
    """
    if not timeout > 0:
        return
    deadline_passed = timeout < time.time()
    if not deadline_passed:
        return
    sys.stderr.write("ERROR: flock_multi timeout\n")
    sys.exit(exitcode)
def maybe_remove_qmonfile():
    """Delete the queue-monitor marker file named by the global `qmonfname`.

    Does nothing when `qmonfname` is undefined, empty/None, or names a path
    that does not exist.  A failed removal is a soft error: the traceback is
    written to stderr and the function returns normally.
    """
    # Look the name up through globals() so a module that was imported
    # (where the __main__ section never assigned qmonfname) does not raise
    # NameError here.
    fname = globals().get('qmonfname')
    if not fname or not os.path.exists(fname):
        return
    try:
        os.remove(fname)
    except Exception:
        # Soft error.  Deliberately not a bare `except:` so that
        # KeyboardInterrupt and SystemExit still propagate to the caller.
        traceback.print_exc(file=sys.stderr)
def _read_maxlocks_override(conffile, default):
    """Return the integer stored in `conffile`, or `default` if it is absent or unusable."""
    if not os.path.exists(conffile):
        return default
    try:
        with open(conffile) as fh:
            return int(fh.read())
    except (IOError, OSError, ValueError):
        sys.stderr.write("WARNING: Ignoring invalid value in %s\n" % conffile)
        return default


def _try_flock(fh):
    """Attempt an exclusive non-blocking flock on `fh`; return True if it was obtained."""
    try:
        fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except (IOError, OSError):
        # failed to get lock, nonfatal (IOError on Python 2, OSError on 3)
        return False
    return True


def _run_under_lock(fh, cmd, hostname, pid, notfound_exitcode):
    """Record ownership in the held lockfile `fh`, run `cmd`, return its exit status.

    The lockfile is emptied again on the way out, whatever happens.  If the
    command cannot be found, an error is written to stderr and the process
    exits with `notfound_exitcode`.
    """
    # Local import: `errno` is not among the file's top-level imports.
    import errno

    try:
        # The file was opened for append; only now that the flock is held
        # is it safe to discard the previous holder's contents.
        fh.seek(0)
        fh.truncate(0)
        fh.flush()
        stamp = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime())
        fh.write("%s PID: %s epoch: %s %s\n" % (hostname, pid, time.time(), stamp))
        fh.write(' '.join(cmd))
        fh.write("\n")
        fh.flush()
        # No longer queued: drop the queue-monitor marker before running.
        maybe_remove_qmonfile()
        # execute the command requested
        try:
            return subprocess.call(cmd)
        except OSError as e:
            if e.errno == errno.ENOENT:
                sys.stderr.write("ERROR: No such file or directory: %s\n" % cmd[0])
                sys.exit(notfound_exitcode)
            raise
    finally:
        # Runs on _all_ exits, including SystemExit and KeyboardInterrupt:
        # truncate the flock'd file on completion.
        fh.seek(0)
        fh.truncate(0)
        fh.flush()


def main():
    """Run a command while holding one of N numbered lockfiles.

    Command line: [options] LOCKNAME MAXLOCKS COMMAND [ARGS...]

    Lockfiles are LOCKNAME.1 .. LOCKNAME.MAXLOCKS inside the lock directory.
    They are tried in random order with a non-blocking exclusive flock; the
    first one obtained is held while COMMAND runs, and the process then
    exits with COMMAND's exit status.  If every lock is taken, the process
    sleeps (sleeptime +/- 10%) and retries until the optional deadline
    given with -w expires.

    Environment overrides: FLOCK_MULTI_CONF_DIR, FLOCK_MULTI_DIR,
    FLOCK_MULTI_QMON_DIR.  A file named LOCKNAME in the conf directory, if
    it holds an integer, overrides MAXLOCKS.

    Exit codes: 200 for usage errors; the -E value (default 200) when
    COMMAND cannot be found; the -T value (default 201) on timeout.
    """
    global qmonfname

    # vars overridden from env
    confdir = os.environ.get('FLOCK_MULTI_CONF_DIR', '/mnt/cluster/conf/lock')
    lockdir = os.environ.get('FLOCK_MULTI_DIR', '/mnt/cluster/lock')
    qmondir = os.environ.get('FLOCK_MULTI_QMON_DIR',
                             os.path.join(lockdir, 'queuemonitor'))

    long_opts = ["help", "verbose", "queuemonitor", "qmon", "conflict-exit-code=",
                 "timeout-exit-code=", "sleeptime=", "wait=", "timeout="]
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hvQE:T:s:w:", long_opts)
    except getopt.GetoptError as e:
        sys.stderr.write("ERROR: Invalid parameter: %s\n" % e.msg)
        sys.stderr.write(help())
        sys.exit(200)

    verbose = False
    qmon = False
    flockerr = 200
    timeouterr = 201
    sleeptime = 60
    timeout = 0
    # Options are handled before the positional-argument count check so
    # that a bare -h/--help prints the usage text instead of an error.
    for o, a in opts:
        if o in ("-h", "--help"):
            sys.stdout.write(help())
            sys.exit(0)
        elif o in ("-v", "--verbose"):
            verbose = True
        elif o in ("-Q", "--queuemonitor", "--qmon"):
            qmon = True
        elif o in ("-E", "--conflict-exit-code"):
            flockerr = int(a)
        elif o in ("-T", "--timeout-exit-code"):
            timeouterr = int(a)
        elif o in ("-s", "--sleeptime"):
            sleeptime = int(a)
        elif o in ("-w", "--wait", "--timeout"):
            # stored as an absolute deadline (epoch seconds)
            timeout = float(arg_to_secs(a)) + time.time()
        else:
            raise AssertionError("unhandled option %s" % o)

    if len(args) < 3:
        sys.stderr.write("ERROR: At least 3 parameters needed.\n")
        sys.stderr.write(help())
        sys.exit(200)

    # argument params
    lockname = args.pop(0)
    maxlocks = int(args.pop(0))
    cmd = args

    maxlocks = _read_maxlocks_override(os.path.join(confdir, lockname), maxlocks)
    if verbose:
        print("Using %s maxlocks" % maxlocks)

    mypid = os.getpid()
    hostname = os.uname()[1]
    # cast to have better splay
    sleeptime = float(sleeptime)

    if qmon:
        try:
            qmonfname = os.path.join(qmondir, '%s:%s:%s' % (lockname, hostname, mypid))
            open(qmonfname, 'w').close()  # "touch"
            if verbose:
                print("qmonfile %s" % qmonfname)
        except (IOError, OSError):
            # soft error
            traceback.print_exc(file=sys.stderr)

    while True:
        locks = list(range(1, maxlocks + 1))
        random.shuffle(locks)
        for trylock in locks:
            trylockfn = os.path.join(lockdir, lockname + '.%s' % trylock)
            # we open for "append", and only move to truncate the
            # file if we succeed in getting the flock
            with open(trylockfn, 'a') as fh:
                if not _try_flock(fh):
                    continue
                if verbose:
                    print("Got %s" % trylockfn)
                # Only the flock attempt is treated as nonfatal; errors
                # from writing the lockfile or launching the command
                # propagate instead of being mistaken for "lock busy".
                sys.exit(_run_under_lock(fh, cmd, hostname, mypid, flockerr))

        # all locks taken
        maybe_timeout(timeout, timeouterr)
        splay = sleeptime / 10
        actual_sleep = sleeptime + random.uniform(0 - splay, splay)
        if verbose:
            print("Tried all locks - sleeping %s" % actual_sleep)
        time.sleep(actual_sleep)
        maybe_timeout(timeout, timeouterr)
if __name__ == '__main__':
    # Path of the queue-monitor marker file, read by maybe_remove_qmonfile();
    # stays None unless main() creates one.
    qmonfname = None
    try:
        main()
    except KeyboardInterrupt:  # user hit control-C
        sys.exit(130)
    except Exception:  # all "interesting" exceptions, but not SystemExit
        # Diagnostics go to stderr like every other error in this script,
        # so they cannot get mixed into the wrapped command's stdout.
        traceback.print_exc(file=sys.stderr)
        # sys.exit rather than the site-provided exit(), which is not
        # available when Python is started with -S.
        sys.exit(200)
    finally:
        maybe_remove_qmonfile()