From b2d60bd84825e96ee5955600b2fc3028eb6f04d2 Mon Sep 17 00:00:00 2001 From: Martin Beracochea Date: Fri, 16 Feb 2024 14:25:08 +0000 Subject: [PATCH] A few bug fixes for the metagenomics exchange command and api wrapper. Add support for Assembly and Run based jobs (both). Remove the public check, we only submit public data to the exchange. Normalize argument names in the MGX wrapper. Added some docstrings --- .../populate_metagenomics_exchange.py | 129 ++++++++++++------ emgapi/metagenomics_exchange.py | 91 ++++++++---- 2 files changed, 149 insertions(+), 71 deletions(-) diff --git a/emgapi/management/commands/populate_metagenomics_exchange.py b/emgapi/management/commands/populate_metagenomics_exchange.py index 8707cd8e9..b5525d163 100644 --- a/emgapi/management/commands/populate_metagenomics_exchange.py +++ b/emgapi/management/commands/populate_metagenomics_exchange.py @@ -15,15 +15,15 @@ # limitations under the License. import logging +from datetime import timedelta from django.conf import settings from django.core.management import BaseCommand -from django.utils import timezone from django.core.paginator import Paginator -from datetime import timedelta +from django.utils import timezone -from emgapi.models import AnalysisJob from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI +from emgapi.models import AnalysisJob logger = logging.getLogger(__name__) @@ -65,7 +65,9 @@ def handle(self, *args, **options): self.dry_run = options.get("dry_run") self.pipeline_version = options.get("pipeline") - self.mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API) + self.mgx_api = MetagenomicsExchangeAPI( + base_url=settings.METAGENOMICS_EXCHANGE_API + ) # never indexed or updated after indexed analyses_to_index_and_update = AnalysisJob.objects_for_mgx_indexing.to_add() @@ -96,63 +98,87 @@ def handle(self, *args, **options): def process_to_index_and_update_records(self, analyses_to_index_and_update): logging.info(f"Indexing 
{len(analyses_to_index_and_update)} new analyses") - for page in Paginator(analyses_to_index_and_update, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER): + for page in Paginator( + analyses_to_index_and_update, + settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, + ): jobs_to_update = [] - for ajob in page: + for annotation_job in page: + + sequence_accession = "" + if annotation_job.run: + sequence_accession = annotation_job.run.accession + if annotation_job.assembly: + sequence_accession = annotation_job.assembly.accession + metadata = self.mgx_api.generate_metadata( - mgya=ajob.accession, - run_accession=ajob.run, - status="public" if not ajob.is_private else "private", + mgya=annotation_job.accession, sequence_accession=sequence_accession ) registry_id, metadata_match = self.mgx_api.check_analysis( - source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + metadata=metadata, ) # The job is not registered if not registry_id: - logging.info(f"Add new {ajob}") + logging.info(f"Add new {annotation_job}") if self.dry_run: - logging.info(f"Dry-mode run: no addition to real ME for {ajob}") + logging.info( + f"Dry-mode run: no addition to real ME for {annotation_job}" + ) continue response = self.mgx_api.add_analysis( - mgya=ajob.accession, - run_accession=ajob.run, - public=not ajob.is_private, + mgya=annotation_job.accession, sequence_accession=sequence_accession ) if response.ok: - logging.info(f"Successfully added {ajob}") + logging.info(f"Successfully added {annotation_job}") registry_id, metadata_match = self.mgx_api.check_analysis( - source_id=ajob.accession, sequence_id=ajob.run) - ajob.mgx_accession = registry_id - ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1) - jobs_to_update.append(ajob) + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + ) + annotation_job.mgx_accession = registry_id + annotation_job.last_mgx_indexed = timezone.now() + timedelta(
minutes=1 + ) + jobs_to_update.append(annotation_job) else: - logging.error(f"Error adding {ajob}: {response.message}") + logging.error( + f"Error adding {annotation_job}: {response.message}" + ) # else we have to check if the metadata matches, if not we need to update it else: if not metadata_match: - logging.info(f"Patch existing {ajob}") + logging.info(f"Patch existing {annotation_job}") if self.dry_run: logging.info( - f"Dry-mode run: no patch to real ME for {ajob}" + f"Dry-mode run: no patch to real ME for {annotation_job}" ) continue if self.mgx_api.patch_analysis( - registry_id=registry_id, data=metadata + registry_id=registry_id, data=metadata ): - logging.info(f"Analysis {ajob} updated successfully") + logging.info( + f"Analysis {annotation_job} updated successfully" + ) # Just to be safe, update the MGX accession - ajob.mgx_accession = registry_id - ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1) - jobs_to_update.append(ajob) + annotation_job.mgx_accession = registry_id + annotation_job.last_mgx_indexed = ( + timezone.now() + timedelta(minutes=1) + ) + jobs_to_update.append(annotation_job) else: - logging.error(f"Analysis {ajob} update failed") + logging.error(f"Analysis {annotation_job} update failed") else: - logging.debug(f"No edit for {ajob}, metadata is correct") + logging.debug( + f"No edit for {annotation_job}, metadata is correct" + ) AnalysisJob.objects.bulk_update( - jobs_to_update, ["last_mgx_indexed", "mgx_accession"], batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER + jobs_to_update, + ["last_mgx_indexed", "mgx_accession"], + batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, ) def process_to_delete_records(self, analyses_to_delete): @@ -161,36 +187,49 @@ def process_to_delete_records(self, analyses_to_delete): """ logging.info(f"Processing {len(analyses_to_delete)} analyses to remove") - for page in Paginator(analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER): + for page in Paginator( + 
analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER + ): jobs_to_update = [] - for ajob in page: + for annotation_job in page: + + sequence_accession = "" + if annotation_job.run: + sequence_accession = annotation_job.run.accession + if annotation_job.assembly: + sequence_accession = annotation_job.assembly.accession + metadata = self.mgx_api.generate_metadata( - mgya=ajob.accession, - run_accession=ajob.run, - status="public" if not ajob.is_private else "private", + mgya=annotation_job.accession, sequence_accession=sequence_accession ) registry_id, _ = self.mgx_api.check_analysis( - source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + metadata=metadata, ) if registry_id: - logging.info(f"Deleting {ajob}") + logging.info(f"Deleting {annotation_job}") if self.dry_run: - logging.info(f"Dry-mode run: no delete from real ME for {ajob}") + logging.info( + f"Dry-mode run: no delete from real ME for {annotation_job}" + ) continue if self.mgx_api.delete_analysis(registry_id): - logging.info(f"{ajob} successfully deleted") - ajob.last_mgx_indexed = timezone.now() - jobs_to_update.append(ajob) + logging.info(f"{annotation_job} successfully deleted") + annotation_job.last_mgx_indexed = timezone.now() + jobs_to_update.append(annotation_job) else: - logging.info(f"{ajob} failed on delete") + logging.info(f"{annotation_job} failed on delete") else: logging.info( - f"{ajob} doesn't exist in the registry, nothing to delete" + f"{annotation_job} doesn't exist in the registry, nothing to delete" ) # BULK UPDATE # AnalysisJob.objects.bulk_update( - jobs_to_update, ["last_mgx_indexed"], batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER + jobs_to_update, + ["last_mgx_indexed"], + batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, + ) diff --git a/emgapi/metagenomics_exchange.py b/emgapi/metagenomics_exchange.py index dd69f5902..79b693571 100644 ---
a/emgapi/metagenomics_exchange.py +++ b/emgapi/metagenomics_exchange.py @@ -15,9 +15,10 @@ # limitations under the License. import logging -import requests +import requests from django.conf import settings +from requests.exceptions import HTTPError class MetagenomicsExchangeAPI: @@ -25,7 +26,7 @@ class MetagenomicsExchangeAPI: def __init__(self, base_url=None): self.base_url = base_url or settings.METAGENOMICS_EXCHANGE_API - self.__token = settings.METAGENOMICS_EXCHANGE_API_TOKEN + self.__token = f"mgx {settings.METAGENOMICS_EXCHANGE_API_TOKEN}" self.broker = settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER def get_request(self, endpoint: str, params: dict): @@ -68,40 +69,78 @@ def patch_request(self, endpoint: str, data: dict): ) return response - def generate_metadata(self, mgya, run_accession, status): + def generate_metadata(self, mgya, sequence_accession): + """Generate the metadata object for the Metagenomics Exchange API. + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accession : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + dict + A dictionary containing metadata for the Metagenomics Exchange API. + """ return { "confidence": "full", "endPoint": f"https://www.ebi.ac.uk/metagenomics/analyses/{mgya}", "method": ["other_metadata"], "sourceID": mgya, - "sequenceID": run_accession, - "status": status, + "sequenceID": sequence_accession, + "status": "public", "brokerID": self.broker, } - def add_analysis(self, mgya: str, run_accession: str, public: bool): - data = self.generate_metadata(mgya, run_accession, public) + def add_analysis(self, mgya: str, sequence_accession: str): + """Add an analysis to the M. Exchange + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accession : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + requests.models.Response + The response object from the API request. 
+ """ + data = self.generate_metadata(mgya, sequence_accession) response = self.post_request(endpoint="datasets", data=data) return response - def check_analysis( - self, source_id: str, sequence_id: str, public=None, metadata=None - ): - logging.info(f"Check {source_id} {sequence_id}") - params = {} - if public: - params = { - "status": "public" if public else "private", - "broker": self.broker, - } - endpoint = f"sequences/{sequence_id}/datasets" + def check_analysis(self, mgya: str, sequence_accesion: str, metadata=None): + """Check if a sequence exists in the M. Exchange + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accesion : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + requests.models.Response + The response object from the API request. + """ + if not mgya: + raise ValueError(f"mgya is mandatory.") + if not sequence_accesion: + raise ValueError(f"sequence_accesion is mandatory.") + + logging.info(f"Checking {mgya} - {sequence_accesion}") + + params = { + "broker": self.broker, + } + + endpoint = f"sequences/{sequence_accesion}/datasets" analysis_registry_id = None metadata_match = True try: response = self.get_request(endpoint=endpoint, params=params) - except: - logging.error(f"Get API request failed") + except HTTPError as http_error: + logging.error(f"Get API request failed. 
HTTP Error: {http_error}") return analysis_registry_id, metadata_match data = response.json() @@ -109,15 +148,15 @@ def check_analysis( # The API will return an emtpy datasets array if it can find the accession if not len(datasets): - logging.info(f"{source_id} does not exist in ME") + logging.info(f"{mgya} does not exist in ME") return analysis_registry_id, metadata_match sourceIDs = [item.get("sourceID") for item in datasets] - if source_id in sourceIDs: - found_record = [ - item for item in datasets if item.get("sourceID") == source_id - ][0] - logging.info(f"{source_id} exists in ME") + if mgya in sourceIDs: + found_record = [item for item in datasets if item.get("sourceID") == mgya][ + 0 + ] + logging.info(f"{mgya} exists in ME") analysis_registry_id = found_record.get("registryID") if metadata: for metadata_record in metadata: