diff --git a/janis_assistant/cli.py b/janis_assistant/cli.py
index 631947ef..72e782a0 100644
--- a/janis_assistant/cli.py
+++ b/janis_assistant/cli.py
@@ -25,6 +25,7 @@
     abort_wids,
     spider_tool,
     pause,
+    fromspec,
 )
 from janis_assistant.management.configmanager import ConfigManager
 from janis_assistant.utils import parse_additional_arguments
@@ -47,6 +48,7 @@ def process_args(sysargs=None):
         "version": do_version,
         "docs": do_docs,
         "run": do_run,
+        "run-spec": do_run_spec,
         "translate": do_translate,
         "inputs": do_inputs,
         "watch": do_watch,
@@ -70,6 +72,9 @@ def process_args(sysargs=None):
     subparsers = parser.add_subparsers(dest="command")
 
     add_run_args(subparsers.add_parser("run", help="Run a Janis workflow"))
+    add_run_spec_args(
+        subparsers.add_parser("run-spec", help="Run another type of workflow")
+    )
     add_init_args(
         subparsers.add_parser("init", help="Initialise a Janis configuration")
     )
@@ -397,6 +402,12 @@ def add_run_args(parser):
         help="Skip checking if files exist before the start of a workflow.",
     )
 
+    parser.add_argument(
+        "--copy-partial-outputs",
+        action="store_true",
+        help="Copy outputs, even if the workflow doesn't successfully complete",
+    )
+
     # development settings
 
     parser.add_argument(
@@ -555,6 +566,22 @@ def add_run_args(parser):
     return parser
 
 
+def add_run_spec_args(parser):
+    parser.add_argument(
+        "workflow",
+        help="Run the workflow defined in this file or available within the toolbox",
+    )
+
+    parser.add_argument("-c", "--config", help="Path to config file")
+
+    parser.add_argument(
+        "-i",
+        "--inputs",
+        help="YAML or JSON inputs file to provide values for the workflow (can specify multiple times)",
+        action="append",
+    )
+
+
 def add_reconnect_args(parser):
     parser.add_argument("wid", help="task-id to reconnect to")
     return parser
@@ -628,6 +655,7 @@ def do_version(_):
 
     from janis_assistant.__meta__ import __version__ as jr_version
     from janis_core.__meta__ import __version__ as jc_version
+    from janis_core.utils.logger import Logger
     import janis_core.toolbox.entrypoints as EP
 
     fields = [["janis-core", jc_version], ["janis-assistant", jr_version]]
@@ -703,6 +731,13 @@ def do_rm(args):
             Logger.critical(f"Can't remove {wid}: " + str(e))
 
 
+def do_run_spec(args):
+    jc = JanisConfiguration.initial_configuration(args.config)
+    wf = args.workflow
+
+    fromspec(workflow=wf, inputs=args.inputs)
+
+
 def do_run(args):
     jc = JanisConfiguration.initial_configuration(args.config)
 
@@ -787,6 +822,7 @@ def do_run(args):
         allow_empty_container=args.allow_empty_container,
         check_files=not args.skip_file_check,
         container_override=parse_container_override_format(args.container_override),
+        copy_partial_outputs=args.copy_partial_outputs,
     )
 
     Logger.info("Exiting")
diff --git a/janis_assistant/data/enums/workflowmetadatakeys.py b/janis_assistant/data/enums/workflowmetadatakeys.py
index cc5b6dfe..50747d2c 100644
--- a/janis_assistant/data/enums/workflowmetadatakeys.py
+++ b/janis_assistant/data/enums/workflowmetadatakeys.py
@@ -33,3 +33,5 @@ class WorkflowMetadataDbKeys(Enum):
     submission_inputs = "submission_inputs"
     submission_resources = "submission_resources"
     dbconfig = "dbconfig"
+
+    copy_partial_outputs = "copy_partial_outputs"
diff --git a/janis_assistant/data/providers/workflowmetadataprovider.py b/janis_assistant/data/providers/workflowmetadataprovider.py
index f613152d..f51b13d6 100644
--- a/janis_assistant/data/providers/workflowmetadataprovider.py
+++ b/janis_assistant/data/providers/workflowmetadataprovider.py
@@ -53,6 +53,7 @@ def __init__(self, dblocation, wid, readonly=False):
 
         self.dbconfig = None
         self.author = lookup_username()
+        self.copy_partial_outputs = None
 
         self.kvdb.autocommit = True
         self.kvdb.commit()
diff --git a/janis_assistant/engines/cromwell/main.py b/janis_assistant/engines/cromwell/main.py
index 0f227d52..784e1ef1 100644
--- a/janis_assistant/engines/cromwell/main.py
+++ b/janis_assistant/engines/cromwell/main.py
@@ -99,6 +99,9 @@ def __init__(
         self.connectionerrorcount = 0
         self.should_stop = False
 
+        self._timer_thread = threading.Event()
+        self.poll_metadata()
+
         if not self.connect_to_instance:
 
             # To avoid conflicts between version of Cromwell, we'll find an open
@@ -265,9 +268,6 @@ def start_engine(self, additional_cromwell_options: List[str] = None):
             # exit_function=self.something_has_happened_to_cromwell,
         )
 
-        self._timer_thread = threading.Event()
-        self.poll_metadata()
-
         return self
 
     def did_fail(self, rc):
diff --git a/janis_assistant/engines/engine.py b/janis_assistant/engines/engine.py
index c0a9f926..0bbd35f4 100644
--- a/janis_assistant/engines/engine.py
+++ b/janis_assistant/engines/engine.py
@@ -2,6 +2,8 @@
 from abc import ABC, abstractmethod
 from typing import Dict, Any, Optional, List, Callable
 
+from janis_core import Logger
+
 from janis_assistant.data.models.workflow import WorkflowModel
 from janis_assistant.engines.enginetypes import EngineType
 from janis_assistant.management import Archivable
@@ -25,6 +27,7 @@ def __init__(
     def add_callback(
         self, engine_identifier: str, callback: Callable[[WorkflowModel], None]
     ):
+        Logger.debug(f"Adding callback from engineId {engine_identifier}")
         self.progress_callbacks[engine_identifier] = self.progress_callbacks.get(
             engine_identifier, []
         ) + [callback]
diff --git a/janis_assistant/main.py b/janis_assistant/main.py
index 5b3357c6..38f8f310 100644
--- a/janis_assistant/main.py
+++ b/janis_assistant/main.py
@@ -358,6 +358,82 @@ def get_config():
     ruamel.yaml.dump(get_config(), sys.stdout, default_flow_style=False)
 
 
+def fromspec(
+    workflow: str,  # path
+    inputs: str,  # inputs file
+    engine: Union[str, Engine] = None,
+    run_in_background=False,
+    run_in_foreground=True,
+    output_dir: str = None,
+    watch=True,
+    no_store=False,
+    dbconfig=None,
+    **kwargs,
+):
+    cm = ConfigManager.manager()
+    jc = JanisConfiguration.manager()
+
+    try:
+        from os.path import basename
+
+        inputs = inputs[0]
+
+        row = cm.create_task_base(
+            basename(workflow), outdir=output_dir, store_in_centraldb=not no_store
+        )
+        print(row.wid, file=sys.stdout)
+
+        engine = engine or jc.engine
+
+        eng = get_engine_from_eng(
+            engine,
+            wid=row.wid,
+            execdir=WorkflowManager.get_path_for_component_and_dir(
+                row.outputdir, WorkflowManager.WorkflowManagerPath.execution
+            ),
+            confdir=WorkflowManager.get_path_for_component_and_dir(
+                row.outputdir, WorkflowManager.WorkflowManagerPath.configuration
+            ),
+            logfile=os.path.join(
+                WorkflowManager.get_path_for_component_and_dir(
+                    row.outputdir, WorkflowManager.WorkflowManagerPath.logs
+                ),
+                "engine.log",
+            ),
+            watch=watch,
+            **kwargs,
+        )
+        fs = get_filescheme_from_fs(LocalFileScheme(), **kwargs)
+        environment = Environment(f"custom_{basename(workflow)}", eng, fs)
+
+        # Note: run_in_foreground can be None, so
+        # (not (run_in_foreground is True)) != (run_in_foreground is False)
+
+        should_run_in_background = (
+            run_in_background is True or jc.run_in_background is True
+        ) and not (run_in_foreground is True)
+
+        tm = WorkflowManager.from_spec(
+            wid=row.wid,
+            wf=workflow,
+            outdir=row.outputdir,
+            inputs=inputs,
+            environment=environment,
+            dryrun=False,
+            watch=watch,
+            run_in_background=should_run_in_background,
+            dbconfig=dbconfig,
+            **kwargs,
+        )
+        Logger.log("Finished starting task")
+        return tm
+    except Exception as e:
+        Logger.critical(
+            "An error occurred during workflow start, please see exception for more information"
+        )
+        raise e
+
+
 def fromjanis(
     workflow: Union[str, j.Tool, Type[j.Tool]],
     name: str = None,
@@ -455,7 +531,7 @@ def fromjanis(
             run_in_background is True or jc.run_in_background is True
         ) and not (run_in_foreground is True)
 
-        tm = cm.start_task(
+        tm = WorkflowManager.from_janis(
             wid=row.wid,
             tool=wf,
             environment=environment,
@@ -474,6 +550,7 @@ def fromjanis(
             allow_empty_container=allow_empty_container,
             container_override=container_override,
             check_files=check_files,
+            **kwargs,
         )
         Logger.log("Finished starting task task")
         return tm
diff --git a/janis_assistant/management/configmanager.py b/janis_assistant/management/configmanager.py
index 2b6d3a89..28955232 100644
--- a/janis_assistant/management/configmanager.py
+++ b/janis_assistant/management/configmanager.py
@@ -98,7 +98,7 @@ def remove_task(self, task: Union[str, TaskRow], keep_output: bool):
         self.get_lazy_db_connection().remove_by_id(task.wid)
         Logger.info("Deleted task: " + task.wid)
 
-    def create_task_base(self, wf: Workflow, outdir=None, store_in_centraldb=True):
+    def create_task_base(self, name: str, outdir=None, store_in_centraldb=True):
         config = JanisConfiguration.manager()
 
         """
@@ -115,7 +115,7 @@ def create_task_base(self, wf: Workflow, outdir=None, store_in_centraldb=True):
         default_outdir = None
 
         if config.outputdir:
-            default_outdir = os.path.join(config.outputdir, wf.id())
+            default_outdir = os.path.join(config.outputdir, name)
 
         forbiddenids = set()
         if store_in_centraldb:
@@ -182,6 +182,7 @@ def start_task(
         allow_empty_container=False,
         container_override: dict = None,
         check_files=True,
+        **kwargs,
     ) -> WorkflowManager:
 
         return WorkflowManager.from_janis(
@@ -203,6 +204,7 @@ def start_task(
             allow_empty_container=allow_empty_container,
             container_override=container_override,
             check_files=check_files,
+            **kwargs,
         )
 
     def from_wid(self, wid, readonly=False):
diff --git a/janis_assistant/management/workflowmanager.py b/janis_assistant/management/workflowmanager.py
index 307556d4..9b414981 100644
--- a/janis_assistant/management/workflowmanager.py
+++ b/janis_assistant/management/workflowmanager.py
@@ -134,6 +134,78 @@ def has(
     def watch(self):
         self.show_status_screen()
 
+    @staticmethod
+    def from_spec(
+        wid: str,
+        wf: str,
+        inputs: str,
+        outdir: str,
+        environment: Environment,
+        watch=True,
+        run_in_background=True,
+        dbconfig=None,
+        dryrun=False,
+    ):
+
+        from os.path import basename
+
+        jc = JanisConfiguration.manager()
+
+        environment.identifier += "_" + wid
+
+        tm = WorkflowManager(wid=wid, outdir=outdir, environment=environment)
+
+        tm.database.runs.insert(wid)
+
+        tm.database.workflowmetadata.wid = wid
+        tm.database.workflowmetadata.engine = environment.engine
+        tm.database.workflowmetadata.filescheme = environment.filescheme
+        tm.database.workflowmetadata.environment = environment.id()
+        tm.database.workflowmetadata.name = basename(wf)
+        tm.database.workflowmetadata.start = DateUtil.now()
+        tm.database.workflowmetadata.executiondir = None
+        tm.database.workflowmetadata.keepexecutiondir = True
+        tm.database.workflowmetadata.configuration = jc
+        tm.database.workflowmetadata.dbconfig = dbconfig
+
+        # This is the only time we're allowed to skip the tm.set_status
+        # This is a temporary stop gap until "notification on status" is implemented.
+        # tm.set_status(TaskStatus.PROCESSING)
+        tm.database.workflowmetadata.status = TaskStatus.PROCESSING
+
+        spec = get_ideal_specification_for_engine(environment.engine)
+
+        # outdir_workflow = tm.get_path_for_component(
+        #     WorkflowManager.WorkflowManagerPath.workflow
+        # )
+
+        # TODO: Move workflow and inputs to output_dir for better tracking
+        # would then have to consider the deps though...
+
+        tm.database.workflowmetadata.submission_workflow = wf
+        tm.database.workflowmetadata.submission_inputs = inputs
+
+        tm.database.commit()
+
+        if not dryrun:
+            if (
+                not run_in_background
+                and jc.template
+                and jc.template.template
+                and jc.template.template.can_run_in_foreground is False
+            ):
+                raise Exception(
+                    f"Your template '{jc.template.template.__class__.__name__}' is not allowed to run "
+                    f"in the foreground, try adding the '--background' argument"
+                )
+            tm.start_or_submit(run_in_background=run_in_background, watch=watch)
+        else:
+            tm.set_status(TaskStatus.DRY_RUN)
+
+        tm.database.commit()
+
+        return tm
+
     @staticmethod
     def from_janis(
         wid: str,
@@ -154,6 +226,8 @@ def from_janis(
         allow_empty_container=False,
         container_override: dict = None,
         check_files=True,
+        copy_partial_outputs=False,
+        **kwargs,
     ):
         jc = JanisConfiguration.manager()
 
@@ -176,6 +250,7 @@ def from_janis(
         tm.database.workflowmetadata.keepexecutiondir = keep_intermediate_files
         tm.database.workflowmetadata.configuration = jc
         tm.database.workflowmetadata.dbconfig = dbconfig
+        tm.database.workflowmetadata.copy_partial_outputs = copy_partial_outputs
 
         # This is the only time we're allowed to skip the tm.set_status
        # This is a temporary stop gap until "notification on status" is implemented.
@@ -793,7 +868,11 @@ def copy_outputs_if_required(self):
         if self.database.progressDB.has(ProgressKeys.copiedOutputs):
             return Logger.debug(f"Workflow '{self.wid}' has copied outputs, skipping")
-        if self.database.workflowmetadata.status != TaskStatus.COMPLETED:
+        should_copy_outputs = (
+            self.database.workflowmetadata.copy_partial_outputs
+            or self.database.workflowmetadata.status == TaskStatus.COMPLETED
+        )
+        if not should_copy_outputs:
             return Logger.warn(
                 f"Skipping copying outputs as workflow "
                 f"status was not completed ({self.database.workflowmetadata.status})"
             )
@@ -809,7 +888,7 @@ def copy_outputs_if_required(self):
 
             if eout is None:
                 Logger.warn(
-                    f"Couldn't find expected output with tag {out.tag}, found outputs ({', '.join(eoutkeys)}"
+                    f"Couldn't find expected output with tag {out.tag}, found outputs ({', '.join(eoutkeys)})"
                 )
                 continue
             originalfile, newfilepath = self.copy_output(
@@ -833,7 +912,8 @@ def copy_outputs_if_required(self):
                 tag=out.tag, original_path=originalfile, new_path=newfilepath
             )
 
-        self.database.progressDB.set(ProgressKeys.copiedOutputs)
+        if self.database.workflowmetadata.status == TaskStatus.COMPLETED:
+            self.database.progressDB.set(ProgressKeys.copiedOutputs)
         Logger.info(f"View the task outputs: file://{self.get_task_path()}")
 
     def copy_output(
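
Editor's note (not part of the patch): a minimal sketch of how the new fromspec() entrypoint might be driven, mirroring do_run_spec() in cli.py — roughly what the new "run-spec" subcommand does when invoked as `janis run-spec -i hello-inputs.json hello.wdl` (assuming the console entrypoint is named `janis`). The workflow and inputs filenames are hypothetical, and the JanisConfiguration import path is an assumption about the project layout.

    # Hypothetical driver script; filenames and the configuration import path are assumptions.
    from janis_assistant.main import fromspec
    from janis_assistant.management.configuration import JanisConfiguration

    # Load the default Janis configuration, as do_run_spec does when --config is not given.
    JanisConfiguration.initial_configuration(None)

    # The CLI collects -i/--inputs with action="append", so inputs is a list;
    # the current fromspec implementation only consumes the first entry.
    tm = fromspec(workflow="hello.wdl", inputs=["hello-inputs.json"])
    print(tm.wid)  # WorkflowManager returned by WorkflowManager.from_spec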