Skip to content

Commit

Permalink
pickValue=first_non_null
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-c committed Nov 11, 2020
1 parent e2728a3 commit e257e7a
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 134 deletions.
140 changes: 97 additions & 43 deletions cwl_utils/cwl_v1_0_expression_refactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import argparse
import copy
import hashlib
import logging
import shutil
import sys
from collections.abc import Mapping
from pathlib import Path
from typing import (
Any,
Dict,
Expand All @@ -22,16 +24,14 @@

from cwltool.errors import WorkflowException
from cwltool.expression import do_eval
from cwltool.loghandler import _logger as _cwltoollogger
from cwltool.sandboxjs import JavascriptException
from cwltool.utils import CWLObjectType, CWLOutputType
from cwltool.loghandler import _logger as _cwltoollogger
from ruamel import yaml
from schema_salad.sourceline import SourceLine
from schema_salad.utils import json_dumps
from pathlib import Path
import cwl_utils.parser_v1_0 as cwl

import logging
import cwl_utils.parser_v1_0 as cwl

_logger = logging.getLogger("cwl-expression-refactor") # pylint: disable=invalid-name
defaultStreamHandler = logging.StreamHandler() # pylint: disable=invalid-name
Expand Down Expand Up @@ -147,10 +147,12 @@ def escape_expression_field(contents: str) -> str:
return contents.replace("${", "$/{").replace("$(", "$/(")


def clean_type_ids(cwltype: Any) -> Any:
def clean_type_ids(
cwltype: Union[cwl.ArraySchema, cwl.InputRecordSchema]
) -> Union[cwl.ArraySchema, cwl.InputRecordSchema]:
"""Simplify type identifiers."""
result = copy.deepcopy(cwltype)
if isinstance(cwltype, cwl.ArraySchema):
if isinstance(result, cwl.ArraySchema):
if isinstance(result.items, MutableSequence):
for item in result.items:
if hasattr(item, "id"):
Expand All @@ -161,7 +163,7 @@ def clean_type_ids(cwltype: Any) -> Any:
if result.items.fields:
for field in result.items.fields:
field.name = field.name.split("/")[-1]
elif isinstance(cwltype, cwl.InputRecordSchema):
elif isinstance(result, cwl.InputRecordSchema):
result.name = result.name.split("/")[-1]
if result.fields:
for field in result.fields:
Expand Down Expand Up @@ -364,7 +366,7 @@ def load_step(
modified = False
if isinstance(step.run, str):
step.run, modified = traverse(
cwl.load_document(step.run),
cwl.load_document(step.run, baseuri=step.loadingOptions.fileuri),
replace_etool,
True,
skip_command_line1,
Expand All @@ -378,7 +380,11 @@ def generate_etool_from_expr(
target: Union[cwl.CommandInputParameter, cwl.InputParameter],
no_inputs: bool = False,
self_type: Optional[
Union[cwl.InputParameter, cwl.CommandInputParameter]
Union[
cwl.InputParameter,
cwl.CommandInputParameter,
List[Union[cwl.InputParameter, cwl.CommandInputParameter]],
]
] = None, # if the "self" input should be a different type than the "result" output
extra_processes: Optional[
Sequence[Union[cwl.Workflow, cwl.WorkflowStep, cwl.CommandLineTool]]
Expand All @@ -389,18 +395,32 @@ def generate_etool_from_expr(
if not no_inputs:
if not self_type:
self_type = target
new_type = clean_type_ids(self_type.type)
if isinstance(self_type, list):
new_type: Union[
List[Union[cwl.ArraySchema, cwl.InputRecordSchema]],
Union[cwl.ArraySchema, cwl.InputRecordSchema],
] = [clean_type_ids(t.type) for t in self_type]
else:
new_type = clean_type_ids(self_type.type)
inputs.append(
cwl.InputParameter(
id="self",
label=self_type.label,
secondaryFiles=self_type.secondaryFiles,
streamable=self_type.streamable,
doc=self_type.doc,
format=self_type.format,
label=self_type.label if not isinstance(self_type, list) else None,
secondaryFiles=self_type.secondaryFiles
if not isinstance(self_type, list)
else None,
streamable=self_type.streamable
if not isinstance(self_type, list)
else None,
doc=self_type.doc if not isinstance(self_type, list) else None,
format=self_type.format if not isinstance(self_type, list) else None,
type=new_type,
extension_fields=self_type.extension_fields,
loadingOptions=self_type.loadingOptions,
extension_fields=self_type.extension_fields
if not isinstance(self_type, list)
else None,
loadingOptions=self_type.loadingOptions
if not isinstance(self_type, list)
else None,
)
)
outputs = yaml.comments.CommentedSeq()
Expand Down Expand Up @@ -606,33 +626,41 @@ def type_for_source(
process: Union[cwl.CommandLineTool, cwl.Workflow, cwl.ExpressionTool],
sourcenames: Union[str, List[str]],
parent: Optional[cwl.Workflow] = None,
) -> Any:
# TODO: if there are multiple source names, why don't we return a mixed type?
) -> Union[List[Any], Any]:
"""Determine the type for the given sourcenames."""
return param_for_source_id(process, sourcenames, parent).type
params = param_for_source_id(process, sourcenames, parent)
if not isinstance(params, list):
return params.type
new_type: List[Any] = []
for p in params:
if isinstance(p, str) and p not in new_type:
new_type.append(p)
elif hasattr(p, "type") and p.type not in new_type:
new_type.append(p.type)
return new_type


def param_for_source_id(
process: Union[cwl.CommandLineTool, cwl.Workflow, cwl.ExpressionTool],
sourcenames: Union[str, List[str]],
parent: Optional[cwl.Workflow] = None,
) -> Any:
) -> Union[List[cwl.InputParameter], cwl.InputParameter]:
"""Find the process input parameter that matches one of the given sourcenames."""
# TODO: if there are multiple source names, why don't we return multipe parameters?
if isinstance(sourcenames, str):
sourcenames = [sourcenames]
params: List[cwl.InputParameter] = []
for sourcename in sourcenames:
for param in process.inputs:
if param.id.split("#")[-1] == sourcename.split("#")[-1]:
return param
params.append(param)
targets = [process]
if parent:
targets.append(parent)
for target in targets:
if isinstance(target, cwl.Workflow):
for inp in target.inputs:
if inp.id.split("#")[-1] == sourcename.split("#")[-1]:
return inp
params.append(inp)
for step in target.steps:
if sourcename.split("/")[0] == step.id.split("#")[-1] and step.out:
for outp in step.out:
Expand All @@ -644,7 +672,11 @@ def param_for_source_id(
output.id.split("#")[-1]
== sourcename.split("/", 1)[1]
):
return output
params.append(output)
if len(params) == 1:
return params[0]
elif len(params) > 1:
return params
raise WorkflowException(
"param {} not found in {}\n or\n {}.".format(
sourcename,
Expand Down Expand Up @@ -676,8 +708,9 @@ def param_for_source_id(

def process_workflow_inputs_and_outputs(
workflow: cwl.Workflow, replace_etool: bool
) -> None:
) -> bool:
"""Do any needed conversions on the given Workflow's inputs and outputs."""
modified = False
inputs = empty_inputs(workflow)
for index, param in enumerate(workflow.inputs):
with SourceLine(workflow.inputs, index, WorkflowException):
Expand Down Expand Up @@ -705,6 +738,7 @@ def process_workflow_inputs_and_outputs(
"Entry {},".format(index)
+ TOPLEVEL_SF_EXPR_ERROR.format(param.id.split("#")[-1])
)
return modified


def process_workflow_reqs_and_hints(
Expand Down Expand Up @@ -1848,16 +1882,23 @@ def traverse_step(
)
)
else:
self.append(
example_input(type_for_source(parent, source).type)
)
scattered_source_type = type_for_source(parent, source)
if isinstance(scattered_source_type, list):
for stype in scattered_source_type:
self.append(example_input(stype.type))
else:
self.append(example_input(scattered_source_type.type))
else:
if not step.scatter:
self = example_input(
type_for_source(parent, inp.source.split("#")[-1])
)
else:
self = example_input(type_for_source(parent, inp.source).type)
scattered_source_type2 = type_for_source(parent, inp.source)
if isinstance(scattered_source_type2, list):
self = example_input(scattered_source_type2[0].type)
else:
self = example_input(scattered_source_type2.type)
expression = get_expression(inp.valueFrom, inputs, self)
if expression:
modified = True
Expand All @@ -1866,19 +1907,24 @@ def traverse_step(
if not target:
raise WorkflowException("target not found")
input_source_id = None
source_type = None
source_type: Optional[
Union[List[cwl.InputParameter], cwl.InputParameter]
] = None
if inp.source:
if isinstance(inp.source, MutableSequence):
input_source_id = []
source_types: List[cwl.InputParameter] = []
for source in inp.source:
source_id = source.split("#")[-1]
input_source_id.append(source_id)
temp_type = param_for_source_id(
step.run, source_id, parent
).type
if temp_type not in source_types:
source_types.append(temp_type)
temp_type = type_for_source(step.run, source_id, parent)
if isinstance(temp_type, list):
for ttype in temp_type:
if ttype not in source_types:
source_types.append(ttype)
else:
if temp_type not in source_types:
source_types.append(temp_type)
source_type = cwl.InputParameter(
id=None,
type=cwl.ArraySchema(source_types, "array"),
Expand Down Expand Up @@ -1955,9 +2001,15 @@ def workflow_step_to_InputParameters(
inp_id = inp.id.split("#")[-1].split("/")[-1]
if inp.source and inp_id != except_in_id:
param = copy.deepcopy(param_for_source_id(parent, sourcenames=inp.source))
param.id = inp_id
param.type = clean_type_ids(param.type)
params.append(param)
if isinstance(param, list):
for p in param:
p.id = inp_id
p.type = clean_type_ids(p.type)
params.append(p)
else:
param.id = inp_id
param.type = clean_type_ids(param.type)
params.append(param)
return params


Expand All @@ -1970,9 +2022,9 @@ def replace_step_valueFrom_expr_with_etool(
step_inp: cwl.WorkflowStepInput,
original_process: Union[cwl.CommandLineTool, cwl.ExpressionTool],
original_step_ins: List[cwl.WorkflowStepInput],
source: Union[str, List[Any]],
source: Union[str, List[str]],
replace_etool: bool,
source_type: Optional[Union[cwl.InputParameter, cwl.CommandInputParameter]] = None,
source_type: Optional[Union[cwl.InputParameter, List[cwl.InputParameter]]] = None,
) -> None:
"""Replace a WorkflowStep level 'valueFrom' expression with a sibling ExpressionTool step."""
step_inp_id = step_inp.id.split("/")[-1]
Expand Down Expand Up @@ -2066,8 +2118,10 @@ def traverse_workflow(
)
if step_modified:
modified = True
process_workflow_inputs_and_outputs(workflow, replace_etool)
process_workflow_reqs_and_hints(workflow, replace_etool)
if process_workflow_inputs_and_outputs(workflow, replace_etool):
modified = True
if process_workflow_reqs_and_hints(workflow, replace_etool):
modified = True
if workflow.requirements:
workflow.requirements[:] = [
x
Expand Down
Loading

0 comments on commit e257e7a

Please sign in to comment.