Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndexJob model #1699

Merged
merged 45 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
e4525a2
add indexing app
lukavdplas Nov 12, 2024
62155a8
draft IndexJob and IndexTask models
lukavdplas Nov 12, 2024
bceec20
add utility methods
lukavdplas Nov 12, 2024
1718402
add admin interface
lukavdplas Nov 12, 2024
de10047
draft create_job function
lukavdplas Nov 12, 2024
ab7a69d
create job in index command
lukavdplas Nov 13, 2024
274df24
create job in alias command
lukavdplas Nov 13, 2024
0a6c589
add @transaction.atomic decorator
lukavdplas Nov 13, 2024
1681b1e
add prod field to CreateIndexTask
lukavdplas Nov 13, 2024
83eb892
add check if alias exists
lukavdplas Nov 13, 2024
20ba659
add job as argument to perform_indexing
lukavdplas Nov 13, 2024
8d63e9a
use createindextask input in create func
lukavdplas Nov 13, 2024
e6fb5cb
allow date inpunt in consolidate_start_end_years
lukavdplas Nov 13, 2024
bc0c3b3
use task input for populate func
lukavdplas Nov 13, 2024
410415d
use task input to update settings
lukavdplas Nov 13, 2024
ab2d951
add add_alias, remove_alias, delete_index functions
lukavdplas Nov 13, 2024
0420fc2
get alias actions from indexjob
lukavdplas Nov 13, 2024
fd7de54
move warning call
lukavdplas Nov 13, 2024
d09ac1e
remove unused arguments from perform_indexing
lukavdplas Nov 13, 2024
8d2a823
hande update through perform_indexing
lukavdplas Nov 13, 2024
dce9c68
clarity and log messages
lukavdplas Nov 13, 2024
93f8e3d
remove alias function
lukavdplas Nov 13, 2024
f265fbe
docstrings
lukavdplas Nov 13, 2024
28bdcf0
order imports
lukavdplas Nov 13, 2024
56ad31f
fix jewishmigration test
lukavdplas Nov 13, 2024
db55569
support corpus min_date, max_date as date type
lukavdplas Nov 13, 2024
5d40e22
docstrings
lukavdplas Nov 13, 2024
297c65e
improve documentation of index commands
lukavdplas Nov 13, 2024
b86f4fb
no production settings in test
lukavdplas Nov 13, 2024
fef82b0
clean up migrations
lukavdplas Nov 13, 2024
f5cd63d
update server table before fetching server
lukavdplas Nov 13, 2024
172be31
update documentation
lukavdplas Nov 13, 2024
08250dc
skip update settings if --add or --update is used
lukavdplas Nov 13, 2024
30ca5f7
parentheses for boolean order of operations
lukavdplas Nov 13, 2024
9dee520
Merge branch 'develop' into feature/index-job-models
lukavdplas Nov 14, 2024
7933dca
fix migrations
lukavdplas Nov 14, 2024
33601df
Merge branch 'develop' into feature/index-job-models
lukavdplas Jan 22, 2025
f42c7ae
add custom exception for corpora
lukavdplas Jan 30, 2025
8c2648a
more appropriate error type
lukavdplas Jan 30, 2025
b62ac0a
raise exception with conflicting arguments
lukavdplas Jan 30, 2025
bdbf0af
Merge branch 'develop' into feature/index-job-models
lukavdplas Jan 30, 2025
c5e058a
remove ignore errors argument
lukavdplas Jan 30, 2025
e583313
don't match aliases/wildcards in create task
lukavdplas Jan 30, 2025
9ce04cb
debulk create_indexing_job
lukavdplas Jan 30, 2025
e1c769a
Merge branch 'develop' into feature/index-job-models
lukavdplas Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
debulk create_indexing_job
  • Loading branch information
lukavdplas committed Jan 30, 2025
commit 9ce04cbee167023f8657b35f343b88475a509e94
15 changes: 14 additions & 1 deletion backend/es/es_alias.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#!/usr/bin/env python3
import re
from django.db import transaction
from elasticsearch import Elasticsearch
from typing import Generator

from addcorpus.models import Corpus, CorpusConfiguration
from ianalyzer.elasticsearch import elasticsearch, server_for_corpus
from ianalyzer.elasticsearch import elasticsearch, server_for_corpus, client_from_config
from es.models import Server, Index
from es.sync import update_server_table_from_settings
from indexing.models import IndexJob, DeleteIndexTask, RemoveAliasTask, AddAliasTask
Expand Down Expand Up @@ -138,3 +140,14 @@ def get_highest_version_number(indices, current_index=None):
return max([v for v in versions if v is not None])
except:
return 0


def indices_with_alias(server: Server, alias: str) -> Generator[Index, None, None]:
client = client_from_config(server.configuration)
if client.indices.exists_alias(name=alias):
for index_name in client.indices.get_alias(name=alias):
aliased_index, _ = Index.objects.get_or_create(
server=server,
name=index_name,
)
yield aliased_index
88 changes: 47 additions & 41 deletions backend/es/es_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
Script to index the data into ElasticSearch.
'''

from typing import Dict, Optional
from typing import Dict, Optional, Tuple
import datetime
import logging
import warnings
import elasticsearch.helpers as es_helpers
from elasticsearch.exceptions import NotFoundError
from django.db import transaction

from addcorpus.es_settings import es_settings
Expand All @@ -19,7 +17,7 @@
from ianalyzer.elasticsearch import elasticsearch, server_for_corpus
from .es_alias import (
get_current_index_name, get_new_version_number,
add_alias, remove_alias, delete_index
add_alias, remove_alias, delete_index, indices_with_alias
)
from indexing.models import (
IndexJob, CreateIndexTask, PopulateIndexTask, UpdateIndexTask,
Expand Down Expand Up @@ -164,35 +162,15 @@ def create_indexing_job(
running an update script, and rolling over the alias. Parameters are described
in detail in the documentation for the `index` command.
'''
job = IndexJob.objects.create(corpus=corpus)
create_new = not (add or update)

update_server_table_from_settings()
server_name = server_for_corpus(corpus.name)
server = Server.objects.get(name=server_name)
client = elasticsearch(corpus.name)
base_name = corpus.configuration.es_index

# tasks below are added in order of execution for readability

if prod:
alias = corpus.configuration.es_alias or corpus.configuration.es_index
if add or update:
versioned_name = get_current_index_name(
corpus.configuration, client
)
else:
next_version = get_new_version_number(client, alias, base_name)
versioned_name = f'{base_name}-{next_version}'

index, _ = Index.objects.get_or_create(
server=server, name=versioned_name
)
else:
index, _ = Index.objects.get_or_create(
server=server, name=base_name
)
job = IndexJob.objects.create(corpus=corpus)
server = _server_for_job(job)
index, alias = _index_and_alias_for_job(job, prod, create_new)

if not (add or update):
if create_new:
CreateIndexTask.objects.create(
job=job,
index=index,
Expand All @@ -216,25 +194,20 @@ def create_indexing_job(
document_max_date=end,
)

if prod and not (add or update):
if prod and create_new:
UpdateSettingsTask.objects.create(
job=job,
index=index,
settings={"number_of_replicas": 1},
)

if prod and rollover:
if client.indices.exists_alias(name=alias):
for index_name in client.indices.get_alias(name=alias):
aliased_index, _ = Index.objects.get_or_create(
server=server,
name=index_name,
)
RemoveAliasTask.objects.create(
job=job,
index=aliased_index,
alias=alias,
)
for aliased_index in indices_with_alias(server, alias):
RemoveAliasTask.objects.create(
job=job,
index=aliased_index,
alias=alias,
)
AddAliasTask.objects.create(
job=job,
index=index,
Expand All @@ -244,6 +217,39 @@ def create_indexing_job(
return job


def _server_for_job(job: IndexJob):
server_name = server_for_corpus(job.corpus.name)
server = Server.objects.get(name=server_name)
return server


def _index_and_alias_for_job(job: IndexJob, prod: bool, create_new: bool) -> Tuple[Index, str]:
corpus = job.corpus
server = _server_for_job(job)
client = elasticsearch(corpus.name)
base_name = corpus.configuration.es_index

if prod:
alias = corpus.configuration.es_alias or corpus.configuration.es_index
if create_new:
next_version = get_new_version_number(client, alias, base_name)
versioned_name = f'{base_name}-{next_version}'
else:
versioned_name = get_current_index_name(
corpus.configuration, client
)

index, _ = Index.objects.get_or_create(
server=server, name=versioned_name
)
else:
alias = None
index, _ = Index.objects.get_or_create(
server=server, name=base_name
)

return index, alias


def perform_indexing(job: IndexJob):
'''
Expand Down