From e4525a229319376f0a531aa106b7260ef9d20296 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Tue, 12 Nov 2024 14:49:23 +0100 Subject: [PATCH 01/41] add indexing app --- backend/ianalyzer/common_settings.py | 1 + backend/indexing/__init__.py | 0 backend/indexing/admin.py | 3 +++ backend/indexing/apps.py | 6 ++++++ backend/indexing/migrations/__init__.py | 0 backend/indexing/models.py | 3 +++ 6 files changed, 13 insertions(+) create mode 100644 backend/indexing/__init__.py create mode 100644 backend/indexing/admin.py create mode 100644 backend/indexing/apps.py create mode 100644 backend/indexing/migrations/__init__.py create mode 100644 backend/indexing/models.py diff --git a/backend/ianalyzer/common_settings.py b/backend/ianalyzer/common_settings.py index 78476ac1e..84acf7cdd 100644 --- a/backend/ianalyzer/common_settings.py +++ b/backend/ianalyzer/common_settings.py @@ -32,6 +32,7 @@ 'api', 'es', 'corpora', + 'indexing', 'visualization', 'download', 'wordmodels', diff --git a/backend/indexing/__init__.py b/backend/indexing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/indexing/admin.py b/backend/indexing/admin.py new file mode 100644 index 000000000..8c38f3f3d --- /dev/null +++ b/backend/indexing/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/backend/indexing/apps.py b/backend/indexing/apps.py new file mode 100644 index 000000000..0182a620b --- /dev/null +++ b/backend/indexing/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class IndexingConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'indexing' diff --git a/backend/indexing/migrations/__init__.py b/backend/indexing/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/indexing/models.py b/backend/indexing/models.py new file mode 100644 index 000000000..71a836239 --- /dev/null +++ b/backend/indexing/models.py @@ -0,0 +1,3 @@ +from django.db import models + +# Create your models here. 
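The patch above only scaffolds the empty `indexing` app and registers it in INSTALLED_APPS; its models and migrations follow in the later patches. A minimal sketch (not part of the patch series) of verifying the registration from a Django shell, assuming DJANGO_SETTINGS_MODULE points at the project settings that contain the line added above — the settings module path used here is illustrative:

# Hypothetical check that the new app is registered; the settings module path is an
# assumption, not something specified by the patch itself.
import os
import django
from django.apps import apps

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ianalyzer.settings')
django.setup()

config = apps.get_app_config('indexing')       # raises LookupError if the app is not installed
print(config.name, config.default_auto_field)  # 'indexing', 'django.db.models.BigAutoField'

Because apps.get_app_config reads Django's application registry, this works even before the app defines any models.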
From 62155a8007fc6228dfa9c84e4fedeb610987b3c5 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Tue, 12 Nov 2024 15:10:55 +0100 Subject: [PATCH 02/41] draft IndexJob and IndexTask models --- backend/indexing/migrations/0001_initial.py | 98 +++++++++++++++ .../migrations/0002_updatesettingstask.py | 27 +++++ backend/indexing/models.py | 113 +++++++++++++++++- 3 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 backend/indexing/migrations/0001_initial.py create mode 100644 backend/indexing/migrations/0002_updatesettingstask.py diff --git a/backend/indexing/migrations/0001_initial.py b/backend/indexing/migrations/0001_initial.py new file mode 100644 index 000000000..d902954fc --- /dev/null +++ b/backend/indexing/migrations/0001_initial.py @@ -0,0 +1,98 @@ +# Generated by Django 4.2.14 on 2024-11-12 14:10 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ('addcorpus', '0023_alter_corpusdocumentationpage_type_alter_field_name'), + ('es', '0002_alter_index_available'), + ] + + operations = [ + migrations.CreateModel( + name='IndexJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created', models.DateTimeField(auto_now_add=True)), + ('corpus', models.ForeignKey(help_text='corpus for which the job is created; task may use the corpus to determine metadata or extract documents', on_delete=django.db.models.deletion.CASCADE, to='addcorpus.corpus')), + ], + ), + migrations.CreateModel( + name='UpdateIndexTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('document_min_date', models.DateField(blank=True, help_text='minimum date on which to filter documents', null=True)), + ('document_max_date', models.DateField(blank=True, help_text='maximum date on which to filter documents', null=True)), + ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), + ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='RemoveAliasTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('alias', models.CharField(help_text='alias to remove', max_length=128)), + ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), + ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='PopulateIndexTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('document_min_date', models.DateField(blank=True, help_text='minimum date on which to filter documents', null=True)), + ('document_max_date', models.DateField(blank=True, help_text='maximum date on which to filter documents', null=True)), + ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, 
related_name='%(class)ss', to='es.index')), + ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='DeleteIndexTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), + ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='CreateIndexTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('delete_existing', models.BooleanField(default=False, help_text='if an index by this name already exists, delete it, instead of raising an exception')), + ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), + ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='AddAliasTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('alias', models.CharField(help_text='alias to assign', max_length=128)), + ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), + ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), + ], + options={ + 'abstract': False, + }, + ), + ] diff --git a/backend/indexing/migrations/0002_updatesettingstask.py b/backend/indexing/migrations/0002_updatesettingstask.py new file mode 100644 index 000000000..3289d0aa2 --- /dev/null +++ b/backend/indexing/migrations/0002_updatesettingstask.py @@ -0,0 +1,27 @@ +# Generated by Django 4.2.14 on 2024-11-13 12:23 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('es', '0002_alter_index_available'), + ('indexing', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='UpdateSettingsTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('settings', models.JSONField(blank=True, default=dict)), + ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), + ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), + ], + options={ + 'abstract': False, + }, + ), + ] diff --git a/backend/indexing/models.py b/backend/indexing/models.py index 71a836239..bd5dbb4c9 100644 --- a/backend/indexing/models.py +++ b/backend/indexing/models.py @@ -1,3 +1,114 @@ from django.db import models -# Create your 
models here. +from addcorpus.models import Corpus +from es.models import Index + +class IndexJob(models.Model): + corpus = models.ForeignKey( + to=Corpus, + on_delete=models.CASCADE, + help_text='corpus for which the job is created; task may use the corpus ' + 'to determine metadata or extract documents', + ) + created = models.DateTimeField( + auto_now_add=True, + ) + +class IndexTask(models.Model): + class Meta: + abstract = True + + job = models.ForeignKey( + to=IndexJob, + on_delete=models.CASCADE, + related_name='%(class)ss', + help_text='job in which this task is run', + ) + index = models.ForeignKey( + to=Index, + on_delete=models.CASCADE, + related_name='%(class)ss', + help_text='index on which this task is applied', + ) + + +class CreateIndexTask(IndexTask): + ''' + Create a new index based on corpus settings. + ''' + + delete_existing = models.BooleanField( + default=False, + help_text='if an index by this name already exists, delete it, instead of ' + 'raising an exception' + ) + +class PopulateIndexTask(IndexTask): + ''' + Extract documents from a corpus and add them to the index. + ''' + + document_min_date = models.DateField( + blank=True, + null=True, + help_text='minimum date on which to filter documents' + ) + document_max_date = models.DateField( + blank=True, + null=True, + help_text='maximum date on which to filter documents' + ) + + +class UpdateIndexTask(IndexTask): + ''' + Run an update script; usually to add/change field values in existing documents. + + Only available for corpora with a Python definition (which must also include a method + that defines such a script). + ''' + + document_min_date = models.DateField( + blank=True, + null=True, + help_text='minimum date on which to filter documents' + ) + document_max_date = models.DateField( + blank=True, + null=True, + help_text='maximum date on which to filter documents' + ) + + +class UpdateSettingsTask(IndexTask): + ''' + Push new settings to an index + ''' + + settings = models.JSONField( + blank=True, + default=dict, + ) + + def __str__(self): + return f'update settings of {self.index}' + + +class RemoveAliasTask(IndexTask): + alias = models.CharField( + max_length=128, + help_text='alias to remove' + ) + + +class AddAliasTask(IndexTask): + alias = models.CharField( + max_length=128, + help_text='alias to assign' + ) + + +class DeleteIndexTask(IndexTask): + ''' + Delete an index. 
+ ''' From bceec2055e4483e7dd22a461b6e11acb681d8791 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Tue, 12 Nov 2024 15:21:52 +0100 Subject: [PATCH 03/41] add utility methods remove tasks method --- backend/indexing/models.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/backend/indexing/models.py b/backend/indexing/models.py index bd5dbb4c9..8b637e48a 100644 --- a/backend/indexing/models.py +++ b/backend/indexing/models.py @@ -1,4 +1,5 @@ from django.db import models +from itertools import chain from addcorpus.models import Corpus from es.models import Index @@ -14,6 +15,11 @@ class IndexJob(models.Model): auto_now_add=True, ) + + def __str__(self): + return f'{self.corpus} ({self.created})' + + class IndexTask(models.Model): class Meta: abstract = True @@ -31,6 +37,10 @@ class Meta: help_text='index on which this task is applied', ) + @property + def corpus(self) -> Corpus: + return self.job.corpus + class CreateIndexTask(IndexTask): ''' @@ -43,6 +53,9 @@ class CreateIndexTask(IndexTask): 'raising an exception' ) + def __str__(self): + return f'create {self.index} based on {self.corpus}' + class PopulateIndexTask(IndexTask): ''' Extract documents from a corpus and add them to the index. @@ -59,6 +72,9 @@ class PopulateIndexTask(IndexTask): help_text='maximum date on which to filter documents' ) + def __str__(self): + return f'populate {self.index} based on {self.corpus}' + class UpdateIndexTask(IndexTask): ''' @@ -79,6 +95,10 @@ class UpdateIndexTask(IndexTask): help_text='maximum date on which to filter documents' ) + def __str__(self): + return f'update {self.index} based on {self.corpus}' + + class UpdateSettingsTask(IndexTask): ''' @@ -100,6 +120,9 @@ class RemoveAliasTask(IndexTask): help_text='alias to remove' ) + def __str__(self): + return f'remove alias {self.alias} from {self.index}' + class AddAliasTask(IndexTask): alias = models.CharField( @@ -107,8 +130,14 @@ class AddAliasTask(IndexTask): help_text='alias to assign' ) + def __str__(self): + return f'add alias {self.alias} to {self.index}' + class DeleteIndexTask(IndexTask): ''' Delete an index. ''' + + def __str__(self): + return f'delete {self.index}' From 1718402e3040ffffd1391fc34326b04d6bf305a9 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Tue, 12 Nov 2024 15:31:08 +0100 Subject: [PATCH 04/41] add admin interface --- backend/indexing/admin.py | 54 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/backend/indexing/admin.py b/backend/indexing/admin.py index 8c38f3f3d..a606477bc 100644 --- a/backend/indexing/admin.py +++ b/backend/indexing/admin.py @@ -1,3 +1,55 @@ from django.contrib import admin -# Register your models here. 
+from indexing import models + + +class CreateIndexAdmin(admin.StackedInline): + model = models.CreateIndexTask + extra = 0 + + +class PopulateIndexAdmin(admin.StackedInline): + model = models.PopulateIndexTask + extra = 0 + + +class UpdateIndexAdmin(admin.StackedInline): + model = models.UpdateIndexTask + extra = 0 + + +class UpdateSettingsAdmin(admin.StackedInline): + model = models.UpdateSettingsTask + extra = 0 + + +class RemoveAliasAdmin(admin.StackedInline): + model = models.RemoveAliasTask + extra = 0 + + +class AddAliasAdmin(admin.StackedInline): + model = models.AddAliasTask + extra = 0 + + +class DeleteIndexAdmin(admin.StackedInline): + model = models.DeleteIndexTask + extra = 0 + + + +class IndexJobAdmin(admin.ModelAdmin): + list_display = ['created', 'corpus'] + list_filter = ['corpus'] + inlines = [ + CreateIndexAdmin, + PopulateIndexAdmin, + UpdateIndexAdmin, + UpdateSettingsAdmin, + RemoveAliasAdmin, + AddAliasAdmin, + DeleteIndexAdmin, + ] + +admin.site.register(models.IndexJob, IndexJobAdmin) From de10047dd724da10d4b3d790423503b21ff9c2f9 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Tue, 12 Nov 2024 16:04:04 +0100 Subject: [PATCH 05/41] draft create_job function --- backend/es/es_index.py | 91 +++++++++++++++++++++++++++++- backend/ianalyzer/elasticsearch.py | 5 +- 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index efe20ff17..64d27c4be 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -17,9 +17,15 @@ from addcorpus.models import Corpus, CorpusConfiguration from addcorpus.python_corpora.load_corpus import load_corpus_definition from addcorpus.reader import make_reader -from ianalyzer.elasticsearch import elasticsearch +from ianalyzer.elasticsearch import elasticsearch, server_for_corpus from .es_alias import alias, get_current_index_name, get_new_version_number import datetime +from indexing.models import ( + IndexJob, CreateIndexTask, PopulateIndexTask, UpdateIndexTask, + RemoveAliasTask, AddAliasTask, UpdateSettingsTask +) +from es.sync import update_server_table_from_settings +from es.models import Server, Index import logging logger = logging.getLogger('indexing') @@ -157,6 +163,89 @@ def populate( logger.error(f"FAILED INDEX: {info}") +def create_job( + corpus: Corpus, + start: Optional[datetime.date] = None, + end: Optional[datetime.date] = None, + mappings_only: bool = False, + add: bool = False, + clear: bool = False, + prod: bool = False, + rollover: bool = False, + update: bool = False, +) -> IndexJob: + job = IndexJob.objects.create(corpus=corpus) + + update_server_table_from_settings() + server_name = server_for_corpus(corpus.name) + server = Server.objects.get(name=server_name) + client = elasticsearch(corpus.name) + base_name = corpus.configuration.es_index + + if prod: + alias = corpus.configuration.es_alias or corpus.configuration.es_index + if add or update: + versioned_name = get_current_index_name(corpus.configuration, client) + else: + next_version = get_new_version_number(client, alias, base_name) + versioned_name = f'{base_name}-{next_version}' + + index, _ = Index.objects.get_or_create( + server=server, name=versioned_name + ) + + UpdateSettingsTask.objects.create( + job=job, + index=index, + settings={"number_of_replicas": 1}, + ) + + if rollover: + for index_name in client.indices.get_alias(name=alias): + aliased_index, _ = Index.objects.get_or_create( + server=server, + name=index_name, + ) + RemoveAliasTask.objects.create( + job=job, + index=aliased_index, 
+ alias=alias, + ) + AddAliasTask.objects.create( + job=job, + index=index, + alias=alias, + ) + else: + index, _ = Index.objects.get_or_create( + server=server, name=base_name + ) + + if not add or update: + CreateIndexTask.objects.create( + job=job, + index=index, + delete_existing=clear, + ) + + if not mappings_only or update: + PopulateIndexTask.objects.create( + job=job, + index=index, + document_min_date=start, + document_max_date=end, + ) + + if update: + UpdateIndexTask.objects.create( + job=job, + index=index, + document_min_date=start, + document_max_date=end, + ) + + return job + def perform_indexing( corpus: Corpus, start: Optional[datetime.date] = None, diff --git a/backend/ianalyzer/elasticsearch.py b/backend/ianalyzer/elasticsearch.py index ae404ddb3..e7b0e14f4 100644 --- a/backend/ianalyzer/elasticsearch.py +++ b/backend/ianalyzer/elasticsearch.py @@ -9,11 +9,14 @@ def elasticsearch(corpus_name): If multiple Elasticsearch servers are configured in the project, the server is selected based on the CORPUS_SERVER_NAMES setting. ''' - server_name = settings.CORPUS_SERVER_NAMES.get(corpus_name, 'default') + server_name = server_for_corpus(corpus_name) server_config = settings.SERVERS[server_name] return client_from_config(server_config) +def server_for_corpus(corpus_name): + return settings.CORPUS_SERVER_NAMES.get(corpus_name, 'default') + def client_from_config(server_config): ''' From ab7a69d172582d5ec3b0d86c85eaaf822d7dcd1f Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 13:29:16 +0100 Subject: [PATCH 06/41] create job in index command --- backend/es/es_index.py | 2 +- backend/es/management/commands/index.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 64d27c4be..452ed1a5e 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -163,7 +163,7 @@ def populate( logger.error(f"FAILED INDEX: {info}") -def create_job( +def create_indexing_job( corpus: Corpus, start: Optional[datetime.date] = None, end: Optional[datetime.date] = None, diff --git a/backend/es/management/commands/index.py b/backend/es/management/commands/index.py index 3644bfb05..d39558397 100644 --- a/backend/es/management/commands/index.py +++ b/backend/es/management/commands/index.py @@ -5,7 +5,7 @@ from addcorpus.python_corpora.load_corpus import load_corpus_definition from addcorpus.python_corpora.save_corpus import load_all_corpus_definitions from addcorpus.models import Corpus -from es.es_index import perform_indexing +from es.es_index import perform_indexing, create_indexing_job from es.es_update import update_index, update_by_query class Command(BaseCommand): @@ -96,6 +96,11 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F ) raise + job = create_indexing_job( + corpus_object, start_index, end_index, mappings_only, add, delete, prod, + rollover, update + ) + if update: try: if corpus_definition.update_body(): From 274df24fed134896518bf177cedf4899ecb01830 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 13:46:28 +0100 Subject: [PATCH 07/41] create job in alias command --- backend/es/es_alias.py | 34 ++++++++++++++++++++++++- backend/es/management/commands/alias.py | 4 +-- backend/ianalyzer/elasticsearch.py | 2 +- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/backend/es/es_alias.py b/backend/es/es_alias.py index a8ffd94e6..7e0ec8c81 100644 --- a/backend/es/es_alias.py +++ b/backend/es/es_alias.py @@ -2,11 +2,43 @@ import 
re from addcorpus.models import Corpus, CorpusConfiguration -from ianalyzer.elasticsearch import elasticsearch +from ianalyzer.elasticsearch import elasticsearch, server_for_corpus +from es.models import Server, Index +from indexing.models import IndexJob, DeleteIndexTask, RemoveAliasTask, AddAliasTask import logging logger = logging.getLogger('indexing') +def create_alias_job(corpus: Corpus, clean=False) -> IndexJob: + job = IndexJob.objects.create(corpus=corpus) + + corpus_config = corpus.configuration + corpus_name = corpus.name + server = Server.objects.get(name=server_for_corpus(corpus_name)) + index_name = corpus_config.es_index + index_alias = corpus_config.es_alias + client = elasticsearch(corpus_name) + + alias = index_alias if index_alias else index_name + indices = client.indices.get(index='{}-*'.format(index_name)) + highest_version = get_highest_version_number(indices, alias) + + for index_name, properties in indices.items(): + is_aliased = alias in properties['aliases'].keys() + is_highest_version = extract_version(index_name, alias) == highest_version + index, _ = Index.objects.get_or_create(server=server, name=index_name) + + if not is_highest_version and clean: + DeleteIndexTask.objects.create(job=job, index=index) + + if not is_highest_version and is_aliased and not clean: + RemoveAliasTask.objects.create(job=job, index=index, alias=alias) + + if is_highest_version and not is_aliased: + AddAliasTask.objects.create(job=job, index=index, alias=alias) + + return job + def alias(corpus: Corpus, clean=False): ''' diff --git a/backend/es/management/commands/alias.py b/backend/es/management/commands/alias.py index df6659a7b..e36a24359 100644 --- a/backend/es/management/commands/alias.py +++ b/backend/es/management/commands/alias.py @@ -1,8 +1,7 @@ from django.core.management import BaseCommand from addcorpus.models import Corpus -from addcorpus.python_corpora.load_corpus import load_corpus_definition -from es.es_alias import alias +from es.es_alias import alias, create_alias_job class Command(BaseCommand): help = ''' @@ -27,4 +26,5 @@ def add_arguments(self, parser): def handle(self, corpus, clean=False, **options): corpus_obj = Corpus.objects.get(name=corpus) + create_alias_job(corpus_obj, clean) alias(corpus_obj, clean) diff --git a/backend/ianalyzer/elasticsearch.py b/backend/ianalyzer/elasticsearch.py index e7b0e14f4..df5dd6f53 100644 --- a/backend/ianalyzer/elasticsearch.py +++ b/backend/ianalyzer/elasticsearch.py @@ -14,7 +14,7 @@ def elasticsearch(corpus_name): return client_from_config(server_config) -def server_for_corpus(corpus_name): +def server_for_corpus(corpus_name) -> str: return settings.CORPUS_SERVER_NAMES.get(corpus_name, 'default') From 0a6c58943ee8b1a76224f57d6c9f9b929693f8e4 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 13:58:57 +0100 Subject: [PATCH 08/41] add @transaction.atomic decorator --- backend/es/es_alias.py | 2 ++ backend/es/es_index.py | 8 +++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/backend/es/es_alias.py b/backend/es/es_alias.py index 7e0ec8c81..b24063c0c 100644 --- a/backend/es/es_alias.py +++ b/backend/es/es_alias.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import re +from django.db import transaction from addcorpus.models import Corpus, CorpusConfiguration from ianalyzer.elasticsearch import elasticsearch, server_for_corpus @@ -9,6 +10,7 @@ import logging logger = logging.getLogger('indexing') +@transaction.atomic def create_alias_job(corpus: Corpus, clean=False) -> IndexJob: job = 
IndexJob.objects.create(corpus=corpus) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 452ed1a5e..f4779a212 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -12,6 +12,7 @@ from elasticsearch.exceptions import RequestError from django.conf import settings +from django.db import transaction from addcorpus.es_settings import es_settings from addcorpus.models import Corpus, CorpusConfiguration @@ -163,6 +164,7 @@ def populate( logger.error(f"FAILED INDEX: {info}") +@transaction.atomic def create_indexing_job( corpus: Corpus, start: Optional[datetime.date] = None, @@ -185,7 +187,9 @@ def create_indexing_job( if prod: alias = corpus.configuration.es_alias or corpus.configuration.es_index if add or update: - versioned_name = get_current_index_name(corpus.configuration, client) + versioned_name = get_current_index_name( + corpus.configuration, client + ) else: next_version = get_new_version_number(client, alias, base_name) versioned_name = f'{base_name}-{next_version}' @@ -246,6 +250,8 @@ def create_indexing_job( return job + + def perform_indexing( corpus: Corpus, start: Optional[datetime.date] = None, From 1681b1ebeef88ce7d153c5d83397ee6b642045c3 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 14:15:56 +0100 Subject: [PATCH 09/41] add prod field to CreateIndexTask --- backend/es/es_index.py | 1 + ...0003_createindextask_production_settings.py | 18 ++++++++++++++++++ backend/indexing/models.py | 9 +++++++++ 3 files changed, 28 insertions(+) create mode 100644 backend/indexing/migrations/0003_createindextask_production_settings.py diff --git a/backend/es/es_index.py b/backend/es/es_index.py index f4779a212..704889a39 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -229,6 +229,7 @@ def create_indexing_job( CreateIndexTask.objects.create( job=job, index=index, + production_settings=prod, delete_existing=clear, ) diff --git a/backend/indexing/migrations/0003_createindextask_production_settings.py b/backend/indexing/migrations/0003_createindextask_production_settings.py new file mode 100644 index 000000000..90586323f --- /dev/null +++ b/backend/indexing/migrations/0003_createindextask_production_settings.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.14 on 2024-11-13 13:15 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('indexing', '0002_updatesettingstask'), + ] + + operations = [ + migrations.AddField( + model_name='createindextask', + name='production_settings', + field=models.BooleanField(default=False, help_text='configure index settings for a production environment'), + ), + ] diff --git a/backend/indexing/models.py b/backend/indexing/models.py index 8b637e48a..0cbdb3e4a 100644 --- a/backend/indexing/models.py +++ b/backend/indexing/models.py @@ -1,6 +1,8 @@ from django.db import models from itertools import chain +from elasticsearch import Elasticsearch +from ianalyzer.elasticsearch import elasticsearch from addcorpus.models import Corpus from es.models import Index @@ -41,12 +43,19 @@ class Meta: def corpus(self) -> Corpus: return self.job.corpus + def client(self) -> Elasticsearch: + return elasticsearch(self.corpus.name) + class CreateIndexTask(IndexTask): ''' Create a new index based on corpus settings. 
''' + production_settings = models.BooleanField( + default=False, + help_text='configure index settings for a production environment', + ) delete_existing = models.BooleanField( default=False, help_text='if an index by this name already exists, delete it, instead of ' From 83eb892ccc3a6689f7a72a8a7ff898c5964bad79 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 14:31:56 +0100 Subject: [PATCH 10/41] add check if alias exists --- backend/es/es_index.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 704889a39..9e513fc6a 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -205,16 +205,17 @@ def create_indexing_job( ) if rollover: - for index_name in client.indices.get_alias(name=alias): - aliased_index, _ = Index.objects.get_or_create( - server=server, - name=index_name, - ) - RemoveAliasTask.objects.create( - job=job, - index=aliased_index, - alias=alias, - ) + if client.indices.exists_alias(name=alias): + for index_name in client.indices.get_alias(name=alias): + aliased_index, _ = Index.objects.get_or_create( + server=server, + name=index_name, + ) + RemoveAliasTask.objects.create( + job=job, + index=aliased_index, + alias=alias, + ) AddAliasTask.objects.create( job=job, index=index, @@ -254,6 +255,7 @@ def create_indexing_job( def perform_indexing( + job: IndexJob, corpus: Corpus, start: Optional[datetime.date] = None, end: Optional[datetime.date] = None, From 20ba659459629b869f263df35eec3122172ad555 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 14:32:35 +0100 Subject: [PATCH 11/41] add job as argument to perform_indexing --- backend/conftest.py | 3 +- backend/es/management/commands/index.py | 2 +- backend/es/tests/test_es_index.py | 43 +++++++++++++++++++++---- backend/es/tests/test_sync.py | 5 +-- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/backend/conftest.py b/backend/conftest.py index c9124964a..6c9c81683 100644 --- a/backend/conftest.py +++ b/backend/conftest.py @@ -155,7 +155,8 @@ def _index_test_corpus(es_client: Elasticsearch, corpus_name: str): corpus = Corpus.objects.get(name=corpus_name) if not es_client.indices.exists(index=corpus.configuration.es_index): - index.perform_indexing(corpus) + job = index.create_indexing_job(corpus) + index.perform_indexing(job, corpus) # ES is "near real time", so give it a second before we start searching the index sleep(2) diff --git a/backend/es/management/commands/index.py b/backend/es/management/commands/index.py index d39558397..06f09283e 100644 --- a/backend/es/management/commands/index.py +++ b/backend/es/management/commands/index.py @@ -122,7 +122,7 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F logging.critical(e) raise else: - perform_indexing(corpus_object, start_index, end_index, + perform_indexing(job, corpus_object, start_index, end_index, mappings_only, add, delete, prod, rollover) def _corpus_object(self, corpus_name): diff --git a/backend/es/tests/test_es_index.py b/backend/es/tests/test_es_index.py index 8ad14ddea..61860b3c1 100644 --- a/backend/es/tests/test_es_index.py +++ b/backend/es/tests/test_es_index.py @@ -3,7 +3,7 @@ from time import sleep from addcorpus.models import Corpus -from es.es_index import perform_indexing +from es.es_index import perform_indexing, create_indexing_job START = datetime.strptime('1970-01-01', '%Y-%m-%d') END = datetime.strptime('1970-12-31', '%Y-%m-%d') @@ -16,8 +16,12 @@ def 
mock_client(es_index_client): @pytest.mark.parametrize("prod, name, shards", [(True, "times-test-1", '5'), (False, "times-test", '1')]) def test_prod_flag(mock_corpus, es_index_client, corpus_definition, prod, name, shards): corpus = Corpus.objects.get(name=mock_corpus) - perform_indexing( + job = create_indexing_job( corpus, START, END, + mappings_only=True, add=False, clear=False, prod=prod, rollover=False, + ) + perform_indexing( + job, corpus, START, END, mappings_only=True, add=False, clear=False, prod=prod, rollover=False) assert es_index_client.indices.exists(index=name) @@ -28,8 +32,12 @@ def test_prod_flag(mock_corpus, es_index_client, corpus_definition, prod, name, @pytest.mark.parametrize("mappings_only, expected", [(False, 2), (True, 0)]) def test_mappings_only_flag(mock_corpus, es_index_client, corpus_definition, mappings_only, expected): corpus = Corpus.objects.get(name=mock_corpus) - perform_indexing( + job = create_indexing_job( corpus, START, END, + mappings_only=mappings_only, add=False, clear=False, prod=False, rollover=False, + ) + perform_indexing( + job, corpus, START, END, mappings_only=mappings_only, add=False, clear=False, prod=False, rollover=False) sleep(1) res = es_index_client.count(index='times-test*') @@ -38,21 +46,33 @@ def test_mappings_only_flag(mock_corpus, es_index_client, corpus_definition, map def test_add_clear(db, mock_corpus, es_index_client): corpus = Corpus.objects.get(name=mock_corpus) - perform_indexing( + job = create_indexing_job( corpus, START, END, + mappings_only=True, add=False, clear=False, prod=False, rollover=False, + ) + perform_indexing( + job, corpus, START, END, mappings_only=True, add=False, clear=False, prod=False, rollover=False ) res = es_index_client.count(index='times-test*') assert res.get('count') == 0 - perform_indexing( + job = create_indexing_job( corpus, START, END, + mappings_only=False, add=True, clear=False, prod=False, rollover=False, + ) + perform_indexing( + job, corpus, START, END, mappings_only=False, add=True, clear=False, prod=False, rollover=False ) sleep(1) res = es_index_client.count(index='times-test*') assert res.get('count') == 2 - perform_indexing( + job = create_indexing_job( corpus, START, END, + mappings_only=True, add=False, clear=True, prod=False, rollover=False, + ) + perform_indexing( + job, corpus, START, END, mappings_only=True, add=False, clear=True, prod=False, rollover=False ) res = es_index_client.count(index='times-test*') @@ -70,7 +90,18 @@ def test_db_only_corpus(json_mock_corpus, es_client, index_json_mock_corpus): def test_indexing_with_version(mock_corpus, corpus_definition, es_index_client): corpus = Corpus.objects.get(name=mock_corpus) + job = create_indexing_job( + corpus, + START, + END, + mappings_only=False, + add=False, + clear=False, + prod=True, + rollover=True, + ) perform_indexing( + job, corpus, START, END, diff --git a/backend/es/tests/test_sync.py b/backend/es/tests/test_sync.py index 4d870e293..09fb84d2a 100644 --- a/backend/es/tests/test_sync.py +++ b/backend/es/tests/test_sync.py @@ -1,7 +1,7 @@ from time import sleep from elasticsearch import Elasticsearch -from es.es_index import perform_indexing +from es.es_index import perform_indexing, create_indexing_job from es.sync import ( update_server_table_from_settings, fetch_index_metadata, update_availability ) @@ -42,7 +42,8 @@ def test_fetch_index_data(db, es_client, basic_mock_corpus, index_basic_mock_cor assert not index.available # restore index - perform_indexing(corpus) + job = create_indexing_job(corpus) + 
perform_indexing(job, corpus) sleep(1) fetch_index_metadata() From 8d63e9a8d21830563dc13d35696befba6c0b524c Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 14:58:34 +0100 Subject: [PATCH 12/41] use createindextask input in create func --- backend/conftest.py | 10 ++++- backend/es/conftest.py | 20 +++++++-- backend/es/es_index.py | 93 ++++++++++++++++++------------------------ 3 files changed, 66 insertions(+), 57 deletions(-) diff --git a/backend/conftest.py b/backend/conftest.py index 6c9c81683..2f8edcf79 100644 --- a/backend/conftest.py +++ b/backend/conftest.py @@ -8,11 +8,12 @@ from ianalyzer.elasticsearch import client_from_config from addcorpus.python_corpora.save_corpus import load_and_save_all_corpora -from es import es_index as index +from es import es_index as index, sync from django.conf import settings from django.contrib.auth.models import Group from addcorpus.models import Corpus from addcorpus.serializers import CorpusJSONDefinitionSerializer +from es.models import Server @pytest.fixture(autouse=True) def media_dir(tmpdir, settings): @@ -111,6 +112,13 @@ def es_client(): return client + +@pytest.fixture() +def es_server(db, settings) -> Server: + sync.update_server_table_from_settings() + return Server.objects.get(name='default') + + @pytest.fixture() def basic_mock_corpus() -> str: return 'mock-csv-corpus' diff --git a/backend/es/conftest.py b/backend/es/conftest.py index 8f60cd588..46124c94f 100644 --- a/backend/es/conftest.py +++ b/backend/es/conftest.py @@ -7,7 +7,8 @@ from addcorpus.python_corpora.load_corpus import load_corpus_definition from addcorpus.models import Corpus from es import es_index - +from es.models import Index +from indexing.models import IndexJob, CreateIndexTask @pytest.fixture(scope='session') def mock_corpus(): @@ -58,14 +59,27 @@ def es_index_client(es_client, mock_corpus): es_client.indices.delete(index=index) @pytest.fixture() -def es_alias_client(es_client, mock_corpus): +def es_alias_client(db, es_server, es_client, mock_corpus): """ Create multiple indices with version numbers for the mock corpus in elasticsearch. Returns an elastic search client for the mock corpus. """ # add data from mock corpus corpus = Corpus.objects.get(name=mock_corpus) - es_index.create(es_client, corpus, add=False, clear=True, prod=True) # create ianalyzer-times-1 index + index = Index.objects.create( + name='times-test-1', + server=es_server, + ) + job = IndexJob.objects.create( + corpus=corpus, + ) + create_task = CreateIndexTask.objects.create( + job=job, + index=index, + delete_existing=True, + production_settings=True, + ) + es_index.create(create_task) es_client.indices.create(index='times-test-2') es_client.indices.create(index='times-test-bla-3') diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 9e513fc6a..1dd72e5e4 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -57,61 +57,42 @@ def _make_es_mapping(corpus_configuration: CorpusConfiguration) -> Dict: } -def create( - client: Elasticsearch, - corpus: Corpus, - add: bool = False, - clear: bool = False, - prod: bool = False, -) -> str: - ''' - Initialise an ElasticSearch index. 
- ''' - corpus_config = corpus.configuration - index_name = corpus_config.es_index - es_mapping = _make_es_mapping(corpus_config) +def create(task: CreateIndexTask): + client = task.client() - if add: - # we add document to existing index - skip creation, return current index - return get_current_index_name(corpus_config, client) - - if clear: - logger.info('Attempting to clean old index...') - client.indices.delete( - index=index_name, ignore=[400, 404]) + corpus_config = task.corpus.configuration + index_name = task.index.name + es_mapping = _make_es_mapping(corpus_config) - settings = _make_es_settings(corpus) + if client.indices.exists(index=index_name): + if task.delete_existing: + logger.info('Attempting to clean old index...') + client.indices.delete( + index=index_name, ignore=[400, 404]) + else: + logger.error( + 'Index `{}` already exists. Do you need to add an alias for it or ' + 'perhaps delete it?'.format(index_name) + ) + raise Exception('index already exists') - if prod: - logger.info('Using a versioned index name') - alias = corpus_config.es_alias if corpus_config.es_alias else index_name - index_name = "{}-{}".format( - index_name, get_new_version_number(client, alias, index_name)) - if client.indices.exists(index=index_name): - logger.error('Index `{}` already exists. Do you need to add an alias for it or perhaps delete it?'.format( - index_name)) - sys.exit(1) + settings = _make_es_settings(task.corpus) + if task.production_settings: logger.info('Adding prod settings to index') settings['index'].update({ 'number_of_replicas': 0, 'number_of_shards': 5 }) - logger.info('Attempting to create index `{}`...'.format( - index_name)) - try: - client.indices.create( - index=index_name, - settings=settings, - mappings=es_mapping, - ) - return index_name - except RequestError as e: - if 'already_exists' not in e.error: - # ignore that the index already exist, - # raise any other errors. 
- raise + logger.info('Attempting to create index `{}`...'.format(index_name)) + + client.indices.create( + index=task.index.name, + settings=settings, + mappings=es_mapping, + ) + return index_name def populate( @@ -287,23 +268,29 @@ def perform_indexing( logger.info('retry on timeout: {}'.format( vars(client).get('_retry_on_timeout')) ) - versioned_index_name = create(client, corpus, add, clear, prod) + + for task in job.createindextasks.all(): + create(task) + client.cluster.health(wait_for_status='yellow') if mappings_only: logger.info('Created index `{}` with mappings only.'.format(index_name)) return - populate(client, corpus, versioned_index_name, start=start, end=end) + for task in job.populateindextasks.all(): + populate(client, corpus, task.index.name, start=start, end=end) logger.info('Finished indexing `{}` to index `{}`.'.format( corpus_name, index_name)) - if prod: - logger.info("Updating settings for index `{}`".format(versioned_index_name)) + for task in job.updatesettingstasks.all(): + logger.info("Updating settings for index `{}`".format(task.index.name)) client.indices.put_settings( - settings={"number_of_replicas": 1}, index=versioned_index_name + settings={"number_of_replicas": 1}, index=task.index.name ) - if rollover: - logger.info("Adjusting alias for index `{}`".format(versioned_index_name)) - alias(corpus) # not deleting old index, so we can roll back + + if job.addaliastasks.exists(): + versioned_index_name = job.addaliastasks.first().index.name + logger.info("Adjusting alias for index `{}`".format(versioned_index_name)) + alias(corpus) From e6fb5cbe2d0fb3aaa2183dbe91fff45641deed22 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 16:46:47 +0100 Subject: [PATCH 13/41] allow date inpunt in consolidate_start_end_years --- backend/addcorpus/python_corpora/corpus.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/backend/addcorpus/python_corpora/corpus.py b/backend/addcorpus/python_corpora/corpus.py index 3b0e2594a..b6b42efd1 100644 --- a/backend/addcorpus/python_corpora/corpus.py +++ b/backend/addcorpus/python_corpora/corpus.py @@ -2,9 +2,9 @@ Module contains the base classes from which corpora can derive; ''' -from typing import Optional, List, Dict +from typing import Optional, List, Dict, Union from ianalyzer_readers import extract -from datetime import datetime +from datetime import datetime, date from os.path import isdir import os @@ -495,7 +495,12 @@ def f(metadata): return f -def consolidate_start_end_years(start, end, min_date, max_date): +def consolidate_start_end_years( + start: Union[datetime, date, int], + end: Union[datetime, date, int], + min_date: datetime, + max_date: datetime +): ''' given a start and end date provided by the user, make sure - that start is not before end - that start is not before min_date (corpus variable) @@ -503,8 +508,13 @@ def consolidate_start_end_years(start, end, min_date, max_date): ''' if isinstance(start, int): start = datetime(year=start, month=1, day=1) + elif isinstance(start, date): + start = datetime(year=start.year, month=start.month, day=start.day) if isinstance(end, int): end = datetime(year=end, month=12, day=31) + elif isinstance(end, date): + end = datetime(year=end.year, month=end.month, day=end.day) + if start > end: tmp = start start = end From bc0c3b30c11b8c016dbafdfe5300529c1ec71594 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 16:49:20 +0100 Subject: [PATCH 14/41] use task input for populate func --- 
backend/es/es_index.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 1dd72e5e4..7123e6eed 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -95,26 +95,19 @@ def create(task: CreateIndexTask): return index_name -def populate( - client: Elasticsearch, - corpus: Corpus, - versioned_index_name: str, - start=None, - end=None, -): +def populate(task: PopulateIndexTask): ''' Populate an ElasticSearch index from the corpus' source files. ''' - corpus_config = corpus.configuration - corpus_name = corpus.name - reader = make_reader(corpus) + corpus_config = task.corpus.configuration + reader = make_reader(task.corpus) logger.info('Attempting to populate index...') # Obtain source documents files = reader.sources( - start=start or corpus_config.min_date, - end=end or corpus_config.max_date) + start=task.document_min_date or corpus_config.min_date, + end=task.document_max_date or corpus_config.max_date) docs = reader.documents(files) # Each source document is decorated as an indexing operation, so that it @@ -122,22 +115,22 @@ def populate( actions = ( { "_op_type": "index", - "_index": versioned_index_name, + "_index": task.index.name, "_id": doc.get("id"), "_source": doc, } for doc in docs ) - corpus_server = settings.SERVERS[ - settings.CORPUS_SERVER_NAMES.get(corpus_name, 'default')] + server_config = task.index.server.configuration # Do bulk operation + client = task.client() for success, info in es_helpers.streaming_bulk( client, actions, - chunk_size=corpus_server["chunk_size"], - max_chunk_bytes=corpus_server["max_chunk_bytes"], + chunk_size=server_config["chunk_size"], + max_chunk_bytes=server_config["max_chunk_bytes"], raise_on_exception=False, raise_on_error=False, ): @@ -279,7 +272,7 @@ def perform_indexing( return for task in job.populateindextasks.all(): - populate(client, corpus, task.index.name, start=start, end=end) + populate(task) logger.info('Finished indexing `{}` to index `{}`.'.format( corpus_name, index_name)) From 410415d6dc9c5d83028f0197702f1148bcc33f10 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 16:51:12 +0100 Subject: [PATCH 15/41] use task input to update settings --- backend/es/es_index.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 7123e6eed..98719660a 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -138,6 +138,13 @@ def populate(task: PopulateIndexTask): logger.error(f"FAILED INDEX: {info}") +def update_index_settings(task: UpdateSettingsTask): + client = task.client() + client.indices.put_settings( + settings=task.settings, index=task.index.name + ) + + @transaction.atomic def create_indexing_job( corpus: Corpus, @@ -279,9 +286,7 @@ def perform_indexing( for task in job.updatesettingstasks.all(): logger.info("Updating settings for index `{}`".format(task.index.name)) - client.indices.put_settings( - settings={"number_of_replicas": 1}, index=task.index.name - ) + update_index_settings(task) if job.addaliastasks.exists(): versioned_index_name = job.addaliastasks.first().index.name From ab2d951fabf6705be9c25605ace7158e7497ec41 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 17:03:41 +0100 Subject: [PATCH 16/41] add add_alias, remove_alias, delete_index functions --- backend/es/es_alias.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/backend/es/es_alias.py 
b/backend/es/es_alias.py index b24063c0c..c8efdd6f8 100644 --- a/backend/es/es_alias.py +++ b/backend/es/es_alias.py @@ -85,6 +85,29 @@ def alias(corpus: Corpus, clean=False): logger.info('Done updating aliases') +def add_alias(task: AddAliasTask): + client = task.client() + client.indices.put_alias( + index=task.index.name, + name=task.alias + ) + + +def remove_alias(task: RemoveAliasTask): + client = task.client() + client.indices.delete_alias( + index=task.index.name, + name=task.alias + ) + + +def delete_index(task: DeleteIndexTask): + client = task.client() + client.indices.delete( + index=task.index.name, + ) + + def get_current_index_name(corpus: CorpusConfiguration, client) -> str: """get the name of the current corpus' associated index""" alias = corpus.es_alias or corpus.es_index From 0420fc23970366deb44823a263e3f7a4aca1e8f3 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 17:10:56 +0100 Subject: [PATCH 17/41] get alias actions from indexjob --- backend/es/es_index.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 98719660a..342513192 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -19,7 +19,10 @@ from addcorpus.python_corpora.load_corpus import load_corpus_definition from addcorpus.reader import make_reader from ianalyzer.elasticsearch import elasticsearch, server_for_corpus -from .es_alias import alias, get_current_index_name, get_new_version_number +from .es_alias import ( + alias, get_current_index_name, get_new_version_number, + add_alias, remove_alias, delete_index +) import datetime from indexing.models import ( IndexJob, CreateIndexTask, PopulateIndexTask, UpdateIndexTask, @@ -272,23 +275,28 @@ def perform_indexing( for task in job.createindextasks.all(): create(task) - client.cluster.health(wait_for_status='yellow') + if not job.populateindextasks.exists() or job.updateindextasks.exists(): + logger.info(f'Created index `{task.index.name}` with mappings only.') - if mappings_only: - logger.info('Created index `{}` with mappings only.'.format(index_name)) - return + client.cluster.health(wait_for_status='yellow') for task in job.populateindextasks.all(): populate(task) - - logger.info('Finished indexing `{}` to index `{}`.'.format( - corpus_name, index_name)) + logger.info('Finished indexing `{}` to index `{}`.'.format( + corpus_name, task.index.name)) for task in job.updatesettingstasks.all(): logger.info("Updating settings for index `{}`".format(task.index.name)) update_index_settings(task) - if job.addaliastasks.exists(): - versioned_index_name = job.addaliastasks.first().index.name - logger.info("Adjusting alias for index `{}`".format(versioned_index_name)) - alias(corpus) + for task in job.removealiastasks.all(): + logger.info(f'Removing alias `{task.alias}` for index `{task.index.name}`') + remove_alias(task) + + for task in job.addaliastasks.all(): + logger.info(f'Adding alias `{task.alias}` for index `{task.index.name}`') + add_alias(task) + + for task in job.deleteindextasks.all(): + logger.info(f'Deleting index {task.index.name}') + delete_index(task) From fd7de542ad012347396e7c99c37f41170ed4721f Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 17:11:44 +0100 Subject: [PATCH 18/41] move warning call --- backend/es/es_index.py | 4 ---- backend/es/management/commands/index.py | 5 +++++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py 
index 342513192..366b1dbe0 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -260,10 +260,6 @@ def perform_indexing( index_name )) - if rollover and not prod: - logger.warning( - 'rollover flag is set but prod flag not set -- no effect') - # Create and populate the ES index client = elasticsearch(corpus_name) logger.info('max_retries: {}'.format(vars(client).get('_max_retries'))) diff --git a/backend/es/management/commands/index.py b/backend/es/management/commands/index.py index 06f09283e..31b4a43fe 100644 --- a/backend/es/management/commands/index.py +++ b/backend/es/management/commands/index.py @@ -96,6 +96,11 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F ) raise + if rollover and not prod: + logging.warning( + 'rollover flag is set but prod flag not set -- no effect') + + job = create_indexing_job( corpus_object, start_index, end_index, mappings_only, add, delete, prod, rollover, update From d09ac1ebb562d4613fbb1d3bdec2510ce129ccf8 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 17:14:11 +0100 Subject: [PATCH 19/41] remove unused arguments from perform_indexing --- backend/conftest.py | 2 +- backend/es/es_index.py | 18 +++---------- backend/es/management/commands/index.py | 3 +-- backend/es/tests/test_es_index.py | 35 +++++-------------------- backend/es/tests/test_sync.py | 2 +- 5 files changed, 13 insertions(+), 47 deletions(-) diff --git a/backend/conftest.py b/backend/conftest.py index 2f8edcf79..0646e4c44 100644 --- a/backend/conftest.py +++ b/backend/conftest.py @@ -164,7 +164,7 @@ def _index_test_corpus(es_client: Elasticsearch, corpus_name: str): if not es_client.indices.exists(index=corpus.configuration.es_index): job = index.create_indexing_job(corpus) - index.perform_indexing(job, corpus) + index.perform_indexing(job) # ES is "near real time", so give it a second before we start searching the index sleep(2) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 366b1dbe0..9d63645fb 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -238,21 +238,11 @@ def create_indexing_job( -def perform_indexing( - job: IndexJob, - corpus: Corpus, - start: Optional[datetime.date] = None, - end: Optional[datetime.date] = None, - mappings_only: bool = False, - add: bool = False, - clear: bool = False, - prod: bool = False, - rollover: bool = False, -): - corpus.validate_ready_to_index() +def perform_indexing(job: IndexJob): + job.corpus.validate_ready_to_index() - corpus_config = corpus.configuration - corpus_name = corpus.name + corpus_config = job.corpus.configuration + corpus_name = job.corpus.name index_name = corpus_config.es_index logger.info('Started indexing `{}` on index {}'.format( diff --git a/backend/es/management/commands/index.py b/backend/es/management/commands/index.py index 31b4a43fe..0075cbaf6 100644 --- a/backend/es/management/commands/index.py +++ b/backend/es/management/commands/index.py @@ -127,8 +127,7 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F logging.critical(e) raise else: - perform_indexing(job, corpus_object, start_index, end_index, - mappings_only, add, delete, prod, rollover) + perform_indexing(job) def _corpus_object(self, corpus_name): load_all_corpus_definitions() diff --git a/backend/es/tests/test_es_index.py b/backend/es/tests/test_es_index.py index 61860b3c1..705286adc 100644 --- a/backend/es/tests/test_es_index.py +++ b/backend/es/tests/test_es_index.py @@ -20,9 +20,7 @@ def test_prod_flag(mock_corpus, 
es_index_client, corpus_definition, prod, name, corpus, START, END, mappings_only=True, add=False, clear=False, prod=prod, rollover=False, ) - perform_indexing( - job, corpus, START, END, - mappings_only=True, add=False, clear=False, prod=prod, rollover=False) + perform_indexing(job) assert es_index_client.indices.exists(index=name) assert es_index_client.indices.get_settings(index=name).get( @@ -36,9 +34,7 @@ def test_mappings_only_flag(mock_corpus, es_index_client, corpus_definition, map corpus, START, END, mappings_only=mappings_only, add=False, clear=False, prod=False, rollover=False, ) - perform_indexing( - job, corpus, START, END, - mappings_only=mappings_only, add=False, clear=False, prod=False, rollover=False) + perform_indexing(job) sleep(1) res = es_index_client.count(index='times-test*') assert res.get('count') == expected @@ -50,20 +46,14 @@ def test_add_clear(db, mock_corpus, es_index_client): corpus, START, END, mappings_only=True, add=False, clear=False, prod=False, rollover=False, ) - perform_indexing( - job, corpus, START, END, - mappings_only=True, add=False, clear=False, prod=False, rollover=False - ) + perform_indexing(job) res = es_index_client.count(index='times-test*') assert res.get('count') == 0 job = create_indexing_job( corpus, START, END, mappings_only=False, add=True, clear=False, prod=False, rollover=False, ) - perform_indexing( - job, corpus, START, END, - mappings_only=False, add=True, clear=False, prod=False, rollover=False - ) + perform_indexing(job) sleep(1) res = es_index_client.count(index='times-test*') assert res.get('count') == 2 @@ -71,10 +61,7 @@ def test_add_clear(db, mock_corpus, es_index_client): corpus, START, END, mappings_only=True, add=False, clear=True, prod=False, rollover=False, ) - perform_indexing( - job, corpus, START, END, - mappings_only=True, add=False, clear=True, prod=False, rollover=False - ) + perform_indexing(job) res = es_index_client.count(index='times-test*') assert res.get('count') == 0 @@ -100,15 +87,5 @@ def test_indexing_with_version(mock_corpus, corpus_definition, es_index_client): prod=True, rollover=True, ) - perform_indexing( - job, - corpus, - START, - END, - mappings_only=False, - add=False, - clear=False, - prod=True, - rollover=True, - ) + perform_indexing(job) assert es_index_client.indices.exists(index="times-test-1") == True diff --git a/backend/es/tests/test_sync.py b/backend/es/tests/test_sync.py index 09fb84d2a..c5f55a239 100644 --- a/backend/es/tests/test_sync.py +++ b/backend/es/tests/test_sync.py @@ -43,7 +43,7 @@ def test_fetch_index_data(db, es_client, basic_mock_corpus, index_basic_mock_cor # restore index job = create_indexing_job(corpus) - perform_indexing(job, corpus) + perform_indexing(job) sleep(1) fetch_index_metadata() From 8d2a823b22ca8ba90e2a7060ecd94866515101a7 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 17:31:36 +0100 Subject: [PATCH 20/41] hande update through perform_indexing --- backend/es/es_index.py | 4 +++ backend/es/es_update.py | 36 ++++++++++++++++++++++--- backend/es/management/commands/index.py | 23 +--------------- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 9d63645fb..178e40910 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -30,6 +30,7 @@ ) from es.sync import update_server_table_from_settings from es.models import Server, Index +from es.es_update import run_update_task import logging logger = logging.getLogger('indexing') @@ -271,6 +272,9 @@ def 
perform_indexing(job: IndexJob): logger.info('Finished indexing `{}` to index `{}`.'.format( corpus_name, task.index.name)) + for task in job.updateindextasks.all(): + run_update_task(task) + for task in job.updatesettingstasks.all(): logger.info("Updating settings for index `{}`".format(task.index.name)) update_index_settings(task) diff --git a/backend/es/es_update.py b/backend/es/es_update.py index d29f470a1..427f8855a 100644 --- a/backend/es/es_update.py +++ b/backend/es/es_update.py @@ -1,10 +1,38 @@ from django.conf import settings + +from addcorpus.python_corpora.corpus import CorpusDefinition from ianalyzer.elasticsearch import elasticsearch +from indexing.models import UpdateIndexTask +from addcorpus.python_corpora.load_corpus import load_corpus_definition import logging logger = logging.getLogger('indexing') -def update_index(corpus, corpus_definition, query_model): +def run_update_task(task: UpdateIndexTask) -> None: + if not task.corpus.has_python_definition: + raise Exception('Cannot update: corpus has no Python definition') + + corpus_definition = load_corpus_definition(task.corpus.name) + + if corpus_definition.update_body(): + min_date = task.document_min_date or task.corpus.configuration.min_date + max_date = task.document_max_date or task.corpus.configuration.max_date + update_index( + task.corpus.name, + corpus_definition, + corpus_definition.update_query( + min_date=min_date.strftime('%Y-%m-%d'), + max_date=max_date.strftime('%Y-%m-%d') + )) + elif corpus_definition.update_script(): + update_by_query( + task.corpus.name, corpus_definition, corpus_definition.update_script() + ) + else: + raise Exception("Cannot update without update_body or update_script") + + def update_index(corpus: str, corpus_definition: CorpusDefinition, query_model): ''' update information for fields in the index requires the definition of the functions - update_query @@ -38,7 +66,7 @@ def update_index(corpus, corpus_definition, query_model): logger.info("Updated {} of {} documents".format(hits, total_hits)) -def update_by_query(corpus, corpus_definition, query_generator): +def update_by_query(corpus: str, corpus_definition: CorpusDefinition, query_generator): client = elasticsearch(corpus) scroll_timeout, scroll_size = get_es_settings(corpus, corpus_definition) for query_model in query_generator: @@ -53,7 +81,7 @@ def update_by_query(corpus, corpus_definition, query_generator): logger.info('No documents updated for query {}'.format(query_model)) -def update_document(corpus, doc, update_body, client=None): +def update_document(corpus: str, doc, update_body, client=None): if not client: client = elasticsearch(corpus) doc_id = doc.get('_id', doc.get('id', None)) @@ -64,7 +92,7 @@ def update_document(corpus, doc, update_body, client=None): -def get_es_settings(corpus, corpus_definition): +def get_es_settings(corpus: str, corpus_definition): """ Get the settings for the scroll request.
Return: - scroll_timeout diff --git a/backend/es/management/commands/index.py b/backend/es/management/commands/index.py index 0075cbaf6..12509da78 100644 --- a/backend/es/management/commands/index.py +++ b/backend/es/management/commands/index.py @@ -106,28 +106,7 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F rollover, update ) - if update: - try: - if corpus_definition.update_body(): - update_index( - corpus, - corpus_definition, - corpus_definition.update_query( - min_date=start_index.strftime('%Y-%m-%d'), - max_date=end_index.strftime('%Y-%m-%d') - )) - elif corpus_definition.update_script(): - update_by_query( - corpus, corpus_definition, corpus_definition.update_script() - ) - else: - logging.critical("Cannot update without update_body or update_script") - return None - except Exception as e: - logging.critical(e) - raise - else: - perform_indexing(job) + perform_indexing(job) def _corpus_object(self, corpus_name): load_all_corpus_definitions() From dce9c68171495cf0ad1eb825fb15bf099acbb626 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 17:39:42 +0100 Subject: [PATCH 21/41] clarity and log messages --- backend/es/es_index.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 178e40910..159e34f49 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -242,19 +242,13 @@ def create_indexing_job( def perform_indexing(job: IndexJob): job.corpus.validate_ready_to_index() - corpus_config = job.corpus.configuration corpus_name = job.corpus.name - index_name = corpus_config.es_index - logger.info('Started indexing `{}` on index {}'.format( - corpus_name, - index_name - )) + logger.info(f'Started index job: {job}') # Create and populate the ES index client = elasticsearch(corpus_name) logger.info('max_retries: {}'.format(vars(client).get('_max_retries'))) - logger.info('retry on timeout: {}'.format( vars(client).get('_retry_on_timeout')) ) @@ -264,8 +258,10 @@ def perform_indexing(job: IndexJob): if not job.populateindextasks.exists() or job.updateindextasks.exists(): logger.info(f'Created index `{task.index.name}` with mappings only.') + else: + logger.info(f'Created index `{task.index.name}`') - client.cluster.health(wait_for_status='yellow') + client.cluster.health(wait_for_status='yellow') for task in job.populateindextasks.all(): populate(task) From 93f8e3debfe3f334cf12872fe4b9a656f34df7c9 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 17:41:11 +0100 Subject: [PATCH 22/41] remove alias function --- backend/es/es_alias.py | 43 ------------------------- backend/es/management/commands/alias.py | 5 +-- backend/es/tests/test_alias.py | 9 ++++-- 3 files changed, 9 insertions(+), 48 deletions(-) diff --git a/backend/es/es_alias.py b/backend/es/es_alias.py index c8efdd6f8..4f2ba2513 100644 --- a/backend/es/es_alias.py +++ b/backend/es/es_alias.py @@ -42,49 +42,6 @@ def create_alias_job(corpus: Corpus, clean=False) -> IndexJob: return job -def alias(corpus: Corpus, clean=False): - ''' - Script to create, update and remove aliases from ES - ''' - corpus_config = corpus.configuration - corpus_name = corpus.name - index_name = corpus_config.es_index - index_alias = corpus_config.es_alias - client = elasticsearch(corpus_name) - - alias = index_alias if index_alias else index_name - indices = client.indices.get(index='{}-*'.format(index_name)) - highest_version = get_highest_version_number(indices, alias) - - actions = [] - - 
for index_name, properties in indices.items(): - is_aliased = alias in properties['aliases'].keys() - is_highest_version = extract_version(index_name, alias) == highest_version - - if not is_highest_version and clean: - logger.info('Removing index `{}`'.format(index_name)) - # note that removing an index automatically removes alias - actions.append({'remove_index': {'index': index_name}}) - - if not is_highest_version and is_aliased and not clean: - logger.info('Removing alias `{}` for index `{}`'.format(alias, index_name)) - actions.append( - {'remove': {'index': index_name, 'alias': alias}}) - - if is_highest_version and not is_aliased: - logger.info('Adding alias `{}` for index `{}`'.format(alias, index_name)) - actions.append( - {'add': {'index': index_name, 'alias': alias}}) - elif is_highest_version and is_aliased: - logger.info('Alias `{}` already exists for `{}`, skipping alias creation'.format( - alias, index_name)) - - if len(actions) > 0: - client.indices.update_aliases(actions=actions) - logger.info('Done updating aliases') - - def add_alias(task: AddAliasTask): client = task.client() client.indices.put_alias( diff --git a/backend/es/management/commands/alias.py b/backend/es/management/commands/alias.py index e36a24359..7120d812b 100644 --- a/backend/es/management/commands/alias.py +++ b/backend/es/management/commands/alias.py @@ -2,6 +2,7 @@ from addcorpus.models import Corpus from es.es_alias import alias, create_alias_job +from es.es_index import perform_indexing class Command(BaseCommand): help = ''' @@ -26,5 +27,5 @@ def add_arguments(self, parser): def handle(self, corpus, clean=False, **options): corpus_obj = Corpus.objects.get(name=corpus) - create_alias_job(corpus_obj, clean) - alias(corpus_obj, clean) + job = create_alias_job(corpus_obj, clean) + perform_indexing(job) diff --git a/backend/es/tests/test_alias.py b/backend/es/tests/test_alias.py index 5abde19f0..d2d3b9a59 100644 --- a/backend/es/tests/test_alias.py +++ b/backend/es/tests/test_alias.py @@ -1,11 +1,13 @@ from addcorpus.models import Corpus -from es.es_alias import alias, get_highest_version_number +from es.es_alias import get_highest_version_number, create_alias_job +from es.es_index import perform_indexing def test_alias(db, es_alias_client): corpus = Corpus.objects.get(name='times') assert corpus.configuration.es_index == 'times-test' - alias(corpus) # create an alias ianalyzer-times + job = create_alias_job(corpus) + perform_indexing(job) res = es_alias_client.indices.get_alias(name=corpus.configuration.es_index) assert res.get('times-test-2') is not None @@ -15,7 +17,8 @@ def test_alias_with_clean(es_alias_client): indices = es_alias_client.indices.get( index='{}-*'.format(corpus.configuration.es_index)) assert 'times-test-1' in list(indices.keys()) - alias(corpus, True) + job = create_alias_job(corpus, True) + perform_indexing(job) indices = es_alias_client.indices.get( index='{}-*'.format(corpus.configuration.es_index)) assert 'times-test-1' not in list(indices.keys()) From f265fbe4e0c9ecba091d3be95e90f1d289ebc9c6 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 17:59:54 +0100 Subject: [PATCH 23/41] docstrings --- backend/es/es_alias.py | 13 ++++ backend/es/es_index.py | 158 +++++++++++++++++++++++------------------ 2 files changed, 103 insertions(+), 68 deletions(-) diff --git a/backend/es/es_alias.py b/backend/es/es_alias.py index 4f2ba2513..c1a9d2f2e 100644 --- a/backend/es/es_alias.py +++ b/backend/es/es_alias.py @@ -12,6 +12,10 @@ @transaction.atomic def 
create_alias_job(corpus: Corpus, clean=False) -> IndexJob: + ''' + Create a job to move the alias of a corpus to the index with the highest version + ''' + job = IndexJob.objects.create(corpus=corpus) corpus_config = corpus.configuration @@ -43,6 +47,9 @@ def create_alias_job(corpus: Corpus, clean=False) -> IndexJob: def add_alias(task: AddAliasTask): + ''' + Add an alias to an Elasticsearch index, as defined by an AddAliasTask + ''' client = task.client() client.indices.put_alias( index=task.index.name, @@ -51,6 +58,9 @@ def add_alias(task: AddAliasTask): def remove_alias(task: RemoveAliasTask): + ''' + Remove an alias from an Elasticsearch index, as defined by a RemoveAliasTask + ''' client = task.client() client.indices.delete_alias( index=task.index.name, @@ -59,6 +69,9 @@ def remove_alias(task: RemoveAliasTask): def delete_index(task: DeleteIndexTask): + ''' + Delete an Elasticsearch index, as defined by a DeleteIndexTask + ''' client = task.client() client.indices.delete( index=task.index.name, diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 159e34f49..54518f57e 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -161,85 +161,107 @@ def create_indexing_job( rollover: bool = False, update: bool = False, ) -> IndexJob: - job = IndexJob.objects.create(corpus=corpus) - - update_server_table_from_settings() - server_name = server_for_corpus(corpus.name) - server = Server.objects.get(name=server_name) - client = elasticsearch(corpus.name) - base_name = corpus.configuration.es_index - - if prod: - alias = corpus.configuration.es_alias or corpus.configuration.es_index - if add or update: - versioned_name = get_current_index_name( - corpus.configuration, client - ) - else: - next_version = get_new_version_number(client, alias, base_name) - versioned_name = f'{base_name}-{next_version}' + ''' + Create an IndexJob to index a corpus. - index, _ = Index.objects.get_or_create( - server=server, name=versioned_name - ) + Depending on parameters, this job may include creating an new index, adding documents, + running an update script, and rolling over the alias. Parameters are described + in detail in the documentation for the `index` command. 
+ ''' + job = IndexJob.objects.create(corpus=corpus) - UpdateSettingsTask.objects.create( - job=job, - index=index, - settings={"number_of_replicas": 1}, - ) + update_server_table_from_settings() + server_name = server_for_corpus(corpus.name) + server = Server.objects.get(name=server_name) + client = elasticsearch(corpus.name) + base_name = corpus.configuration.es_index - if rollover: - if client.indices.exists_alias(name=alias): - for index_name in client.indices.get_alias(name=alias): - aliased_index, _ = Index.objects.get_or_create( - server=server, - name=index_name, - ) - RemoveAliasTask.objects.create( - job=job, - index=aliased_index, - alias=alias, - ) - AddAliasTask.objects.create( + # tasks below are added in order of execution for readability + + if prod: + alias = corpus.configuration.es_alias or corpus.configuration.es_index + if add or update: + versioned_name = get_current_index_name( + corpus.configuration, client + ) + else: + next_version = get_new_version_number(client, alias, base_name) + versioned_name = f'{base_name}-{next_version}' + + index, _ = Index.objects.get_or_create( + server=server, name=versioned_name + ) + else: + index, _ = Index.objects.get_or_create( + server=server, name=base_name + ) + + if not add or update: + CreateIndexTask.objects.create( + job=job, + index=index, + production_settings=prod, + delete_existing=clear, + ) + + if not mappings_only or update: + PopulateIndexTask.objects.create( + job=job, + index=index, + document_min_date=start, + document_max_date=end, + ) + + if update: + UpdateIndexTask.objects.create( + job=job, + index=index, + document_min_date=start, + document_max_date=end, + ) + + if prod: + UpdateSettingsTask.objects.create( + job=job, + index=index, + settings={"number_of_replicas": 1}, + ) + + if prod and rollover: + if client.indices.exists_alias(name=alias): + for index_name in client.indices.get_alias(name=alias): + aliased_index, _ = Index.objects.get_or_create( + server=server, + name=index_name, + ) + RemoveAliasTask.objects.create( job=job, - index=index, + index=aliased_index, alias=alias, ) - else: - index, _ = Index.objects.get_or_create( - server=server, name=base_name - ) + AddAliasTask.objects.create( + job=job, + index=index, + alias=alias, + ) - if not add or update: - CreateIndexTask.objects.create( - job=job, - index=index, - production_settings=prod, - delete_existing=clear, - ) - - if not mappings_only or update: - PopulateIndexTask.objects.create( - job=job, - index=index, - document_min_date=start, - document_max_date=end, - ) - - if update: - UpdateIndexTask.objects.create( - job=job, - index=index, - document_min_date=start, - document_max_date=end, - ) - - return job + return job def perform_indexing(job: IndexJob): + ''' + Run an IndexJob by running all related tasks. + + Tasks are run per type. 
The order of types is: + - `CreateIndexTask` + - `PopulateIndexTask` + - `UpdateIndexTask` + - `UpdateSettingsTask` + - `RemoveAliasTask` + - `AddAliasTask` + - `DeleteIndexTask` + ''' job.corpus.validate_ready_to_index() corpus_name = job.corpus.name From 28bdcf007152cdfc76951d89c791de9b6ce1eeda Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 18:01:25 +0100 Subject: [PATCH 24/41] order imports --- backend/es/es_index.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 54518f57e..fc985d1a3 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -4,14 +4,10 @@ Script to index the data into ElasticSearch. ''' -import sys from typing import Dict, Optional - -from elasticsearch import Elasticsearch +import datetime +import logging import elasticsearch.helpers as es_helpers -from elasticsearch.exceptions import RequestError - -from django.conf import settings from django.db import transaction from addcorpus.es_settings import es_settings @@ -20,10 +16,9 @@ from addcorpus.reader import make_reader from ianalyzer.elasticsearch import elasticsearch, server_for_corpus from .es_alias import ( - alias, get_current_index_name, get_new_version_number, + get_current_index_name, get_new_version_number, add_alias, remove_alias, delete_index ) -import datetime from indexing.models import ( IndexJob, CreateIndexTask, PopulateIndexTask, UpdateIndexTask, RemoveAliasTask, AddAliasTask, UpdateSettingsTask @@ -32,7 +27,6 @@ from es.models import Server, Index from es.es_update import run_update_task -import logging logger = logging.getLogger('indexing') From 56ad31f782f484680b7c15b78c043270c2bc73b3 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 18:07:28 +0100 Subject: [PATCH 25/41] fix jewishmigration test --- backend/corpora/jewishmigration/test_jewishmigration.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/corpora/jewishmigration/test_jewishmigration.py b/backend/corpora/jewishmigration/test_jewishmigration.py index 749c79a26..a71fa0ef6 100644 --- a/backend/corpora/jewishmigration/test_jewishmigration.py +++ b/backend/corpora/jewishmigration/test_jewishmigration.py @@ -154,7 +154,8 @@ def jm_client(es_client, jm_corpus): Returns an elastic search client for the mock corpus. """ # add data from mock corpus - es_index.create(es_client, jm_corpus, False, True, False) + job = es_index.create_indexing_job(jm_corpus, mappings_only=True, clear=True) + es_index.perform_indexing(job) # ES is "near real time", so give it a second before we start searching the index sleep(1) From db555695e64f5d9b76b3365bcd6c8e69719d6524 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 18:38:20 +0100 Subject: [PATCH 26/41] support corpus min_date, max_date as date type --- backend/addcorpus/python_corpora/filters.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/addcorpus/python_corpora/filters.py b/backend/addcorpus/python_corpora/filters.py index 722efd4f4..54cae9281 100644 --- a/backend/addcorpus/python_corpora/filters.py +++ b/backend/addcorpus/python_corpora/filters.py @@ -3,7 +3,7 @@ passed through to ElasticSearch. 
''' -from datetime import datetime +from datetime import datetime, date from addcorpus.constants import MappingType class Filter(object): @@ -24,7 +24,7 @@ def serialize(self): search_dict = {'name': name} for key, value in self.__dict__.items(): if key == 'search_filter' or key != 'field': - if type(value) == datetime: + if isinstance(value, datetime) or isinstance(value, date): search_dict[key] = value.isoformat() else: search_dict[key] = value From 5d40e222d7a75e3c29baea6f9c61f6a95a76a5f0 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 18:47:27 +0100 Subject: [PATCH 27/41] docstrings --- backend/indexing/models.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/backend/indexing/models.py b/backend/indexing/models.py index 0cbdb3e4a..ec34c7591 100644 --- a/backend/indexing/models.py +++ b/backend/indexing/models.py @@ -1,5 +1,4 @@ from django.db import models -from itertools import chain from elasticsearch import Elasticsearch from ianalyzer.elasticsearch import elasticsearch @@ -7,22 +6,31 @@ from es.models import Index class IndexJob(models.Model): + ''' + Represents a collection of index-related tasks that are carried out together. + ''' + corpus = models.ForeignKey( to=Corpus, on_delete=models.CASCADE, - help_text='corpus for which the job is created; task may use the corpus ' - 'to determine metadata or extract documents', + help_text='corpus for which the job is created; tasks may use the corpus ' + 'to determine index configuration or extract documents', ) created = models.DateTimeField( auto_now_add=True, ) + def __str__(self): return f'{self.corpus} ({self.created})' class IndexTask(models.Model): + ''' + Abstract model for index-related tasks. + ''' + class Meta: abstract = True @@ -124,6 +132,9 @@ def __str__(self): class RemoveAliasTask(IndexTask): + ''' + Remove an alias from an index + ''' alias = models.CharField( max_length=128, help_text='alias to remove' @@ -134,6 +145,9 @@ def __str__(self): class AddAliasTask(IndexTask): + ''' + Add an alias to an index + ''' alias = models.CharField( max_length=128, help_text='alias to assign' From 297c65e98c39723a0cef258cf84accd365c1f6c6 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 19:08:23 +0100 Subject: [PATCH 28/41] improve documentation of index commands --- backend/es/management/commands/alias.py | 19 ++++++---- backend/es/management/commands/index.py | 50 +++++++++++++++---------- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/backend/es/management/commands/alias.py b/backend/es/management/commands/alias.py index 7120d812b..c8c400f18 100644 --- a/backend/es/management/commands/alias.py +++ b/backend/es/management/commands/alias.py @@ -1,28 +1,31 @@ from django.core.management import BaseCommand from addcorpus.models import Corpus -from es.es_alias import alias, create_alias_job +from es.es_alias import create_alias_job from es.es_index import perform_indexing class Command(BaseCommand): help = ''' - Ensure that an alias exist for the index with the highest version number (e.g. `indexname_5`). - The alias is removed for all other (lower / older) versions. The indices themselves are only removed - if you add the `--clean` flag (but be very sure if this is what you want to do!). - Particularly useful in the production environment, i.e. after creating an index with `--prod`. - + Ensure that an alias exist for the index with the highest version number (e.g. + `indexname-5`). 
The alias is removed for all other (lower / older) versions. The + indices themselves are only removed if you add the `--clean` flag (but be very sure + if this is what you want to do!). Particularly useful in the production environment, + i.e. after creating an index with `--prod`. ''' def add_arguments(self, parser): parser.add_argument( 'corpus', - help='Corpus for which the alias should be updated. Use the name from settings.py' + help='''Corpus for which the alias should be updated. This should match the + "name" field in the database. For Python corpora, this field is based on + the name in settings.py''' ) parser.add_argument( '--clean', action='store_true', - help='''If included, any indices that are not the highest version will be deleted''' + help='''If included, any indices that are not the highest version will be + deleted.''' ) def handle(self, corpus, clean=False, **options): diff --git a/backend/es/management/commands/index.py b/backend/es/management/commands/index.py index 12509da78..89bc44f10 100644 --- a/backend/es/management/commands/index.py +++ b/backend/es/management/commands/index.py @@ -16,60 +16,72 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument( 'corpus', - help='''Sets which corpus should be indexed. Use the name provided in settings.py''', + help='''Sets which corpus should be indexed. This should match the "name" + field in the database. For Python corpora, this field is based on the + name in settings.py''', ) parser.add_argument( '--start', '-s', - help='''The date where indexing should start. - The input format is YYYY-MM-DD. - If not set, indexing will start from corpus minimum date.''' + help='''Minimum date to select documents. The input format is YYYY-MM-DD. + Optional. Only has effect for Python corpora which implement date selection + in their sources() method. No effect in combination with --mappings-only.''' ) parser.add_argument( '--end', '-e', - help='''The date where indexing should end - The input format is YYYY-MM-DD. - If not set, indexing will stop at corpus maximum date.''' + help='''Maximum date to select documents. The input format is YYYY-MM-DD. + Optional. Only has effect for Python corpora which implement date selection + in their sources() method. No effect in combination with --mappings-only.''' ) parser.add_argument( '--delete', '-d', action='store_true', - help='Delete the index before indexing' + help='''If this job is set to create an index that already exists, delete + it instead of raising an exception. No effect in combination with + --add.''' ) parser.add_argument( '--update', '-u', action='store_true', - help='Update an index (add / change fields in documents)' + help='''Run an update script defined in the corpus definition (to add/change + field values in documents). Only available for Python corpora. This + will also skip index creation and population.''' ) parser.add_argument( '--mappings-only', '-m', action='store_true', - help='''Only create the index with mappings - without adding data to it. This is useful e.g. before a remote reindex.''' + help='''Only create the index with mappings without adding data to it. This + is useful e.g. before a remote reindex. No effect in combination with + --update.''' ) parser.add_argument( '--add', '-a', action='store_true', - help='''Add documents to an existing index, i.e., skip index creation''' + help='''Skip index creation. Documents will be added to the existing index + for the corpus. 
No effect in combination with --update.''' ) parser.add_argument( '--prod', '-p', action='store_true', - help='''Specifies that this is NOT a local indexing operation. - This influences index settings in particular''' + help='''Specifies that this is NOT a local indexing operation. This + will affect index settings. The script will also generate a versioned + name for the index, which requires an alias to be linked to the + corpus.''' ) parser.add_argument( '--rollover', '-r', action='store_true', - help='''Specifies that the alias of the index should be adjusted. - (Only applicable in combination with --prod)''' + help='''Specifies that the alias of the index should be moved to this version + after populating the index. Only applicable in combination with --prod. + Note that you can also move the alias with the separate "alias" + command after indexing is complete.''' ) def handle(self, corpus, start=None, end=None, add=False, delete=False, update=False, mappings_only=False, prod=False, rollover=False, **options): @@ -91,14 +103,14 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F except Exception: logging.critical( - 'Incorrect data format ' - 'Example call: flask es times -s 1785-01-01 -e 2010-12-31' + 'Incorrect data format; dates must be YYYY-MM-DD. ' + 'Example call: python manage.py index times -s 1785-01-01 -e 2010-12-31' ) raise if rollover and not prod: logging.warning( - 'rollover flag is set but prod flag not set -- no effect') + '--rollover flag is set but --prod flag not set; no effect.') job = create_indexing_job( From b86f4fbbce44cb1468fa781646857cd7e6caae84 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 19:11:41 +0100 Subject: [PATCH 29/41] no production settings in test --- backend/es/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/es/conftest.py b/backend/es/conftest.py index 46124c94f..cd0f2634a 100644 --- a/backend/es/conftest.py +++ b/backend/es/conftest.py @@ -77,7 +77,6 @@ def es_alias_client(db, es_server, es_client, mock_corpus): job=job, index=index, delete_existing=True, - production_settings=True, ) es_index.create(create_task) es_client.indices.create(index='times-test-2') From fef82b0e1ce5cf3eeb61750f538259b42065ba7e Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 19:18:35 +0100 Subject: [PATCH 30/41] clean up migrations --- backend/indexing/migrations/0001_initial.py | 21 ++++++++++++--- .../migrations/0002_updatesettingstask.py | 27 ------------------- ...003_createindextask_production_settings.py | 18 ------------- backend/indexing/models.py | 2 ++ 4 files changed, 19 insertions(+), 49 deletions(-) delete mode 100644 backend/indexing/migrations/0002_updatesettingstask.py delete mode 100644 backend/indexing/migrations/0003_createindextask_production_settings.py diff --git a/backend/indexing/migrations/0001_initial.py b/backend/indexing/migrations/0001_initial.py index d902954fc..9894516e4 100644 --- a/backend/indexing/migrations/0001_initial.py +++ b/backend/indexing/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.14 on 2024-11-12 14:10 +# Generated by Django 4.2.16 on 2024-11-13 18:18 from django.db import migrations, models import django.db.models.deletion @@ -9,8 +9,8 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ('addcorpus', '0023_alter_corpusdocumentationpage_type_alter_field_name'), ('es', '0002_alter_index_available'), + ('addcorpus', '0023_alter_corpusdocumentationpage_type_alter_field_name'), ] 
operations = [ @@ -18,9 +18,21 @@ class Migration(migrations.Migration): name='IndexJob', fields=[ ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('created', models.DateTimeField(auto_now_add=True)), - ('corpus', models.ForeignKey(help_text='corpus for which the job is created; task may use the corpus to determine metadata or extract documents', on_delete=django.db.models.deletion.CASCADE, to='addcorpus.corpus')), + ('created', models.DateTimeField(auto_now_add=True, help_text='time when the job was created')), + ('corpus', models.ForeignKey(help_text='corpus for which the job is created; tasks may use the corpus to determine index configuration or extract documents', on_delete=django.db.models.deletion.CASCADE, to='addcorpus.corpus')), + ], + ), + migrations.CreateModel( + name='UpdateSettingsTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('settings', models.JSONField(blank=True, default=dict, help_text='settings to push')), + ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), + ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), ], + options={ + 'abstract': False, + }, ), migrations.CreateModel( name='UpdateIndexTask', @@ -75,6 +87,7 @@ class Migration(migrations.Migration): name='CreateIndexTask', fields=[ ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('production_settings', models.BooleanField(default=False, help_text='configure index settings for a production environment')), ('delete_existing', models.BooleanField(default=False, help_text='if an index by this name already exists, delete it, instead of raising an exception')), ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), diff --git a/backend/indexing/migrations/0002_updatesettingstask.py b/backend/indexing/migrations/0002_updatesettingstask.py deleted file mode 100644 index 3289d0aa2..000000000 --- a/backend/indexing/migrations/0002_updatesettingstask.py +++ /dev/null @@ -1,27 +0,0 @@ -# Generated by Django 4.2.14 on 2024-11-13 12:23 - -from django.db import migrations, models -import django.db.models.deletion - - -class Migration(migrations.Migration): - - dependencies = [ - ('es', '0002_alter_index_available'), - ('indexing', '0001_initial'), - ] - - operations = [ - migrations.CreateModel( - name='UpdateSettingsTask', - fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('settings', models.JSONField(blank=True, default=dict)), - ('index', models.ForeignKey(help_text='index on which this task is applied', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='es.index')), - ('job', models.ForeignKey(help_text='job in which this task is run', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)ss', to='indexing.indexjob')), - ], - options={ - 'abstract': False, - }, - ), - ] diff --git 
a/backend/indexing/migrations/0003_createindextask_production_settings.py b/backend/indexing/migrations/0003_createindextask_production_settings.py deleted file mode 100644 index 90586323f..000000000 --- a/backend/indexing/migrations/0003_createindextask_production_settings.py +++ /dev/null @@ -1,18 +0,0 @@ -# Generated by Django 4.2.14 on 2024-11-13 13:15 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ('indexing', '0002_updatesettingstask'), - ] - - operations = [ - migrations.AddField( - model_name='createindextask', - name='production_settings', - field=models.BooleanField(default=False, help_text='configure index settings for a production environment'), - ), - ] diff --git a/backend/indexing/models.py b/backend/indexing/models.py index ec34c7591..b543071a4 100644 --- a/backend/indexing/models.py +++ b/backend/indexing/models.py @@ -18,6 +18,7 @@ class IndexJob(models.Model): ) created = models.DateTimeField( auto_now_add=True, + help_text='time when the job was created', ) @@ -125,6 +126,7 @@ class UpdateSettingsTask(IndexTask): settings = models.JSONField( blank=True, default=dict, + help_text='settings to push', ) def __str__(self): From f5cd63dbdb26eeff9645028a023a2d3b8aa967bf Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 19:40:27 +0100 Subject: [PATCH 31/41] update server table before fetching server --- backend/es/es_alias.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/es/es_alias.py b/backend/es/es_alias.py index c1a9d2f2e..d3c5b1295 100644 --- a/backend/es/es_alias.py +++ b/backend/es/es_alias.py @@ -5,6 +5,7 @@ from addcorpus.models import Corpus, CorpusConfiguration from ianalyzer.elasticsearch import elasticsearch, server_for_corpus from es.models import Server, Index +from es.sync import update_server_table_from_settings from indexing.models import IndexJob, DeleteIndexTask, RemoveAliasTask, AddAliasTask import logging @@ -20,6 +21,7 @@ def create_alias_job(corpus: Corpus, clean=False) -> IndexJob: corpus_config = corpus.configuration corpus_name = corpus.name + update_server_table_from_settings() server = Server.objects.get(name=server_for_corpus(corpus_name)) index_name = corpus_config.es_index index_alias = corpus_config.es_alias From 172be319aa9d1020007662be5b8b9c6d2917199f Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 19:47:16 +0100 Subject: [PATCH 32/41] update documentation --- documentation/Indexing-corpora.md | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/documentation/Indexing-corpora.md b/documentation/Indexing-corpora.md index 04fb71e33..14f564233 100644 --- a/documentation/Indexing-corpora.md +++ b/documentation/Indexing-corpora.md @@ -6,8 +6,11 @@ This step is necessary to make a dataset available in the I-analyzer interface. You can start indexing once you have: - Created a definition for the corpus -- If it is a Python corpus: added necessary settings to your project, such as the source data directory. -- Imported the definition into the database. For Python corpora, run `yarn django loadcorpora` to do this. +- Run all migrations +- Started Elasticsearch +- If it is a Python corpus: + - added necessary settings to your project, such as the source data directory. + - imported the definition into the database. For Python corpora, run `yarn django loadcorpora` to do this. 
The basic indexing command is: @@ -15,22 +18,25 @@ The basic indexing command is: yarn django index my-corpus ``` -Use `yarn django index --help` to see all possible flags. Some useful options are highlighted below. +## Parameters -## Development +Use `yarn django index --help` to see all possible parameters. Some useful options are highlighted below. + +### Development For development environments, we usually maintain a single index per corpus, rather than creating versioned indices. New indices are also created with `number_of_replicas` set to 0 (this is to make index creation easier/lighter). Some options that may be useful for development: -### Delete index before starting +- `--delete` / `-d` deletes an existing index of this name, if there is one. Without this flag, the script will raise an error if the index already exists. +- `--start` / `-s` and `--end` / `-e` respectively give a start and end date to select source files. Note that this only works if the `sources` function in your corpus definition makes use of these options; not all corpora have this defined. (It is not always possible to infer dates from source file metadata without parsing the file.) -`--delete` / `-d` deletes an existing index of this name, if there is one. Without this flag, you will add your data to the existing index. +### Production -### Date selection +See [Indexing on server](documentation/Indexing-on-server.md) for more information about production-specific settings. -`--start` / `-s` and `--end` / `-e` respectively give a start and end date to select source files. Note that this only works if the `sources` function in your corpus definition makes use of these options; not all corpora have this defined. (It is not always possible to infer dates from source file metadata without parsing the file.) +## IndexJob log -## Production +When you start the command, the application will save an `IndexJob` that represents the action. These can also be viewed in the admin site. -See [Indexing on server](documentation/Indexing-on-server.md) for more information about production-specific settings. +IndexJobs are not automatically deleted when the command completes, but can be freely deleted at that point. 
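The job-based flow documented above can also be driven directly from Python, for example in a shell or a test fixture. The sketch below is illustrative only: it assumes a corpus named `times` is already present in the database and that Elasticsearch is reachable, and it uses the `create_indexing_job` and `perform_indexing` functions from `es.es_index` as they stand at this point in the series.

```python
# Illustrative sketch (hypothetical corpus name; assumes Django settings are
# configured and Elasticsearch is running).
from addcorpus.models import Corpus
from es.es_index import create_indexing_job, perform_indexing

corpus = Corpus.objects.get(name='times')

# Build an IndexJob that (re)creates the index and populates it with all documents;
# the keyword arguments mirror the flags of the `index` management command.
job = create_indexing_job(corpus, mappings_only=False, clear=True)

# Run every task attached to the job (create, populate, settings and alias tasks).
perform_indexing(job)
```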
From 08250dc9c78e3d7024ab51670c5dfe1da1813804 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 20:22:07 +0100 Subject: [PATCH 33/41] skip update settings if --add or --update is used --- backend/es/es_index.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index fc985d1a3..5f10e6371 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -214,7 +214,7 @@ def create_indexing_job( document_max_date=end, ) - if prod: + if prod and not (add or update): UpdateSettingsTask.objects.create( job=job, index=index, From 30ca5f70ebc7b7f7d780b218b0c006f4fb271324 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Wed, 13 Nov 2024 20:22:51 +0100 Subject: [PATCH 34/41] parentheses for boolean order of operations --- backend/es/es_index.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 5f10e6371..89d5d599b 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -190,7 +190,7 @@ def create_indexing_job( server=server, name=base_name ) - if not add or update: + if not (add or update): CreateIndexTask.objects.create( job=job, index=index, @@ -198,7 +198,7 @@ def create_indexing_job( delete_existing=clear, ) - if not mappings_only or update: + if not (mappings_only or update): PopulateIndexTask.objects.create( job=job, index=index, From 7933dca353a6aaeb6cdda929db2b8307202c6a3a Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Thu, 14 Nov 2024 15:24:51 +0100 Subject: [PATCH 35/41] fix migrations --- backend/indexing/migrations/0001_initial.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/indexing/migrations/0001_initial.py b/backend/indexing/migrations/0001_initial.py index 9894516e4..08e3b8286 100644 --- a/backend/indexing/migrations/0001_initial.py +++ b/backend/indexing/migrations/0001_initial.py @@ -9,7 +9,7 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ('es', '0002_alter_index_available'), + ('es', '0001_initial'), ('addcorpus', '0023_alter_corpusdocumentationpage_type_alter_field_name'), ] From f42c7ae1bd794ec237c5c712f468464a6dafaef4 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Thu, 30 Jan 2025 11:49:58 +0100 Subject: [PATCH 36/41] add custom exception for corpora --- backend/addcorpus/exceptions.py | 33 +++++++++++++++++++++++++++++++++ backend/es/es_update.py | 3 ++- 2 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 backend/addcorpus/exceptions.py diff --git a/backend/addcorpus/exceptions.py b/backend/addcorpus/exceptions.py new file mode 100644 index 000000000..02cd41728 --- /dev/null +++ b/backend/addcorpus/exceptions.py @@ -0,0 +1,33 @@ +from addcorpus.models import Corpus + + +class PythonDefinitionRequired(Exception): + ''' + Exception that can be raised when attempting to use functionality only applicable for + Python corpora, on a corpus that does not have a Python definition. + ''' + + def __init__(self, corpus: Corpus, message: str, *args): + self.corpus = corpus + self.message = message + super().__init__(*args) + + + def __str__(self): + return f'{self.message} (corpus: {self.corpus})' + +class NoPythonDefinitionAllowed(Exception): + ''' + Exception that can be raised when attempting to use functionality only applicable for + database-only corpora, on a corpus with a Python definition. 
+ ''' + + def __init__(self, corpus: Corpus, message: str, *args): + self.corpus = corpus + self.message = message + super().__init__(*args) + + + def __str__(self): + return f'{self.message} (corpus: {self.corpus})' + diff --git a/backend/es/es_update.py b/backend/es/es_update.py index 427f8855a..800bb260c 100644 --- a/backend/es/es_update.py +++ b/backend/es/es_update.py @@ -4,13 +4,14 @@ from ianalyzer.elasticsearch import elasticsearch from indexing.models import UpdateIndexTask from addcorpus.python_corpora.load_corpus import load_corpus_definition +from addcorpus.exceptions import PythonDefinitionRequired import logging logger = logging.getLogger('indexing') def run_update_task(task: UpdateIndexTask) -> None: if not task.corpus.has_python_definition: - raise Exception('Cannot update: corpus has no Python definition') + raise PythonDefinitionRequired(task.corpus, 'Update task not applicable') corpus_definition = load_corpus_definition(task.corpus.name) From 8c2648a3defcf69e5a48101a2ddbe89cc97dcc5d Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Thu, 30 Jan 2025 11:57:19 +0100 Subject: [PATCH 37/41] more appropriate error type --- backend/es/es_update.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/es/es_update.py b/backend/es/es_update.py index 800bb260c..46863cfa1 100644 --- a/backend/es/es_update.py +++ b/backend/es/es_update.py @@ -30,7 +30,7 @@ def run_update_task(task: UpdateIndexTask) -> None: task.corpus.name, corpus_definition, corpus_definition.update_script() ) else: - raise Exception("Cannot update without update_body or update_script") + raise RuntimeError("Cannot update without update_body or update_script") def update_index(corpus: str, corpus_definition: CorpusDefinition, query_model): From b62ac0ae79c09620cbd52a452b500ffb3c7c5a68 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Thu, 30 Jan 2025 12:17:24 +0100 Subject: [PATCH 38/41] raise exception with conflicting arguments --- backend/es/management/commands/index.py | 72 +++++++++++++++++++++---- 1 file changed, 61 insertions(+), 11 deletions(-) diff --git a/backend/es/management/commands/index.py b/backend/es/management/commands/index.py index 89bc44f10..200f798f2 100644 --- a/backend/es/management/commands/index.py +++ b/backend/es/management/commands/index.py @@ -25,21 +25,23 @@ def add_arguments(self, parser): '--start', '-s', help='''Minimum date to select documents. The input format is YYYY-MM-DD. Optional. Only has effect for Python corpora which implement date selection - in their sources() method. No effect in combination with --mappings-only.''' + in their sources() method. Cannot be used in combination with + --mappings-only.''' ) parser.add_argument( '--end', '-e', help='''Maximum date to select documents. The input format is YYYY-MM-DD. Optional. Only has effect for Python corpora which implement date selection - in their sources() method. No effect in combination with --mappings-only.''' + in their sources() method. Cannot be used in combination with + --mappings-only.''' ) parser.add_argument( '--delete', '-d', action='store_true', help='''If this job is set to create an index that already exists, delete - it instead of raising an exception. No effect in combination with + it instead of raising an exception. Cannot be used in combination with --add.''' ) @@ -48,14 +50,15 @@ def add_arguments(self, parser): action='store_true', help='''Run an update script defined in the corpus definition (to add/change field values in documents). 
Only available for Python corpora. This - will also skip index creation and population.''' + will also skip index creation and population. Cannot be used in + combination with --add, --delete, or --mappings-only.''' ) parser.add_argument( '--mappings-only', '-m', action='store_true', help='''Only create the index with mappings without adding data to it. This - is useful e.g. before a remote reindex. No effect in combination with + is useful e.g. before a remote reindex. Cannot be used in combination with --update.''' ) parser.add_argument( '--add', '-a', action='store_true', help='''Skip index creation. Documents will be added to the existing index - for the corpus. No effect in combination with --update.''' + for the corpus. Cannot be used in combination with --update.''' ) parser.add_argument( @@ -90,6 +93,10 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F corpus_definition = load_corpus_definition(corpus) + self._validate_arguments( + start, end, add, delete, update, mappings_only, prod, rollover + ) + try: if start: start_index = datetime.strptime(start, '%Y-%m-%d') @@ -108,11 +115,6 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F ) raise - if rollover and not prod: - logging.warning( - '--rollover flag is set but --prod flag not set; no effect.') - - job = create_indexing_job( corpus_object, start_index, end_index, mappings_only, add, delete, prod, rollover, update @@ -120,6 +122,54 @@ def handle(self, corpus, start=None, end=None, add=False, delete=False, update=F perform_indexing(job) + def _validate_arguments( + self, + start=None, + end=None, + add=False, + delete=False, + update=False, + mappings_only=False, + prod=False, + rollover=False, + ): + if (start or end) and mappings_only: + raise ValueError( + '--start cannot be used in combination with --mappings-only. Start/end ' + 'dates determine data selection when populating or updating the index.' + ) + + if add and delete: + raise ValueError( + '--add and --delete cannot be used together. You must either delete the ' + 'existing index or add to it.' + ) + + if update and mappings_only: + raise ValueError( + '--update cannot be used in combination with --mappings-only. Update ' + 'scripts are intended to be run on a populated index.' + ) + + if update and add: + raise ValueError( + '--update cannot be used in combination with --add. Update scripts are ' + 'always applied to an existing index.' + ) + + if update and delete: + raise ValueError( + '--update cannot be used in combination with --delete. You cannot update ' + 'an index after deleting it.' + ) + + if rollover and not prod: + raise ValueError( + '--rollover can only be used in combination with --prod. Alias rollover ' + 'is only applicable with versioned indices.'
+ ) + + def _corpus_object(self, corpus_name): load_all_corpus_definitions() return Corpus.objects.get(name=corpus_name) From c5e058a27b0d383ffa527166d9c79dc7cfd6fd58 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Thu, 30 Jan 2025 13:49:26 +0100 Subject: [PATCH 39/41] remove ignore errors argument --- backend/es/es_index.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 10707cabf..c66300a00 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -7,7 +7,9 @@ from typing import Dict, Optional import datetime import logging +import warnings import elasticsearch.helpers as es_helpers +from elasticsearch.exceptions import NotFoundError from django.db import transaction from addcorpus.es_settings import es_settings @@ -65,8 +67,7 @@ def create(task: CreateIndexTask): if client.indices.exists(index=index_name): if task.delete_existing: logger.info('Attempting to clean old index...') - client.indices.delete( - index=index_name, ignore=[400, 404]) + client.indices.delete(index=index_name) else: logger.error( 'Index `{}` already exists. Do you need to add an alias for it or ' From e5833135f7332e75734ed0bbfc7a3097b019264f Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Thu, 30 Jan 2025 13:53:47 +0100 Subject: [PATCH 40/41] don't match aliases/wildcards in create task --- backend/es/es_index.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/backend/es/es_index.py b/backend/es/es_index.py index c66300a00..873e89b40 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -64,10 +64,10 @@ def create(task: CreateIndexTask): index_name = task.index.name es_mapping = _make_es_mapping(corpus_config) - if client.indices.exists(index=index_name): + if client.indices.exists(index=index_name, allow_no_indices=False): if task.delete_existing: logger.info('Attempting to clean old index...') - client.indices.delete(index=index_name) + client.indices.delete(index=index_name, allow_no_indices=False) else: logger.error( 'Index `{}` already exists. 
Do you need to add an alias for it or ' @@ -139,7 +139,9 @@ def populate(task: PopulateIndexTask): def update_index_settings(task: UpdateSettingsTask): client = task.client() client.indices.put_settings( - settings=task.settings, index=task.index.name + settings=task.settings, + index=task.index.name, + allow_no_indices=False, ) From 9ce04cbee167023f8657b35f343b88475a509e94 Mon Sep 17 00:00:00 2001 From: Luka van der Plas Date: Thu, 30 Jan 2025 14:19:13 +0100 Subject: [PATCH 41/41] debulk create_indexing_job --- backend/es/es_alias.py | 15 ++++++- backend/es/es_index.py | 88 ++++++++++++++++++++++-------------------- 2 files changed, 61 insertions(+), 42 deletions(-) diff --git a/backend/es/es_alias.py b/backend/es/es_alias.py index 8d35ccac7..edf4f951c 100644 --- a/backend/es/es_alias.py +++ b/backend/es/es_alias.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 import re from django.db import transaction +from elasticsearch import Elasticsearch +from typing import Generator from addcorpus.models import Corpus, CorpusConfiguration -from ianalyzer.elasticsearch import elasticsearch, server_for_corpus +from ianalyzer.elasticsearch import elasticsearch, server_for_corpus, client_from_config from es.models import Server, Index from es.sync import update_server_table_from_settings from indexing.models import IndexJob, DeleteIndexTask, RemoveAliasTask, AddAliasTask @@ -138,3 +140,14 @@ def get_highest_version_number(indices, current_index=None): return max([v for v in versions if v is not None]) except: return 0 + + +def indices_with_alias(server: Server, alias: str) -> Generator[Index, None, None]: + client = client_from_config(server.configuration) + if client.indices.exists_alias(name=alias): + for index_name in client.indices.get_alias(name=alias): + aliased_index, _ = Index.objects.get_or_create( + server=server, + name=index_name, + ) + yield aliased_index diff --git a/backend/es/es_index.py b/backend/es/es_index.py index 873e89b40..630ff24a5 100644 --- a/backend/es/es_index.py +++ b/backend/es/es_index.py @@ -4,12 +4,10 @@ Script to index the data into ElasticSearch. ''' -from typing import Dict, Optional +from typing import Dict, Optional, Tuple import datetime import logging -import warnings import elasticsearch.helpers as es_helpers -from elasticsearch.exceptions import NotFoundError from django.db import transaction from addcorpus.es_settings import es_settings @@ -19,7 +17,7 @@ from ianalyzer.elasticsearch import elasticsearch, server_for_corpus from .es_alias import ( get_current_index_name, get_new_version_number, - add_alias, remove_alias, delete_index + add_alias, remove_alias, delete_index, indices_with_alias ) from indexing.models import ( IndexJob, CreateIndexTask, PopulateIndexTask, UpdateIndexTask, @@ -164,35 +162,15 @@ def create_indexing_job( running an update script, and rolling over the alias. Parameters are described in detail in the documentation for the `index` command. 
''' - job = IndexJob.objects.create(corpus=corpus) + create_new = not (add or update) update_server_table_from_settings() - server_name = server_for_corpus(corpus.name) - server = Server.objects.get(name=server_name) - client = elasticsearch(corpus.name) - base_name = corpus.configuration.es_index - - # tasks below are added in order of execution for readability - - if prod: - alias = corpus.configuration.es_alias or corpus.configuration.es_index - if add or update: - versioned_name = get_current_index_name( - corpus.configuration, client - ) - else: - next_version = get_new_version_number(client, alias, base_name) - versioned_name = f'{base_name}-{next_version}' - index, _ = Index.objects.get_or_create( - server=server, name=versioned_name - ) - else: - index, _ = Index.objects.get_or_create( - server=server, name=base_name - ) + job = IndexJob.objects.create(corpus=corpus) + server = _server_for_job(job) + index, alias = _index_and_alias_for_job(job, prod, create_new) - if not (add or update): + if create_new: CreateIndexTask.objects.create( job=job, index=index, @@ -216,7 +194,7 @@ def create_indexing_job( document_max_date=end, ) - if prod and not (add or update): + if prod and create_new: UpdateSettingsTask.objects.create( job=job, index=index, @@ -224,17 +202,12 @@ def create_indexing_job( ) if prod and rollover: - if client.indices.exists_alias(name=alias): - for index_name in client.indices.get_alias(name=alias): - aliased_index, _ = Index.objects.get_or_create( - server=server, - name=index_name, - ) - RemoveAliasTask.objects.create( - job=job, - index=aliased_index, - alias=alias, - ) + for aliased_index in indices_with_alias(server, alias): + RemoveAliasTask.objects.create( + job=job, + index=aliased_index, + alias=alias, + ) AddAliasTask.objects.create( job=job, index=index, @@ -244,6 +217,39 @@ def create_indexing_job( return job +def _server_for_job(job: IndexJob): + server_name = server_for_corpus(job.corpus.name) + server = Server.objects.get(name=server_name) + return server + + +def _index_and_alias_for_job(job: IndexJob, prod: bool, create_new: bool) -> Tuple[Index, str]: + corpus = job.corpus + server = _server_for_job(job) + client = elasticsearch(corpus.name) + base_name = corpus.configuration.es_index + + if prod: + alias = corpus.configuration.es_alias or corpus.configuration.es_index + if create_new: + next_version = get_new_version_number(client, alias, base_name) + versioned_name = f'{base_name}-{next_version}' + else: + versioned_name = get_current_index_name( + corpus.configuration, client + ) + + index, _ = Index.objects.get_or_create( + server=server, name=versioned_name + ) + else: + alias = None + index, _ = Index.objects.get_or_create( + server=server, name=base_name + ) + + return index, alias + def perform_indexing(job: IndexJob): '''