From 0722b2d15aa659e08ca0c42a019e5ed1f8d85c80 Mon Sep 17 00:00:00 2001 From: Sean Date: Tue, 7 Jan 2025 15:17:49 -0500 Subject: [PATCH 01/12] Adding rna-probes-pipeline as submodule --- .gitmodules | 3 +++ src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline | 1 + 2 files changed, 4 insertions(+) create mode 160000 src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline diff --git a/.gitmodules b/.gitmodules index 1563d979..952b848d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -76,3 +76,6 @@ [submodule "src/ingest-pipeline/airflow/dags/cwl/visium-pipeline"] path = src/ingest-pipeline/airflow/dags/cwl/visium-pipeline url = https://github.com/hubmapconsortium/visium-pipeline +[submodule "src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline"] + path = src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline + url = https://github.com/hubmapconsortium/rna-probes-pipeline diff --git a/src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline b/src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline new file mode 160000 index 00000000..11cf382f --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline @@ -0,0 +1 @@ +Subproject commit 11cf382fd1580f9b71d8ff4155fd700676aa7749 From 6cb9c0f5bdba3904e672b4dde8bc8f573b48d7ab Mon Sep 17 00:00:00 2001 From: Sean Date: Tue, 7 Jan 2025 15:28:40 -0500 Subject: [PATCH 02/12] Initial commit of dag for scnra-probes processing --- .../airflow/dags/rna_probes.py | 320 ++++++++++++++++++ 1 file changed, 320 insertions(+) create mode 100644 src/ingest-pipeline/airflow/dags/rna_probes.py diff --git a/src/ingest-pipeline/airflow/dags/rna_probes.py b/src/ingest-pipeline/airflow/dags/rna_probes.py new file mode 100644 index 00000000..90d26f45 --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/rna_probes.py @@ -0,0 +1,320 @@ +from datetime import datetime, timedelta +from pathlib import Path +import pandas as pd + +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import BranchPythonOperator, PythonOperator +from hubmap_operators.common_operators import ( + CleanupTmpDirOperator, + CreateTmpDirOperator, + JoinOperator, + LogInfoOperator, + MoveDataOperator, + SetDatasetProcessingOperator, +) + +import utils +from utils import ( + get_absolute_workflows, + get_cwltool_base_cmd, + get_dataset_uuid, + get_parent_dataset_uuids_list, + get_parent_data_dir, + build_dataset_name as inner_build_dataset_name, + get_previous_revision_uuid, + get_uuid_for_error, + join_quote_command_str, + make_send_status_msg_function, + get_tmp_dir_path, + HMDAG, + pythonop_get_dataset_state, + get_queue_resource, + get_threads_resource, + get_preserve_scratch_resource, +) + + +default_args = { + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("rna_with_probes"), + "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), +} + +def find_rna_metadata_file(data_dir: Path) -> Path: + for path in data_dir.glob("*.tsv"): + name_lower = path.name.lower() + if path.is_file() and "rna" in name_lower and "metadata" in name_lower: + return path + raise ValueError("Couldn't find RNA-seq metadata file") + +with HMDAG( + "rna_with_probes", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("rna_with_probes"), + }, +) as dag: + cwl_workflows = get_absolute_workflows( + Path("rna-probes-pipeline", "pipeline.cwl"), + Path("portal-containers", "h5ad-to-arrow.cwl"), + Path("portal-containers", "anndata-to-ui.cwl"), + ) + + def build_dataset_name(**kwargs): + return inner_build_dataset_name(dag.dag_id, "rna-probes-pipeline", **kwargs) + + prepare_cwl1 = DummyOperator(task_id="prepare_cwl1") + + prepare_cwl2 = DummyOperator(task_id="prepare_cwl2") + + prepare_cwl3 = DummyOperator(task_id="prepare_cwl3") + + + def build_cwltool_cmd1(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + data_dir = get_parent_data_dir(**kwargs) + print("data_dirs: ", data_dir) + + source_type = "" + unique_source_types = set() + for parent_uuid in get_parent_dataset_uuids_list(**kwargs): + dataset_state = pythonop_get_dataset_state( + dataset_uuid_callable=lambda **kwargs: parent_uuid, **kwargs) + source_type = dataset_state.get("source_type") + if source_type == "mixed": + print("Force failure. Should only be one unique source_type for a dataset.") + else: + unique_source_types.add(source_type) + + if len(unique_source_types) > 1: + print("Force failure. Should only be one unique source_type for a dataset.") + else: + source_type = unique_source_types.pop().lower() + + command = [ + *get_cwltool_base_cmd(tmpdir), + "--outdir", + tmpdir / "cwl_out", + "--parallel", + cwl_workflows[0], + "--assay", + "10x_v3", + "--threads", + get_threads_resource(dag.dag_id), + "--organism", + source_type + + ] + + command.append("--fastq_dir") + command.append(data_dir) + + command.append("--metadata_dir") + command.append(data_dir) + + return join_quote_command_str(command) + + def build_cwltool_cmd2(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows[1], + "--input_dir", + # This pipeline invocation runs in a 'hubmap_ui' subdirectory, + # so use the parent directory as input + "..", + ] + + return join_quote_command_str(command) + + def build_cwltool_cmd3(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows[2], + "--input_dir", + # This pipeline invocation runs in a 'hubmap_ui' subdirectory, + # so use the parent directory as input + "..", + ] + + return join_quote_command_str(command) + + + t_build_cmd1 = PythonOperator( + task_id="build_cmd1", + python_callable=build_cwltool_cmd1, + provide_context=True, + ) + + t_build_cmd2 = PythonOperator( + task_id="build_cmd2", + python_callable=build_cwltool_cmd2, + provide_context=True, + ) + + t_build_cmd3 = PythonOperator( + task_id="build_cmd3", + python_callable=build_cwltool_cmd3, + provide_context=True, + ) + + t_pipeline_exec = BashOperator( + task_id="pipeline_exec", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + {{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + + t_convert_for_ui = BashOperator( + task_id="convert_for_ui", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ + cd "$tmp_dir"/cwl_out ; \ + mkdir -p hubmap_ui ; \ + cd hubmap_ui ; \ + {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + + t_convert_for_ui_2 = BashOperator( + task_id="convert_for_ui_2", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \ + cd "$tmp_dir"/cwl_out ; \ + mkdir -p hubmap_ui ; \ + cd hubmap_ui ; \ + {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + + t_maybe_keep_cwl1 = BranchPythonOperator( + task_id="maybe_keep_cwl1", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl2", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec", + }, + ) + + t_maybe_keep_cwl2 = BranchPythonOperator( + task_id="maybe_keep_cwl2", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl3", + "bail_op": "set_dataset_error", + "test_op": "convert_for_ui", + }, + ) + + t_maybe_keep_cwl3 = BranchPythonOperator( + task_id="maybe_keep_cwl3", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "move_data", + "bail_op": "set_dataset_error", + "test_op": "convert_for_ui_2", + }, + ) + + t_send_create_dataset = PythonOperator( + task_id="send_create_dataset", + python_callable=utils.pythonop_send_create_dataset, + provide_context=True, + op_kwargs={ + "parent_dataset_uuid_callable": get_parent_dataset_uuids_list, + "previous_revision_uuid_callable": get_previous_revision_uuid, + "http_conn_id": "ingest_api_connection", + "dataset_name_callable": build_dataset_name, + "pipeline_shorthand": "BWA + Scanpy", + }, + ) + + t_set_dataset_error = PythonOperator( + task_id="set_dataset_error", + python_callable=utils.pythonop_set_dataset_state, + provide_context=True, + trigger_rule="all_done", + op_kwargs={ + "dataset_uuid_callable": get_dataset_uuid, + "ds_state": "Error", + "message": f"An error occurred in rna probes pipeline", + }, + ) + + send_status_msg = make_send_status_msg_function( + dag_file=__file__, + retcode_ops=["pipeline_exec", "move_data", "convert_for_ui", "convert_for_ui_2"], + cwl_workflows=cwl_workflows, + ) + + t_send_status = PythonOperator( + task_id="send_status_msg", + python_callable=send_status_msg, + provide_context=True, + ) + + t_log_info = LogInfoOperator(task_id="log_info") + t_join = JoinOperator(task_id="join") + t_create_tmpdir = CreateTmpDirOperator(task_id="create_tmpdir") + t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_tmpdir") + t_set_dataset_processing = SetDatasetProcessingOperator(task_id="set_dataset_processing") + t_move_data = MoveDataOperator(task_id="move_data") + + ( + t_log_info + >> t_create_tmpdir + >> t_send_create_dataset + >> t_set_dataset_processing + >> prepare_cwl1 + >> t_build_cmd1 + >> t_pipeline_exec + >> t_maybe_keep_cwl1 + >> prepare_cwl2 + >> t_build_cmd2 + >> t_convert_for_ui + >> t_maybe_keep_cwl2 + >> prepare_cwl3 + >> t_build_cmd3 + >> t_convert_for_ui_2 + >> t_maybe_keep_cwl3 + >> t_move_data + >> t_send_status + >> t_join + ) + t_maybe_keep_cwl1 >> t_set_dataset_error + t_maybe_keep_cwl2 >> t_set_dataset_error + t_maybe_keep_cwl3 >> t_set_dataset_error + t_set_dataset_error >> t_join + t_join >> t_cleanup_tmpdir From 6d73908f13f129ace5057c8d4e64006038d8dd6e Mon Sep 17 00:00:00 2001 From: Sean Date: Tue, 7 Jan 2025 15:35:53 -0500 Subject: [PATCH 03/12] Adding azimuth annotation to dag --- .../airflow/dags/cwl/azimuth-annotate | 2 +- .../airflow/dags/cwl/celldive-pipeline | 2 +- .../airflow/dags/cwl/codex-pipeline | 2 +- .../dags/cwl/create-vis-symlink-archive | 2 +- .../airflow/dags/cwl/deepcelltypes | 2 +- .../airflow/dags/cwl/epic-obj-csv-to-mudata | 2 +- .../airflow/dags/cwl/kaggle-2-segmentation | 2 +- .../airflow/dags/cwl/mibi-pipeline | 2 +- .../dags/cwl/multiome-rna-atac-pipeline | 2 +- .../airflow/dags/cwl/ome-tiff-pyramid | 2 +- .../dags/cwl/pas-ftu-segmentation-pipeline | 2 +- .../airflow/dags/cwl/phenocycler-pipeline | 2 +- .../airflow/dags/cwl/portal-containers | 2 +- src/ingest-pipeline/airflow/dags/cwl/sprm | 2 +- .../airflow/dags/rna_probes.py | 77 +++++++++++++++++-- .../submodules/hubmap-inventory | 2 +- .../submodules/ingest-validation-tests | 2 +- .../submodules/ingest-validation-tools | 2 +- 18 files changed, 88 insertions(+), 23 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate b/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate index d1cc262d..6694e4ba 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate +++ b/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate @@ -1 +1 @@ -Subproject commit d1cc262d0cb752b6bbe3432c3ea19f3a3c424d63 +Subproject commit 6694e4ba1c14688664b79b3bf36ad86c8069970c diff --git a/src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline b/src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline index 20ea8dad..1365d8a4 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline @@ -1 +1 @@ -Subproject commit 20ea8dadc74d2ce47bbed0c8f18fe89662504258 +Subproject commit 1365d8a452796f3c8c240bfb71556414b8ebb23e diff --git a/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline b/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline index 51c7fbcc..2e574a85 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline @@ -1 +1 @@ -Subproject commit 51c7fbcc4fb20eec6aa56e8f52387cf5edee62fc +Subproject commit 2e574a8574db7672da327e20eea40935d269eda3 diff --git a/src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive b/src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive index 6e55233f..b3e880c3 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive +++ b/src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive @@ -1 +1 @@ -Subproject commit 6e55233fb14a96fc5ed6f40b4e5a259ab0e8446c +Subproject commit b3e880c363cae3f2c15ddca5486e57199427ff33 diff --git a/src/ingest-pipeline/airflow/dags/cwl/deepcelltypes b/src/ingest-pipeline/airflow/dags/cwl/deepcelltypes index ca97c29e..c5a469dc 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/deepcelltypes +++ b/src/ingest-pipeline/airflow/dags/cwl/deepcelltypes @@ -1 +1 @@ -Subproject commit ca97c29e8e271df9b3b76b6d250f8d161a0ac361 +Subproject commit c5a469dc16f29a638dac95c30e385ac1ca4949da diff --git a/src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata b/src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata index ce2e23f1..8e9fedb7 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata +++ b/src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata @@ -1 +1 @@ -Subproject commit ce2e23f1a8b382027c23de262ebcebb98525bc21 +Subproject commit 8e9fedb72aa237206433890cddff5c19f879920d diff --git a/src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation b/src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation index 110ca4e5..dac4e967 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation +++ b/src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation @@ -1 +1 @@ -Subproject commit 110ca4e5f65fb85843540d116b72d23f9bd04018 +Subproject commit dac4e9673bf2ed6fadf55617a3fb42b7636299f4 diff --git a/src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline b/src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline index 7501938b..ede8bbfd 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline @@ -1 +1 @@ -Subproject commit 7501938b53db117551ff02ca5597256d6795852e +Subproject commit ede8bbfd4d49385c47add35c29014f8ef05610ab diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline index 14985c97..2ce8d87d 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -1 +1 @@ -Subproject commit 14985c972d540929cacbdf33925af2d4c6c9edb1 +Subproject commit 2ce8d87df5b5f73433f15ce0d31fd02f66ef05ce diff --git a/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid b/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid index a063a340..9e2885ca 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid +++ b/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid @@ -1 +1 @@ -Subproject commit a063a3404a5f4345292508cb82313bd41ea6c615 +Subproject commit 9e2885cab028ba54e910932e0d804398cd56cc0b diff --git a/src/ingest-pipeline/airflow/dags/cwl/pas-ftu-segmentation-pipeline b/src/ingest-pipeline/airflow/dags/cwl/pas-ftu-segmentation-pipeline index ec6a7437..b15aadce 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/pas-ftu-segmentation-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/pas-ftu-segmentation-pipeline @@ -1 +1 @@ -Subproject commit ec6a743798e38d61b81cebdf443572213859f3fa +Subproject commit b15aadcee768e7daa1141d9de0b42b567addcee9 diff --git a/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline b/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline index 117ef12a..9923f278 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline @@ -1 +1 @@ -Subproject commit 117ef12a6449cf9128a00c365f139a88d884dd4c +Subproject commit 9923f2786186ff11a8b9fa193149c9c2d669809a diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index 2046af76..8270e467 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit 2046af766bf5a47b08a905a1fb0fa1f9d5178338 +Subproject commit 8270e467fe979d373df0cd448e7d38bd3bd85728 diff --git a/src/ingest-pipeline/airflow/dags/cwl/sprm b/src/ingest-pipeline/airflow/dags/cwl/sprm index d2d51272..00e61211 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/sprm +++ b/src/ingest-pipeline/airflow/dags/cwl/sprm @@ -1 +1 @@ -Subproject commit d2d5127251fbec9f9621df0c6c8c989e3f1b76ba +Subproject commit 00e612112d6807ba7193a1ab615a67d90f533a5c diff --git a/src/ingest-pipeline/airflow/dags/rna_probes.py b/src/ingest-pipeline/airflow/dags/rna_probes.py index 90d26f45..31248822 100644 --- a/src/ingest-pipeline/airflow/dags/rna_probes.py +++ b/src/ingest-pipeline/airflow/dags/rna_probes.py @@ -68,6 +68,7 @@ def find_rna_metadata_file(data_dir: Path) -> Path: ) as dag: cwl_workflows = get_absolute_workflows( Path("rna-probes-pipeline", "pipeline.cwl"), + Path("azimuth-annotate", "pipeline.cwl"), Path("portal-containers", "h5ad-to-arrow.cwl"), Path("portal-containers", "anndata-to-ui.cwl"), ) @@ -81,6 +82,8 @@ def build_dataset_name(**kwargs): prepare_cwl3 = DummyOperator(task_id="prepare_cwl3") + prepare_cwl4 = DummyOperator(task_id="prepare_cwl4") + def build_cwltool_cmd1(**kwargs): run_id = kwargs["run_id"] @@ -129,11 +132,41 @@ def build_cwltool_cmd1(**kwargs): return join_quote_command_str(command) + def build_cwltool_cmd2(**kwargs): run_id = kwargs["run_id"] tmpdir = get_tmp_dir_path(run_id) print("tmpdir: ", tmpdir) + # get organ type + ds_rslt = pythonop_get_dataset_state( + dataset_uuid_callable=get_dataset_uuid, + **kwargs + ) + + organ_list = list(set(ds_rslt['organs'])) + organ_code = organ_list[0] if len(organ_list) == 1 else 'multi' + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows[1], + "--reference", + organ_code, + "--matrix", + "expr.h5ad", + "--secondary-analysis-matrix", + "secondary_analysis.h5ad", + "--assay", + "10x_v3", + ] + + return join_quote_command_str(command) + + def build_cwltool_cmd3(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + command = [ *get_cwltool_base_cmd(tmpdir), cwl_workflows[1], @@ -145,7 +178,7 @@ def build_cwltool_cmd2(**kwargs): return join_quote_command_str(command) - def build_cwltool_cmd3(**kwargs): + def build_cwltool_cmd4(**kwargs): run_id = kwargs["run_id"] tmpdir = get_tmp_dir_path(run_id) print("tmpdir: ", tmpdir) @@ -180,6 +213,12 @@ def build_cwltool_cmd3(**kwargs): provide_context=True, ) + t_build_cmd4 = PythonOperator( + task_id="build_cmd3", + python_callable=build_cwltool_cmd3, + provide_context=True, + ) + t_pipeline_exec = BashOperator( task_id="pipeline_exec", bash_command=""" \ @@ -189,6 +228,16 @@ def build_cwltool_cmd3(**kwargs): """, ) + t_pipeline_exec_azimuth_annotate = BashOperator( + task_id="pipeline_exec_azimuth_annotate", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + cd "$tmp_dir"/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + t_convert_for_ui = BashOperator( task_id="convert_for_ui", bash_command=""" \ @@ -197,7 +246,7 @@ def build_cwltool_cmd3(**kwargs): cd "$tmp_dir"/cwl_out ; \ mkdir -p hubmap_ui ; \ cd hubmap_ui ; \ - {{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \ + {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, ) @@ -210,7 +259,7 @@ def build_cwltool_cmd3(**kwargs): cd "$tmp_dir"/cwl_out ; \ mkdir -p hubmap_ui ; \ cd hubmap_ui ; \ - {{ti.xcom_pull(task_ids='build_cmd3')}} >> $tmp_dir/session.log 2>&1 ; \ + {{ti.xcom_pull(task_ids='build_cmd4')}} >> $tmp_dir/session.log 2>&1 ; \ echo $? """, ) @@ -233,11 +282,22 @@ def build_cwltool_cmd3(**kwargs): op_kwargs={ "next_op": "prepare_cwl3", "bail_op": "set_dataset_error", - "test_op": "convert_for_ui", + "test_op": "pipeline_exec_azimuth_annotate", }, ) t_maybe_keep_cwl3 = BranchPythonOperator( + task_id="maybe_keep_cwl3", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl4", + "bail_op": "set_dataset_error", + "test_op": "convert_for_ui", + }, + ) + + t_maybe_keep_cwl4 = BranchPythonOperator( task_id="maybe_keep_cwl3", python_callable=utils.pythonop_maybe_keep, provide_context=True, @@ -303,12 +363,16 @@ def build_cwltool_cmd3(**kwargs): >> t_maybe_keep_cwl1 >> prepare_cwl2 >> t_build_cmd2 - >> t_convert_for_ui + >> t_pipeline_exec_azimuth_annotate >> t_maybe_keep_cwl2 >> prepare_cwl3 >> t_build_cmd3 - >> t_convert_for_ui_2 + >> t_convert_for_ui >> t_maybe_keep_cwl3 + >> prepare_cwl4 + >> t_build_cmd4 + >> t_convert_for_ui_2 + >> t_maybe_keep_cwl4 >> t_move_data >> t_send_status >> t_join @@ -316,5 +380,6 @@ def build_cwltool_cmd3(**kwargs): t_maybe_keep_cwl1 >> t_set_dataset_error t_maybe_keep_cwl2 >> t_set_dataset_error t_maybe_keep_cwl3 >> t_set_dataset_error + t_maybe_keep_cwl4 >> t_set_dataset_error t_set_dataset_error >> t_join t_join >> t_cleanup_tmpdir diff --git a/src/ingest-pipeline/submodules/hubmap-inventory b/src/ingest-pipeline/submodules/hubmap-inventory index 6e7d907f..07f64bc6 160000 --- a/src/ingest-pipeline/submodules/hubmap-inventory +++ b/src/ingest-pipeline/submodules/hubmap-inventory @@ -1 +1 @@ -Subproject commit 6e7d907fc356c71a405c871925d08fa81154fb95 +Subproject commit 07f64bc65204629995ffa4cd710c0c543269867d diff --git a/src/ingest-pipeline/submodules/ingest-validation-tests b/src/ingest-pipeline/submodules/ingest-validation-tests index 7f5d6f71..bebbb49e 160000 --- a/src/ingest-pipeline/submodules/ingest-validation-tests +++ b/src/ingest-pipeline/submodules/ingest-validation-tests @@ -1 +1 @@ -Subproject commit 7f5d6f7118f90e518b1da72e542a03c38709cba0 +Subproject commit bebbb49e0012f9a2bc8cffa81ae01a57028089e7 diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools index f2c40fd6..de58a303 160000 --- a/src/ingest-pipeline/submodules/ingest-validation-tools +++ b/src/ingest-pipeline/submodules/ingest-validation-tools @@ -1 +1 @@ -Subproject commit f2c40fd6c6d6abf9a290da89eacc971e630243fc +Subproject commit de58a30388c2dd55a67f3d4fb8fc2742587d1549 From d91048bd8f31c07b08b029b35be404e92e665f35 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 9 Jan 2025 10:16:28 -0500 Subject: [PATCH 04/12] Bugfixing downgraded submodules --- src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate b/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate index 6694e4ba..d1cc262d 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate +++ b/src/ingest-pipeline/airflow/dags/cwl/azimuth-annotate @@ -1 +1 @@ -Subproject commit 6694e4ba1c14688664b79b3bf36ad86c8069970c +Subproject commit d1cc262d0cb752b6bbe3432c3ea19f3a3c424d63 From d5c19d46c49a74daa8353e1e785f8e2ac86d474d Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 9 Jan 2025 10:18:01 -0500 Subject: [PATCH 05/12] Bugfixing downgraded submodules --- src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline b/src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline index 1365d8a4..20ea8dad 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/celldive-pipeline @@ -1 +1 @@ -Subproject commit 1365d8a452796f3c8c240bfb71556414b8ebb23e +Subproject commit 20ea8dadc74d2ce47bbed0c8f18fe89662504258 From 0acf08e64dff21a8fb69f07abc064c7efcfecdfd Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 9 Jan 2025 10:19:52 -0500 Subject: [PATCH 06/12] Bugfixing downgraded submodules --- src/ingest-pipeline/airflow/dags/cwl/codex-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline b/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline index 2e574a85..51c7fbcc 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline @@ -1 +1 @@ -Subproject commit 2e574a8574db7672da327e20eea40935d269eda3 +Subproject commit 51c7fbcc4fb20eec6aa56e8f52387cf5edee62fc From dca5fe13efa47a1a45a960650f33700487ba31aa Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 9 Jan 2025 10:21:26 -0500 Subject: [PATCH 07/12] Bugfixing downgraded submodules --- src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive b/src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive index b3e880c3..6e55233f 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive +++ b/src/ingest-pipeline/airflow/dags/cwl/create-vis-symlink-archive @@ -1 +1 @@ -Subproject commit b3e880c363cae3f2c15ddca5486e57199427ff33 +Subproject commit 6e55233fb14a96fc5ed6f40b4e5a259ab0e8446c From 02ac5af26c34a79253ebba931f8f7a1673ce8ee0 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 9 Jan 2025 10:23:15 -0500 Subject: [PATCH 08/12] Bugfixing downgraded submodules --- src/ingest-pipeline/airflow/dags/cwl/deepcelltypes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/deepcelltypes b/src/ingest-pipeline/airflow/dags/cwl/deepcelltypes index c5a469dc..ca97c29e 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/deepcelltypes +++ b/src/ingest-pipeline/airflow/dags/cwl/deepcelltypes @@ -1 +1 @@ -Subproject commit c5a469dc16f29a638dac95c30e385ac1ca4949da +Subproject commit ca97c29e8e271df9b3b76b6d250f8d161a0ac361 From c07bc4e9b5522d6b88186f2a250fc7107b2db4e0 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 9 Jan 2025 10:29:24 -0500 Subject: [PATCH 09/12] Bugfixing downgraded submodules --- src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata | 2 +- src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation | 2 +- src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline | 2 +- src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline | 2 +- src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid | 2 +- .../airflow/dags/cwl/pas-ftu-segmentation-pipeline | 2 +- src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline | 2 +- src/ingest-pipeline/airflow/dags/cwl/portal-containers | 2 +- src/ingest-pipeline/airflow/dags/cwl/sprm | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata b/src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata index 8e9fedb7..ce2e23f1 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata +++ b/src/ingest-pipeline/airflow/dags/cwl/epic-obj-csv-to-mudata @@ -1 +1 @@ -Subproject commit 8e9fedb72aa237206433890cddff5c19f879920d +Subproject commit ce2e23f1a8b382027c23de262ebcebb98525bc21 diff --git a/src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation b/src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation index dac4e967..110ca4e5 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation +++ b/src/ingest-pipeline/airflow/dags/cwl/kaggle-2-segmentation @@ -1 +1 @@ -Subproject commit dac4e9673bf2ed6fadf55617a3fb42b7636299f4 +Subproject commit 110ca4e5f65fb85843540d116b72d23f9bd04018 diff --git a/src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline b/src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline index ede8bbfd..7501938b 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/mibi-pipeline @@ -1 +1 @@ -Subproject commit ede8bbfd4d49385c47add35c29014f8ef05610ab +Subproject commit 7501938b53db117551ff02ca5597256d6795852e diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline index 2ce8d87d..14985c97 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -1 +1 @@ -Subproject commit 2ce8d87df5b5f73433f15ce0d31fd02f66ef05ce +Subproject commit 14985c972d540929cacbdf33925af2d4c6c9edb1 diff --git a/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid b/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid index 9e2885ca..a063a340 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid +++ b/src/ingest-pipeline/airflow/dags/cwl/ome-tiff-pyramid @@ -1 +1 @@ -Subproject commit 9e2885cab028ba54e910932e0d804398cd56cc0b +Subproject commit a063a3404a5f4345292508cb82313bd41ea6c615 diff --git a/src/ingest-pipeline/airflow/dags/cwl/pas-ftu-segmentation-pipeline b/src/ingest-pipeline/airflow/dags/cwl/pas-ftu-segmentation-pipeline index b15aadce..ec6a7437 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/pas-ftu-segmentation-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/pas-ftu-segmentation-pipeline @@ -1 +1 @@ -Subproject commit b15aadcee768e7daa1141d9de0b42b567addcee9 +Subproject commit ec6a743798e38d61b81cebdf443572213859f3fa diff --git a/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline b/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline index 9923f278..117ef12a 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline @@ -1 +1 @@ -Subproject commit 9923f2786186ff11a8b9fa193149c9c2d669809a +Subproject commit 117ef12a6449cf9128a00c365f139a88d884dd4c diff --git a/src/ingest-pipeline/airflow/dags/cwl/portal-containers b/src/ingest-pipeline/airflow/dags/cwl/portal-containers index 8270e467..2046af76 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/portal-containers +++ b/src/ingest-pipeline/airflow/dags/cwl/portal-containers @@ -1 +1 @@ -Subproject commit 8270e467fe979d373df0cd448e7d38bd3bd85728 +Subproject commit 2046af766bf5a47b08a905a1fb0fa1f9d5178338 diff --git a/src/ingest-pipeline/airflow/dags/cwl/sprm b/src/ingest-pipeline/airflow/dags/cwl/sprm index 00e61211..d2d51272 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/sprm +++ b/src/ingest-pipeline/airflow/dags/cwl/sprm @@ -1 +1 @@ -Subproject commit 00e612112d6807ba7193a1ab615a67d90f533a5c +Subproject commit d2d5127251fbec9f9621df0c6c8c989e3f1b76ba From 7bca37a175231943a7abc4c8a0fbaf4b187515f0 Mon Sep 17 00:00:00 2001 From: David Betancur Date: Thu, 9 Jan 2025 10:33:47 -0500 Subject: [PATCH 10/12] Bugfixing downgraded submodules --- src/ingest-pipeline/submodules/hubmap-inventory | 2 +- src/ingest-pipeline/submodules/ingest-validation-tests | 2 +- src/ingest-pipeline/submodules/ingest-validation-tools | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ingest-pipeline/submodules/hubmap-inventory b/src/ingest-pipeline/submodules/hubmap-inventory index 07f64bc6..6e7d907f 160000 --- a/src/ingest-pipeline/submodules/hubmap-inventory +++ b/src/ingest-pipeline/submodules/hubmap-inventory @@ -1 +1 @@ -Subproject commit 07f64bc65204629995ffa4cd710c0c543269867d +Subproject commit 6e7d907fc356c71a405c871925d08fa81154fb95 diff --git a/src/ingest-pipeline/submodules/ingest-validation-tests b/src/ingest-pipeline/submodules/ingest-validation-tests index bebbb49e..7f5d6f71 160000 --- a/src/ingest-pipeline/submodules/ingest-validation-tests +++ b/src/ingest-pipeline/submodules/ingest-validation-tests @@ -1 +1 @@ -Subproject commit bebbb49e0012f9a2bc8cffa81ae01a57028089e7 +Subproject commit 7f5d6f7118f90e518b1da72e542a03c38709cba0 diff --git a/src/ingest-pipeline/submodules/ingest-validation-tools b/src/ingest-pipeline/submodules/ingest-validation-tools index de58a303..f2c40fd6 160000 --- a/src/ingest-pipeline/submodules/ingest-validation-tools +++ b/src/ingest-pipeline/submodules/ingest-validation-tools @@ -1 +1 @@ -Subproject commit de58a30388c2dd55a67f3d4fb8fc2742587d1549 +Subproject commit f2c40fd6c6d6abf9a290da89eacc971e630243fc From f55dbcec499b06035a9757c0c79c3925a207f27b Mon Sep 17 00:00:00 2001 From: Sean Date: Tue, 14 Jan 2025 13:58:19 -0500 Subject: [PATCH 11/12] Bumping rna-probes-pipeline version to v1.0.1 --- src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline b/src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline index 11cf382f..5c696e3d 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/rna-probes-pipeline @@ -1 +1 @@ -Subproject commit 11cf382fd1580f9b71d8ff4155fd700676aa7749 +Subproject commit 5c696e3dca4869c4c45f617312b1883607b78360 From e6a8804870d64b82288e995467c741fde0d9ad3c Mon Sep 17 00:00:00 2001 From: David Betancur Date: Tue, 21 Jan 2025 12:47:46 -0500 Subject: [PATCH 12/12] Adding mappings for new pipeline. Bugfixes for naming tasks. --- .../airflow/dags/resource_map.yml | 8 ++++++++ .../airflow/dags/rna_probes.py | 20 ++++++++----------- .../airflow/dags/workflow_map.yml | 3 +++ 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/ingest-pipeline/airflow/dags/resource_map.yml b/src/ingest-pipeline/airflow/dags/resource_map.yml index 8e40ed21..c2bd705a 100644 --- a/src/ingest-pipeline/airflow/dags/resource_map.yml +++ b/src/ingest-pipeline/airflow/dags/resource_map.yml @@ -187,6 +187,14 @@ resource_map: - 'task_re': '.*' 'queue': 'general' 'threads': 6 + - 'dag_re': 'rna_with_probes' + 'preserve_scratch': true + 'lanes': 2 + # 'lanes': 12 + 'tasks': + - 'task_re': '.*' + 'queue': 'general' + 'threads': 6 - 'dag_re': '.*' 'preserve_scratch': true 'lanes': 2 diff --git a/src/ingest-pipeline/airflow/dags/rna_probes.py b/src/ingest-pipeline/airflow/dags/rna_probes.py index 31248822..d83dd5cf 100644 --- a/src/ingest-pipeline/airflow/dags/rna_probes.py +++ b/src/ingest-pipeline/airflow/dags/rna_probes.py @@ -1,6 +1,5 @@ from datetime import datetime, timedelta from pathlib import Path -import pandas as pd from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator @@ -120,16 +119,13 @@ def build_cwltool_cmd1(**kwargs): "--threads", get_threads_resource(dag.dag_id), "--organism", - source_type - + source_type, + "--fastq_dir", + data_dir, + "--metadata_dir", + data_dir ] - command.append("--fastq_dir") - command.append(data_dir) - - command.append("--metadata_dir") - command.append(data_dir) - return join_quote_command_str(command) @@ -214,8 +210,8 @@ def build_cwltool_cmd4(**kwargs): ) t_build_cmd4 = PythonOperator( - task_id="build_cmd3", - python_callable=build_cwltool_cmd3, + task_id="build_cmd4", + python_callable=build_cwltool_cmd4, provide_context=True, ) @@ -298,7 +294,7 @@ def build_cwltool_cmd4(**kwargs): ) t_maybe_keep_cwl4 = BranchPythonOperator( - task_id="maybe_keep_cwl3", + task_id="maybe_keep_cwl4", python_callable=utils.pythonop_maybe_keep, provide_context=True, op_kwargs={ diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index a1af05fe..179084c4 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -134,3 +134,6 @@ workflow_map: - 'collection_type': '.*' 'assay_type': 'segmentation-mask' 'workflow': 'tsv_to_mudata' + - 'collection_type': '.*' + 'assay_type': 'scRNAseq-with-probes' + 'workflow': 'rna_with_probes'