Skip to content

Commit

Permalink
optimising speed of dump (particularly mongo)
Browse files Browse the repository at this point in the history
  • Loading branch information
SandyRogers committed Jan 17, 2024
1 parent 57a391b commit 67edbcd
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 34 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ jobs:
pip install install .[tests]
pip freeze
- name: Check for unmigrated code changes
run: |
python emgcli/manage.py makemigrations --noinput --check --dry-run
- name: 🧪 - Testing
run: |
cat $EMG_CONFIG
Expand Down
14 changes: 13 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,16 @@ logs

secret.key

dumps
dumps

/config/*.yml
/config/*.yaml
!/config/*local*

/config/*.yml
/config/*.yaml
!/config/*local*

/config/*.yml
/config/*.yaml
!/config/*local*
18 changes: 11 additions & 7 deletions emgapi/management/commands/ebi_search_analysis_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ def add_arguments(self, parser):
)
parser.add_argument("-o", "--output", help="Output dir for xml files", required=True)
parser.add_argument("-c", "--chunk", help="Number of analyses per chunk", default=100, nargs='?', type=int)
parser.add_argument("-m", "--max_pages", help="Max number of pages to dump", default=-1, type=int)

def get_analysis_context(self, analysis: AnalysisJob):
try:
analysis_taxonomy: Optional[AnalysisJobTaxonomy] = AnalysisJobTaxonomy.objects.get(
analysis_id=analysis.job_id
pk=str(analysis.job_id)
)
except AnalysisJobTaxonomy.DoesNotExist:
logger.debug(f"Could not find analysis job taxonomy for {analysis.job_id}")
Expand Down Expand Up @@ -88,7 +89,7 @@ def get_analysis_context(self, analysis: AnalysisJob):
for taxonomy_attribute in taxonomy_attributes:
if taxonomy_attribute:
for tax in taxonomy_attribute:
tax_lineage_list = list(filter(None, tax.lineage.split(":")))
tax_lineage_list = list(filter(None, tax.organism.pk.split('|')[0].split(":")))
if len(tax_lineage_list) > 1:
taxonomy_lists.append(
tax_lineage_list
Expand Down Expand Up @@ -139,13 +140,13 @@ def get_analysis_context(self, analysis: AnalysisJob):

if 'location_name' not in sample_metadata and analysis.sample.geo_loc_name:
sample_metadata['location_name'] = analysis.sample.geo_loc_name

return {
"analysis": analysis,
"analysis_biome": biome_list,
"analysis_taxonomies": taxonomy_lists,
"analysis_go_entries": go_annotation.go_terms if go_annotation else [],
"analysis_ips_entries": ips_annotation.interpro_identifiers if ips_annotation else [],
"analysis_go_entries": [go.go_term.pk for go in go_annotation.go_terms] if go_annotation else [],
"analysis_ips_entries": [ipr.interpro_identifier.pk for ipr in ips_annotation.interpro_identifiers] if ips_annotation else [],
# Reading .pk takes the id straight from the lazy reference, so the IPR and GO documents are never fetched from mongo (fetching them would be a big performance hit)
"sample_metadata": sample_metadata,
}

Expand All @@ -168,7 +169,7 @@ def handle(self, *args, **options):

pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

analyses: QuerySet = AnalysisJob.objects.available(None)
analyses: QuerySet = AnalysisJob.objects_dump.available(None)

if not is_full_snapshot:
analyses = AnalysisJob.objects_for_indexing.to_add()
Expand All @@ -190,6 +191,10 @@ def handle(self, *args, **options):
paginated_analyses = Paginator(analyses, chunk_size)

for page in paginated_analyses:
if (mp := options["max_pages"]) >= 0:
if page.number > mp:
logger.warning("Skipping remaining pages")
break
logger.info(f"Dumping {page.number = }/{paginated_analyses.num_pages}")
additions_file = pathlib.Path(output_dir) / pathlib.Path(f'analyses_{page.number:04}.xml')
with open(additions_file, 'w') as a:
Expand All @@ -202,7 +207,6 @@ def handle(self, *args, **options):
}
)
)

nowish = timezone.now() + timedelta(minutes=1)
# Small buffer into the future so that the indexing time remains ahead of auto-now updated times.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class Migration(migrations.Migration):

dependencies = [
('emgapi', '0012_alter_publication_pub_type'),
('emgapi', '0014_suppression_reason_ancestor_suppressed'),
]

operations = [
Expand Down
22 changes: 22 additions & 0 deletions emgapi/migrations/0016_auto_20240117_1757.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generated by Django 3.2.23 on 2024-01-17 17:57

from django.db import migrations


# Schema migration for the `emgapi` app (generated by Django's makemigrations).
# It updates the model options recorded for `blacklistedstudy` and renames one
# field on `analysisjob`.
class Migration(migrations.Migration):

# Must be applied after migration 0015 of the same app.
dependencies = [
('emgapi', '0015_last_updates_on_ebi_searchable_models'),
]

operations = [
# Record `blacklistedstudy` as an unmanaged model (managed=False) and set its
# singular/plural verbose names.
migrations.AlterModelOptions(
name='blacklistedstudy',
options={'managed': False, 'verbose_name': 'blacklisted study', 'verbose_name_plural': 'blacklisted studies'},
),
# Rename the model field `analysis_summary_json` to `analysis_summary`.
# NOTE(review): whether the underlying database column changes depends on the
# field's db_column setting, which is not visible here; confirm.
migrations.RenameField(
model_name='analysisjob',
old_name='analysis_summary_json',
new_name='analysis_summary',
),
]
8 changes: 8 additions & 0 deletions emgapi/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,13 @@ def available(self, request):
return self.get_queryset().available(request)


class AnalysisJobDumpManager(AnalysisJobManager):
    """Manager whose querysets eagerly join the relations listed below.

    Built on ``AnalysisJobManager``; only the base queryset differs, in that
    ``select_related`` is applied to it.
    """

    def get_queryset(self):
        """Return an ``AnalysisJobQuerySet`` with the related models selected."""
        related_fields = (
            'analysis_status',
            'experiment_type',
            'assembly',
            'pipeline',
            'run',
            'sample',
            'study',
        )
        base_queryset = AnalysisJobQuerySet(self.model, using=self._db)
        return base_queryset.select_related(*related_fields)


class AnalysisJob(SuppressibleModel, PrivacyControlledModel, EbiSearchIndexedModel):
def __init__(self, *args, **kwargs):
super(AnalysisJob, self).__init__(*args, **kwargs)
Expand Down Expand Up @@ -1761,6 +1768,7 @@ def downloads(self):

objects = AnalysisJobManager()
objects_admin = models.Manager()
objects_dump = AnalysisJobDumpManager()

class Meta:
db_table = 'ANALYSIS_JOB'
Expand Down
12 changes: 2 additions & 10 deletions emgapi/templates/ebi_search/analysis.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,6 @@
{% endfor %}
</hierarchical_field>
{% endfor %}

{% for go_slim in analysis_go_entries %}
<field name="go_term">{{ go_slim.description | escape }}</field>
{% endfor %}

{% for ips in analysis_ips_entries %}
<field name="interpro_entry">{{ ips.description | escape }}</field>
{% endfor %}
</additional_fields>
<cross_references>
<ref dbkey="{{ analysis.study.accession }}" dbname="metagenomics_projects"/>
Expand All @@ -88,10 +80,10 @@
{% endif %}

{% for go in analysis_go_entries %}
<ref dbkey="{{ go.accession }}" dbname="go"/>
<ref dbkey="{{ go }}" dbname="go"/>
{% endfor %}
{% for ips in analysis_ips_entries %}
<ref dbkey="{{ ips.accession }}" dbname="interpro"/>
<ref dbkey="{{ ips }}" dbname="interpro"/>
{% endfor %}
</cross_references>
</entry>
22 changes: 12 additions & 10 deletions emgapianns/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class BaseAnalysisJobAnnotation(mongoengine.EmbeddedDocument):

class AnalysisJobGoTermAnnotation(BaseAnalysisJobAnnotation):

go_term = mongoengine.ReferenceField(GoTerm, required=True)
go_term = mongoengine.LazyReferenceField(GoTerm, required=True)

@property
def accession(self):
Expand All @@ -123,8 +123,10 @@ def lineage(self):

class AnalysisJobInterproIdentifierAnnotation(BaseAnalysisJobAnnotation):

interpro_identifier = mongoengine.ReferenceField(InterproIdentifier,
required=True)
interpro_identifier = mongoengine.LazyReferenceField(
InterproIdentifier,
required=True
)

@property
def accession(self):
Expand All @@ -142,7 +144,7 @@ def lineage(self):
class AnalysisJobKeggModuleAnnotation(mongoengine.EmbeddedDocument):
"""KEGG modules on a given Analysis Job.
"""
module = mongoengine.ReferenceField(KeggModule, required=True)
module = mongoengine.LazyReferenceField(KeggModule, required=True)
completeness = mongoengine.FloatField(default=0.0)
matching_kos = mongoengine.ListField(mongoengine.StringField(), default=list)
missing_kos = mongoengine.ListField(mongoengine.StringField(), default=list)
Expand All @@ -163,7 +165,7 @@ def name(self):
class AnalysisJobPfamAnnotation(BaseAnalysisJobAnnotation):
"""Pfam on a given Analysis Job.
"""
pfam_entry = mongoengine.ReferenceField(PfamEntry, required=True)
pfam_entry = mongoengine.LazyReferenceField(PfamEntry, required=True)

@property
def accession(self):
Expand All @@ -177,7 +179,7 @@ def description(self):
class AnalysisJobCOGAnnotation(BaseAnalysisJobAnnotation):
"""COG on a given Analysis Job.
"""
cog = mongoengine.ReferenceField(COG, required=True)
cog = mongoengine.LazyReferenceField(COG, required=True)

@property
def accession(self):
Expand All @@ -199,7 +201,7 @@ class AnalysisJobGenomePropAnnotation(mongoengine.EmbeddedDocument):
(PARTIAL_PRESENCE, 'Partial'),
(NO_PRESENCE, 'No'),
)
genome_property = mongoengine.ReferenceField(GenomeProperty, required=True)
genome_property = mongoengine.LazyReferenceField(GenomeProperty, required=True)
presence = mongoengine.IntField(required=True, choices=PRESENCE_CHOICES)

@property
Expand All @@ -214,7 +216,7 @@ def description(self):
class AnalysisJobKeggOrthologAnnotation(BaseAnalysisJobAnnotation):
"""KEGG KO on a given Analysis Job.
"""
ko = mongoengine.ReferenceField(KeggOrtholog, required=True)
ko = mongoengine.LazyReferenceField(KeggOrtholog, required=True)

@property
def accession(self):
Expand All @@ -228,7 +230,7 @@ def description(self):
class AnalysisJobAntiSmashGCAnnotation(BaseAnalysisJobAnnotation):
"""antiSMASH gene cluster on a given Analysis Job
"""
gene_cluster = mongoengine.ReferenceField(AntiSmashGeneCluster, required=True)
gene_cluster = mongoengine.LazyReferenceField(AntiSmashGeneCluster, required=True)

@property
def accession(self):
Expand Down Expand Up @@ -333,7 +335,7 @@ class Organism(mongoengine.Document):
class AnalysisJobOrganism(mongoengine.EmbeddedDocument):

count = mongoengine.IntField(required=True)
organism = mongoengine.ReferenceField(Organism)
organism = mongoengine.LazyReferenceField(Organism)

@property
def lineage(self):
Expand Down
25 changes: 25 additions & 0 deletions emgapianns/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging
from pymongo import monitoring


class MongoCommandLogger(monitoring.CommandListener):
    """PyMongo command listener that logs the lifecycle of each command.

    Every message goes through the ``logging`` module-level functions (i.e.
    the root logger). Arguments are passed lazily (%-style) so that nothing is
    rendered when the corresponding level is disabled; this matters for the
    full command document, which is formatted only when DEBUG is enabled.
    """

    def started(self, event):
        """Log the full command at DEBUG and a one-line summary at INFO."""
        logging.debug("Command %s", event.command)
        logging.info(
            "Command %s with request id %s started on server %s",
            event.command_name,
            event.request_id,
            event.connection_id,
        )

    def succeeded(self, event):
        """Log the command name, request id, server and duration at INFO."""
        logging.info(
            "Command %s with request id %s on server %s succeeded in %s microseconds",
            event.command_name,
            event.request_id,
            event.connection_id,
            event.duration_micros,
        )

    def failed(self, event):
        """Log a failed command at WARNING, including the failure document.

        Failures are raised above INFO so they stand out from the routine
        started/succeeded messages, and ``event.failure`` is appended so the
        reason for the failure is not lost.
        """
        logging.warning(
            "Command %s with request id %s on server %s failed in %s microseconds: %s",
            event.command_name,
            event.request_id,
            event.connection_id,
            event.duration_micros,
            event.failure,
        )
2 changes: 1 addition & 1 deletion emgcli/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__: str = "2.4.41"
__version__: str = "2.4.43"
15 changes: 12 additions & 3 deletions emgcli/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
from os.path import expanduser

from corsheaders.defaults import default_headers
from pymongo import monitoring

from emgapianns.utils import MongoCommandLogger

try:
from YamJam import yamjam, YAMLError
Expand All @@ -45,7 +48,6 @@

logger = logging.getLogger(__name__)


# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
EMG_CONFIG = os.environ.get(
Expand Down Expand Up @@ -161,8 +163,15 @@
}
}

# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/
# Optional database query logging, enabled by a truthy `log_db_queries` value
# under the `emg` section of EMG_CONF (defaults to off).
if EMG_CONF.get('emg', {}).get('log_db_queries', False):
# Register the Mongo command listener with pymongo's monitoring module.
# NOTE(review): pymongo applies globally registered listeners only to clients
# created afterwards; confirm no MongoDB connection is opened before this runs.
monitoring.register(MongoCommandLogger())
# Send Django's database backend logger to the 'default' and 'console' handlers
# at DEBUG, without propagating to parent loggers.
LOGGING['loggers']['django.db.backends'] = {
'handlers': ['default', 'console'],
'level': 'DEBUG',
'propagate': False,
}
# Lower the root logger ('' key) to DEBUG as well.
LOGGING['loggers']['']['level'] = 'DEBUG'


def create_secret_key(var_dir):
secret_key = None
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ max-line-length = 119
"""

[tool.bumpversion]
current_version = "2.4.41"
current_version = "2.4.43"

[[tool.bumpversion.files]]
filename = "emgcli/__init__.py"

0 comments on commit 67edbcd

Please sign in to comment.