Skip to content

Commit

Permalink
Merge branch 'refs/heads/master' into release_3
Browse files Browse the repository at this point in the history
  • Loading branch information
sunset666 committed Feb 25, 2025
2 parents ea19bf5 + 91ae740 commit e0e9107
Show file tree
Hide file tree
Showing 17 changed files with 505 additions and 707 deletions.
94 changes: 46 additions & 48 deletions src/ingest-pipeline/airflow/dags/azimuth_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,57 +64,30 @@
cwl_workflows_annotations_salmon = [
{
"workflow_path": str(get_absolute_workflow(Path("azimuth-annotate", "pipeline.cwl"))),
"input_parameters": [
{"parameter_name": "--reference", "value": ""},
{"parameter_name": "--matrix", "value": "expr.h5ad"},
{
"parameter_name": "--secondary-analysis-matrix",
"value": "secondary_analysis.h5ad",
},
{"parameter_name": "--assay", "value": ""},
],
"documentation_url": "",
},
{
"workflow_path": str(
get_absolute_workflow(Path("portal-containers", "h5ad-to-arrow.cwl"))
),
"input_parameters": [
{"parameter_name": "--input_dir", "value": ".."},
],
"documentation_url": "",
},
{
"workflow_path": str(
get_absolute_workflow(Path("portal-containers", "anndata-to-ui.cwl"))
),
"input_parameters": [
{"parameter_name": "--input_dir", "value": ".."},
],
"documentation_url": "",
},
]
cwl_workflows_annotations_multiome = [
{
"workflow_path": str(get_absolute_workflow(Path("azimuth-annotate", "pipeline.cwl"))),
"input_parameters": [
{"parameter_name": "--reference", "value": ""},
{"parameter_name": "--matrix", "value": "mudata_raw.h5mu"},
{
"parameter_name": "--secondary-analysis-matrix",
"value": "secondary_analysis.h5mu",
},
{"parameter_name": "--assay", "value": ""},
],
"documentation_url": "",
},
{
"workflow_path": str(
get_absolute_workflow(Path("portal-containers", "mudata-to-ui.cwl"))
),
"input_parameters": [
{"parameter_name": "--input_dir", "value": ".."},
],
"documentation_url": "",
},
]
Expand Down Expand Up @@ -169,15 +142,33 @@ def build_cwltool_cmd1(**kwargs):
organ_code = organ_list[0] if len(organ_list) == 1 else "multi"
assay, matrix, secondary_analysis, _ = get_assay_previous_version(**kwargs)

workflows = (
cwl_workflows_annotations_multiome
if "mudata" in secondary_analysis
else cwl_workflows_annotations_salmon
)
# Pick the workflow list and the azimuth-annotate inputs based on the
# secondary-analysis file name reported by the previous pipeline: a name
# containing "mudata" selects the multiome workflows with .h5mu inputs,
# anything else selects the salmon workflows with .h5ad inputs.
# Every matrix path is built under <tmpdir>/cwl_out.
if "mudata" in secondary_analysis:
    workflows = cwl_workflows_annotations_multiome
    input_parameters = [
        {"parameter_name": "--reference", "value": organ_code},
        {
            "parameter_name": "--matrix",
            "value": str(tmpdir / "cwl_out/mudata_raw.h5mu"),
        },
        {
            "parameter_name": "--secondary-analysis-matrix",
            # The directory separator after "cwl_out" is required; without it
            # the path resolves to <tmpdir>/cwl_outsecondary_analysis.h5mu.
            "value": str(tmpdir / "cwl_out/secondary_analysis.h5mu"),
        },
        {"parameter_name": "--assay", "value": assay},
    ]
else:
    workflows = cwl_workflows_annotations_salmon
    input_parameters = [
        {"parameter_name": "--reference", "value": organ_code},
        {"parameter_name": "--matrix", "value": str(tmpdir / "cwl_out/expr.h5ad")},
        {
            "parameter_name": "--secondary-analysis-matrix",
            "value": str(tmpdir / "cwl_out/secondary_analysis.h5ad"),
        },
        {"parameter_name": "--assay", "value": assay},
    ]

# [--reference, --matrix, --secondary-analysis-matrix, --assay]
input_param_vals = [organ_code, matrix, secondary_analysis, assay]
command = get_cwl_cmd_from_workflows(workflows, 0, input_param_vals, tmpdir, kwargs["ti"])
command = get_cwl_cmd_from_workflows(workflows, 0, input_parameters, tmpdir, kwargs["ti"])

return join_quote_command_str(command)

Expand All @@ -189,9 +180,15 @@ def build_cwltool_cmd2(**kwargs):

workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd1")

