# framework_heppy.py
# Tai Sakuma <[email protected]>
import os
import sys
import logging
import gzip
try:
    import cPickle as pickle
except ImportError:
    import pickle
import alphatwirl
from .yes_no import query_yes_no
##__________________________________________________________________||
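# Module-level logger; messages go to stdout with a simple
# "name - level - message" format.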
logger = logging.getLogger(__name__)
log_handler = logging.StreamHandler(stream=sys.stdout)
log_formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)
##__________________________________________________________________||
from .profile_func import profile_func
##__________________________________________________________________||
class FrameworkHeppy(object):
"""A simple framework for using alphatwirl
Args:
outdir (str): the output directory
datamc (str): 'data' or 'mc'
force (bool): overwrite the output if True
quiet (bool): don't show progress bars if True
parallel_mode (str): 'multiprocessing', 'subprocess', 'htcondor'
process (int): the number of processes for the 'multiprocessing' mode
user_modules (list of str): names of python modules to be copied for the 'subprocess' mode
max_events_per_dataset (int):
max_events_per_process (int):
profile (bool): run cProfile if True
profile_out_path (bool): path to store the result of the profile. stdout if None
"""
def __init__(self, outdir, heppydir,
datamc='mc',
force=False, quiet=False,
parallel_mode='multiprocessing',
htcondor_job_desc_extra=[ ],
process=8,
user_modules=set(),
max_events_per_dataset=-1, max_events_per_process=-1,
profile=False, profile_out_path=None
):
user_modules = set(user_modules)
user_modules.add('fwtwirl')
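        # Build the concurrency backend (multiprocessing pool, subprocess
        # workers, or HTCondor jobs). This package ('fwtwirl') is added to
        # user_modules above so it is among the modules copied for the
        # 'subprocess' mode (see the class docstring).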
self.parallel = alphatwirl.parallel.build_parallel(
parallel_mode=parallel_mode,
quiet=quiet,
processes=process,
user_modules=user_modules,
htcondor_job_desc_extra=htcondor_job_desc_extra
)
self.outdir = outdir
self.heppydir = heppydir
self.datamc = datamc
self.force = force
self.max_events_per_dataset = max_events_per_dataset
self.max_events_per_process = max_events_per_process
self.profile = profile
self.profile_out_path = profile_out_path
self.parallel_mode = parallel_mode
def run(self, components,
reader_collector_pairs,
analyzerName='roctree',
fileName='tree.root',
treeName='tree'
):
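        # Start the workers, run the dataset loop, then shut the workers
        # down. A KeyboardInterrupt is caught so the user can choose whether
        # to terminate the running jobs.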
self.parallel.begin()
try:
loop = self._configure(components, reader_collector_pairs, analyzerName, fileName, treeName)
self._run(loop)
except KeyboardInterrupt:
logger = logging.getLogger(__name__)
logger.warning('received KeyboardInterrupt')
if query_yes_no('terminate running jobs'):
logger.warning('terminating running jobs')
self.parallel.terminate()
else:
logger.warning('not terminating running jobs')
self.parallel.end()
def _configure(self, components, reader_collector_pairs, analyzerName, fileName, treeName):
component_readers = alphatwirl.datasetloop.DatasetReaderComposite()
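        # The blocks below write summary tables into outdir; a table that
        # already exists is left untouched unless 'force' is set.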
# tbl_heppyresult.txt
tbl_heppyresult_path = os.path.join(self.outdir, 'tbl_heppyresult.txt')
if self.force or not os.path.exists(tbl_heppyresult_path):
# e.g., '74X/MC/20150810_MC/20150810_SingleMu'
heppydir_rel = '/'.join(self.heppydir.rstrip('/').split('/')[-4:])
alphatwirl.mkdir_p(os.path.dirname(tbl_heppyresult_path))
            with open(tbl_heppyresult_path, 'w') as f:
                f.write('heppyresult\n')
                f.write(heppydir_rel + '\n')
# tbl_tree.txt
tbl_tree_path = os.path.join(self.outdir, 'tbl_tree.txt')
if self.force or not os.path.exists(tbl_tree_path):
tblTree = alphatwirl.heppyresult.TblTree(
analyzerName=analyzerName,
fileName=fileName,
treeName=treeName,
outPath=tbl_tree_path,
)
component_readers.add(tblTree)
# tbl_branch.txt
tbl_branch_path = os.path.join(self.outdir, 'tbl_branch.txt')
if self.force or not os.path.exists(tbl_branch_path):
tblBranch = alphatwirl.heppyresult.TblBranch(
analyzerName=analyzerName,
fileName=fileName,
treeName=treeName,
outPath=tbl_branch_path,
)
component_readers.add(tblBranch)
        # tbl_branch_size.txt
tbl_branch_size_path = os.path.join(self.outdir, 'tbl_branch_size.txt')
if self.force or not os.path.exists(tbl_branch_size_path):
tblBranchSize = alphatwirl.heppyresult.TblBranch(
analyzerName=analyzerName,
fileName=fileName,
treeName=treeName,
outPath=tbl_branch_size_path,
addType=False,
addSize=True,
sortBySize=True,
)
component_readers.add(tblBranchSize)
# tbl_branch_title.txt
tbl_branch_title_path = os.path.join(self.outdir, 'tbl_branch_title.txt')
if self.force or not os.path.exists(tbl_branch_title_path):
tblBranchTitle = alphatwirl.heppyresult.TblBranch(
analyzerName=analyzerName,
fileName=fileName,
treeName=treeName,
outPath=tbl_branch_title_path,
addType=False,
addSize=False,
addTitle=True,
)
component_readers.add(tblBranchTitle)
# tbl_dataset.txt
tbl_dataset_path = os.path.join(self.outdir, 'tbl_dataset.txt')
if self.force or not os.path.exists(tbl_dataset_path):
tblDataset = alphatwirl.heppyresult.TblComponentConfig(
outPath=tbl_dataset_path,
columnNames=('dataset', ),
keys=('dataset', ),
)
component_readers.add(tblDataset)
# tbl_xsec.txt for MC
if self.datamc == 'mc':
tbl_xsec_path = os.path.join(self.outdir, 'tbl_xsec.txt')
if self.force or not os.path.exists(tbl_xsec_path):
tblXsec = alphatwirl.heppyresult.TblComponentConfig(
outPath=tbl_xsec_path,
columnNames=('xsec', ),
keys=('xSection', ),
)
component_readers.add(tblXsec)
# tbl_nevt.txt for MC
if self.datamc == 'mc':
tbl_nevt_path = os.path.join(self.outdir, 'tbl_nevt.txt')
if self.force or not os.path.exists(tbl_nevt_path):
tblNevt = alphatwirl.heppyresult.TblCounter(
outPath=tbl_nevt_path,
columnNames=('nevt', 'nevt_sumw'),
analyzerName='skimAnalyzerCount',
fileName='SkimReport.txt',
levels=('All Events', 'Sum Weights')
)
component_readers.add(tblNevt)
# event loop
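        # Compose the (reader, collector) pairs into one composite reader and
        # collector. Each dataset is split into chunks of at most
        # max_events_per_process events, and the chunks are dispatched to the
        # parallel backend by MPEventLoopRunner.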
reader = alphatwirl.loop.ReaderComposite()
collector = alphatwirl.loop.CollectorComposite()
for r, c in reader_collector_pairs:
reader.add(r)
collector.add(c)
eventLoopRunner = alphatwirl.loop.MPEventLoopRunner(self.parallel.communicationChannel)
eventBuilderConfigMaker = alphatwirl.heppyresult.EventBuilderConfigMaker(
analyzerName=analyzerName,
fileName=fileName,
treeName=treeName,
)
datasetIntoEventBuildersSplitter = alphatwirl.loop.DatasetIntoEventBuildersSplitter(
EventBuilder=alphatwirl.roottree.BEventBuilder,
eventBuilderConfigMaker=eventBuilderConfigMaker,
maxEvents=self.max_events_per_dataset,
maxEventsPerRun=self.max_events_per_process
)
eventReader = alphatwirl.loop.EventsInDatasetReader(
eventLoopRunner=eventLoopRunner,
reader=reader,
collector=collector,
split_into_build_events=datasetIntoEventBuildersSplitter
)
component_readers.add(eventReader)
if components == ['all']: components = None
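        # HeppyResult scans heppydir for component directories; only
        # components that contain the analyzer directory are kept. Passing
        # components=None (the 'all' case above) selects every component.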
heppyResult = alphatwirl.heppyresult.HeppyResult(
path=self.heppydir,
componentNames=components,
componentHasTheseFiles=[analyzerName]
)
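        # For the 'subprocess' and 'htcondor' modes, use the resumable loop,
        # which keeps its state in the parallel working area so an
        # interrupted run can be resumed.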
if self.parallel_mode in ('subprocess', 'htcondor'):
componentLoop = alphatwirl.datasetloop.ResumableDatasetLoop(
datasets=heppyResult.components(),
reader=component_readers,
workingarea=self.parallel.workingarea
)
else:
            componentLoop = alphatwirl.datasetloop.DatasetLoop(
datasets=heppyResult.components(),
reader=component_readers
)
return componentLoop
def _run(self, componentLoop):
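        # Run the loop directly, or under cProfile when profiling is enabled.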
if not self.profile:
componentLoop()
else:
alphatwirl.misc.print_profile_func(
func=componentLoop,
profile_out_path=self.profile_out_path
)
##__________________________________________________________________||