296 scheduler fails unable to find assembly contigs #300

Merged

Changes from all commits (21 commits)
11fe8b9
refactor Scheduler args and get_mongo_db() to work with local db
mbthornton-lbl Nov 21, 2024
33977ca
refactoring for clarity and added test.
mbthornton-lbl Nov 21, 2024
4a842fa
normalize record for DataObject construction
mbthornton-lbl Nov 21, 2024
3e5473e
refactor for readability
mbthornton-lbl Nov 21, 2024
b720b97
get basic test case working with realistic data
mbthornton-lbl Nov 21, 2024
d5abe2a
update unit test
mbthornton-lbl Nov 22, 2024
3677acb
fix FileTypeEnum issue
mbthornton-lbl Nov 22, 2024
19f4e31
clean up comments
mbthornton-lbl Nov 22, 2024
845130b
update fixtures and get unit tests working
mbthornton-lbl Nov 23, 2024
f9a0e4e
test that parent / child is set correctly for the node
mbthornton-lbl Nov 23, 2024
65060d2
add unit test for find_new_jobs
mbthornton-lbl Nov 23, 2024
62a9570
add assertions for scheduler.create_job
mbthornton-lbl Nov 23, 2024
9fe2335
update log statement
mbthornton-lbl Nov 23, 2024
b5ced79
ensure record type and workflow type match
mbthornton-lbl Nov 23, 2024
c457d75
update tests and logging
mbthornton-lbl Nov 23, 2024
d809078
remove unused api method
mbthornton-lbl Nov 23, 2024
eda7251
restore args in sched.main
mbthornton-lbl Nov 23, 2024
fc622db
fix main() argument handling
mbthornton-lbl Nov 25, 2024
85a5bac
Match was_informed_by AND workflow.type to find earlier db entries
mbthornton-lbl Nov 25, 2024
5e6e73e
Merge branch 'main' into 296-scheduler-fails-unable-to-find-assembly-…
mbthornton-lbl Nov 25, 2024
bc5a534
fix config.inputs.proj not getting workflow_execution id
mbthornton-lbl Nov 25, 2024
20 changes: 14 additions & 6 deletions nmdc_automation/models/nmdc.py
@@ -57,11 +57,11 @@ def workflow_process_factory(record: Dict[str, Any], validate: bool = False) ->
def _normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record by removing the _id field and converting the type field to a string """
record.pop("_id", None)
# for backwards compatibility strip Activity from the end of the type
record["type"] = record["type"].replace("Activity", "")
normalized_record = _strip_empty_values(record)


if not normalized_record.get("type"):
return normalized_record
# get rid of any legacy 'Activity' suffixes in the type
normalized_record["type"] = normalized_record["type"].replace("Activity", "")
# type-specific normalization
if normalized_record["type"] == "nmdc:MagsAnalysis":
normalized_record = _normalize_mags_record(normalized_record)
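For concreteness, here is a rough before/after sketch of what _normalize_record does to a legacy record; the field values are invented, and it is assumed that _strip_empty_values (not shown in this diff) drops keys with empty or None values.

# Invented record illustrating _normalize_record (values are placeholders).
record = {"_id": "656d0abc", "type": "nmdc:MetagenomeAssemblyActivity", "asm_score": None}
normalized = _normalize_record(record)
# Expected shape: {"type": "nmdc:MetagenomeAssembly"}
# _id popped, the None-valued field stripped, and the legacy "Activity" suffix removed.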
@@ -113,12 +113,20 @@ class DataObject(nmdc.DataObject):
"""
def __init__(self, **record):
""" Initialize the object from a dictionary """
# _id is a MongoDB field that makes the parent class fail to initialize
record.pop("_id", None)
normalized_record = _normalize_record(record)
if "type" not in record:
record["type"] = "nmdc:DataObject"
super().__init__(**record)

# override the base class data_object_type (FileTypeEnum) to return a string
@property
def data_object_type_text(self) -> str:
return self.data_object_type.code.text



def as_dict(self) -> Dict[str, Any]:
""" Convert the object to a dictionary """
return yaml.safe_load(yaml_dumper.dumps(self))
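
As a usage sketch of the DataObject changes above, the snippet below constructs one from a Mongo-style record and reads the new data_object_type_text property; the record values are placeholders and may not satisfy every schema-required slot.

# Hypothetical record; only the behavior shown in this diff is assumed.
from nmdc_automation.models.nmdc import DataObject

record = {
    "_id": "656d0abc",                                  # Mongo-only field, popped by __init__
    "id": "nmdc:dobj-11-abc123",
    "name": "assembly_contigs.fna",
    "description": "Assembled contigs",
    "url": "https://data.example.org/assembly_contigs.fna",
    "data_object_type": "Assembly Contigs",
}
data_object = DataObject(**record)                       # "type" defaults to "nmdc:DataObject"
print(data_object.data_object_type_text)                 # plain string instead of FileTypeEnum
print(data_object.as_dict())                             # back to a plain dictionary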


62 changes: 31 additions & 31 deletions nmdc_automation/workflow_automation/sched.py
@@ -12,6 +12,7 @@
from nmdc_automation.workflow_automation.workflow_process import load_workflow_process_nodes
from nmdc_automation.models.workflow import WorkflowConfig, WorkflowProcessNode
from semver.version import Version
import sys


_POLL_INTERVAL = 60
@@ -29,17 +30,15 @@
logger = logging.getLogger(__name__)
@lru_cache
def get_mongo_db() -> MongoDatabase:
for k in ["HOST", "USERNAME", "PASSWORD", "DBNAME"]:
if f"MONGO_{k}" not in os.environ:
raise KeyError(f"Missing MONGO_{k}")
_client = MongoClient(
host=os.getenv("MONGO_HOST"),
host=os.getenv("MONGO_HOST", "localhost"),
port=int(os.getenv("MONGO_PORT", "27017")),
username=os.getenv("MONGO_USERNAME"),
password=os.getenv("MONGO_PASSWORD"),
username=os.getenv("MONGO_USERNAME", None),
password=os.getenv("MONGO_PASSWORD", None),
directConnection=True,
)
return _client[os.getenv("MONGO_DBNAME")]
)[os.getenv("MONGO_DBNAME", "nmdc")]
return _client



def within_range(wf1: WorkflowConfig, wf2: WorkflowConfig, force=False) -> bool:
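
Because get_mongo_db() now falls back to localhost, port 27017, and a default database name when the MONGO_* variables are unset, a local run no longer needs the full set of environment variables. A minimal sketch, assuming a Mongo server is reachable on localhost:

# Local-testing sketch; a running MongoDB on localhost:27017 is assumed.
from nmdc_automation.workflow_automation.sched import get_mongo_db

db = get_mongo_db()                  # defaults: localhost:27017, db "nmdc", no credentials
print(db.list_collection_names())    # e.g. data_object_set and the workflow execution collections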
@@ -84,12 +83,12 @@ def __init__(self, workflow: WorkflowConfig, trigger_act: WorkflowProcessNode):

class Scheduler:

def __init__(self, db, wfn="workflows.yaml",
def __init__(self, db, workflow_yaml,
site_conf="site_configuration.toml"):
logging.info("Initializing Scheduler")
# Init
wf_file = os.environ.get(_WF_YAML_ENV, wfn)
self.workflows = load_workflow_configs(wf_file)
# wf_file = os.environ.get(_WF_YAML_ENV, wfn)
self.workflows = load_workflow_configs(workflow_yaml)
self.db = db
self.api = NmdcRuntimeApi(site_conf)
# TODO: Make force an optional parameter
@@ -124,9 +123,9 @@ def create_job_rec(self, job: SchedulerJob):

wf = job.workflow
base_id, iteration = self.get_activity_id(wf, job.informed_by)
activity_id = f"{base_id}.{iteration}"
inp_objects = []
inp = dict()
workflow_execution_id = f"{base_id}.{iteration}"
input_data_objects = []
inputs = dict()
optional_inputs = wf.optional_inputs
for k, v in job.workflow.inputs.items():
if v.startswith("do:"):
@@ -136,31 +135,31 @@
if k in optional_inputs:
continue
raise ValueError(f"Unable to find {do_type} in {do_by_type}")
inp_objects.append(dobj.as_dict())
input_data_objects.append(dobj.as_dict())
v = dobj["url"]
# TODO: Make this smarter
elif v == "{was_informed_by}":
v = job.informed_by
elif v == "{activity_id}":
v = activity_id
elif v == "{workflow_execution_id}":
v = workflow_execution_id
elif v == "{predecessor_activity_id}":
v = job.trigger_act.id

inp[k] = v
inputs[k] = v

# Build the response
job_config = {
"git_repo": wf.git_repo,
"release": wf.version,
"wdl": wf.wdl,
"activity_id": activity_id,
"activity_id": workflow_execution_id,
"activity_set": wf.collection,
"was_informed_by": job.informed_by,
"trigger_activity": job.trigger_id,
"iteration": iteration,
"input_prefix": wf.input_prefix,
"inputs": inp,
"input_data_objects": inp_objects,
"inputs": inputs,
"input_data_objects": input_data_objects,
}
if wf.workflow_execution:
job_config["activity"] = wf.workflow_execution
@@ -216,7 +215,7 @@ def get_activity_id(self, wf: WorkflowConfig, informed_by: str):
# We need to see if any version exists and
# if so get its ID
ct = 0
q = {"was_informed_by": informed_by}
q = {"was_informed_by": informed_by, "type": wf.type}
for doc in self.db[wf.collection].find(q):
ct += 1
last_id = doc["id"]
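
The added type filter is the substance of this fix: multiple workflow executions (for example an assembly and a read-based analysis) can share the same was_informed_by value, so counting prior iterations by was_informed_by alone can latch onto the wrong record type. A hedged sketch of the narrowed query, with example values:

# Illustrative query mirroring the change above; IDs and collection name are examples.
q = {"was_informed_by": "nmdc:omprc-13-01jx8727", "type": "nmdc:MetagenomeAssembly"}
for doc in db["workflow_execution_set"].find(q):
    print(doc["id"])    # only executions of the matching workflow type are counted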
@@ -284,11 +283,6 @@ def cycle(self, dryrun: bool = False, skiplist: set = set(),
"""
This function does a single cycle of looking for new jobs
"""
filt = {}
if allowlist:
filt = {"was_informed_by": {"$in": list(allowlist)}}
# TODO: Quite a lot happens under the hood here. This function should be broken down into smaller
# functions to improve readability and maintainability.
wfp_nodes = load_workflow_process_nodes(self.db, self.workflows, allowlist)

self.get_existing_jobs.cache_clear()
@@ -301,6 +295,8 @@
logging.debug(f"Skipping: {wfp_node.id}, workflow disabled.")
continue
jobs = self.find_new_jobs(wfp_node)
if jobs:
logging.info(f"Found {len(jobs)} new jobs for {wfp_node.id}")
for job in jobs:
if dryrun:
msg = f"new job: informed_by: {job.informed_by} trigger: {job.trigger_id} "
@@ -318,12 +314,13 @@
return job_recs


def main(): # pragma: no cover
def main(site_conf, wf_file): # pragma: no cover
"""
Main function
"""
site_conf = os.environ.get("NMDC_SITE_CONF", "site_configuration.toml")
sched = Scheduler(get_mongo_db(), site_conf=site_conf)
# site_conf = os.environ.get("NMDC_SITE_CONF", "site_configuration.toml")
db = get_mongo_db()
sched = Scheduler(db, wf_file, site_conf=site_conf)
dryrun = False
if os.environ.get("DRYRUN") == "1":
dryrun = True
@@ -338,6 +335,8 @@
with open(os.environ.get("ALLOWLISTFILE")) as f:
for line in f:
allowlist.add(line.rstrip())
# for local testing
allowlist = ["nmdc:omprc-13-01jx8727"]
while True:
sched.cycle(dryrun=dryrun, skiplist=skiplist, allowlist=allowlist)
if dryrun:
@@ -347,4 +346,5 @@

if __name__ == "__main__": # pragma: no cover
logging.basicConfig(level=logging.INFO)
main()
# site_conf and wf_file are passed in as arguments
main(site_conf=sys.argv[1], wf_file=sys.argv[2])
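
With main() now taking the site configuration and workflow YAML as explicit arguments, a local dry run might look like the sketch below; the file paths are placeholders and a reachable Mongo instance is assumed.

# Hypothetical local driver; paths and the allowlisted ID are placeholders.
from nmdc_automation.workflow_automation.sched import Scheduler, get_mongo_db

db = get_mongo_db()
sched = Scheduler(db, "workflows.yaml", site_conf="site_configuration.toml")
# One scheduling pass; dryrun logs the jobs it would create without submitting them.
sched.cycle(dryrun=True, skiplist=set(), allowlist=["nmdc:omprc-13-01jx8727"])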
108 changes: 53 additions & 55 deletions nmdc_automation/workflow_automation/workflow_process.py
@@ -25,13 +25,13 @@ def get_required_data_objects_map(db, workflows: List[WorkflowConfig]) -> Dict[s
for wf in workflows:
required_types.update(set(wf.data_object_types))

required_data_objs_by_id = dict()
required_data_object_map = dict()
for rec in db.data_object_set.find({"data_object_type": {"$ne": None}}):
do = DataObject(**rec)
if do.data_object_type.code.text not in required_types:
data_object = DataObject(**rec)
if data_object.data_object_type.code.text not in required_types:
continue
required_data_objs_by_id[do.id] = do
return required_data_objs_by_id
required_data_object_map[data_object.id] = data_object
return required_data_object_map


@lru_cache
@@ -130,6 +130,8 @@ def get_current_workflow_process_nodes(

records = db[wf.collection].find(q)
for rec in records:
if rec['type'] != wf.type:
continue
# legacy JGI sequencing records
if rec.get("type") == "nmdc:MetagenomeSequencing" or rec["name"].startswith("Metagenome Sequencing"):
continue
@@ -155,7 +157,7 @@ def _determine_analyte_category(workflows: List[WorkflowConfig]) -> str:


# TODO: Make public, give a better name, add type hints and unit tests.
def _resolve_relationships(wfp_nodes: List[WorkflowProcessNode], wfp_nodes_by_data_object_id: Dict[str, WorkflowProcessNode]) -> List[WorkflowProcessNode]:
def _resolve_relationships(current_nodes: List[WorkflowProcessNode], node_data_object_map: Dict[str, WorkflowProcessNode]) -> List[WorkflowProcessNode]:
"""
Find the parents and children relationships
between the activities
@@ -164,79 +166,75 @@ def _resolve_relationships(wfp_nodes: List[WorkflowProcessNode], wfp_nodes_by_da
# a map of all of the data objects they generated.
# Let's use this to find the parent activity
# for each child activity
for wfp_node in wfp_nodes:
logging.debug(f"Processing {wfp_node.id} {wfp_node.name} {wfp_node.workflow.name}")
wfp_node_predecessors = wfp_node.workflow.parents
if not wfp_node_predecessors:
for node in current_nodes:
logging.debug(f"Processing {node.id} {node.name} {node.workflow.name}")
node_predecessors = node.workflow.parents
if not node_predecessors:
logging.debug("- No Predecessors")
continue
# Go through its inputs
for do_id in wfp_node.has_input:
if do_id not in wfp_nodes_by_data_object_id:
for data_object_id in node.has_input:
if data_object_id not in node_data_object_map:
# This really shouldn't happen
if do_id not in warned_objects:
logging.warning(f"Missing data object {do_id}")
warned_objects.add(do_id)
if data_object_id not in warned_objects:
logging.warning(f"Missing data object {data_object_id}")
warned_objects.add(data_object_id)
continue
parent_wfp_node = wfp_nodes_by_data_object_id[do_id]
parent_node = node_data_object_map[data_object_id]
# This is to cover the case where it was a duplicate.
# This shouldn't happen in the future.
if not parent_wfp_node:
if not parent_node:
logging.warning("Parent node is none")
continue
# Let's make sure these came from the same source
# This is just a safeguard
if wfp_node.was_informed_by != parent_wfp_node.was_informed_by:
if node.was_informed_by != parent_node.was_informed_by:
logging.warning(
"Mismatched informed by for "
f"{do_id} in {wfp_node.id} "
f"{wfp_node.was_informed_by} != "
f"{parent_wfp_node.was_informed_by}"
f"{data_object_id} in {node.id} "
f"{node.was_informed_by} != "
f"{parent_node.was_informed_by}"
)
continue
# We only want to use it as a parent if it is the right
# parent workflow. Some inputs may come from ancestors
# further up
if parent_wfp_node.workflow in wfp_node_predecessors:
if parent_node.workflow in node_predecessors:
# This is the one
wfp_node.parent = parent_wfp_node
parent_wfp_node.children.append(wfp_node)
node.parent = parent_node
parent_node.children.append(node)
logging.debug(
f"Found parent: {parent_wfp_node.id}"
f" {parent_wfp_node.name}"
f"Found parent: {parent_node.id}"
f" {parent_node.name}"
)
break
if len(wfp_node.workflow.parents) > 0 and not wfp_node.parent:
if wfp_node.id not in warned_objects:
logging.warning(f"Didn't find a parent for {wfp_node.id}")
warned_objects.add(wfp_node.id)
if len(node.workflow.parents) > 0 and not node.parent:
if node.id not in warned_objects:
logging.info(f"Skipping obsolete WorkflowExecution: {node.id}, {node.type} {node.version}")
warned_objects.add(node.id)
# Now all the activities have their parent
return wfp_nodes
return current_nodes


def _associate_workflow_process_nodes_to_data_objects(wfp_nodes: List[WorkflowProcessNode], data_objs_by_id):
def _map_nodes_to_data_objects(current_nodes: List[WorkflowProcessNode], required_data_object_map):
"""
Associate the data objects with workflow process nodes
"""
wfp_nodes_by_data_object_id = dict()
for wfp_node in wfp_nodes:
for do_id in wfp_node.has_output:
if do_id in data_objs_by_id:
do = data_objs_by_id[do_id]
wfp_node.add_data_object(do)
# If its a dupe, set it to none
# so we can ignore it later.
# Once we re-id the data objects this
# Post re-id we would not expect thi
if do_id in wfp_nodes_by_data_object_id:
if do_id not in warned_objects:
logging.warning(f"Duplicate output object {do_id}")
warned_objects.add(do_id)
wfp_nodes_by_data_object_id[do_id] = None
node_data_object_map = dict()
for node in current_nodes:
for data_object_id in node.has_output:
if data_object_id in required_data_object_map:
do = required_data_object_map[data_object_id]
node.add_data_object(do)

if data_object_id in node_data_object_map:
if data_object_id not in warned_objects:
logging.warning(f"Duplicate output object {data_object_id}")
warned_objects.add(data_object_id)
node_data_object_map[data_object_id] = None
else:
wfp_nodes_by_data_object_id[do_id] = wfp_node
return wfp_nodes_by_data_object_id, wfp_nodes

node_data_object_map[data_object_id] = node
return node_data_object_map, current_nodes


def load_workflow_process_nodes(db, workflows: list[WorkflowConfig], allowlist: list[str] = None) -> List[WorkflowProcessNode]:
@@ -256,15 +254,15 @@ def load_workflow_process_nodes(db, workflows: list[WorkflowConfig], allowlist:

# This is map from the data object ID to the activity
# that created it.
data_objs_by_id = get_required_data_objects_map(db, workflows)
data_object_map = get_required_data_objects_map(db, workflows)

# Build up a set of relevant activities and a map from
# the output objects to the activity that generated them.
wfp_nodes = get_current_workflow_process_nodes(db, workflows, data_objs_by_id, allowlist)
current_nodes = get_current_workflow_process_nodes(db, workflows, data_object_map, allowlist)

wfp_nodes_by_data_object_id, wfp_nodes = _associate_workflow_process_nodes_to_data_objects(wfp_nodes, data_objs_by_id)
node_data_object_map, current_nodes = _map_nodes_to_data_objects(current_nodes, data_object_map)

# Now populate the parent and children values for the
wfp_nodes = _resolve_relationships(wfp_nodes, wfp_nodes_by_data_object_id)
return wfp_nodes
resolved_nodes = _resolve_relationships(current_nodes, node_data_object_map)
return resolved_nodes
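
As a rough usage sketch of the renamed helpers above, the snippet below loads the current workflow process nodes and walks the parent/child links that _resolve_relationships fills in; the import path for load_workflow_configs is assumed, since it is not shown in this diff.

# Hedged usage sketch; assumes a populated database and a valid workflows YAML.
from nmdc_automation.config import load_workflow_configs   # import path assumed
from nmdc_automation.workflow_automation.sched import get_mongo_db
from nmdc_automation.workflow_automation.workflow_process import load_workflow_process_nodes

db = get_mongo_db()
workflows = load_workflow_configs("workflows.yaml")
nodes = load_workflow_process_nodes(db, workflows, allowlist=["nmdc:omprc-13-01jx8727"])
for node in nodes:
    parent_id = node.parent.id if node.parent else None
    print(node.id, "parent:", parent_id, "children:", len(node.children))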