# [--input_dir]
input_param_vals = []
command = get_cwl_cmd_from_workflows(workflows, 1, input_param_vals, tmpdir, kwargs["ti"])
cwl_parameters = [
{"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")},
]
input_parameters = [
{"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")},
]
command = get_cwl_cmd_from_workflows(
workflows, 1, input_parameters, tmpdir, kwargs["ti"], cwl_parameters
)
kwargs["ti"].xcom_push(key="skip_cwl3", value=1 if workflow == 0 else 0)

return join_quote_command_str(command)
Expand All @@ -203,9 +200,15 @@ def build_cwltool_cmd4(**kwargs):

workflows = kwargs["ti"].xcom_pull(key="cwl_workflows", task_ids="build_cmd2")

# [--input_dir]
input_param_vals = []
command = get_cwl_cmd_from_workflows(workflows, 2, input_param_vals, tmpdir, kwargs["ti"])
cwl_parameters = [
{"parameter_name": "--outdir", "value": str(tmpdir / "cwl_out/hubmap_ui")},
]
input_parameters = [
{"parameter_name": "--input_dir", "value": str(tmpdir / "cwl_out")},
]
command = get_cwl_cmd_from_workflows(
workflows, 2, input_parameters, tmpdir, kwargs["ti"], cwl_parameters
)

return join_quote_command_str(command)

Expand Down Expand Up @@ -233,7 +236,7 @@ def build_cwltool_cmd4(**kwargs):
task_id="pipeline_exec_azimuth_annotate",
bash_command=""" \
tmp_dir={{tmp_dir_path(run_id)}} ; \
cd "$tmp_dir"/cwl_out ; \
mkdir -p ${tmp_dir}/cwl_out ; \
{{ti.xcom_pull(task_ids='build_cmd1')}} >> $tmp_dir/session.log 2>&1 ; \
echo $?
""",
Expand All @@ -244,9 +247,7 @@ def build_cwltool_cmd4(**kwargs):
bash_command=""" \
tmp_dir={{tmp_dir_path(run_id)}} ; \
ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \
cd "$tmp_dir"/cwl_out ; \
mkdir -p hubmap_ui ; \
cd hubmap_ui ; \
mkdir -p ${tmp_dir}/cwl_out/hubmap_ui ; \
{{ti.xcom_pull(task_ids='build_cmd2')}} >> $tmp_dir/session.log 2>&1 ; \
echo $?
""",
Expand All @@ -257,9 +258,6 @@ def build_cwltool_cmd4(**kwargs):
bash_command=""" \
tmp_dir={{tmp_dir_path(run_id)}} ; \
ds_dir="{{ti.xcom_pull(task_ids="send_create_dataset")}}" ; \
cd "$tmp_dir"/cwl_out ; \
mkdir -p hubmap_ui ; \
cd hubmap_ui ; \
{{ti.xcom_pull(task_ids='build_cmd4')}} >> $tmp_dir/session.log 2>&1 ; \
echo $?
""",
Expand Down
23 changes: 11 additions & 12 deletions src/ingest-pipeline/airflow/dags/bulk_atacseq.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@
"workflow_path": str(
get_absolute_workflow(Path("sc-atac-seq-pipeline", "bulk-atac-seq-pipeline.cwl"))
),
"input_parameters": [
{"parameter_name": "--threads", "value": ""},
{"parameter_name": "--sequence_directory", "value": []},
],
"documentation_url": "",
}
]
Expand All @@ -84,16 +80,20 @@ def build_cwltool_cmd1(**kwargs):

data_dirs = get_parent_data_dirs_list(**kwargs)

# ["--threads", "--sequence_directory"]
input_param_vals = [
get_threads_resource(dag.dag_id),
[str(data_dir) for data_dir in data_dirs],
]
cwl_params = [
{"parameter_name": "--parallel", "value": ""},
]

input_parameters = [
{"parameter_name": "--threads", "value": get_threads_resource(dag.dag_id)},
{
"parameter_name": "--sequence_directory",
"value": [str(data_dir) for data_dir in data_dirs],
},
]

command = get_cwl_cmd_from_workflows(
cwl_workflows, 0, input_param_vals, tmpdir, kwargs["ti"], cwl_params
cwl_workflows, 0, input_parameters, tmpdir, kwargs["ti"], cwl_params
)

return join_quote_command_str(command)
Expand All @@ -108,8 +108,7 @@ def build_cwltool_cmd1(**kwargs):
task_id="pipeline_exec",
bash_command=""" \
tmp_dir={{tmp_dir_path(run_id)}} ; \
mkdir -p "${tmp_dir}"/cwl_out ; \
cd "${tmp_dir}"/cwl_out ; \
mkdir -p ${tmp_dir}/cwl_out ; \
{{ti.xcom_pull(task_ids='build_cmd1')}} > $tmp_dir/session.log 2>&1 ; \
echo $?
""",
Expand Down
Loading

0 comments on commit e0e9107

Please sign in to comment.