From ed246e982fa30da8192d105a78fba2bf15641d38 Mon Sep 17 00:00:00 2001
From: Michael Thornton
Date: Wed, 18 Dec 2024 08:44:52 -0800
Subject: [PATCH] simplify link_sequencing_data_file - we only support unique
 metagenome raw reads

---
 .../import_automation/activity_mapper.py | 108 ++++++++++++--------
 1 file changed, 68 insertions(+), 40 deletions(-)

diff --git a/nmdc_automation/import_automation/activity_mapper.py b/nmdc_automation/import_automation/activity_mapper.py
index 4240504f..545196dc 100644
--- a/nmdc_automation/import_automation/activity_mapper.py
+++ b/nmdc_automation/import_automation/activity_mapper.py
@@ -17,6 +17,8 @@
 
 
 class GoldMapper:
+
+    METAGENOME_RAW_READS = "Metagenome Raw Reads"
     def __init__(
         self,
         iteration,
@@ -37,16 +39,16 @@ def __init__(
             project_directory: Project directory path.
         """
 
-        self.import_data = self.load_yaml_file(yaml_file)
+        self.import_specifications = self.load_yaml_file(yaml_file)
         self.nmdc_db = nmdc.Database()
         self.iteration = iteration
         self.file_list = file_list
         self.nucelotide_sequencing_id = nucelotide_sequencing_id
         self.root_dir = os.path.join(
-            self.import_data["Workflow Metadata"]["Root Directory"], nucelotide_sequencing_id
+            self.import_specifications["Workflow Metadata"]["Root Directory"], nucelotide_sequencing_id
         )
         self.project_dir = project_directory
-        self.url = self.import_data["Workflow Metadata"]["Source URL"]
+        self.url = self.import_specifications["Workflow Metadata"]["Source URL"]
         self.data_object_type = "nmdc:DataObject"
         self.data_object_map = {}
         self.workflow_execution_ids = {}
@@ -61,51 +63,75 @@ def load_yaml_file(self, yaml_file: Union[str, Path]) -> Dict:
 
     def build_workflows_by_type(self) -> Dict:
         """Builds a dictionary of workflows by their type."""
-        return {wf["Type"]: wf for wf in self.import_data["Workflows"]}
+        return {wf["Type"]: wf for wf in self.import_specifications["Workflows"]}
 
     def link_sequencing_data_file(self) -> Dict[str, dict]:
         """
         Create a link to the sequencing file if it does not exist.
         Return a dictionary with the sequencing data object record by md5 checksum.
+
+        Currently only supports a unique sequencing data object of type "Metagenome Raw Reads".
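+
+        For illustration, a matching entry in the import YAML might look
+        roughly like this (a sketch; the field values are hypothetical,
+        only the key names are taken from the code below):
+
+            Data Objects:
+              Unique:
+                - data_object_type: Metagenome Raw Reads
+                  import_suffix: .fastq.gz$
+                  nmdc_suffix: _raw.fastq.gz
+                  action: rename
+                  description: Metagenome Raw Reads for {id}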
""" - sequencing_types = ["Metagenome Raw Reads", ] - sequencing_import_data = [ - d for d in self.import_data["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types + sequencing_import_specifications = [ + d for d in self.import_specifications["Data Objects"]["Unique"] if d["data_object_type"] == self.METAGENOME_RAW_READS ] - sequencing_data = {} + # We can only have one sequencing data object import specification + if len(sequencing_import_specifications) > 1: + raise ValueError("More than one sequencing import specification found") + import_spec = sequencing_import_specifications[0] + + # make the root directory if it does not exist try: os.makedirs(self.root_dir) except FileExistsError: logger.info(f"{self.root_dir} already exists") - for data_object_dict in sequencing_import_data: - for import_file in self.file_list: - import_file = str(import_file) - if re.search(data_object_dict["import_suffix"], import_file): - file_destination_name = object_action( - import_file, - data_object_dict["action"], - self.nucelotide_sequencing_id, - data_object_dict["nmdc_suffix"], - ) - export_file = os.path.join(self.root_dir, file_destination_name) - try: - os.link(import_file, export_file) - logger.info(f"Linked {import_file} to {export_file}") - except FileExistsError: - logger.info(f"{export_file} already exists") - md5 = get_md5(export_file) - sequencing_data[md5] = { - "name": file_destination_name, - "file_size_bytes": os.stat(export_file).st_size, - "md5_checksum": md5, - "data_object_type": data_object_dict["data_object_type"], - "description": data_object_dict["description"].replace( - "{id}", self.nucelotide_sequencing_id - ) - } - return sequencing_data + + # Get the import file that matches the nmdc_suffix given in the data object spec - we can only have one + import_files = [str(f) for f in self.file_list if re.search(import_spec["import_suffix"], str(f))] + if len(import_files) > 1: + raise ValueError("More than one sequencing data object found") + if not import_files: + raise ValueError("No sequencing data object found") + import_file = import_files[0] + + file_destination_name = object_action( + import_file, + import_spec["action"], + self.nucelotide_sequencing_id, + import_spec["nmdc_suffix"], + ) + export_file = os.path.join(self.root_dir, file_destination_name) + try: + os.link(import_file, export_file) + logger.info(f"Linked {import_file} to {export_file}") + except FileExistsError: + logger.info(f"{export_file} already exists") + md5 = get_md5(export_file) + sequencing_data_by_md5 = { + md5: { + "name": file_destination_name, + "file_size_bytes": os.stat(export_file).st_size, + "md5_checksum": md5, + "data_object_type": import_spec["data_object_type"], + "description": import_spec["description"].replace( + "{id}", self.nucelotide_sequencing_id + ) + } + } + return sequencing_data_by_md5 @@ -120,7 +135,7 @@ def map_sequencing_data(self) -> Tuple[nmdc.Database, Dict]: # get the Metagenome Raw Reads import data sequencing_import_data = [ - d for d in self.import_data["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types + d for d in self.import_specifications["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types ] has_output = [] for data_object_dict in sequencing_import_data: @@ -230,9 +245,9 @@ def process_files(files: Union[str, List[str]], data_object_dict: Dict, workflow # Select the correct data source (unique or multiple) data_objects_key = "Unique" if unique else "Multiples" - data_object_specs = self.import_data["Data 
Objects"][data_objects_key] + data_object_specs = self.import_specifications["Data Objects"][data_objects_key] for data_object_spec in data_object_specs: - if not filter_import_by_type(self.import_data["Workflows"], data_object_spec["output_of"]): + if not filter_import_by_type(self.import_specifications["Workflows"], data_object_spec["output_of"]): continue if not "import_suffix" in data_object_spec: logging.warning("Missing suffix") @@ -269,7 +284,7 @@ def map_workflow_executions(self, db) -> nmdc.Database: section with an 'Execution Resource'. """ - for workflow in self.import_data["Workflows"]: + for workflow in self.import_specifications["Workflows"]: if not workflow.get("Import"): continue logging.info(f"Processing {workflow['Name']}") @@ -302,7 +317,7 @@ def map_workflow_executions(self, db) -> nmdc.Database: "has_output": has_output_list, "git_url": workflow["Git_repo"], "version": workflow["Version"], - "execution_resource": self.import_data["Workflow Metadata"]["Execution Resource"], + "execution_resource": self.import_specifications["Workflow Metadata"]["Execution Resource"], "started_at_time": datetime.datetime.now(pytz.utc).isoformat(), "ended_at_time": datetime.datetime.now(pytz.utc).isoformat(), "was_informed_by": self.nucelotide_sequencing_id