Skip to content

Commit

Permalink
update workflow_process to handle Metagenome Sequencing
Browse files Browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Nov 20, 2024
1 parent 3a57b66 commit f6a52f0
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions nmdc_automation/workflow_automation/workflow_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
from nmdc_automation.models.nmdc import DataObject
from nmdc_automation.models.workflow import WorkflowConfig, WorkflowProcessNode

# Workflow names that are matched by name as sequencing / data-generation
# workflows. Records for these can live in either collection:
#   internal sequencing -> data_generation_set
#   external sequencing -> workflow_execution_set
DATA_GENERATION_NAMES = [
    "Sequencing Noninterleaved",
    "Sequencing Interleaved",
    "Metagenome Sequencing",
    "Metatranscriptome Sequencing",
]

warned_objects = set()


Expand Down Expand Up @@ -99,20 +106,31 @@ def get_current_workflow_process_nodes(
analyte_category = _determine_analyte_category(workflows)

data_generation_ids = set()
data_generation_workflows = [wf for wf in workflows if wf.collection == "data_generation_set"]
workflow_execution_workflows = [wf for wf in workflows if wf.collection == "workflow_execution_set"]
data_generation_workflows = [wf for wf in workflows if wf.name in DATA_GENERATION_NAMES]
workflow_execution_workflows = [wf for wf in workflows if wf.name not in DATA_GENERATION_NAMES]

# default query for data_generation_set records filtered by analyte category
q = {"analyte_category": analyte_category}
# override query with allowlist
if allowlist:
q["id"] = {"$in": list(allowlist)}
dg_execution_records = db["data_generation_set"].find(q)
dg_execution_records = list(dg_execution_records)
dg_records = db["data_generation_set"].find(q)
wfe_records = db["workflow_execution_set"].find(q)
else:
dg_records = db["data_generation_set"].find(q)
if analyte_category == "metagenome": # External sequencing - we only support metagenome for now
q2 = {"type": "nmdc:MetagenomeSequencing"}
wfe_records = db["workflow_execution_set"].find(q2)
else:
wfe_records = []

dg_records = list(dg_records)
wfe_records = list(wfe_records)
dg_records.extend(wfe_records)

for wf in data_generation_workflows:
# Sequencing workflows don't have a git repo
for rec in dg_execution_records:
for rec in dg_records:
if _is_missing_required_input_output(wf, rec, data_objects_by_id):
continue
data_generation_ids.add(rec["id"])
Expand Down

0 comments on commit f6a52f0

Please sign in to comment.