diff --git a/emgapi/management/commands/populate_metagenomics_exchange.py b/emgapi/management/commands/populate_metagenomics_exchange.py index 8707cd8e9..b5525d163 100644 --- a/emgapi/management/commands/populate_metagenomics_exchange.py +++ b/emgapi/management/commands/populate_metagenomics_exchange.py @@ -15,15 +15,15 @@ # limitations under the License. import logging +from datetime import timedelta from django.conf import settings from django.core.management import BaseCommand -from django.utils import timezone from django.core.paginator import Paginator -from datetime import timedelta +from django.utils import timezone -from emgapi.models import AnalysisJob from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI +from emgapi.models import AnalysisJob logger = logging.getLogger(__name__) @@ -65,7 +65,9 @@ def handle(self, *args, **options): self.dry_run = options.get("dry_run") self.pipeline_version = options.get("pipeline") - self.mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API) + self.mgx_api = MetagenomicsExchangeAPI( + base_url=settings.METAGENOMICS_EXCHANGE_API + ) # never indexed or updated after indexed analyses_to_index_and_update = AnalysisJob.objects_for_mgx_indexing.to_add() @@ -96,63 +98,87 @@ def handle(self, *args, **options): def process_to_index_and_update_records(self, analyses_to_index_and_update): logging.info(f"Indexing {len(analyses_to_index_and_update)} new analyses") - for page in Paginator(analyses_to_index_and_update, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER): + for page in Paginator( + analyses_to_index_and_update, + settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, + ): jobs_to_update = [] - for ajob in page: + for annotation_job in page: + + sequence_accession = "" + if annotation_job.run: + sequence_accession = annotation_job.run.accession + if annotation_job.assembly: + sequence_accession = annotation_job.assembly.accession + metadata = self.mgx_api.generate_metadata( - 
mgya=ajob.accession,
-                    run_accession=ajob.run,
-                    status="public" if not ajob.is_private else "private",
+                    mgya=annotation_job.accession, sequence_accession=sequence_accession
                 )
                 registry_id, metadata_match = self.mgx_api.check_analysis(
-                    source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
+                    mgya=annotation_job.accession,
+                    sequence_accession=sequence_accession,
+                    metadata=metadata,
                 )
                 # The job is not registered
                 if not registry_id:
-                    logging.info(f"Add new {ajob}")
+                    logging.info(f"Add new {annotation_job}")
                     if self.dry_run:
-                        logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
+                        logging.info(
+                            f"Dry-mode run: no addition to real ME for {annotation_job}"
+                        )
                         continue
                     response = self.mgx_api.add_analysis(
-                        mgya=ajob.accession,
-                        run_accession=ajob.run,
-                        public=not ajob.is_private,
+                        mgya=annotation_job.accession, sequence_accession=sequence_accession
                     )
                     if response.ok:
-                        logging.info(f"Successfully added {ajob}")
+                        logging.info(f"Successfully added {annotation_job}")
                         registry_id, metadata_match = self.mgx_api.check_analysis(
-                            source_id=ajob.accession, sequence_id=ajob.run)
-                        ajob.mgx_accession = registry_id
-                        ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1)
-                        jobs_to_update.append(ajob)
+                            mgya=annotation_job.accession,
+                            sequence_accession=sequence_accession,
+                        )
+                        annotation_job.mgx_accession = registry_id
+                        annotation_job.last_mgx_indexed = timezone.now() + timedelta(
+                            minutes=1
+                        )
+                        jobs_to_update.append(annotation_job)
                     else:
-                        logging.error(f"Error adding {ajob}: {response.message}")
+                        logging.error(
+                            f"Error adding {annotation_job}: {response.message}"
+                        )
                 # else we have to check if the metadata matches, if not we need to update it
                 else:
                     if not metadata_match:
-                        logging.info(f"Patch existing {ajob}")
+                        logging.info(f"Patch existing {annotation_job}")
                         if self.dry_run:
                             logging.info(
-                                f"Dry-mode run: no patch to real ME for {ajob}"
+                                f"Dry-mode run: no patch to real ME for {annotation_job}"
                             )
                             continue
                         if self.mgx_api.patch_analysis( - 
registry_id=registry_id, data=metadata
+                            registry_id=registry_id, data=metadata
                         ):
-                            logging.info(f"Analysis {ajob} updated successfully")
+                            logging.info(
+                                f"Analysis {annotation_job} updated successfully"
+                            )
                             # Just to be safe, update the MGX accession
-                            ajob.mgx_accession = registry_id
-                            ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1)
-                            jobs_to_update.append(ajob)
+                            annotation_job.mgx_accession = registry_id
+                            annotation_job.last_mgx_indexed = (
+                                timezone.now() + timedelta(minutes=1)
+                            )
+                            jobs_to_update.append(annotation_job)
                         else:
-                            logging.error(f"Analysis {ajob} update failed")
+                            logging.error(f"Analysis {annotation_job} update failed")
                     else:
-                        logging.debug(f"No edit for {ajob}, metadata is correct")
+                        logging.debug(
+                            f"No edit for {annotation_job}, metadata is correct"
+                        )
             AnalysisJob.objects.bulk_update(
-                jobs_to_update, ["last_mgx_indexed", "mgx_accession"], batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER
+                jobs_to_update,
+                ["last_mgx_indexed", "mgx_accession"],
+                batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER,
             )

     def process_to_delete_records(self, analyses_to_delete):
