Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndexJob model #1699

Merged
merged 45 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
e4525a2
add indexing app
lukavdplas Nov 12, 2024
62155a8
draft IndexJob and IndexTask models
lukavdplas Nov 12, 2024
bceec20
add utility methods
lukavdplas Nov 12, 2024
1718402
add admin interface
lukavdplas Nov 12, 2024
de10047
draft create_job function
lukavdplas Nov 12, 2024
ab7a69d
create job in index command
lukavdplas Nov 13, 2024
274df24
create job in alias command
lukavdplas Nov 13, 2024
0a6c589
add @transaction.atomic decorator
lukavdplas Nov 13, 2024
1681b1e
add prod field to CreateIndexTask
lukavdplas Nov 13, 2024
83eb892
add check if alias exists
lukavdplas Nov 13, 2024
20ba659
add job as argument to perform_indexing
lukavdplas Nov 13, 2024
8d63e9a
use createindextask input in create func
lukavdplas Nov 13, 2024
e6fb5cb
allow date inpunt in consolidate_start_end_years
lukavdplas Nov 13, 2024
bc0c3b3
use task input for populate func
lukavdplas Nov 13, 2024
410415d
use task input to update settings
lukavdplas Nov 13, 2024
ab2d951
add add_alias, remove_alias, delete_index functions
lukavdplas Nov 13, 2024
0420fc2
get alias actions from indexjob
lukavdplas Nov 13, 2024
fd7de54
move warning call
lukavdplas Nov 13, 2024
d09ac1e
remove unused arguments from perform_indexing
lukavdplas Nov 13, 2024
8d2a823
hande update through perform_indexing
lukavdplas Nov 13, 2024
dce9c68
clarity and log messages
lukavdplas Nov 13, 2024
93f8e3d
remove alias function
lukavdplas Nov 13, 2024
f265fbe
docstrings
lukavdplas Nov 13, 2024
28bdcf0
order imports
lukavdplas Nov 13, 2024
56ad31f
fix jewishmigration test
lukavdplas Nov 13, 2024
db55569
support corpus min_date, max_date as date type
lukavdplas Nov 13, 2024
5d40e22
docstrings
lukavdplas Nov 13, 2024
297c65e
improve documentation of index commands
lukavdplas Nov 13, 2024
b86f4fb
no production settings in test
lukavdplas Nov 13, 2024
fef82b0
clean up migrations
lukavdplas Nov 13, 2024
f5cd63d
update server table before fetching server
lukavdplas Nov 13, 2024
172be31
update documentation
lukavdplas Nov 13, 2024
08250dc
skip update settings if --add or --update is used
lukavdplas Nov 13, 2024
30ca5f7
parentheses for boolean order of operations
lukavdplas Nov 13, 2024
9dee520
Merge branch 'develop' into feature/index-job-models
lukavdplas Nov 14, 2024
7933dca
fix migrations
lukavdplas Nov 14, 2024
33601df
Merge branch 'develop' into feature/index-job-models
lukavdplas Jan 22, 2025
f42c7ae
add custom exception for corpora
lukavdplas Jan 30, 2025
8c2648a
more appropriate error type
lukavdplas Jan 30, 2025
b62ac0a
raise exception with conflicting arguments
lukavdplas Jan 30, 2025
bdbf0af
Merge branch 'develop' into feature/index-job-models
lukavdplas Jan 30, 2025
c5e058a
remove ignore errors argument
lukavdplas Jan 30, 2025
e583313
don't match aliases/wildcards in create task
lukavdplas Jan 30, 2025
9ce04cb
debulk create_indexing_job
lukavdplas Jan 30, 2025
e1c769a
Merge branch 'develop' into feature/index-job-models
lukavdplas Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions backend/addcorpus/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from addcorpus.models import Corpus


class PythonDefinitionRequired(Exception):
'''
Exception that can be raised when attempting to use functionality only applicable for
Python corpora, on a corpus that does not have a Python definition.
'''

def __init__(self, corpus: Corpus, message: str, *args):
self.corpus = corpus
self.message = message
super().__init__(*args)


def __str__(self):
return f'{self.message} (corpus: {self.corpus})'

class NoPythonDefinitionAllowed(Exception):
'''
Exception that can be raised when attempting to use functionality only applicable for
database-only corpora, on a corpus with a Python definition.
'''

