Skip to content

Commit

Permalink
Merge pull request #828 from aaxelb/feature/6189-denormal
Browse files Browse the repository at this point in the history
[ENG-6189] trovesearch denormalized
  • Loading branch information
aaxelb authored Nov 12, 2024
2 parents ec39058 + 46ccc83 commit d6d5774
Show file tree
Hide file tree
Showing 48 changed files with 2,489 additions and 1,113 deletions.
4 changes: 2 additions & 2 deletions api/search/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from api import authentication
from share.search import exceptions
from share.search.index_strategy import IndexStrategy
from share.search import index_strategy


class Sharev2ElasticSearchView(views.APIView):
Expand All @@ -32,7 +32,7 @@ def _handle_request(self, request):
if 'scroll' in queryparams:
return http.HttpResponseForbidden(reason='Scroll is not supported.')
try:
specific_index = IndexStrategy.get_for_sharev2_search(requested_index_strategy)
specific_index = index_strategy.get_index_for_sharev2_search(requested_index_strategy)
except exceptions.IndexStrategyError as error:
raise http.Http404(str(error))
try:
Expand Down
8 changes: 5 additions & 3 deletions api/views/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pendulum
import sentry_sdk

from share.search import IndexStrategy
from share.search import index_strategy
from share.search.exceptions import IndexStrategyError
from share.util.xml import strip_illegal_xml_chars

Expand All @@ -34,14 +34,16 @@ class MetadataRecordsRSS(Feed):
description = 'Updates to the SHARE open dataset'
author_name = 'SHARE'

_search_index: index_strategy.IndexStrategy.SpecificIndex

def title(self, obj):
query = json.dumps(obj.get('query', 'All'))
return prepare_string('SHARE: Atom feed for query: {}'.format(query))

def get_object(self, request):
self._order = request.GET.get('order')
elastic_query = request.GET.get('elasticQuery')
self._index_strategy = IndexStrategy.get_for_sharev2_search(request.GET.get('indexStrategy'))
self._search_index = index_strategy.get_index_for_sharev2_search(request.GET.get('indexStrategy'))

if self._order not in {'date_modified', 'date_updated', 'date_created', 'date_published'}:
self._order = 'date_modified'
Expand All @@ -62,7 +64,7 @@ def get_object(self, request):

def items(self, obj):
try:
json_response = self._index_strategy.pls_handle_search__sharev2_backcompat(
json_response = self._search_index.pls_handle_search__sharev2_backcompat(
request_body=obj,
)
except IndexStrategyError:
Expand Down
45 changes: 4 additions & 41 deletions project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,52 +314,15 @@ def split(string, delim):
'TIMEOUT': int(os.environ.get('ELASTICSEARCH_TIMEOUT', '45')),
'CHUNK_SIZE': int(os.environ.get('ELASTICSEARCH_CHUNK_SIZE', 2000)),
'MAX_RETRIES': int(os.environ.get('ELASTICSEARCH_MAX_RETRIES', 7)),
'INDEX_STRATEGIES': {}, # populated below based on environment
}
ELASTICSEARCH5_URL = (
os.environ.get('ELASTICSEARCH5_URL')
or os.environ.get('ELASTICSEARCH_URL')
or os.environ.get('ELASTICSEARCH_URL') # backcompat
)
if ELASTICSEARCH5_URL:
ELASTICSEARCH['INDEX_STRATEGIES']['sharev2_elastic5'] = {
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.sharev2_elastic5.Sharev2Elastic5IndexStrategy',
'CLUSTER_SETTINGS': {
'URL': ELASTICSEARCH5_URL,
},
}
ELASTICSEARCH8_URL = os.environ.get('ELASTICSEARCH8_URL')
if ELASTICSEARCH8_URL:
ELASTICSEARCH8_CERT_PATH = os.environ.get('ELASTICSEARCH8_CERT_PATH')
ELASTICSEARCH8_USERNAME = os.environ.get('ELASTICSEARCH8_USERNAME', 'elastic')
ELASTICSEARCH8_SECRET = os.environ.get('ELASTICSEARCH8_SECRET')
ELASTICSEARCH8_CLUSTER_SETTINGS = {
'URL': ELASTICSEARCH8_URL,
'AUTH': (
(ELASTICSEARCH8_USERNAME, ELASTICSEARCH8_SECRET)
if ELASTICSEARCH8_SECRET is not None
else None
),
'CERT_PATH': ELASTICSEARCH8_CERT_PATH,
}
ELASTICSEARCH['INDEX_STRATEGIES'].update({
'sharev2_elastic8': {
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.sharev2_elastic8.Sharev2Elastic8IndexStrategy',
'CLUSTER_SETTINGS': ELASTICSEARCH8_CLUSTER_SETTINGS,
},
'trove_indexcard_flats': {
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.trove_indexcard_flats.TroveIndexcardFlatsIndexStrategy',
'CLUSTER_SETTINGS': ELASTICSEARCH8_CLUSTER_SETTINGS,
},
})
DEFAULT_INDEX_STRATEGY_FOR_LEGACY_SEARCH = (
'sharev2_elastic5'
if ELASTICSEARCH5_URL
else (
'sharev2_elastic8'
if ELASTICSEARCH8_URL
else None
)
)
ELASTICSEARCH8_CERT_PATH = os.environ.get('ELASTICSEARCH8_CERT_PATH')
ELASTICSEARCH8_USERNAME = os.environ.get('ELASTICSEARCH8_USERNAME', 'elastic')
ELASTICSEARCH8_SECRET = os.environ.get('ELASTICSEARCH8_SECRET')

