Tests for populate ME
KateSakharova committed Jan 31, 2024
1 parent ad539e7 commit 986f222
Showing 5 changed files with 181 additions and 70 deletions.
55 changes: 35 additions & 20 deletions emgapi/management/commands/populate_metagenomics_exchange.py
@@ -76,30 +76,39 @@ def handle(self, *args, **options):
self.dry_run = options.get("dry_run")
self.pipeline_version = options.get("pipeline")

mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)
self.mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)

analyses_to_index = AnalysisJob.objects_for_mgx_indexing.to_add()
analyses_to_delete = AnalysisJob.objects_for_mgx_indexing.to_delete()
# never indexed, or updated after being indexed
analyses_to_index_and_update = AnalysisJob.objects_for_mgx_indexing.to_add()
# suppressed only
analyses_to_delete = AnalysisJob.objects_for_mgx_indexing.get_suppressed()

if self.study_accession:
analyses_to_index = analyses_to_index.filter(
analyses_to_index_and_update = analyses_to_index_and_update.filter(
study__secondary_accession__in=self.study_accession
)
analyses_to_delete = analyses_to_delete.filter(
study__secondary_accession__in=self.study_accession
)

if self.pipeline_version:
analyses_to_index = analyses_to_index.filter(
pipeline__pipeline_id=self.pipeline_version
analyses_to_index_and_update = analyses_to_index_and_update.filter(
pipeline__release_version=self.pipeline_version
)
analyses_to_delete = analyses_to_delete.filter(
pipeline__pipeline_id=self.pipeline_version
pipeline__release_version=self.pipeline_version
)

logging.info(f"Indexing {len(analyses_to_index)} new analyses")
self.process_to_index_and_update_records(analyses_to_index_and_update)
self.process_to_delete_records(analyses_to_delete)

for page in Paginator(analyses_to_index, 100):
logging.info("Done")


def process_to_index_and_update_records(self, analyses_to_index_and_update):
logging.info(f"Indexing {len(analyses_to_index_and_update)} new analyses")

for page in Paginator(analyses_to_index_and_update, 100):
jobs_to_update = []

for ajob in page:
@@ -108,23 +117,25 @@ def handle(self, *args, **options):
run_accession=ajob.run,
status="public" if not ajob.is_private else "private",
)
registry_id, metadata_match = mgx_api.check_analysis(
registry_id, metadata_match = self.mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
)
# The job is not registered
if not registry_id:
logging.debug(f"Add new {ajob}")
logging.info(f"Add new {ajob}")
if self.dry_run:
logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
continue

response = mgx_api.add_analysis(
response = self.mgx_api.add_analysis(
mgya=ajob.accession,
run_accession=ajob.run,
public=not ajob.is_private,
)
if response.ok:
logging.debug(f"Added {ajob}")
logging.info(f"Successfully added {ajob}")
registry_id, metadata_match = self.mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run)
ajob.mgx_accession = registry_id
ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1)
jobs_to_update.append(ajob)
@@ -134,14 +145,14 @@ def handle(self, *args, **options):
# otherwise, check whether the metadata matches; if it does not, update it
else:
if not metadata_match:
logging.debug(f"Patch existing {ajob}")
logging.info(f"Patch existing {ajob}")
if self.dry_run:
logging.info(
f"Dry-mode run: no patch to real ME for {ajob}"
)
continue
if mgx_api.patch_analysis(
registry_id=registry_id, data=metadata
if self.mgx_api.patch_analysis(
registry_id=registry_id, data=metadata
):
logging.info(f"Analysis {ajob} updated successfully")
# Just to be safe, update the MGX accession
@@ -157,6 +168,10 @@ def handle(self, *args, **options):
jobs_to_update, ["last_mgx_indexed", "mgx_accession"], batch_size=100
)

def process_to_delete_records(self, analyses_to_delete):
"""
This function removes suppressed records from ME.
"""
logging.info(f"Processing {len(analyses_to_delete)} analyses to remove")

for page in Paginator(analyses_to_delete, 100):
@@ -168,14 +183,16 @@ def handle(self, *args, **options):
run_accession=ajob.run,
status="public" if not ajob.is_private else "private",
)
registry_id, _ = mgx_api.check_analysis(
registry_id, _ = self.mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
)
if registry_id:
logging.info(f"Deleting {ajob}")
if self.dry_run:
logging.info(f"Dry-mode run: no delete from real ME for {ajob}")
continue

if mgx_api.delete_analysis(registry_id):
if self.mgx_api.delete_analysis(registry_id):
logging.info(f"{ajob} successfully deleted")
ajob.last_mgx_indexed = timezone.now()
jobs_to_update.append(ajob)
Expand All @@ -190,5 +207,3 @@ def handle(self, *args, **options):
AnalysisJob.objects.bulk_update(
jobs_to_update, ["last_mgx_indexed"], batch_size=100
)

logging.info("Done")
5 changes: 3 additions & 2 deletions emgapi/metagenomics_exchange.py
@@ -88,7 +88,7 @@ def add_analysis(self, mgya: str, run_accession: str, public: bool):


def check_analysis(self, source_id: str, sequence_id: str, public=None, metadata=None) -> [str, bool]:
logging.info(f"Check {source_id}")
logging.info(f"Check {source_id} {sequence_id}")
params = {}
if public:
params = {
@@ -120,8 +120,9 @@ def check_analysis(self, source_id: str, sequence_id: str, public=None, metadata=None) -> [str, bool]:
return analysis_registry_id, metadata_match
else:
if metadata[metadata_record] != found_record[metadata_record]:
print(metadata[metadata_record], found_record[metadata_record])
metadata_match = False
logging.info(f"Incorrect field {metadata[metadata_record]} in ME ({found_record[metadata_record]})")
logging.info(f"Incorrect field {metadata[metadata_record]} != {found_record[metadata_record]})")
return analysis_registry_id, metadata_match
return analysis_registry_id, metadata_match
else:
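For orientation, check_analysis() returns a (registry_id, metadata_match) pair that drives the add/patch/no-op decision in the command above. A hedged usage sketch, with made-up accessions and a placeholder metadata dict (the command builds the real one via generate_metadata()):

    from django.conf import settings
    from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI

    api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)
    metadata = {"status": "public"}  # placeholder; the command builds this via generate_metadata()

    registry_id, metadata_match = api.check_analysis(
        source_id="MGYA00466090", sequence_id="ERR1806500", metadata=metadata
    )
    if not registry_id:
        pass  # not registered yet -> the command takes the add_analysis() path
    elif not metadata_match:
        api.patch_analysis(registry_id=registry_id, data=metadata)  # bring ME in line with EMG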
9 changes: 9 additions & 0 deletions emgapi/models.py
@@ -331,6 +331,15 @@ def to_add(self):

return self.filter(never_indexed | updated_after_indexing, not_suppressed, not_private)

def get_suppressed(self):
try:
self.model._meta.get_field("suppressed_at")
except FieldDoesNotExist:
return self.none()  # no suppressed_at field on this model, so nothing suppressed to delete
else:
return self.filter(
Q(suppressed_at__gte=F(self.index_field))
)


class EBISearchIndexQueryset(IndexableModelQueryset):
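The new get_suppressed() helper slots in next to to_add() and is what handle() consumes; a short usage sketch, assuming the model defines suppressed_at (the FieldDoesNotExist guard covers models that do not):

    from emgapi.models import AnalysisJob

    # Suppressed analyses whose suppression happened at or after the last indexing.
    to_delete = AnalysisJob.objects_for_mgx_indexing.get_suppressed()

    # The command narrows this further, e.g. by pipeline version:
    to_delete = to_delete.filter(pipeline__release_version="4.0")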
120 changes: 93 additions & 27 deletions tests/me/test_populate_metagenomics_exchange.py
@@ -15,10 +15,10 @@
# limitations under the License.

import pytest

from unittest import mock

from django.core.management import call_command
from django.utils import timezone

from test_utils.emg_fixtures import * # noqa

@@ -29,48 +29,114 @@
class TestPopulateMeAPI:
@pytest.mark.usefixtures("run_multiple_analysis_me")
def test_population_dry_mode(self, caplog):
"""
2 of 4 analyses require indexing, both are not in ME API
1 of 4 needs to be deleted because it was suppressed
1 was indexed after updated - no action needed
"""
call_command(
"populate_metagenomics_exchange",
dry_run=True,
)
assert "Indexing 3 new analyses" in caplog.text
assert "MGYA00001234 does not exist in ME" in caplog.text
assert "MGYA00005678 does not exist in ME" in caplog.text
assert "MGYA00466090 does not exist in ME" in caplog.text
assert "Dry-mode run: no addition to real ME for MGYA00001234" in caplog.text
assert "Dry-mode run: no addition to real ME for MGYA00005678" in caplog.text
assert "Indexing 2 new analyses" in caplog.text
assert "Dry-mode run: no addition to real ME for MGYA00466090" in caplog.text
assert "Dry-mode run: no addition to real ME for MGYA00466091" in caplog.text
assert "Processing 1 analyses to remove" in caplog.text
assert "MGYA00005678 doesn't exist in the registry, nothing to delete" in caplog.text


@pytest.mark.usefixtures("run_multiple_analysis_me")
@mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.add_analysis")
@mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis")
def test_add_new_analysis(self, mock_check_analysis, mock_add_analysis, caplog):
"""
Test checks new added analysis that was not indexed before, It should be added to ME.
Post process is mocked.
AnalysisJob should have updated indexed field and assigned MGX
"""
pipeline = 4.1
registry_id = "MGX1"
class MockResponse:
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
self.ok = True
def json(self):
return self.json_data

def mock_check_process(*args, **kwargs):
if 'metadata' in kwargs:
return None, True
else:
return registry_id, True

mock_add_analysis.return_value = MockResponse({}, 200)
mock_check_analysis.side_effect = mock_check_process

call_command(
"populate_metagenomics_exchange",
pipeline=pipeline,
)
assert "Indexing 1 new analyses" in caplog.text
assert "Processing 0 analyses to remove" in caplog.text
assert "Successfully added MGYA00466090" in caplog.text
ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first()
assert ajob.last_mgx_indexed
assert ajob.mgx_accession == registry_id


@pytest.mark.usefixtures("run_multiple_analysis_me")
@mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis")
@pytest.mark.usefixtures("suppressed_analysis_jobs")
def test_removals_dry_mode(self, mock_check_analysis, caplog):
mock_check_analysis.return_value = None, False
@mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.delete_analysis")
def test_removals(self,
mock_delete_analysis,
mock_check_analysis,
caplog):
"""
Test delete process.
1 analysis should be removed and updated indexed field in DB
"""
pipeline = 4.0
mock_check_analysis.return_value = True, True
mock_delete_analysis.return_value = True

call_command(
"populate_metagenomics_exchange",
dry_run=True,
pipeline=pipeline
)
ajobs = AnalysisJob.objects.all()
for job in ajobs:
assert (
f"{job.accession} doesn't exist in the registry, nothing to delete"
in caplog.text
)
assert "Indexing 0 new analyses" in caplog.text
assert "Processing 1 analyses to remove" in caplog.text
assert "Deleting MGYA00005678" in caplog.text
assert "MGYA00005678 successfully deleted" in caplog.text
ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first()
assert ajob.last_mgx_indexed.date() == timezone.now().date()

@pytest.mark.usefixtures("analysis_existed_in_me")
def test_update_dry_mode(self, caplog):
@pytest.mark.usefixtures("run_multiple_analysis_me")
@mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.check_analysis")
@mock.patch("emgapi.metagenomics_exchange.MetagenomicsExchangeAPI.patch_analysis")
def test_update(self,
mock_patch_analysis,
mock_check_analysis,
caplog):
"""
Test update process for job that was indexed before updated.
MGX accession and last_mgx_indexed should be updated
"""
pipeline = 5.0
registry_id = "MGX2"
mock_check_analysis.return_value = registry_id, False
mock_patch_analysis.return_value = True
call_command(
"populate_metagenomics_exchange",
dry_run=True,
pipeline=pipeline,
)

assert "Indexing 1 new analyses" in caplog.text
assert "Incorrect field None in ME (ERR1806500)" in caplog.text
assert "Dry-mode run: no patch to real ME for MGYA00147343" in caplog.text
assert "Processing 0 analyses to remove" in caplog.text
assert "Patch existing MGYA00466091" in caplog.text
assert "Analysis MGYA00466091 updated successfully" in caplog.text
ajob = AnalysisJob.objects.filter(pipeline__release_version=pipeline).first()
assert ajob.last_mgx_indexed.date() == timezone.now().date()
assert ajob.mgx_accession == registry_id


@pytest.mark.usefixtures("run_multiple_analysis")
def test_population(self, caplog):
call_command(
"populate_metagenomics_exchange",
)
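One note on the test style: these assertions read the command's output from caplog, so INFO-level capture must be enabled (via pytest config or caplog.set_level). A minimal, hypothetical example of the pattern, not part of this commit:

    import logging

    import pytest
    from django.core.management import call_command

    @pytest.mark.django_db
    def test_dry_run_logs(caplog):
        caplog.set_level(logging.INFO)  # make sure the command's logging.info output is captured
        call_command("populate_metagenomics_exchange", dry_run=True)
        assert "Done" in caplog.text  # handle() logs "Done" on completion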