Skip to content

Commit

Permalink
Simplify pipeline subsetting logic
Browse files: browse the repository at this point in the history
  • Loading branch information
gaow committed Feb 8, 2019
1 parent 0174c37 commit 7d26602
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 16 deletions.
4 changes: 2 additions & 2 deletions src/dsc_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __init__(self, workflows, runtime, rerun = False, n_cpu = 4, try_catch = Fal
processed_steps[(step.name, flow, depend)] = name
if not step.name in self.depends:
self.depends[step.name] = []
if step.depends not in self.depends[step.name]:
if len(step.depends) and step.depends not in self.depends[step.name]:
self.depends[step.name].append(step.depends)
conf_translator = self.Step_Translator(step, self.db,
self.step_map[workflow_id + 1],
Expand Down Expand Up @@ -110,7 +110,7 @@ def __init__(self, workflows, runtime, rerun = False, n_cpu = 4, try_catch = Fal
tmp_str.append(f"depends: [sos_step('%s_%s' % (n2a(x[1]).lower(), x[0])) for x in IO_DB['{workflow_id + 1}']['{y}']['depends']]")
tmp_str.append(f"output: IO_DB['{workflow_id + 1}']['{y}']['output']")
tmp_str.append(f"sos_run('{y}', {y}_output_files = IO_DB['{workflow_id + 1}']['{y}']['output'], " + \
(f"{y}_input_files = IO_DB['{workflow_id + 1}']['{y}']['input'], " if self.depends[y] else "") + \
(f"{y}_input_files = IO_DB['{workflow_id + 1}']['{y}']['input'], " if len(self.depends[y]) else "") + \
f"DSC_STEP_ID_ = {abs(int(xxh(repr(exe_signatures[y])).hexdigest(), 16)) % (10**8)})")
if ii == len(sequence):
self.last_steps.append((y, workflow_id + 1))
Expand Down
19 changes: 5 additions & 14 deletions src/query_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,13 @@ def get_sequence(primary, reference):
raise ValueError(f'Requested module ``{primary[idx+1]}`` is an orphan branch. Please consider using separate queries for information from this module.')
return primary
#
pipelines = []
tables = case_insensitive_uniq_list([x[0] for x in self.target_tables] + [x[0] for x in self.condition_tables])
for pipeline in self.pipelines:
pidx = [l[0] for l in enumerate(pipeline) if l[1] in tables]
if len(pidx) == 0:
continue
pidx = [0] + pidx
if not pipeline[pidx[0]:pidx[-1]+1] in pipelines:
pipelines.append(pipeline[pidx[0]:pidx[-1]+1])
pipelines = filter_sublist(pipelines)
# remove tables in targets that do not exist in pipelines
valid_tables = [[item[0] for item in self.target_tables + self.condition_tables if item[0] in pipeline] for pipeline in self.pipelines]
# 1. Further filter pipelines to minimally match target table dependencies
# 2. For pipelines containing each other we only keep the longest pipelines
pipelines = filter_sublist([get_sequence(tables, pipeline) for tables, pipeline in zip(valid_tables, self.pipelines)])
target_tables = [[item for item in self.target_tables if item[0] in pipeline] for pipeline in pipelines]
non_empty_targets = [idx for idx, item in enumerate(target_tables) if len(item)>0]
# now the pipelines need to be further filtered to match target table dependencies
pipelines = [get_sequence([x[0] for x in tables], pipeline) for tables, pipeline in zip(target_tables, pipelines)]
condition_tables = [[item for item in self.condition_tables if item[0] in pipeline] for pipeline in pipelines]
non_empty_targets = [idx for idx, item in enumerate(target_tables) if len(item) > 0]
return [pipelines[i] for i in non_empty_targets], [target_tables[i] for i in non_empty_targets], [condition_tables[i] for i in non_empty_targets]

def get_from_clause(self):
Expand Down

0 comments on commit 7d26602

Please sign in to comment.