def __init__(self, corpus: Corpus, message: str, *args):
self.corpus = corpus
self.message = message
super().__init__(*args)


def __str__(self):
return f'{self.message} (corpus: {self.corpus})'

12 changes: 11 additions & 1 deletion backend/addcorpus/python_corpora/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,16 +516,26 @@ def f(metadata):
return f


def consolidate_start_end_years(start, end, min_date, max_date):
def consolidate_start_end_years(
start: Union[datetime, date, int],
end: Union[datetime, date, int],
min_date: datetime,
max_date: datetime
):
''' given a start and end date provided by the user, make sure
- that start is not before end
- that start is not before min_date (corpus variable)
- that end is not after max_date (corpus variable)
'''
if isinstance(start, int):
start = datetime(year=start, month=1, day=1)
elif isinstance(start, date):
start = datetime(year=start.year, month=start.month, day=start.day)
if isinstance(end, int):
end = datetime(year=end, month=12, day=31)
elif isinstance(end, date):
end = datetime(year=end.year, month=end.month, day=end.day)

if start > end:
tmp = start
start = end
Expand Down
4 changes: 2 additions & 2 deletions backend/addcorpus/python_corpora/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
passed through to ElasticSearch.
'''

from datetime import datetime
from datetime import datetime, date
from addcorpus.constants import MappingType

class Filter(object):
Expand All @@ -24,7 +24,7 @@ def serialize(self):
search_dict = {'name': name}
for key, value in self.__dict__.items():
if key == 'search_filter' or key != 'field':
if type(value) == datetime:
if isinstance(value, datetime) or isinstance(value, date):
search_dict[key] = value.isoformat()
else:
search_dict[key] = value
Expand Down
15 changes: 12 additions & 3 deletions backend/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

from ianalyzer.elasticsearch import client_from_config
from addcorpus.python_corpora.save_corpus import load_and_save_all_corpora
from es import es_index as index
from es import es_index as index, sync
from django.conf import settings
from django.contrib.auth.models import Group
from addcorpus.models import Corpus
from addcorpus.serializers import CorpusJSONDefinitionSerializer
from es.models import Server

@pytest.fixture(autouse=True)
def media_dir(tmpdir, settings):
Expand Down Expand Up @@ -112,6 +113,13 @@ def es_client():

return client


@pytest.fixture()
def es_server(db, settings) -> Server:
sync.update_server_table_from_settings()
return Server.objects.get(name='default')


@pytest.fixture()
def basic_mock_corpus() -> str:
return 'mock-csv-corpus'
Expand Down Expand Up @@ -157,8 +165,9 @@ def _index_test_corpus(es_client: Elasticsearch, corpus_name: str):

if not es_client.indices.exists(index=corpus.configuration.es_index):
with warnings.catch_warnings():
warnings.filterwarnings('ignore', message="Corpus has no 'id' field")
index.perform_indexing(corpus)
job = index.create_indexing_job(corpus)
index.perform_indexing(job)

# ES is "near real time", so give it a second before we start searching the index
sleep(2)

Expand Down
3 changes: 2 additions & 1 deletion backend/corpora/jewishmigration/test_jewishmigration.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ def jm_client(es_client, jm_corpus):
Returns an elastic search client for the mock corpus.
"""
# add data from mock corpus
es_index.create(es_client, jm_corpus, False, True, False)
job = es_index.create_indexing_job(jm_corpus, mappings_only=True, clear=True)
es_index.perform_indexing(job)

# ES is "near real time", so give it a second before we start searching the index
sleep(1)
Expand Down
19 changes: 16 additions & 3 deletions backend/es/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from addcorpus.python_corpora.load_corpus import load_corpus_definition
from addcorpus.models import Corpus
from es import es_index

from es.models import Index
from indexing.models import IndexJob, CreateIndexTask

@pytest.fixture(scope='session')
def mock_corpus():
Expand Down Expand Up @@ -58,14 +59,26 @@ def es_index_client(es_client, mock_corpus):
es_client.indices.delete(index=index)

@pytest.fixture()
def es_alias_client(es_client, mock_corpus):
def es_alias_client(db, es_server, es_client, mock_corpus):
"""
Create multiple indices with version numbers for the mock corpus in elasticsearch.
Returns an elastic search client for the mock corpus.
"""
# add data from mock corpus
corpus = Corpus.objects.get(name=mock_corpus)
es_index.create(es_client, corpus, add=False, clear=True, prod=True) # create ianalyzer-times-1 index
index = Index.objects.create(
name='times-test-1',
server=es_server,
)
job = IndexJob.objects.create(
corpus=corpus,
)
create_task = CreateIndexTask.objects.create(
job=job,
index=index,
delete_existing=True,
)
es_index.create(create_task)
es_client.indices.create(index='times-test-2')
es_client.indices.create(index='times-test-bla-3')

Expand Down
84 changes: 63 additions & 21 deletions backend/es/es_alias.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
#!/usr/bin/env python3
import re
from django.db import transaction
from elasticsearch import Elasticsearch
from typing import Generator

from addcorpus.models import Corpus, CorpusConfiguration
from ianalyzer.elasticsearch import elasticsearch
from ianalyzer.elasticsearch import elasticsearch, server_for_corpus, client_from_config
from es.models import Server, Index
from es.sync import update_server_table_from_settings
from indexing.models import IndexJob, DeleteIndexTask, RemoveAliasTask, AddAliasTask

import logging
logger = logging.getLogger('indexing')


def alias(corpus: Corpus, clean=False):
@transaction.atomic
def create_alias_job(corpus: Corpus, clean=False) -> IndexJob:
'''
Script to create, update and remove aliases from ES
Create a job to move the alias of a corpus to the index with the highest version
'''

job = IndexJob.objects.create(corpus=corpus)

corpus_config = corpus.configuration
corpus_name = corpus.name
update_server_table_from_settings()
server = Server.objects.get(name=server_for_corpus(corpus_name))
index_name = corpus_config.es_index
index_alias = corpus_config.es_alias
client = elasticsearch(corpus_name)
Expand All @@ -22,33 +33,53 @@ def alias(corpus: Corpus, clean=False):
indices = client.indices.get(index='{}-*'.format(index_name))
highest_version = get_highest_version_number(indices, alias)

actions = []

for index_name, properties in indices.items():
is_aliased = alias in properties['aliases'].keys()
is_highest_version = extract_version(index_name, alias) == highest_version
index, _ = Index.objects.get_or_create(server=server, name=index_name)

if not is_highest_version and clean:
logger.info('Removing index `{}`'.format(index_name))
# note that removing an index automatically removes alias
actions.append({'remove_index': {'index': index_name}})
DeleteIndexTask.objects.create(job=job, index=index)

if not is_highest_version and is_aliased and not clean:
logger.info('Removing alias `{}` for index `{}`'.format(alias, index_name))
actions.append(
{'remove': {'index': index_name, 'alias': alias}})
RemoveAliasTask.objects.create(job=job, index=index, alias=alias)

if is_highest_version and not is_aliased:
logger.info('Adding alias `{}` for index `{}`'.format(alias, index_name))
actions.append(
{'add': {'index': index_name, 'alias': alias}})
elif is_highest_version and is_aliased:
logger.info('Alias `{}` already exists for `{}`, skipping alias creation'.format(
alias, index_name))
AddAliasTask.objects.create(job=job, index=index, alias=alias)

return job


def add_alias(task: AddAliasTask):
'''
Add an alias to an Elasticsearch index, as defined by an AddAliasTask
'''
client = task.client()
client.indices.put_alias(
index=task.index.name,
name=task.alias
)

if len(actions) > 0:
client.indices.update_aliases(actions=actions)
logger.info('Done updating aliases')

def remove_alias(task: RemoveAliasTask):
'''
Remove an alias from an Elasticsearch index, as defined by a RemoveAliasTask
'''
client = task.client()
client.indices.delete_alias(
index=task.index.name,
name=task.alias
)


def delete_index(task: DeleteIndexTask):
'''
Delete an Elasticsearch index, as defined by a DeleteIndexTask
'''
client = task.client()
client.indices.delete(
index=task.index.name,
)


def get_current_index_name(corpus: CorpusConfiguration, client) -> str:
Expand Down Expand Up @@ -109,3 +140,14 @@ def get_highest_version_number(indices, current_index=None):
return max([v for v in versions if v is not None])
except:
return 0


def indices_with_alias(server: Server, alias: str) -> Generator[Index, None, None]:
client = client_from_config(server.configuration)
if client.indices.exists_alias(name=alias):
for index_name in client.indices.get_alias(name=alias):
aliased_index, _ = Index.objects.get_or_create(
server=server,
name=index_name,
)
yield aliased_index
Loading