diff --git a/contrib/slurm-test/slurm_test.sh b/contrib/slurm-test/slurm_test.sh index 2bb011d529..5b1668ebdc 100755 --- a/contrib/slurm-test/slurm_test.sh +++ b/contrib/slurm-test/slurm_test.sh @@ -12,14 +12,13 @@ docker cp -L sort.py ${LEADER}:/home/admin docker cp fileToSort.txt ${LEADER}:/home/admin docker cp toil_workflow.py ${LEADER}:/home/admin GIT_COMMIT=$(git rev-parse HEAD) -docker exec ${LEADER} sudo apt install python3-pip -y -docker exec ${LEADER} pip3 install "git+https://github.com/DataBiosphere/toil.git@${GIT_COMMIT}" +docker exec ${LEADER} python3.9 -m pip install "git+https://github.com/DataBiosphere/toil.git@${GIT_COMMIT}" docker exec ${LEADER} sinfo -N -l # Test 1: A really basic workflow to check Slurm is working correctly -docker exec ${LEADER} python3 /home/admin/toil_workflow.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0 --batchLogsDir ./nonexistent/paths +docker exec ${LEADER} python3.9 /home/admin/toil_workflow.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0 --batchLogsDir ./nonexistent/paths docker cp ${LEADER}:/home/admin/output.txt output_Docker.txt # Test 2: Make sure that "sort" workflow runs under slurm -docker exec ${LEADER} python3 /home/admin/sort.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0 +docker exec ${LEADER} python3.9 /home/admin/sort.py file:my-job-store --batchSystem slurm --disableCaching --retryCount 0 docker cp ${LEADER}:/home/admin/sortedFile.txt sortedFile.txt docker compose stop ./check_out.sh diff --git a/docker/Dockerfile.py b/docker/Dockerfile.py index 219663cd32..8c0c25bcf1 100644 --- a/docker/Dockerfile.py +++ b/docker/Dockerfile.py @@ -110,11 +110,12 @@ def heredoc(s): # Find a repo with a Mesos build. # This one was archived like: # mkdir mesos-repo && cd mesos-repo - # wget --recursive --restrict-file-names=windows -k --convert-links --no-parent --page-requisites https://rpm.aventer.biz/Ubuntu/ https://www.aventer.biz/assets/support_aventer.asc https://rpm.aventer.biz/README.txt + # wget --recursive --restrict-file-names=windows -k --convert-links --no-parent --page-requisites -m https://rpm.aventer.biz/Ubuntu/ https://www.aventer.biz/assets/support_aventer.asc https://rpm.aventer.biz/README.txt # ipfs add -r . 
- RUN echo "deb https://public.gi.ucsc.edu/~anovak/outbox/toil/ipfs/QmeaErHzK4Dajz2mCMd36eUDQp7GX2bSECVRpGfrqdragR/rpm.aventer.biz/Ubuntu/focal focal main" \ + # It contains a GPG key that will expire 2026-09-28 + RUN echo "deb https://public.gi.ucsc.edu/~anovak/outbox/toil/ipfs/QmRXnGNiWk523zgNkuamENVkghMJ2zJtinVfgjHbc4Dcpr/rpm.aventer.biz/Ubuntu/focal focal main" \ > /etc/apt/sources.list.d/mesos.list \ - && curl https://public.gi.ucsc.edu/~anovak/outbox/toil/ipfs/QmeaErHzK4Dajz2mCMd36eUDQp7GX2bSECVRpGfrqdragR/www.aventer.biz/assets/support_aventer.asc | apt-key add - + && curl https://public.gi.ucsc.edu/~anovak/outbox/toil/ipfs/QmRXnGNiWk523zgNkuamENVkghMJ2zJtinVfgjHbc4Dcpr/www.aventer.biz/assets/support_aventer.asc | apt-key add - RUN apt-get -y update --fix-missing && \ DEBIAN_FRONTEND=noninteractive apt-get -y upgrade && \ diff --git a/requirements.txt b/requirements.txt index 85bac42914..4f302d5c70 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ requests<=2.31.0 docker>=6.1.0, <8 urllib3>=1.26.0,<3 python-dateutil -psutil >= 3.0.1, < 7 +psutil >= 6.1.0, < 7 PyPubSub >=4.0.3, <5 addict>=2.2.1, <2.5 backports.zoneinfo[tzdata];python_version<"3.9" diff --git a/src/toil/common.py b/src/toil/common.py index 81ea6395df..393ce16f60 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -777,22 +777,39 @@ def check_arguments(typ: str) -> None: # if cwl is set, format the namespace for cwl and check that wdl options are not set on the command line if cwl: - parser.add_argument("cwltool", type=str, help="CWL file to run.") - parser.add_argument( - "cwljob", - nargs="*", - help="Input file or CWL options. If CWL workflow takes an input, " - "the name of the input can be used as an option. " - 'For example: "%(prog)s workflow.cwl --file1 file". ' - "If an input has the same name as a Toil option, pass '--' before it.", - ) + # So we can manually write out the help for this and the inputs + # file/workflow options in the argument parser description, we suppress + # help for this option. + parser.add_argument("cwltool", metavar="WORKFLOW", type=str, help=SUPPRESS) + # We also need a "cwljob" command line argument, holding possibly a + # positional input file and possibly a whole string of option flags + # only known to the workflow. + # + # We don't want to try and parse out the positional argument here + # since, on Python 3.12, we can grab what's really supposed to be an + # argument to a workflow-defined option. + # + # We don't want to use the undocumented argparse.REMAINDER, since that + # will eat any Toil-defined option flags after the first positional + # argument. + # + # So we just use parse_known_args and dump all unknown args into it, + # and manually write help text in the argparse description. So don't + # define it here. check_arguments(typ="cwl") # if wdl is set, format the namespace for wdl and check that cwl options are not set on the command line if wdl: parser.add_argument("wdl_uri", type=str, help="WDL document URI") + # We want to have an inputs_url that can be either a positional or a flag. + # We can't just have them share a single-item dest in Python 3.12; + # argparse does not guarantee that will work, and we can get the + # positional default value clobbering the flag. See + # . + # So we make them accumulate to the same list. + # Note that we will get a None in the list when there's no positional inputs. 
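+        # For example (illustrative; exact behavior depends on argparse):
+        #   toil-wdl-runner wf.wdl inputs.json         -> inputs_uri == ["inputs.json"]
+        #   toil-wdl-runner wf.wdl --input inputs.json -> inputs_uri contains None and "inputs.json"
+        # The None from the absent positional is filtered out before use.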
parser.add_argument( - "inputs_uri", type=str, nargs="?", help="WDL input JSON URI" + "inputs_uri", type=str, nargs='?', action="append", help="WDL input JSON URI" ) parser.add_argument( "--input", @@ -800,6 +817,7 @@ def check_arguments(typ: str) -> None: "-i", dest="inputs_uri", type=str, + action="append", help="WDL input JSON URI", ) check_arguments(typ="wdl") diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 3a57418da6..482f098bab 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -51,6 +51,8 @@ import cwltool.main import cwltool.resolver import schema_salad.ref_resolver +# This is also in configargparse but MyPy doesn't know it +from argparse import RawDescriptionHelpFormatter from configargparse import ArgParser, Namespace from cwltool.loghandler import _logger as cwllogger from cwltool.loghandler import defaultStreamHandler @@ -90,8 +92,9 @@ from toil.batchSystems.abstractBatchSystem import InsufficientSystemResources from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM -from toil.common import Toil, addOptions +from toil.common import Config, Toil, addOptions from toil.cwl import check_cwltool_version +from toil.lib.integration import resolve_workflow from toil.lib.misc import call_command from toil.provisioners.clusterScaler import JobTooBigError @@ -3525,7 +3528,7 @@ def import_workflow_inputs( # We cast this because import_file is overloaded depending on if we # pass a shared file name or not, and we know the way we call it we # always get a FileID out. - file_import_function = cast( + input_import_function = cast( Callable[[str], FileID], functools.partial(jobstore.import_file, symlink=True), ) @@ -3535,7 +3538,7 @@ def import_workflow_inputs( logger.info("Importing input files...") fs_access = ToilFsAccess(options.basedir) import_files( - file_import_function, + input_import_function, fs_access, fileindex, existing, @@ -3545,6 +3548,15 @@ def import_workflow_inputs( bypass_file_store=options.bypass_file_store, log_level=logging.INFO, ) + + # Make another function for importing tool files. This one doesn't allow + # symlinking, since the tools might be coming from storage not accessible + # to all nodes. + tool_import_function = cast( + Callable[[str], FileID], + functools.partial(jobstore.import_file, symlink=False), + ) + # Import all the files associated with tools (binaries, etc.). # Not sure why you would have an optional secondary file here, but # the spec probably needs us to support them. @@ -3553,7 +3565,7 @@ def import_workflow_inputs( tool, functools.partial( import_files, - file_import_function, + tool_import_function, fs_access, fileindex, existing, @@ -3829,17 +3841,18 @@ def generate_default_job_store( usage_message = "\n\n" + textwrap.dedent( """ - * All positional arguments [cwl, yml_or_json] must always be specified last for toil-cwl-runner. - Note: If you're trying to specify a jobstore, please use --jobStore. - - Usage: toil-cwl-runner [options] example.cwl example-job.yaml - Example: toil-cwl-runner \\ - --jobStore aws:us-west-2:jobstore \\ - --realTimeLogging \\ - --logInfo \\ - example.cwl \\ - example-job.yaml - """[ + NOTE: If you're trying to specify a jobstore, you must use --jobStore, not a positional argument. 
+ + Usage: toil-cwl-runner [options] [] [workflow options] + + Example: toil-cwl-runner \\ + --jobStore aws:us-west-2:jobstore \\ + --realTimeLogging \\ + --logInfo \\ + example.cwl \\ + example-job.yaml \\ + --wf_input="hello world" + """[ 1: ] ) @@ -3851,11 +3864,34 @@ def get_options(args: list[str]) -> Namespace: :param args: List of args from command line :return: options namespace """ - parser = ArgParser() + # We can't allow abbreviations in case the workflow defines an option that + # is a prefix of a Toil option. + parser = ArgParser( + allow_abbrev=False, + usage="%(prog)s [options] WORKFLOW [INFILE] [WF_OPTIONS...]", + description=textwrap.dedent(""" + positional arguments: + + WORKFLOW CWL file to run. + + INFILE YAML or JSON file of workflow inputs. + + WF_OPTIONS Additional inputs to the workflow as command-line + flags. If CWL workflow takes an input, the name of the + input can be used as an option. For example: + + %(prog)s workflow.cwl --file1 file + + If an input has the same name as a Toil option, pass + '--' before it. + """), + formatter_class=RawDescriptionHelpFormatter, + ) + addOptions(parser, jobstore_as_flag=True, cwl=True) options: Namespace - options, cwl_options = parser.parse_known_args(args) - options.cwljob.extend(cwl_options) + options, extra = parser.parse_known_args(args) + options.cwljob = extra return options @@ -3987,177 +4023,190 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: runtime_context.research_obj = research_obj try: - with Toil(options) as toil: - if options.restart: - outobj = toil.restart() - else: - # TODO: why are we doing this? Does this get applied to all - # tools as a default or something? - loading_context.hints = [ - { - "class": "ResourceRequirement", - "coresMin": toil.config.defaultCores, - # Don't include any RAM requirement because we want to - # know when tools don't manually ask for RAM. - "outdirMin": toil.config.defaultDisk / (2**20), - "tmpdirMin": 0, - } - ] - loading_context.construct_tool_object = toil_make_tool - loading_context.strict = not options.not_strict - options.workflow = options.cwltool - options.job_order = options.cwljob - try: - uri, tool_file_uri = cwltool.load_tool.resolve_tool_uri( - options.cwltool, - loading_context.resolver, - loading_context.fetcher_constructor, - ) - except ValidationException: - print( - "\nYou may be getting this error because your arguments are incorrect or out of order." - + usage_message, - file=sys.stderr, - ) - raise + if not options.restart: + # Make a version of the config based on the initial options, for + # setting up CWL option stuff + expected_config = Config() + expected_config.setOptions(options) - # Attempt to prepull the containers - if not options.no_prepull: - if not options.enable_ext: - # The CWL utils parser does not support cwltool extensions and will crash if encountered, so don't prepull if extensions are enabled - # See https://github.com/common-workflow-language/cwl-utils/issues/309 - try_prepull(uri, runtime_context, toil.config.batchSystem) - else: - logger.debug( - "Not prepulling containers as cwltool extensions are not supported." - ) + # Before showing the options to any cwltool stuff that wants to + # load the workflow, transform options.cwltool, where our + # argument for what to run is, to handle Dockstore workflows. 
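+            # (Illustrative) a Dockstore page URL or TRS ID like
+            # "#workflow/github.com/org/repo" is replaced with a local path to the
+            # cached primary CWL descriptor; ordinary paths and URLs pass through.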
+ options.cwltool = resolve_workflow(options.cwltool) - options.tool_help = None - options.debug = options.logLevel == "DEBUG" - job_order_object, options.basedir, jobloader = ( - cwltool.main.load_job_order( - options, - sys.stdin, - loading_context.fetcher_constructor, - loading_context.overrides_list, - tool_file_uri, - ) + # TODO: why are we doing this? Does this get applied to all + # tools as a default or something? + loading_context.hints = [ + { + "class": "ResourceRequirement", + "coresMin": expected_config.defaultCores, + # Don't include any RAM requirement because we want to + # know when tools don't manually ask for RAM. + "outdirMin": expected_config.defaultDisk / (2**20), + "tmpdirMin": 0, + } + ] + loading_context.construct_tool_object = toil_make_tool + loading_context.strict = not options.not_strict + options.workflow = options.cwltool + options.job_order = options.cwljob + + try: + uri, tool_file_uri = cwltool.load_tool.resolve_tool_uri( + options.cwltool, + loading_context.resolver, + loading_context.fetcher_constructor, ) - if options.overrides: - loading_context.overrides_list.extend( - cwltool.load_tool.load_overrides( - schema_salad.ref_resolver.file_uri( - os.path.abspath(options.overrides) - ), - tool_file_uri, - ) + except ValidationException: + print( + "\nYou may be getting this error because your arguments are incorrect or out of order." + + usage_message, + file=sys.stderr, + ) + raise + + # Attempt to prepull the containers + if not options.no_prepull: + if not options.enable_ext: + # The CWL utils parser does not support cwltool extensions and will crash if encountered, so don't prepull if extensions are enabled + # See https://github.com/common-workflow-language/cwl-utils/issues/309 + try_prepull(uri, runtime_context, expected_config.batchSystem) + else: + logger.debug( + "Not prepulling containers as cwltool extensions are not supported." 
) - loading_context, workflowobj, uri = cwltool.load_tool.fetch_document( - uri, loading_context + options.tool_help = None + options.debug = options.logLevel == "DEBUG" + job_order_object, options.basedir, jobloader = ( + cwltool.main.load_job_order( + options, + sys.stdin, + loading_context.fetcher_constructor, + loading_context.overrides_list, + tool_file_uri, + ) + ) + if options.overrides: + loading_context.overrides_list.extend( + cwltool.load_tool.load_overrides( + schema_salad.ref_resolver.file_uri( + os.path.abspath(options.overrides) + ), + tool_file_uri, + ) ) - loading_context, uri = cwltool.load_tool.resolve_and_validate_document( - loading_context, workflowobj, uri + + loading_context, workflowobj, uri = cwltool.load_tool.fetch_document( + uri, loading_context + ) + loading_context, uri = cwltool.load_tool.resolve_and_validate_document( + loading_context, workflowobj, uri + ) + if not loading_context.loader: + raise RuntimeError("cwltool loader is not set.") + processobj, metadata = loading_context.loader.resolve_ref(uri) + processobj = cast(Union[CommentedMap, CommentedSeq], processobj) + + document_loader = loading_context.loader + + if options.provenance and runtime_context.research_obj: + cwltool.cwlprov.writablebagfile.packed_workflow( + runtime_context.research_obj, + cwltool.main.print_pack(loading_context, uri), ) - if not loading_context.loader: - raise RuntimeError("cwltool loader is not set.") - processobj, metadata = loading_context.loader.resolve_ref(uri) - processobj = cast(Union[CommentedMap, CommentedSeq], processobj) - document_loader = loading_context.loader + try: + tool = cwltool.load_tool.make_tool(uri, loading_context) + scan_for_unsupported_requirements( + tool, bypass_file_store=options.bypass_file_store + ) + except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err: + logging.error(err) + return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE + runtime_context.secret_store = SecretStore() - if options.provenance and runtime_context.research_obj: - cwltool.cwlprov.writablebagfile.packed_workflow( - runtime_context.research_obj, - cwltool.main.print_pack(loading_context, uri), + try: + # Get the "order" for the execution of the root job. CWLTool + # doesn't document this much, but this is an "order" in the + # sense of a "specification" for running a single job. It + # describes the inputs to the workflow. + initialized_job_order = cwltool.main.init_job_order( + job_order_object, + options, + tool, + jobloader, + sys.stdout, + make_fs_access=runtime_context.make_fs_access, + input_basedir=options.basedir, + secret_store=runtime_context.secret_store, + input_required=True, + ) + except SystemExit as err: + if err.code == 2: # raised by argparse's parse_args() function + print( + "\nIf both a CWL file and an input object (YAML/JSON) file were " + "provided, the problem may be the argument order." + usage_message, + file=sys.stderr, ) + raise - try: - tool = cwltool.load_tool.make_tool(uri, loading_context) - scan_for_unsupported_requirements( - tool, bypass_file_store=options.bypass_file_store - ) - except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err: - logging.error(err) - return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE - runtime_context.secret_store = SecretStore() + # Leave the defaults un-filled in the top-level order. The tool or + # workflow will fill them when it runs - try: - # Get the "order" for the execution of the root job. CWLTool - # doesn't document this much, but this is an "order" in the - # sense of a "specification" for running a single job. 
It - # describes the inputs to the workflow. - initialized_job_order = cwltool.main.init_job_order( - job_order_object, - options, - tool, - jobloader, - sys.stdout, - make_fs_access=runtime_context.make_fs_access, - input_basedir=options.basedir, - secret_store=runtime_context.secret_store, - input_required=True, - ) - except SystemExit as err: - if err.code == 2: # raised by argparse's parse_args() function - print( - "\nIf both a CWL file and an input object (YAML/JSON) file were " - "provided, this may be the argument order." + usage_message, - file=sys.stderr, - ) - raise + for inp in tool.tool["inputs"]: + if ( + shortname(inp["id"]) in initialized_job_order + and inp["type"] == "File" + ): + cast( + CWLObjectType, initialized_job_order[shortname(inp["id"])] + )["streamable"] = inp.get("streamable", False) + # TODO also for nested types that contain streamable Files + + runtime_context.use_container = not options.no_container + runtime_context.tmp_outdir_prefix = os.path.realpath(tmp_outdir_prefix) + runtime_context.job_script_provider = job_script_provider + runtime_context.force_docker_pull = options.force_docker_pull + runtime_context.no_match_user = options.no_match_user + runtime_context.no_read_only = options.no_read_only + runtime_context.basedir = options.basedir + if not options.bypass_file_store: + # If we're using the file store we need to start moving output + # files now. + runtime_context.move_outputs = "move" + + # We instantiate an early builder object here to populate indirect + # secondaryFile references using cwltool's library because we need + # to resolve them before toil imports them into the filestore. + # A second builder will be built in the job's run method when toil + # actually starts the cwl job. + # Note that this accesses input files for tools, so the + # ToilFsAccess needs to be set up if we want to be able to use + # URLs. + builder = tool._init_job(initialized_job_order, runtime_context) + + # make sure this doesn't add listing items; if shallow_listing is + # selected, it will discover dirs one deep and then again later on + # (probably when the cwltool builder gets ahold of the job in the + # CWL job's run()), producing 2+ deep listings instead of only 1. + builder.loadListing = "no_listing" + + builder.bind_input( + tool.inputs_record_schema, + initialized_job_order, + discover_secondaryFiles=True, + ) - # Leave the defaults un-filled in the top-level order. The tool or - # workflow will fill them when it runs - - for inp in tool.tool["inputs"]: - if ( - shortname(inp["id"]) in initialized_job_order - and inp["type"] == "File" - ): - cast( - CWLObjectType, initialized_job_order[shortname(inp["id"])] - )["streamable"] = inp.get("streamable", False) - # TODO also for nested types that contain streamable Files - - runtime_context.use_container = not options.no_container - runtime_context.tmp_outdir_prefix = os.path.realpath(tmp_outdir_prefix) - runtime_context.job_script_provider = job_script_provider - runtime_context.force_docker_pull = options.force_docker_pull - runtime_context.no_match_user = options.no_match_user - runtime_context.no_read_only = options.no_read_only - runtime_context.basedir = options.basedir - if not options.bypass_file_store: - # If we're using the file store we need to start moving output - # files now. 
- runtime_context.move_outputs = "move" - - # We instantiate an early builder object here to populate indirect - # secondaryFile references using cwltool's library because we need - # to resolve them before toil imports them into the filestore. - # A second builder will be built in the job's run method when toil - # actually starts the cwl job. - # Note that this accesses input files for tools, so the - # ToilFsAccess needs to be set up if we want to be able to use - # URLs. - builder = tool._init_job(initialized_job_order, runtime_context) - - # make sure this doesn't add listing items; if shallow_listing is - # selected, it will discover dirs one deep and then again later on - # (probably when the cwltool builder gets ahold of the job in the - # CWL job's run()), producing 2+ deep listings instead of only 1. - builder.loadListing = "no_listing" - - builder.bind_input( - tool.inputs_record_schema, - initialized_job_order, - discover_secondaryFiles=True, - ) + logger.info("Creating root job") + logger.debug("Root tool: %s", tool) + tool = remove_pickle_problems(tool) - logger.info("Creating root job") - logger.debug("Root tool: %s", tool) - tool = remove_pickle_problems(tool) + with Toil(options) as toil: + if options.restart: + outobj = toil.restart() + else: try: wf1 = makeRootJob( tool=tool, diff --git a/src/toil/lib/integration.py b/src/toil/lib/integration.py new file mode 100644 index 0000000000..d05b39b11e --- /dev/null +++ b/src/toil/lib/integration.py @@ -0,0 +1,341 @@ +# Copyright (C) 2024 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Contains functions for integrating Toil with external services such as +Dockstore. +""" + +import hashlib +import logging +import os +import shutil +import sys +import tempfile +import zipfile +from typing import Any, Dict, List, Optional, Set, Tuple, cast + +from urllib.parse import urlparse, unquote, quote +import requests + +from toil.lib.retry import retry +from toil.lib.io import file_digest, robust_rmtree +from toil.version import baseVersion + +logger = logging.getLogger(__name__) + +# We manage a Requests session at the module level in case we're supposed to be +# doing cookies, and to send a sensible user agent. +# We expect the Toil and Python version to not be personally identifiable even +# in theory (someone might make a new Toil version first, buit there's no way +# to know for sure that nobody else did the same thing). +session = requests.Session() +session.headers.update({"User-Agent": f"Toil {baseVersion} on Python {'.'.join([str(v) for v in sys.version_info])}"}) + +def is_dockstore_workflow(workflow: str) -> bool: + """ + Returns True if a workflow string smells Dockstore-y. + + Detects Dockstore page URLs and strings that could be Dockstore TRS IDs. 
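+
+    Illustrative examples (assumed from the checks below):
+
+    - "https://dockstore.org/workflows/github.com/org/repo:main" -> True
+    - "#workflow/github.com/org/repo" -> True
+    - "workflow.cwl" or "https://example.org/workflow.cwl" -> False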
+    """
+
+    return workflow.startswith("https://dockstore.org/workflows/") or workflow.startswith("#workflow/")
+
+def find_trs_spec(workflow: str) -> str:
+    """
+    Parse a Dockstore workflow URL or TRS ID to a string that is definitely a TRS ID.
+    """
+
+    if workflow.startswith("#workflow/"):
+        # Looks like a Dockstore TRS ID already.
+        # TODO: Does Dockstore guarantee we can recognize its TRS IDs like this?
+        logger.debug("Workflow %s is a TRS specifier already", workflow)
+        trs_spec = workflow
+    else:
+        # We need to get the right TRS ID from the Dockstore URL
+        parsed = urlparse(workflow)
+        # TODO: We assume the Dockstore page URL structure and the TRS IDs are basically the same.
+        page_path = unquote(parsed.path)
+        if not page_path.startswith("/workflows/"):
+            raise RuntimeError("Cannot parse Dockstore URL " + workflow)
+        trs_spec = "#workflow/" + page_path[len("/workflows/"):]
+        logger.debug("Translated %s to TRS: %s", workflow, trs_spec)
+
+    return trs_spec
+
+def parse_trs_spec(trs_spec: str) -> tuple[str, Optional[str]]:
+    """
+    Parse a TRS ID to workflow and optional version.
+    """
+    parts = trs_spec.split(':', 1)
+    trs_workflow_id = parts[0]
+    if len(parts) > 1:
+        # The ID has the version we want after a colon
+        trs_version = parts[1]
+    else:
+        # We don't know the version we want, we will have to pick one somehow.
+        trs_version = None
+    return trs_workflow_id, trs_version
+
+@retry(errors=[requests.exceptions.ConnectionError])
+def get_workflow_root_from_dockstore(workflow: str, supported_languages: Optional[set[str]] = None) -> str:
+    """
+    Given a Dockstore URL or TRS identifier, get the root WDL or CWL URL for the workflow.
+
+    Accepts inputs like:
+
+    - https://dockstore.org/workflows/github.com/dockstore-testing/md5sum-checker:master?tab=info
+    - #workflow/github.com/dockstore-testing/md5sum-checker
+
+    Assumes the input is actually one of the supported formats. See is_dockstore_workflow().
+
+    TODO: Needs to handle multi-workflow files if Dockstore can.
+
+    """
+
+    if supported_languages is not None and len(supported_languages) == 0:
+        raise ValueError("Set of supported languages must be nonempty if provided.")
+
+    # Get the TRS id[:version] string from what might be a Dockstore URL
+    trs_spec = find_trs_spec(workflow)
+    # Parse out workflow and possible version
+    trs_workflow_id, trs_version = parse_trs_spec(trs_spec)
+
+    logger.debug("TRS %s parses to workflow %s and version %s", trs_spec, trs_workflow_id, trs_version)
+
+    # Fetch the main TRS document.
+    # See e.g. https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fdockstore-testing%2Fmd5sum-checker
+    trs_workflow_url = f"https://dockstore.org/api/ga4gh/trs/v2/tools/{quote(trs_workflow_id, safe='')}"
+    trs_workflow_document = session.get(trs_workflow_url).json()
+
+    # Make a map from version to version info. We will need the
+    # "descriptor_type" array to find eligible languages, and the "url" field
+    # to get the version's base URL.
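+    # Each "versions" entry is expected to look roughly like (abbreviated,
+    # illustrative): {"name": "master", "descriptor_type": ["CWL"],
+    # "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/<id>/versions/master", ...}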
+ workflow_versions: dict[str, dict[str, Any]] = {} + + # We also check which we actually know how to run + eligible_workflow_versions: set[str] = set() + + for version_info in trs_workflow_document.get("versions", []): + version_name: str = version_info["name"] + workflow_versions[version_name] = version_info + version_languages: list[str] = version_info["descriptor_type"] + if supported_languages is not None: + # Filter to versions that have a language we know + has_supported_language = False + for language in version_languages: + if language in supported_languages: + # TODO: Also use "descriptor_type_version" dict to make + # sure we support all needed language versions to actually + # use this workflow version. + has_supported_language = True + continue + if not has_supported_language: + # Can't actually run this one. + continue + eligible_workflow_versions.add(version_name) + + for default_version in ['main', 'master']: + if trs_version is None and default_version in eligible_workflow_versions: + # Fill in a version if the user didn't provide one. + trs_version = default_version + logger.debug("Defaulting to workflow version %s", default_version) + break + + if trs_version is None and len(eligible_workflow_versions) == 1: + # If there's just one version use that. + trs_version = next(iter(eligible_workflow_versions)) + logger.debug("Defaulting to only eligible workflow version %s", trs_version) + + + # If we don't like what we found we compose a useful error message. + problems: list[str] = [] + if trs_version is None: + problems.append(f"Workflow {workflow} does not specify a version") + elif trs_version not in workflow_versions: + problems.append(f"Workflow version {trs_version} from {workflow} does not exist") + elif trs_version not in eligible_workflow_versions: + message = f"Workflow version {trs_version} from {workflow} is not available" + if supported_languages is not None: + message += f" in any of: {', '.join(supported_languages)}" + problems.append(message) + if len(problems) > 0: + if len(eligible_workflow_versions) == 0: + message = "No versions of the workflow are available" + if supported_languages is not None: + message += f" in any of: {', '.join(supported_languages)}" + problems.append(message) + elif trs_version is None: + problems.append(f"Add ':' and the name of a workflow version ({', '.join(eligible_workflow_versions)}) after '{trs_workflow_id}'") + else: + problems.append(f"Replace '{trs_version}' with one of ({', '.join(eligible_workflow_versions)})") + raise RuntimeError("; ".join(problems)) + + # Tell MyPy we now have a version, or we would have raised + assert trs_version is not None + + # Select the language we will actually run + chosen_version_languages: list[str] = workflow_versions[trs_version]["descriptor_type"] + for candidate_language in chosen_version_languages: + if supported_languages is None or candidate_language in supported_languages: + language = candidate_language + + logger.debug("Going to use %s version %s in %s", trs_workflow_id, trs_version, language) + trs_version_url = workflow_versions[trs_version]["url"] + + # Fetch the list of all the files + trs_files_url = f"{trs_version_url}/{language}/files" + logger.debug("Workflow files URL: %s", trs_files_url) + trs_files_document = session.get(trs_files_url).json() + + # Find the information we need to ID the primary descriptor file + primary_descriptor_path: Optional[str] = None + primary_descriptor_hash_algorithm: Optional[str] = None + primary_descriptor_hash: Optional[str] = None + for 
file_info in trs_files_document: + if file_info["file_type"] == "PRIMARY_DESCRIPTOR": + primary_descriptor_path = file_info["path"] + primary_descriptor_hash_algorithm = file_info["checksum"]["type"] + primary_descriptor_hash = file_info["checksum"]["checksum"] + break + if primary_descriptor_path is None or primary_descriptor_hash is None or primary_descriptor_hash_algorithm is None: + raise RuntimeError("Could not find a primary descriptor file for the workflow") + primary_descriptor_basename = os.path.basename(primary_descriptor_path) + + # Work out how to compute the hash we are looking for. See + # + # for the GA4GH names and + # for the Python names. + # + # TODO: We don't support the various truncated hash flavors or the other checksums not in hashlib. + python_hash_name = primary_descriptor_hash_algorithm.replace("sha-", "sha").replace("blake2b-512", "blake2b").replace("-", "_") + if python_hash_name not in hashlib.algorithms_available: + raise RuntimeError(f"Primary descriptor is identified by a {primary_descriptor_hash_algorithm} hash but {python_hash_name} is not available in hashlib") + + # Figure out where to store the workflow. We don't want to deal with temp + # dir cleanup since we don't want to run the whole workflow setup and + # execution in a context manager. So we declare a cache. + # Note that it's still not safe to symlink out of this cache since XDG + # cache directories aren't guaranteed to be on shared storage. + cache_base_dir = os.path.join(os.environ.get("XDG_CACHE_HOME", os.path.expanduser("~/.cache")), "toil/workflows") + + # Hash the workflow file list. + hasher = hashlib.sha256() + for file_info in sorted(trs_files_document, key=lambda rec: rec["path"]): + hasher.update(file_info["path"].encode("utf-8")) + hasher.update(b"\0") + hasher.update(file_info["checksum"]["type"].encode("utf-8")) + hasher.update(b"\0") + hasher.update(file_info["checksum"]["checksum"].encode("utf-8")) + hasher.update(b"\0") + cache_workflow_dir = os.path.join(cache_base_dir, hasher.hexdigest()) + + if os.path.exists(cache_workflow_dir): + logger.debug("Workflow already cached at %s", cache_workflow_dir) + else: + # Need to download the workflow + + # Download the ZIP to a temporary file + trs_zip_file_url = f"{trs_files_url}?format=zip" + logger.debug("Workflow ZIP URL: %s", trs_zip_file_url) + with tempfile.NamedTemporaryFile(suffix=".zip") as zip_file: + # We want to stream the zip to a file, but when we do it with the Requests + # file object like we don't get + # Requests' decoding of gzip or deflate response encodings. Since this file + # is already compressed the response compression can't help a lot anyway, + # so we tell the server that we can't understand it. + headers = { + "Accept-Encoding": "identity", + # Help Dockstore avoid serving ZIP with a JSON content type. See + # . + "Accept": "application/zip" + } + # If we don't set stream=True, we can't actually read anything from the + # raw stream, since Requests will have done it already. 
+            with session.get(trs_zip_file_url, headers=headers, stream=True) as response:
+                response_content_length = response.headers.get("Content-Length")
+                logger.debug("Server reports content length: %s", response_content_length)
+                shutil.copyfileobj(response.raw, zip_file)
+                zip_file.flush()
+
+            logger.debug("Downloaded ZIP to %s", zip_file.name)
+
+            # Unzip it to a directory next to where it will live
+            os.makedirs(cache_base_dir, exist_ok=True)
+            workflow_temp_dir = tempfile.mkdtemp(dir=cache_base_dir)
+            with zipfile.ZipFile(zip_file.name, "r") as zip_ref:
+                zip_ref.extractall(workflow_temp_dir)
+            logger.debug("Extracted workflow ZIP to %s", workflow_temp_dir)
+
+            # Try to atomically install into the cache
+            try:
+                os.rename(workflow_temp_dir, cache_workflow_dir)
+                logger.debug("Moved workflow to %s", cache_workflow_dir)
+            except OSError:
+                # Collision. Someone else installed the workflow before we could.
+                robust_rmtree(workflow_temp_dir)
+                logger.debug("Workflow cached at %s by someone else while we were downloading it", cache_workflow_dir)
+
+    # Hunt through the directory for a file with the right basename and hash
+    found_path: Optional[str] = None
+    for containing_dir, subdirectories, files in os.walk(cache_workflow_dir):
+        for filename in files:
+            if filename == primary_descriptor_basename:
+                # This could be it. Open the file off disk and hash it with the right algorithm.
+                file_path = os.path.join(containing_dir, filename)
+                file_hash = file_digest(open(file_path, "rb"), python_hash_name).hexdigest()
+                if file_hash == primary_descriptor_hash:
+                    # This looks like the right file
+                    logger.debug("Found candidate primary descriptor %s", file_path)
+                    if found_path is not None:
+                        # But there are multiple instances of it so we can't know which to run.
+                        # TODO: Find out the right path from Dockstore somehow!
+                        raise RuntimeError(f"Workflow contains multiple files named {primary_descriptor_basename} with {python_hash_name} hash {file_hash}: {found_path} and {file_path}")
+                    # This is the first file with the right name and hash
+                    found_path = file_path
+                else:
+                    logger.debug("Rejected %s because its %s hash %s is not %s", file_path, python_hash_name, file_hash, primary_descriptor_hash)
+    if found_path is None:
+        # We couldn't find the promised primary descriptor
+        raise RuntimeError(f"Could not find a {primary_descriptor_basename} with {primary_descriptor_hash_algorithm} hash {primary_descriptor_hash}")
+
+    return found_path
+
+def resolve_workflow(workflow: str, supported_languages: Optional[set[str]] = None) -> str:
+    """
+    Find the real workflow URL or filename from a command line argument.
+
+    Transform a workflow URL or path that might actually be a Dockstore page
+    URL or TRS specifier to an actual URL or path to a workflow document.
+    """
+
+    if is_dockstore_workflow(workflow):
+        # Ask Dockstore where to find Dockstore-y things
+        resolved = get_workflow_root_from_dockstore(workflow, supported_languages=supported_languages)
+        logger.info("Resolved Dockstore workflow %s to %s", workflow, resolved)
+        return resolved
+    else:
+        # Pass other things through.
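+        # e.g. (illustrative) "workflow.cwl" or "https://example.org/wf.wdl"
+        # come back unchanged.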
+ return workflow + + + + + + + + + + + diff --git a/src/toil/lib/io.py b/src/toil/lib/io.py index fbf54668db..9bd74efa55 100644 --- a/src/toil/lib/io.py +++ b/src/toil/lib/io.py @@ -1,13 +1,15 @@ +import hashlib import logging import os import shutil import stat +import sys import tempfile import uuid from collections.abc import Iterator from contextlib import contextmanager from io import BytesIO -from typing import IO, Any, Callable, Optional, Union +from typing import IO, Any, Callable, Optional, Protocol, Union logger = logging.getLogger(__name__) @@ -308,3 +310,31 @@ def close(self): """ self.backingStream.close() + +class ReadableFileObj(Protocol): + """ + Protocol that is more specific than what file_digest takes as an argument. + Also guarantees a read() method. + Would extend the protocol from Typeshed for hashlib but those are only + declared for 3.11+. + """ + def readinto(self, buf: bytearray, /) -> int: ... + def readable(self) -> bool: ... + def read(self, number: int) -> bytes: ... + +# hashlib._Hash seems to not appear at runtime +def file_digest(f: ReadableFileObj, alg_name: str) -> "hashlib._Hash": + """ + Polyfilled hashlib.file_digest that works on Python <3.11. + """ + if sys.version_info >= (3, 11): + return hashlib.file_digest(f, alg_name) + BUFFER_SIZE = 1024 * 1024 + hasher = hashlib.new(alg_name) + buffer = f.read(BUFFER_SIZE) + while buffer: + hasher.update(buffer) + buffer = f.read(BUFFER_SIZE) + return hasher + + diff --git a/src/toil/lib/threading.py b/src/toil/lib/threading.py index 27a74d7a17..e24c7a619a 100644 --- a/src/toil/lib/threading.py +++ b/src/toil/lib/threading.py @@ -252,7 +252,7 @@ def cpu_count() -> int: return cast(int, cached) # Get the fallback answer of all the CPUs on the machine - psutil_cpu_count = cast(Optional[int], psutil.cpu_count(logical=True)) + psutil_cpu_count = psutil.cpu_count(logical=True) if psutil_cpu_count is None: logger.debug("Could not retrieve the logical CPU count.") diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py index 4711923ae2..8f471570b7 100644 --- a/src/toil/test/cwl/cwlTest.py +++ b/src/toil/test/cwl/cwlTest.py @@ -61,6 +61,7 @@ needs_lsf, needs_mesos, needs_online, + needs_singularity_or_docker, needs_slurm, needs_torque, needs_wes_server, @@ -433,6 +434,24 @@ def test_run_colon_output(self) -> None: self._expected_colon_output(self.outDir), out_name="result", ) + + @pytest.mark.integrative + @needs_singularity_or_docker + def test_run_dockstore_trs(self) -> None: + from toil.cwl import cwltoil + + stdout = StringIO() + main_args = [ + "--outdir", + self.outDir, + "#workflow/github.com/dockstore-testing/md5sum-checker", + "https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/refs/heads/master/md5sum/md5sum-input-cwl.json" + ] + cwltoil.main(main_args, stdout=stdout) + out = json.loads(stdout.getvalue()) + with open(out.get("output_file", {}).get("location")[len("file://") :]) as f: + computed_hash = f.read().strip() + self.assertEqual(computed_hash, "00579a00e3e7fa0674428ac7049423e2") def test_glob_dir_bypass_file_store(self) -> None: self.maxDiff = 1000 @@ -1514,6 +1533,8 @@ def test_workflow_echo_string_scatter_capture_stdout() -> None: cmd = [toil, jobstore, option_1, option_2, option_3, cwl] p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() + log.debug("Workflow standard output: %s", stdout) + assert len(stdout) > 0 outputs = json.loads(stdout) out_list = outputs["list_out"] assert len(out_list) == 2, f"outList 
shoud have two file elements {out_list}" diff --git a/src/toil/test/lib/test_integration.py b/src/toil/test/lib/test_integration.py new file mode 100644 index 0000000000..ede8da3ee2 --- /dev/null +++ b/src/toil/test/lib/test_integration.py @@ -0,0 +1,104 @@ +# Copyright (C) 2015-2024 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +import logging +import pytest +from typing import IO +import urllib.request +from urllib.error import URLError + +from toil.lib.retry import retry +from toil.lib.integration import get_workflow_root_from_dockstore +from toil.test import ToilTest, needs_online + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + +@pytest.mark.integrative +@needs_online +class DockstoreLookupTest(ToilTest): + """ + Make sure we can look up workflows on Dockstore. + """ + + @retry(errors=[URLError, RuntimeError]) + def read_result(self, url_or_path: str) -> IO[bytes]: + """ + Read a file or URL. + + Binary mode to allow testing for binary file support. + + This lets us test that we have the right workflow contents and not care + how we are being shown them. + """ + if url_or_path.startswith("http://") or url_or_path.startswith("https://"): + response = urllib.request.urlopen(url_or_path) + if response.status != 200: + raise RuntimeError(f"HTTP error response: {response}") + return response + else: + return open(url_or_path, "rb") + + # TODO: Tests that definitely test a clear cache + + def test_lookup_from_page_url(self) -> None: + PAGE_URL = "https://dockstore.org/workflows/github.com/dockstore/bcc2020-training/HelloWorld:master?tab=info" + # If we go in through the website we get an extra refs/heads/ on the branch name. + WORKFLOW_URL = "https://raw.githubusercontent.com/dockstore/bcc2020-training/master/wdl-training/exercise1/HelloWorld.wdl" + looked_up = get_workflow_root_from_dockstore(PAGE_URL) + + data_from_lookup = self.read_result(looked_up).read() + data_from_source = self.read_result(WORKFLOW_URL).read() + self.assertEqual(data_from_lookup, data_from_source) + + def test_lookup_from_trs(self) -> None: + TRS_ID = "#workflow/github.com/dockstore-testing/md5sum-checker" + # Despite "-checker" in the ID, this actually refers to the base md5sum + # workflow that just happens to have a checker *available*, not to the + # checker workflow itself. 
+ WORKFLOW_URL = "https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/master/md5sum/md5sum-workflow.cwl" + looked_up = get_workflow_root_from_dockstore(TRS_ID) + + data_from_lookup = self.read_result(looked_up).read() + data_from_source = self.read_result(WORKFLOW_URL).read() + self.assertEqual(data_from_lookup, data_from_source) + + def test_lookup_from_trs_cached(self) -> None: + TRS_ID = "#workflow/github.com/dockstore-testing/md5sum-checker" + WORKFLOW_URL = "https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/master/md5sum/md5sum-workflow.cwl" + # This lookup may or may not be cached + get_workflow_root_from_dockstore(TRS_ID) + # This lookup is definitely cached + looked_up = get_workflow_root_from_dockstore(TRS_ID) + + data_from_lookup = self.read_result(looked_up).read() + data_from_source = self.read_result(WORKFLOW_URL).read() + self.assertEqual(data_from_lookup, data_from_source) + + def test_lookup_from_trs_with_version(self) -> None: + TRS_ID = "#workflow/github.com/dockstore-testing/md5sum-checker:workflowWithHTTPImport" + WORKFLOW_URL = "https://raw.githubusercontent.com/dockstore-testing/md5sum-checker/workflowWithHTTPImport/md5sum/md5sum-workflow.cwl" + looked_up = get_workflow_root_from_dockstore(TRS_ID) + + data_from_lookup = self.read_result(looked_up).read() + data_from_source = self.read_result(WORKFLOW_URL).read() + self.assertEqual(data_from_lookup, data_from_source) + + def test_lookup_from_trs_nonexistent_version(self) -> None: + TRS_ID = "#workflow/github.com/dockstore-testing/md5sum-checker:notARealVersion" + with self.assertRaises(RuntimeError): + looked_up = get_workflow_root_from_dockstore(TRS_ID) + + diff --git a/src/toil/test/src/jobServiceTest.py b/src/toil/test/src/jobServiceTest.py index 732b591444..21a540c4e7 100644 --- a/src/toil/test/src/jobServiceTest.py +++ b/src/toil/test/src/jobServiceTest.py @@ -153,18 +153,41 @@ def testServiceRecursive(self, checkpoint=True): SingleMachineBatchSystem.numCores < 4, "Need at least four cores to run this test", ) - @retry_flaky_test(prepare=[ToilTest.tearDown, ToilTest.setUp]) @pytest.mark.timeout(1200) def testServiceParallelRecursive(self, checkpoint=True): """ Tests the creation of a Job.Service, creating parallel chains of services and accessing jobs. Randomly fails the worker. """ + + # This test needs to have something like 10 jobs succeed + + BUNDLE_SIZE = 3 + BUNDLE_COUNT = 2 + RETRY_COUNT = 4 + FAIL_FRACTION = 0.5 + MAX_ATTEMPTS = 10 + + total_jobs = BUNDLE_SIZE * BUNDLE_COUNT * 2 + 1 + p_complete_job_failure = FAIL_FRACTION ** (RETRY_COUNT + 1) + p_workflow_success = (1 - p_complete_job_failure) ** total_jobs + logger.info("Going to run %s total jobs, each of which completely fails %s of the time, so the workflow will succeed with probability %s", total_jobs, p_complete_job_failure, p_workflow_success) + p_test_failure = (1 - p_workflow_success) ** MAX_ATTEMPTS + logger.info("This test will fail spuriously with probability %s", p_test_failure) + + # We want to run the workflow through several times to test restarting, so we need it to often fail but reliably sometimes succeed, and almost always succeed when repeated. 
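+        # Illustrative arithmetic for the constants above: p_complete_job_failure
+        # = 0.5**5 ~ 0.031, total_jobs = 13, p_workflow_success ~ 0.969**13 ~ 0.66,
+        # p_test_failure ~ 0.34**10 ~ 2e-5, which satisfies the assertions below.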
+ + self.assertGreater(0.8, p_workflow_success) + self.assertGreater(p_workflow_success, 0.2) + self.assertGreater(0.001, p_test_failure) + + for test in range(1): # Temporary file - outFiles = [get_temp_file(rootDir=self._createTempDir()) for j in range(2)] + outFiles = [get_temp_file(rootDir=self._createTempDir()) for j in range(BUNDLE_COUNT)] + # We send 3 messages each in 2 sets, each of which needs a service and a client messageBundles = [ - [random.randint(1, sys.maxsize) for i in range(3)] for j in range(2) + [random.randint(1, sys.maxsize) for i in range(BUNDLE_SIZE)] for j in range(BUNDLE_COUNT) ] try: # Wire up the services/jobs @@ -176,7 +199,7 @@ def testServiceParallelRecursive(self, checkpoint=True): ) # Run the workflow repeatedly until success - self.runToil(t, retryCount=2) + self.runToil(t, retryCount=RETRY_COUNT, badWorker=FAIL_FRACTION, max_attempts=MAX_ATTEMPTS) # Check output for messages, outFile in zip(messageBundles, outFiles): @@ -194,6 +217,7 @@ def runToil( badWorkedFailInterval=0.1, maxServiceJobs=sys.maxsize, deadlockWait=60, + max_attempts=50 ): # Create the runner for the workflow. options = Job.Runner.getDefaultOptions(self._getTestJobStorePath()) @@ -207,39 +231,34 @@ def runToil( options.deadlockWait = deadlockWait # Run the workflow - totalTrys = 0 + total_tries = 1 while True: try: Job.Runner.startToil(rootJob, options) break except FailedJobsException as e: i = e.numberOfFailedJobs - if totalTrys > 50: # p(fail after this many restarts) = 0.5**32 + logger.info("Workflow attempt %s/%s failed with %s failed jobs", total_tries, max_attempts, i) + if total_tries == max_attempts: self.fail() # Exceeded a reasonable number of restarts - totalTrys += 1 + total_tries += 1 options.restart = True + logger.info("Succeeded after %s/%s attempts", total_tries, max_attempts) class PerfectServiceTest(JobServiceTest): def runToil( self, - rootJob, - retryCount=1, - badWorker=0, - badWorkedFailInterval=1000, - maxServiceJobs=sys.maxsize, - deadlockWait=60, + *args, + **kwargs ): """ Let us run all the tests in the other service test class, but without worker failures. """ + kwargs["badWorker"] = 0 super().runToil( - rootJob, - retryCount, - badWorker, - badWorkedFailInterval, - maxServiceJobs, - deadlockWait, + *args, + **kwargs ) @@ -250,15 +269,18 @@ def serviceTest(job, outFile, messageInt): """ # Clean out out-file open(outFile, "w").close() - randInt = random.randint( + # We create a random number that is added to messageInt and subtracted by + # the serviceAccessor, to prove that when service test is checkpointed and + # restarted there is never a connection made between an earlier service and + # later serviceAccessor, or vice versa. + to_subtract = random.randint( 1, sys.maxsize - ) # We create a random number that is added to messageInt and subtracted by the serviceAccessor, to prove that - # when service test is checkpointed and restarted there is never a connection made between an earlier service and later serviceAccessor, or vice versa. 
+ ) job.addChildJobFn( serviceAccessor, - job.addService(ToyService(messageInt + randInt)), + job.addService(ToyService(messageInt + to_subtract)), outFile, - randInt, + to_subtract, ) @@ -269,20 +291,20 @@ def serviceTestRecursive(job, outFile, messages): if len(messages) > 0: # Clean out out-file open(outFile, "w").close() - randInt = random.randint(1, sys.maxsize) - service = ToyService(messages[0] + randInt) + to_add = random.randint(1, sys.maxsize) + service = ToyService(messages[0] + to_add) child = job.addChildJobFn( - serviceAccessor, job.addService(service), outFile, randInt + serviceAccessor, job.addService(service), outFile, to_add ) for i in range(1, len(messages)): - randInt = random.randint(1, sys.maxsize) - service2 = ToyService(messages[i] + randInt, cores=0.1) + to_add = random.randint(1, sys.maxsize) + service2 = ToyService(messages[i] + to_add, cores=0.1) child = child.addChildJobFn( serviceAccessor, job.addService(service2, parentService=service), outFile, - randInt, + to_add, cores=0.1, ) service = service2 @@ -296,20 +318,20 @@ def serviceTestParallelRecursive(job, outFiles, messageBundles): # Clean out out-file open(outFile, "w").close() if len(messages) > 0: - randInt = random.randint(1, sys.maxsize) - service = ToyService(messages[0] + randInt) + to_add = random.randint(1, sys.maxsize) + service = ToyService(messages[0] + to_add) child = job.addChildJobFn( - serviceAccessor, job.addService(service), outFile, randInt + serviceAccessor, job.addService(service), outFile, to_add ) for i in range(1, len(messages)): - randInt = random.randint(1, sys.maxsize) - service2 = ToyService(messages[i] + randInt, cores=0.1) + to_add = random.randint(1, sys.maxsize) + service2 = ToyService(messages[i] + to_add, cores=0.1) child = child.addChildJobFn( serviceAccessor, job.addService(service2, parentService=service), outFile, - randInt, + to_add, cores=0.1, ) service = service2 @@ -406,14 +428,14 @@ def serviceWorker( raise -def serviceAccessor(job, communicationFiles, outFile, randInt): +def serviceAccessor(job, communicationFiles, outFile, to_subtract): """ Writes a random integer iinto the inJobStoreFileID file, then tries 10 times reading from outJobStoreFileID to get a pair of integers, the first equal to i the second written into the outputFile. 
""" inJobStoreFileID, outJobStoreFileID = communicationFiles - # Get a random integer + # Get a random integer to advertise ourselves key = random.randint(1, sys.maxsize) # Write the integer into the file @@ -438,10 +460,10 @@ def serviceAccessor(job, communicationFiles, outFile, randInt): if int(key2) == key: logger.debug( - f"Matched key's: {key}, writing message: {int(message) - randInt} with randInt: {randInt}" + f"Matched key's: {key}, writing message: {int(message) - to_subtract} with to_subtract: {to_subtract}" ) with open(outFile, "a") as fH: - fH.write("%s\n" % (int(message) - randInt)) + fH.write("%s\n" % (int(message) - to_subtract)) return assert 0 # Job failed to get info from the service diff --git a/src/toil/test/utils/utilsTest.py b/src/toil/test/utils/utilsTest.py index ab831a72be..1525f44d17 100644 --- a/src/toil/test/utils/utilsTest.py +++ b/src/toil/test/utils/utilsTest.py @@ -378,30 +378,47 @@ def testMultipleJobsPerWorkerStats(self): "Some jobs are not represented in the stats.", ) - def check_status(self, status, status_fn, seconds=20): - i = 0.0 - while status_fn(self.toilDir) != status: + def check_status(self, status, status_fn, process=None, seconds=20): + time_elapsed = 0.0 + has_stopped = process.poll() is not None if process else False + current_status = status_fn(self.toilDir) + while current_status != status: + if has_stopped: + # If the process has stopped and the stratus is wrong, it will never be right. + self.assertEqual( + current_status, + status, + f"Process returned {process.returncode} without status reaching {status}; stuck at {current_status}", + ) + logger.debug( + "Workflow is %s; waiting for %s (%s/%s elapsed)", + current_status, + status, + time_elapsed, + seconds + ) time.sleep(0.5) - i += 0.5 - if i > seconds: - s = status_fn(self.toilDir) + time_elapsed += 0.5 + has_stopped = process.poll() is not None if process else False + current_status = status_fn(self.toilDir) + if time_elapsed > seconds: self.assertEqual( - s, + current_status, status, - f"Waited {seconds} seconds without status reaching {status}; stuck at {s}", + f"Waited {seconds} seconds without status reaching {status}; stuck at {current_status}", ) def testGetPIDStatus(self): """Test that ToilStatus.getPIDStatus() behaves as expected.""" wf = subprocess.Popen(self.sort_workflow_cmd) - self.check_status("RUNNING", status_fn=ToilStatus.getPIDStatus, seconds=60) + self.check_status("RUNNING", status_fn=ToilStatus.getPIDStatus, process=wf, seconds=60) wf.wait() - self.check_status("COMPLETED", status_fn=ToilStatus.getPIDStatus, seconds=60) + self.check_status("COMPLETED", status_fn=ToilStatus.getPIDStatus, process=wf, seconds=60) # TODO: we need to reach into the FileJobStore's files and delete this # shared file. We assume we know its internal layout. os.remove(os.path.join(self.toilDir, "files/shared/pid.log")) - self.check_status("QUEUED", status_fn=ToilStatus.getPIDStatus, seconds=60) + self.check_status("QUEUED", status_fn=ToilStatus.getPIDStatus, process=wf, seconds=60) def testGetStatusFailedToilWF(self): """ @@ -411,9 +428,9 @@ def testGetStatusFailedToilWF(self): """ # --badWorker is set to force failure. 
wf = subprocess.Popen(self.sort_workflow_cmd + ["--badWorker=1"]) - self.check_status("RUNNING", status_fn=ToilStatus.getStatus, seconds=60) + self.check_status("RUNNING", status_fn=ToilStatus.getStatus, process=wf, seconds=60) wf.wait() - self.check_status("ERROR", status_fn=ToilStatus.getStatus, seconds=60) + self.check_status("ERROR", status_fn=ToilStatus.getStatus, process=wf, seconds=60) @needs_cwl @needs_docker @@ -422,6 +439,7 @@ def testGetStatusFailedCWLWF(self): # --badWorker is set to force failure. cmd = [ "toil-cwl-runner", + "--logDebug", "--jobStore", self.toilDir, "--clean=never", @@ -432,10 +450,11 @@ def testGetStatusFailedCWLWF(self): "src/toil/test/cwl/whale.txt", f"--outdir={self.tempDir}", ] + logger.info("Run command: %s", " ".join(cmd)) wf = subprocess.Popen(cmd) - self.check_status("RUNNING", status_fn=ToilStatus.getStatus, seconds=60) + self.check_status("RUNNING", status_fn=ToilStatus.getStatus, process=wf, seconds=60) wf.wait() - self.check_status("ERROR", status_fn=ToilStatus.getStatus, seconds=60) + self.check_status("ERROR", status_fn=ToilStatus.getStatus, process=wf, seconds=60) @needs_cwl @needs_docker @@ -453,9 +472,9 @@ def testGetStatusSuccessfulCWLWF(self): f"--outdir={self.tempDir}", ] wf = subprocess.Popen(cmd) - self.check_status("RUNNING", status_fn=ToilStatus.getStatus, seconds=60) + self.check_status("RUNNING", status_fn=ToilStatus.getStatus, process=wf, seconds=60) wf.wait() - self.check_status("COMPLETED", status_fn=ToilStatus.getStatus, seconds=60) + self.check_status("COMPLETED", status_fn=ToilStatus.getStatus, process=wf, seconds=60) @needs_cwl @patch("builtins.print") @@ -464,6 +483,7 @@ def testPrintJobLog(self, mock_print): # Run a workflow that will always fail cmd = [ "toil-cwl-runner", + "--logDebug", "--jobStore", self.toilDir, "--clean=never", @@ -471,6 +491,7 @@ def testPrintJobLog(self, mock_print): "--message", "Testing", ] + logger.info("Run command: %s", " ".join(cmd)) wf = subprocess.Popen(cmd) wf.wait() # print log and check output diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 9b4ae01fe8..68beb2bf79 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -1,6 +1,7 @@ import json import logging import os +import pytest import re import shutil import string @@ -542,6 +543,23 @@ def test_miniwdl_self_test_by_reference(self) -> None: """ self.test_miniwdl_self_test(extra_args=["--referenceInputs=True"]) + @pytest.mark.integrative + @needs_singularity_or_docker + def test_dockstore_trs(self, extra_args: Optional[list[str]] = None) -> None: + wdl_file = "#workflow/github.com/dockstore/bcc2020-training/HelloWorld:master" + # Needs an input but doesn't provide a good one. 
+ json_input = json.dumps({"hello_world.hello.myName": "https://raw.githubusercontent.com/dockstore/bcc2020-training/refs/heads/master/wdl-training/exercise1/name.txt"}) + + result_json = subprocess.check_output( + self.base_command + [wdl_file, json_input, '--logDebug', '-o', self.output_dir, '--outputDialect', + 'miniwdl'] + (extra_args or [])) + result = json.loads(result_json) + + with open(result.get("outputs", {}).get("hello_world.helloFile")) as f: + result_text = f.read().strip() + + self.assertEqual(result_text, "Hello World!\nMy name is potato.") + @slow @needs_docker_cuda def test_giraffe_deepvariant(self): diff --git a/src/toil/utils/toilStatus.py b/src/toil/utils/toilStatus.py index 9fc4da97cc..cd3e43c91d 100644 --- a/src/toil/utils/toilStatus.py +++ b/src/toil/utils/toilStatus.py @@ -34,11 +34,17 @@ def __init__(self, jobStoreName: str, specifiedJobs: Optional[list[str]] = None) self.jobStore = Toil.resumeJobStore(jobStoreName) if specifiedJobs is None: - rootJob = self.fetchRootJob() - logger.info( - "Traversing the job graph gathering jobs. This may take a couple of minutes." - ) - self.jobsToReport = self.traverseJobGraph(rootJob) + try: + rootJob = self.fetchRootJob() + logger.info( + "Traversing the job graph gathering jobs. This may take a couple of minutes." + ) + self.jobsToReport = self.traverseJobGraph(rootJob) + except JobException: + # Root job isn't set. + logger.warning("Workflow does not have a root job (yet? anymore?). Cannot look for jobs.") + self.jobsToReport = [] + else: self.jobsToReport = self.fetchUserJobs(specifiedJobs) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index edc0c96c5d..2846ff0399 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -82,6 +82,7 @@ ) from toil.lib.accelerators import get_individual_local_accelerators from toil.lib.conversions import VALID_PREFIXES, convert_units, human2bytes +from toil.lib.integration import resolve_workflow from toil.lib.io import mkdtemp from toil.lib.memoize import memoize from toil.lib.misc import get_user_name @@ -5324,6 +5325,15 @@ def main() -> None: "Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers." ) + # Having an nargs=? option can put a None in our inputs list, so drop that. + input_sources = [x for x in options.inputs_uri if x is not None] + if len(input_sources) > 1: + raise RuntimeError( + f"Workflow inputs cannot be specified with both the -i/--input/--inputs flag " + f"and as a positional argument at the same time. Cannot use both " + f"\"{input_sources[0]}\" and \"{input_sources[1]}\"." + ) + # Make sure we have an output directory (or URL prefix) and we don't need # to ever worry about a None, and MyPy knows it. # If we don't have a directory assigned, make one in the current directory. @@ -5338,9 +5348,14 @@ def main() -> None: if options.restart: output_bindings = toil.restart() else: + # TODO: Move all the input parsing outside the Toil context + # manager to avoid leaving a job store behind if the workflow + # can't start. + # Load the WDL document document: WDL.Tree.Document = WDL.load( - options.wdl_uri, read_source=toil_read_source + resolve_workflow(options.wdl_uri, supported_languages={"WDL"}), + read_source=toil_read_source, ) # See if we're going to run a workflow or a task @@ -5379,30 +5394,49 @@ def main() -> None: ) options.all_call_outputs = True - if options.inputs_uri: + # If our input really comes from a URI or path, remember it. 
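+            # (Illustrative) "-" means read the inputs from standard input, a leading
+            # "{" means inline JSON, and anything else is treated as a URI or path.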
+ input_source_uri = None + # Also remember where we need to report JSON parse errors as + # coming from if there's no actual URI/path. + input_source_name = "empty input" + + if input_sources: + input_source = input_sources[0] # Load the inputs. Use the same loading mechanism, which means we # have to break into async temporarily. - if options.inputs_uri[0] == "{": - input_json = options.inputs_uri - elif options.inputs_uri == "-": + if input_source[0] == "{": + input_json = input_source + input_source_name = "inline JSON" + elif input_source == "-": input_json = sys.stdin.read() + input_source_name = "standard input" else: + input_source_uri = input_source + input_source_name = input_source_uri input_json = asyncio.run( - toil_read_source(options.inputs_uri, [], None) + toil_read_source(input_source_uri, [], None) ).source_text try: inputs = json.loads(input_json) except json.JSONDecodeError as e: # Complain about the JSON document. + # We need the absolute path or URL to raise the error - inputs_abspath = ( - options.inputs_uri - if not os.path.exists(options.inputs_uri) - else os.path.abspath(options.inputs_uri) - ) + if input_source_uri is not None: + # If this is a local fike, use that as the abspath. + # Otherwise just pass through a URL. + inputs_abspath = ( + input_source_uri + if not os.path.exists(input_source_uri) + else os.path.abspath(input_source_uri) + ) + else: + # There's no local file and no URL. + inputs_abspath = input_source_name + raise WDL.Error.ValidationError( WDL.Error.SourcePosition( - options.inputs_uri, + input_source_name, inputs_abspath, e.lineno, e.colno, @@ -5432,12 +5466,12 @@ def main() -> None: # Determine where to look for files referenced in the inputs, in addition to here. inputs_search_path = [] - if options.inputs_uri: - inputs_search_path.append(options.inputs_uri) + if input_source_uri: + inputs_search_path.append(input_source_uri) match = re.match( r"https://raw\.githubusercontent\.com/[^/]*/[^/]*/[^/]*/", - options.inputs_uri, + input_source_uri, ) if match: # Special magic for Github repos to make e.g.