Skip to content

Commit

Permalink
This is still a WIP - MGX
Browse files Browse the repository at this point in the history
  • Loading branch information
mberacochea committed Jan 18, 2024
1 parent 355bee0 commit 43753e9
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 215 deletions.
142 changes: 84 additions & 58 deletions emgapi/management/commands/populate_metagenomics_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from django.conf import settings
from django.core.management import BaseCommand
from django.utils import timezone
from django.core.paginator import Paginator
from datetime import timedelta

from emgapi.models import AnalysisJob
from emgapi.metagenomics_exchange import MetagenomicsExchangeAPI
Expand Down Expand Up @@ -79,7 +81,7 @@ def handle(self, *args, **options):
self.study_accession = options.get("study")
self.dry_run = options.get("dry_run")
self.pipeline_version = options.get("pipeline")

mgx_api = MetagenomicsExchangeAPI(base_url=settings.METAGENOMICS_EXCHANGE_API)

analyses_to_index = AnalysisJob.objects_for_mgx_indexing.to_add()
Expand All @@ -97,78 +99,102 @@ def handle(self, *args, **options):
analyses_to_index = analyses_to_index.filter(
pipeline__pipeline_id=self.pipeline_version
)
analyses_to_delete = analyses_to_delete.filter(pipeline__pipeline_id=self.pipeline_version)
analyses_to_delete = analyses_to_delete.filter(
pipeline__pipeline_id=self.pipeline_version
)

logging.info(f"Indexig {len(analyses_to_index)} new analyses")
logging.info(f"Indexing {len(analyses_to_index)} new analyses")

jobs_to_update = []
for page in Paginator(analyses_to_index, 100):
jobs_to_update = []

for ajob in analyses_to_index:
metadata = self.generate_metadata(
mgya=ajob.accession,
run_accession=ajob.run,
status="public" if not ajob.is_private else "private",
)
registry_id, metadata_match = mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
)
# The job is not registered
if not registry_id:
logging.debug(f"Add new {ajob}")
if self.dry_run:
logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
continue

response = mgx_api.add_analysis(
for ajob in page:
metadata = self.generate_metadata(
mgya=ajob.accession,
run_accession=ajob.run,
public=not ajob.is_private,
status="public" if not ajob.is_private else "private",
)
if response.ok:
logging.debug(f"Added {ajob}")
ajob.last_mgx_indexed = timezone.now()
jobs_to_update.append(ajob)
else:
logging.error(f"Error adding {ajob}: {response.message}")

# else we have to check if the metadata matches, if not we need to update it
else:
if not metadata_match:
logging.debug(f"Patch existing {ajob}")
registry_id, metadata_match = mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
)
# The job is not registered
if not registry_id:
logging.debug(f"Add new {ajob}")
if self.dry_run:
logging.info(f"Dry-mode run: no patch to real ME for {ajob}")
logging.info(f"Dry-mode run: no addition to real ME for {ajob}")
continue
if mgx_api.patch_analysis(registry_id=registry_id, data=metadata):
logging.info(f"Analysis {ajob} updated successfully")
ajob.last_mgx_indexed = timezone.now()

response = mgx_api.add_analysis(
mgya=ajob.accession,
run_accession=ajob.run,
public=not ajob.is_private,
)
if response.ok:
logging.debug(f"Added {ajob}")
ajob.mgx_accession = registry_id
ajob.last_mgx_indexed = timezone.now() + timedelta(minutes=1)
jobs_to_update.append(ajob)
else:
logging.error(f"Analysis {ajob} update failed")
logging.error(f"Error adding {ajob}: {response.message}")

# else we have to check if the metadata matches, if not we need to update it
else:
logging.debug(f"No edit for {ajob}, metadata is correct")
if not metadata_match:
logging.debug(f"Patch existing {ajob}")
if self.dry_run:
logging.info(
f"Dry-mode run: no patch to real ME for {ajob}"
)
continue
if mgx_api.patch_analysis(
registry_id=registry_id, data=metadata
):
logging.info(f"Analysis {ajob} updated successfully")
# Just to be safe, update the MGX accession
ajob.mgx_accession = registry_id
ajob.last_mgx_indexed = timezone.now()
jobs_to_update.append(ajob)
else:
logging.error(f"Analysis {ajob} update failed")
else:
logging.debug(f"No edit for {ajob}, metadata is correct")

# BULK UPDATE #
AnalysisJob.objects.bulk_update(jobs_to_update, ["last_mgx_indexed"])
AnalysisJob.objects.bulk_update(
jobs_to_update, ["last_mgx_indexed", "mgx_accession"], batch_size=100
)

logging.info(f"Processing {len(analyses_to_delete)} analyses to remove")
for ajob in analyses_to_delete:
metadata = self.generate_metadata(
mgya=ajob.accession,
run_accession=ajob.run,
status="public" if not ajob.is_private else "private",
)
registry_id, _ = mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
)
if registry_id:
if self.dry_run:
logging.info(f"Dry-mode run: no delete from real ME for {ajob}")

