From 6a05424ba80b7aa7ccb9078a93da3eac95174f27 Mon Sep 17 00:00:00 2001 From: Matt Ruffalo Date: Fri, 2 Aug 2024 14:32:48 -0400 Subject: [PATCH 1/9] Add phenocycler-pipeline at v1.0 --- .gitmodules | 3 +++ src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline | 1 + 2 files changed, 4 insertions(+) create mode 160000 src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline diff --git a/.gitmodules b/.gitmodules index b5542e277..bdc318caa 100644 --- a/.gitmodules +++ b/.gitmodules @@ -64,3 +64,6 @@ [submodule "src/ingest-pipeline/airflow/dags/cwl/deepcelltypes"] path = src/ingest-pipeline/airflow/dags/cwl/deepcelltypes url = https://github.com/hubmapconsortium/deepcelltypes +[submodule "src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline"] + path = src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline + url = https://github.com/hubmapconsortium/phenocycler-pipeline diff --git a/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline b/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline new file mode 160000 index 000000000..14fac4e67 --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline @@ -0,0 +1 @@ +Subproject commit 14fac4e6763abc780eca6de0a3fb1c7951490b92 From c47dc1a5b081923d69fcf7201a465e0e458ed936 Mon Sep 17 00:00:00 2001 From: Matt Ruffalo Date: Fri, 2 Aug 2024 14:38:59 -0400 Subject: [PATCH 2/9] Add DAG --- .../airflow/dags/phenocycler_deepcell.py | 528 ++++++++++++++++++ .../airflow/dags/resource_map.yml | 10 + .../airflow/dags/workflow_map.yml | 4 +- 3 files changed, 541 insertions(+), 1 deletion(-) create mode 100644 src/ingest-pipeline/airflow/dags/phenocycler_deepcell.py diff --git a/src/ingest-pipeline/airflow/dags/phenocycler_deepcell.py b/src/ingest-pipeline/airflow/dags/phenocycler_deepcell.py new file mode 100644 index 000000000..98f65d21f --- /dev/null +++ b/src/ingest-pipeline/airflow/dags/phenocycler_deepcell.py @@ -0,0 +1,528 @@ +from datetime import datetime, timedelta +from pathlib import Path + +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import BranchPythonOperator, PythonOperator + +import utils +from utils import ( + get_cwltool_base_cmd, + get_dataset_uuid, + get_named_absolute_workflows, + get_parent_dataset_uuids_list, + get_parent_data_dir, + build_dataset_name as inner_build_dataset_name, + get_previous_revision_uuid, + get_uuid_for_error, + join_quote_command_str, + make_send_status_msg_function, + get_tmp_dir_path, + HMDAG, + get_queue_resource, + get_preserve_scratch_resource, +) +from hubmap_operators.common_operators import ( + CleanupTmpDirOperator, + CreateTmpDirOperator, + JoinOperator, + LogInfoOperator, + MoveDataOperator, + SetDatasetProcessingOperator, +) + + +default_args = { + "owner": "hubmap", + "depends_on_past": False, + "start_date": datetime(2019, 1, 1), + "email": ["joel.welling@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), + "xcom_push": True, + "queue": get_queue_resource("phenocycler_deepcell"), + "on_failure_callback": utils.create_dataset_state_error_callback(get_uuid_for_error), +} + +with HMDAG( + "phenocycler_deepcell", + schedule_interval=None, + is_paused_upon_creation=False, + default_args=default_args, + user_defined_macros={ + "tmp_dir_path": get_tmp_dir_path, + "preserve_scratch": get_preserve_scratch_resource("phenocycler_deepcell"), + }, +) as dag: + + pipeline_name = "phenocycler-pipeline" + cwl_workflows = 
get_named_absolute_workflows( + segmentation=Path(pipeline_name, "pipeline.cwl"), + sprm=Path("sprm", "pipeline.cwl"), + create_vis_symlink_archive=Path("create-vis-symlink-archive", "pipeline.cwl"), + ome_tiff_pyramid=Path("ome-tiff-pyramid", "pipeline.cwl"), + ome_tiff_offsets=Path("portal-containers", "ome-tiff-offsets.cwl"), + sprm_to_json=Path("portal-containers", "sprm-to-json.cwl"), + sprm_to_anndata=Path("portal-containers", "sprm-to-anndata.cwl"), + ) + + def build_dataset_name(**kwargs): + return inner_build_dataset_name(dag.dag_id, pipeline_name, **kwargs) + + prepare_cwl_segmentation = DummyOperator(task_id="prepare_cwl_segmentation") + + def build_cwltool_cwl_segmentation(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + data_dir = get_parent_data_dir(**kwargs) + print("data_dir: ", data_dir) + + workflow = cwl_workflows["segmentation"] + meta_yml_path = workflow.parent / "meta.yaml" + + command = [ + *get_cwltool_base_cmd(tmpdir), + # '--singularity', + workflow, + "--gpus=all", + "--segmentation_method", + "deepcell", + "--data_dir", + data_dir, + ] + + return join_quote_command_str(command) + + t_build_cwl_segmentation = PythonOperator( + task_id="build_cwl_segmentation", + python_callable=build_cwltool_cwl_segmentation, + provide_context=True, + ) + + t_pipeline_exec_cwl_segmentation = BashOperator( + task_id="pipeline_exec_cwl_segmentation", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ + cd ${tmp_dir}/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cwl_segmentation')}} > $tmp_dir/session.log 2>&1 ; \ + echo $? + """, + ) + + t_maybe_keep_cwl_segmentation = BranchPythonOperator( + task_id="maybe_keep_cwl_segmentation", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl_sprm", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_cwl_segmentation", + }, + ) + + prepare_cwl_sprm = DummyOperator(task_id="prepare_cwl_sprm") + + def build_cwltool_cmd_sprm(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + parent_data_dir = get_parent_data_dir(**kwargs) + print("parent_data_dir: ", parent_data_dir) + data_dir = tmpdir / "cwl_out" + print("data_dir: ", data_dir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows["sprm"], + "--enable_manhole", + "--image_dir", + data_dir / "pipeline_output/expr", + "--mask_dir", + data_dir / "pipeline_output/mask", + ] + + return join_quote_command_str(command) + + t_build_cmd_sprm = PythonOperator( + task_id="build_cmd_sprm", + python_callable=build_cwltool_cmd_sprm, + provide_context=True, + ) + + t_pipeline_exec_cwl_sprm = BashOperator( + task_id="pipeline_exec_cwl_sprm", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + cd ${tmp_dir}/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cmd_sprm')}} >> ${tmp_dir}/session.log 2>&1 ; \ + echo $? 
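+        # `echo $?` is deliberately the last line of every pipeline_exec_* script:
+        # BashOperator pushes the final line of stdout to XCom, and the
+        # maybe_keep_* BranchPythonOperators read that return code (via their
+        # `test_op` kwarg) to branch between `next_op` and `bail_op`.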
+ """, + ) + + t_maybe_keep_cwl_sprm = BranchPythonOperator( + task_id="maybe_keep_cwl_sprm", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl_create_vis_symlink_archive", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_cwl_sprm", + }, + ) + + prepare_cwl_create_vis_symlink_archive = DummyOperator( + task_id="prepare_cwl_create_vis_symlink_archive", + ) + + def build_cwltool_cmd_create_vis_symlink_archive(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + parent_data_dir = get_parent_data_dir(**kwargs) + print("parent_data_dir: ", parent_data_dir) + data_dir = tmpdir / "cwl_out" + print("data_dir: ", data_dir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows["create_vis_symlink_archive"], + "--ometiff_dir", + data_dir / "pipeline_output", + "--sprm_output", + data_dir / "sprm_outputs", + ] + + return join_quote_command_str(command) + + t_build_cmd_create_vis_symlink_archive = PythonOperator( + task_id="build_cmd_create_vis_symlink_archive", + python_callable=build_cwltool_cmd_create_vis_symlink_archive, + provide_context=True, + ) + + t_pipeline_exec_cwl_create_vis_symlink_archive = BashOperator( + task_id="pipeline_exec_cwl_create_vis_symlink_archive", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + cd ${tmp_dir}/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cmd_create_vis_symlink_archive')}} >> ${tmp_dir}/session.log 2>&1 ; \ + echo $? + """, + ) + + t_maybe_keep_cwl_create_vis_symlink_archive = BranchPythonOperator( + task_id="maybe_keep_cwl_create_vis_symlink_archive", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl_ome_tiff_pyramid", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_cwl_create_vis_symlink_archive", + }, + ) + + prepare_cwl_ome_tiff_pyramid = DummyOperator(task_id="prepare_cwl_ome_tiff_pyramid") + + def build_cwltool_cwl_ome_tiff_pyramid(**kwargs): + run_id = kwargs["run_id"] + + # tmpdir is temp directory in /hubmap-tmp + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + + # data directory is the stitched images, which are found in tmpdir + data_dir = get_parent_data_dir(**kwargs) + print("data_dir: ", data_dir) + + # this is the call to the CWL + command = [ + *get_cwltool_base_cmd(tmpdir), + "--relax-path-checks", + cwl_workflows["ome_tiff_pyramid"], + "--ometiff_directory", + ".", + ] + return join_quote_command_str(command) + + t_build_cmd_ome_tiff_pyramid = PythonOperator( + task_id="build_cwl_ome_tiff_pyramid", + python_callable=build_cwltool_cwl_ome_tiff_pyramid, + provide_context=True, + ) + + t_pipeline_exec_cwl_ome_tiff_pyramid = BashOperator( + task_id="pipeline_exec_cwl_ome_tiff_pyramid", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + mkdir -p ${tmp_dir}/cwl_out ; \ + cd ${tmp_dir}/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cwl_ome_tiff_pyramid')}} >> $tmp_dir/session.log 2>&1 ; \ + echo $? 
+ """, + ) + + t_maybe_keep_cwl_ome_tiff_pyramid = BranchPythonOperator( + task_id="maybe_keep_cwl_ome_tiff_pyramid", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl_ome_tiff_offsets", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_cwl_ome_tiff_pyramid", + }, + ) + + prepare_cwl_ome_tiff_offsets = DummyOperator(task_id="prepare_cwl_ome_tiff_offsets") + + def build_cwltool_cmd_ome_tiff_offsets(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + parent_data_dir = get_parent_data_dir(**kwargs) + print("parent_data_dir: ", parent_data_dir) + data_dir = tmpdir / "cwl_out" + print("data_dir: ", data_dir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows["ome_tiff_offsets"], + "--input_dir", + data_dir / "ometiff-pyramids", + ] + + return join_quote_command_str(command) + + t_build_cmd_ome_tiff_offsets = PythonOperator( + task_id="build_cmd_ome_tiff_offsets", + python_callable=build_cwltool_cmd_ome_tiff_offsets, + provide_context=True, + ) + + t_pipeline_exec_cwl_ome_tiff_offsets = BashOperator( + task_id="pipeline_exec_cwl_ome_tiff_offsets", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + cd ${tmp_dir}/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cmd_ome_tiff_offsets')}} >> ${tmp_dir}/session.log 2>&1 ; \ + echo $? + """, + ) + + t_maybe_keep_cwl_ome_tiff_offsets = BranchPythonOperator( + task_id="maybe_keep_cwl_ome_tiff_offsets", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl_sprm_to_json", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_cwl_ome_tiff_offsets", + }, + ) + + prepare_cwl_sprm_to_json = DummyOperator(task_id="prepare_cwl_sprm_to_json") + + def build_cwltool_cmd_sprm_to_json(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + parent_data_dir = get_parent_data_dir(**kwargs) + print("parent_data_dir: ", parent_data_dir) + data_dir = tmpdir / "cwl_out" # This stage reads input from stage 1 + print("data_dir: ", data_dir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows["sprm_to_json"], + "--input_dir", + data_dir / "sprm_outputs", + ] + + return join_quote_command_str(command) + + t_build_cmd_sprm_to_json = PythonOperator( + task_id="build_cmd_sprm_to_json", + python_callable=build_cwltool_cmd_sprm_to_json, + provide_context=True, + ) + + t_pipeline_exec_cwl_sprm_to_json = BashOperator( + task_id="pipeline_exec_cwl_sprm_to_json", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + cd ${tmp_dir}/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cmd_sprm_to_json')}} >> ${tmp_dir}/session.log 2>&1 ; \ + echo $? 
+ """, + ) + + t_maybe_keep_cwl_sprm_to_json = BranchPythonOperator( + task_id="maybe_keep_cwl_sprm_to_json", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "prepare_cwl_sprm_to_anndata", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_cwl_sprm_to_json", + }, + ) + + prepare_cwl_sprm_to_anndata = DummyOperator(task_id="prepare_cwl_sprm_to_anndata") + + def build_cwltool_cmd_sprm_to_anndata(**kwargs): + run_id = kwargs["run_id"] + tmpdir = get_tmp_dir_path(run_id) + print("tmpdir: ", tmpdir) + parent_data_dir = get_parent_data_dir(**kwargs) + print("parent_data_dir: ", parent_data_dir) + data_dir = tmpdir / "cwl_out" # This stage reads input from stage 1 + print("data_dir: ", data_dir) + + command = [ + *get_cwltool_base_cmd(tmpdir), + cwl_workflows["sprm_to_anndata"], + "--input_dir", + data_dir / "sprm_outputs", + ] + + return join_quote_command_str(command) + + t_build_cmd_sprm_to_anndata = PythonOperator( + task_id="build_cmd_sprm_to_anndata", + python_callable=build_cwltool_cmd_sprm_to_anndata, + provide_context=True, + ) + + t_pipeline_exec_cwl_sprm_to_anndata = BashOperator( + task_id="pipeline_exec_cwl_sprm_to_anndata", + bash_command=""" \ + tmp_dir={{tmp_dir_path(run_id)}} ; \ + cd ${tmp_dir}/cwl_out ; \ + {{ti.xcom_pull(task_ids='build_cmd_sprm_to_anndata')}} >> ${tmp_dir}/session.log 2>&1 ; \ + echo $? + """, + ) + + t_maybe_keep_cwl_sprm_to_anndata = BranchPythonOperator( + task_id="maybe_keep_cwl_sprm_to_anndata", + python_callable=utils.pythonop_maybe_keep, + provide_context=True, + op_kwargs={ + "next_op": "move_data", + "bail_op": "set_dataset_error", + "test_op": "pipeline_exec_cwl_sprm_to_anndata", + }, + ) + + t_send_create_dataset = PythonOperator( + task_id="send_create_dataset", + python_callable=utils.pythonop_send_create_dataset, + provide_context=True, + op_kwargs={ + "parent_dataset_uuid_callable": get_parent_dataset_uuids_list, + "previous_revision_uuid_callable": get_previous_revision_uuid, + "http_conn_id": "ingest_api_connection", + "dataset_name_callable": build_dataset_name, + "pipeline_shorthand": "DeepCell + SPRM", + }, + ) + + t_set_dataset_error = PythonOperator( + task_id="set_dataset_error", + python_callable=utils.pythonop_set_dataset_state, + provide_context=True, + trigger_rule="all_done", + op_kwargs={ + "dataset_uuid_callable": get_dataset_uuid, + "ds_state": "Error", + "message": "An error occurred in {}".format(pipeline_name), + }, + ) + + t_expand_symlinks = BashOperator( + task_id="expand_symlinks", + bash_command=""" + tmp_dir="{{tmp_dir_path(run_id)}}" ; \ + ds_dir="{{ti.xcom_pull(task_ids='send_create_dataset')}}" ; \ + groupname="{{conf.as_dict()['connections']['OUTPUT_GROUP_NAME']}}" ; \ + cd "$ds_dir" ; \ + tar -xf symlinks.tar ; \ + echo $? 
+ """, + ) + + send_status_msg = make_send_status_msg_function( + dag_file=__file__, + retcode_ops=[ + "pipeline_exec_cwl_segmentation", + "pipeline_exec_cwl_sprm", + "pipeline_exec_cwl_create_vis_symlink_archive", + "pipeline_exec_cwl_ome_tiff_offsets", + "pipeline_exec_cwl_sprm_to_json", + "pipeline_exec_cwl_sprm_to_anndata", + "move_data", + ], + cwl_workflows=list(cwl_workflows.values()), + ) + + t_send_status = PythonOperator( + task_id="send_status_msg", python_callable=send_status_msg, provide_context=True + ) + + t_log_info = LogInfoOperator(task_id="log_info") + t_join = JoinOperator(task_id="join") + t_create_tmpdir = CreateTmpDirOperator(task_id="create_tmpdir") + t_cleanup_tmpdir = CleanupTmpDirOperator(task_id="cleanup_tmpdir") + t_set_dataset_processing = SetDatasetProcessingOperator(task_id="set_dataset_processing") + t_move_data = MoveDataOperator(task_id="move_data") + + ( + t_log_info + >> t_create_tmpdir + >> t_send_create_dataset + >> t_set_dataset_processing + >> prepare_cwl_segmentation + >> t_build_cwl_segmentation + >> t_pipeline_exec_cwl_segmentation + >> t_maybe_keep_cwl_segmentation + >> prepare_cwl_sprm + >> t_build_cmd_sprm + >> t_pipeline_exec_cwl_sprm + >> t_maybe_keep_cwl_sprm + >> prepare_cwl_create_vis_symlink_archive + >> t_build_cmd_create_vis_symlink_archive + >> t_pipeline_exec_cwl_create_vis_symlink_archive + >> t_maybe_keep_cwl_create_vis_symlink_archive + >> prepare_cwl_ome_tiff_pyramid + >> t_build_cmd_ome_tiff_pyramid + >> t_pipeline_exec_cwl_ome_tiff_pyramid + >> t_maybe_keep_cwl_ome_tiff_pyramid + >> prepare_cwl_ome_tiff_offsets + >> t_build_cmd_ome_tiff_offsets + >> t_pipeline_exec_cwl_ome_tiff_offsets + >> t_maybe_keep_cwl_ome_tiff_offsets + >> prepare_cwl_sprm_to_json + >> t_build_cmd_sprm_to_json + >> t_pipeline_exec_cwl_sprm_to_json + >> t_maybe_keep_cwl_sprm_to_json + >> prepare_cwl_sprm_to_anndata + >> t_build_cmd_sprm_to_anndata + >> t_pipeline_exec_cwl_sprm_to_anndata + >> t_maybe_keep_cwl_sprm_to_anndata + >> t_move_data + >> t_expand_symlinks + >> t_send_status + >> t_join + ) + t_maybe_keep_cwl_segmentation >> t_set_dataset_error + t_maybe_keep_cwl_sprm >> t_set_dataset_error + t_maybe_keep_cwl_create_vis_symlink_archive >> t_set_dataset_error + t_maybe_keep_cwl_ome_tiff_pyramid >> t_set_dataset_error + t_maybe_keep_cwl_ome_tiff_offsets >> t_set_dataset_error + t_maybe_keep_cwl_sprm_to_json >> t_set_dataset_error + t_maybe_keep_cwl_sprm_to_anndata >> t_set_dataset_error + t_set_dataset_error >> t_join + t_join >> t_cleanup_tmpdir diff --git a/src/ingest-pipeline/airflow/dags/resource_map.yml b/src/ingest-pipeline/airflow/dags/resource_map.yml index a6106ba78..210143650 100644 --- a/src/ingest-pipeline/airflow/dags/resource_map.yml +++ b/src/ingest-pipeline/airflow/dags/resource_map.yml @@ -36,6 +36,16 @@ resource_map: - 'task_re': '.*' 'queue': 'general' 'threads': 6 + - 'dag_re': 'phenocycler_deepcell' + 'preserve_scratch': true + 'lanes': 2 + 'tasks': + - 'task_re': '.*segmentation' + 'queue': 'gpu000_q1' + 'threads': 6 + - 'task_re': '.*' + 'queue': 'general' + 'threads': 6 - 'dag_re': 'salmon_rnaseq_.*' 'preserve_scratch': true 'lanes': 2 diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index f2c096609..e14854078 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -26,6 +26,9 @@ workflow_map: - 'collection_type': '.*' 'assay_type': 'MIBI' 'workflow': 'mibi_deepcell' + - 'collection_type': '.*' + 
'assay_type': 'PhenoCycler' + 'workflow': 'phenocycler_deepcell' - 'collection_type': '.*' 'assay_type': 'MxIF' 'workflow': 'ometiff_pyramid' @@ -128,4 +131,3 @@ workflow_map: - 'collection_type': '.*' 'assay_type': '10x-multiome' 'workflow': 'multiome_10x' - From 8e5efa5c0ce89b91345b134fa79c76a19bff632f Mon Sep 17 00:00:00 2001 From: Matt Ruffalo Date: Mon, 5 Aug 2024 10:08:49 -0400 Subject: [PATCH 3/9] Bump phenocycler-pipeline submodule to v1.0.1 --- src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline b/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline index 14fac4e67..1e3521bb4 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline @@ -1 +1 @@ -Subproject commit 14fac4e6763abc780eca6de0a3fb1c7951490b92 +Subproject commit 1e3521bb49c13d901ac9f7c856450dd00c63332c From a3e4a1fad95f69a0dc3df8ff126051e56e10291f Mon Sep 17 00:00:00 2001 From: Juan Puerto <=> Date: Mon, 5 Aug 2024 10:22:49 -0400 Subject: [PATCH 4/9] General: Update assay_type to phenocycler to match the output from the rules engine. --- src/ingest-pipeline/airflow/dags/workflow_map.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index e14854078..d62dfb45c 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -27,7 +27,7 @@ workflow_map: 'assay_type': 'MIBI' 'workflow': 'mibi_deepcell' - 'collection_type': '.*' - 'assay_type': 'PhenoCycler' + 'assay_type': 'phenocycler' 'workflow': 'phenocycler_deepcell' - 'collection_type': '.*' 'assay_type': 'MxIF' From 9f31c1e4367cb124557829bd47cce3d5b9e198ee Mon Sep 17 00:00:00 2001 From: Sean Date: Tue, 6 Aug 2024 09:51:48 -0400 Subject: [PATCH 5/9] Bump multiome-rna-atac-pipeline to v1.1.7 --- src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline index ec8a7e87c..4aba16d3b 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/multiome-rna-atac-pipeline @@ -1 +1 @@ -Subproject commit ec8a7e87c40f0c98cb649d4ffa1fc52e44fffb27 +Subproject commit 4aba16d3bca7a9e8142c02f70aac6aec5c433da4 From 597b169fb4448f27dc44829fd477cdf5409ff6a1 Mon Sep 17 00:00:00 2001 From: Sean Date: Tue, 6 Aug 2024 09:52:23 -0400 Subject: [PATCH 6/9] Bump salmon-rnaseq to v2.2.7 --- src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq b/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq index 1550b3e16..0e913979a 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq +++ b/src/ingest-pipeline/airflow/dags/cwl/salmon-rnaseq @@ -1 +1 @@ -Subproject commit 1550b3e1687b4b42c45f3fc32c2c239199f9a493 +Subproject commit 0e913979ac4a989a482211dfdf6fe204d26b05ab From e81232fdfc424796e77a6fea80a48e4c77e4d42d Mon Sep 17 00:00:00 2001 From: Sean Date: Tue, 6 Aug 2024 09:53:28 -0400 Subject: [PATCH 7/9] Bumping sc-atac-seq-pipeline to v2.1.6 --- src/ingest-pipeline/airflow/dags/cwl/sc-atac-seq-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/sc-atac-seq-pipeline b/src/ingest-pipeline/airflow/dags/cwl/sc-atac-seq-pipeline index 2ce5afa58..be8597bb9 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/sc-atac-seq-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/sc-atac-seq-pipeline @@ -1 +1 @@ -Subproject commit 2ce5afa585d52f9e11c995be8d08b1f208d1fd7f +Subproject commit be8597bb9e35b34eab501b6f4751d8bb899a2457 From 8900f2ab1396df4cad058edbb8550d790f4a0c6c Mon Sep 17 00:00:00 2001 From: David Betancur Date: Wed, 7 Aug 2024 11:49:11 -0400 Subject: [PATCH 8/9] Syncing bugfix for visium RNA organism required parameter. --- src/ingest-pipeline/airflow/dags/visium.py | 19 +++++++++++++++++++ .../airflow/dags/workflow_map.yml | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/visium.py b/src/ingest-pipeline/airflow/dags/visium.py index 6a9e43d96..e138826be 100644 --- a/src/ingest-pipeline/airflow/dags/visium.py +++ b/src/ingest-pipeline/airflow/dags/visium.py @@ -27,6 +27,7 @@ make_send_status_msg_function, get_tmp_dir_path, HMDAG, + pythonop_get_dataset_state, get_queue_resource, get_threads_resource, get_preserve_scratch_resource, @@ -86,6 +87,22 @@ def build_cwltool_cmd1(**kwargs): data_dir = get_parent_data_dir(**kwargs) print("data_dirs: ", data_dir) + source_type = "" + unique_source_types = set() + for parent_uuid in get_parent_dataset_uuids_list(**kwargs): + dataset_state = pythonop_get_dataset_state( + dataset_uuid_callable=lambda **kwargs: parent_uuid, **kwargs) + source_type = dataset_state.get("source_type") + if source_type == "mixed": + print("Force failure. Should only be one unique source_type for a dataset.") + else: + unique_source_types.add(source_type) + + if len(unique_source_types) > 1: + print("Force failure. Should only be one unique source_type for a dataset.") + else: + source_type = unique_source_types.pop().lower() + command = [ *get_cwltool_base_cmd(tmpdir), "--relax-path-checks", @@ -97,6 +114,8 @@ def build_cwltool_cmd1(**kwargs): "visium-ff", "--threads", get_threads_resource(dag.dag_id), + "--organism", + source_type, ] command.append("--fastq_dir") diff --git a/src/ingest-pipeline/airflow/dags/workflow_map.yml b/src/ingest-pipeline/airflow/dags/workflow_map.yml index e14854078..d62dfb45c 100644 --- a/src/ingest-pipeline/airflow/dags/workflow_map.yml +++ b/src/ingest-pipeline/airflow/dags/workflow_map.yml @@ -27,7 +27,7 @@ workflow_map: 'assay_type': 'MIBI' 'workflow': 'mibi_deepcell' - 'collection_type': '.*' - 'assay_type': 'PhenoCycler' + 'assay_type': 'phenocycler' 'workflow': 'phenocycler_deepcell' - 'collection_type': '.*' 'assay_type': 'MxIF' From 95847a86945e8810aac234805437bf64381e9ab4 Mon Sep 17 00:00:00 2001 From: pennycuda <141153875+pennycuda@users.noreply.github.com> Date: Wed, 7 Aug 2024 12:17:44 -0400 Subject: [PATCH 9/9] Bump CODEX to 2.6.1 --- src/ingest-pipeline/airflow/dags/cwl/codex-pipeline | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline b/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline index c1b7ae981..51c7fbcc4 160000 --- a/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline +++ b/src/ingest-pipeline/airflow/dags/cwl/codex-pipeline @@ -1 +1 @@ -Subproject commit c1b7ae981e9bea1b4227fb0dc84caea703a34808 +Subproject commit 51c7fbcc4fb20eec6aa56e8f52387cf5edee62fc
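
The submodule bumps in this series (patches 3, 5, 6, 7, and 9) all follow the
same pin-update pattern. A minimal sketch of how such a commit is produced,
using the phenocycler-pipeline path from patch 3 (the tag name is taken from
that patch's subject; verify it against the submodule's actual releases):

    # enter the submodule and move its checked-out commit to the new release
    cd src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline
    git fetch --tags
    git checkout v1.0.1
    # record the new gitlink in the superproject and commit it
    cd -
    git add src/ingest-pipeline/airflow/dags/cwl/phenocycler-pipeline
    git commit -m "Bump phenocycler-pipeline submodule to v1.0.1"

After applying the series, existing checkouts need
"git submodule update --init --recursive" so the new phenocycler-pipeline
submodule (added in patch 1) is actually populated.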