diff --git a/src/dsc_translator.py b/src/dsc_translator.py index f513f980..72fbebe7 100644 --- a/src/dsc_translator.py +++ b/src/dsc_translator.py @@ -74,7 +74,7 @@ def __init__(self, workflows, runtime, rerun = False, n_cpu = 4, try_catch = Fal processed_steps[(step.name, flow, depend)] = name if not step.name in self.depends: self.depends[step.name] = [] - if step.depends not in self.depends[step.name]: + if len(step.depends) and step.depends not in self.depends[step.name]: self.depends[step.name].append(step.depends) conf_translator = self.Step_Translator(step, self.db, self.step_map[workflow_id + 1], @@ -110,7 +110,7 @@ def __init__(self, workflows, runtime, rerun = False, n_cpu = 4, try_catch = Fal tmp_str.append(f"depends: [sos_step('%s_%s' % (n2a(x[1]).lower(), x[0])) for x in IO_DB['{workflow_id + 1}']['{y}']['depends']]") tmp_str.append(f"output: IO_DB['{workflow_id + 1}']['{y}']['output']") tmp_str.append(f"sos_run('{y}', {y}_output_files = IO_DB['{workflow_id + 1}']['{y}']['output'], " + \ - (f"{y}_input_files = IO_DB['{workflow_id + 1}']['{y}']['input'], " if self.depends[y] else "") + \ + (f"{y}_input_files = IO_DB['{workflow_id + 1}']['{y}']['input'], " if len(self.depends[y]) else "") + \ f"DSC_STEP_ID_ = {abs(int(xxh(repr(exe_signatures[y])).hexdigest(), 16)) % (10**8)})") if ii == len(sequence): self.last_steps.append((y, workflow_id + 1)) diff --git a/src/query_engine.py b/src/query_engine.py index 0269812a..27b633d5 100644 --- a/src/query_engine.py +++ b/src/query_engine.py @@ -267,22 +267,13 @@ def get_sequence(primary, reference): raise ValueError(f'Requested module ``{primary[idx+1]}`` is an orphan branch. Please consider using separate queries for information from this module.') return primary # - pipelines = [] - tables = case_insensitive_uniq_list([x[0] for x in self.target_tables] + [x[0] for x in self.condition_tables]) - for pipeline in self.pipelines: - pidx = [l[0] for l in enumerate(pipeline) if l[1] in tables] - if len(pidx) == 0: - continue - pidx = [0] + pidx - if not pipeline[pidx[0]:pidx[-1]+1] in pipelines: - pipelines.append(pipeline[pidx[0]:pidx[-1]+1]) - pipelines = filter_sublist(pipelines) - # remove tables in targets not exist in pipelines + valid_tables = [[item[0] for item in self.target_tables + self.condition_tables if item[0] in pipeline] for pipeline in self.pipelines] + # 1. Further filter pipelines to minimally match target table dependencies + # 2. For pipelines containing each other we only keep the longest pipelines + pipelines = filter_sublist([get_sequence(tables, pipeline) for tables, pipeline in zip(valid_tables, self.pipelines)]) target_tables = [[item for item in self.target_tables if item[0] in pipeline] for pipeline in pipelines] - non_empty_targets = [idx for idx, item in enumerate(target_tables) if len(item)>0] - # now the pipelines needs be further filtered to match target table dependencies - pipelines = [get_sequence([x[0] for x in tables], pipeline) for tables, pipeline in zip(target_tables, pipelines)] condition_tables = [[item for item in self.condition_tables if item[0] in pipeline] for pipeline in pipelines] + non_empty_targets = [idx for idx, item in enumerate(target_tables) if len(item) > 0] return [pipelines[i] for i in non_empty_targets], [target_tables[i] for i in non_empty_targets], [condition_tables[i] for i in non_empty_targets] def get_from_clause(self):