-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatch.py
executable file
·127 lines (114 loc) · 4.13 KB
/
batch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
"""Run a shell command over many run numbers in parallel, logging each run."""
import sys

# Tuple comparison is the correct version gate. The original check,
# `version_info[0] >= 3 and version_info[1] >= 7`, would wrongly reject
# e.g. Python 4.0 (major 4 passes, but minor 0 < 7 fails).
if sys.version_info < (3, 7):
    raise Exception('Requires at least Python 3.7 to use this script.')

import argparse
import concurrent.futures
import inspect
import subprocess
import time

CMD = None  # NOTE(review): never reassigned anywhere in the file — looks like an unused placeholder
def single_job(run, cmd):
    """Execute *cmd* for a single run, teeing all output to a log file.

    Parameters
    ----------
    run : int
        Run number. Substituted (zero-padded to 4 digits) for every
        literal "RUN" in *cmd*, and used to name the log file.
    cmd : str
        Shell command template containing the "RUN" placeholder.

    Returns
    -------
    str
        A summary of the form ``"run-NNNN: <last stdout line>"`` for
        progress reporting by the caller.
    """
    # (The original declared `global CMD` here but never used it — removed.)
    log_path = f'./logs/run-{run:04d}.log'
    cmd = cmd.replace('RUN', f'{run:04d}')
    # `|&` pipes both stdout and stderr into tee — bash-only syntax,
    # hence executable='/bin/bash' below.
    cmd += f' |& tee {log_path}'
    print(cmd, flush=True)
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        shell=True,
        executable='/bin/bash',
    )
    if result.stderr != '':
        print(f'Error in run-{run:04d}: {result.stderr}', flush=True)
    # Report the last non-empty stdout line. The original indexing,
    # split('\n')[-2], raised IndexError whenever the command produced
    # no output at all.
    lines = [line for line in result.stdout.split('\n') if line != '']
    last_line = lines[-1] if lines else ''
    return f'run-{run:04d}: ' + last_line
def main():
    """Submit every requested run to a process pool and report completions."""
    args = get_arguments()
    total = len(args.runs)
    with concurrent.futures.ProcessPoolExecutor(max_workers=args.cores) as executor:
        futures = []
        for run in args.runs:
            print('Submitting job: ', end='', flush=True)
            futures.append(executor.submit(single_job, run, args.cmd))
            # presumably staggers job start-up so they don't all launch
            # at once — TODO confirm why 2 s specifically
            time.sleep(2.0)
        print('Done all submissions. Waiting for jobs to finish...', flush=True)
        # Report each job as soon as it completes, in completion order.
        for completed, future in enumerate(concurrent.futures.as_completed(futures), start=1):
            print(f' [n_jobs: {completed}/{total}] ' + future.result())
def _parse_run_ranges(run_strs):
    """Expand run specifiers like ['8-10', '2'] into a flat list of ints.

    Each element is either a single run number ("8") or an inclusive
    range ("8-10"). Anything else raises ValueError.
    """
    runs = []
    for run_str in run_strs:
        parts = [int(part) for part in run_str.split('-')]
        if len(parts) == 1:
            runs.append(parts[0])
        elif len(parts) == 2:
            runs.extend(range(parts[0], parts[1] + 1))
        else:
            raise ValueError(f'Unrecognized input: {run_str}')
    return runs


def get_arguments():
    """Parse and post-process command-line arguments.

    Returns
    -------
    argparse.Namespace
        With attributes ``cmd`` (str), ``runs`` (flat list of ints,
        ranges expanded and optionally filtered to good runs), and
        ``cores`` (int).
    """
    parser = argparse.ArgumentParser(
        description='Running executable in parallel.',
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        'cmd',
        type=str,
        help=inspect.cleandoc('''
            The full command to execute in parallel. Use "RUN" (case-sensitive)
            as a placeholder for the run number. The run number will be replaced
            using the format of "%%04d". For example,
            > ./batch.py "./calibrate.exe -r RUN -o ./out_dir/run-RUN.root -n 1200" 8-10
            Logging will automatically be exported to "./logs/run-RUN.log".
        '''),
    )
    parser.add_argument(
        'runs',
        nargs='+',
        help=inspect.cleandoc('''
            Runs to execute in parallel.
            Consecutive runs can be specified in ranges separated by the
            character "-". Here is an example:
            > ./batch.py "calibrate.exe -r RUN" 8-10 11 20 2-5
            This will calibrate the runs 8, 9, 10, 11, 20, 2, 3, 4, 5.
        '''),
    )
    parser.add_argument(
        '-c', '--cores',
        default=4,
        type=int,
        help=inspect.cleandoc('''
            Number of cores to use. Please do not use more than 8 cores, as this
            might cause issues for other users. By default, the number of cores
            is 4.
        '''),
    )
    parser.add_argument(
        '--good-runs-only',
        action='store_true',
        help=inspect.cleandoc('''
            If set, only runs that are marked as "good" in the run database will
            run. This option can be only be run when e15190 conda environment is
            active.
        '''),
    )
    args = parser.parse_args()

    # Expand "a-b" range specifiers into individual run numbers.
    runs = _parse_run_ranges(args.runs)

    if args.good_runs_only:
        with open('../database/runlog/good_runs.dat', 'r') as file:
            # Skip blank lines. The original test `len(line) > 0` was always
            # true (lines keep their trailing newline), so a blank line
            # crashed with ValueError on int('\n').
            good_runs = {int(line) for line in file if line.strip()}
        # Set membership keeps the filter O(n) instead of O(n*m).
        runs = [run for run in runs if run in good_runs]
    args.runs = runs

    # Warn if too many cores were requested. The prompt "[Y/n]" implies
    # Enter defaults to yes, so accept empty/'y'/'yes' in addition to
    # the exact 'Y' the original required.
    if args.cores > 8:
        print('WARNING: Using too many cores might cause issues for other users.')
        response = input('Are you sure you want to continue? [Y/n]')
        if response.strip().lower() not in ('', 'y', 'yes'):
            sys.exit(1)

    # print out message
    print(f'Running calibrate.exe in parallel ({args.cores} cores) on runs:')
    print(f'\t{args.runs}')
    return args
# Standard entry-point guard: run main() only when executed as a script,
# not when imported as a module.
if __name__ == '__main__':
    main()