Merge pull request #300 from microbiomedata/296-scheduler-fails-unable-to-find-assembly-contigs

296 scheduler fails unable to find assembly contigs
mbthornton-lbl authored Nov 25, 2024
2 parents b79eb3c + bc5a534 commit 2aec24f
Showing 10 changed files with 1,835 additions and 770 deletions.
20 changes: 14 additions & 6 deletions nmdc_automation/models/nmdc.py
@@ -57,11 +57,11 @@ def workflow_process_factory(record: Dict[str, Any], validate: bool = False) ->
def _normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record by removing the _id field and converting the type field to a string """
record.pop("_id", None)
# for backwards compatibility strip Activity from the end of the type
record["type"] = record["type"].replace("Activity", "")
normalized_record = _strip_empty_values(record)


if not normalized_record.get("type"):
return normalized_record
# get rid of any legacy 'Activity' suffixes in the type
normalized_record["type"] = normalized_record["type"].replace("Activity", "")
# type-specific normalization
if normalized_record["type"] == "nmdc:MagsAnalysis":
normalized_record = _normalize_mags_record(normalized_record)
@@ -113,12 +113,20 @@ class DataObject(nmdc.DataObject):
"""
def __init__(self, **record):
""" Initialize the object from a dictionary """
# _id is a MongoDB field that makes the parent class fail to initialize
record.pop("_id", None)
normalized_record = _normalize_record(record)
if "type" not in record:
record["type"] = "nmdc:DataObject"
super().__init__(**record)

# override the base class data_object_type (FileTypeEnum) to return a string
@property
def data_object_type_text(self) -> str:
return self.data_object_type.code.text



def as_dict(self) -> Dict[str, Any]:
""" Convert the object to a dictionary """
return yaml.safe_load(yaml_dumper.dumps(self))
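
A minimal usage sketch of the revised DataObject initializer (illustrative only, not part of the commit; the identifiers, URL, and field values are hypothetical): a raw Mongo record can carry an _id and omit type, and the constructor now strips the former and defaults the latter to nmdc:DataObject.

# Sketch assuming the module path shown in this diff; all field values are placeholders.
from nmdc_automation.models.nmdc import DataObject

record = {
    "_id": "507f1f77bcf86cd799439011",       # Mongo-internal field, popped by __init__
    "id": "nmdc:dobj-11-abc123",
    "name": "contigs.fna",
    "description": "Assembly contigs for a hypothetical assembly",
    "url": "https://data.example.org/contigs.fna",
    "data_object_type": "Assembly Contigs",
    # "type" omitted on purpose; __init__ now fills in "nmdc:DataObject"
}

data_object = DataObject(**record)
print(data_object.data_object_type_text)     # "Assembly Contigs"
print(data_object.as_dict()["type"])         # "nmdc:DataObject"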


62 changes: 31 additions & 31 deletions nmdc_automation/workflow_automation/sched.py
@@ -12,6 +12,7 @@
from nmdc_automation.workflow_automation.workflow_process import load_workflow_process_nodes
from nmdc_automation.models.workflow import WorkflowConfig, WorkflowProcessNode
from semver.version import Version
import sys


_POLL_INTERVAL = 60
@@ -29,17 +30,15 @@
logger = logging.getLogger(__name__)
@lru_cache
def get_mongo_db() -> MongoDatabase:
for k in ["HOST", "USERNAME", "PASSWORD", "DBNAME"]:
if f"MONGO_{k}" not in os.environ:
raise KeyError(f"Missing MONGO_{k}")
_client = MongoClient(
host=os.getenv("MONGO_HOST"),
host=os.getenv("MONGO_HOST", "localhost"),
port=int(os.getenv("MONGO_PORT", "27017")),
username=os.getenv("MONGO_USERNAME"),
password=os.getenv("MONGO_PASSWORD"),
username=os.getenv("MONGO_USERNAME", None),
password=os.getenv("MONGO_PASSWORD", None),
directConnection=True,
)
return _client[os.getenv("MONGO_DBNAME")]
)[os.getenv("MONGO_DBNAME", "nmdc")]
return _client
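
Unlike the old version, a missing MONGO_* setting no longer raises KeyError; the connection now falls back to localhost, port 27017, no credentials, and the nmdc database. A minimal configuration sketch (host, credentials, and database name are placeholders; the function is wrapped in lru_cache, so these must be set before the first call):

# Sketch: driving get_mongo_db() (defined above) purely through environment variables.
import os

os.environ.setdefault("MONGO_HOST", "mongo.example.org")
os.environ.setdefault("MONGO_PORT", "27017")
os.environ.setdefault("MONGO_USERNAME", "nmdc_user")
os.environ.setdefault("MONGO_PASSWORD", "change-me")
os.environ.setdefault("MONGO_DBNAME", "nmdc")

db = get_mongo_db()      # returns the MongoDatabase, not the MongoClient
print(db.name)           # "nmdc" unless MONGO_DBNAME says otherwise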



def within_range(wf1: WorkflowConfig, wf2: WorkflowConfig, force=False) -> bool:
@@ -84,12 +83,12 @@ def __init__(self, workflow: WorkflowConfig, trigger_act: WorkflowProcessNode):

class Scheduler:

def __init__(self, db, wfn="workflows.yaml",
def __init__(self, db, workflow_yaml,
site_conf="site_configuration.toml"):
logging.info("Initializing Scheduler")
# Init
wf_file = os.environ.get(_WF_YAML_ENV, wfn)
self.workflows = load_workflow_configs(wf_file)
# wf_file = os.environ.get(_WF_YAML_ENV, wfn)
self.workflows = load_workflow_configs(workflow_yaml)
self.db = db
self.api = NmdcRuntimeApi(site_conf)
# TODO: Make force an optional parameter
@@ -124,9 +123,9 @@ def create_job_rec(self, job: SchedulerJob):

wf = job.workflow
base_id, iteration = self.get_activity_id(wf, job.informed_by)
activity_id = f"{base_id}.{iteration}"
inp_objects = []
inp = dict()
workflow_execution_id = f"{base_id}.{iteration}"
input_data_objects = []
inputs = dict()
optional_inputs = wf.optional_inputs
for k, v in job.workflow.inputs.items():
if v.startswith("do:"):
@@ -136,31 +135,31 @@ def create_job_rec(self, job: SchedulerJob):
if k in optional_inputs:
continue
raise ValueError(f"Unable to find {do_type} in {do_by_type}")
inp_objects.append(dobj.as_dict())
input_data_objects.append(dobj.as_dict())
v = dobj["url"]
# TODO: Make this smarter
elif v == "{was_informed_by}":
v = job.informed_by
elif v == "{activity_id}":
v = activity_id
elif v == "{workflow_execution_id}":
v = workflow_execution_id
elif v == "{predecessor_activity_id}":
v = job.trigger_act.id

inp[k] = v
inputs[k] = v

# Build the response
job_config = {
"git_repo": wf.git_repo,
"release": wf.version,
"wdl": wf.wdl,
"activity_id": activity_id,
"activity_id": workflow_execution_id,
"activity_set": wf.collection,
"was_informed_by": job.informed_by,
"trigger_activity": job.trigger_id,
"iteration": iteration,
"input_prefix": wf.input_prefix,
"inputs": inp,
"input_data_objects": inp_objects,
"inputs": inputs,
"input_data_objects": input_data_objects,
}
if wf.workflow_execution:
job_config["activity"] = wf.workflow_execution
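
For reference, the external key names in the returned record are unchanged (activity_id, activity_set, inputs, input_data_objects); only the local variables were renamed. A schematic of the job_config built above, with placeholder values:

# Schematic only; every value below is a placeholder.
job_config = {
    "git_repo": "https://github.com/example/metagenome-assembly-workflow",
    "release": "v1.0.0",
    "wdl": "assembly.wdl",
    "activity_id": "nmdc:wfmgas-11-abc123.1",      # f"{base_id}.{iteration}"
    "activity_set": "workflow_execution_set",      # wf.collection
    "was_informed_by": "nmdc:omprc-11-xyz789",
    "trigger_activity": "nmdc:wfrqc-11-def456.1",
    "iteration": 1,
    "input_prefix": "assembly",
    "inputs": {"input_file": "https://data.example.org/filtered.fastq.gz"},
    "input_data_objects": [],                      # as_dict() of each resolved DataObject
    # "activity" is added only when the workflow config defines workflow_execution
}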
@@ -216,7 +215,7 @@ def get_activity_id(self, wf: WorkflowConfig, informed_by: str):
# We need to see if any version exists and
# if so get its ID
ct = 0
q = {"was_informed_by": informed_by}
q = {"was_informed_by": informed_by, "type": wf.type}
for doc in self.db[wf.collection].find(q):
ct += 1
last_id = doc["id"]
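
Adding type to this query is the core of the fix: the iteration counter and last-used ID are now derived only from workflow executions of the same type, whereas before every execution informed by the same record was counted, whichever workflow produced it. A sketch of the scoped query (the IDs and collection name are placeholders):

# Sketch of the query get_activity_id() now issues; get_mongo_db is defined above.
db = get_mongo_db()
q = {
    "was_informed_by": "nmdc:omprc-11-xyz789",   # placeholder data generation ID
    "type": "nmdc:MetagenomeAssembly",           # wf.type of the workflow being scheduled
}
ct, last_id = 0, None
for doc in db["workflow_execution_set"].find(q):
    ct += 1
    last_id = doc["id"]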
@@ -284,11 +283,6 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(),
"""
This function does a single cycle of looking for new jobs
"""
filt = {}
if allowlist:
filt = {"was_informed_by": {"$in": list(allowlist)}}
# TODO: Quite a lot happens under the hood here. This function should be broken down into smaller
# functions to improve readability and maintainability.
wfp_nodes = load_workflow_process_nodes(self.db, self.workflows, allowlist)

self.get_existing_jobs.cache_clear()
@@ -301,6 +295,8 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(),
logging.debug(f"Skipping: {wfp_node.id}, workflow disabled.")
continue
jobs = self.find_new_jobs(wfp_node)
if jobs:
logging.info(f"Found {len(jobs)} new jobs for {wfp_node.id}")
for job in jobs:
if dryrun:
msg = f"new job: informed_by: {job.informed_by} trigger: {job.trigger_id} "
@@ -318,12 +314,13 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(),
return job_recs


def main(): # pragma: no cover
def main(site_conf, wf_file): # pragma: no cover
"""
Main function
"""
site_conf = os.environ.get("NMDC_SITE_CONF", "site_configuration.toml")
sched = Scheduler(get_mongo_db(), site_conf=site_conf)
# site_conf = os.environ.get("NMDC_SITE_CONF", "site_configuration.toml")
db = get_mongo_db()
sched = Scheduler(db, wf_file, site_conf=site_conf)
dryrun = False
if os.environ.get("DRYRUN") == "1":
dryrun = True
@@ -338,6 +335,8 @@ def main(): # pragma: no cover
with open(os.environ.get("ALLOWLISTFILE")) as f:
for line in f:
allowlist.add(line.rstrip())
# for local testing
allowlist = ["nmdc:omprc-13-01jx8727"]
while True:
sched.cycle(dryrun=dryrun, skiplist=skiplist, allowlist=allowlist)
if dryrun:
@@ -347,4 +346,5 @@ def main(): # pragma: no cover

if __name__ == "__main__": # pragma: no cover
logging.basicConfig(level=logging.INFO)
main()
# site_conf and wf_file are passed in as arguments
main(site_conf=sys.argv[1], wf_file=sys.argv[2])
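
With main() now taking the site configuration and workflow YAML as positional arguments instead of reading them from the environment, the scheduler would be launched roughly as sketched below (file paths are placeholders; DRYRUN, SKIPLISTFILE, and ALLOWLISTFILE remain the environment switches handled above):

# Launch sketch mirroring the new __main__ block. Shell equivalent, assuming
# the package layout in this repository:
#   python -m nmdc_automation.workflow_automation.sched site_configuration.toml workflows.yaml
import logging
import os

logging.basicConfig(level=logging.INFO)
os.environ["DRYRUN"] = "1"    # optional: log would-be jobs for one cycle instead of creating them
main(site_conf="site_configuration.toml", wf_file="workflows.yaml")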
108 changes: 53 additions & 55 deletions nmdc_automation/workflow_automation/workflow_process.py
@@ -25,13 +25,13 @@ def get_required_data_objects_map(db, workflows: List[WorkflowConfig]) -> Dict[s
for wf in workflows:
required_types.update(set(wf.data_object_types))

required_data_objs_by_id = dict()
required_data_object_map = dict()
for rec in db.data_object_set.find({"data_object_type": {"$ne": None}}):
do = DataObject(**rec)
if do.data_object_type.code.text not in required_types:
data_object = DataObject(**rec)
if data_object.data_object_type.code.text not in required_types:
continue
required_data_objs_by_id[do.id] = do
return required_data_objs_by_id
required_data_object_map[data_object.id] = data_object
return required_data_object_map


@lru_cache
@@ -130,6 +130,8 @@ def get_current_workflow_process_nodes(

records = db[wf.collection].find(q)
for rec in records:
if rec['type'] != wf.type:
continue
# legacy JGI sequencing records
if rec.get("type") == "nmdc:MetagenomeSequencing" or rec["name"].startswith("Metagenome Sequencing"):
continue
@@ -155,7 +157,7 @@ def _determine_analyte_category(workflows: List[WorkflowConfig]) -> str:


# TODO: Make public, give a better name, add type hints and unit tests.
def _resolve_relationships(wfp_nodes: List[WorkflowProcessNode], wfp_nodes_by_data_object_id: Dict[str, WorkflowProcessNode]) -> List[WorkflowProcessNode]:
def _resolve_relationships(current_nodes: List[WorkflowProcessNode], node_data_object_map: Dict[str, WorkflowProcessNode]) -> List[WorkflowProcessNode]:
"""
Find the parents and children relationships
between the activities
Expand All @@ -164,79 +166,75 @@ def _resolve_relationships(wfp_nodes: List[WorkflowProcessNode], wfp_nodes_by_da
# a map of all of the data objects they generated.
# Let's use this to find the parent activity
# for each child activity
for wfp_node in wfp_nodes:
logging.debug(f"Processing {wfp_node.id} {wfp_node.name} {wfp_node.workflow.name}")
wfp_node_predecessors = wfp_node.workflow.parents
if not wfp_node_predecessors:
for node in current_nodes:
logging.debug(f"Processing {node.id} {node.name} {node.workflow.name}")
node_predecessors = node.workflow.parents
if not node_predecessors:
logging.debug("- No Predecessors")
continue
# Go through its inputs
for do_id in wfp_node.has_input:
if do_id not in wfp_nodes_by_data_object_id:
for data_object_id in node.has_input:
if data_object_id not in node_data_object_map:
# This really shouldn't happen
if do_id not in warned_objects:
logging.warning(f"Missing data object {do_id}")
warned_objects.add(do_id)
if data_object_id not in warned_objects:
logging.warning(f"Missing data object {data_object_id}")
warned_objects.add(data_object_id)
continue
parent_wfp_node = wfp_nodes_by_data_object_id[do_id]
parent_node = node_data_object_map[data_object_id]
# This is to cover the case where it was a duplicate.
# This shouldn't happen in the future.
if not parent_wfp_node:
if not parent_node:
logging.warning("Parent node is none")
continue
# Let's make sure these came from the same source
# This is just a safeguard
if wfp_node.was_informed_by != parent_wfp_node.was_informed_by:
if node.was_informed_by != parent_node.was_informed_by:
logging.warning(
"Mismatched informed by for "
f"{do_id} in {wfp_node.id} "
f"{wfp_node.was_informed_by} != "
f"{parent_wfp_node.was_informed_by}"
f"{data_object_id} in {node.id} "
f"{node.was_informed_by} != "
f"{parent_node.was_informed_by}"
)
continue
# We only want to use it as a parent if it is the right
# parent workflow. Some inputs may come from ancestors
# further up
if parent_wfp_node.workflow in wfp_node_predecessors:
if parent_node.workflow in node_predecessors:
# This is the one
wfp_node.parent = parent_wfp_node
parent_wfp_node.children.append(wfp_node)
node.parent = parent_node
parent_node.children.append(node)
logging.debug(
f"Found parent: {parent_wfp_node.id}"
f" {parent_wfp_node.name}"
f"Found parent: {parent_node.id}"
f" {parent_node.name}"
)
break
if len(wfp_node.workflow.parents) > 0 and not wfp_node.parent:
if wfp_node.id not in warned_objects:
logging.warning(f"Didn't find a parent for {wfp_node.id}")
warned_objects.add(wfp_node.id)
if len(node.workflow.parents) > 0 and not node.parent:
if node.id not in warned_objects:
logging.info(f"Skipping obsolete WorkflowExecution: {node.id}, {node.type} {node.version}")
warned_objects.add(node.id)
# Now all the activities have their parent
return wfp_nodes
return current_nodes


def _associate_workflow_process_nodes_to_data_objects(wfp_nodes: List[WorkflowProcessNode], data_objs_by_id):
def _map_nodes_to_data_objects(current_nodes: List[WorkflowProcessNode], required_data_object_map):
"""
Associate the data objects with workflow process nodes
"""
wfp_nodes_by_data_object_id = dict()
for wfp_node in wfp_nodes:
for do_id in wfp_node.has_output:
if do_id in data_objs_by_id:
do = data_objs_by_id[do_id]
wfp_node.add_data_object(do)
# If it's a dupe, set it to none
# so we can ignore it later.
# Once we re-id the data objects this
# Post re-id we would not expect this
if do_id in wfp_nodes_by_data_object_id:
if do_id not in warned_objects:
logging.warning(f"Duplicate output object {do_id}")
warned_objects.add(do_id)
wfp_nodes_by_data_object_id[do_id] = None
node_data_object_map = dict()
for node in current_nodes:
for data_object_id in node.has_output:
if data_object_id in required_data_object_map:
do = required_data_object_map[data_object_id]
node.add_data_object(do)

if data_object_id in node_data_object_map:
if data_object_id not in warned_objects:
logging.warning(f"Duplicate output object {data_object_id}")
warned_objects.add(data_object_id)
node_data_object_map[data_object_id] = None
else:
wfp_nodes_by_data_object_id[do_id] = wfp_node
return wfp_nodes_by_data_object_id, wfp_nodes

node_data_object_map[data_object_id] = node
return node_data_object_map, current_nodes


def load_workflow_process_nodes(db, workflows: list[WorkflowConfig], allowlist: list[str] = None) -> List[WorkflowProcessNode]:
@@ -256,15 +254,15 @@ def load_workflow_process_nodes(db, workflows: list[WorkflowConfig], allowlist:

# This is a map from the data object ID to the activity
# that created it.
data_objs_by_id = get_required_data_objects_map(db, workflows)
data_object_map = get_required_data_objects_map(db, workflows)

# Build up a set of relevant activities and a map from
# the output objects to the activity that generated them.
wfp_nodes = get_current_workflow_process_nodes(db, workflows, data_objs_by_id, allowlist)
current_nodes = get_current_workflow_process_nodes(db, workflows, data_object_map, allowlist)

wfp_nodes_by_data_object_id, wfp_nodes = _associate_workflow_process_nodes_to_data_objects(wfp_nodes, data_objs_by_id)
node_data_object_map, current_nodes = _map_nodes_to_data_objects(current_nodes, data_object_map)

# Now populate the parent and children values for the nodes
wfp_nodes = _resolve_relationships(wfp_nodes, wfp_nodes_by_data_object_id)
return wfp_nodes
resolved_nodes = _resolve_relationships(current_nodes, node_data_object_map)
return resolved_nodes
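
Taken together, a sketch of driving the renamed pipeline end to end for a single data generation record (the workflow YAML path is a placeholder, the allowlist ID is the local-testing value used in sched.py above, and the import path for load_workflow_configs is an assumption since it is not shown in this diff):

# Sketch: build the current workflow process graph and inspect parent links.
from nmdc_automation.config import load_workflow_configs          # assumed import path
from nmdc_automation.workflow_automation.sched import get_mongo_db
from nmdc_automation.workflow_automation.workflow_process import load_workflow_process_nodes

db = get_mongo_db()
workflows = load_workflow_configs("workflows.yaml")               # placeholder path
nodes = load_workflow_process_nodes(db, workflows, allowlist=["nmdc:omprc-13-01jx8727"])

for node in nodes:
    parent_id = node.parent.id if node.parent else None
    print(node.id, node.type, "parent:", parent_id)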