@@ -161,36 +187,49 @@ def process_to_delete_records(self, analyses_to_delete):
         """
         logging.info(f"Processing {len(analyses_to_delete)} analyses to remove")
-        for page in Paginator(analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER):
+        for page in Paginator(
+            analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER
+        ):
             jobs_to_update = []
-            for ajob in page:
+            for annotation_job in page:
+
+                sequence_accession = ""
+                if annotation_job.run:
+                    sequence_accession = annotation_job.run.accession
+                if annotation_job.assembly:
+                    sequence_accession = annotation_job.assembly.accession
+
                 metadata = self.mgx_api.generate_metadata(
-                    mgya=ajob.accession,
-                    run_accession=ajob.run,
-                    status="public" if not ajob.is_private else "private",
+                    mgya=annotation_job.accession, sequence_accession=sequence_accession
                 )
                 registry_id, _ = 
self.mgx_api.check_analysis( - source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + metadata=metadata, ) if registry_id: - logging.info(f"Deleting {ajob}") + logging.info(f"Deleting {annotation_job}") if self.dry_run: - logging.info(f"Dry-mode run: no delete from real ME for {ajob}") + logging.info( + f"Dry-mode run: no delete from real ME for {annotation_job}" + ) continue if self.mgx_api.delete_analysis(registry_id): - logging.info(f"{ajob} successfully deleted") - ajob.last_mgx_indexed = timezone.now() - jobs_to_update.append(ajob) + logging.info(f"{annotation_job} successfully deleted") + annotation_job.last_mgx_indexed = timezone.now() + jobs_to_update.append(annotation_job) else: - logging.info(f"{ajob} failed on delete") + logging.info(f"{annotation_job} failed on delete") else: logging.info( - f"{ajob} doesn't exist in the registry, nothing to delete" + f"{annotation_job} doesn't exist in the registry, nothing to delete" ) # BULK UPDATE # AnalysisJob.objects.bulk_update( - jobs_to_update, ["last_mgx_indexed"], batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER + jobs_to_update, + ["last_mgx_indexed"], + batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, ) diff --git a/emgapi/metagenomics_exchange.py b/emgapi/metagenomics_exchange.py index dd69f5902..79b693571 100644 --- a/emgapi/metagenomics_exchange.py +++ b/emgapi/metagenomics_exchange.py @@ -15,9 +15,10 @@ # limitations under the License. 
import logging -import requests +import requests from django.conf import settings +from requests.exceptions import HTTPError class MetagenomicsExchangeAPI: @@ -25,7 +26,7 @@ class MetagenomicsExchangeAPI: def __init__(self, base_url=None): self.base_url = base_url or settings.METAGENOMICS_EXCHANGE_API - self.__token = settings.METAGENOMICS_EXCHANGE_API_TOKEN + self.__token = f"mgx {settings.METAGENOMICS_EXCHANGE_API_TOKEN}" self.broker = settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER def get_request(self, endpoint: str, params: dict): @@ -68,40 +69,78 @@ def patch_request(self, endpoint: str, data: dict): ) return response - def generate_metadata(self, mgya, run_accession, status): + def generate_metadata(self, mgya, sequence_accession): + """Generate the metadata object for the Metagenomics Exchange API. + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accession : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + dict + A dictionary containing metadata for the Metagenomics Exchange API. + """ return { "confidence": "full", "endPoint": f"https://www.ebi.ac.uk/metagenomics/analyses/{mgya}", "method": ["other_metadata"], "sourceID": mgya, - "sequenceID": run_accession, - "status": status, + "sequenceID": sequence_accession, + "status": "public", "brokerID": self.broker, } - def add_analysis(self, mgya: str, run_accession: str, public: bool): - data = self.generate_metadata(mgya, run_accession, public) + def add_analysis(self, mgya: str, sequence_accession: str): + """Add an analysis to the M. Exchange + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accession : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + requests.models.Response + The response object from the API request. 
+ """ + data = self.generate_metadata(mgya, sequence_accession) response = self.post_request(endpoint="datasets", data=data) return response - def check_analysis( - self, source_id: str, sequence_id: str, public=None, metadata=None - ): - logging.info(f"Check {source_id} {sequence_id}") - params = {} - if public: - params = { - "status": "public" if public else "private", - "broker": self.broker, - } - endpoint = f"sequences/{sequence_id}/datasets" + def check_analysis(self, mgya: str, sequence_accesion: str, metadata=None): + """Check if a sequence exists in the M. Exchange + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accesion : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + requests.models.Response + The response object from the API request. + """ + if not mgya: + raise ValueError(f"mgya is mandatory.") + if not sequence_accesion: + raise ValueError(f"sequence_accesion is mandatory.") + + logging.info(f"Checking {mgya} - {sequence_accesion}") + + params = { + "broker": self.broker, + } + + endpoint = f"sequences/{sequence_accesion}/datasets" analysis_registry_id = None metadata_match = True try: response = self.get_request(endpoint=endpoint, params=params) - except: - logging.error(f"Get API request failed") + except HTTPError as http_error: + logging.error(f"Get API request failed. 
HTTP Error: {http_error}") return analysis_registry_id, metadata_match data = response.json() @@ -109,15 +148,15 @@ def check_analysis( # The API will return an emtpy datasets array if it can find the accession if not len(datasets): - logging.info(f"{source_id} does not exist in ME") + logging.info(f"{mgya} does not exist in ME") return analysis_registry_id, metadata_match sourceIDs = [item.get("sourceID") for item in datasets] - if source_id in sourceIDs: - found_record = [ - item for item in datasets if item.get("sourceID") == source_id - ][0] - logging.info(f"{source_id} exists in ME") + if mgya in sourceIDs: + found_record = [item for item in datasets if item.get("sourceID") == mgya][ + 0 + ] + logging.info(f"{mgya} exists in ME") analysis_registry_id = found_record.get("registryID") if metadata: for metadata_record in metadata: