run_recipe.py
#!/usr/bin/python3
import pprint
import sys
import os
import shutil
import argparse
import subprocess
import traceback
import threading
from typing import Callable
# ---
import logging
# ---
import yaml
from yaml import dump
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper
import pandas as pd
import numexpr
import dask
import dask.distributed
from dask.distributed import Client, LocalCluster
from dask_jobqueue import SLURMCluster
# ---
from common.logging_facilities import logi, loge, logd, setup_logging_defaults, set_logging_level
from recipe import Recipe
import extractors
import transforms
import exporters
import plots
import yaml_helper
# ---
import tag_regular_expressions as tag_regex
# import debug helper for usage in function definitions
from common.debug import start_debug  # noqa: F401
from utility.code import compile_and_evaluate_function_definition
_debug = False
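
# A typical invocation might look like this (the recipe filename here is hypothetical; the
# flags are defined in `parse_arguments` below):
#   ./run_recipe.py my_recipe.yaml --worker 8 -vv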

def evaluate_function_definitions(recipe:Recipe):
    r"""
    Evaluate the functions defined in `recipe.function_definitions` and make the compiled code available in a copy
    of the global environment of the interpreter.

    A copy of the global environment is used so as not to pollute the global namespace itself. All the defined
    functions share the same copy.

    Parameters
    ----------
    recipe: Recipe
        The recipe in which the functions are to be made available.

    Returns
    -------
    A shallow copy of the global environment with the compiled functions as members, i.e. a dictionary with the
    function names as keys and the code objects as associated values.
    """
    # Create a copy of the global environment for evaluating the extra code fragments so as not to pollute the
    # global namespace itself.
    #
    # NOTE: This is a shallow copy and thus a rather simple method to prevent accidental overwrites, *not* a
    # defense against deliberately malicious modifications.
    global_env = globals().copy()

    for function_name in recipe.function_definitions:
        # Get the string with the source code.
        function_code = recipe.function_definitions[function_name]
        # Actually evaluate the code within the given namespace to allow access to all the defined symbols,
        # such as helper functions that are not defined inline.
        function, global_env = compile_and_evaluate_function_definition(function_code, function_name, global_env)
        # Bind the function to the specified name.
        recipe.function_definitions[function_name] = function

    # Return the environment with the compiled functions.
    return global_env
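
# For reference, a `function_definitions` section in the recipe YAML is expected to map a
# function name to a string containing its source code; a minimal, hypothetical example:
#   function_definitions:
#     double: |
#       def double(x):
#           return 2 * x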

def eval_recipe_tag_definitions(recipe:Recipe
                                , attributes_regex_map, iterationvars_regex_map, parameters_regex_map
                                , function_definitions_global_env:dict
                                ):
    def eval_and_add_tags(tag_set_name, regex_map):
        for tag_name in recipe.evaluation.tags[tag_set_name]:
            # The `eval` is necessary here since the `transform` function of the tag
            # can be an arbitrary function and has to be parsed into a `Callable`.
            tag_list = eval(recipe.evaluation.tags[tag_set_name][tag_name], function_definitions_global_env)  # pylint: disable=W0123:eval-used
            # Check that each transform is indeed a `Callable`.
            for tag in tag_list:
                if not isinstance(tag['transform'], Callable):
                    raise RuntimeError(f'transform for {tag=} is not a Callable!')
            regex_map[tag_name] = tag_list

    if 'attributes' in recipe.evaluation.tags:
        eval_and_add_tags('attributes', attributes_regex_map)
    if 'iterationvars' in recipe.evaluation.tags:
        eval_and_add_tags('iterationvars', iterationvars_regex_map)
    if 'parameters' in recipe.evaluation.tags:
        eval_and_add_tags('parameters', parameters_regex_map)

    return attributes_regex_map, iterationvars_regex_map, parameters_regex_map
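
# As implied by the check above, each tag definition is a Python expression string that
# evaluates to a list of dicts, each carrying a Callable under the `transform` key;
# a minimal, hypothetical example of such an entry in the recipe YAML:
#   tags:
#     attributes:
#       someTag: "[{'transform': lambda value: int(value)}]"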

def prepare_evaluation_phase(recipe:Recipe, options, data_repo, function_definitions_global_env:dict):
    logi(f'prepare_evaluation_phase: {recipe} {recipe.name}')

    if hasattr(recipe.evaluation, 'tags'):
        attributes_regex_map, iterationvars_regex_map, parameters_regex_map = eval_recipe_tag_definitions(recipe
                , tag_regex.attributes_regex_map, tag_regex.iterationvars_regex_map, tag_regex.parameters_regex_map
                , function_definitions_global_env)
    else:
        attributes_regex_map, iterationvars_regex_map, parameters_regex_map = \
                tag_regex.attributes_regex_map, tag_regex.iterationvars_regex_map, tag_regex.parameters_regex_map

    if not hasattr(recipe.evaluation, 'extractors'):
        logi('prepare_evaluation_phase: no `extractors` in recipe.Evaluation')
        return data_repo, []

    for extractor_tuple in recipe.evaluation.extractors:
        extractor_name = list(extractor_tuple.keys())[0]
        extractor = list(extractor_tuple.values())[0]
        if options.run_tree and extractor_name not in options.run_tree['evaluation']['extractors']:
            logi(f'skipping extractor {extractor_name}')
            continue
        if extractor_name in options.extraction_overrides:
            extractor.input_files = [ options.extraction_overrides[extractor_name] ]
            logi(f'overriding {extractor_name} with {extractor.input_files}')
        extractor.set_tag_maps(attributes_regex_map, iterationvars_regex_map, parameters_regex_map)
        delayed_data = extractor.prepare()
        # print(f'{extractor=}')
        # print(f'{delayed_data.memory_usage(deep=True) = }')
        # print(f'-<-<-<-<-<-<-')
        data_repo[extractor_name] = delayed_data
        logi(f'added extractor {extractor_name}')

    if not hasattr(recipe.evaluation, 'transforms'):
        logi('prepare_evaluation_phase: no `transforms` in recipe.Evaluation')
    else:
        for transform_tuple in recipe.evaluation.transforms:
            transform_name = list(transform_tuple.keys())[0]
            transform = list(transform_tuple.values())[0]
            if options.run_tree and transform_name not in options.run_tree['evaluation']['transforms']:
                logi(f'skipping transform {transform_name}')
                continue
            logi(f'preparing transform {transform_name}')
            transform.set_name(transform_name)
            transform.set_data_repo(data_repo)
            transform.prepare()
            logi(f'added transform {transform_name}')

    jobs = []
    if recipe.evaluation.exporter is None:
        logi('prepare_evaluation_phase: no `exporter` in recipe.Evaluation')
    else:
        for exporter_tuple in recipe.evaluation.exporter:
            exporter_name = list(exporter_tuple.keys())[0]
            exporter = list(exporter_tuple.values())[0]
            if options.run_tree and exporter_name not in options.run_tree['evaluation']['exporter']:
                logi(f'skipping exporter {exporter_name}')
                continue
            if exporter_name in options.export_overrides:
                exporter.output_filename = options.export_overrides[exporter_name]
                logi(f'overriding {exporter_name} with {exporter.output_filename}')
            exporter.set_data_repo(data_repo)
            job = exporter.prepare()
            jobs.extend(job)
            logi(f'added exporter {exporter_name}')

    logi(f'{jobs=}')

    if options.plot_task_graphs:
        for i in range(0, len(jobs)):
            graph_output_file = f'{options.tmpdir}/dask_task_graph_evaluation_job-{i}.png'
            jobs[i].visualize(graph_output_file)
            logi(f'saved the evaluation phase task graph for job {i}:{jobs[i]} to: {graph_output_file}')

    return data_repo, jobs
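
# The evaluation section of a recipe is expected to hold lists of single-entry mappings from a
# task name to a task object; a rough, hypothetical sketch of the YAML shape (the constructor
# tag names here are assumptions, the real tags are registered via `register_constructors`):
#   evaluation:
#     extractors:
#       - e1: !SomeExtractor { ... }
#     transforms:
#       - t1: !SomeTransform { ... }
#     exporter:
#       - x1: !SomeExporter { ... }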

def prepare_plotting_phase(recipe:Recipe, options, data_repo):
    logi(f'prepare_plotting_phase: {recipe} {recipe.name}')

    if not hasattr(recipe.plot, 'reader'):
        logi('prepare_plotting_phase: no `reader` in recipe.Plot')
    else:
        for dataset_tuple in recipe.plot.reader:
            dataset_name = list(dataset_tuple.keys())[0]
            reader = list(dataset_tuple.values())[0]
            if options.run_tree and dataset_name not in options.run_tree['plot']['reader']:
                logi(f'skipping reader {dataset_name}')
                continue
            logi(f'plot: loading dataset: "{dataset_name=}"')
            if dataset_name in options.reader_overrides:
                reader.input_files = options.reader_overrides[dataset_name]
                logi(f'plot: prepare_plotting_phase overriding input files for "{dataset_name}": "{reader.input_files=}"')
            data = reader.prepare()
            data_repo[dataset_name] = data
            logi(f'added reader {dataset_name}')

    logd('<<<-<-<--<-<-<--<-<-<')
    logd(f'plot: {data_repo=}')
    logd('<<<-<-<--<-<-<--<-<-<')

    if not hasattr(recipe.plot, 'transforms'):
        logi('prepare_plotting_phase: no `transforms` in recipe.Plot')
    else:
        for transform_tuple in recipe.plot.transforms:
            transform_name = list(transform_tuple.keys())[0]
            transform = list(transform_tuple.values())[0]
            if options.run_tree and transform_name not in options.run_tree['plot']['transforms']:
                logi(f'skipping transform {transform_name}')
                continue
            transform.set_name(transform_name)
            transform.set_data_repo(data_repo)
            transform.prepare()
            logi(f'added transform {transform_name}')

    jobs = []
    for task_tuple in recipe.plot.tasks:
        task_name = list(task_tuple.keys())[0]
        task = list(task_tuple.values())[0]
        if options.run_tree and task_name not in options.run_tree['plot']['tasks']:
            logi(f'skipping task {task_name}')
            continue
        if task_name in options.plot_overrides:
            task.output_file = options.plot_overrides[task_name]
            logi(f'overriding {task_name} with {task.output_file}')
        task.set_data_repo(data_repo)
        logi(f'plot: preparing plotting task {task_name}')
        job = task.prepare()
        # logi(f'plot: {job=}')
        jobs.append(job)
        logi(f'added task {task_name}')

    if options.plot_task_graphs:
        for i in range(0, len(jobs)):
            graph_output_file = f'{options.tmpdir}/dask_task_graph_plotting_job-{i}.png'
            jobs[i].visualize(graph_output_file)
            logi(f'saved the plot phase task graph for job {i}:{jobs[i]} to: {graph_output_file}')

    return data_repo, jobs

def process_recipe(options):
    # Use a context manager so the recipe file is closed again after reading.
    with open(options.recipe, mode='r') as f:
        recipe = yaml.unsafe_load(f.read())

    if options.dump_recipe or options.dump_recipe_only:
        output = dump(recipe, Dumper=Dumper)
        terminal_size = shutil.get_terminal_size()
        logd(pprint.pformat(output, width=terminal_size.columns))
        if options.dump_recipe_only:
            exit()

    data_repo = {}
    job_list = []

    # Compile all the functions defined in `function_definitions` and make them available in a shallow copy
    # of the runtime environment.
    if hasattr(recipe, 'function_definitions'):
        function_definitions_global_env = evaluate_function_definitions(recipe)
    else:
        # Or just use the default environment.
        function_definitions_global_env = globals()

    if not options.plot_only:
        if not hasattr(recipe, 'evaluation'):
            logi('process_recipe: no Evaluation in recipe')
            return data_repo, []
        data_repo, jobs = prepare_evaluation_phase(recipe, options, data_repo, function_definitions_global_env)
        job_list.extend(jobs)

    if options.eval_only:
        return data_repo, job_list

    if not hasattr(recipe, 'plot'):
        logi('process_recipe: no Plot in recipe')
        return data_repo, job_list

    data_repo, jobs = prepare_plotting_phase(recipe, options, data_repo)
    job_list.extend(jobs)

    return data_repo, job_list

def extract_dict_from_string(string):
    # Strip surrounding whitespace from each token so that e.g. `{a:b, c:d}` parses cleanly;
    # after splitting on ',' the tokens cannot contain commas anyway.
    return dict([ token.strip().split(':') for token in string.strip('{}').split(',') ])
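
# A short sketch of the expected override format (the names and filenames are hypothetical):
#   extract_dict_from_string('{e1:out.csv, e2:other.csv}') == {'e1': 'out.csv', 'e2': 'other.csv'}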

def parse_arguments(arguments):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('recipe', help='Input recipe')
    parser.add_argument('--override-extractor', type=str, help='Override extractor parameters')
    parser.add_argument('--override-exporter', type=str, help='Override exporter parameters')
    parser.add_argument('--override-reader', type=str, help='Override reader parameters')
    parser.add_argument('--override-plot', type=str, help='Override plot parameters')
    parser.add_argument('--eval-only', action='store_true', default=False, help='Run evaluation phase only')
    parser.add_argument('--plot-only', action='store_true', default=False, help='Run plot phase only')
    parser.add_argument('--run', type=str, default='all', help='Run selected tasks only, in the format [evaluation_phase_tasks]:[plot_phase_tasks],'
                        ' where each (optional) phase consists of a comma-separated list of qualified task names,'
                        ' i.e. the name of the sub-phase and the task name,'
                        ' e.g. `extractors.e1,transforms.t1,exporter.e1:reader.r1,transforms.t1,tasks.plot1`'
                        )
    parser.add_argument('--worker', type=int, default=4, help='The number of worker processes')
    parser.add_argument('--mem', type=int, default=1, help='The memory, in GB, to limit each worker process to. This is only obeyed if the cluster is not already running.')
    parser.add_argument('--cluster', type=str, help='The address of an already running cluster')
    parser.add_argument('--single-threaded', action='store_true', default=False, help='Run in single-threaded mode; this overrides the value of the `--worker` flag')
    parser.add_argument('--dashboard-port', type=int, default=8787, help='The port for the dashboard of the cluster; only used for the default localhost cluster')
    parser.add_argument('--slurm', action='store_true', default=False, help='Use a SLURM cluster')
    parser.add_argument('--partition', type=str, help='The partition for the SLURM cluster')
    parser.add_argument('--nodelist', type=str, help='The nodelist for the SLURM cluster')
    parser.add_argument('--tmpdir', type=str, default='/opt/tmpssd/tmp', help='The directory for temporary files')
    parser.add_argument('--plot-task-graphs', action='store_true', default=False, help='Plot the evaluation and plotting phase task graphs')
    parser.add_argument('--verbose', '-v', action='count', default=0, help='Increase logging verbosity')
    parser.add_argument('--dump-recipe', action='store_true', default=False, help='Dump the loaded recipe; useful for finding errors in the recipe')
    parser.add_argument('--dump-recipe-only', action='store_true', default=False, help='Dump the loaded recipe and exit; useful for finding errors in the recipe')
    parser.add_argument('--lint-recipe', action='store_true', default=False, help='Run yamllint over the specified recipe; useful for finding errors in the recipe')
    parser.add_argument('--debug', action='store_true', default=False, help='Enable debug mode. If an exception is encountered, drop into an ipdb debugger session')

    args = parser.parse_args(arguments)

    if args.debug:
        global _debug
        _debug = True

    if args.single_threaded:
        args.worker = 1

    if args.slurm:
        if not args.nodelist:
            raise Exception('A nodelist is required when using SLURM.')
        if not args.partition:
            raise Exception('A partition is required when using SLURM.')

    def set_dict_arg_from_string(option, arg_name):
        if option:
            try:
                option_dict = extract_dict_from_string(option)
            except Exception as e:
                loge(f'>>>> ERROR: extracting parameters for `{arg_name}` from `{option}` failed:\n>>>> {e}')
                return
            logd(f'{option_dict=}')
            setattr(args, arg_name, option_dict)
        else:
            setattr(args, arg_name, dict())

    set_dict_arg_from_string(args.override_extractor, 'extraction_overrides')
    set_dict_arg_from_string(args.override_exporter, 'export_overrides')
    set_dict_arg_from_string(args.override_reader, 'reader_overrides')
    set_dict_arg_from_string(args.override_plot, 'plot_overrides')

    def get_run_tree(option, phase_names):
        d = dict()
        d[phase_names[0]] = set()
        d[phase_names[1]] = set()
        d[phase_names[2]] = set()
        for qualified_task_name in option.split(','):
            if len(qualified_task_name) == 0:
                continue
            if len(r := qualified_task_name.split('.')) == 2:
                top_level, task_name = r
            else:
                loge(f'>>>> ERROR: bad task name: {qualified_task_name}')
                exit(1)
            if top_level in d:
                d[top_level] = d[top_level].union(set([task_name]))
            else:
                loge(f'>>>> ERROR: not a valid phase name: {top_level}')
                exit(1)
        return d

    def process_run_tree(option):
        eval_phases = ['extractors', 'transforms', 'exporter']
        plot_phases = ['reader', 'transforms', 'tasks']
        p = option.split(':')
        if len(p) == 1 or (len(p) == 2 and len(p[1]) == 0):
            # evaluation phase only
            eval_str = p[0]
            plot_str = ''
        else:
            eval_str = p[0]
            plot_str = p[1]
        run_tree = { 'evaluation': get_run_tree(eval_str, eval_phases)
                     , 'plot': get_run_tree(plot_str, plot_phases)
                     }
        return run_tree
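
    # For example (with hypothetical task names), `--run 'extractors.e1,transforms.t1:tasks.plot1'`
    # yields:
    #   { 'evaluation': {'extractors': {'e1'}, 'transforms': {'t1'}, 'exporter': set()}
    #   , 'plot': {'reader': set(), 'transforms': set(), 'tasks': {'plot1'}} }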
    if args.run != 'all':
        run_tree = process_run_tree(args.run)
    else:
        run_tree = None
    setattr(args, 'run_tree', run_tree)
    logd(f'{run_tree=}')

    def map_verbosity_level_to_log_level(verbosity):
        match verbosity:
            case 0:
                log_level = logging.WARNING
            case 1:
                log_level = logging.INFO
            case 2:
                log_level = logging.DEBUG
            case 3:
                log_level = logging.NOTSET
            case _:
                log_level = logging.NOTSET
        return log_level

    log_level = map_verbosity_level_to_log_level(args.verbose)
    setattr(args, 'log_level', log_level)

    return args

def setup_pandas():
    # verbose printing of DataFrames
    pd.set_option('display.max_columns', None)
    pd.set_option('display.max_colwidth', None)

    # print the numexpr thread pool config to the log
    numexp_max_threads = os.getenv('NUMEXPR_MAX_THREADS')
    numexp_num_threads = os.getenv('NUMEXPR_NUM_THREADS')
    numexpr_threads_log_msg = f'thread_id={threading.get_native_id()}'
    if numexp_max_threads:
        numexpr_threads_log_msg += f' NUMEXPR_MAX_THREADS={numexp_max_threads}'
    if numexp_num_threads:
        numexpr_threads_log_msg += f' NUMEXPR_NUM_THREADS={numexp_num_threads}'
    numexpr_threads_log_msg += f' {numexpr.ncores=} {numexpr.nthreads=} {numexpr.MAX_THREADS=}'
    logd(numexpr_threads_log_msg)

class WorkerPlugin(dask.distributed.WorkerPlugin):
    r"""
    A dask worker plugin for setting defaults in the worker process

    Parameters
    ----------
    options : dict
        The dictionary containing the configuration for the worker, usually the same as for the launcher
    """
    def __init__(self, options, *args, **kwargs):
        self.options = options

    def setup(self, worker: dask.distributed.Worker):
        # append the current working directory to the module search path of the worker
        sys.path.append('.')
        setup_logging_defaults(level=self.options.log_level)
        set_logging_level(self.options.log_level)
        setup_pandas()

def setup_dask(options):
    r"""
    Set up and configure the dask cluster and its workers.

    Parameters
    ----------
    options : dict
        The dictionary containing the configuration for the launcher
    """
    plugin = WorkerPlugin(options)
    dask.config.set({'distributed.scheduler.worker-ttl': None})

    # single-threaded mode for debugging
    if options.single_threaded:
        logi('using local single-threaded process cluster')
        dask.config.set(scheduler='synchronous')
        # No client is returned: creating a client here leads to sqlite
        # connection objects being transported between threads.
        return None

    if options.slurm:
        logi('using SLURM cluster')
        cluster = SLURMCluster(cores = 1
                               , n_workers = options.worker
                               , memory = str(options.mem) + 'GB'
                               , job_extra_directives = [ f'--nodelist={options.nodelist} --partition={options.partition}' ]
                               , interface = 'lo'
                               , shared_temp_directory = options.tmpdir
                               )
        client = Client(cluster)
        client.register_worker_plugin(plugin)
        return client
    elif options.cluster:
        if options.cluster == 'local':
            logi('using local process cluster')
            cluster = LocalCluster(n_workers=options.worker
                                   , host='localhost'
                                   # , interface='lo'
                                   , memory_limit = str(options.mem) + 'GB'
                                   , local_directory = options.tmpdir
                                   )
            cluster.scale(options.worker)
            client = Client(cluster)
            client.register_worker_plugin(plugin)
            return client
        else:
            logi(f'using distributed cluster at {options.cluster}')
            client = Client(options.cluster)
            client.register_worker_plugin(plugin)
            return client
    else:
        dashboard_address = f'localhost:{options.dashboard_port}'
        logi(f'using local cluster with dashboard at {dashboard_address}')
        client = Client(dashboard_address=dashboard_address
                        , n_workers=options.worker
                        , memory_limit = str(options.mem) + 'GB'
                        , local_directory = options.tmpdir
                        )
        client.register_worker_plugin(plugin)
        return client
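
# Note on `setup_dask`: `--cluster` accepts either the literal `local` or the address of an
# already running dask scheduler, e.g. (hypothetical host) `--cluster tcp://scheduler-host:8786`.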

def compute_graph(jobs):
    r"""
    Compute the task graph

    Parameters
    ----------
    jobs : List[dask.Delayed]
        The list of jobs/tasks to compute
    """
    logi('=-!!'*40)
    logi('recombobulating splines...')
    logi(f'compute_graph: {jobs=}')
    result = dask.compute(*jobs)
    logi('=-!!'*40)
    return result

def main():
    setup_logging_defaults(logging.WARNING)
    options = parse_arguments(sys.argv[1:])
    # set the logging level again, now that the desired verbosity is known
    set_logging_level(options.log_level)
    logd(f'{options=}')

    if options.lint_recipe:
        # Just run yamllint in a subprocess for now. This also provides nicely colored output.
        subprocess.run(['yamllint', options.recipe])
        exit()

    setup_pandas()
    client = setup_dask(options)

    # This is needed for proper pickling of DataFrames to JSON.
    exporters.register_jsonpickle_handlers()

    # register constructors for all YAML objects
    yaml_helper.register_constructors()
    extractors.register_constructors()
    transforms.register_constructors()
    exporters.register_constructors()
    plots.register_constructors()

    data_repo, job_list = process_recipe(options)
    if len(job_list) == 0:
        loge('No tasks to run')
        return

    # now actually compute the constructed computation graph
    result = compute_graph(job_list)
    # ...
    return

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        loge(f'{e=}')
        loge(''.join(traceback.format_exception(e)))
        if sys.stdin.isatty() and _debug:
            loge('dropping into an interactive debugging environment')
            # drop into the debugger in the context of the thrown exception
            import ipdb
            ipdb.post_mortem(e.__traceback__)
        else:
            loge('not dropping into an interactive debugging environment since stdin is not attached to a terminal or debug mode is not enabled')