# Seconds, not an actual celery settings
CELERY_RETRY_BACKOFF_BASE = int(os.environ.get('CELERY_RETRY_BACKOFF_BASE', 2 if DEBUG else 10))
Expand Down
7 changes: 6 additions & 1 deletion share/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from share.admin.celery import CeleryTaskResultAdmin
from share.admin.jobs import HarvestJobAdmin
from share.admin.readonly import ReadOnlyAdmin
from share.admin.search import search_indexes_view
from share.admin.search import search_indexes_view, search_index_mappings_view
from share.admin.util import TimeLimitedPaginator, linked_fk, linked_many, SourceConfigFilter
from share.harvest.scheduler import HarvestScheduler
from share.models import (
Expand Down Expand Up @@ -48,6 +48,11 @@ def get_urls(self):
self.admin_view(search_indexes_view),
name='search-indexes',
),
path(
'search-index-mappings/<index_name>',
self.admin_view(search_index_mappings_view),
name='search-index-mappings',
),
*super().get_urls(),
]

Expand Down
38 changes: 26 additions & 12 deletions share/admin/search.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging

from django.http.response import HttpResponseRedirect
from django.http.response import HttpResponseRedirect, JsonResponse
from django.template.response import TemplateResponse
from django.urls import reverse

from share.admin.util import admin_url
from share.models.index_backfill import IndexBackfill
from share.search.index_messenger import IndexMessenger
from share.search.index_strategy import IndexStrategy
from share.search import index_strategy


