diff --git a/.gitignore b/.gitignore index 14cd294c5..85316e5ed 100644 --- a/.gitignore +++ b/.gitignore @@ -41,4 +41,4 @@ dumps /config/*.yml /config/*.yaml -!/config/*local* \ No newline at end of file +!/config/*local* diff --git a/ci/configuration.yaml b/ci/configuration.yaml index 3877e4539..1102dc73e 100644 --- a/ci/configuration.yaml +++ b/ci/configuration.yaml @@ -17,4 +17,6 @@ emg: results_path: 'fixtures/' celery_broker: 'redis://localhost:6379/0' celery_backend: 'redis://localhost:6379/1' - results_production_dir: '/dummy/path/results' \ No newline at end of file + results_production_dir: '/dummy/path/results' + # metagenomics exchange + me_api: 'https://wwwdev.ebi.ac.uk/ena/registry/metagenome/api' \ No newline at end of file diff --git a/config/local-tests.yml b/config/local-tests.yml index bf28e5b31..db4297888 100644 --- a/config/local-tests.yml +++ b/config/local-tests.yml @@ -14,4 +14,6 @@ emg: results_path: 'fixtures/' celery_broker: 'redis://localhost:6379/0' celery_backend: 'redis://localhost:6379/1' - results_production_dir: '/dummy/path/results' \ No newline at end of file + results_production_dir: '/dummy/path/results' + # metagenomics exchange + me_api: 'https://wwwdev.ebi.ac.uk/ena/registry/metagenome/api' \ No newline at end of file diff --git a/emgapi/management/commands/ebi_search_analysis_dump.py b/emgapi/management/commands/ebi_search_analysis_dump.py index 39d9fd12d..03edb30d3 100644 --- a/emgapi/management/commands/ebi_search_analysis_dump.py +++ b/emgapi/management/commands/ebi_search_analysis_dump.py @@ -211,6 +211,6 @@ def handle(self, *args, **options): # Small buffer into the future so that the indexing time remains ahead of auto-now updated times. for analysis in page: - analysis.last_indexed = nowish + analysis.last_ebi_search_indexed = nowish - AnalysisJob.objects.bulk_update(page, fields=["last_indexed"]) + AnalysisJob.objects.bulk_update(page, fields=["last_ebi_search_indexed"]) diff --git a/emgapi/management/commands/ebi_search_study_dump.py b/emgapi/management/commands/ebi_search_study_dump.py index 888529262..ee14b91c4 100644 --- a/emgapi/management/commands/ebi_search_study_dump.py +++ b/emgapi/management/commands/ebi_search_study_dump.py @@ -103,6 +103,6 @@ def handle(self, *args, **options): # Small buffer into the future so that the indexing time remains ahead of auto-now updated times. for study in studies: - study.last_indexed = nowish + study.last_ebi_search_indexed = nowish - Study.objects.bulk_update(studies, fields=["last_indexed"]) + Study.objects.bulk_update(studies, fields=["last_ebi_search_indexed"]) diff --git a/emgapi/management/commands/populate_metagenomics_exchange.py b/emgapi/management/commands/populate_metagenomics_exchange.py new file mode 100644 index 000000000..da7e7c252 --- /dev/null +++ b/emgapi/management/commands/populate_metagenomics_exchange.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2017-2024 EMBL - European Bioinformatics Institute +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
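+
+"""Management command that synchronises MGnify analyses with the
+Metagenomics Exchange (ME) registry: analyses that were never indexed or
+were updated since indexing are added/patched, suppressed analyses are removed.
+
+Example invocations (the study accession below is only a placeholder):
+
+    # index/update and delete everything eligible, without writing to ME
+    python manage.py populate_metagenomics_exchange --dry-run
+
+    # restrict the run to one study and one pipeline version
+    python manage.py populate_metagenomics_exchange -s ERP000001 -p 5.0
+"""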
+ +import logging +from datetime import timedelta + +from django.conf import settings +from django.core.management import BaseCommand +from django.core.paginator import Paginator +from django.utils import timezone + +from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI +from emgapi.models import AnalysisJob + +logger = logging.getLogger(__name__) + +RETRY_COUNT = 5 + + +class Command(BaseCommand): + help = "Check and populate metagenomics exchange (ME)." + + def add_arguments(self, parser): + super(Command, self).add_arguments(parser) + parser.add_argument( + "-s", + "--study", + required=False, + type=str, + help="Study accession list (rather than all)", + nargs="+", + ) + parser.add_argument( + "-p", + "--pipeline", + help="Pipeline version (rather than all). Not applicable to Genomes.", + action="store", + dest="pipeline", + choices=[1.0, 2.0, 3.0, 4.0, 4.1, 5.0], + required=False, + type=float, + ) + parser.add_argument( + "--dry-run", + action="store_true", + required=False, + help="Dry mode, no population of ME", + ) + + def handle(self, *args, **options): + self.study_accession = options.get("study") + self.dry_run = options.get("dry_run") + self.pipeline_version = options.get("pipeline") + + self.mgx_api = MetagenomicsExchangeAPI( + base_url=settings.METAGENOMICS_EXCHANGE_API + ) + + # never indexed or updated after indexed + analyses_to_index_and_update = AnalysisJob.objects_for_mgx_indexing.to_add() + # suppressed only + analyses_to_delete = AnalysisJob.objects_for_mgx_indexing.to_delete() + + if self.study_accession: + analyses_to_index_and_update = analyses_to_index_and_update.filter( + study__secondary_accession__in=self.study_accession + ) + analyses_to_delete = analyses_to_delete.filter( + study__secondary_accession__in=self.study_accession + ) + + if self.pipeline_version: + analyses_to_index_and_update = analyses_to_index_and_update.filter( + pipeline__release_version=self.pipeline_version + ) + analyses_to_delete = analyses_to_delete.filter( + pipeline__release_version=self.pipeline_version + ) + + self.process_to_index_and_update_records(analyses_to_index_and_update) + self.process_to_delete_records(analyses_to_delete) + + logging.info("Done") + + def process_to_index_and_update_records(self, analyses_to_index_and_update): + logging.info(f"Indexing {len(analyses_to_index_and_update)} new analyses") + + for page in Paginator( + analyses_to_index_and_update, + settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, + ): + jobs_to_update = [] + for annotation_job in page: + sequence_accession = "" + if annotation_job.run: + sequence_accession = annotation_job.run.accession + if annotation_job.assembly: + sequence_accession = annotation_job.assembly.accession + + metadata = self.mgx_api.generate_metadata( + mgya=annotation_job.accession, sequence_accession=sequence_accession + ) + registry_id, metadata_match = self.mgx_api.check_analysis( + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + metadata=metadata, + ) + # The job is not registered + if not registry_id: + logging.info(f"Add new {annotation_job}") + if self.dry_run: + logging.info( + f"Dry-mode run: no addition to real ME for {annotation_job}" + ) + continue + + response = self.mgx_api.add_analysis( + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + ) + if response.ok: + logging.info(f"Successfully added {annotation_job}") + registry_id, metadata_match = self.mgx_api.check_analysis( + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + ) + 
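+                        # Rather than parsing the POST response, the command re-queries
+                        # the registry (check_analysis) to obtain the newly assigned
+                        # registryID before storing it on the AnalysisJob.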
annotation_job.mgx_accession = registry_id + annotation_job.last_mgx_indexed = timezone.now() + timedelta( + minutes=1 + ) + jobs_to_update.append(annotation_job) + else: + logging.error( + f"Error adding {annotation_job}: {response.message}" + ) + + # else we have to check if the metadata matches, if not we need to update it + else: + if not metadata_match: + logging.info(f"Patch existing {annotation_job}") + if self.dry_run: + logging.info( + f"Dry-mode run: no patch to real ME for {annotation_job}" + ) + continue + if self.mgx_api.patch_analysis( + registry_id=registry_id, data=metadata + ): + logging.info( + f"Analysis {annotation_job} updated successfully" + ) + # Just to be safe, update the MGX accession + annotation_job.mgx_accession = registry_id + annotation_job.last_mgx_indexed = ( + timezone.now() + timedelta(minutes=1) + ) + jobs_to_update.append(annotation_job) + else: + logging.error(f"Analysis {annotation_job} update failed") + else: + logging.debug( + f"No edit for {annotation_job}, metadata is correct" + ) + + AnalysisJob.objects.bulk_update( + jobs_to_update, + ["last_mgx_indexed", "mgx_accession"], + batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, + ) + + def process_to_delete_records(self, analyses_to_delete): + """ + This function removes suppressed records from ME. + """ + logging.info(f"Processing {len(analyses_to_delete)} analyses to remove") + + for page in Paginator( + analyses_to_delete, settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER + ): + jobs_to_update = [] + + for annotation_job in page: + sequence_accession = "" + if annotation_job.run: + sequence_accession = annotation_job.run.accession + if annotation_job.assembly: + sequence_accession = annotation_job.assembly.accession + + metadata = self.mgx_api.generate_metadata( + mgya=annotation_job.accession, sequence_accession=sequence_accession + ) + registry_id, _ = self.mgx_api.check_analysis( + mgya=annotation_job.accession, + sequence_accession=sequence_accession, + metadata=metadata, + ) + if registry_id: + logging.info(f"Deleting {annotation_job}") + if self.dry_run: + logging.info( + f"Dry-mode run: no delete from real ME for {annotation_job}" + ) + continue + + if self.mgx_api.delete_analysis(registry_id): + logging.info(f"{annotation_job} successfully deleted") + annotation_job.last_mgx_indexed = timezone.now() + jobs_to_update.append(annotation_job) + else: + logging.info(f"{annotation_job} failed on delete") + else: + logging.info( + f"{annotation_job} doesn't exist in the registry, nothing to delete" + ) + + # BULK UPDATE # + AnalysisJob.objects.bulk_update( + jobs_to_update, + ["last_mgx_indexed"], + batch_size=settings.METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER, + ) diff --git a/emgapi/metagenomics_exchange.py b/emgapi/metagenomics_exchange.py new file mode 100644 index 000000000..55ad381a3 --- /dev/null +++ b/emgapi/metagenomics_exchange.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2018-2024 EMBL - European Bioinformatics Institute +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import requests +from django.conf import settings +from requests.exceptions import HTTPError + + +class MetagenomicsExchangeAPI: + """Metagenomics Exchange API Client""" + + def __init__(self, base_url=None): + self.base_url = base_url or settings.METAGENOMICS_EXCHANGE_API + self.__token = f"mgx {settings.METAGENOMICS_EXCHANGE_API_TOKEN}" + self.broker = settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER + + def get_request(self, endpoint: str, params: dict): + """Make a GET request, returns the response""" + headers = {"Accept": "application/json", "Authorization": self.__token} + response = requests.get( + f"{self.base_url}/{endpoint}", headers=headers, params=params + ) + response.raise_for_status() + return response + + def post_request(self, endpoint: str, data: dict): + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": self.__token, + } + response = requests.post( + f"{self.base_url}/{endpoint}", json=data, headers=headers + ) + response.raise_for_status() + return response + + def delete_request(self, endpoint: str): + headers = { + "Accept": "application/json", + "Authorization": self.__token, + } + response = requests.delete(f"{self.base_url}/{endpoint}", headers=headers) + return response + + def patch_request(self, endpoint: str, data: dict): + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": self.__token, + } + response = requests.patch( + f"{self.base_url}/{endpoint}", json=data, headers=headers + ) + return response + + def generate_metadata(self, mgya, sequence_accession): + """Generate the metadata object for the Metagenomics Exchange API. + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accession : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + dict + A dictionary containing metadata for the Metagenomics Exchange API. + """ + return { + "confidence": "full", + "endPoint": f"https://www.ebi.ac.uk/metagenomics/analyses/{mgya}", + "method": ["other_metadata"], + "sourceID": mgya, + "sequenceID": sequence_accession, + "status": "public", + "brokerID": self.broker, + } + + def add_analysis(self, mgya: str, sequence_accession: str): + """Add an analysis to the M. Exchange + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accession : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + requests.models.Response + The response object from the API request. + """ + data = self.generate_metadata(mgya, sequence_accession) + try: + response = self.post_request(endpoint="datasets", data=data) + except HTTPError as http_error: + try: + response_json = http_error.response.json() + logging.error(f"API response content: {response_json}") + except: + pass + raise http_error + return response + + def check_analysis(self, mgya: str, sequence_accession: str, metadata=None): + """Check if a sequence exists in the M. Exchange + + Parameters: + mgya : str + The MGnify Analysis accession. + sequence_accession : str + Either the Run accession or the Assembly accession related to the MGYA. + + Returns: + tuple + A tuple containing two elements: + - analysis_registry_id : str + The analysis registry ID. + - metadata_match : boolean + True, if the metadata matchs. 
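+
+        Example (hypothetical accessions; requires ME credentials in settings):
+            >>> api = MetagenomicsExchangeAPI()
+            >>> registry_id, matches = api.check_analysis(
+            ...     mgya="MGYA00000001", sequence_accession="ERR0000001"
+            ... )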
+        """
+        if not mgya:
+            raise ValueError("mgya is mandatory.")
+        if not sequence_accession:
+            raise ValueError("sequence_accession is mandatory.")
+
+        logging.info(f"Checking {mgya} - {sequence_accession}")
+
+        params = {
+            "broker": self.broker,
+        }
+
+        endpoint = f"sequences/{sequence_accession}/datasets"
+        analysis_registry_id = None
+        metadata_match = False
+
+        try:
+            response = self.get_request(endpoint=endpoint, params=params)
+        except HTTPError as http_error:
+            logging.error(f"Get API request failed. HTTP Error: {http_error}")
+            try:
+                response_json = http_error.response.json()
+                logging.error(f"API response content: {response_json}")
+            except Exception:
+                pass
+            return analysis_registry_id, metadata_match
+
+        data = response.json()
+        datasets = data.get("datasets", [])
+
+        # The API returns an empty datasets array if it cannot find the accession
+        if not len(datasets):
+            logging.info(f"{mgya} does not exist in ME")
+            return analysis_registry_id, metadata_match
+
+        # TODO: this code needs some refactoring to improve it:
+        """
+        try:
+            found_record = next(item for item in datasets if item.get("sourceID") == mgya)
+        except StopIteration:
+            ...
+        """
+        sourceIDs = [item.get("sourceID") for item in datasets]
+        if mgya in sourceIDs:
+            found_record = [item for item in datasets if item.get("sourceID") == mgya][
+                0
+            ]
+            logging.info(f"{mgya} exists in ME")
+            analysis_registry_id = found_record.get("registryID")
+            if not analysis_registry_id:
+                raise ValueError(f"The Metagenomics Exchange 'registryID' for {mgya} is null.")
+
+            if metadata:
+                # Every provided metadata field must be present and equal in the
+                # registry record for the metadata to be considered a match.
+                metadata_match = True
+                for metadata_record in metadata:
+                    if metadata_record not in found_record:
+                        return analysis_registry_id, False
+                    if metadata[metadata_record] != found_record[metadata_record]:
+                        logging.info(
+                            f"The metadata does not match for field {metadata_record}: "
+                            f"{metadata[metadata_record]} != {found_record[metadata_record]}"
+                        )
+                        return analysis_registry_id, False
+            return analysis_registry_id, metadata_match
+
+        return analysis_registry_id, metadata_match
+
+    def delete_analysis(self, registry_id: str):
+        """Delete an entry from the registry"""
+        response = self.delete_request(endpoint=f"datasets/{registry_id}")
+        if response.ok:
+            logging.info(f"{registry_id} was deleted with {response.status_code}")
+            return True
+        else:
+            if response.status_code == 400:
+                logging.error(f"Bad request for {registry_id}")
+            elif response.status_code == 404:
+                logging.error(f"{registry_id} not found")
+            elif response.status_code == 401:
+                logging.error(f"Failed to authenticate for {registry_id}")
+            else:
+                logging.error(
+                    f"Delete failed for {registry_id}, response message: {response.text}"
+                )
+        return False
+
+    def patch_analysis(self, registry_id: str, data: dict):
+        """Patch an entry on the registry"""
+        response = self.patch_request(endpoint=f"datasets/{registry_id}", data=data)
+        if response.ok:
+            logging.info(f"{registry_id} was patched")
+            return True
+        else:
+            if response.status_code == 400:
+                logging.error(f"Bad request for {registry_id}")
+            elif response.status_code == 401:
+                logging.error(f"Failed to authenticate for {registry_id}")
+            elif response.status_code == 409:
+                logging.error(f"Conflicts with existing data for {registry_id}")
+            else:
+                logging.error(
+                    f"Patch failed for {registry_id}, response message: {response.text}"
+                )
+        return False
diff --git a/emgapi/migrations/0017_auto_20240129_1401.py b/emgapi/migrations/0017_auto_20240129_1401.py
new file mode 100644
index 000000000..a45176c9e
--- /dev/null
+++ b/emgapi/migrations/0017_auto_20240129_1401.py @@ -0,0 +1,43 @@ +# Generated by Django 3.2.18 on 2024-01-29 14:01 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('emgapi', '0016_auto_20240117_1757'), + ] + + operations = [ + migrations.AlterField( + model_name='analysisjob', + name='last_indexed', + field=models.DateTimeField(blank=True, db_column='LAST_EBI_SEARCH_INDEXED', help_text='Date at which this model was last included in an EBI Search initial/incremental index.', null=True), + ), + migrations.AlterField( + model_name='study', + name='last_indexed', + field=models.DateTimeField(blank=True, db_column='LAST_EBI_SEARCH_INDEXED', help_text='Date at which this model was last included in an EBI Search initial/incremental index.', null=True), + ), + migrations.RenameField( + model_name='analysisjob', + old_name='last_indexed', + new_name='last_ebi_search_indexed', + ), + migrations.RenameField( + model_name='study', + old_name='last_indexed', + new_name='last_ebi_search_indexed', + ), + migrations.AddField( + model_name='analysisjob', + name='last_mgx_indexed', + field=models.DateTimeField(blank=True, db_column='LAST_MGX_INDEXED', help_text='Date at which this model was last indexed in the Metagenomics Exchange', null=True), + ), + migrations.AddField( + model_name='analysisjob', + name='mgx_accession', + field=models.CharField(blank=True, db_column='MGX_ACCESSION', help_text='The Metagenomics Exchange accession.', max_length=10, null=True, unique=True), + ), + ] diff --git a/emgapi/models.py b/emgapi/models.py index 9d0830c04..31db68bb3 100644 --- a/emgapi/models.py +++ b/emgapi/models.py @@ -279,7 +279,17 @@ class Meta: abstract = True -class EbiSearchIndexQueryset(models.QuerySet): +class IndexableModel(models.Model): + last_update = models.DateTimeField( + db_column='LAST_UPDATE', + auto_now=True + ) + + class Meta: + abstract = True + + +class IndexableModelQueryset(models.QuerySet): """ to_delete: Objects that have been suppressed since they were last indexed, or that have been indexed but updated since. @@ -288,7 +298,7 @@ class EbiSearchIndexQueryset(models.QuerySet): or that have been indexed but updated since. 
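+
+    Subclasses must set ``index_field`` to the name of the DateTimeField that
+    records when an object was last indexed, e.g. "last_ebi_search_indexed"
+    (EBI Search) or "last_mgx_indexed" (Metagenomics Exchange).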
""" def to_delete(self): - updated_after_indexing = Q(last_update__gte=F("last_indexed"), last_indexed__isnull=False) + updated_after_indexing = Q(last_update__gte=F(self.index_field), **{f"{self.index_field}__isnull": False}) try: self.model._meta.get_field("suppressed_at") @@ -298,12 +308,12 @@ def to_delete(self): ) else: return self.filter( - Q(suppressed_at__gte=F("last_indexed")) | updated_after_indexing + Q(suppressed_at__gte=F(self.index_field)) | updated_after_indexing ) def to_add(self): - updated_after_indexing = Q(last_update__gte=F("last_indexed"), last_indexed__isnull=False) - never_indexed = Q(last_indexed__isnull=True) + updated_after_indexing = Q(last_update__gte=F(self.index_field), **{f"{self.index_field}__isnull": False}) + never_indexed = Q(**{f"{self.index_field}__isnull": True}) try: self.model._meta.get_field("is_suppressed") @@ -322,19 +332,64 @@ def to_add(self): return self.filter(never_indexed | updated_after_indexing, not_suppressed, not_private) -class EbiSearchIndexedModel(models.Model): - last_update = models.DateTimeField( - db_column='LAST_UPDATE', - auto_now=True - ) - last_indexed = models.DateTimeField( - db_column='LAST_INDEXED', +class EBISearchIndexQueryset(IndexableModelQueryset): + + index_field = "last_ebi_search_indexed" + + +class EBISearchIndexedModel(IndexableModel): + + last_ebi_search_indexed = models.DateTimeField( + db_column='LAST_EBI_SEARCH_INDEXED', null=True, blank=True, help_text="Date at which this model was last included in an EBI Search initial/incremental index." ) - objects_for_indexing = EbiSearchIndexQueryset.as_manager() + objects_for_ebisearch_indexing = EBISearchIndexQueryset.as_manager() + + class Meta: + abstract = True + + +class MetagenomicsExchangeQueryset(IndexableModelQueryset): + + index_field = "last_mgx_indexed" + + def to_delete(self): + try: + self.model._meta.get_field("suppressed_at") + except FieldDoesNotExist: + return Q() + else: + return self.filter( + Q(suppressed_at__gte=F(self.index_field)) + ) + + +class MetagenomicsExchangeIndexedModel(models.Model): + """Model to track Metagenomics Exchange indexation of analysis jobs + TODO: this model should have the last_update field as it's a requirement. + The current implementation of this works because the analysis jobs are + also extending the EBISearchIndexable model which provided the + last_update field. + """ + last_mgx_indexed = models.DateTimeField( + db_column='LAST_MGX_INDEXED', + null=True, + blank=True, + help_text="Date at which this model was last indexed in the Metagenomics Exchange" + ) + mgx_accession = models.CharField( + db_column='MGX_ACCESSION', + max_length=10, + unique=True, + null=True, + blank=True, + help_text="The Metagenomics Exchange accession." 
+ ) + + objects_for_mgx_indexing = MetagenomicsExchangeQueryset.as_manager() class Meta: abstract = True @@ -1026,7 +1081,7 @@ def mydata(self, request): return self.get_queryset().mydata(request) -class Study(ENASyncableModel, EbiSearchIndexedModel): +class Study(ENASyncableModel, EBISearchIndexedModel): suppressible_descendants = ['samples', 'runs', 'assemblies', 'analyses'] def __init__(self, *args, **kwargs): @@ -1060,8 +1115,6 @@ def _custom_pk(self): db_column='AUTHOR_EMAIL', max_length=100, blank=True, null=True) author_name = models.CharField( db_column='AUTHOR_NAME', max_length=100, blank=True, null=True) - last_update = models.DateTimeField( - db_column='LAST_UPDATE', auto_now=True) submission_account_id = models.CharField( db_column='SUBMISSION_ACCOUNT_ID', max_length=15, blank=True, null=True) @@ -1694,8 +1747,7 @@ def get_queryset(self): .select_related( 'analysis_status','experiment_type', 'assembly', 'pipeline', 'run', 'sample', 'study') - -class AnalysisJob(SuppressibleModel, PrivacyControlledModel, EbiSearchIndexedModel): +class AnalysisJob(SuppressibleModel, PrivacyControlledModel, EBISearchIndexedModel, MetagenomicsExchangeIndexedModel): def __init__(self, *args, **kwargs): super(AnalysisJob, self).__init__(*args, **kwargs) setattr(self, 'accession', @@ -2273,4 +2325,4 @@ def __str__(self): return f"Legacy Assembly:{self.legacy_accession} - New Accession:{self.new_accession}" models.CharField.register_lookup(Search) -models.TextField.register_lookup(Search) +models.TextField.register_lookup(Search) \ No newline at end of file diff --git a/emgapi/serializers.py b/emgapi/serializers.py index 0079a0c66..0e1a236ff 100644 --- a/emgapi/serializers.py +++ b/emgapi/serializers.py @@ -1021,7 +1021,8 @@ class Meta: 'is_suppressed', 'suppressed_at', 'suppression_reason', - 'last_indexed' + 'last_ebi_search_indexed', + 'last_mgx_indexed', ) @@ -1410,7 +1411,7 @@ class Meta: 'is_suppressed', 'suppression_reason', 'suppressed_at', - 'last_indexed', + 'last_ebi_search_indexed', ) diff --git a/emgcli/__init__.py b/emgcli/__init__.py index 57d8f469a..43e1294ff 100644 --- a/emgcli/__init__.py +++ b/emgcli/__init__.py @@ -1 +1 @@ -__version__: str = "2.4.44" +__version__: str = "2.4.45" diff --git a/emgcli/settings.py b/emgcli/settings.py index 4fb645d91..c778d1f5a 100644 --- a/emgcli/settings.py +++ b/emgcli/settings.py @@ -682,3 +682,16 @@ def create_secret_key(var_dir): os.environ['ENA_API_USER'] = EMG_CONF['emg']['ena_api_user'] if 'ena_api_password' in EMG_CONF['emg']: os.environ['ENA_API_PASSWORD'] = EMG_CONF['emg']['ena_api_password'] + +# Metagenomics Exchange +METAGENOMICS_EXCHANGE_MGNIFY_BROKER = "EMG" +METAGENOMICS_EXCHANGE_API = "" +METAGENOMICS_EXCHANGE_API_TOKEN = "" +METAGENOMICS_EXCHANGE_PAGINATOR_NUMBER = 100 +try: + METAGENOMICS_EXCHANGE_API = EMG_CONF['emg']['me_api'] + METAGENOMICS_EXCHANGE_API_TOKEN = os.getenv('METAGENOMICS_EXCHANGE_API_TOKEN') + if not METAGENOMICS_EXCHANGE_API_TOKEN: + METAGENOMICS_EXCHANGE_API_TOKEN = EMG_CONF['emg']['me_api_token'] +except KeyError: + warnings.warn("The metagenomics exchange API and Token are not configured properly") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index cd268fd7b..4407d26c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -117,7 +117,7 @@ max-line-length = 119 """ [tool.bumpversion] -current_version = "2.4.44" +current_version = "2.4.45" [[tool.bumpversion.files]] filename = "emgcli/__init__.py" diff --git a/tests/me/test_metagenomics_exchange.py 
b/tests/me/test_metagenomics_exchange.py new file mode 100644 index 000000000..05148d497 --- /dev/null +++ b/tests/me/test_metagenomics_exchange.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import pytest +import requests +import responses +from django.conf import settings + +from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI + + +class TestME: + + @responses.activate + def test_check_existing_analysis_me(self, settings): + # FIXME: this test doesn't check if the metadata matches or not. + mgya = "MGYA00293719" + sequence_accession = "ERR3063408" + responses.add( + responses.GET, + f"{settings.METAGENOMICS_EXCHANGE_API}/sequences/{sequence_accession}/datasets", + json={"datasets": [{"sourceID": mgya, "registryID": "MGX_FAKE"}]}, + status=200, + ) + me_api = MetagenomicsExchangeAPI() + + registry_id, _ = me_api.check_analysis(mgya, sequence_accession) + assert registry_id == "MGX_FAKE" + + @responses.activate + def test_check_not_existing_analysis_me(self): + mgya = "MGYA10293719" + sequence_accession = "ERR3063408" + responses.add( + responses.GET, + f"{settings.METAGENOMICS_EXCHANGE_API}/sequences/{sequence_accession}/datasets", + json={"datasets": []}, + status=200, + ) + me_api = MetagenomicsExchangeAPI() + return_values = me_api.check_analysis(mgya, sequence_accession) + assert not return_values[0] + + @pytest.mark.skip(reason="Error on ME API side") + def test_post_existing_analysis_me(self): + me_api = MetagenomicsExchangeAPI() + source_id = "MGYA00293719" + # Should return -> https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409 + with pytest.raises(requests.HTTPError, match="401 Client Error"): + me_api.add_analysis(mgya=source_id, sequence_accession="ERR3063408").json() + + @responses.activate + def test_mock_post_new_analysis(self): + me_api = MetagenomicsExchangeAPI() + endpoint = "datasets" + url = settings.METAGENOMICS_EXCHANGE_API + f"/{endpoint}" + + responses.add(responses.POST, url, json={"success": True}, status=201) + + response = me_api.add_analysis( + mgya="MGYA00593709", sequence_accession="SRR3960575" + ) + + assert response.status_code == 201 + assert response.json() == {"success": True} + + @responses.activate + def test_mock_delete_analysis_from_me(self): + me_api = MetagenomicsExchangeAPI() + registry_id = "MGX0000780" + endpoint = f"datasets/{registry_id}" + url = settings.METAGENOMICS_EXCHANGE_API + f"/{endpoint}" + + responses.add(responses.DELETE, url, json={"success": True}, status=201) + response = me_api.delete_request(endpoint) + + assert response.status_code == 201 + assert response.json() == {"success": True} + + @responses.activate + def test_incorrect_delete_request_me(self): + # TODO: this test doesn't make much sense + me_api = MetagenomicsExchangeAPI() + responses.add( + responses.DELETE, + f"{settings.METAGENOMICS_EXCHANGE_API}/dataset/MGX0000780", + status=401, + ) + registry_id = "MGX0000780" + endpoint = f"dataset/{registry_id}" + response = me_api.delete_request(endpoint) + assert response.status_code == 401 + + @responses.activate + def test_patch_analysis_me(self): + me_api = MetagenomicsExchangeAPI() + + registry_id = "MGX0000788" + mgya = "MGYA00593709" + run_accession = "SRR3960575" + data = { + "confidence": "full", + "endPoint": f"https://www.ebi.ac.uk/metagenomics/analyses/{mgya}", + "method": ["other_metadata"], + "sourceID": mgya, + "sequenceID": run_accession, + "status": "public", + "brokerID": "EMG", + } + + responses.add( + responses.PATCH, + 
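+            # No JSON body is registered for this mock: patch_analysis only
+            # inspects response.ok / status_code, so a bare 200 is enough.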
f"{settings.METAGENOMICS_EXCHANGE_API}/datasets/{registry_id}", + status=200, + ) + + assert me_api.patch_analysis(registry_id, data) diff --git a/tests/me/test_populate_metagenomics_exchange.py b/tests/me/test_populate_metagenomics_exchange.py new file mode 100644 index 000000000..d10083ac1 --- /dev/null +++ b/tests/me/test_populate_metagenomics_exchange.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2020 EMBL - European Bioinformatics Institute +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock + +from django.core.management import call_command +from django.utils import timezone + +from test_utils.emg_fixtures import * # noqa + +from emgapi.models import AnalysisJob + + +@pytest.mark.django_db +class TestPopulateMeAPI: + @pytest.mark.usefixtures("run_multiple_analysis_me") + def test_population_dry_mode(self, caplog): + """ + 2 of 4 analyses require indexing, both are not in ME API + 1 of 4 needs to be deleted because it was suppressed + 1 was indexed after updated - no action needed + """ + call_command( + "populate_metagenomics_exchange", + dry_run=True, + ) + assert "Indexing 2 new analyses" in caplog.text + assert "Dry-mode run: no addition to real ME for MGYA00466090" in caplog.text + assert "Dry-mode run: no addition to real ME for MGYA00466091" in caplog.text + assert "Processing 1 analyses to remove" in caplog.text + assert ( + "MGYA00005678 doesn't exist in the registry, nothing to delete" + in caplog.text + ) + + @pytest.mark.usefixtures("run_multiple_analysis_me") + @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.add_analysis") + @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis") + def test_add_new_analysis(self, mock_check_analysis, mock_add_analysis, caplog): + """ + Test checks new added analysis that was not indexed before, It should be added to ME. + Post process is mocked. 
+ AnalysisJob should have updated indexed field and assigned MGX + """ + pipeline = 4.1 + registry_id = "MGX1" + + class MockResponse: + def __init__(self, json_data, status_code): + self.json_data = json_data + self.status_code = status_code + self.ok = True + + def json(self): + return self.json_data + + def mock_check_process(*args, **kwargs): + if "metadata" in kwargs: + return None, True + else: + return registry_id, True + + mock_add_analysis.return_value = MockResponse({}, 200) + mock_check_analysis.side_effect = mock_check_process + + call_command( + "populate_metagenomics_exchange", + pipeline=pipeline, + ) + assert "Indexing 1 new analyses" in caplog.text + assert "Processing 0 analyses to remove" in caplog.text + assert "Successfully added MGYA00466090" in caplog.text + ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first() + assert ajob.last_mgx_indexed + assert ajob.mgx_accession == registry_id + + @pytest.mark.usefixtures("run_multiple_analysis_me") + @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis") + @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.delete_analysis") + def test_removals(self, mock_delete_analysis, mock_check_analysis, caplog): + """ + Test delete process. + 1 analysis should be removed and updated indexed field in DB + """ + pipeline = 4.0 + mock_check_analysis.return_value = True, True + mock_delete_analysis.return_value = True + + call_command("populate_metagenomics_exchange", pipeline=pipeline) + assert "Indexing 0 new analyses" in caplog.text + assert "Processing 1 analyses to remove" in caplog.text + assert "Deleting MGYA00005678" in caplog.text + assert "MGYA00005678 successfully deleted" in caplog.text + ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first() + assert ajob.last_mgx_indexed.date() == timezone.now().date() + + @pytest.mark.usefixtures("run_multiple_analysis_me") + @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis") + @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.patch_analysis") + def test_update(self, mock_patch_analysis, mock_check_analysis, caplog): + """ + Test update process for job that was indexed before updated. 
+ MGX accession and last_mgx_indexed should be updated + """ + pipeline = 5.0 + registry_id = "MGX2" + mock_check_analysis.return_value = registry_id, False + mock_patch_analysis.return_value = True + call_command( + "populate_metagenomics_exchange", + pipeline=pipeline, + ) + + assert "Indexing 1 new analyses" in caplog.text + assert "Processing 0 analyses to remove" in caplog.text + assert "Patch existing MGYA00466091" in caplog.text + assert "Analysis MGYA00466091 updated successfully" in caplog.text + ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first() + assert ajob.last_mgx_indexed.date() == timezone.now().date() + assert ajob.mgx_accession == registry_id diff --git a/tests/test_utils/emg_fixtures.py b/tests/test_utils/emg_fixtures.py index f8275c9a8..5719b506b 100644 --- a/tests/test_utils/emg_fixtures.py +++ b/tests/test_utils/emg_fixtures.py @@ -36,8 +36,9 @@ 'ena_private_studies', 'ena_suppressed_studies', 'ena_public_runs', 'ena_private_runs', 'ena_suppressed_runs', 'ena_public_samples', 'ena_private_samples', 'ena_suppressed_samples', 'ena_public_assemblies', 'ena_private_assemblies', 'ena_suppressed_assemblies', - 'assembly_extra_annotation', 'ena_suppression_propagation_studies', 'ena_suppression_propagation_runs', 'ena_suppression_propagation_samples', 'ena_suppression_propagation_assemblies', + 'assembly_extra_annotation', 'ena_suppression_propagation_studies', 'ena_suppression_propagation_runs', + 'suppressed_analysis_jobs', 'run_multiple_analysis_me' ] @@ -1093,3 +1094,119 @@ def ena_suppression_propagation_assemblies(experiment_type_assembly, study): study=study, ) return assemblies + + +def make_suppressed_analysis_jobs(quantity, emg_props=None): + emg_props = emg_props or {} + analyses = baker.make(emg_models.AnalysisJob, _quantity=quantity, **emg_props) + return analyses + + +@pytest.fixture +def suppressed_analysis_jobs(ena_suppressed_runs): + suppressed_analysisjobs = make_suppressed_analysis_jobs(quantity=5, + emg_props={"is_suppressed": True, + "suppressed_at": '1980-01-01 00:00:00', + 'last_mgx_indexed': '1970-01-01 00:00:00'}) + return suppressed_analysisjobs + + +@pytest.fixture +def run_multiple_analysis_me(study, sample, analysis_status, + experiment_type): + """ + Run: ERR1806500 + MGYA0000147343: pipeline v1: indexed after created - no action needed + MGYA0000005678: pipeline v4.0: suppressed - delete from ME + MGYA0000466090: pipeline v4.1: never indexed - add to ME + MGYA0000466091: pipeline v5: update in ME + """ + pipeline, created = emg_models.Pipeline.objects.get_or_create( + pk=1, + release_version='1.0', + release_date='2000-01-01', + ) + pipeline4, created4 = emg_models.Pipeline.objects.get_or_create( + pk=4, + release_version='4.0', + release_date='2015-01-01', + ) + pipeline4_1, created4_1 = emg_models.Pipeline.objects.get_or_create( + pk=5, + release_version='4.1', + release_date='2016-01-01', + ) + pipeline5, created5 = emg_models.Pipeline.objects.get_or_create( + pk=6, + release_version='5.0', + release_date='2020-01-01', + ) + run = emg_models.Run.objects.create( + run_id=111, + accession='ERR1806500', + sample=sample, + study=study, + is_private=False, + experiment_type=experiment_type + ) + _anl1 = emg_models.AnalysisJob.objects.create( + job_id=147343, + sample=sample, + study=study, + run=run, + is_private=False, + experiment_type=experiment_type, + pipeline=pipeline, + analysis_status=analysis_status, + input_file_name='ABC_FASTQ', + result_directory='test_data/version_1.0/ABC_FASTQ', + submit_time='1970-01-01 
00:00:00', + last_mgx_indexed='2970-01-01 20:00:00', + is_suppressed=False, + ) + _anl4 = emg_models.AnalysisJob.objects.create( + job_id=5678, + sample=sample, + study=study, + run=run, + is_private=False, + experiment_type=experiment_type, + pipeline=pipeline4, + analysis_status=analysis_status, + input_file_name='ABC_FASTQ', + result_directory='test_data/version_4.0/ABC_FASTQ', + submit_time='1970-01-01 00:00:00', + last_mgx_indexed='1970-01-01 20:00:00', + is_suppressed=True, + suppressed_at='1980-01-01 20:00:00', + ) + _anl5 = emg_models.AnalysisJob.objects.create( + job_id=466090, + sample=sample, + study=study, + run=run, + is_private=False, + experiment_type=experiment_type, + pipeline=pipeline4_1, + analysis_status=analysis_status, + input_file_name='ABC_FASTQ', + result_directory='test_data/version_5.0/ABC_FASTQ', + submit_time='2020-01-01 00:00:00', + is_suppressed=False, + ) + _anl51 = emg_models.AnalysisJob.objects.create( + job_id=466091, + sample=sample, + study=study, + run=run, + is_private=False, + experiment_type=experiment_type, + pipeline=pipeline5, + analysis_status=analysis_status, + input_file_name='ABC_FASTQ', + result_directory='test_data/version_5.0/ABC_FASTQ', + submit_time='2020-01-01 00:00:00', + last_mgx_indexed='2020-01-01 20:00:00', + is_suppressed=False, + ) + return (_anl1, _anl4, _anl5, _anl51)