From 986f2229f49b2cfa57bccdcf65e4ca2f3f47ddbf Mon Sep 17 00:00:00 2001
From: Ekaterina Sakharova
Date: Wed, 31 Jan 2024 16:20:29 +0000
Subject: [PATCH] Tests for populate ME

---
 .../populate_metagenomics_exchange.py         |  55 +++++---
 emgapi/metagenomics_exchange.py               |   4 +-
 emgapi/models.py                              |   9 ++
 .../me/test_populate_metagenomics_exchange.py | 120 ++++++++++++++----
 tests/test_utils/emg_fixtures.py              |  64 ++++++---
 5 files changed, 182 insertions(+), 70 deletions(-)

diff --git a/emgapi/management/commands/populate_metagenomics_exchange.py b/emgapi/management/commands/populate_metagenomics_exchange.py
index 37f2b2544..bd0b80f74 100644
--- a/emgapi/management/commands/populate_metagenomics_exchange.py
+++ b/emgapi/management/commands/populate_metagenomics_exchange.py
@@ -76,13 +76,15 @@ def handle(self, *args, **options):
         self.dry_run = options.get("dry_run")
         self.pipeline_version = options.get("pipeline")
 
-        mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)
+        self.mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)
 
-        analyses_to_index = AnalysisJob.objects_for_mgx_indexing.to_add()
-        analyses_to_delete = AnalysisJob.objects_for_mgx_indexing.to_delete()
+        # never indexed, or updated after the last indexing
+        analyses_to_index_and_update = AnalysisJob.objects_for_mgx_indexing.to_add()
+        # suppressed only
+        analyses_to_delete = AnalysisJob.objects_for_mgx_indexing.get_suppressed()
 
         if self.study_accession:
-            analyses_to_index = analyses_to_index.filter(
+            analyses_to_index_and_update = analyses_to_index_and_update.filter(
                 study__secondary_accession__in=self.study_accession
             )
             analyses_to_delete = analyses_to_delete.filter(
@@ -90,16 +92,23 @@ def handle(self, *args, **options):
             )
 
         if self.pipeline_version:
-            analyses_to_index = analyses_to_index.filter(
-                pipeline__pipeline_id=self.pipeline_version
+            analyses_to_index_and_update = analyses_to_index_and_update.filter(
+                pipeline__release_version=self.pipeline_version
             )
             analyses_to_delete = analyses_to_delete.filter(
-                pipeline__pipeline_id=self.pipeline_version
+                pipeline__release_version=self.pipeline_version
             )
 
-        logging.info(f"Indexing {len(analyses_to_index)} new analyses")
+        self.process_to_index_and_update_records(analyses_to_index_and_update)
+        self.process_to_delete_records(analyses_to_delete)
 
-        for page in Paginator(analyses_to_index, 100):
+        logging.info("Done")
+
+
+    def process_to_index_and_update_records(self, analyses_to_index_and_update):
+        logging.info(f"Indexing {len(analyses_to_index_and_update)} new analyses")
+
+        for page in Paginator(analyses_to_index_and_update, 100):
             jobs_to_update = []
             for ajob in page:
@@ -108,23 +117,25 @@ def handle(self, *args, **options):
                     run_accession=ajob.run,
                     status="public" if not ajob.is_private else "private",
                 )
-                registry_id, metadata_match = mgx_api.check_analysis(
+                registry_id, metadata_match = self.mgx_api.check_analysis(
                     source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
                 )
                 # The job is not registered
                 if not registry_id:
-                    logging.debug(f"Add new {ajob}")
+                    logging.info(f"Add new {ajob}")
                     if self.dry_run:
                         logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
                         continue
-                    response = mgx_api.add_analysis(
+                    response = self.mgx_api.add_analysis(
                         mgya=ajob.accession,
                         run_accession=ajob.run,
                         public=not ajob.is_private,
                     )
                     if response.ok:
-                        logging.debug(f"Added {ajob}")
+                        logging.info(f"Successfully added {ajob}")
+                        # re-check to fetch the registry id assigned by ME
+                        registry_id, metadata_match = self.mgx_api.check_analysis(
+                            source_id=ajob.accession, sequence_id=ajob.run)
                         ajob.mgx_accession = registry_id
                         ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1)
                         jobs_to_update.append(ajob)
@@ -134,14 +145,14 @@ def handle(self, *args, **options):
             # else we have to check if the metadata matches, if not we need to update it
                 else:
                     if not metadata_match:
-                        logging.debug(f"Patch existing {ajob}")
+                        logging.info(f"Patch existing {ajob}")
                         if self.dry_run:
                             logging.info(
                                 f"Dry-mode run: no patch to real ME for {ajob}"
                             )
                             continue
-                        if mgx_api.patch_analysis(
-                            registry_id=registry_id, data=metadata
+                        if self.mgx_api.patch_analysis(
+                            registry_id=registry_id, data=metadata
                         ):
                             logging.info(f"Analysis {ajob} updated successfully")
                             # Just to be safe, update the MGX accession
@@ -157,6 +168,10 @@ def handle(self, *args, **options):
                 jobs_to_update, ["last_mgx_indexed", "mgx_accession"], batch_size=100
             )
 
+    def process_to_delete_records(self, analyses_to_delete):
+        """
+        Remove suppressed records from the ME registry.
+        """
         logging.info(f"Processing {len(analyses_to_delete)} analyses to remove")
 
         for page in Paginator(analyses_to_delete, 100):
@@ -168,14 +183,16 @@ def handle(self, *args, **options):
                     run_accession=ajob.run,
                     status="public" if not ajob.is_private else "private",
                 )
-                registry_id, _ = mgx_api.check_analysis(
+                registry_id, _ = self.mgx_api.check_analysis(
                     source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
                 )
                 if registry_id:
+                    logging.info(f"Deleting {ajob}")
                     if self.dry_run:
                         logging.info(f"Dry-mode run: no delete from real ME for {ajob}")
+                        continue
-                    if mgx_api.delete_analysis(registry_id):
+                    if self.mgx_api.delete_analysis(registry_id):
                         logging.info(f"{ajob} successfully deleted")
                         ajob.last_mgx_indexed = timezone.now()
                         jobs_to_update.append(ajob)
@@ -190,5 +207,3 @@ def handle(self, *args, **options):
         AnalysisJob.objects.bulk_update(
             jobs_to_update, ["last_mgx_indexed"], batch_size=100
         )
-
-        logging.info("Done")
diff --git a/emgapi/metagenomics_exchange.py b/emgapi/metagenomics_exchange.py
index 24f63b104..a5942bc75 100644
--- a/emgapi/metagenomics_exchange.py
+++ b/emgapi/metagenomics_exchange.py
@@ -88,7 +88,7 @@ def add_analysis(self, mgya: str, run_accession: str, public: bool):
 
     def check_analysis(self, source_id: str, sequence_id: str, public=None, metadata=None) -> [str, bool]:
-        logging.info(f"Check {source_id}")
+        logging.info(f"Check {source_id} {sequence_id}")
         params = {}
         if public:
             params = {
@@ -120,8 +120,8 @@ def check_analysis(self, source_id: str, sequence_id: str, public=None, metadata
                         return analysis_registry_id , metadata_match
                     else:
                         if metadata[metadata_record] != found_record[metadata_record]:
                             metadata_match = False
-                            logging.info(f"Incorrect field {metadata[metadata_record]} in ME ({found_record[metadata_record]})")
+                            logging.info(f"Incorrect field: {metadata[metadata_record]} != {found_record[metadata_record]}")
                             return analysis_registry_id, metadata_match
             return analysis_registry_id , metadata_match
         else:
diff --git a/emgapi/models.py b/emgapi/models.py
index 4abcd41fd..cf6ab08f8 100644
--- a/emgapi/models.py
+++ b/emgapi/models.py
@@ -331,6 +331,15 @@ def to_add(self):
         return self.filter(never_indexed | updated_after_indexing, not_suppressed, not_private)
 
+    def get_suppressed(self):
+        try:
+            self.model._meta.get_field("suppressed_at")
+        except FieldDoesNotExist:
+            return self.none()
+        else:
+            return self.filter(
+                Q(suppressed_at__gte=F(self.index_field))
+            )
 
 class EBISearchIndexQueryset(IndexableModelQueryset):
 
diff --git a/tests/me/test_populate_metagenomics_exchange.py b/tests/me/test_populate_metagenomics_exchange.py
index f79c18078..d9a35f32d 100644
--- a/tests/me/test_populate_metagenomics_exchange.py
+++ b/tests/me/test_populate_metagenomics_exchange.py
@@ -15,10 +15,10 @@
 # limitations under the License.
 
 import pytest
-
 from unittest import mock
 
 from django.core.management import call_command
+from django.utils import timezone
 
 from test_utils.emg_fixtures import *  # noqa
 
@@ -29,48 +29,114 @@ class TestPopulateMeAPI:
 
     @pytest.mark.usefixtures("run_multiple_analysis_me")
     def test_population_dry_mode(self, caplog):
+        """
+        2 of 4 analyses require indexing; neither of them is in the ME API yet.
+        1 of 4 needs to be deleted because it was suppressed.
+        1 was indexed after its last update - no action needed.
+        """
         call_command(
             "populate_metagenomics_exchange",
             dry_run=True,
         )
-        assert "Indexing 3 new analyses" in caplog.text
-        assert "MGYA00001234 does not exist in ME" in caplog.text
-        assert "MGYA00005678 does not exist in ME" in caplog.text
-        assert "MGYA00466090 does not exist in ME" in caplog.text
-        assert "Dry-mode run: no addition to real ME for MGYA00001234" in caplog.text
-        assert "Dry-mode run: no addition to real ME for MGYA00005678" in caplog.text
+        assert "Indexing 2 new analyses" in caplog.text
         assert "Dry-mode run: no addition to real ME for MGYA00466090" in caplog.text
+        assert "Dry-mode run: no addition to real ME for MGYA00466091" in caplog.text
+        assert "Processing 1 analyses to remove" in caplog.text
+        assert "MGYA00005678 doesn't exist in the registry, nothing to delete" in caplog.text
+
+    @pytest.mark.usefixtures("run_multiple_analysis_me")
+    @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.add_analysis")
+    @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis")
+    def test_add_new_analysis(self, mock_check_analysis, mock_add_analysis, caplog):
+        """
+        Tests adding a new analysis that was never indexed before; it should be added to ME.
+        The POST request is mocked.
+        The AnalysisJob should get last_mgx_indexed set and an MGX accession assigned.
+        """
+        pipeline = 4.1
+        registry_id = "MGX1"
+
+        class MockResponse:
+            def __init__(self, json_data, status_code):
+                self.json_data = json_data
+                self.status_code = status_code
+                self.ok = True
+
+            def json(self):
+                return self.json_data
+
+        def mock_check_process(*args, **kwargs):
+            # First call (with metadata): not registered yet.
+            # Second call (after the POST): returns the new registry id.
+            if 'metadata' in kwargs:
+                return None, True
+            else:
+                return registry_id, True
+
+        mock_add_analysis.return_value = MockResponse({}, 200)
+        mock_check_analysis.side_effect = mock_check_process
+
+        call_command(
+            "populate_metagenomics_exchange",
+            pipeline=pipeline,
+        )
+        assert "Indexing 1 new analyses" in caplog.text
         assert "Processing 0 analyses to remove" in caplog.text
+        assert "Successfully added MGYA00466090" in caplog.text
+        ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first()
+        assert ajob.last_mgx_indexed
+        assert ajob.mgx_accession == registry_id
 
+    @pytest.mark.usefixtures("run_multiple_analysis_me")
     @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis")
-    @pytest.mark.usefixtures("suppressed_analysis_jobs")
-    def test_removals_dry_mode(self, mock_check_analysis, caplog):
-        mock_check_analysis.return_value = None, False
+    @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.delete_analysis")
+    def test_removals(self,
+                      mock_delete_analysis,
+                      mock_check_analysis,
+                      caplog):
+        """
+        Tests the delete process:
+        one analysis should be removed from ME and its indexed field updated in the DB.
+        """
+        pipeline = 4.0
+        mock_check_analysis.return_value = True, True
+        mock_delete_analysis.return_value = True
+
         call_command(
             "populate_metagenomics_exchange",
-            dry_run=True,
+            pipeline=pipeline
         )
-        ajobs = AnalysisJob.objects.all()
-        for job in ajobs:
-            assert (
-                f"{job.accession} doesn't exist in the registry, nothing to delete"
-                in caplog.text
-            )
         assert "Indexing 0 new analyses" in caplog.text
+        assert "Processing 1 analyses to remove" in caplog.text
+        assert "Deleting MGYA00005678" in caplog.text
+        assert "MGYA00005678 successfully deleted" in caplog.text
+        ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first()
+        assert ajob.last_mgx_indexed.date() == timezone.now().date()
 
-    @pytest.mark.usefixtures("analysis_existed_in_me")
-    def test_update_dry_mode(self, caplog):
+    @pytest.mark.usefixtures("run_multiple_analysis_me")
+    @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis")
+    @mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.patch_analysis")
+    def test_update(self,
+                    mock_patch_analysis,
+                    mock_check_analysis,
+                    caplog):
+        """
+        Tests the update process for a job that was updated after it was last indexed.
+        The MGX accession and last_mgx_indexed should be updated.
+        """
+        pipeline = 5.0
+        registry_id = "MGX2"
+        mock_check_analysis.return_value = registry_id, False
+        mock_patch_analysis.return_value = True
         call_command(
             "populate_metagenomics_exchange",
-            dry_run=True,
+            pipeline=pipeline,
         )
+        assert "Indexing 1 new analyses" in caplog.text
-        assert "Incorrect field None in ME (ERR1806500)" in caplog.text
-        assert "Dry-mode run: no patch to real ME for MGYA00147343" in caplog.text
         assert "Processing 0 analyses to remove" in caplog.text
+        assert "Patch existing MGYA00466091" in caplog.text
+        assert "Analysis MGYA00466091 updated successfully" in caplog.text
+        ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first()
+        assert ajob.last_mgx_indexed.date() == timezone.now().date()
+        assert ajob.mgx_accession == registry_id
+
-    @pytest.mark.usefixtures("run_multiple_analysis")
-    def test_population(self, caplog):
-        call_command(
-            "populate_metagenomics_exchange",
-        )
diff --git a/tests/test_utils/emg_fixtures.py b/tests/test_utils/emg_fixtures.py
index a4e4cdd32..25f99c70a 100644
--- a/tests/test_utils/emg_fixtures.py
+++ b/tests/test_utils/emg_fixtures.py
@@ -38,7 +38,7 @@
     'ena_public_assemblies', 'ena_private_assemblies', 'ena_suppressed_assemblies',
     'ena_suppression_propagation_samples', 'ena_suppression_propagation_assemblies',
     'assembly_extra_annotation', 'ena_suppression_propagation_studies', 'ena_suppression_propagation_runs',
-    'suppressed_analysis_jobs', 'analysis_existed_in_me', 'run_multiple_analysis_me'
+    'suppressed_analysis_jobs', 'run_multiple_analysis_me'
 ]
 
 
@@ -1110,46 +1110,47 @@ def suppressed_analysis_jobs(ena_suppressed_runs):
                    'last_mgx_indexed': '1970-01-01 00:00:00'})
     return suppressed_analysisjobs
 
-@pytest.fixture
-def analysis_existed_in_me():
-    emg_props = {
-        "job_id": 147343,
-        "last_mgx_indexed": '1970-01-01 00:00:00',
-        "last_update": '1980-01-01 00:00:00',
-        "is_suppressed": False,
-        "is_private": False
-    }
-    return baker.make(emg_models.AnalysisJob, **emg_props)
-
 @pytest.fixture
 def run_multiple_analysis_me(study, sample, analysis_status, experiment_type):
+    """
+    Run: ERR1806500
+    MGYA00147343: pipeline v1.0: indexed after its last update - no action needed
+    MGYA00005678: pipeline v4.0: suppressed - delete from ME
+    MGYA00466090: pipeline v4.1: never indexed - add to ME
+    MGYA00466091: pipeline v5.0: updated after it was indexed - update in ME
+    """
     pipeline, created = emg_models.Pipeline.objects.get_or_create(
         pk=1,
         release_version='1.0',
-        release_date='1970-01-01',
+        release_date='2000-01-01',
     )
     pipeline4, created4 = emg_models.Pipeline.objects.get_or_create(
         pk=4,
         release_version='4.0',
-        release_date='1970-01-01',
+        release_date='2015-01-01',
     )
-    pipeline5, created5 = emg_models.Pipeline.objects.get_or_create(
+    pipeline4_1, created4_1 = emg_models.Pipeline.objects.get_or_create(
         pk=5,
-        release_version='5.0',
-        release_date='2020-01-01',
+        release_version='4.1',
+        release_date='2016-01-01',
+    )
+    pipeline5, created5 = emg_models.Pipeline.objects.get_or_create(
+        pk=6,
+        release_version='5.0',
+        release_date='2020-01-01',
     )
     run = emg_models.Run.objects.create(
-        run_id=1234,
-        accession='ERR3063408',
+        run_id=111,
+        accession='ERR1806500',
         sample=sample,
         study=study,
         is_private=False,
        experiment_type=experiment_type
     )
     _anl1 = emg_models.AnalysisJob.objects.create(
-        job_id=1234,
+        job_id=147343,
         sample=sample,
         study=study,
         run=run,
@@ -1160,6 +1161,8 @@ def run_multiple_analysis_me(study, sample, analysis_status,
         input_file_name='ABC_FASTQ',
         result_directory='test_data/version_1.0/ABC_FASTQ',
         submit_time='1970-01-01 00:00:00',
+        last_mgx_indexed='2970-01-01 20:00:00',
+        is_suppressed=False,
     )
     _anl4 = emg_models.AnalysisJob.objects.create(
         job_id=5678,
@@ -1173,6 +1176,9 @@ def run_multiple_analysis_me(study, sample, analysis_status,
         input_file_name='ABC_FASTQ',
         result_directory='test_data/version_4.0/ABC_FASTQ',
         submit_time='1970-01-01 00:00:00',
+        last_mgx_indexed='1970-01-01 20:00:00',
+        is_suppressed=True,
+        suppressed_at='1980-01-01 20:00:00',
     )
     _anl5 = emg_models.AnalysisJob.objects.create(
         job_id=466090,
@@ -1181,10 +1187,26 @@ def run_multiple_analysis_me(study, sample, analysis_status,
         run=run,
         is_private=False,
         experiment_type=experiment_type,
+        pipeline=pipeline4_1,
+        analysis_status=analysis_status,
+        input_file_name='ABC_FASTQ',
+        result_directory='test_data/version_5.0/ABC_FASTQ',
+        submit_time='2020-01-01 00:00:00',
+        is_suppressed=False,
+    )
+    _anl51 = emg_models.AnalysisJob.objects.create(
+        job_id=466091,
+        sample=sample,
+        study=study,
+        run=run,
+        is_private=False,
+        experiment_type=experiment_type,
         pipeline=pipeline5,
         analysis_status=analysis_status,
         input_file_name='ABC_FASTQ',
         result_directory='test_data/version_5.0/ABC_FASTQ',
         submit_time='2020-01-01 00:00:00',
+        last_mgx_indexed='2020-01-01 20:00:00',
+        is_suppressed=False,
     )
-    return (_anl1, _anl4, _anl5)
+    return (_anl1, _anl4, _anl5, _anl51)
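The per-job branching that process_to_index_and_update_records builds on top of check_analysis can be summarized in a few lines. The standalone sketch below is not part of the patch: FakeExchangeAPI and decide_action are hypothetical stand-ins, and only the (registry_id, metadata_match) return contract of check_analysis is taken from the diff above.

# Standalone sketch (not emgapi code). FakeExchangeAPI and decide_action are
# hypothetical; only the (registry_id | None, metadata_match) contract of
# check_analysis mirrors the patched command.

from dataclasses import dataclass, field
from typing import Optional, Tuple


@dataclass
class FakeExchangeAPI:
    """Hypothetical stand-in for MetagenomicsExchangeAPI."""
    # source_id -> (registry_id, stored_metadata)
    registry: dict = field(default_factory=dict)

    def check_analysis(self, source_id: str, sequence_id: str,
                       metadata: Optional[dict] = None) -> Tuple[Optional[str], bool]:
        # Returns the registry id (None if unregistered) and whether the
        # stored metadata still matches the metadata we would send.
        if source_id not in self.registry:
            return None, False
        registry_id, stored = self.registry[source_id]
        return registry_id, metadata is None or metadata == stored


def decide_action(api: FakeExchangeAPI, mgya: str, run: str, metadata: dict) -> str:
    registry_id, metadata_match = api.check_analysis(
        source_id=mgya, sequence_id=run, metadata=metadata
    )
    if not registry_id:
        return "add"    # never registered -> add_analysis(), then re-check for the MGX id
    if not metadata_match:
        return "patch"  # registered but stale metadata -> patch_analysis()
    return "skip"       # registered and up to date -> bookkeeping only


if __name__ == "__main__":
    api = FakeExchangeAPI(registry={"MGYA00466091": ("MGX2", {"status": "private"})})
    meta = {"status": "public"}
    assert decide_action(api, "MGYA00466090", "ERR1806500", meta) == "add"
    assert decide_action(api, "MGYA00466091", "ERR1806500", meta) == "patch"
    api.registry["MGYA00466091"] = ("MGX2", meta)
    assert decide_action(api, "MGYA00466091", "ERR1806500", meta) == "skip"

This is also the order the tests above exercise: test_add_new_analysis covers the "add" branch (including the second check_analysis call that fetches the new MGX accession), test_update covers "patch", and the suppressed job goes through the separate delete path in process_to_delete_records.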