simplify map_sequence_file - we only support unique metagenome raw reads
mbthornton-lbl committed Dec 18, 2024
1 parent 12cd1ca commit ed246e9
Showing 1 changed file with 55 additions and 40 deletions.
nmdc_automation/import_automation/activity_mapper.py (55 additions, 40 deletions)
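For context, GoldMapper drives the import from a YAML specification file loaded in __init__ (this commit renames the attribute from import_data to import_specifications). Below is a minimal sketch of the parsed structure, reconstructed from the keys the diff indexes; every value is hypothetical and the real file may carry more fields:

import_specifications = {
    "Workflow Metadata": {
        "Root Directory": "/global/imports",           # hypothetical path
        "Source URL": "https://example.org/projects",  # hypothetical URL
        "Execution Resource": "Example HPC",           # hypothetical
    },
    "Workflows": [
        {
            "Name": "Example Workflow",                # hypothetical
            "Type": "nmdc:ExampleWorkflow",            # hypothetical type string
            "Import": True,
            "Git_repo": "https://example.org/repo.git",  # hypothetical
            "Version": "1.0.0",                        # hypothetical
        },
    ],
    "Data Objects": {
        "Unique": [
            {
                "data_object_type": "Metagenome Raw Reads",
                "import_suffix": r"\.fastq\.gz$",      # regex matched against file names
                "nmdc_suffix": ".fastq.gz",
                "action": "rename",                    # hypothetical action name
                "description": "Raw reads for {id}",   # {id} replaced with the sequencing ID
                "output_of": "nmdc:NucleotideSequencing",  # hypothetical
            },
        ],
        "Multiples": [],
    },
}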
@@ -17,6 +17,8 @@


 class GoldMapper:
+
+    METAGENOME_RAW_READS = "Metagenome Raw Reads"
     def __init__(
         self,
         iteration,
@@ -37,16 +39,16 @@ def __init__(
             project_directory: Project directory path.
         """
 
-        self.import_data = self.load_yaml_file(yaml_file)
+        self.import_specifications = self.load_yaml_file(yaml_file)
         self.nmdc_db = nmdc.Database()
         self.iteration = iteration
         self.file_list = file_list
         self.nucelotide_sequencing_id = nucelotide_sequencing_id
         self.root_dir = os.path.join(
-            self.import_data["Workflow Metadata"]["Root Directory"], nucelotide_sequencing_id
+            self.import_specifications["Workflow Metadata"]["Root Directory"], nucelotide_sequencing_id
         )
         self.project_dir = project_directory
-        self.url = self.import_data["Workflow Metadata"]["Source URL"]
+        self.url = self.import_specifications["Workflow Metadata"]["Source URL"]
         self.data_object_type = "nmdc:DataObject"
         self.data_object_map = {}
         self.workflow_execution_ids = {}
@@ -61,51 +63,64 @@ def load_yaml_file(self, yaml_file: Union[str, Path]) -> Dict:
 
     def build_workflows_by_type(self) -> Dict:
         """Builds a dictionary of workflows by their type."""
-        return {wf["Type"]: wf for wf in self.import_data["Workflows"]}
+        return {wf["Type"]: wf for wf in self.import_specifications["Workflows"]}
 
     def link_sequencing_data_file(self) -> Dict[str, dict]:
         """
         Create a link to the sequencing file if it does not exist.
         Return a dictionary with the sequencing data object record by md5 checksum.
         Currently only supports a unique sequencing data object of type "Metagenome Raw Reads".
         """
-        sequencing_types = ["Metagenome Raw Reads", ]
-        sequencing_import_data = [
-            d for d in self.import_data["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types
+        sequencing_import_specifications = [
+            d for d in self.import_specifications["Data Objects"]["Unique"] if d["data_object_type"] == self.METAGENOME_RAW_READS
         ]
-        sequencing_data = {}
+        # We can only have one sequencing data object import specification
+        if len(sequencing_import_specifications) > 1:
+            raise ValueError("More than one sequencing import specification found")
+        import_spec = sequencing_import_specifications[0]
 
         # make the root directory if it does not exist
         try:
             os.makedirs(self.root_dir)
         except FileExistsError:
             logger.info(f"{self.root_dir} already exists")
-        for data_object_dict in sequencing_import_data:
-            for import_file in self.file_list:
-                import_file = str(import_file)
-                if re.search(data_object_dict["import_suffix"], import_file):
-                    file_destination_name = object_action(
-                        import_file,
-                        data_object_dict["action"],
-                        self.nucelotide_sequencing_id,
-                        data_object_dict["nmdc_suffix"],
-                    )
-                    export_file = os.path.join(self.root_dir, file_destination_name)
-                    try:
-                        os.link(import_file, export_file)
-                        logger.info(f"Linked {import_file} to {export_file}")
-                    except FileExistsError:
-                        logger.info(f"{export_file} already exists")
-                    md5 = get_md5(export_file)
-                    sequencing_data[md5] = {
-                        "name": file_destination_name,
-                        "file_size_bytes": os.stat(export_file).st_size,
-                        "md5_checksum": md5,
-                        "data_object_type": data_object_dict["data_object_type"],
-                        "description": data_object_dict["description"].replace(
-                            "{id}", self.nucelotide_sequencing_id
-                        )
-                    }
-        return sequencing_data
 
+        # Get the import file that matches the import_suffix given in the data object spec - we can only have one
+        import_files = [str(f) for f in self.file_list if re.search(import_spec["import_suffix"], str(f))]
+        if len(import_files) > 1:
+            raise ValueError("More than one sequencing data object found")
+        if not import_files:
+            raise ValueError("No sequencing data object found")
+        import_file = import_files[0]
+
+        file_destination_name = object_action(
+            import_file,
+            import_spec["action"],
+            self.nucelotide_sequencing_id,
+            import_spec["nmdc_suffix"],
+        )
+        export_file = os.path.join(self.root_dir, file_destination_name)
+        try:
+            os.link(import_file, export_file)
+            logger.info(f"Linked {import_file} to {export_file}")
+        except FileExistsError:
+            logger.info(f"{export_file} already exists")
+        md5 = get_md5(export_file)
+        sequencing_data_by_md5 = {
+            md5: {
+                "name": file_destination_name,
+                "file_size_bytes": os.stat(export_file).st_size,
+                "md5_checksum": md5,
+                "data_object_type": import_spec["data_object_type"],
+                "description": import_spec["description"].replace(
+                    "{id}", self.nucelotide_sequencing_id
+                )
+            }
+        }
+        return sequencing_data_by_md5
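For illustration, the simplified method now returns a single-entry mapping keyed by the md5 checksum of the linked file. A sketch of a hypothetical return value (the ID, checksum, file name, and size are all invented):

{
    "0f1e2d3c4b5a69788796a5b4c3d2e1f0": {
        "name": "nmdc:omprc-11-example.fastq.gz",
        "file_size_bytes": 3405691582,
        "md5_checksum": "0f1e2d3c4b5a69788796a5b4c3d2e1f0",
        "data_object_type": "Metagenome Raw Reads",
        "description": "Metagenome Raw Reads for nmdc:omprc-11-example",
    }
}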



@@ -120,7 +135,7 @@ def map_sequencing_data(self) -> Tuple[nmdc.Database, Dict]:
 
         # get the Metagenome Raw Reads import data
         sequencing_import_data = [
-            d for d in self.import_data["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types
+            d for d in self.import_specifications["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types
         ]
         has_output = []
         for data_object_dict in sequencing_import_data:
@@ -230,9 +245,9 @@ def process_files(files: Union[str, List[str]], data_object_dict: Dict, workflow
 
         # Select the correct data source (unique or multiple)
         data_objects_key = "Unique" if unique else "Multiples"
-        data_object_specs = self.import_data["Data Objects"][data_objects_key]
+        data_object_specs = self.import_specifications["Data Objects"][data_objects_key]
         for data_object_spec in data_object_specs:
-            if not filter_import_by_type(self.import_data["Workflows"], data_object_spec["output_of"]):
+            if not filter_import_by_type(self.import_specifications["Workflows"], data_object_spec["output_of"]):
                 continue
             if not "import_suffix" in data_object_spec:
                 logging.warning("Missing suffix")
@@ -269,7 +284,7 @@ def map_workflow_executions(self, db) -> nmdc.Database:
         section with an 'Execution Resource'.
         """
 
-        for workflow in self.import_data["Workflows"]:
+        for workflow in self.import_specifications["Workflows"]:
             if not workflow.get("Import"):
                 continue
             logging.info(f"Processing {workflow['Name']}")
@@ -302,7 +317,7 @@ def map_workflow_executions(self, db) -> nmdc.Database:
                 "has_output": has_output_list,
                 "git_url": workflow["Git_repo"],
                 "version": workflow["Version"],
-                "execution_resource": self.import_data["Workflow Metadata"]["Execution Resource"],
+                "execution_resource": self.import_specifications["Workflow Metadata"]["Execution Resource"],
                 "started_at_time": datetime.datetime.now(pytz.utc).isoformat(),
                 "ended_at_time": datetime.datetime.now(pytz.utc).isoformat(),
                 "was_informed_by": self.nucelotide_sequencing_id