From 0749b9556e05ff14580f75e56ee9787b260aaff3 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 09:55:22 -0500 Subject: [PATCH 01/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/bulk_atacseq.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/bulk_atacseq.py b/src/ingest-pipeline/airflow/dags/bulk_atacseq.py index ec05663a..4b3dcb7c 100644 --- a/src/ingest-pipeline/airflow/dags/bulk_atacseq.py +++ b/src/ingest-pipeline/airflow/dags/bulk_atacseq.py @@ -65,10 +65,6 @@ "workflow_path": str( get_absolute_workflow(Path("sc-atac-seq-pipeline", "bulk-atac-seq-pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--threads", "value": ""}, - {"parameter_name": "--sequence_directory", "value": []}, - ], "documentation_url": "", } ] @@ -84,16 +80,21 @@ def build_cwltool_cmd1(**kwargs): data_dirs = get_parent_data_dirs_list(**kwargs) - # ["--threads", "--sequence_directory"] - input_param_vals = [ - get_threads_resource(dag.dag_id), - [str(data_dir) for data_dir in data_dirs], - ] cwl_params = [ {"parameter_name": "--parallel", "value": ""}, + {"parameter_name": "--outdir", "value": str(tmpdir)}, + ] + + input_parameters = [ + {"parameter_name": "--threads", "value": get_threads_resource(dag.dag_id)}, + { + "parameter_name": "--sequence_directory", + "value": [str(data_dir) for data_dir in data_dirs], + }, ] + command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"], cwl_params + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_params ) return join_quote_command_str(command) From 9663c14b2376779b3effc7b317482d8cbf6ec122 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 09:55:30 -0500 Subject: [PATCH 02/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/utils.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/utils.py b/src/ingest-pipeline/airflow/dags/utils.py index 19167b94..4f9d203b 100644 --- a/src/ingest-pipeline/airflow/dags/utils.py +++ b/src/ingest-pipeline/airflow/dags/utils.py @@ -285,22 +285,27 @@ def get_cwl_cmd_from_workflows( """ # Grab the workflow from the list of workflows workflow = workflows[workflow_index] - - # Update the input parameters based on the list of input values - for i, param_val in enumerate(input_param_vals): - if param_val: - workflow["input_parameters"][i]["value"] = param_val + workflow["input_parameters"] = input_param_vals # Get the cwl invocation - command = [*get_cwltool_base_cmd(tmp_dir), "--outdir", str(tmp_dir / "cwl_out")] + command = [*get_cwltool_base_cmd(tmp_dir)] + # Rather than setting outdir, cycle through cwl_param vals and see whether its present + # if not, then we set it to the default value. + outdir_present = False for param in cwl_param_vals if cwl_param_vals is not None else []: + if param["parameter_name"] == "--outdir": + outdir_present = True + if isinstance(param["value"], list): for param_val in param["value"]: command.extend([param["parameter_name"], param_val]) else: command.extend([param["parameter_name"], param["value"]]) + if not outdir_present: + command.extend(["--outdir", str(tmp_dir / "cwl_out")]) + command.append(Path(workflow["workflow_path"])) # Extend the command with the input parameters From 4ff6cd5deab328ecd3c1cc1cb9d9d2e7367c0ccc Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 10:15:50 -0500 Subject: [PATCH 03/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/celldive_deepcell.py | 107 +++++++----------- 1 file changed, 38 insertions(+), 69 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/celldive_deepcell.py b/src/ingest-pipeline/airflow/dags/celldive_deepcell.py index 9622325c..fc199c8d 100644 --- a/src/ingest-pipeline/airflow/dags/celldive_deepcell.py +++ b/src/ingest-pipeline/airflow/dags/celldive_deepcell.py @@ -65,68 +65,38 @@ cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path(pipeline_name, "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--gpus", "value": "all"}, - {"parameter_name": "--meta_path", "value": ""}, - {"parameter_name": "--segmentation_method", "value": "deepcell"}, - {"parameter_name": "--data_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("sprm", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--enable_manhole", "value": ""}, - {"parameter_name": "--options_preset", "value": "celldive"}, - {"parameter_name": "--image_dir", "value": ""}, - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--mask_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("create-vis-symlink-archive", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--ometiff_dir", "value": ""}, - {"parameter_name": "--sprm_output", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": "."}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-json.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-anndata.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, ] @@ -146,11 +116,15 @@ def build_cwltool_cwl_segmentation(**kwargs): workflow = cwl_workflows[0] meta_yml_path = str(Path(workflow["workflow_path"]).parent / "meta.yaml") - # ["--gpus=all", "--meta_path", "--segmentation_method", "--data_dir"] - input_param_vals = ["", meta_yml_path, "", str(data_dir / "HuBMAP_OME")] + input_parameters = [ + {"parameter_name": "--gpus", "value": "all"}, + {"parameter_name": "--meta_path", "value": meta_yml_path}, + {"parameter_name": "--segmentation_method", "value": "deepcell"}, + {"parameter_name": "--data_dir", "value": str(data_dir / "HuBMAP_OME")}, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"] + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -195,15 +169,15 @@ def build_cwltool_cmd_sprm(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_segmentation") - # ["--enabled_manhole", "--options_preset=celldive", "--image_dir", "--processes", "--mask_dir"] - input_param_vals = [ - "", - "", - str(data_dir / "pipeline_output/expr"), - get_threads_resource(dag.dag_id), - str(data_dir / "pipeline_output/mask"), + input_parameters = [ + {"parameter_name": "--enable_manhole", "value": ""}, + {"parameter_name": "--options_preset", "value": "celldive"}, + {"parameter_name": "--image_dir", "value": str(data_dir / "pipeline_output/expr")}, + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--mask_dir", "value": str(data_dir / "pipeline_output/mask")}, ] - command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"]) + + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -248,12 +222,12 @@ def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm") - # ["--ometiff_dir", "--sprm_output"] - input_param_vals = [ - str(data_dir / "pipeline_output"), - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--ometiff_dir", "value": str(data_dir / "pipeline_outputs")}, + {"parameter_name": "--sprm_output", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"]) + + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -267,7 +241,6 @@ def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): task_id="pipeline_exec_cwl_create_vis_symlink_archive", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_create_vis_symlink_archive')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -301,13 +274,12 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): key="cwl_workflows", task_ids="build_cmd_create_vis_symlink_archive" ) - # ["-processes", "--ometiff_directory"] - input_param_vals = [ - get_threads_resource(dag.dag_id), - "." + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": "."}, ] - - command = get_cwl_cmd_from_workflows(workflows, 3, [], tmpdir, kwargs["ti"]) + + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) t_build_cmd_ome_tiff_pyramid = PythonOperator( @@ -320,7 +292,6 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -352,11 +323,11 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid" ) - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "ometiff-pyramids"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}, ] - command = get_cwl_cmd_from_workflows(workflows, 4, input_param_vals, tmpdir, kwargs["ti"]) + + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -370,7 +341,6 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_offsets", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_ome_tiff_offsets')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -402,11 +372,11 @@ def build_cwltool_cmd_sprm_to_json(**kwargs): key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets" ) - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 5, input_param_vals, tmpdir, kwargs["ti"]) + + command = get_cwl_cmd_from_workflows(workflows, 5, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -420,7 +390,6 @@ def build_cwltool_cmd_sprm_to_json(**kwargs): task_id="pipeline_exec_cwl_sprm_to_json", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_sprm_to_json')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -450,11 +419,12 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm_to_json") - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 6, input_param_vals, tmpdir, kwargs["ti"]) + + + command = get_cwl_cmd_from_workflows(workflows, 6, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -468,7 +438,6 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): task_id="pipeline_exec_cwl_sprm_to_anndata", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_sprm_to_anndata')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, From acc3da82aff7c7df4abeb6a90a54ba717952dbcb Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 10:16:04 -0500 Subject: [PATCH 04/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/celldive_deepcell.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/celldive_deepcell.py b/src/ingest-pipeline/airflow/dags/celldive_deepcell.py index fc199c8d..59b3a5f4 100644 --- a/src/ingest-pipeline/airflow/dags/celldive_deepcell.py +++ b/src/ingest-pipeline/airflow/dags/celldive_deepcell.py @@ -423,7 +423,6 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 6, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) From 72ac534c9dcc50066faa91e4add5557969cbd978 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 10:31:03 -0500 Subject: [PATCH 05/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/codex_cytokit.py | 139 ++++++++---------- 1 file changed, 59 insertions(+), 80 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/codex_cytokit.py b/src/ingest-pipeline/airflow/dags/codex_cytokit.py index 388284da..1146d340 100644 --- a/src/ingest-pipeline/airflow/dags/codex_cytokit.py +++ b/src/ingest-pipeline/airflow/dags/codex_cytokit.py @@ -67,94 +67,54 @@ "workflow_path": str( get_absolute_workflow(steps_dir / "illumination_first_stitching.cwl") ), - "input_parameters": [ - {"parameter_name": "--gpus", "value": "0,1"}, - {"parameter_name": "--data_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(steps_dir / "run_cytokit.cwl")), - "input_parameters": [ - {"parameter_name": "--data_dir", "value": ""}, - {"parameter_name": "--yaml_config", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(steps_dir / "ometiff_second_stitching.cwl") ), - "input_parameters": [ - {"parameter_name": "--cytokit_config", "value": ""}, - {"parameter_name": "--cytokit_output", "value": ""}, - {"parameter_name": "--slicing_pipeline_config", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("deepcelltypes", "run_deepcelltypes.cwl")) ), - "input_parameters": [ - {"parameter_name": "--data_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("sprm", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--enable_manhole", "value": ""}, - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--image_dir", "value": ""}, - {"parameter_name": "--mask_dir", "value": ""}, - {"parameter_name": "--cell_types_file", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("create-vis-symlink-archive", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--ometiff_dir", "value": ""}, - {"parameter_name": "--sprm_output", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": "."}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-json.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-anndata.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, ] @@ -173,10 +133,13 @@ def build_cwltool_cwl_illumination_first_stitching(**kwargs): data_dir = get_parent_data_dir(**kwargs) print("data_dir: ", data_dir) - # [--gpus=0,1, --data_dir] - input_param_vals = ["", str(data_dir)] + input_parameters = [ + {"parameter_name": "--gpus", "value": "0,1"}, + {"parameter_name": "--data_dir", "value": str(data_dir)}, + ] + command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"] + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -192,7 +155,6 @@ def build_cwltool_cwl_illumination_first_stitching(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_illumination_first_stitching')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -222,9 +184,12 @@ def build_cwltool_cwl_cytokit(**kwargs): key="cwl_workflows", task_ids="build_cwl_illumination_first_stitching" ) - # [--data_dir, --yaml_config] - input_param_vals = [str(data_dir / "new_tiles"), str(data_dir / "experiment.yaml")] - command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--data_dir", "value": str(data_dir / "new_tiles")}, + {"parameter_name": "--yaml_config", "value": str(data_dir / "experiment.yaml")}, + ] + + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -267,13 +232,15 @@ def build_cwltool_cwl_ometiff_second_stitching(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_cytokit") - # [--cytokit_config, --cytokit_output, --slicing_pipeline_config] - input_param_vals = [ - str(data_dir / "experiment.yaml"), - str(data_dir / "cytokit"), - str(data_dir / "pipelineConfig.json"), + input_parameters = [ + {"parameter_name": "--cytokit_config", "value": str(data_dir / "experiment.yaml")}, + {"parameter_name": "--cytokit_output", "value": str(data_dir / "cytokit")}, + { + "parameter_name": "--slicing_pipeline_config", + "value": str(data_dir / "pipelineConfig.json"), + }, ] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -327,9 +294,11 @@ def build_cwltool_cmd_deepcelltypes(**kwargs): key="cwl_workflows", task_ids="build_cwl_ometiff_second_stitching" ) - # [--data_dir] - input_param_vals = [str(data_dir / "pipeline_output")] - command = get_cwl_cmd_from_workflows(workflows, 3, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--data_dir", "value": str(data_dir / "pipeline_output")}, + ] + + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -372,15 +341,18 @@ def build_cwltool_cmd_sprm(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_deepcelltypes") - # [--enable_manhole, --processes, --image_dir, --mask_dir, --cell_types_file] - input_param_vals = [ - "", - get_threads_resource(dag.dag_id), - str(data_dir / "pipeline_output/expr"), - str(data_dir / "pipeline_output/mask"), - str(data_dir / "deepcelltypes_predictions.csv"), + input_parameters = [ + {"parameter_name": "--enable_manhole", "value": ""}, + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--image_dir", "value": str(data_dir / "pipeline_output/expr")}, + {"parameter_name": "--mask_dir", "value": str(data_dir / "pipeline_output/mask")}, + { + "parameter_name": "--cell_types_file", + "value": str(data_dir / "deepcelltypes_predictions.csv"), + }, ] - command = get_cwl_cmd_from_workflows(workflows, 4, input_param_vals, tmpdir, kwargs["ti"]) + + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -425,9 +397,11 @@ def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm") - # [--ometiff_dir, --sprm_output] - input_param_vals = [str(data_dir / "pipeline_output"), str(data_dir / "sprm_outputs")] - command = get_cwl_cmd_from_workflows(workflows, 5, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--ometiff_dir", "value": str(data_dir / "pipeline_output")}, + {"parameter_name": "--sprm_output", "value": str(data_dir / "sprm_outputs")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 5, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -474,9 +448,11 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): key="cwl_workflows", task_ids="build_cmd_create_vis_symlink_archive" ) - # [--processes, --ometiff_directory] - input_param_vals = [get_threads_resource(dag.dag_id), ""] - command = get_cwl_cmd_from_workflows(workflows, 6, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": "."}, + ] + command = get_cwl_cmd_from_workflows(workflows, 6, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -490,7 +466,6 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -522,9 +497,10 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid" ) - # [--input_dir] - input_param_vals = [str(data_dir / "ometiff-pyramids")] - command = get_cwl_cmd_from_workflows(workflows, 7, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 7, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -569,9 +545,10 @@ def build_cwltool_cmd_sprm_to_json(**kwargs): key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets" ) - # [--input_dir] - input_param_vals = [str(data_dir / "sprm_outputs")] - command = get_cwl_cmd_from_workflows(workflows, 8, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 8, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -614,9 +591,11 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm_to_json") - # [--input_dir] - input_param_vals = [str(data_dir / "sprm_outputs")] - command = get_cwl_cmd_from_workflows(workflows, 9, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, + ] + + command = get_cwl_cmd_from_workflows(workflows, 9, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) From 1c13acd570887ed1a0403206fc523a03dace232e Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 10:33:29 -0500 Subject: [PATCH 06/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/bulk_atacseq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/bulk_atacseq.py b/src/ingest-pipeline/airflow/dags/bulk_atacseq.py index 4b3dcb7c..ec9017d0 100644 --- a/src/ingest-pipeline/airflow/dags/bulk_atacseq.py +++ b/src/ingest-pipeline/airflow/dags/bulk_atacseq.py @@ -82,7 +82,6 @@ def build_cwltool_cmd1(**kwargs): cwl_params = [ {"parameter_name": "--parallel", "value": ""}, - {"parameter_name": "--outdir", "value": str(tmpdir)}, ] input_parameters = [ @@ -109,6 +108,7 @@ def build_cwltool_cmd1(**kwargs): task_id="pipeline_exec", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, From 6696eb0595bf0fd87f170cadfa8328916c76362c Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 10:39:43 -0500 Subject: [PATCH 07/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/kaggle_2_segmentation.py | 90 +++++++++---------- 1 file changed, 42 insertions(+), 48 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py b/src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py index b0d3d4fb..9f4ee1d6 100644 --- a/src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py +++ b/src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py @@ -66,39 +66,26 @@ cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path(pipeline_name, "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--data_directory", "value": ""}, - {"parameter_name": "--tissue_type", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": ""} - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": ""} - ], "documentation_url": "", }, { - "workflow_path": str(get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl"))), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ] + "workflow_path": str( + get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) + ), + "documentation_url": "", }, { - "workflow_path": str(get_absolute_workflow(Path("portal-containers", "ome-tiff-metadata.cwl"))), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], + "workflow_path": str( + get_absolute_workflow(Path("portal-containers", "ome-tiff-metadata.cwl")) + ), "documentation_url": "", }, ] @@ -121,14 +108,16 @@ def build_cwltool_cwl_segmentation(**kwargs): organ_list = list(set(ds_rslt["organs"])) organ_code = organ_list[0] if len(organ_list) == 1 else "multi" - # [--data_directory, --tissue_type] - input_param_vals = [ - str(data_dir), - organ_code, + input_parameters = [ + {"parameter_name": "--data_directory", "value": str(data_dir)}, + {"parameter_name": "--tissue_type", "value": organ_code}, ] - command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"], + cwl_workflows, + 0, + input_parameters, + tmpdir, + kwargs["ti"], ) return join_quote_command_str(command) @@ -144,7 +133,6 @@ def build_cwltool_cwl_segmentation(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_segmentation')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -172,9 +160,11 @@ def build_cwltool_cwl_ome_tiff_pyramid_processed(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_segmentation") - # ["--processes", "--ometiff_directory] - input_param_vals = [get_threads_resource(dag.dag_id), "."] - command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": "."}, + ] + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -188,8 +178,6 @@ def build_cwltool_cwl_ome_tiff_pyramid_processed(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid_processed", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid_processed')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -217,11 +205,15 @@ def build_cwltool_cwl_ome_tiff_pyramid_raw(**kwargs): data_dir = get_parent_data_dir(**kwargs) print("data_dir: ", data_dir) - workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid_processed") + workflows = kwargs["ti"].xcom_pull( + key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid_processed" + ) - # ["--processes", "--ometiff_directory] - input_param_vals = [get_threads_resource(dag.dag_id), str(data_dir)] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": str(data_dir)}, + ] + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -235,8 +227,6 @@ def build_cwltool_cwl_ome_tiff_pyramid_raw(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid_raw", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid_raw')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -264,11 +254,14 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): data_dir = tmpdir / "cwl_out" print("data_dir: ", data_dir) - workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid_raw") + workflows = kwargs["ti"].xcom_pull( + key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid_raw" + ) - # ["--input_dir] - input_param_vals = [str(data_dir / "ometiff-pyramids")] - command = get_cwl_cmd_from_workflows(workflows, 3, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -282,7 +275,6 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_offsets", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_ome_tiff_offsets')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -310,11 +302,14 @@ def build_cwltool_cmd_ome_tiff_metadata(**kwargs): data_dir = tmpdir / "cwl_out" print("data_dir: ", data_dir) - workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets") + workflows = kwargs["ti"].xcom_pull( + key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets" + ) - # ["--input_dir] - input_param_vals = [str(data_dir / "ometiff-pyramids")] - command = get_cwl_cmd_from_workflows(workflows, 4, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -328,7 +323,6 @@ def build_cwltool_cmd_ome_tiff_metadata(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_metadata", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_ome_tiff_metadata')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, From 942cc17c52cd1aa5883330ba5b9e5532a6caf2ee Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 10:58:09 -0500 Subject: [PATCH 08/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/mibi_deepcell.py | 99 ++++++------------- 1 file changed, 31 insertions(+), 68 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/mibi_deepcell.py b/src/ingest-pipeline/airflow/dags/mibi_deepcell.py index 285abdcc..1d1ec8ac 100644 --- a/src/ingest-pipeline/airflow/dags/mibi_deepcell.py +++ b/src/ingest-pipeline/airflow/dags/mibi_deepcell.py @@ -64,67 +64,38 @@ cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path(pipeline_name, "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--gpus", "value": "all"}, - {"parameter_name": "--meta_path", "value": ""}, - {"parameter_name": "--segmentation_method", "value": "deepcell"}, - {"parameter_name": "--data_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("sprm", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--enable_manhole", "value": ""}, - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--image_dir", "value": ""}, - {"parameter_name": "--mask_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("create-vis-symlink-archive", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--ometiff_dir", "value": ""}, - {"parameter_name": "--sprm_output", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": "."}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-json.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-anndata.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, ] @@ -144,11 +115,14 @@ def build_cwltool_cwl_segmentation(**kwargs): workflow = cwl_workflows[0] meta_yml_path = str(Path(workflow["workflow_path"]).parent / "meta.yaml") - # ["--gpus=all", "--meta_path", "--segmentation_method", "--data_dir"] - input_param_vals = ["", meta_yml_path, "", str(data_dir)] - + input_parameters = [ + {"parameter_name": "--gpus", "value": "all"}, + {"parameter_name": "--meta_path", "value": meta_yml_path}, + {"parameter_name": "--segmentation_method", "value": "deepcell"}, + {"parameter_name": "--data_dir", "value": str(data_dir)}, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"] + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -164,7 +138,6 @@ def build_cwltool_cwl_segmentation(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_segmentation')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -194,14 +167,13 @@ def build_cwltool_cmd_sprm(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_segmentation") - # ["--enable_manhole", "--processes", "--image_dir", "--mask_dir"] - input_param_vals = [ - "", - get_threads_resource(dag.dag_id), - str(data_dir / "pipeline_output/expr"), - str(data_dir / "pipeline_output/mask"), + input_parameters = [ + {"parameter_name": "--enable_manhole", "value": ""}, + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--image_dir", "value": str(data_dir / "pipeline_output/expr")}, + {"parameter_name": "--mask_dir", "value": str(data_dir / "pipeline_output/mask")}, ] - command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -215,7 +187,6 @@ def build_cwltool_cmd_sprm(**kwargs): task_id="pipeline_exec_cwl_sprm", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_sprm')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -247,12 +218,11 @@ def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm") - # ["--ometiff_dir", "--sprm_output"] - input_param_vals = [ - str(data_dir / "pipeline_output"), - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--ometiff_dir", "value": str(data_dir / "pipeline_output")}, + {"parameter_name": "--sprm_output", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -266,7 +236,6 @@ def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): task_id="pipeline_exec_cwl_create_vis_symlink_archive", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_create_vis_symlink_archive')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -300,9 +269,11 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): key="cwl_workflows", task_ids="build_cmd_create_vis_symlink_archive" ) - # ["processes", "--ometiff_directory"] - input_param_vals = [get_threads_resource(dag.dag_id)] - command = get_cwl_cmd_from_workflows(workflows, 3, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": "."}, + ] + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -316,8 +287,6 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -349,11 +318,10 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid" ) - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "ometiff-pyramids"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}, ] - command = get_cwl_cmd_from_workflows(workflows, 4, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -367,7 +335,6 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_offsets", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_ome_tiff_offsets')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -399,11 +366,10 @@ def build_cwltool_cmd_sprm_to_json(**kwargs): key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets" ) - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 5, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 5, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -417,7 +383,6 @@ def build_cwltool_cmd_sprm_to_json(**kwargs): task_id="pipeline_exec_cwl_sprm_to_json", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_sprm_to_json')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -447,11 +412,10 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm_to_json") - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 6, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 6, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -465,7 +429,6 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): task_id="pipeline_exec_cwl_sprm_to_anndata", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_sprm_to_anndata')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, From eafb13cc27d35fa24c4c164f427833cd717bbb94 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:09:24 -0500 Subject: [PATCH 09/29] fix: fix relative pathing to be the full path --- src/ingest-pipeline/airflow/dags/celldive_deepcell.py | 2 +- src/ingest-pipeline/airflow/dags/codex_cytokit.py | 2 +- src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py | 2 +- src/ingest-pipeline/airflow/dags/mibi_deepcell.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/celldive_deepcell.py b/src/ingest-pipeline/airflow/dags/celldive_deepcell.py index 59b3a5f4..3512ed54 100644 --- a/src/ingest-pipeline/airflow/dags/celldive_deepcell.py +++ b/src/ingest-pipeline/airflow/dags/celldive_deepcell.py @@ -276,7 +276,7 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): input_parameters = [ {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, - {"parameter_name": "--ometiff_directory", "value": "."}, + {"parameter_name": "--ometiff_directory", "value": str(tmpdir / "cwl_out")}, ] command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) diff --git a/src/ingest-pipeline/airflow/dags/codex_cytokit.py b/src/ingest-pipeline/airflow/dags/codex_cytokit.py index 1146d340..eb1caf5c 100644 --- a/src/ingest-pipeline/airflow/dags/codex_cytokit.py +++ b/src/ingest-pipeline/airflow/dags/codex_cytokit.py @@ -450,7 +450,7 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): input_parameters = [ {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, - {"parameter_name": "--ometiff_directory", "value": "."}, + {"parameter_name": "--ometiff_directory", "value": str(tmpdir / "cwl_out")}, ] command = get_cwl_cmd_from_workflows(workflows, 6, input_parameters, tmpdir, kwargs["ti"]) diff --git a/src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py b/src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py index 9f4ee1d6..9c29e58f 100644 --- a/src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py +++ b/src/ingest-pipeline/airflow/dags/kaggle_2_segmentation.py @@ -162,7 +162,7 @@ def build_cwltool_cwl_ome_tiff_pyramid_processed(**kwargs): input_parameters = [ {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, - {"parameter_name": "--ometiff_directory", "value": "."}, + {"parameter_name": "--ometiff_directory", "value": str(tmpdir / "cwl_out")}, ] command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) diff --git a/src/ingest-pipeline/airflow/dags/mibi_deepcell.py b/src/ingest-pipeline/airflow/dags/mibi_deepcell.py index 1d1ec8ac..7e7bf421 100644 --- a/src/ingest-pipeline/airflow/dags/mibi_deepcell.py +++ b/src/ingest-pipeline/airflow/dags/mibi_deepcell.py @@ -271,7 +271,7 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): input_parameters = [ {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, - {"parameter_name": "--ometiff_directory", "value": "."}, + {"parameter_name": "--ometiff_directory", "value": str(tmpdir / "cwl_out")}, ] command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) From 2890e3d080c1f94f180ca8a2e1afe90a5f49bb1d Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:11:33 -0500 Subject: [PATCH 10/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/multiome.py | 84 +++++++++----------- 1 file changed, 38 insertions(+), 46 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index 6233a4b8..c9b67cec 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -91,39 +91,18 @@ def generate_multiome_dag(params: MultiomeSequencingDagParameters) -> DAG: "workflow_path": str( get_absolute_workflow(Path("multiome-rna-atac-pipeline", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--threads_rna", "value": ""}, - {"parameter_name": "--threads_atac", "value": ""}, - {"parameter_name": "--organism", "value": ""}, - {"parameter_name": "--assay_rna", "value": ""}, - {"parameter_name": "--fastq_dir_rna", "value": []}, - {"parameter_name": "--assay_atac", "value": ""}, - {"parameter_name": "--fastq_dir_atac", "value": []}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("azimuth-annotate", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--reference", "value": ""}, - {"parameter_name": "--matrix", "value": "mudata_raw.h5mu"}, - { - "parameter_name": "--secondary-analysis-matrix", - "value": "secondary_analysis.h5mu", - }, - {"parameter_name": "--assay", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "mudata-to-ui.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, ] @@ -162,35 +141,36 @@ def build_cwltool_cmd1(**kwargs): else: source_type = unique_source_types.pop().lower() - # [--threads_rna, --threads_atac, --organism] - input_param_vals = [ - get_threads_resource(dag.dag_id), - get_threads_resource(dag.dag_id), - source_type, - ] - cwl_params = [ {"parameter_name": "--parallel", "value": ""}, ] - # [--assay_rna, --fastq_dir_rna, --assay_atac, --fastq_dir_atac] - for component in ["RNA", "ATAC"]: - input_param_vals.append(getattr(params, f"assay_{component.lower()}")) - input_param_vals.append( - [str(data_dir / Path(f"raw/fastq/{component}")) for data_dir in data_dirs] - ) + input_parameters = [ + {"parameter_name": "--threads_rna", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--threads_atac", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--organism", "value": source_type}, + {"parameter_name": "--assay_rna", "value": params.assay_rna}, + { + "parameter_name": "--fastq_dir_rna", + "value": [str(data_dir / Path(f"raw/fastq/rna")) for data_dir in data_dirs], + }, + {"parameter_name": "--assay_atac", "value": params.assay_atac}, + { + "parameter_name": "--fastq_dir_atac", + "value": [str(data_dir / Path(f"raw/fastq/atac")) for data_dir in data_dirs], + }, + ] - # Not always included: [--atac_metadata_file] atac_metadata_files = [find_atac_metadata_file(data_dir) for data_dir in data_dirs] if params.requires_one_atac_metadata_file: if (count := len(atac_metadata_files)) != 1: raise ValueError(f"Need 1 ATAC-seq metadata file, found {count}") - cwl_workflows[0]["input_parameters"].append( + input_parameters.append( {"parameter_name": "--atac_metadata_file", "value": atac_metadata_files[0]} ) command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"], cwl_params + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_params ) return join_quote_command_str(command) @@ -208,10 +188,17 @@ def build_cwltool_cmd2(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd1") - # [--reference, --matrix, --secondary-analysis-matrix, --assay] - input_param_vals = [organ_code, "", "", params.assay_azimuth] + input_parameters = [ + {"parameter_name": "--reference", "value": organ_code}, + {"parameter_name": "--matrix", "value": str(tmpdir / "cwl_out/mudata_raw.h5mu")}, + { + "parameter_name": "--secondary-analysis-matrix", + "value": "secondary_analysis.h5mu", + }, + {"parameter_name": "--assay", "value": params.assay_azimuth}, + ] command = get_cwl_cmd_from_workflows( - workflows, 1, input_param_vals, tmpdir, kwargs["ti"] + workflows, 1, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -223,8 +210,15 @@ def build_cwltool_cmd3(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd2") - # [--input_dir] - command = get_cwl_cmd_from_workflows(workflows, 2, [], tmpdir, kwargs["ti"]) + cwl_parameters = { + {"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")} + } + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows( + workflows, 2, input_parameters, tmpdir, kwargs["ti"], cwl_parameters + ) return join_quote_command_str(command) @@ -250,6 +244,7 @@ def build_cwltool_cmd3(**kwargs): task_id="pipeline_exec", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -259,7 +254,6 @@ def build_cwltool_cmd3(**kwargs): task_id="pipeline_exec_azimuth_annotate", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "$tmp_dir"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -270,9 +264,7 @@ def build_cwltool_cmd3(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ - mkdir -p hubmap_ui ; \ - cd hubmap_ui ; \ + mkdir -p "$tmp_dir"/cwl_out/hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From 26f967c78675f3ee39a55ad16414cbf52dadd39d Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:14:31 -0500 Subject: [PATCH 11/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/ometiff_pyramid.py | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/ometiff_pyramid.py b/src/ingest-pipeline/airflow/dags/ometiff_pyramid.py index e06d4f72..aac7a85f 100644 --- a/src/ingest-pipeline/airflow/dags/ometiff_pyramid.py +++ b/src/ingest-pipeline/airflow/dags/ometiff_pyramid.py @@ -73,19 +73,12 @@ cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_directory", "value": "./ometiff-pyramids"}, - ], "documentation_url": "", }, ] @@ -108,10 +101,12 @@ def build_cwltool_cmd1(**kwargs): data_dir = get_parent_data_dir(**kwargs) print("data_dir: ", data_dir) - # [--processes, --ometiff_directory] - input_param_vals = [get_threads_resource(dag.dag_id), str(data_dir)] + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": str(data_dir)}, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"] + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -144,8 +139,13 @@ def build_cwltool_cmd2(**kwargs): print("tmpdir: ", tmpdir) workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd1") - # ["--input_directory"] - command = get_cwl_cmd_from_workflows(workflows, 1, [], tmpdir, kwargs["ti"]) + input_parameters = [ + { + "parameter_name": "--input_directory", + "value": str(tmpdir / "cwl_out/ometiff-pyramids"), + }, + ] + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -159,7 +159,6 @@ def build_cwltool_cmd2(**kwargs): task_id="pipeline_exec_cwl2", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From 649f1cc593bd3220941060acc943aebd138a1b1d Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:17:30 -0500 Subject: [PATCH 12/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/ometiff_pyramid_ims.py | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/ometiff_pyramid_ims.py b/src/ingest-pipeline/airflow/dags/ometiff_pyramid_ims.py index f28fba8c..6678eb95 100644 --- a/src/ingest-pipeline/airflow/dags/ometiff_pyramid_ims.py +++ b/src/ingest-pipeline/airflow/dags/ometiff_pyramid_ims.py @@ -75,20 +75,12 @@ cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--downsample_type", "value": DOWNSAMPLE_TYPE}, - {"parameter_name": "--ometiff_directory", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_directory", "value": "./ometiff-pyramids"}, - ], "documentation_url": "", }, ] @@ -111,12 +103,14 @@ def build_cwltool_cmd1(**kwargs): data_dir = get_parent_data_dir(**kwargs) print("data_dir: ", data_dir) - # [--processes, --downsample_type, --ometiff_directory] - input_param_vals = [get_threads_resource(dag.dag_id), "", str(data_dir)] + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--downsample_type", "value": DOWNSAMPLE_TYPE}, + {"parameter_name": "--ometiff_directory", "value": str(data_dir)}, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"] + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"] ) - # this is the call to the CWL return join_quote_command_str(command) @@ -148,8 +142,14 @@ def build_cwltool_cmd2(**kwargs): print("tmpdir: ", tmpdir) workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd1") - # [--input_directory] - command = get_cwl_cmd_from_workflows(workflows, 1, [], tmpdir, kwargs["ti"]) + + input_parameters = [ + { + "parameter_name": "--input_directory", + "value": str(tmpdir / "cwl_out/ometiff-pyramids"), + }, + ] + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -163,7 +163,6 @@ def build_cwltool_cmd2(**kwargs): task_id="pipeline_exec_cwl2", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From 3e30b66b8a63713f1738898c580172c85313f65e Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:22:24 -0500 Subject: [PATCH 13/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/pas_ftu_segmentation.py | 52 ++++++++----------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py index b650917d..c2f0728b 100644 --- a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py +++ b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py @@ -69,40 +69,26 @@ cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path(pipeline_name, "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--data_directory", "value": ""}, - {"parameter_name": "--tissue_type", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": "."}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [{"parameter_name": "--input_dir", "value": ""}], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-metadata.cwl")) ), - "input_parameters": [{"parameter_name": "--input_dir", "value": ""}], "documentation_url": "", }, ] @@ -126,9 +112,13 @@ def build_cwltool_cwl_segmentation(**kwargs): organ_code = organ_list[0] if len(organ_list) == 1 else "multi" # [--data_directory, --tissue_type] - input_param_vals = [str(data_dir), organ_code] + + input_parameters = [ + {"parameter_name": "--data_directory", "value": str(data_dir)}, + {"parameter_name": "--tissue_type", "value": organ_code}, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"] + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -171,9 +161,11 @@ def build_cwltool_cwl_ome_tiff_pyramid_processed(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_segmentation") - # [--processes] - input_param_vals = [get_threads_resource(dag.dag_id)] - command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -187,7 +179,6 @@ def build_cwltool_cwl_ome_tiff_pyramid_processed(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid_processed", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid_processed')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -219,9 +210,11 @@ def build_cwltool_cwl_ome_tiff_pyramid_raw(**kwargs): key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid_processed" ) - # [--processes, --ometiff_directory] - input_param_vals = [get_threads_resource(dag.dag_id), str(data_dir)] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": str(data_dir)}, + ] + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -267,8 +260,9 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): ) # [--input_dir] - input_param_vals = [str(data_dir / "ometiff-pyramids")] - command = get_cwl_cmd_from_workflows(workflows, 3, input_param_vals, tmpdir, kwargs["ti"]) + + input_parameters = [{"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}] + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -300,7 +294,6 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): prepare_cwl_ome_tiff_metadata = DummyOperator(task_id="prepare_cwl_ome_tiff_metadata") - def build_cwltool_cmd_ome_tiff_metadata(**kwargs): run_id = kwargs["run_id"] tmpdir = get_tmp_dir_path(run_id) @@ -314,13 +307,11 @@ def build_cwltool_cmd_ome_tiff_metadata(**kwargs): key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets" ) - # [--input_dir] - input_param_vals = [str(data_dir / "ometiff-pyramids")] - command = get_cwl_cmd_from_workflows(workflows, 4, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [{"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}] + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) - t_build_cmd_ome_tiff_metadata = PythonOperator( task_id="build_cmd_ome_tiff_metadata", python_callable=build_cwltool_cmd_ome_tiff_metadata, @@ -331,7 +322,6 @@ def build_cwltool_cmd_ome_tiff_metadata(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_metadata", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_ome_tiff_metadata')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, From b99fd3bb0752921ede3024ec96391830da10c785 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:22:34 -0500 Subject: [PATCH 14/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py index c2f0728b..16956dc3 100644 --- a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py +++ b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py @@ -261,7 +261,9 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): # [--input_dir] - input_parameters = [{"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")} + ] command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -307,7 +309,9 @@ def build_cwltool_cmd_ome_tiff_metadata(**kwargs): key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets" ) - input_parameters = [{"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")} + ] command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) From 479e7240dcaee4e1b246958db8f1117a23268915 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:29:20 -0500 Subject: [PATCH 15/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/phenocycler_deepcell.py | 89 +++++++------------ 1 file changed, 30 insertions(+), 59 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/phenocycler_deepcell.py b/src/ingest-pipeline/airflow/dags/phenocycler_deepcell.py index 8a477339..9bbae9c7 100644 --- a/src/ingest-pipeline/airflow/dags/phenocycler_deepcell.py +++ b/src/ingest-pipeline/airflow/dags/phenocycler_deepcell.py @@ -64,66 +64,38 @@ cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path(pipeline_name, "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--gpus", "value": "all"}, - {"parameter_name": "--segmentation_method", "value": "deepcell"}, - {"parameter_name": "--data_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("sprm", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--enable_manhole", "value": ""}, - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--image_dir", "value": ""}, - {"parameter_name": "--mask_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("create-vis-symlink-archive", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--ometiff_dir", "value": ""}, - {"parameter_name": "--sprm_output", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": "."}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-json.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-anndata.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, ] @@ -140,10 +112,13 @@ def build_cwltool_cwl_segmentation(**kwargs): data_dir = get_parent_data_dir(**kwargs) print("data_dir: ", data_dir) - # ["--gpus=all", "--segmentation_method", "--data_dir"] - input_param_vals = ["", "", str(data_dir)] + input_parameters = [ + {"parameter_name": "--gpus", "value": "all"}, + {"parameter_name": "--segmentation_method", "value": "deepcell"}, + {"parameter_name": "--data_dir", "value": str(data_dir)}, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"] + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -188,14 +163,13 @@ def build_cwltool_cmd_sprm(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_segmentation") - # ["--enable_manhole", "--processes", "--image_dir", "--mask_dir"] - input_param_vals = [ - "", - get_threads_resource(dag.dag_id), - str(data_dir / "pipeline_output/expr"), - str(data_dir / "pipeline_output/mask"), + input_parameters = [ + {"parameter_name": "--enable_manhole", "value": ""}, + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--image_dir", "value": str(data_dir / "pipeline_output/expr")}, + {"parameter_name": "--mask_dir", "value": str(data_dir / "pipeline_output/mask")}, ] - command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -240,12 +214,11 @@ def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm") - # ["--ometiff_dir", "--sprm_output"] - input_param_vals = [ - str(data_dir / "pipeline_output"), - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--ometiff_dir", "value": str(data_dir / "pipeline_output")}, + {"parameter_name": "--sprm_output", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -292,9 +265,11 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): key="cwl_workflows", task_ids="build_cmd_create_vis_symlink_archive" ) - # ["processes", "--ometiff_directory"] - input_param_vals = [get_threads_resource(dag.dag_id)] - command = get_cwl_cmd_from_workflows(workflows, 3, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -308,7 +283,6 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -340,11 +314,10 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid" ) - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "ometiff-pyramids"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}, ] - command = get_cwl_cmd_from_workflows(workflows, 4, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -389,11 +362,10 @@ def build_cwltool_cmd_sprm_to_json(**kwargs): key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets" ) - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 5, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 5, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -436,11 +408,10 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm_to_json") - # ["--input_dir"] - input_param_vals = [ - str(data_dir / "sprm_outputs"), + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, ] - command = get_cwl_cmd_from_workflows(workflows, 6, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 6, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) From c63d4ca54dc1a65d590abb70a39789c625db05c5 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:36:45 -0500 Subject: [PATCH 16/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/salmon_rnaseq.py | 83 +++++++++---------- 1 file changed, 39 insertions(+), 44 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py b/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py index 988c9ea9..149df9d3 100644 --- a/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py +++ b/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py @@ -68,45 +68,24 @@ def generate_salmon_rnaseq_dag(params: SequencingDagParameters) -> DAG: cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path("salmon-rnaseq", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--assay", "value": ""}, - {"parameter_name": "--threads", "value": ""}, - {"parameter_name": "--organism", "value": ""}, - {"parameter_name": "--fastq_dir", "value": []}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("azimuth-annotate", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--reference", "value": ""}, - {"parameter_name": "--matrix", "value": "expr.h5ad"}, - { - "parameter_name": "--secondary-analysis-matrix", - "value": "secondary_analysis.h5ad", - }, - {"parameter_name": "--assay", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "h5ad-to-arrow.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "anndata-to-ui.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, ] @@ -147,20 +126,22 @@ def build_cwltool_cmd1(**kwargs): else: source_type = unique_source_types.pop().lower() - # [--assay, --threads, --organism, --fastq_dir] - input_param_vals = [ - params.assay, - get_threads_resource(dag.dag_id), - source_type, - [str(data_dir) for data_dir in data_dirs], - ] - cwl_params = [ {"parameter_name": "--parallel", "value": ""}, ] + input_parameters = [ + {"parameter_name": "--assay", "value": params.assay}, + {"parameter_name": "--threads", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--organism", "value": source_type}, + { + "parameter_name": "--fastq_dir", + "value": [str(data_dir) for data_dir in data_dirs], + }, + ] + command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"], cwl_params + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_params ) return join_quote_command_str(command) @@ -178,10 +159,17 @@ def build_cwltool_cmd2(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd1") - # [--reference, --matrix, --secondary-analysis-matrix, --assay] - input_param_vals = [organ_code, "", "", params.assay] + input_parameters = [ + {"parameter_name": "--reference", "value": organ_code}, + {"parameter_name": "--matrix", "value": str(tmpdir / "cwl_out/expr.h5ad")}, + { + "parameter_name": "--secondary-analysis-matrix", + "value": str(tmpdir / "cwl_out/secondary_analysis.h5ad"), + }, + {"parameter_name": "--assay", "value": params.assay}, + ] command = get_cwl_cmd_from_workflows( - workflows, 1, input_param_vals, tmpdir, kwargs["ti"] + workflows, 1, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -193,8 +181,15 @@ def build_cwltool_cmd3(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd2") - # [--input_dir] - command = get_cwl_cmd_from_workflows(workflows, 2, [], tmpdir, kwargs["ti"]) + cwl_parameters = [ + {"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")}, + ] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows( + workflows, 2, input_parameters, tmpdir, kwargs["ti"], cwl_parameters + ) return join_quote_command_str(command) @@ -205,8 +200,13 @@ def build_cwltool_cmd4(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd2") - # [--input_dir] - command = get_cwl_cmd_from_workflows(workflows, 3, [], tmpdir, kwargs["ti"]) + cwl_parameters = [ + {"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")}, + ] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"], cwl_parameters) return join_quote_command_str(command) @@ -238,6 +238,7 @@ def build_cwltool_cmd4(**kwargs): task_id="pipeline_exec", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -247,7 +248,6 @@ def build_cwltool_cmd4(**kwargs): task_id="pipeline_exec_azimuth_annotate", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "$tmp_dir"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -258,9 +258,7 @@ def build_cwltool_cmd4(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ - mkdir -p hubmap_ui ; \ - cd hubmap_ui ; \ + mkdir -p "$tmp_dir"/cwl_out/hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -271,9 +269,6 @@ def build_cwltool_cmd4(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ - mkdir -p hubmap_ui ; \ - cd hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd4')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From bc3855203e9197bad2cf9c6879e82e81e065555f Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:36:49 -0500 Subject: [PATCH 17/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/salmon_rnaseq.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py b/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py index 149df9d3..c1916a2b 100644 --- a/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py +++ b/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py @@ -206,7 +206,9 @@ def build_cwltool_cmd4(**kwargs): input_parameters = [ {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, ] - command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"], cwl_parameters) + command = get_cwl_cmd_from_workflows( + workflows, 3, input_parameters, tmpdir, kwargs["ti"], cwl_parameters + ) return join_quote_command_str(command) From 916d3ba319a5c8b4fdf5a86098973fac10be4390 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:40:18 -0500 Subject: [PATCH 18/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/multiome.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/multiome.py b/src/ingest-pipeline/airflow/dags/multiome.py index c9b67cec..5bf505da 100644 --- a/src/ingest-pipeline/airflow/dags/multiome.py +++ b/src/ingest-pipeline/airflow/dags/multiome.py @@ -264,7 +264,7 @@ def build_cwltool_cmd3(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - mkdir -p "$tmp_dir"/cwl_out/hubmap_ui ; \ + mkdir -p ${tmp_dir}/cwl_out/hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From 6264e249175f4362814edd25b9258db6332bfb25 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:40:49 -0500 Subject: [PATCH 19/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/salmon_rnaseq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py b/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py index c1916a2b..8967a5ea 100644 --- a/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py +++ b/src/ingest-pipeline/airflow/dags/salmon_rnaseq.py @@ -260,7 +260,7 @@ def build_cwltool_cmd4(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - mkdir -p "$tmp_dir"/cwl_out/hubmap_ui ; \ + mkdir -p ${tmp_dir}/cwl_out/hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From e7be23fd79f8ab2e87662519587c8065d8348dd3 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:41:31 -0500 Subject: [PATCH 20/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/salmon_rnaseq_bulk.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/salmon_rnaseq_bulk.py b/src/ingest-pipeline/airflow/dags/salmon_rnaseq_bulk.py index 1b69ce9c..ab464afd 100644 --- a/src/ingest-pipeline/airflow/dags/salmon_rnaseq_bulk.py +++ b/src/ingest-pipeline/airflow/dags/salmon_rnaseq_bulk.py @@ -72,11 +72,6 @@ def get_organism_name() -> str: "workflow_path": str( get_absolute_workflow(Path("salmon-rnaseq", "bulk-pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--fastq_dir", "value": ""}, - {"parameter_name": "--threads", "value": ""}, - {"parameter_name": "--organism", "value": ""}, - ], "documentation_url": "", } ] @@ -108,15 +103,17 @@ def build_cwltool_cmd1(**kwargs): else: source_type = unique_source_types.pop().lower() - # [--fastq_dir, --threads, --organism] - input_param_vals = [str(data_dir), get_threads_resource(dag.dag_id), source_type] - cwl_params = [ {"parameter_name": "--parallel", "value": ""}, ] + input_parameters = [ + {"parameter_name": "--fastq_dir", "value": str(data_dir)}, + {"parameter_name": "--threads", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--organism", "value": source_type}, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"], cwl_params + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_params ) return join_quote_command_str(command) @@ -131,6 +128,7 @@ def build_cwltool_cmd1(**kwargs): task_id="pipeline_exec", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, From b36a4b4e512eb8f822999adb37fb8745d3687b44 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:43:35 -0500 Subject: [PATCH 21/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/sc_atac_seq.py | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/sc_atac_seq.py b/src/ingest-pipeline/airflow/dags/sc_atac_seq.py index e11b3812..486d35a3 100644 --- a/src/ingest-pipeline/airflow/dags/sc_atac_seq.py +++ b/src/ingest-pipeline/airflow/dags/sc_atac_seq.py @@ -71,21 +71,12 @@ def generate_atac_seq_dag(params: SequencingDagParameters) -> DAG: Path("sc-atac-seq-pipeline", "sc_atac_seq_prep_process_analyze.cwl") ) ), - "input_parameters": [ - {"parameter_name": "--assay", "value": ""}, - {"parameter_name": "--exclude_bam", "value": ""}, - {"parameter_name": "--threads", "value": ""}, - {"parameter_name": "--sequence_directory", "value": []}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "scatac-csv-to-arrow.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": "."}, - ], "documentation_url": "", }, ] @@ -104,20 +95,21 @@ def build_cwltool_cmd1(**kwargs): data_dirs = get_parent_data_dirs_list(**kwargs) print("data_dirs: ", data_dirs) - # [--assay, --exclude_bam, --threads, --sequence_directory] - input_param_vals = [ - params.assay, - "", - get_threads_resource(dag.dag_id), - [str(data_dir) for data_dir in data_dirs], - ] - cwl_params = [ {"parameter_name": "--parallel", "value": ""}, ] + input_parameters = [ + {"parameter_name": "--assay", "value": params.assay}, + {"parameter_name": "--exclude_bam", "value": ""}, + {"parameter_name": "--threads", "value": get_threads_resource(dag.dag_id)}, + { + "parameter_name": "--sequence_directory", + "value": [str(data_dir) for data_dir in data_dirs], + }, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"], cwl_params + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_params ) return join_quote_command_str(command) @@ -129,8 +121,12 @@ def build_cwltool_cmd2(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd1") - # [--input_dir] - command = get_cwl_cmd_from_workflows(workflows, 1, [], tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows( + workflows, 1, input_parameters, tmpdir, kwargs["ti"] + ) return join_quote_command_str(command) @@ -150,6 +146,7 @@ def build_cwltool_cmd2(**kwargs): task_id="pipeline_exec", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -160,7 +157,6 @@ def build_cwltool_cmd2(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From 919dd687020835bb7a7520d8d3b53690bb52ea9d Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:52:23 -0500 Subject: [PATCH 22/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/visium.py | 102 +++++++++------------ 1 file changed, 44 insertions(+), 58 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/visium.py b/src/ingest-pipeline/airflow/dags/visium.py index 878b6289..a5de4510 100644 --- a/src/ingest-pipeline/airflow/dags/visium.py +++ b/src/ingest-pipeline/airflow/dags/visium.py @@ -64,53 +64,28 @@ cwl_workflows = [ { "workflow_path": str(get_absolute_workflow(Path("salmon-rnaseq", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--assay", "value": "visium-ff"}, - {"parameter_name": "--threads", "value": ""}, - {"parameter_name": "--organism", "value": ""}, - {"parameter_name": "--fastq_dir", "value": ""}, - {"parameter_name": "--img_dir", "value": ""}, - {"parameter_name": "--metadata_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "h5ad-to-arrow.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "anndata-to-ui.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": ""}, - { - "parameter_name": "--output_filename", - "value": "visium_histology_hires_pyramid.ome.tif", - }, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, ] @@ -153,22 +128,20 @@ def build_cwltool_cmd1(**kwargs): else: source_type = unique_source_types.pop().lower() - # [--assay, --threads, --organism, --fastq_dir, --img_dir, --metadata_dir] - input_param_vals = [ - "", - get_threads_resource(dag.dag_id), - source_type, - str(data_dir / "raw/fastq/"), - str(data_dir), - str(data_dir), - ] - cwl_params = [ {"parameter_name": "--parallel", "value": ""}, ] + input_parameters = [ + {"parameter_name": "--assay", "value": "visium-ff"}, + {"parameter_name": "--threads", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--organism", "value": source_type}, + {"parameter_name": "--fastq_dir", "value": str(data_dir / "raw/fastq")}, + {"parameter_name": "--img_dir", "value": str(data_dir)}, + {"parameter_name": "--metadata_dir", "value": str(data_dir)}, + ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"], cwl_params + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_params ) return join_quote_command_str(command) @@ -180,8 +153,15 @@ def build_cwltool_cmd2(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd1") - # [--input_dir] - command = get_cwl_cmd_from_workflows(workflows, 1, [], tmpdir, kwargs["ti"]) + cwl_parameters = [ + {"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")}, + ] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows( + workflows, 1, input_parameters, tmpdir, kwargs["ti"], cwl_parameters + ) return join_quote_command_str(command) @@ -192,8 +172,15 @@ def build_cwltool_cmd3(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd2") - # [--input_dir] - command = get_cwl_cmd_from_workflows(workflows, 2, [], tmpdir, kwargs["ti"]) + cwl_parameters = [ + {"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")}, + ] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows( + workflows, 2, input_parameters, tmpdir, kwargs["ti"], cwl_parameters + ) return join_quote_command_str(command) @@ -210,13 +197,18 @@ def build_cwltool_cmd4(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd3") - # [--processes, --ometiff_directory, --output_filename] - input_param_vals = [ - get_threads_resource(dag.dag_id), - str(data_dir / "lab_processed/images/"), - "", + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + { + "parameter_name": "--ometiff_directory", + "value": str(data_dir / "lab_processed/images/"), + }, + { + "parameter_name": "--output_filename", + "value": str(tmpdir / "cwl_out/visium_histology_hires_pyramid.ome.tif"), + }, ] - command = get_cwl_cmd_from_workflows(workflows, 3, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -231,9 +223,10 @@ def build_cwltool_cmd5(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd4") - # [--input_dir] - input_param_vals = [str(data_dir / "ometiff-pyramids")] - command = get_cwl_cmd_from_workflows(workflows, 4, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -271,6 +264,7 @@ def build_cwltool_cmd5(**kwargs): task_id="pipeline_exec", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -281,9 +275,7 @@ def build_cwltool_cmd5(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ - mkdir -p hubmap_ui ; \ - cd hubmap_ui ; \ + mkdir -p ${tmp_dir}/cwl_out/hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -294,9 +286,6 @@ def build_cwltool_cmd5(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ - mkdir -p hubmap_ui ; \ - cd hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -306,8 +295,6 @@ def build_cwltool_cmd5(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd4')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -317,7 +304,6 @@ def build_cwltool_cmd5(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_offsets", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd5')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, From 30e6f16fb7e8abaec19eb53eb8761a540cc6bf10 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 12:58:59 -0500 Subject: [PATCH 23/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py index 16956dc3..8e30840b 100644 --- a/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py +++ b/src/ingest-pipeline/airflow/dags/pas_ftu_segmentation.py @@ -111,8 +111,6 @@ def build_cwltool_cwl_segmentation(**kwargs): organ_list = list(set(ds_rslt["organs"])) organ_code = organ_list[0] if len(organ_list) == 1 else "multi" - # [--data_directory, --tissue_type] - input_parameters = [ {"parameter_name": "--data_directory", "value": str(data_dir)}, {"parameter_name": "--tissue_type", "value": organ_code}, @@ -259,8 +257,6 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid_raw" ) - # [--input_dir] - input_parameters = [ {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")} ] From d2280507b8f0462a68f89598265d687f9c8759c9 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 13:06:19 -0500 Subject: [PATCH 24/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../dags/transformations/tsv_to_mudata.py | 64 +++++++------------ 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/transformations/tsv_to_mudata.py b/src/ingest-pipeline/airflow/dags/transformations/tsv_to_mudata.py index 73f1b65d..116a4be4 100644 --- a/src/ingest-pipeline/airflow/dags/transformations/tsv_to_mudata.py +++ b/src/ingest-pipeline/airflow/dags/transformations/tsv_to_mudata.py @@ -73,43 +73,26 @@ "workflow_path": str( get_absolute_workflow(Path("epic-obj-csv-to-mudata", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--data_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "seg-mudata-to-zarr.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_directory", "value": "./ometiff-pyramids"}, - ], "documentation_url": "", }, ] @@ -236,9 +219,10 @@ def build_cwl_cmd_tsv_to_mudata(**kwargs): data_dir = ti.xcom_pull(task_ids="create_or_use_dataset") print("data_dir: ", data_dir) - # [data_dir] - input_param_vals = [str(data_dir)] - command = get_cwl_cmd_from_workflows(cwl_workflows, 0, input_param_vals, tmpdir, ti) + input_parameters = [ + {"parameter_name": "--data_dir", "value": str(data_dir)}, + ] + command = get_cwl_cmd_from_workflows(cwl_workflows, 0, input_parameters, tmpdir, ti) return join_quote_command_str(command) @@ -253,7 +237,6 @@ def build_cwl_cmd_tsv_to_mudata(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_cmd_tsv_to_mudata')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -279,8 +262,13 @@ def build_cwl_cmd_seg_mudata_to_zarr(**kwargs): key="cwl_workflows", task_ids="build_cwl_cmd_tsv_to_mudata" ) - # [input_dir] - command = get_cwl_cmd_from_workflows(workflows, 1, [], tmpdir, kwargs["ti"]) + cwl_parameters = [ + {"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")}, + ] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"], cwl_parameters) return join_quote_command_str(command) @@ -295,9 +283,7 @@ def build_cwl_cmd_seg_mudata_to_zarr(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="create_or_use_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ - mkdir -p hubmap_ui ; \ - cd hubmap_ui ; \ + mkdir -p ${tmp_dir}/cwl_out/hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cwl_cmd_seg_mudata_to_zarr')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -330,12 +316,11 @@ def build_cwl_cmd_ome_tiff_pyramid_processed(**kwargs): key="cwl_workflows", task_ids="build_cwl_cmd_seg_mudata_to_zarr" ) - # [processes, ometiff_directory] - input_param_vals = [ - get_threads_resource(dag.dag_id), - f"{data_dir}/derived/segmentation_masks", + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": f"{data_dir}/derived/segmentation_masks"}, ] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, ti) + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, ti) return join_quote_command_str(command) @@ -349,8 +334,6 @@ def build_cwl_cmd_ome_tiff_pyramid_processed(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid_processed", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_cmd_ome_tiff_pyramid_processed')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -376,9 +359,11 @@ def build_cwltool_cwl_ome_tiff_pyramid_raw(**kwargs): key="cwl_workflows", task_ids="build_cwl_cmd_ome_tiff_pyramid_processed" ) - # [processes, ometiff_directory] - input_param_vals = [get_threads_resource(dag.dag_id), str(data_dir)] - command = get_cwl_cmd_from_workflows(workflows, 3, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": str(data_dir)}, + ] + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -392,8 +377,6 @@ def build_cwltool_cwl_ome_tiff_pyramid_raw(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid_raw", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - mkdir -p ${tmp_dir}/cwl_out ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid_raw')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -422,7 +405,9 @@ def build_cwl_cmd_ome_tiff_offsets(**kwargs): key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid_raw" ) - # [input_directory] + input_parameters = [ + {"parameter_name": "--input_directory", "value": str(tmpdir / "cwl_out/ometiff-pyramids")}, + ] command = get_cwl_cmd_from_workflows(workflows, 4, [], tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -437,7 +422,6 @@ def build_cwl_cmd_ome_tiff_offsets(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_offsets", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_cmd_ome_tiff_offsets')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From 657047e944160c1f8130a5f5f66aace4acc94c40 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 13:06:38 -0500 Subject: [PATCH 25/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../dags/transformations/tsv_to_mudata.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/transformations/tsv_to_mudata.py b/src/ingest-pipeline/airflow/dags/transformations/tsv_to_mudata.py index 116a4be4..8e9f5199 100644 --- a/src/ingest-pipeline/airflow/dags/transformations/tsv_to_mudata.py +++ b/src/ingest-pipeline/airflow/dags/transformations/tsv_to_mudata.py @@ -268,7 +268,9 @@ def build_cwl_cmd_seg_mudata_to_zarr(**kwargs): input_parameters = [ {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, ] - command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"], cwl_parameters) + command = get_cwl_cmd_from_workflows( + workflows, 1, input_parameters, tmpdir, kwargs["ti"], cwl_parameters + ) return join_quote_command_str(command) @@ -318,7 +320,10 @@ def build_cwl_cmd_ome_tiff_pyramid_processed(**kwargs): input_parameters = [ {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, - {"parameter_name": "--ometiff_directory", "value": f"{data_dir}/derived/segmentation_masks"}, + { + "parameter_name": "--ometiff_directory", + "value": f"{data_dir}/derived/segmentation_masks", + }, ] command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, ti) @@ -406,9 +411,12 @@ def build_cwl_cmd_ome_tiff_offsets(**kwargs): ) input_parameters = [ - {"parameter_name": "--input_directory", "value": str(tmpdir / "cwl_out/ometiff-pyramids")}, + { + "parameter_name": "--input_directory", + "value": str(tmpdir / "cwl_out/ometiff-pyramids"), + }, ] - command = get_cwl_cmd_from_workflows(workflows, 4, [], tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) From 4168f78a8066ec6a090ee169b29d9299c6b24069 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 13:06:47 -0500 Subject: [PATCH 26/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- src/ingest-pipeline/airflow/dags/visium.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/visium.py b/src/ingest-pipeline/airflow/dags/visium.py index a5de4510..a9194c16 100644 --- a/src/ingest-pipeline/airflow/dags/visium.py +++ b/src/ingest-pipeline/airflow/dags/visium.py @@ -128,7 +128,7 @@ def build_cwltool_cmd1(**kwargs): else: source_type = unique_source_types.pop().lower() - cwl_params = [ + cwl_parameters = [ {"parameter_name": "--parallel", "value": ""}, ] input_parameters = [ @@ -141,7 +141,7 @@ def build_cwltool_cmd1(**kwargs): ] command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_params + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_parameters ) return join_quote_command_str(command) From 575837d8a5163835b0d7c99a65c77c68ff850159 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 13:17:19 -0500 Subject: [PATCH 27/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/azimuth_annotations.py | 87 +++++++++---------- 1 file changed, 39 insertions(+), 48 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/azimuth_annotations.py b/src/ingest-pipeline/airflow/dags/azimuth_annotations.py index 15f0436a..0c6c6de8 100644 --- a/src/ingest-pipeline/airflow/dags/azimuth_annotations.py +++ b/src/ingest-pipeline/airflow/dags/azimuth_annotations.py @@ -64,57 +64,30 @@ cwl_workflows_annotations_salmon = [ { "workflow_path": str(get_absolute_workflow(Path("azimuth-annotate", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--reference", "value": ""}, - {"parameter_name": "--matrix", "value": "expr.h5ad"}, - { - "parameter_name": "--secondary-analysis-matrix", - "value": "secondary_analysis.h5ad", - }, - {"parameter_name": "--assay", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "h5ad-to-arrow.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "anndata-to-ui.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, ] cwl_workflows_annotations_multiome = [ { "workflow_path": str(get_absolute_workflow(Path("azimuth-annotate", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--reference", "value": ""}, - {"parameter_name": "--matrix", "value": "mudata_raw.h5mu"}, - { - "parameter_name": "--secondary-analysis-matrix", - "value": "secondary_analysis.h5mu", - }, - {"parameter_name": "--assay", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "mudata-to-ui.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ".."}, - ], "documentation_url": "", }, ] @@ -169,15 +142,30 @@ def build_cwltool_cmd1(**kwargs): organ_code = organ_list[0] if len(organ_list) == 1 else "multi" assay, matrix, secondary_analysis, _ = get_assay_previous_version(**kwargs) - workflows = ( - cwl_workflows_annotations_multiome - if "mudata" in secondary_analysis - else cwl_workflows_annotations_salmon - ) + if "mudata" in secondary_analysis: + workflows = cwl_workflows_annotations_multiome + input_parameters = [ + {"parameter_name": "--reference", "value": organ_code}, + {"parameter_name": "--matrix", "value": str(tmpdir / "cwl_out/mudata_raw.h5mu")}, + { + "parameter_name": "--secondary-analysis-matrix", + "value": str(tmpdir / "cwl_outsecondary_analysis.h5mu"), + }, + {"parameter_name": "--assay", "value": assay}, + ], + else: + workflows = cwl_workflows_annotations_salmon + input_parameters = [ + {"parameter_name": "--reference", "value": organ_code}, + {"parameter_name": "--matrix", "value": str(tmpdir / "cwl_out/expr.h5ad")}, + { + "parameter_name": "--secondary-analysis-matrix", + "value": str(tmpdir / "cwl_out/secondary_analysis.h5ad"), + }, + {"parameter_name": "--assay", "value": assay}, + ] - # [--reference, --matrix, --secondary-analysis-matrix, --assay] - input_param_vals = [organ_code, matrix, secondary_analysis, assay] - command = get_cwl_cmd_from_workflows(workflows, 0, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 0, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -189,9 +177,13 @@ def build_cwltool_cmd2(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd1") - # [--input_dir] - input_param_vals = [] - command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"]) + cwl_parameters = [ + {"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")}, + ] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"], cwl_parameters) kwargs["ti"].xcom_push(key="skip_cwl3", value=1 if workflow == 0 else 0) return join_quote_command_str(command) @@ -203,9 +195,13 @@ def build_cwltool_cmd4(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd2") - # [--input_dir] - input_param_vals = [] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"]) + cwl_parameters = [ + {"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")}, + ] + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"], cwl_parameters) return join_quote_command_str(command) @@ -233,7 +229,7 @@ def build_cwltool_cmd4(**kwargs): task_id="pipeline_exec_azimuth_annotate", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "$tmp_dir"/cwl_out ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd1')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -244,9 +240,7 @@ def build_cwltool_cmd4(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ - mkdir -p hubmap_ui ; \ - cd hubmap_ui ; \ + mkdir -p ${tmp_dir}/cwl_out/hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -257,9 +251,6 @@ def build_cwltool_cmd4(**kwargs): bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ - cd "$tmp_dir"/cwl_out ; \ - mkdir -p hubmap_ui ; \ - cd hubmap_ui ; \ {{ti.xcom_pull(task_ids='build_cmd4')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, From bb427cac735e47bc322aa0504a7d184b7cd922d9 Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Fri, 21 Feb 2025 13:17:56 -0500 Subject: [PATCH 28/29] fix: changes to make input_parameters additive and allow for a default --outdir and the ability to override that value as well. --- .../airflow/dags/azimuth_annotations.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/azimuth_annotations.py b/src/ingest-pipeline/airflow/dags/azimuth_annotations.py index 0c6c6de8..4ef1f1f5 100644 --- a/src/ingest-pipeline/airflow/dags/azimuth_annotations.py +++ b/src/ingest-pipeline/airflow/dags/azimuth_annotations.py @@ -146,13 +146,16 @@ def build_cwltool_cmd1(**kwargs): workflows = cwl_workflows_annotations_multiome input_parameters = [ {"parameter_name": "--reference", "value": organ_code}, - {"parameter_name": "--matrix", "value": str(tmpdir / "cwl_out/mudata_raw.h5mu")}, + { + "parameter_name": "--matrix", + "value": str(tmpdir / "cwl_out/mudata_raw.h5mu"), + }, { "parameter_name": "--secondary-analysis-matrix", "value": str(tmpdir / "cwl_outsecondary_analysis.h5mu"), }, {"parameter_name": "--assay", "value": assay}, - ], + ] else: workflows = cwl_workflows_annotations_salmon input_parameters = [ @@ -183,7 +186,9 @@ def build_cwltool_cmd2(**kwargs): input_parameters = [ {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, ] - command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"], cwl_parameters) + command = get_cwl_cmd_from_workflows( + workflows, 1, input_parameters, tmpdir, kwargs["ti"], cwl_parameters + ) kwargs["ti"].xcom_push(key="skip_cwl3", value=1 if workflow == 0 else 0) return join_quote_command_str(command) @@ -201,7 +206,9 @@ def build_cwltool_cmd4(**kwargs): input_parameters = [ {"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")}, ] - command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"], cwl_parameters) + command = get_cwl_cmd_from_workflows( + workflows, 2, input_parameters, tmpdir, kwargs["ti"], cwl_parameters + ) return join_quote_command_str(command) From 4fd40f0d92470e23feca27bd7f6c563b2dc2ec3f Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 25 Feb 2025 14:48:55 -0500 Subject: [PATCH 29/29] Bug fix: adding codex parameters --- .../airflow/dags/codex_cytokit.py | 148 +++++++----------- 1 file changed, 60 insertions(+), 88 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/codex_cytokit.py b/src/ingest-pipeline/airflow/dags/codex_cytokit.py index ec4fd071..eee8677b 100644 --- a/src/ingest-pipeline/airflow/dags/codex_cytokit.py +++ b/src/ingest-pipeline/airflow/dags/codex_cytokit.py @@ -67,94 +67,54 @@ "workflow_path": str( get_absolute_workflow(steps_dir / "illumination_first_stitching.cwl") ), - "input_parameters": [ - {"parameter_name": "--gpus", "value": "0,1"}, - {"parameter_name": "--data_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(steps_dir / "run_cytokit.cwl")), - "input_parameters": [ - {"parameter_name": "--data_dir", "value": ""}, - {"parameter_name": "--yaml_config", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(steps_dir / "ometiff_second_stitching.cwl") ), - "input_parameters": [ - {"parameter_name": "--cytokit_config", "value": ""}, - {"parameter_name": "--cytokit_output", "value": ""}, - {"parameter_name": "--slicing_pipeline_config", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("deepcelltypes", "run_deepcelltypes.cwl")) ), - "input_parameters": [ - {"parameter_name": "--data_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("sprm", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--enable_manhole", "value": ""}, - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--image_dir", "value": ""}, - {"parameter_name": "--mask_dir", "value": ""}, - {"parameter_name": "--cell_types_file", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("create-vis-symlink-archive", "pipeline.cwl")) ), - "input_parameters": [ - {"parameter_name": "--ometiff_dir", "value": ""}, - {"parameter_name": "--sprm_output", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str(get_absolute_workflow(Path("ome-tiff-pyramid", "pipeline.cwl"))), - "input_parameters": [ - {"parameter_name": "--processes", "value": ""}, - {"parameter_name": "--ometiff_directory", "value": "."}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "ome-tiff-offsets.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-json.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, { "workflow_path": str( get_absolute_workflow(Path("portal-containers", "sprm-to-anndata.cwl")) ), - "input_parameters": [ - {"parameter_name": "--input_dir", "value": ""}, - ], "documentation_url": "", }, ] @@ -173,10 +133,13 @@ def build_cwltool_cwl_illumination_first_stitching(**kwargs): data_dir = get_parent_data_dir(**kwargs) print("data_dir: ", data_dir) - # [--gpus=0,1, --data_dir] - input_param_vals = ["", str(data_dir)] + input_parameters = [ + {"parameter_name": "--gpus", "value": "0,1"}, + {"parameter_name": "--data_dir", "value": str(data_dir)}, + ] + command = get_cwl_cmd_from_workflows( - cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"] + cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"] ) return join_quote_command_str(command) @@ -191,8 +154,7 @@ def build_cwltool_cwl_illumination_first_stitching(**kwargs): task_id="pipeline_exec_cwl_illumination_first_stitching", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - mkdir -p "${tmp_dir}"/cwl_out ; \ - cd "${tmp_dir}"/cwl_out ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_illumination_first_stitching')}} > $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -222,9 +184,12 @@ def build_cwltool_cwl_cytokit(**kwargs): key="cwl_workflows", task_ids="build_cwl_illumination_first_stitching" ) - # [--data_dir, --yaml_config] - input_param_vals = [str(data_dir / "new_tiles"), str(data_dir / "experiment.yaml")] - command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--data_dir", "value": str(data_dir / "new_tiles")}, + {"parameter_name": "--yaml_config", "value": str(data_dir / "experiment.yaml")}, + ] + + command = get_cwl_cmd_from_workflows(workflows, 1, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -238,7 +203,6 @@ def build_cwltool_cwl_cytokit(**kwargs): task_id="pipeline_exec_cwl_cytokit", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "${tmp_dir}"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_cytokit')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -268,13 +232,15 @@ def build_cwltool_cwl_ometiff_second_stitching(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cwl_cytokit") - # [--cytokit_config, --cytokit_output, --slicing_pipeline_config] - input_param_vals = [ - str(data_dir / "experiment.yaml"), - str(data_dir / "cytokit"), - str(data_dir / "pipelineConfig.json"), + input_parameters = [ + {"parameter_name": "--cytokit_config", "value": str(data_dir / "experiment.yaml")}, + {"parameter_name": "--cytokit_output", "value": str(data_dir / "cytokit")}, + {"parameter_name": "--slicing_pipeline_config", + "value": str(data_dir / "pipelineConfig.json"), }, + {"parameter_name": "--num_concurrent_tasks", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--data_dir", "value": str(get_parent_data_dir(**kwargs))}, ] - command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"]) + command = get_cwl_cmd_from_workflows(workflows, 2, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -288,7 +254,6 @@ def build_cwltool_cwl_ometiff_second_stitching(**kwargs): task_id="pipeline_exec_cwl_ometiff_second_stitching", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "${tmp_dir}"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ometiff_second_stitching')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -329,9 +294,11 @@ def build_cwltool_cmd_deepcelltypes(**kwargs): key="cwl_workflows", task_ids="build_cwl_ometiff_second_stitching" ) - # [--data_dir] - input_param_vals = [str(data_dir / "pipeline_output")] - command = get_cwl_cmd_from_workflows(workflows, 3, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--data_dir", "value": str(data_dir / "pipeline_output")}, + ] + + command = get_cwl_cmd_from_workflows(workflows, 3, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -375,15 +342,18 @@ def build_cwltool_cmd_sprm(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_deepcelltypes") - # [--enable_manhole, --processes, --image_dir, --mask_dir, --cell_types_file] - input_param_vals = [ - "", - get_threads_resource(dag.dag_id), - str(data_dir / "pipeline_output/expr"), - str(data_dir / "pipeline_output/mask"), - str(data_dir / "deepcelltypes_predictions.csv"), + input_parameters = [ + {"parameter_name": "--enable_manhole", "value": ""}, + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--image_dir", "value": str(data_dir / "pipeline_output/expr")}, + {"parameter_name": "--mask_dir", "value": str(data_dir / "pipeline_output/mask")}, + { + "parameter_name": "--cell_types_file", + "value": str(data_dir / "deepcelltypes_predictions.csv"), + }, ] - command = get_cwl_cmd_from_workflows(workflows, 4, input_param_vals, tmpdir, kwargs["ti"]) + + command = get_cwl_cmd_from_workflows(workflows, 4, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -397,7 +367,6 @@ def build_cwltool_cmd_sprm(**kwargs): task_id="pipeline_exec_cwl_sprm", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "${tmp_dir}"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_sprm')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -429,9 +398,11 @@ def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm") - # [--ometiff_dir, --sprm_output] - input_param_vals = [str(data_dir / "pipeline_output"), str(data_dir / "sprm_outputs")] - command = get_cwl_cmd_from_workflows(workflows, 5, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--ometiff_dir", "value": str(data_dir / "pipeline_output")}, + {"parameter_name": "--sprm_output", "value": str(data_dir / "sprm_outputs")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 5, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -445,7 +416,6 @@ def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): task_id="pipeline_exec_cwl_create_vis_symlink_archive", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "${tmp_dir}"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_create_vis_symlink_archive')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -479,9 +449,11 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): key="cwl_workflows", task_ids="build_cmd_create_vis_symlink_archive" ) - # [--processes, --ometiff_directory] - input_param_vals = [get_threads_resource(dag.dag_id), ""] - command = get_cwl_cmd_from_workflows(workflows, 6, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--processes", "value": get_threads_resource(dag.dag_id)}, + {"parameter_name": "--ometiff_directory", "value": str(tmpdir / "cwl_out")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 6, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -495,7 +467,6 @@ def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_pyramid", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "${tmp_dir}"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, @@ -527,9 +498,10 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): key="cwl_workflows", task_ids="build_cwl_ome_tiff_pyramid" ) - # [--input_dir] - input_param_vals = [str(data_dir / "ometiff-pyramids")] - command = get_cwl_cmd_from_workflows(workflows, 7, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "ometiff-pyramids")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 7, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -543,7 +515,6 @@ def build_cwltool_cmd_ome_tiff_offsets(**kwargs): task_id="pipeline_exec_cwl_ome_tiff_offsets", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "${tmp_dir}"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_ome_tiff_offsets')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -575,9 +546,10 @@ def build_cwltool_cmd_sprm_to_json(**kwargs): key="cwl_workflows", task_ids="build_cmd_ome_tiff_offsets" ) - # [--input_dir] - input_param_vals = [str(data_dir / "sprm_outputs")] - command = get_cwl_cmd_from_workflows(workflows, 8, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, + ] + command = get_cwl_cmd_from_workflows(workflows, 8, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -591,7 +563,6 @@ def build_cwltool_cmd_sprm_to_json(**kwargs): task_id="pipeline_exec_cwl_sprm_to_json", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "${tmp_dir}"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_sprm_to_json')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """, @@ -621,9 +592,11 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd_sprm_to_json") - # [--input_dir] - input_param_vals = [str(data_dir / "sprm_outputs")] - command = get_cwl_cmd_from_workflows(workflows, 9, input_param_vals, tmpdir, kwargs["ti"]) + input_parameters = [ + {"parameter_name": "--input_dir", "value": str(data_dir / "sprm_outputs")}, + ] + + command = get_cwl_cmd_from_workflows(workflows, 9, input_parameters, tmpdir, kwargs["ti"]) return join_quote_command_str(command) @@ -637,7 +610,6 @@ def build_cwltool_cmd_sprm_to_anndata(**kwargs): task_id="pipeline_exec_cwl_sprm_to_anndata", bash_command=""" \ tmp_dir={{tmp_dir_path(run_id)}} ; \ - cd "${tmp_dir}"/cwl_out ; \ {{ti.xcom_pull(task_ids='build_cmd_sprm_to_anndata')}} >> ${tmp_dir}/session.log 2>&1 ; \ echo $? """,