logger = logging.getLogger(__name__)
Expand All @@ -20,11 +20,12 @@ def search_indexes_view(request):
'admin/search-indexes.html',
context={
'search_url_prefix': _search_url_prefix(),
'mappings_url_prefix': _mappings_url_prefix(),
'index_status_by_strategy': _index_status_by_strategy(),
},
)
if request.method == 'POST':
_specific_index = IndexStrategy.get_specific_index(request.POST['specific_indexname'])
_specific_index = index_strategy.get_specific_index(request.POST['specific_indexname'])
_pls_doer = PLS_DOERS[request.POST['pls_do']]
_pls_doer(_specific_index)
_redirect_id = (
Expand All @@ -35,24 +36,34 @@ def search_indexes_view(request):
return HttpResponseRedirect('#'.join((request.path, _redirect_id)))


def search_index_mappings_view(request, index_name):
_specific_index = index_strategy.get_specific_index(index_name)
_mappings = _specific_index.pls_get_mappings()
return JsonResponse(_mappings)


def _search_url_prefix():
api_url = reverse('api:search')
return f'{api_url}?indexStrategy=' # append strategyname or indexname


def _mappings_url_prefix():
return '/admin/search-index-mappings/'


def _index_status_by_strategy():
backfill_by_indexname = {
backfill_by_indexname: dict[str, IndexBackfill] = {
backfill.specific_indexname: backfill
for backfill in (
IndexBackfill.objects
.filter(index_strategy_name__in=IndexStrategy.all_strategies_by_name().keys())
.filter(index_strategy_name__in=index_strategy.all_index_strategies().keys())
)
}
status_by_strategy = {}
_messenger = IndexMessenger()
for index_strategy in IndexStrategy.all_strategies():
current_index = index_strategy.for_current_index()
status_by_strategy[index_strategy.name] = {
for _index_strategy in index_strategy.all_index_strategies().values():
current_index = _index_strategy.for_current_index()
status_by_strategy[_index_strategy.name] = {
'current': {
'status': current_index.pls_get_status(),
'backfill': _serialize_backfill(
Expand All @@ -62,7 +73,7 @@ def _index_status_by_strategy():
},
'prior': sorted((
specific_index.pls_get_status()
for specific_index in index_strategy.each_specific_index()
for specific_index in _index_strategy.each_specific_index()
if not specific_index.is_current
), reverse=True),
'queues': [
Expand All @@ -71,15 +82,18 @@ def _index_status_by_strategy():
**_messenger.get_queue_stats(_queue_name),
}
for _queue_name in (
index_strategy.urgent_messagequeue_name,
index_strategy.nonurgent_messagequeue_name,
_index_strategy.urgent_messagequeue_name,
_index_strategy.nonurgent_messagequeue_name,
)
],
}
return status_by_strategy


def _serialize_backfill(specific_index: IndexStrategy.SpecificIndex, backfill: IndexBackfill):
def _serialize_backfill(
specific_index: index_strategy.IndexStrategy.SpecificIndex,
backfill: IndexBackfill | None,
):
if not specific_index.is_current:
return {}
if not backfill:
Expand Down
12 changes: 5 additions & 7 deletions share/bin/search.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from project.celery import app as celery_app

from share.bin.util import command
from share.search import IndexStrategy
from share.search import index_strategy
from share.search.exceptions import IndexStrategyError
from share.search.daemon import IndexerDaemonControl

Expand Down Expand Up @@ -29,7 +29,7 @@ def purge(args, argv):
Usage: {0} search purge <index_names>...
"""
for index_name in args['<index_names>']:
specific_index = IndexStrategy.get_specific_index(index_name)
specific_index = index_strategy.get_specific_index(index_name)
specific_index.pls_delete()


Expand All @@ -43,18 +43,16 @@ def setup(args, argv):
if _is_initial:
_specific_indexes = [
_index_strategy.for_current_index()
for _index_strategy in IndexStrategy.all_strategies()
for _index_strategy in index_strategy.all_index_strategies().values()
]
else:
_index_or_strategy_name = args['<index_or_strategy_name>']
try:
_specific_indexes = [
IndexStrategy.get_by_name(_index_or_strategy_name).for_current_index(),
]
_specific_indexes = [index_strategy.get_specific_index(_index_or_strategy_name)]
except IndexStrategyError:
try:
_specific_indexes = [
IndexStrategy.get_specific_index(_index_or_strategy_name),
index_strategy.get_specific_index(_index_or_strategy_name),
]
except IndexStrategyError:
raise IndexStrategyError(f'unrecognized index or strategy name "{_index_or_strategy_name}"')
Expand Down
8 changes: 4 additions & 4 deletions share/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@


def check_all_index_strategies_current(app_configs, **kwargs):
from share.search import IndexStrategy
from share.search import index_strategy
from share.search.exceptions import IndexStrategyError
errors = []
for index_strategy in IndexStrategy.all_strategies():
for _index_strategy in index_strategy.all_index_strategies().values():
try:
index_strategy.assert_strategy_is_current()
_index_strategy.assert_strategy_is_current()
except IndexStrategyError as exception:
errors.append(
checks.Error(
'IndexStrategy changed without checksum confirmation!',
hint=str(exception),
obj=index_strategy,
obj=_index_strategy,
id='share.search.E001',
)
)
Expand Down
1 change: 1 addition & 0 deletions share/models/feature_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class FeatureFlag(models.Model):
IGNORE_SHAREV2_INGEST = 'ignore_sharev2_ingest'
SUGGEST_CREATOR_FACET = 'suggest_creator_facet'
FORBID_UNTRUSTED_FEED = 'forbid_untrusted_feed'
TROVESEARCH_DENORMILY = 'trovesearch_denormily'

# name _should_ be one of the constants above, but that is not enforced by `choices`
name = models.TextField(unique=True)
Expand Down
3 changes: 1 addition & 2 deletions share/search/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from share.search.messages import MessageType, MessagesChunk
from share.search.index_strategy import IndexStrategy
from share.search.index_messenger import IndexMessenger


__all__ = ('IndexStrategy', 'IndexMessenger', 'MessageType', 'MessagesChunk',)
__all__ = ('IndexMessenger', 'MessageType', 'MessagesChunk',)
14 changes: 9 additions & 5 deletions share/search/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from kombu.mixins import ConsumerMixin
import sentry_sdk

from share.search import exceptions, messages, IndexStrategy, IndexMessenger
from share.search import (
exceptions,
messages,
index_strategy,
IndexMessenger,
)


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,7 +57,7 @@ def start_daemonthreads_for_strategy(self, index_strategy):
return _daemon

def start_all_daemonthreads(self):
for _index_strategy in IndexStrategy.all_strategies():
for _index_strategy in index_strategy.all_index_strategies().values():
self.start_daemonthreads_for_strategy(_index_strategy)

def stop_daemonthreads(self, *, wait=False):
Expand Down Expand Up @@ -176,7 +181,7 @@ def __repr__(self):

@dataclasses.dataclass
class MessageHandlingLoop:
index_strategy: IndexStrategy
index_strategy: index_strategy.IndexStrategy
message_type: messages.MessageType
stop_event: threading.Event
local_message_queue: queue.Queue
Expand Down Expand Up @@ -243,7 +248,6 @@ def _get_daemon_messages(self):
return daemon_messages_by_target_id

def _handle_some_messages(self):
# each message corresponds to one action on this daemon's index
start_time = time.time()
doc_count, error_count = 0, 0
daemon_messages_by_target_id = self._get_daemon_messages()
Expand All @@ -265,7 +269,7 @@ def _handle_some_messages(self):
logger.error('%sEncountered error: %s', self.log_prefix, message_response.error_text)
sentry_sdk.capture_message('error handling message', extras={'message_response': message_response})
target_id = message_response.index_message.target_id
for daemon_message in daemon_messages_by_target_id.pop(target_id):
for daemon_message in daemon_messages_by_target_id.pop(target_id, ()):
daemon_message.ack() # finally set it free
if daemon_messages_by_target_id: # should be empty by now
logger.error('%sUnhandled messages?? %s', self.log_prefix, len(daemon_messages_by_target_id))
Expand Down
Loading

0 comments on commit d6d5774

Please sign in to comment.