diff --git a/janis_core/deps/__init__.py b/janis_core/deps/__init__.py index 377215f41..8e8cdd312 100644 --- a/janis_core/deps/__init__.py +++ b/janis_core/deps/__init__.py @@ -1,2 +1,4 @@ -import cwl_utils.parser_v1_2 as cwlgen +from cwl_utils import parser + +cwlgen = parser.cwl_v1_2 import wdlgen diff --git a/janis_core/ingestion/fromwdl.py b/janis_core/ingestion/fromwdl.py index db4d736a5..d8e84f621 100755 --- a/janis_core/ingestion/fromwdl.py +++ b/janis_core/ingestion/fromwdl.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 + import functools import os import re @@ -194,7 +195,7 @@ def parse_memory_requirement(self, value): raise Exception(f"Memory type {s}") elif isinstance(s, (float, int)): # in bytes? - return s / (1024 ** 3) + return s / (1024**3) elif isinstance(s, j.Selector): return s raise Exception(f"Couldn't recognise memory requirement '{value}'") @@ -238,7 +239,11 @@ def from_loaded_task(self, obj: WDL.Task): inputs = obj.inputs cpus = self.translate_expr(rt.get("cpu")) - if not isinstance(cpus, j.Selector) and cpus is not None and not isinstance(cpus, (int, float)): + if ( + cpus is not None + and not isinstance(cpus, j.Selector) + and not isinstance(cpus, (int, float)) + ): cpus = int(cpus) c = j.CommandToolBuilder( @@ -272,7 +277,10 @@ def translate_expr( if isinstance(expr, WDL.Expr.Array): # a literal array - return [self.translate_expr(e) for e in expr.items] + return [ + self.translate_expr(e, input_selector_getter=input_selector_getter) + for e in expr.items + ] if isinstance(expr, WDL.Expr.String): return self.translate_wdl_string(expr) elif isinstance(expr, (WDL.Expr.Int, WDL.Expr.Boolean, WDL.Expr.Float)): @@ -285,7 +293,7 @@ def translate_expr( n = str(expr.expr) if input_selector_getter: return input_selector_getter(n) - return j.InputSelector(n) + return j.InputSelector(str(n)) elif isinstance(expr, WDL.Expr.Apply): return self.translate_apply( expr, input_selector_getter=input_selector_getter @@ -394,8 +402,12 @@ def translate_apply( "round": j.RoundOperator, "write_lines": lambda exp: f"JANIS: write_lines({exp})", "read_tsv": lambda exp: f"JANIS: j.read_tsv({exp})", + "write_tsv": lambda exp: f"JANIS: j.write_tsv({exp})", "read_boolean": lambda exp: f"JANIS: j.read_boolean({exp})", "read_lines": lambda exp: f"JANIS: j.read_lines({exp})", + "zip": lambda exp: f"JANIS: j.zip({exp})", + "transpose": j.TransposeOperator, + "read_string": j.ReadContents, } fn = fn_map.get(expr.function_name) if fn is None: @@ -420,6 +432,8 @@ def parse_wdl_type(self, t: WDL.Type.Base): return j.Directory(optional=optional) elif isinstance(t, WDL.Type.Array): return j.Array(self.parse_wdl_type(t.item_type), optional=optional) + elif isinstance(t, WDL.Type.Pair): + return j.Array(self.parse_wdl_type(t.left_type), optional=optional) raise Exception(f"Didn't handle WDL type conversion for '{t}' ({type(t)})") diff --git a/janis_core/operators/standard.py b/janis_core/operators/standard.py index 9ee84c31c..9ad42e9f2 100644 --- a/janis_core/operators/standard.py +++ b/janis_core/operators/standard.py @@ -25,6 +25,7 @@ def argtypes(self) -> List[DataType]: return [File()] def to_python(self, unwrap_operator, *args): + return f"open({unwrap_operator(args[0])!r}).read()" raise NotImplementedError("Determine _safe_ one line solution for ReadContents") def to_wdl(self, unwrap_operator, *args): @@ -472,7 +473,7 @@ def returntype(self): if not isinstance(self.args[0], InputSelector): Logger.warn( f'Expected return type of "{self.args[0]}" to be an array, ' - f'but found {outer_rettype}, will return this as a returntype.' + f"but found {outer_rettype}, will return this as a returntype." ) else: rettype = outer_rettype.subtype() @@ -506,7 +507,6 @@ def evaluate(self, inputs): class ReplaceOperator(Operator): - @staticmethod def friendly_signature(): return "Base: String, Pattern: String, Replacement: String -> String" @@ -517,6 +517,7 @@ def argtypes(self) -> List[DataType]: def evaluate(self, inputs): base, pattern, replacement = [self.evaluate_arg(a, inputs) for a in self.args] import re + return re.sub(pattern, replacement, base) def to_wdl(self, unwrap_operator, *args): diff --git a/janis_core/tests/test_conditionals.py b/janis_core/tests/test_conditionals.py index aaf151ba8..e6bd1913d 100644 --- a/janis_core/tests/test_conditionals.py +++ b/janis_core/tests/test_conditionals.py @@ -44,7 +44,7 @@ def test_switch(self): w.output("out", source=w.echoswitch) - _, wdl_tools = WdlTranslator.translate_workflow(w) + _, wdl_tools = WdlTranslator.translate_workflow_internal(w) expected = """\ version development diff --git a/janis_core/tests/test_test_runner.py b/janis_core/tests/test_test_runner.py index 442cf98aa..fd680c148 100644 --- a/janis_core/tests/test_test_runner.py +++ b/janis_core/tests/test_test_runner.py @@ -2,6 +2,8 @@ import os from typing import Optional, List, Dict, Union from unittest import TestCase, mock +from unittest.case import skipUnless + from janis_core.tool.test_suite_runner import ToolTestSuiteRunner from janis_core.tool.test_classes import ( TTestCase, @@ -15,6 +17,13 @@ from janis_core.tool.test_suite_runner import ToolTestSuiteRunner from nose.tools import nottest +has_janis_assistant = False +try: + import janis_assistant + has_janis_assistant = True +except ImportError: + pass + valid_url = "https://abc.com/some_dir/expected_output_file.txt" valid_url_2 = "https://abc.com/some_dir/diff_file.txt" @@ -165,6 +174,7 @@ def test_get_value_to_compare(self): assert runner.get_value_to_compare(t2, file_path) == 3 @nottest + @skipUnless(has_janis_assistant, reason="Janis assistant is required to test downloading remote files") @mock.patch("urllib.request.urlopen", side_effect=mocked_urllib_urlopen) @mock.patch("urllib.request.urlretrieve", side_effect=mocked_urllib_urlretrieve) def test_download_remote_files(self, mock_urlopen, mock_urlretrieve): diff --git a/janis_core/tests/test_translation_cwl.py b/janis_core/tests/test_translation_cwl.py index 7885cf470..f0f9dc49b 100644 --- a/janis_core/tests/test_translation_cwl.py +++ b/janis_core/tests/test_translation_cwl.py @@ -440,14 +440,14 @@ def test_string_formatter_two_param(self): def test_escaped_characters(self): trans = cwl.CwlTranslator - translated = trans.translate_tool_internal(TestTool()) + translated = trans.translate_command_tool_internal(TestTool()) arg: cwlgen.CommandLineBinding = translated.arguments[0] self.assertEqual('test:\\\\t:escaped:\\\\n:characters\\"', arg.valueFrom) class TestCwlEnvVar(unittest.TestCase): def test_environment1(self): - t = CwlTranslator().translate_tool_internal(tool=TestTool()) + t = CwlTranslator().translate_command_tool_internal(tool=TestTool()) envvar: cwlgen.EnvVarRequirement = [ t for t in t.requirements if t.class_ == "EnvVarRequirement" ][0] @@ -597,11 +597,11 @@ class TestEmptyContainer(unittest.TestCase): def test_empty_container_raises(self): self.assertRaises( - Exception, CwlTranslator().translate_tool_internal, SingleTestTool() + Exception, CwlTranslator().translate_command_tool_internal, SingleTestTool() ) def test_empty_container(self): - c = CwlTranslator().translate_tool_internal( + c = CwlTranslator().translate_command_tool_internal( SingleTestTool(), allow_empty_container=True ) self.assertNotIn("DockerRequirement", c.requirements) @@ -613,9 +613,10 @@ def test_add_single_to_array_edge(self): w.input("inp1", str) w.step("stp1", ArrayTestTool(inps=w.inp1)) - c, _, _ = CwlTranslator().translate( - w, to_console=False, allow_empty_container=True + c, _, _ = CwlTranslator().translate_workflow( + w, allow_empty_container=True, to_console=False ) + self.maxDiff = None self.assertEqual(cwl_multiinput, c) @@ -794,7 +795,7 @@ def test_string_formatter(self): ) wf.step("print", EchoTestTool(inp=wf.readGroupHeaderLine)) wf.output("out", source=wf.print) - d, _ = cwl.CwlTranslator.translate_workflow( + d, _ = cwl.CwlTranslator.translate_workflow_internal( wf, with_container=False, allow_empty_container=True ) stepinputs = d.save()["steps"][0]["in"] @@ -823,7 +824,7 @@ def test_string_formatter_stepinput(self): ), ) wf.output("out", source=wf.print) - d, _ = cwl.CwlTranslator.translate_workflow( + d, _ = cwl.CwlTranslator.translate_workflow_internal( wf, with_container=False, allow_empty_container=True ) stepinputs = d.save()["steps"][0]["in"] @@ -863,7 +864,7 @@ def test_two_similar_tools(self): w.step("stp1", TestTool(testtool=w.inp)) w.step("stp2", TestToolV2(testtool=w.inp)) - wf_cwl, _ = CwlTranslator.translate_workflow(w) + wf_cwl, _ = CwlTranslator.translate_workflow_internal(w) stps = {stp.id: stp for stp in wf_cwl.steps} self.assertEqual("tools/TestTranslationtool.cwl", stps["stp1"].run) @@ -872,7 +873,7 @@ def test_two_similar_tools(self): class TestCwlResourceOperators(unittest.TestCase): def test_1(self): - tool_cwl = CwlTranslator.translate_tool_internal( + tool_cwl = CwlTranslator.translate_command_tool_internal( OperatorResourcesTestTool(), with_resource_overrides=True ) resourcereq = [ @@ -900,7 +901,7 @@ def test_read_contents_string(self): version="-1", ) - translated = CwlTranslator.translate_tool_internal( + translated = CwlTranslator.translate_command_tool_internal( t, allow_empty_container=True ) self.assertTrue(translated.outputs[0].outputBinding.loadContents) @@ -915,7 +916,7 @@ def test_read_contents_as_int(self): container=None, version="-1", ) - translated = CwlTranslator.translate_tool_internal( + translated = CwlTranslator.translate_command_tool_internal( t, allow_empty_container=True ) self.assertTrue(translated.outputs[0].outputBinding.loadContents) @@ -963,7 +964,7 @@ def test_filter_null(self): w.step("stp", T(inp=FilterNullOperator(w.inp)), scatter="inp") w.output("out", source=w.stp.out) - w_cwl = cwl.CwlTranslator().translate_workflow(w, with_container=False)[0] + w_cwl = cwl.CwlTranslator().translate_workflow_internal(w, with_container=False)[0] self.assertEqual(2, len(w_cwl.steps)) self.assertEqual( "_evaluate_prescatter-stp-inp/out", w_cwl.steps[1].in_[0].source @@ -977,7 +978,7 @@ def test_read_contents(self): w.step("stp", EchoTestTool(inp=w.inp)) w.output("out", source=w.stp.out.contents()) - w_cwl = cwl.CwlTranslator().translate_workflow(w, with_container=False)[0] + w_cwl = cwl.CwlTranslator().translate_workflow_internal(w, with_container=False)[0] self.assertEqual(2, len(w_cwl.steps)) self.assertEqual( @@ -1026,7 +1027,8 @@ class TestForEachSelectors(unittest.TestCase): def test_minimal(self): tool = TestForEach() # tool.translate("cwl", export_path="~/Desktop/tmp", to_disk=True) - w, _ = CwlTranslator.translate_workflow(tool) + w, _ = CwlTranslator.translate_workflow_internal(tool) + tool.translate("cwl") stp = w.steps[0] self.assertEqual("inp", stp.in_[0].source) diff --git a/janis_core/tests/test_translation_wdl.py b/janis_core/tests/test_translation_wdl.py index 6e7afa950..2797c59bf 100644 --- a/janis_core/tests/test_translation_wdl.py +++ b/janis_core/tests/test_translation_wdl.py @@ -390,7 +390,7 @@ def test_string_formatter_two_param(self): def test_escaped_characters(self): trans = wdl.WdlTranslator - translated = trans.translate_tool_internal(TestTool()) + translated = trans.translate_command_tool_internal(TestTool()) arg = translated.command[-1].arguments[0] self.assertEqual("'test:\\t:escaped:\\n:characters\"'", arg.value) @@ -723,7 +723,7 @@ def test_string_optional_default(self): class TestWdlEnvVar(unittest.TestCase): def test_environment1(self): - t = WdlTranslator().translate_tool_internal(tool=TestTool()) + t = WdlTranslator().translate_command_tool_internal(tool=TestTool()) s = t.get_string() print(s) @@ -1096,7 +1096,7 @@ def test_two_similar_tools(self): w.step("stp1", TestTool(testtool=w.inp)) w.step("stp2", TestToolV2(testtool=w.inp)) - wf_wdl, _ = WdlTranslator.translate_workflow(w) + wf_wdl, _ = WdlTranslator.translate_workflow_internal(w) expected = """\ version development @@ -1178,7 +1178,7 @@ def test_array_secondary_connection(self): def test_workflow_secondary_outputs(self): wf = TestWorkflowThatOutputsArraysOfSecondaryFiles() - wfwdl, _ = WdlTranslator.translate_workflow(wf) + wfwdl, _ = WdlTranslator.translate_workflow_internal(wf) outs = [o.get_string() for o in wfwdl.outputs] self.assertEqual("Array[File] out = stp.out", outs[0]) @@ -1450,7 +1450,7 @@ def test_with_str_default(self): class TestWdlResourceOperators(unittest.TestCase): def test_1(self): - tool_wdl = WdlTranslator.translate_tool_internal( + tool_wdl = WdlTranslator.translate_command_tool_internal( OperatorResourcesTestTool(), with_resource_overrides=True ).get_string() lines = tool_wdl.splitlines(keepends=False) @@ -1466,7 +1466,7 @@ def test_1(self): self.assertEqual("duration: select_first([runtime_seconds, 60, 86400])", time) def test_base(self): - tool_wdl = WdlTranslator.translate_tool_internal( + tool_wdl = WdlTranslator.translate_command_tool_internal( EchoTestTool(), with_resource_overrides=True ).get_string() lines = tool_wdl.splitlines(keepends=False) diff --git a/janis_core/tests/testtools.py b/janis_core/tests/testtools.py index b3ead5972..0d9222aff 100644 --- a/janis_core/tests/testtools.py +++ b/janis_core/tests/testtools.py @@ -19,6 +19,7 @@ InputQualityType, Workflow, ForEachSelector, + RangeOperator ) @@ -328,3 +329,17 @@ def friendly_name(self): def id(self) -> str: return "TestForEach" + +class TestForEachWithOperation(Workflow): + def constructor(self): + self.input("inp", Array(str)) + self.step( + "print", EchoTestTool(inp=self.inp[ForEachSelector()] + ForEachSelector() + "-hello"), _foreach=RangeOperator(self.inp.length()) + ) + self.output("out", source=self.print.out) + + def friendly_name(self): + return self.id() + + def id(self) -> str: + return "TestForEach" \ No newline at end of file diff --git a/janis_core/translationdeps/supportedtranslations.py b/janis_core/translationdeps/supportedtranslations.py index e3c7b4546..bca2f8311 100644 --- a/janis_core/translationdeps/supportedtranslations.py +++ b/janis_core/translationdeps/supportedtranslations.py @@ -5,6 +5,7 @@ class SupportedTranslation(Enum): CWL = "cwl" WDL = "wdl" JANIS = "janis" + HAILBATCH = "hail" def __str__(self): return self.value @@ -27,10 +28,15 @@ def get_translator(self): return JanisTranslator() + elif self == SupportedTranslation.HAILBATCH: + from ..translations.hailbatch import HailBatchTranslator + + return HailBatchTranslator() + @staticmethod def all(): return [ SupportedTranslation.CWL, SupportedTranslation.WDL, - SupportedTranslation.JANIS, + SupportedTranslation.JANIS ] diff --git a/janis_core/translations/__init__.py b/janis_core/translations/__init__.py index afdd232cb..30c91fd88 100644 --- a/janis_core/translations/__init__.py +++ b/janis_core/translations/__init__.py @@ -30,7 +30,6 @@ def translate_workflow( should_zip=True, merge_resources=False, hints=None, - allow_null_if_not_optional=True, additional_inputs: Dict = None, max_cores=None, max_mem=None, @@ -39,7 +38,7 @@ def translate_workflow( container_override: dict = None, ): translator = get_translator(translation) - return translator.translate( + return translator.translate_workflow( workflow, to_console=to_console, tool_to_console=tool_to_console, @@ -52,7 +51,6 @@ def translate_workflow( should_zip=should_zip, merge_resources=merge_resources, hints=hints, - allow_null_if_not_optional=allow_null_if_not_optional, additional_inputs=additional_inputs, max_cores=max_cores, max_mem=max_mem, diff --git a/janis_core/translations/cwl.py b/janis_core/translations/cwl.py index 55c14c178..28c5caeb7 100644 --- a/janis_core/translations/cwl.py +++ b/janis_core/translations/cwl.py @@ -54,6 +54,7 @@ from janis_core.translations.translationbase import ( TranslatorBase, TranslatorMeta, + WorkflowTranslationOutput, try_catch_translate, ) from janis_core.types.common_data_types import ( @@ -84,15 +85,15 @@ class CwlTranslator(TranslatorBase, metaclass=TranslatorMeta): def __init__(self): super().__init__(name="cwl") - @staticmethod - def stringify_commentedmap(m): + @classmethod + def stringify_commentedmap(cls, m): io = StringIO() yaml.dump(m, io) return io.getvalue() - @staticmethod + @classmethod def stringify_translated_workflow( - wf: cwlgen.Savable, should_format=True, as_json=False + cls, wf: cwlgen, should_format=True, as_json=False ): saved = wf.save() @@ -107,10 +108,8 @@ def stringify_translated_workflow( return formatted - @staticmethod - def stringify_translated_tool( - tool: cwlgen.Savable, should_format=True, as_json=False - ): + @classmethod + def stringify_translated_tool(cls, tool: cwlgen, should_format=True, as_json=False): saved = tool.save() if as_json: @@ -124,17 +123,17 @@ def stringify_translated_tool( return formatted - @staticmethod - def stringify_translated_inputs(inputs): + @classmethod + def stringify_translated_inputs(cls, inputs): return ruamel.yaml.dump(inputs, default_flow_style=False) - @staticmethod - def validate_command_for(wfpath, inppath, tools_dir_path, tools_zip_path): + @classmethod + def validate_command_for(cls, wfpath, inppath, tools_dir_path, tools_zip_path): return ["cwltool", "--validate", wfpath] @classmethod @try_catch_translate(type="workflow") - def translate_workflow( + def translate_workflow_internal( cls, wf, with_container=True, @@ -143,7 +142,7 @@ def translate_workflow( is_packed=False, allow_empty_container=False, container_override=None, - ) -> Tuple[cwlgen.Workflow, Dict[str, any]]: + ) -> WorkflowTranslationOutput: metadata = wf.metadata w = cwlgen.Workflow( @@ -213,7 +212,7 @@ def translate_workflow( for t in tools_to_build: tool: Tool = tools_to_build[t] if tool.type() == ToolType.Workflow: - wf_cwl, subtools = cls.translate_workflow( + wf_cwl, subtools = cls.translate_workflow_internal( tool, is_nested_tool=True, with_container=with_container, @@ -224,7 +223,7 @@ def translate_workflow( tools[tool.versioned_id()] = wf_cwl tools.update(subtools) elif isinstance(tool, CommandTool): - tool_cwl = cls.translate_tool_internal( + tool_cwl = cls.translate_command_tool_internal( tool, with_container=with_container, with_resource_overrides=with_resource_overrides, @@ -243,7 +242,7 @@ def translate_workflow( else: raise Exception(f"Unknown tool type: '{type(tool)}'") - return w, tools + return WorkflowTranslationOutput(workflow_obj=w, tools_dict=tools) @classmethod def convert_operator_to_commandtool( @@ -378,7 +377,7 @@ def build_inputs_file( inp = { i.id(): i.intype.cwl_input( - ad.get(i.id(), values_provided_from_tool.get(i.id())) + ad.get(i.id(), values_provided_from_tool.get(i.id())), input_name=i.id() ) for i in tool.tool_inputs() if i.default is not None @@ -475,7 +474,7 @@ def translate_workflow_to_all_in_one( @classmethod @try_catch_translate(type="tool") - def translate_tool_internal( + def translate_command_tool_internal( cls, tool: CommandTool, with_container=True, @@ -756,8 +755,8 @@ def translate_code_tool_internal( return tool_cwl - @staticmethod - def prepare_output_eval_for_python_codetool(tag: str, outtype: DataType): + @classmethod + def prepare_output_eval_for_python_codetool(cls, tag: str, outtype: DataType): return None requires_obj_capture = isinstance(outtype, (File, Directory)) @@ -1033,20 +1032,20 @@ def build_initial_workdir_from_tool(cls, tool): return cwlgen.InitialWorkDirRequirement(listing=listing) return None - @staticmethod - def workflow_filename(workflow): + @classmethod + def workflow_filename(cls, workflow): return workflow.versioned_id() + ".cwl" - @staticmethod - def inputs_filename(workflow): + @classmethod + def inputs_filename(cls, workflow): return workflow.id() + "-inp.yml" - @staticmethod - def tool_filename(tool): + @classmethod + def tool_filename(cls, tool): return (tool.versioned_id() if isinstance(tool, Tool) else str(tool)) + ".cwl" - @staticmethod - def resources_filename(workflow): + @classmethod + def resources_filename(cls, workflow): return workflow.id() + "-resources.yml" @@ -1500,7 +1499,7 @@ def get_run_ref_from_subtool( tool, allow_empty_container=allow_empty_container ) else: - return CwlTranslator.translate_tool_internal( + return CwlTranslator.translate_command_tool_internal( tool, True, with_resource_overrides=has_resources_overrides, @@ -1608,9 +1607,6 @@ def translate_step_node( scatter_fields = set(cwlstep.scatter or []) elif step.foreach is not None: - new_source = CwlTranslator.unwrap_selector_for_reference( - step.foreach, - ) if isinstance(step.foreach, Operator): additional_step_id = f"_evaluate_preforeach-{step.id()}" @@ -1622,6 +1618,10 @@ def translate_step_node( ) extra_steps.append(tool) new_source = f"{additional_step_id}/out" + else: + new_source = CwlTranslator.unwrap_selector_for_reference( + step.foreach, + ) d = cwlgen.WorkflowStepInput( id="_idx", diff --git a/janis_core/translations/hailbatch.py b/janis_core/translations/hailbatch.py new file mode 100644 index 000000000..16a7ffe68 --- /dev/null +++ b/janis_core/translations/hailbatch.py @@ -0,0 +1,1084 @@ +""" + +Does not support: + +- subworkflows +- code tool +- present_as +- secondaries_present_as +- scattering by multiple fields +""" + +import re +import inspect +from textwrap import indent +from typing import Tuple, Union, Callable + +from janis_core import ( + Selector, + ToolInput, + ToolArgument, + CodeTool, + InputNodeSelector, + StepOutputSelector, + StringFormatter, + InputSelector, + FirstOperator, + AliasSelector, + ForEachSelector, + WildcardSelector, + Operator, + apply_secondary_file_format_to_filename, + TInput, + Tool, +) +from janis_core.tool.commandtool import CommandTool +from janis_core.translationdeps.exportpath import ExportPathKeywords +from janis_core.translations.translationbase import ( + TranslatorBase, + WorkflowTranslationOutput, +) +from janis_core.translations.janis import JanisTranslator +from janis_core.types.common_data_types import * +from janis_core.workflow.workflow import WorkflowBase, StepNode + +SED_REMOVE_EXTENSION = "| sed 's/\\.[^.]*$//'" +REMOVE_EXTENSION = ( + lambda x, iterations: f"$(echo '{x}' {iterations * SED_REMOVE_EXTENSION})" + if iterations > 0 + else x +) + + +class HailBatchTranslator(TranslatorBase): + + generate_click_cli = False + name = "Hail batch" + + def __init__(self): + super().__init__(name=HailBatchTranslator.name) + + @classmethod + def translate_workflow(cls, *args, generate_click_cli=True, **kwargs): + cls.generate_click_cli = generate_click_cli + + return super().translate_workflow(cls, *args, **kwargs) + + # internal translators + + @classmethod + def translate_workflow_internal( + cls, + workflow: WorkflowBase, + allow_empty_container=False, + container_override: dict = None, + function_name="main", + **kwargs, + ) -> WorkflowTranslationOutput: + kwargs, kwargs_with_defaults = [], [] + step_definitions = [] + step_calls = [] + inputs_to_read = [] + additional_expressions = [] + + # This is hard to refactor into an individual method as it relies on: + # - additional_expressions: used for determining default if it's an operation + # ** if the expression relies on another pre-computed default, + # this might not work correctly. + # - kwargs && kwargs_with_defaults: + # for building main(**kwargs, **kwargs_with_defaults) + # function definition in correct order + # - inputs_to_read: we need to call 'prepare_input_read_for_inp' on these inputs as + # for file inputs, we'll call 'read_input' (files) or 'read_input_group' (files+secondaries) + for inp in workflow.input_nodes.values(): + dt = inp.datatype + kwarg, has_default, ad_expr = cls.get_kwarg_from_value( + inp.id(), dt, inp.default + ) + additional_expressions.extend(ad_expr) + + if not has_default: + kwargs.append(kwarg) + kwarg_no_annotation, _, _ = cls.get_kwarg_from_value( + inp.id(), dt, None, include_annotation=False + ) + else: + kwargs_with_defaults.append(kwarg) + + if dt.is_base_type(File) or ( + isinstance(dt, Array) and dt.fundamental_type().is_base_type(File) + ): + inputs_to_read.append(inp) + + for stp in workflow.step_nodes.values(): + + if isinstance(stp.tool, WorkflowBase): + raise NotImplementedError("Hail Batch doesn't support subworkflows") + if isinstance(stp.tool, CodeTool): + raise NotImplementedError( + "Janis -> Hail Batch does not support CodeTool" + ) + step_definitions.append( + cls.translate_command_tool_internal(tool=stp.tool, step_id=stp.id())[0] + ) + + call = cls.prepare_step_definition_and_call(stp) + step_calls.append(call) + + # for outp in workflow.output_nodes.values(): + # step_calls.append("b.write_output({source}, {name})") + + pd = 4 * " " + additional_preparation_expressions_str = "\n".join( + indent(t, pd) for t in additional_expressions + ) + inputs_to_read_str = "\n".join( + f"{pd}{inp.id()} = " + + cls.prepare_input_read_for_inp(inp.datatype, inp.id()) + for inp in inputs_to_read + ) + step_calls_str = "\n".join(indent(s.strip(), pd) for s in step_calls) + step_definitions_str = "\n\n".join(step_definitions) + + # TODO: in the future, when subworkflows are supported, the step_definitions + # should be embedded IN the main definition + wf_str = f"""\ +def {function_name}({', '.join([*kwargs, *kwargs_with_defaults])}): + b = hb.Batch('{workflow.id()}') +{additional_preparation_expressions_str} +{inputs_to_read_str} + +{step_calls_str} + + return b + +{step_definitions_str} +""" + return (wf_str, workflow), None + + @classmethod + def translate_command_tool_internal( + cls, + tool: CommandTool, + with_container=True, + with_resource_overrides=False, + allow_empty_container=False, + container_override: dict = None, + step_id=None, + ) -> Tuple[str, CommandTool]: + step_id = step_id or tool.id() + + # we're only going to bind on tool inputs that are REQUIRED or are in the connections, or have a default + + kwargs = [] + kwargs_with_defaults = [] + additional_expressions = [] + pd = 4 * " " + + tinputs = tool.inputs() + tinputs_map = {i.id(): i for i in tinputs} + args_to_bind: List[ToolArgument] = [*(tool.arguments() or [])] + inputs_specified_in_arguments = set() + args_to_check = [tool.cpus({}), tool.memory({}), tool.disk({})] + for arg in tool.arguments() or []: + args_to_check.append(arg) + filestocreate = tool.files_to_create() or {} + for fn, file in filestocreate.items(): + args_to_check.extend([fn, file]) + + while len(args_to_check) > 0: + arg = args_to_check.pop(0) + if arg is None: + continue + elif isinstance(arg, InputSelector): + inputs_specified_in_arguments.add(arg.input_to_select) + elif isinstance(arg, Operator): + args_to_check.extend(arg.get_leaves()) + + for inp in tinputs: + is_required = not inp.input_type.optional + is_specified = ( + inp.id() in tool.connections + or inp.id() in inputs_specified_in_arguments + ) + has_default = inp.default is not None + is_filename = isinstance(inp.input_type, Filename) + + if not any([is_required, is_specified, has_default, is_filename]): + continue + + if inp.position is not None or inp.prefix: + args_to_bind.append(inp) + + if not is_filename or is_specified: + kwarg, has_default, ad_expr = cls.get_kwarg_from_value( + inp.id(), inp.input_type, inp.default, include_annotation=False + ) + additional_expressions.extend(ad_expr) + (kwargs_with_defaults if has_default else kwargs).append(kwarg) + + if inp.input_type.is_base_type(File) and inp.localise_file: + additional_expressions.append(f'j.command(f"mv {{{inp.id()}}} .")') + # do same with secondary files + secs = inp.input_type.secondary_files() + if secs: + for sec in secs: + initial_ext, n_carats = cls.split_secondary_file_carats(sec) + src = f"{{{inp.id()}}}" + if n_carats: + src = REMOVE_EXTENSION(src, n_carats) + additional_expressions.append( + f'j.command(f"{src}{initial_ext} .")' + ) + + # outputs + command_extras = "" + output_collectors = [] + for outp in tool.outputs(): + ( + out_command_extras, + outputs_to_collect, + out_additional_expressions, + ) = cls.translate_tool_output(outp) + if out_command_extras: + command_extras += out_command_extras + if outputs_to_collect: + output_collectors.extend(outputs_to_collect) + if out_additional_expressions: + additional_expressions.extend(out_additional_expressions) + + def get_resolved_input_selector(inp): + if not isinstance(inp, InputSelector): + raise Exception( + f"Internal error when unwrapped input selector {inp} (type({type(inp)})" + ) + default = inp.input_to_select + if inp.input_to_select not in tinputs_map: + Logger.warn( + f"Couldn't find input ({inp.input_to_select}) in tool {tool.id()}" + ) + return default + + dt = tinputs_map[inp.input_to_select].input_type + if not isinstance(dt, File) or not dt.secondary_files(): + return default + + return f"{default}.base" + + if ( + tool.base_command() == ["sh", "script.sh"] + and filestocreate.get("script.sh") is not None + ): + # In the WDL converter, instead of breaking down the script, we + # just write it as StringFormatter to the files_to_create["script.sh"] + # we can unwrap it here manually I think. + script_str_formatter = tool.files_to_create().get("script.sh") + code_block = ( + cls.unwrap_expression( + script_str_formatter, + code_environment=False, + input_selector_overrider=get_resolved_input_selector, + ) + .strip() + .replace("\\n", "\\\\\n") + ) + code_block = re.sub(r"\$\{(.+?)\}", "${{\g<1>}}", code_block) + + command_constructor_str = f'''\ + j.command(f"""{code_block}""") +''' + else: + command = "" + bc = tool.base_command() + if bc is not None: + if isinstance(bc, list): + command += " ".join(bc) + else: + command += bc + + command_args = [] + precommand_args = [] + for arg in sorted(args_to_bind, key=lambda el: el.position or 0): + commandline_param, other_required_statements = None, None + if isinstance(arg, ToolInput): + ( + commandline_param, + other_required_statements, + ) = cls.get_command_argument_for_tool_input( + tool_input=arg, tool=tool + ) + elif isinstance(arg, ToolArgument): + ( + commandline_param, + other_required_statements, + ) = cls.get_command_argument_for_tool_argument(arg) + else: + commandline_param = str(arg) + + precommand_args.extend(other_required_statements) + if commandline_param: + command_args.append(commandline_param) + command_args_str = "\n".join( + indent(s, pd) for s in [*precommand_args, *command_args] + ) + command_extras_str = ("\\\\\n " + command_extras) if command_extras else "" + + command_constructor_str = f"""\ + command_args = [] +{command_args_str} + nl = " \\\\\\n " + command = f''' +{command} {{"".join(nl + a for a in command_args)}} {command_extras_str} + ''' +""" + # command += "".join(f" \\\\\\n {s}" for s in command_args) + + # container + kwargs_with_defaults.append(f'container="{tool.container()}"') + additional_expressions.append("j.image(container)") + + # memory + tmemory = tool.memory({}) + if tmemory is not None: + tmemory = cls.unwrap_expression(tmemory, code_environment=False) + additional_expressions.append(f"j.memory(f'{tmemory}G')") + tdisk = tool.disk({}) + if tdisk is not None: + tdisk = cls.unwrap_expression(tdisk, code_environment=False) + additional_expressions.append(f"j.storage(f'{tdisk}G')") + + additional_expressions_str = "\n".join( + indent(t, pd) for t in additional_expressions + ) + output_collectors_str = "\n".join( + f"{pd}j.command({o})" for o in output_collectors + ) + + return ( + f"""\ +def {cls.get_method_name_for_id(step_id)}(b, {", ".join([*kwargs, *kwargs_with_defaults])}): + j = b.new_job('{step_id}') +{additional_expressions_str} + +{command_constructor_str} +{output_collectors_str} + + return j +""", + tool, + ) + + @classmethod + def translate_code_tool_internal( + cls, + tool, + with_docker=True, + allow_empty_container=False, + container_override: dict = None, + ): + raise NotImplementedError( + "Hail batch translator does not support code tool yet" + ) + + @classmethod + def stringify_translated_workflow( + cls, wf_intermediate: Tuple[str, WorkflowBase] + ) -> str: + extra_imports = [] + wf_str, wf = wf_intermediate + + if cls.generate_click_cli: + function_name = "main_from_click" + extra_imports.append("import click") + name_equals_main_arg = f"""\ +{cls.generate_click_function(wf.tool_inputs(), to_call="main", click_function_name=function_name)} +if __name__ == "__main__": + b = {function_name}() + b.run(dry_run=True) +""" + else: + kwargs = [ + inp.id() + for inp in wf.tool_inputs() + if not (inp.intype.optional or inp.default) + ] + name_equals_main_arg = f"""\ +if __name__ == "__main__": + b = main({", ".join(k + "=None" for k in kwargs)}) + b.run(dry_run=True) + """ + extra_imports_str = "\n" + "\n".join(extra_imports) + + # Final script generation! + retval = f"""\ +import os, re, math +from typing import Union, Optional, List +{extra_imports_str} + +import hailtop.batch as hb + +{wf_str} + +{inspect.getsource(apply_secondary_file_format_to_filename)} + +{name_equals_main_arg} + """ + + try: + import black + + try: + return black.format_str(retval, mode=black.FileMode()) + except black.InvalidInput as e: + Logger.warn(f"Couldn't format python code due to Black error: {e}") + except ImportError: + # to silence the warning: + # 'black' in try block with 'except ImportError' should also be defined in except block + black = None + Logger.debug( + "Janis can automatically format generated Janis code if you install black: https://github.com/psf/black" + ) + + return retval + + @classmethod + def stringify_translated_tool(cls, tool_intermediate: Tuple[str, Tool]) -> str: + extra_imports = [] + tool_str, tool = tool_intermediate + + if cls.generate_click_cli: + function_name = "main_from_click" + extra_imports.append("import click") + name_equals_main_arg = f"""\ + {cls.generate_click_function(tool.tool_inputs(), to_call="main", click_function_name=function_name)} +if __name__ == "__main__": + b = hb.Batch("{tool.id()}") + {function_name}(b=b) + b.run(dry_run=True) + """ + else: + kwargs = [ + inp.id() + for inp in tool.tool_inputs() + if not (inp.intype.optional or inp.default) + ] + name_equals_main_arg = f"""\ +if __name__ == "__main__": + b = hb.Batch("{tool.id()}") + main(b, {", ".join(k + "=None" for k in kwargs)}) + b.run(dry_run=True) + """ + extra_imports_str = "\n" + "\n".join(extra_imports) + + # Final script generation! + retval = f"""\ +import os, re, math +from typing import Union, Optional, List +{extra_imports_str} + +import hailtop.batch as hb + +{tool_str} + +{inspect.getsource(apply_secondary_file_format_to_filename)} + +{name_equals_main_arg} + """ + + try: + import black + + try: + return black.format_str(retval, mode=black.FileMode()) + except black.InvalidInput as e: + Logger.warn(f"Couldn't format python code due to Black error: {e}") + except ImportError: + # to silence the warning: + # 'black' in try block with 'except ImportError' should also be defined in except block + black = None + Logger.debug( + "Janis can automatically format generated Janis code if you install black: https://github.com/psf/black" + ) + + return retval + + @classmethod + def stringify_translated_inputs(cls, inputs): + return JanisTranslator.get_string_repr(inputs) + + @classmethod + def workflow_filename(cls, workflow): + return f"{workflow.versioned_id()}_batch.py" + + @classmethod + def inputs_filename(cls, workflow): + return f"{workflow.versioned_id()}_inputs.py" + + @classmethod + def tool_filename(cls, tool): + return cls.workflow_filename(tool) + + @classmethod + def resources_filename(cls, workflow): + return cls.inputs_filename(workflow) + + @classmethod + def validate_command_for(cls, wfpath, inppath, tools_dir_path, tools_zip_path): + return ["python3", wfpath] + + # other helpers + + @classmethod + def get_method_name_for_id(cls, identifier): + return f"add_{identifier}_step" + + @classmethod + def janis_type_to_py_annotation(cls, dt: DataType, skip_typing=False): + annotation = None + if isinstance(dt, Array): + inner = cls.janis_type_to_py_annotation(dt.subtype()) + annotation = f"List[{inner}]" + elif isinstance(dt, UnionType): + inner = set(cls.janis_type_to_py_annotation(t) for t in dt.subtypes) + if len(inner) == 1 or skip_typing: + annotation = list(inner)[0] + else: + annotation = f"Union[{', '.join(inner)}]" + elif dt.is_base_type((File, String, Directory)): + annotation = "str" + elif dt.is_base_type(Int): + annotation = "int" + elif dt.is_base_type((Float, Double)): + annotation = "float" + elif dt.is_base_type(Boolean): + annotation = "bool" + + if annotation is None: + Logger.info(f"Couldn't generate python type annotation for {dt.name}") + elif dt.optional and not skip_typing: + annotation = f"Optional[{annotation}]" + return annotation + + @classmethod + def prepare_input_read_for_inp(cls, dt: DataType, reference_var, batch_var="b"): + if isinstance(dt, Array): + inner_ref = f"inner_{reference_var}" + return f"[{cls.prepare_input_read_for_inp(dt._t, inner_ref)} for {inner_ref} in {reference_var}]" + if not dt.is_base_type(File): + return reference_var + + if isinstance(dt, File) and dt.secondary_files(): + # we need to build a reference group + ext = (dt.extension or "").replace(".", "") + exts = "|".join( + a.replace(".", "\\\\.") + for a in [dt.extension, *(dt.alternate_extensions or [])] + if a + ) + base = f're.sub({reference_var}, r"({exts})$", "")' + dsec = {} + for sec in dt.secondary_files(): + sec_key = sec.replace("^", "").replace(".", "") + if "^" in sec: + sec_without_pattern = sec.replace("^", "") + dsec[sec_key] = f'{base} + "{sec_without_pattern}"' + else: + dsec[sec_key] = f'{reference_var} + "{sec}"' + + dsec_str = ", ".join(f"{k}={v}" for k, v in dsec.items()) + return f"{batch_var}.read_input_group(base={reference_var}, {dsec_str})" + else: + return f"{batch_var}.read_input({reference_var})" + + @classmethod + def prepare_read_group_dictionary_from_dt(cls, datatype: DataType): + if not isinstance(datatype, File): + return None + secs = datatype.secondary_files() + if not secs: + return None + # if all extension are just additions, like ".vcf" and ".vcf.idx", it's: + # {"vcf": "{root}", "idx": "{root}.idx"} + # else for example if it's ref.fasta and ref.dict, it's: + # {"fasta": "{root}.fasta", "dict": "{root}.dict"} + + extension = datatype.extension or "" + extension_without_dot = extension.replace(".", "") + nameroot_value = "{root}" + + # this only works if there's one hat + d = {} + if any(s.startswith("^") for s in secs): + # nameroot_value = "{root}.fasta" + nameroot_value = "{root}" + extension_without_dot + + if any(s.startswith("^^") for s in secs): + Logger.warn( + f"Secondary file patterns in '{datatype.name}' ({secs}) with two carats (^^) are not supported in Batch, will " + ) + + for s in secs: + sname = s.replace("^", "").replace(".", "") + if "^" in s: + d[sname] = "{root}" + s.replace("^", "") + else: + d[sname] = nameroot_value + s + else: + d = {s.replace(".", ""): nameroot_value + s for s in secs} + + d["base"] = nameroot_value + + return d + + @classmethod + def prepare_step_definition_and_call(cls, stp: StepNode): + connections = stp.sources + foreach = stp.foreach + if stp.scatter: + if len(stp.scatter.fields) == 1: + foreach = stp.scatter.fields[0] + connections[foreach] = ForEachSelector() + else: + raise NotImplementedError( + "Batch doesn't support scattering by multiple fields at the moment" + ) + + inner_connections = {} + for k, con in connections.items(): + src = con.source().source + inner_connections[k] = cls.unwrap_expression(src, code_environment=True) + + inner_connection_str = ", ".join( + f"{k}={v}" for k, v in inner_connections.items() + ) + inner_call = ( + f"{cls.get_method_name_for_id(stp.id())}(b, {inner_connection_str})" + ) + if foreach is not None: + + foreach_str = cls.unwrap_expression(foreach, code_environment=True) + call = f""" +{stp.id()} = [] +for idx in {foreach_str}: + {stp.id()}.append({inner_call})\n""" + else: + call = f"{stp.id()} = {inner_call}" + + if stp.when is not None: + when_str = cls.unwrap_expression(stp.when, code_environment=True) + call = f"""\ +{stp.id()} = None +if {when_str}: +{indent(call, 4 * ' ')} + """ + + return call + + @classmethod + def translate_tool_output(cls, outp) -> Tuple[Optional[str], List[str], List[str]]: + command_extras = "" + output_collectors, additional_expressions = [], [] + secs = ( + outp.output_type.secondary_files() + if isinstance(outp.output_type, File) + else None + ) or [] + if secs: + # prepare resource group + sec = JanisTranslator.get_string_repr( + cls.prepare_read_group_dictionary_from_dt(outp.output_type) + ) + additional_expressions.append( + f"j.declare_resource_group({outp.id()}={sec})" + ) + + values = [] + dests = [f"j.{outp.id()}"] + sec_presents_as = outp.secondaries_present_as or {} + if any(isinstance(o, Stdout) for o in [outp.selector, outp.output_type]): + command_extras += f" > {{j.{outp.id()}}}" + elif any(isinstance(o, Stderr) for o in [outp.selector, outp.output_type]): + command_extras += f" 2> {{j.{outp.id()}}}" + elif isinstance(outp.selector, WildcardSelector): + gl = cls.unwrap_expression(outp.selector.wildcard, code_environment=True) + values.append(gl) + for s in secs: + initial_sec = sec_presents_as.get(s, s).replace("^", "") + final_ext, final_iters = cls.split_secondary_file_carats(s) + values.append(gl + initial_sec) + + if final_iters: + dest_pattern = REMOVE_EXTENSION(f"j.{outp.id()}", final_iters) + dests.append(f'"{dest_pattern + final_ext}"') + else: + dests.append(f"f'{{j.{outp.id()}}}{final_ext}") + elif isinstance(outp.selector, Operator): + # maybe check leaves for wildcard, because that won't be supported + value = cls.unwrap_expression(outp.selector, code_environment=True) + values = [value] + for s in secs: + initial_ext, initial_iters = cls.split_secondary_file_carats( + sec_presents_as.get(s, s) + ) + final_ext, final_iters = cls.split_secondary_file_carats(s) + if initial_iters: + escaped_value = f"{{{value}}}" + values.append( + f'f"{REMOVE_EXTENSION(escaped_value, initial_iters)}{initial_ext}"' + ) + else: + values.append(f"f'{{{value}}}{initial_ext}") + + if final_iters: + GET_BASE_OP = REMOVE_EXTENSION(f"{{j.{outp.id()}}}", final_iters) + dests.append(f'f"{GET_BASE_OP}{final_ext}"') + else: + dests.append(f"f'{{j.{outp.id()}}}{final_ext}'") + elif isinstance(outp.selector, Selector): + value = cls.unwrap_expression(outp.selector, code_environment=True) + values = [value] + for s in secs: + initial_ext, initial_iters = cls.split_secondary_file_carats( + sec_presents_as.get(s, s) + ) + final_ext, final_iters = cls.split_secondary_file_carats(s) + if initial_iters: + escaped_value = f"{{{value}}}" + values.append( + f'f"{REMOVE_EXTENSION(escaped_value, initial_iters)}{initial_ext}"' + ) + else: + values.append(f'f"{{{value}}}{initial_ext}"') + + if final_iters: + GET_BASE_OP = REMOVE_EXTENSION(f"{{j.{outp.id()}}}", final_iters) + dests.append(f'f"{GET_BASE_OP}{final_ext}"') + else: + dests.append(f'f"{{j.{outp.id()}}}{final_ext}"') + else: + Logger.warn( + f"Couldn't translate output selector for '{outp.id()}' as it's an unrecognised type {outp.selector} ({type(outp.selector)})" + ) + + if values and dests: + for value, dest in zip(values, dests): + output_collectors.append( + f"'ln \"{{value}}\" {{dest}}'.format(value={value}, dest={dest})" + ) + + return command_extras, output_collectors, additional_expressions + + @classmethod + def get_command_argument_for_tool_input( + cls, tool_input: ToolInput, tool: CommandTool + ): + # quote entire string + q, sq = '"', '\\"' + escape_quotes = lambda el: el.replace(q, sq) + quote_value = lambda el: f"{q}{escape_quotes(el)}{q}" + + other_required_statements = [] + # will resolve to: "if {check_condition}: append ..." + check_condition = f"{tool_input.id()} is not None" + + intype = tool_input.input_type + is_flag = isinstance(intype, Boolean) + + separate_value_from_prefix = tool_input.separate_value_from_prefix is not False + prefix = tool_input.prefix if tool_input.prefix else "" + tprefix = prefix + + # prepare values array, then join it all in the end + + if prefix and separate_value_from_prefix and not is_flag: + tprefix += " " + + code_value = tool_input.id() + if is_flag: + if not tool_input.prefix: + Logger.warn( + f"Tool input '{tool_input.id()}' was a flag, but didn't have prefix: skipping" + ) + return None, [] + + check_condition = f"{tool_input.id()} is True" + code_value = quote_value(tool_input.prefix) + + elif intype.is_array(): + separator = ( + tool_input.separator if tool_input.separator is not None else " " + ) + # should_quote = ( + # isinstance(intype.subtype(), (String, File, Directory)) + # and tool_input.shell_quote is not False + # ) + if prefix: + if tool_input.prefix_applies_to_all_elements: + other_required_statements.append( + f'[e for el in {tool_input.id()} for e in ["{tool_input.prefix}", el] ]' + ) + code_value = ( + quote_value(prefix) + + " + " + + quote_value(separator) + + f".join({tool_input.id()})" + ) + else: + code_value = f'"{separator}".join({tool_input.id()})' + # elif requires_quoting: + # pass + else: + if isinstance(intype, Filename): + is_specified = tool_input.id() in tool.connections + inner_value = intype + if is_specified: + inner_value = FirstOperator(InputSelector(tool_input.id(), intype)) + + check_condition = None + expr = cls.unwrap_expression(inner_value, code_environment=False) + other_required_statements.append( + f'{tool_input.id()} = f"{escape_quotes(expr)}"' + ) + if prefix: + sep = " " + if tool_input.separate_value_from_prefix is False: + sep = "" + + code_value = f'f"{prefix}{sep}{{{tool_input.id()}}}"' + else: + # code_value = tool_input.id() + pass + + if check_condition is not None: + slash = "\\" + retval = f"""\ +if {check_condition}: + command_args.append({code_value})\ +""" + else: + retval = f"command_args.append({code_value})" + + return retval, other_required_statements + # old logic + + @classmethod + def get_command_argument_for_tool_argument(cls, arg: ToolArgument): + + requires_quotes = arg.shell_quote is not False + + # quote entire string + q, sq = '"', '\\"' + escape_quotes = lambda el: el.replace(q, sq) + quote_value_if_required = ( + lambda el: f"{q}{escape_quotes(el)}{q}" if requires_quotes else el + ) + + value = cls.unwrap_expression(arg.value, code_environment=False).replace( + "\\t", "\\\\t" + ) + + # we're quoting early, so we don't need to do any quoting later + if isinstance(value, list): + value = " ".join(quote_value_if_required(v) for v in value) + else: + value = quote_value_if_required(value) + + if arg.prefix is None: + retval = value + else: + sep = "" + if arg.separate_value_from_prefix or arg.separate_value_from_prefix is None: + sep = " " + retval = sep.join([arg.prefix, value]) + + return f"command_args.append(f'{escape_quotes(retval)}')", [] + + @classmethod + def get_kwarg_from_value( + cls, identifier: str, datatype: DataType, default: any, include_annotation=True + ) -> Tuple[str, bool, List[str]]: + has_default = default is not None or datatype.optional + kwarg = identifier + extra_statements = [] + + if include_annotation: + annotation = cls.janis_type_to_py_annotation(datatype) + if annotation is not None: + kwarg += f": {annotation}" + + if has_default: + inner_default = None + if isinstance(default, Selector): + # can't + if isinstance(default, StepOutputSelector) or ( + isinstance(default, Operator) + and any( + isinstance(l, StepOutputSelector) for l in default.get_leaves() + ) + ): + Logger.warn( + f"Can't calculate default for identifier '{identifier}': '{default}' as it relies on " + f"the output of a step, and batch won't support this" + ) + else: + # insert extra statement to start of function to evaluate this value + extra_statements.append( + f"{identifier} = {identifier} if {identifier} is not None else {cls.unwrap_expression(default, code_environment=True)}" + ) + else: + # useful to get_string_repr + has_default = True + inner_default = cls.unwrap_expression(default, code_environment=True) + + if has_default: + kwarg += f"={inner_default}" + + return kwarg, has_default, extra_statements + + @classmethod + def unwrap_expression( + cls, + value, + code_environment=False, + stringify_list=True, + input_selector_overrider: Callable[[InputSelector], str] = None, + ) -> Union[str, List[str]]: + uwkwargs = { + "input_selector_overrider": input_selector_overrider, + "stringify_list": stringify_list, + } + + if value is None: + return "None" + elif isinstance(value, list): + values = [ + cls.unwrap_expression(e, code_environment=True, **uwkwargs) + for e in value + ] + if stringify_list: + return "[" + ", ".join(values) + "]" + return values + elif isinstance(value, (str, int, float)): + if code_environment: + return JanisTranslator.get_string_repr(value) + else: + return str(value) + elif isinstance(value, InputNodeSelector): + return value.id() + elif isinstance(value, StepOutputSelector): + return f"{value.node.id()}.{value.tag}" + elif isinstance(value, InputSelector): + val = None + if input_selector_overrider is not None: + val = input_selector_overrider(value) + if code_environment: + return val or value.input_to_select + return f"{{{val or value.input_to_select}}}" + elif isinstance(value, Filename): + prefix = value.prefix + if not isinstance(prefix, str): + Logger.warn( + "Hail batch does not support filenames generated from intermediate filenames" + ) + prefix = "generated" + return value.generated_filename({"prefix": prefix}) + elif isinstance(value, StringFormatter): + fields = { + k: cls.unwrap_expression(v, code_environment=False, **uwkwargs) + for k, v in value.kwargs.items() + } + retval = value.resolve_with_resolved_values(**fields) + if code_environment: + return f'f"{retval}"' + return retval + elif isinstance(value, AliasSelector): + return cls.unwrap_expression( + value.inner_selector, code_environment=code_environment, **uwkwargs + ) + elif isinstance(value, WildcardSelector): + raise Exception("Wildcard selectors are not valid within operators") + + # i don't know about this, might need to be moved to somewhere later + elif isinstance(value, Operator): + val = cls.unwrap_operator(value, **uwkwargs) + if not code_environment: + val = "{" + str(val) + "}" + return val + elif isinstance(value, ForEachSelector): + if code_environment: + return "idx" + return "{idx}" + else: + # return str(value) + raise NotImplementedError( + f"Can't unwrap value '{value}' of type {type(value)}" + ) + + @classmethod + def unwrap_operator(cls, value: Operator, **kwargs): + # assume code_environment is True + inner_unwrap = lambda a: cls.unwrap_expression( + a, code_environment=True, **kwargs + ) + return value.to_python(inner_unwrap, *value.args) + + @staticmethod + def split_secondary_file_carats(secondary_annotation: str): + fixed_sec = secondary_annotation.lstrip("^") + leading = len(secondary_annotation) - len(fixed_sec) + return secondary_annotation[leading:], leading + + @classmethod + def generate_click_function( + cls, + inputs: List[TInput], + to_call="main", + click_function_name="main_from_click", + help=None, + ): + + escape_string = lambda s: s.replace("\n", "\\n").replace('"', '\\"') + help_if_relevant = f'help="{escape_string(help)}' if help else "" + options = [] + for inp in inputs: + inner_args = [f'"--{inp.id()}"', f'"{inp.id()}"'] + inner_annotation = inp.intype + if isinstance(inp.intype, Boolean): + inner_args.append("is_flag=True") + inner_annotation = None + elif isinstance(inp.intype, Array): + # add list flags + inner_args.append("multiple=True") + inner_annotation = inp.intype.subtype() + if inner_annotation is not None: + intype = inp.intype + if intype.is_array(): + intype = intype.fundamental_type() + annotation = cls.janis_type_to_py_annotation(intype, skip_typing=True) + inner_args.append(f"type={annotation}") + if not inp.intype.optional: + inner_args.append("required=True") + + if inp.default is not None and not ( + isinstance(inp.default, Selector) + or ( + isinstance(inp.default, list) + and any(isinstance(a, Selector) for a in inp.default) + ) + ): + inner_args.append( + f"default={JanisTranslator.get_string_repr(inp.default)}" + ) + if inp.doc and inp.doc.doc: + safer = escape_string(inp.doc.doc) + inner_args.append(f'help="{safer}"') + + options.append(f'@click.option({", ".join(inner_args)})') + + nl = "\n" + return f"""\ +@click.command({help_if_relevant}) +{nl.join(options)} +def {click_function_name}(*args, **kwargs): + return {to_call}(*args, **kwargs) +""" diff --git a/janis_core/translations/janis.py b/janis_core/translations/janis.py index 0a587ad67..d1199d297 100644 --- a/janis_core/translations/janis.py +++ b/janis_core/translations/janis.py @@ -101,7 +101,7 @@ def translate_any_tool_internal(self, tool): f"Unrecognised tool type {tool}: {tool.__class__.__name__}" ) - def translate_workflow( + def translate_workflow_internal( self, workflow, with_container=True, @@ -127,7 +127,7 @@ def translate_workflow( """ return bigger_file, {} - def translate_tool_internal( + def translate_command_tool_internal( self, tool, with_container=True, diff --git a/janis_core/translations/translationbase.py b/janis_core/translations/translationbase.py index 4a93d12d3..638a9409b 100644 --- a/janis_core/translations/translationbase.py +++ b/janis_core/translations/translationbase.py @@ -1,13 +1,15 @@ import os +import sys from abc import ABC, abstractmethod -from typing import Tuple, List, Dict +from collections import namedtuple +from typing import Tuple, List, Dict, Type import functools from path import Path from janis_core.code.codetool import CodeTool from janis_core.tool.commandtool import ToolInput -from janis_core.tool.tool import ToolType +from janis_core.tool.tool import ToolType, Tool from janis_core.translationdeps.exportpath import ExportPathKeywords from janis_core.types.common_data_types import Int from janis_core.utils import lowercase_dictkeys @@ -22,6 +24,7 @@ def __init__(self, message, inner: Exception): kwargstoignore = {"container_override"} +WorkflowTranslationOutput: Type[Tuple[any, Dict[str, any]]] = namedtuple("WorkflowTranslationOutput", ["workflow_obj", "tools_dict"]) def try_catch_translate(type): @@ -47,7 +50,7 @@ def wrapper(*args, **kwargs): message = f"Couldn't translate {type or ''} with ({components})" er = TranslationError(message, inner=e) Logger.log_ex(er) - raise er + raise er from e return wrapper @@ -96,7 +99,7 @@ class TranslatorBase(ABC): def __init__(self, name): self.name = name - def translate( + def translate_workflow( self, tool, to_console=True, @@ -109,7 +112,6 @@ def translate( should_zip=True, merge_resources=False, hints=None, - allow_null_if_not_optional=True, additional_inputs: Dict = None, max_cores=None, max_mem=None, @@ -119,33 +121,17 @@ def translate( container_override=None, ): - str_tool, tr_tools = None, [] + if tool.type() != ToolType.Workflow: + raise Exception(f"Please use the call the '{self.__class__.__name__}.translate_{tool.type.lower().replace('-', '_')}(**kwargs) instead of translate_workflow(**kwargs)") - if tool.type() == ToolType.Workflow: - tr_tool, tr_tools = self.translate_workflow( - tool, - with_container=with_container, - with_resource_overrides=with_resource_overrides, - allow_empty_container=allow_empty_container, - container_override=lowercase_dictkeys(container_override), - ) - str_tool = self.stringify_translated_workflow(tr_tool) - elif isinstance(tool, CodeTool): - tr_tool = self.translate_code_tool_internal( - tool, - allow_empty_container=allow_empty_container, - container_override=lowercase_dictkeys(container_override), - ) - str_tool = self.stringify_translated_tool(tr_tool) - else: - tr_tool = self.translate_tool_internal( - tool, - with_container=with_container, - with_resource_overrides=with_resource_overrides, - allow_empty_container=allow_empty_container, - container_override=lowercase_dictkeys(container_override), - ) - str_tool = self.stringify_translated_tool(tr_tool) + tr_tool, tr_tools = self.translate_workflow_internal( + tool, + with_container=with_container, + with_resource_overrides=with_resource_overrides, + allow_empty_container=allow_empty_container, + container_override=lowercase_dictkeys(container_override), + ) + str_tool = self.stringify_translated_workflow(tr_tool) tr_inp = self.build_inputs_file( tool, @@ -165,21 +151,16 @@ def translate( "tools/" + self.tool_filename(t), self.stringify_translated_workflow(tr_tools[t]), ) - for t in tr_tools + for t in tr_tools or [] ] str_resources = self.stringify_translated_inputs(tr_res) if to_console: - print("=== WORKFLOW ===") + print("=== WORKFLOW ===", file=sys.stderr) print(str_tool) if tool_to_console: - print("\n=== TOOLS ===") + print("\n=== TOOLS ===", file=sys.stderr) [print(f":: {t[0]} ::\n" + t[1]) for t in str_tools] - print("\n=== INPUTS ===") - print(str_inp) - if not merge_resources and with_resource_overrides: - print("\n=== RESOURCES ===") - print(str_resources) d = ExportPathKeywords.resolve( export_path, workflow_spec=self.name, workflow_name=tool.versioned_id() @@ -220,12 +201,10 @@ def translate( Logger.log(f"Written {fn_tool} to disk") if not merge_resources and with_resource_overrides: - print("\n=== RESOURCES ===") with open(os.path.join(d, fn_resources), "w+") as wf: Logger.log(f"Writing {fn_resources} to disk") wf.write(str_inp) Logger.log(f"Wrote {fn_resources} to disk") - print(str_resources) import subprocess @@ -278,7 +257,7 @@ def translate_tool( ): tool_out = self.stringify_translated_tool( - self.translate_tool_internal( + self.translate_command_tool_internal( tool, with_container=with_container, with_resource_overrides=with_resource_overrides, @@ -363,32 +342,21 @@ def validate_inputs(cls, inputs, allow_null_if_optional): # ) # ) - @staticmethod - def get_type(t): - if isinstance(t, list): - q = set(TranslatorBase.get_type(tt) for tt in t) - if len(q) == 0: - return "empty array" - val = q.pop() if len(q) == 1 else "Union[" + ", ".join(q) + "]" - return f"Array<{val}>" - - return type(t).__name__ - @classmethod @abstractmethod - def translate_workflow( + def translate_workflow_internal( cls, workflow, with_container=True, with_resource_overrides=False, allow_empty_container=False, container_override: dict = None, - ) -> Tuple[any, Dict[str, any]]: + ) -> WorkflowTranslationOutput: pass @classmethod @abstractmethod - def translate_tool_internal( + def translate_command_tool_internal( cls, tool, with_container=True, @@ -409,10 +377,11 @@ def translate_code_tool_internal( ): pass - @classmethod - @abstractmethod - def unwrap_expression(cls, expression): - pass + # this is suggested, but shouldn't be required + # @classmethod + # @abstractmethod + # def unwrap_expression(cls, expression): + # pass @classmethod def build_inputs_file( @@ -531,8 +500,8 @@ def build_resources_input( return new_inputs - @staticmethod - def inp_can_be_skipped(inp, override_value=None): + @classmethod + def inp_can_be_skipped(cls, inp, override_value=None): return ( inp.default is None and override_value is None @@ -541,8 +510,8 @@ def inp_can_be_skipped(inp, override_value=None): ) # Resource overrides - @staticmethod - def get_resource_override_inputs() -> List[ToolInput]: + @classmethod + def get_resource_override_inputs(cls, ) -> List[ToolInput]: return [ ToolInput("runtime_cpu", Int(optional=True)), # number of CPUs ToolInput("runtime_memory", Int(optional=True)), # GB of memory @@ -552,19 +521,19 @@ def get_resource_override_inputs() -> List[ToolInput]: # STRINGIFY - @staticmethod + @classmethod @abstractmethod - def stringify_translated_workflow(wf): + def stringify_translated_workflow(cls, wf): pass - @staticmethod + @classmethod @abstractmethod - def stringify_translated_tool(tool): + def stringify_translated_tool(cls, tool): pass - @staticmethod + @classmethod @abstractmethod - def stringify_translated_inputs(inputs): + def stringify_translated_inputs(cls, inputs): pass # OUTPUTS @@ -576,39 +545,39 @@ def filename(cls, tool): return cls.workflow_filename(tool) return cls.tool_filename(tool) - @staticmethod + @classmethod @abstractmethod - def workflow_filename(workflow): + def workflow_filename(cls, workflow): pass - @staticmethod + @classmethod @abstractmethod - def inputs_filename(workflow): + def inputs_filename(cls, workflow): pass - @staticmethod + @classmethod @abstractmethod - def tool_filename(tool): + def tool_filename(cls, tool): pass - @staticmethod - def dependencies_filename(workflow): + @classmethod + def dependencies_filename(cls, workflow): return "tools.zip" - @staticmethod + @classmethod @abstractmethod - def resources_filename(workflow): + def resources_filename(cls, workflow): pass # VALIDATION - @staticmethod + @classmethod @abstractmethod - def validate_command_for(wfpath, inppath, tools_dir_path, tools_zip_path): + def validate_command_for(cls, wfpath, inppath, tools_dir_path, tools_zip_path): pass - @staticmethod - def get_container_override_for_tool(tool, container_override): + @classmethod + def get_container_override_for_tool(cls, tool, container_override): if not container_override: return None diff --git a/janis_core/translations/wdl.py b/janis_core/translations/wdl.py index 663590e6f..8683dce71 100644 --- a/janis_core/translations/wdl.py +++ b/janis_core/translations/wdl.py @@ -45,7 +45,7 @@ from janis_core.translations.translationbase import ( TranslatorBase, TranslatorMeta, - try_catch_translate, + try_catch_translate, WorkflowTranslationOutput, ) from janis_core.types import get_instantiated_type, DataType from janis_core.types.common_data_types import ( @@ -100,25 +100,25 @@ class WdlTranslator(TranslatorBase, metaclass=TranslatorMeta): def __init__(self): super().__init__(name="wdl") - @staticmethod - def stringify_translated_workflow(wf): + @classmethod + def stringify_translated_workflow(cls, wf): return wf.get_string() - @staticmethod - def stringify_translated_tool(tool): + @classmethod + def stringify_translated_tool(cls, tool): return tool.get_string() - @staticmethod - def stringify_translated_inputs(inputs): + @classmethod + def stringify_translated_inputs(cls, inputs): return json.dumps(inputs, sort_keys=True, indent=4, separators=(",", ": ")) - @staticmethod - def validate_command_for(wfpath, inppath, tools_dir_path, tools_zip_path): + @classmethod + def validate_command_for(cls, wfpath, inppath, tools_dir_path, tools_zip_path): return ["java", "-jar", "$womtooljar", "validate", wfpath] @classmethod @try_catch_translate(type="workflow") - def translate_workflow( + def translate_workflow_internal( cls, wfi, with_container=True, @@ -126,7 +126,7 @@ def translate_workflow( is_nested_tool=False, allow_empty_container=False, container_override=None, - ) -> Tuple[wdl.Workflow, Dict[str, any]]: + ) -> WorkflowTranslationOutput: """ Translate the workflow into wdlgen classes! @@ -271,7 +271,7 @@ def translate_workflow( if t.versioned_id() not in wtools: if t.type() == ToolType.Workflow: - wf_wdl, wf_tools = cls.translate_workflow( + wf_wdl, wf_tools = cls.translate_workflow_internal( t, with_container=with_container, is_nested_tool=True, @@ -283,7 +283,7 @@ def translate_workflow( wtools.update(wf_tools) elif isinstance(t, CommandTool): - wtools[t.versioned_id()] = cls.translate_tool_internal( + wtools[t.versioned_id()] = cls.translate_command_tool_internal( t, with_container=with_container, with_resource_overrides=with_resource_overrides, @@ -318,11 +318,11 @@ def translate_workflow( w.calls.append(call) - return w, wtools + return WorkflowTranslationOutput(workflow_obj=w, tools_dict=wtools) @classmethod @try_catch_translate(type="command tool") - def translate_tool_internal( + def translate_command_tool_internal( cls, tool: CommandTool, with_container=True, @@ -498,8 +498,8 @@ def translate_code_tool_internal( return wdl.Task(tool.id(), tr_ins, tr_outs, commands, r, version="development") - @staticmethod - def wrap_if_string_environment(value, string_environment: bool): + @classmethod + def wrap_if_string_environment(cls, value, string_environment: bool): return f'"{value}"' if not string_environment else value @classmethod @@ -1132,20 +1132,20 @@ def build_resources_input( return {f"{tool.id()}.{k}": v for k, v in d.items()} return d - @staticmethod - def workflow_filename(workflow): + @classmethod + def workflow_filename(cls, workflow): return workflow.versioned_id() + ".wdl" - @staticmethod - def inputs_filename(workflow): + @classmethod + def inputs_filename(cls, workflow): return workflow.id() + "-inp.json" - @staticmethod - def tool_filename(tool): + @classmethod + def tool_filename(cls, tool): return (tool.versioned_id() if isinstance(tool, Tool) else str(tool)) + ".wdl" - @staticmethod - def resources_filename(workflow): + @classmethod + def resources_filename(cls, workflow): return workflow.id() + "-resources.json" diff --git a/janis_core/types/common_data_types.py b/janis_core/types/common_data_types.py index 4964f424b..967a5eb67 100644 --- a/janis_core/types/common_data_types.py +++ b/janis_core/types/common_data_types.py @@ -441,7 +441,7 @@ def schema(cls) -> Dict: def get_value_from_meta(self, meta): return meta.get("path") - def cwl_input(self, value: Any): + def cwl_input(self, value: Any, input_name=None): return {"class": "File", "path": value} def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool: @@ -525,7 +525,7 @@ def schema(cls) -> Dict: def input_field_from_input(self, meta): return meta["path"] - def cwl_input(self, value: Any): + def cwl_input(self, value: Any, input_name=None): # WDL: "{workflowName}.label" = meta["path"} return {"class": "Directory", "path": value} @@ -596,13 +596,14 @@ def map_cwl_type(self, parameter: cwlgen.Parameter) -> cwlgen.Parameter: parameter.type = cwlgen.CommandInputArraySchema(items=None, type="array") return parameter - def cwl_input(self, value: Any): + def cwl_input(self, value: Any, input_name: str=None): if isinstance(value, list): return [self._t.cwl_input(v) for v in value] if value is None: return None else: - raise Exception(f"Input value for input '{self.id()}' was not an array") + input_name_extra = f" for input '{input_name}'" if input_name else "" + raise Exception(f"Input value{input_name_extra} ({value}) did not match expected type '{self.name()}'") def wdl(self, has_default=False) -> wdlgen.WdlType: ar = wdlgen.ArrayType(self._t.wdl(has_default=False), requires_multiple=False) diff --git a/janis_core/types/data_types.py b/janis_core/types/data_types.py index db4a27264..1c64ca174 100644 --- a/janis_core/types/data_types.py +++ b/janis_core/types/data_types.py @@ -284,7 +284,7 @@ def map_cwl_type(self, parameter: cwlgen.Parameter) -> cwlgen.Parameter: parameter.secondaryFiles = self.secondary_files() return parameter - def cwl_input(self, value: Any): + def cwl_input(self, value: Any, input_name=None): return value def wdl(self, has_default=False) -> wdlgen.WdlType: diff --git a/janis_core/workflow/workflow.py b/janis_core/workflow/workflow.py index 834235a34..6d11fd742 100644 --- a/janis_core/workflow/workflow.py +++ b/janis_core/workflow/workflow.py @@ -955,7 +955,6 @@ def translate( should_validate=validate, merge_resources=merge_resources, hints=hints, - allow_null_if_not_optional=allow_null_if_not_optional, additional_inputs=additional_inputs, max_cores=max_cores, max_mem=max_mem,