diff --git a/emgapi/management/commands/populate_metagenomics_exchange.py b/emgapi/management/commands/populate_metagenomics_exchange.py
index 14f340a64..d1e13f1f8 100644
--- a/emgapi/management/commands/populate_metagenomics_exchange.py
+++ b/emgapi/management/commands/populate_metagenomics_exchange.py
@@ -15,10 +15,10 @@
 # limitations under the License.
 
 import logging
-import responses
 
 from django.conf import settings
 from django.core.management import BaseCommand
+from django.utils import timezone
 
 from emgapi.models import AnalysisJob
 from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI
@@ -27,8 +27,10 @@
 
 RETRY_COUNT = 5
 
+
 class Command(BaseCommand):
     help = "Check and populate metagenomics exchange (ME)."
+
     def add_arguments(self, parser):
         super(Command, self).add_arguments(parser)
         parser.add_argument(
@@ -37,7 +39,7 @@ def add_arguments(self, parser):
             required=False,
             type=str,
             help="Study accession list (rather than all)",
-            nargs='+',
+            nargs="+",
         )
         parser.add_argument(
             "-p",
@@ -61,12 +63,6 @@ def add_arguments(self, parser):
             required=False,
             help="Dry mode, no population of ME",
         )
-        # TODO: do I need it?
-        parser.add_argument(
-            "--full",
-            action="store_true",
-            help="Do a full check of DB",
-        )
 
     def generate_metadata(self, mgya, run_accession, status):
         return {
@@ -76,74 +72,106 @@ def generate_metadata(self, mgya, run_accession, status):
             "sourceID": mgya,
             "sequenceID": run_accession,
             "status": status,
-            "brokerID": settings.MGNIFY_BROKER,
+            "brokerID": settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER,
         }
 
     def handle(self, *args, **options):
         self.study_accession = options.get("study")
         self.dry_run = options.get("dry_run")
         self.pipeline_version = options.get("pipeline")
-        if options.get("dev"):
-            base_url = settings.ME_API_DEV
-        else:
-            base_url = settings.ME_API
-        ME = MetagenomicsExchangeAPI(base_url=base_url)
-
-        new_analyses = AnalysisJob.objects_for_population.to_add()
-        removals = AnalysisJob.objects_for_population.to_delete()
+
+        mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)
+
+        analyses_to_index = AnalysisJob.objects_for_mgx_indexing.to_add()
+        analyses_to_delete = AnalysisJob.objects_for_mgx_indexing.to_delete()
+
         if self.study_accession:
-            new_analyses = new_analyses.filter(study__secondary_accession__in=self.study_accession)
-            removals = removals.filter(study__secondary_accession__in=self.study_accession)
+            analyses_to_index = analyses_to_index.filter(
+                study__secondary_accession__in=self.study_accession
+            )
+            analyses_to_delete = analyses_to_delete.filter(
+                study__secondary_accession__in=self.study_accession
+            )
+
         if self.pipeline_version:
-            new_analyses = new_analyses.filter(pipeline__pipeline_id=self.pipeline_version)
-            removals = removals.filter(pipeline__pipeline_id=self.pipeline_version)
-        logging.info(f"Processing {len(new_analyses)} new analyses")
-        for ajob in new_analyses:
-            metadata = self.generate_metadata(mgya=ajob.accession, run_accession=ajob.run,
-                                              status="public" if not ajob.is_private else "private")
-            registryID, metadata_match = ME.check_analysis(source_id=ajob.accession, sequence_id=ajob.run,
-                                                           metadata=metadata)
-            if not registryID:
+            analyses_to_index = analyses_to_index.filter(
+                pipeline__pipeline_id=self.pipeline_version
+            )
+            analyses_to_delete = analyses_to_delete.filter(
+                pipeline__pipeline_id=self.pipeline_version
+            )
+
+        logging.info(f"Indexing {len(analyses_to_index)} new analyses")
+
+        jobs_to_update = []
+
+        for ajob in analyses_to_index:
+            metadata = self.generate_metadata(
+                mgya=ajob.accession,
+                run_accession=ajob.run,
+                status="public" if not ajob.is_private else "private",
+            )
+            registry_id, metadata_match = mgx_api.check_analysis(
+                source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
+            )
+            # The job is not registered yet: add it.
+            if not registry_id:
                 logging.debug(f"Add new {ajob}")
-                if not self.dry_run:
-                    response = ME.add_analysis(mgya=ajob.accession, run_accession=ajob.run, public=not ajob.is_private)
-                    if response.ok:
-                        logging.debug(f"Added {ajob}")
-                    else:
-                        logging.debug(f"Error adding {ajob}: {response.message}")
-                else:
+                if self.dry_run:
                     logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
+                    continue
+
+                response = mgx_api.add_analysis(
+                    mgya=ajob.accession,
+                    run_accession=ajob.run,
+                    public=not ajob.is_private,
+                )
+                if response.ok:
+                    logging.debug(f"Added {ajob}")
+                    ajob.last_mgx_indexed = timezone.now()
+                    jobs_to_update.append(ajob)
+                else:
+                    logging.error(f"Error adding {ajob}: {response.message}")
+
+            # Otherwise, verify the metadata matches; patch the record if it doesn't.
             else:
                 if not metadata_match:
                     logging.debug(f"Patch existing {ajob}")
-                    if not self.dry_run:
-                        if ME.patch_analysis(registry_id=registryID, data=metadata):
-                            logging.info(f"Analysis {ajob} updated successfully")
-                        else:
-                            logging.info(f"Analysis {ajob} update failed")
-                    else:
+                    if self.dry_run:
                         logging.info(f"Dry-mode run: no patch to real ME for {ajob}")
+                        continue
+                    if mgx_api.patch_analysis(registry_id=registry_id, data=metadata):
+                        logging.info(f"Analysis {ajob} updated successfully")
+                        ajob.last_mgx_indexed = timezone.now()
+                        jobs_to_update.append(ajob)
+                    else:
+                        logging.error(f"Analysis {ajob} update failed")
                 else:
                     logging.debug(f"No edit for {ajob}, metadata is correct")
 
-        logging.info(f"Processing {len(removals)} analyses to remove")
-        for ajob in removals:
-            metadata = self.generate_metadata(mgya=ajob.accession, run_accession=ajob.run,
-                                              status="public" if not ajob.is_private else "private")
-            registryID, _ = ME.check_analysis(source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata)
-            if registryID:
-                if not self.dry_run:
-                    if ME.delete_analysis(registryID):
-                        logging.info(f"{ajob} successfully deleted")
-                    else:
-                        logging.info(f"{ajob} failed on delete")
-                else:
+        # Bulk-update the last_mgx_indexed timestamps recorded above in one query.
+        AnalysisJob.objects.bulk_update(jobs_to_update, ["last_mgx_indexed"])
+
+        logging.info(f"Processing {len(analyses_to_delete)} analyses to remove")
+        for ajob in analyses_to_delete:
+            metadata = self.generate_metadata(
+                mgya=ajob.accession,
+                run_accession=ajob.run,
+                status="public" if not ajob.is_private else "private",
+            )
+            registry_id, _ = mgx_api.check_analysis(
+                source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
+            )
+            if registry_id:
+                if self.dry_run:
                     logging.info(f"Dry-mode run: no delete from real ME for {ajob}")
-            else:
-                logging.info(f"No {ajob} in ME, nothing to delete")
-    logging.info("Done")
-
-
-
+                    continue
+                if mgx_api.delete_analysis(registry_id):
+                    logging.info(f"{ajob} successfully deleted")
+                else:
+                    logging.info(f"{ajob} failed on delete")
+            else:
+                logging.info(f"{ajob} doesn't exist in the registry, nothing to delete")
+        logging.info("Done")
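
A minimal usage sketch of the reworked command, for reviewers. The option destinations "study" and "dry_run" are taken from the options.get(...) calls above; the study accession is illustrative:

from django.core.management import call_command

# Dry run restricted to one (illustrative) study: check_analysis() is
# still called, but nothing is added, patched, or deleted in the exchange.
call_command(
    "populate_metagenomics_exchange",
    study=["ERP000001"],
    dry_run=True,
)
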
diff --git a/emgapi/metagenomics_exchange.py b/emgapi/metagenomics_exchange.py
index 92f54fc59..f876c573c 100644
--- a/emgapi/metagenomics_exchange.py
+++ b/emgapi/metagenomics_exchange.py
@@ -25,8 +25,8 @@ class MetagenomicsExchangeAPI:
     """Metagenomics Exchange API Client"""
 
     def __init__(self, base_url=None):
-        self.base_url = base_url if base_url else settings.ME_API
-        self.__token = settings.ME_API_TOKEN
-        self.broker = settings.MGNIFY_BROKER
+        self.base_url = base_url if base_url else settings.METAGENOMICS_EXCHANGE_API
+        self.__token = settings.METAGENOMICS_EXCHANGE_API_TOKEN
+        self.broker = settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER
 
     def get_request(self, endpoint: str, params: dict):
diff --git a/emgapi/models.py b/emgapi/models.py
index 6b1d9bb32..2bfe19780 100644
--- a/emgapi/models.py
+++ b/emgapi/models.py
@@ -161,7 +161,17 @@ class Meta:
         abstract = True
 
 
-class EbiSearchIndexQueryset(models.QuerySet):
+class IndexableModel(models.Model):
+    last_update = models.DateTimeField(
+        db_column='LAST_UPDATE',
+        auto_now=True
+    )
+
+    class Meta:
+        abstract = True
+
+
+class IndexableModelQueryset(models.QuerySet):
     """
     to_delete: Objects that have been suppressed since they were last indexed,
     or that have been indexed but updated since.
@@ -170,7 +176,8 @@ class EbiSearchIndexQueryset(models.QuerySet):
     or that have been indexed but updated since.
     """
     def to_delete(self):
-        updated_after_indexing = Q(last_update__gte=F("last_indexed"), last_indexed__isnull=False)
+        not_indexed_filter = {f"{self._index_field}__isnull": False}
+        updated_after_indexing = Q(last_update__gte=F(self._index_field), **not_indexed_filter)
 
         try:
             self.model._meta.get_field("suppressed_at")
@@ -180,11 +187,12 @@ def to_delete(self):
             )
         else:
             return self.filter(
-                Q(suppressed_at__gte=F("last_indexed")) | updated_after_indexing
+                Q(suppressed_at__gte=F(self._index_field)) | updated_after_indexing
             )
 
     def to_add(self):
-        updated_after_indexing = Q(last_update__gte=F("last_indexed"), last_indexed__isnull=False)
-        never_indexed = Q(last_indexed__isnull=True)
+        not_indexed_filter = {f"{self._index_field}__isnull": False}
+        updated_after_indexing = Q(last_update__gte=F(self._index_field), **not_indexed_filter)
+        never_indexed = Q(**{f"{self._index_field}__isnull": True})
 
         try:
@@ -204,19 +212,43 @@ def to_add(self):
         return self.filter(never_indexed | updated_after_indexing, not_suppressed, not_private)
 
 
-class EbiSearchIndexedModel(models.Model):
-    last_update = models.DateTimeField(
-        db_column='LAST_UPDATE',
-        auto_now=True
-    )
-    last_indexed = models.DateTimeField(
-        db_column='LAST_INDEXED',
+
+class EBISearchIndexQueryset(IndexableModelQueryset):
+
+    _index_field = "last_ebi_search_indexed"
+
+
+class EBISearchIndexedModel(IndexableModel):
+
+    last_ebi_search_indexed = models.DateTimeField(
+        db_column='LAST_EBI_SEARCH_INDEXED',
         null=True,
         blank=True,
         help_text="Date at which this model was last included in an EBI Search initial/incremental index."
     )
-    objects_for_indexing = EbiSearchIndexQueryset.as_manager()
+    objects_for_ebisearch_indexing = EBISearchIndexQueryset.as_manager()
+
+    class Meta:
+        abstract = True
+
+
+class MetagenomicsExchangeQueryset(IndexableModelQueryset):
+
+    _index_field = "last_mgx_indexed"
+
+
+class MetagenomicsExchangeIndexedModel(IndexableModel):
+    """Model to track Metagenomics Exchange indexing of analysis jobs."""
+
+    last_mgx_indexed = models.DateTimeField(
+        db_column='LAST_MGX_INDEXED',
+        null=True,
+        blank=True,
+        help_text="Date at which this model was last indexed in the Metagenomics Exchange"
+    )
+
+    objects_for_mgx_indexing = MetagenomicsExchangeQueryset.as_manager()
 
     class Meta:
         abstract = True
@@ -904,7 +936,7 @@ def mydata(self, request):
         return self.get_queryset().mydata(request)
 
 
-class Study(ENASyncableModel, EbiSearchIndexedModel):
+class Study(ENASyncableModel, EBISearchIndexedModel):
 
     def __init__(self, *args, **kwargs):
         super(Study, self).__init__(*args, **kwargs)
@@ -1505,71 +1537,6 @@ class Meta:
 
     def __str__(self):
         return 'Assembly:{} - Sample:{}'.format(self.assembly, self.sample)
 
-
-class MetagenomicsExchangeQueryset(models.QuerySet):
-    """
-    to_delete: Objects that have been suppressed since they were last populated,
-    or that have been added but updated since.
-
-    to_add: Objects that have never been added,
-    or that have been added but updated since.
-    """
-    def to_delete(self):
-        updated_after_populating = Q(last_updated_me__gte=F("last_populated_me"), last_populated_me__isnull=False)
-
-        try:
-            self.model._meta.get_field("suppressed_at")
-        except FieldDoesNotExist:
-            return self.filter(
-                updated_after_populating
-            )
-        else:
-            return self.filter(
-                Q(suppressed_at__gte=F("last_populated_me"))
-            )
-
-    def to_add(self):
-        updated_after_populating = Q(last_updated_me__gte=F("last_populated_me"), last_populated_me__isnull=False)
-        never_populated = Q(last_populated_me__isnull=True)
-
-        try:
-            self.model._meta.get_field("is_suppressed")
-        except FieldDoesNotExist:
-            not_suppressed = Q()
-        else:
-            not_suppressed = Q(is_suppressed=False)
-
-        try:
-            self.model._meta.get_field("is_private")
-        except FieldDoesNotExist:
-            not_private = Q()
-        else:
-            not_private = Q(is_private=False)
-
-        return self.filter(never_populated | updated_after_populating, not_suppressed, not_private)
-
-
-class MetagenomicsExchangeModel(models.Model):
-    """Model to track Metagenomics Exchange population
-    https://www.ebi.ac.uk/ena/registry/metagenome/api/
-    """
-    last_updated_me = models.DateTimeField(
-        db_column='LAST_UPDATED_ME',
-        auto_now=True
-    )
-    last_populated_me = models.DateTimeField(
-        db_column='LAST_POPULATED_ME',
-        null=True,
-        blank=True,
-        help_text="Date at which this model was last appeared in Metagenomics Exchange"
-    )
-
-    objects_for_population = MetagenomicsExchangeQueryset.as_manager()
-
-    class Meta:
-        abstract = True
-
-
 class AnalysisJobQuerySet(BaseQuerySet, MySQLQuerySet, SuppressQuerySet):
 
     def __init__(self, *args, **kwargs):
@@ -1686,7 +1653,7 @@ class MetagenomicsExchange(models.Model):
 
     last_update = models.DateTimeField(db_column='LAST_UPDATE', auto_now=True)
 
 
-class AnalysisJob(SuppressibleModel, PrivacyControlledModel, EbiSearchIndexedModel, MetagenomicsExchangeModel):
+class AnalysisJob(SuppressibleModel, PrivacyControlledModel, EBISearchIndexedModel, MetagenomicsExchangeIndexedModel):
 
     def __init__(self, *args, **kwargs):
         super(AnalysisJob, self).__init__(*args, **kwargs)
         setattr(self, 'accession',
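
The refactor hinges on the _index_field hook: each indexing destination pairs a queryset naming its timestamp column with an abstract model providing that column. A sketch of how a further destination could reuse the pattern (the "foo" names are hypothetical, not part of this change):

from django.db import models

class FooIndexQueryset(IndexableModelQueryset):
    # Timestamp column consulted by to_add()/to_delete(); hypothetical name.
    _index_field = "last_foo_indexed"


class FooIndexedModel(IndexableModel):
    # Mirrors last_ebi_search_indexed / last_mgx_indexed above.
    last_foo_indexed = models.DateTimeField(null=True, blank=True)

    objects_for_foo_indexing = FooIndexQueryset.as_manager()

    class Meta:
        abstract = True
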
--- a/emgcli/settings.py
+++ b/emgcli/settings.py
@@ -663,18 +663,11 @@ def create_secret_key(var_dir):
     os.environ['ENA_API_PASSWORD'] = EMG_CONF['emg']['ena_api_password']
 
 # Metagenomics Exchange
-MGNIFY_BROKER = "EMG"
+METAGENOMICS_EXCHANGE_MGNIFY_BROKER = "EMG"
+METAGENOMICS_EXCHANGE_API = ""
+METAGENOMICS_EXCHANGE_API_TOKEN = ""
 try:
-    ME_API = EMG_CONF['emg']['me_api']
-    ME_API_TOKEN = EMG_CONF['emg']['me_api_token']
+    METAGENOMICS_EXCHANGE_API = EMG_CONF['emg']['me_api']
+    METAGENOMICS_EXCHANGE_API_TOKEN = EMG_CONF['emg']['me_api_token']
 except KeyError:
-    ME_API = ""
-    ME_API_TOKEN = ""
     warnings.warn("The metagenomics exchange API and Token are not configured properly")
-try:
-    ME_API_DEV = EMG_CONF['emg']['me_api_dev']
-    ME_API_TOKEN = EMG_CONF['emg']['me_api_token_dev']
-except KeyError:
-    ME_API_DEV = ""
-    ME_API_TOKEN = ""
-    warnings.warn("The metagenomics exchange DEV API and Token are not configured properly")
diff --git a/tests/me/test_metagenomics_exchange.py b/tests/me/test_metagenomics_exchange.py
index 6de1757e8..58558df01 100644
--- a/tests/me/test_metagenomics_exchange.py
+++ b/tests/me/test_metagenomics_exchange.py
@@ -37,7 +37,7 @@ def test_post_existing_analysis_me(self):
     def test_mock_post_new_analysis(self):
         me_api = MetagenomicsExchangeAPI()
         endpoint = "datasets"
-        url = settings.ME_API + f"/{endpoint}"
+        url = settings.METAGENOMICS_EXCHANGE_API + f"/{endpoint}"
 
         responses.add(responses.POST, url, json={'success': True}, status=201)
 
@@ -51,7 +51,7 @@ def test_mock_delete_analysis_from_me(self):
         me_api = MetagenomicsExchangeAPI()
         registry_id = "MGX0000780"
         endpoint = f"datasets/{registry_id}"
-        url = settings.ME_API + f"/{endpoint}"
+        url = settings.METAGENOMICS_EXCHANGE_API + f"/{endpoint}"
 
         responses.add(responses.DELETE, url, json={'success': True}, status=201)
         response = me_api.delete_request(endpoint)
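
A possible companion test for the patch path, in the same style as the POST and DELETE mocks above. It assumes the PATCH endpoint mirrors the DELETE one (datasets/{registry_id}) and that patch_analysis() returns truthy on success, as the management command relies on:

import responses
from django.conf import settings

from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI


@responses.activate
def test_mock_patch_analysis():
    me_api = MetagenomicsExchangeAPI()
    registry_id = "MGX0000780"
    # Assumed endpoint shape, mirroring the DELETE test above.
    url = settings.METAGENOMICS_EXCHANGE_API + f"/datasets/{registry_id}"

    responses.add(responses.PATCH, url, json={"success": True}, status=200)

    assert me_api.patch_analysis(registry_id=registry_id, data={"status": "public"})
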