if mgx_api.delete_analysis(registry_id):
logging.info(f"{ajob} successfully deleted")
for page in Paginator(analyses_to_delete, 100):
jobs_to_update = []

for ajob in page:
metadata = self.generate_metadata(
mgya=ajob.accession,
run_accession=ajob.run,
status="public" if not ajob.is_private else "private",
)
registry_id, _ = mgx_api.check_analysis(
source_id=ajob.accession, sequence_id=ajob.run, metadata=metadata
)
if registry_id:
if self.dry_run:
logging.info(f"Dry-mode run: no delete from real ME for {ajob}")

if mgx_api.delete_analysis(registry_id):
logging.info(f"{ajob} successfully deleted")
ajob.last_mgx_indexed = timezone.now()
jobs_to_update.append(ajob)
else:
logging.info(f"{ajob} failed on delete")
else:
logging.info(f"{ajob} failed on delete")
else:
logging.info(f"{ajob} doesn't exist in the registry, nothing to delete")
logging.info(
f"{ajob} doesn't exist in the registry, nothing to delete"
)

# BULK UPDATE #
AnalysisJob.objects.bulk_update(
jobs_to_update, ["last_mgx_indexed"], batch_size=100
)

logging.info("Done")
2 changes: 1 addition & 1 deletion emgapi/metagenomics_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class MetagenomicsExchangeAPI:
def __init__(self, base_url=None):
self.base_url = base_url if base_url else settings.METAGENOMICS_EXCHANGE_API
self.__token = settings.METAGENOMICS_EXCHANGE_API_TOKEN
self.broker = settings.MGNIFY_BROKER
self.broker = settings.METAGENOMICS_EXCHANGE_MGNIFY_BROKER

def get_request(self, endpoint: str, params: dict):
"""Make a GET request, returns the response"""
Expand Down
35 changes: 0 additions & 35 deletions emgapi/migrations/0010_auto_20230908_1346.py

This file was deleted.

23 changes: 0 additions & 23 deletions emgapi/migrations/0011_auto_20230912_1346.py

This file was deleted.

37 changes: 0 additions & 37 deletions emgapi/migrations/0012_auto_20231115_1448.py

This file was deleted.

33 changes: 0 additions & 33 deletions emgapi/migrations/0013_auto_20231110_1329.py

This file was deleted.

38 changes: 38 additions & 0 deletions emgapi/migrations/0013_auto_20240118_1220.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Generated by Django 3.2.18 on 2024-01-18 12:20

from django.db import migrations, models


class Migration(migrations.Migration):
    # Auto-generated schema migration (Django 3.2.18, 2024-01-18).
    # Adds bookkeeping timestamp/accession fields used by the external
    # indexing commands: EBI Search indexing (AnalysisJob, Study) and the
    # Metagenomics Exchange (MGX) indexing (AnalysisJob).
    # NOTE(review): this replaces four deleted migrations (0010–0013 in the
    # same commit) — confirm no deployed database already applied those.

    dependencies = [
        ('emgapi', '0012_alter_publication_pub_type'),
    ]

    operations = [
        # When AnalysisJob was last pushed to an EBI Search index run.
        migrations.AddField(
            model_name='analysisjob',
            name='last_ebi_search_indexed',
            field=models.DateTimeField(blank=True, db_column='LAST_EBI_SEARCH_INDEXED', help_text='Date at which this model was last included in an EBI Search initial/incremental index.', null=True),
        ),
        # When AnalysisJob was last registered/patched in the Metagenomics
        # Exchange; compared against last_update to pick jobs to (re)index.
        migrations.AddField(
            model_name='analysisjob',
            name='last_mgx_indexed',
            field=models.DateTimeField(blank=True, db_column='LAST_MGX_INDEXED', help_text='Date at which this model was last indexed in the Metagenomics Exchange', null=True),
        ),
        # Registry identifier returned by the MGX API on registration;
        # unique per analysis, nullable until the job is first registered.
        migrations.AddField(
            model_name='analysisjob',
            name='mgx_accession',
            field=models.CharField(blank=True, db_column='MGX_ACCESSION', help_text='The Metagenomics Exchange accession.', max_length=10, null=True, unique=True),
        ),
        # Same EBI Search bookkeeping timestamp, on Study.
        migrations.AddField(
            model_name='study',
            name='last_ebi_search_indexed',
            field=models.DateTimeField(blank=True, db_column='LAST_EBI_SEARCH_INDEXED', help_text='Date at which this model was last included in an EBI Search initial/incremental index.', null=True),
        ),
        # Switch Study.last_update to auto_now so it tracks every save()
        # automatically (needed for the "changed since last indexed" checks).
        migrations.AlterField(
            model_name='study',
            name='last_update',
            field=models.DateTimeField(auto_now=True, db_column='LAST_UPDATE'),
        ),
    ]
Loading

0 comments on commit 43753e9

Please sign in to comment.