Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENG-6189] trovesearch denormalized #828

Merged
merged 14 commits into from
Nov 12, 2024
4 changes: 2 additions & 2 deletions api/search/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from api import authentication
from share.search import exceptions
from share.search.index_strategy import IndexStrategy
from share.search import index_strategy


class Sharev2ElasticSearchView(views.APIView):
Expand All @@ -32,7 +32,7 @@ def _handle_request(self, request):
if 'scroll' in queryparams:
return http.HttpResponseForbidden(reason='Scroll is not supported.')
try:
specific_index = IndexStrategy.get_for_sharev2_search(requested_index_strategy)
specific_index = index_strategy.get_index_for_sharev2_search(requested_index_strategy)
except exceptions.IndexStrategyError as error:
raise http.Http404(str(error))
try:
Expand Down
8 changes: 5 additions & 3 deletions api/views/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pendulum
import sentry_sdk

from share.search import IndexStrategy
from share.search import index_strategy
from share.search.exceptions import IndexStrategyError
from share.util.xml import strip_illegal_xml_chars

Expand All @@ -34,14 +34,16 @@ class MetadataRecordsRSS(Feed):
description = 'Updates to the SHARE open dataset'
author_name = 'SHARE'

_search_index: index_strategy.IndexStrategy.SpecificIndex

def title(self, obj):
query = json.dumps(obj.get('query', 'All'))
return prepare_string('SHARE: Atom feed for query: {}'.format(query))

def get_object(self, request):
self._order = request.GET.get('order')
elastic_query = request.GET.get('elasticQuery')
self._index_strategy = IndexStrategy.get_for_sharev2_search(request.GET.get('indexStrategy'))
self._search_index = index_strategy.get_index_for_sharev2_search(request.GET.get('indexStrategy'))

if self._order not in {'date_modified', 'date_updated', 'date_created', 'date_published'}:
self._order = 'date_modified'
Expand All @@ -62,7 +64,7 @@ def get_object(self, request):

def items(self, obj):
try:
json_response = self._index_strategy.pls_handle_search__sharev2_backcompat(
json_response = self._search_index.pls_handle_search__sharev2_backcompat(
request_body=obj,
)
except IndexStrategyError:
Expand Down
45 changes: 4 additions & 41 deletions project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,52 +314,15 @@ def split(string, delim):
'TIMEOUT': int(os.environ.get('ELASTICSEARCH_TIMEOUT', '45')),
'CHUNK_SIZE': int(os.environ.get('ELASTICSEARCH_CHUNK_SIZE', 2000)),
'MAX_RETRIES': int(os.environ.get('ELASTICSEARCH_MAX_RETRIES', 7)),
'INDEX_STRATEGIES': {}, # populated below based on environment
}
ELASTICSEARCH5_URL = (
os.environ.get('ELASTICSEARCH5_URL')
or os.environ.get('ELASTICSEARCH_URL')
or os.environ.get('ELASTICSEARCH_URL') # backcompat
)
if ELASTICSEARCH5_URL:
ELASTICSEARCH['INDEX_STRATEGIES']['sharev2_elastic5'] = {
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.sharev2_elastic5.Sharev2Elastic5IndexStrategy',
'CLUSTER_SETTINGS': {
'URL': ELASTICSEARCH5_URL,
},
}
ELASTICSEARCH8_URL = os.environ.get('ELASTICSEARCH8_URL')
if ELASTICSEARCH8_URL:
ELASTICSEARCH8_CERT_PATH = os.environ.get('ELASTICSEARCH8_CERT_PATH')
ELASTICSEARCH8_USERNAME = os.environ.get('ELASTICSEARCH8_USERNAME', 'elastic')
ELASTICSEARCH8_SECRET = os.environ.get('ELASTICSEARCH8_SECRET')
ELASTICSEARCH8_CLUSTER_SETTINGS = {
'URL': ELASTICSEARCH8_URL,
'AUTH': (
(ELASTICSEARCH8_USERNAME, ELASTICSEARCH8_SECRET)
if ELASTICSEARCH8_SECRET is not None
else None
),
'CERT_PATH': ELASTICSEARCH8_CERT_PATH,
}
aaxelb marked this conversation as resolved.
Show resolved Hide resolved
ELASTICSEARCH['INDEX_STRATEGIES'].update({
'sharev2_elastic8': {
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.sharev2_elastic8.Sharev2Elastic8IndexStrategy',
'CLUSTER_SETTINGS': ELASTICSEARCH8_CLUSTER_SETTINGS,
},
'trove_indexcard_flats': {
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.trove_indexcard_flats.TroveIndexcardFlatsIndexStrategy',
'CLUSTER_SETTINGS': ELASTICSEARCH8_CLUSTER_SETTINGS,
},
})
DEFAULT_INDEX_STRATEGY_FOR_LEGACY_SEARCH = (
'sharev2_elastic5'
if ELASTICSEARCH5_URL
else (
'sharev2_elastic8'
if ELASTICSEARCH8_URL
else None
)
)
ELASTICSEARCH8_CERT_PATH = os.environ.get('ELASTICSEARCH8_CERT_PATH')
ELASTICSEARCH8_USERNAME = os.environ.get('ELASTICSEARCH8_USERNAME', 'elastic')
ELASTICSEARCH8_SECRET = os.environ.get('ELASTICSEARCH8_SECRET')

# Seconds, not an actual celery settings
CELERY_RETRY_BACKOFF_BASE = int(os.environ.get('CELERY_RETRY_BACKOFF_BASE', 2 if DEBUG else 10))
Expand Down
7 changes: 6 additions & 1 deletion share/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from share.admin.celery import CeleryTaskResultAdmin
from share.admin.jobs import HarvestJobAdmin
from share.admin.readonly import ReadOnlyAdmin
from share.admin.search import search_indexes_view
from share.admin.search import search_indexes_view, search_index_mappings_view
from share.admin.util import TimeLimitedPaginator, linked_fk, linked_many, SourceConfigFilter
from share.harvest.scheduler import HarvestScheduler
from share.models import (
Expand Down Expand Up @@ -48,6 +48,11 @@ def get_urls(self):
self.admin_view(search_indexes_view),
name='search-indexes',
),
path(
'search-index-mappings/<index_name>',
self.admin_view(search_index_mappings_view),
name='search-index-mappings',
),
*super().get_urls(),
]

Expand Down
38 changes: 26 additions & 12 deletions share/admin/search.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging

from django.http.response import HttpResponseRedirect
from django.http.response import HttpResponseRedirect, JsonResponse
from django.template.response import TemplateResponse
from django.urls import reverse

from share.admin.util import admin_url
from share.models.index_backfill import IndexBackfill
from share.search.index_messenger import IndexMessenger
from share.search.index_strategy import IndexStrategy
from share.search import index_strategy


logger = logging.getLogger(__name__)
Expand All @@ -20,11 +20,12 @@ def search_indexes_view(request):
'admin/search-indexes.html',
context={
'search_url_prefix': _search_url_prefix(),
'mappings_url_prefix': _mappings_url_prefix(),
'index_status_by_strategy': _index_status_by_strategy(),
},
)
if request.method == 'POST':
_specific_index = IndexStrategy.get_specific_index(request.POST['specific_indexname'])
_specific_index = index_strategy.get_specific_index(request.POST['specific_indexname'])
_pls_doer = PLS_DOERS[request.POST['pls_do']]
_pls_doer(_specific_index)
_redirect_id = (
Expand All @@ -35,24 +36,34 @@ def search_indexes_view(request):
return HttpResponseRedirect('#'.join((request.path, _redirect_id)))


def search_index_mappings_view(request, index_name):
_specific_index = index_strategy.get_specific_index(index_name)
_mappings = _specific_index.pls_get_mappings()
return JsonResponse(_mappings)


def _search_url_prefix():
api_url = reverse('api:search')
return f'{api_url}?indexStrategy=' # append strategyname or indexname


def _mappings_url_prefix():
return '/admin/search-index-mappings/'


def _index_status_by_strategy():
backfill_by_indexname = {
backfill_by_indexname: dict[str, IndexBackfill] = {
backfill.specific_indexname: backfill
for backfill in (
IndexBackfill.objects
.filter(index_strategy_name__in=IndexStrategy.all_strategies_by_name().keys())
.filter(index_strategy_name__in=index_strategy.all_index_strategies().keys())
)
}
status_by_strategy = {}
_messenger = IndexMessenger()
for index_strategy in IndexStrategy.all_strategies():
current_index = index_strategy.for_current_index()
status_by_strategy[index_strategy.name] = {
for _index_strategy in index_strategy.all_index_strategies().values():
current_index = _index_strategy.for_current_index()
status_by_strategy[_index_strategy.name] = {
'current': {
'status': current_index.pls_get_status(),
'backfill': _serialize_backfill(
Expand All @@ -62,7 +73,7 @@ def _index_status_by_strategy():
},
'prior': sorted((
specific_index.pls_get_status()
for specific_index in index_strategy.each_specific_index()
for specific_index in _index_strategy.each_specific_index()
if not specific_index.is_current
), reverse=True),
'queues': [
Expand All @@ -71,15 +82,18 @@ def _index_status_by_strategy():
**_messenger.get_queue_stats(_queue_name),
}
for _queue_name in (
index_strategy.urgent_messagequeue_name,
index_strategy.nonurgent_messagequeue_name,
_index_strategy.urgent_messagequeue_name,
_index_strategy.nonurgent_messagequeue_name,
)
],
}
return status_by_strategy


def _serialize_backfill(specific_index: IndexStrategy.SpecificIndex, backfill: IndexBackfill):
def _serialize_backfill(
specific_index: index_strategy.IndexStrategy.SpecificIndex,
backfill: IndexBackfill | None,
):
if not specific_index.is_current:
return {}
if not backfill:
Expand Down
12 changes: 5 additions & 7 deletions share/bin/search.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from project.celery import app as celery_app

from share.bin.util import command
from share.search import IndexStrategy
from share.search import index_strategy
from share.search.exceptions import IndexStrategyError
from share.search.daemon import IndexerDaemonControl

Expand Down Expand Up @@ -29,7 +29,7 @@ def purge(args, argv):
Usage: {0} search purge <index_names>...
"""
for index_name in args['<index_names>']:
specific_index = IndexStrategy.get_specific_index(index_name)
specific_index = index_strategy.get_specific_index(index_name)
specific_index.pls_delete()


Expand All @@ -43,18 +43,16 @@ def setup(args, argv):
if _is_initial:
_specific_indexes = [
_index_strategy.for_current_index()
for _index_strategy in IndexStrategy.all_strategies()
for _index_strategy in index_strategy.all_index_strategies().values()
]
else:
_index_or_strategy_name = args['<index_or_strategy_name>']
try:
_specific_indexes = [
IndexStrategy.get_by_name(_index_or_strategy_name).for_current_index(),
]
_specific_indexes = [index_strategy.get_specific_index(_index_or_strategy_name)]
except IndexStrategyError:
try:
_specific_indexes = [
IndexStrategy.get_specific_index(_index_or_strategy_name),
index_strategy.get_specific_index(_index_or_strategy_name),
]
except IndexStrategyError:
raise IndexStrategyError(f'unrecognized index or strategy name "{_index_or_strategy_name}"')
Expand Down
8 changes: 4 additions & 4 deletions share/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@


def check_all_index_strategies_current(app_configs, **kwargs):
from share.search import IndexStrategy
from share.search import index_strategy
from share.search.exceptions import IndexStrategyError
errors = []
for index_strategy in IndexStrategy.all_strategies():
for _index_strategy in index_strategy.all_index_strategies().values():
try:
index_strategy.assert_strategy_is_current()
_index_strategy.assert_strategy_is_current()
except IndexStrategyError as exception:
errors.append(
checks.Error(
'IndexStrategy changed without checksum confirmation!',
hint=str(exception),
obj=index_strategy,
obj=_index_strategy,
id='share.search.E001',
)
)
Expand Down
1 change: 1 addition & 0 deletions share/models/feature_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class FeatureFlag(models.Model):
IGNORE_SHAREV2_INGEST = 'ignore_sharev2_ingest'
SUGGEST_CREATOR_FACET = 'suggest_creator_facet'
FORBID_UNTRUSTED_FEED = 'forbid_untrusted_feed'
TROVESEARCH_DENORMILY = 'trovesearch_denormily'

# name _should_ be one of the constants above, but that is not enforced by `choices`
name = models.TextField(unique=True)
Expand Down
3 changes: 1 addition & 2 deletions share/search/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from share.search.messages import MessageType, MessagesChunk
from share.search.index_strategy import IndexStrategy
from share.search.index_messenger import IndexMessenger


__all__ = ('IndexStrategy', 'IndexMessenger', 'MessageType', 'MessagesChunk',)
__all__ = ('IndexMessenger', 'MessageType', 'MessagesChunk',)
14 changes: 9 additions & 5 deletions share/search/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from kombu.mixins import ConsumerMixin
import sentry_sdk

from share.search import exceptions, messages, IndexStrategy, IndexMessenger
from share.search import (
exceptions,
messages,
index_strategy,
IndexMessenger,
)


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,7 +57,7 @@ def start_daemonthreads_for_strategy(self, index_strategy):
return _daemon

def start_all_daemonthreads(self):
for _index_strategy in IndexStrategy.all_strategies():
for _index_strategy in index_strategy.all_index_strategies().values():
self.start_daemonthreads_for_strategy(_index_strategy)

def stop_daemonthreads(self, *, wait=False):
Expand Down Expand Up @@ -176,7 +181,7 @@ def __repr__(self):

@dataclasses.dataclass
class MessageHandlingLoop:
index_strategy: IndexStrategy
index_strategy: index_strategy.IndexStrategy
message_type: messages.MessageType
stop_event: threading.Event
local_message_queue: queue.Queue
Expand Down Expand Up @@ -243,7 +248,6 @@ def _get_daemon_messages(self):
return daemon_messages_by_target_id

def _handle_some_messages(self):
# each message corresponds to one action on this daemon's index
start_time = time.time()
doc_count, error_count = 0, 0
daemon_messages_by_target_id = self._get_daemon_messages()
Expand All @@ -265,7 +269,7 @@ def _handle_some_messages(self):
logger.error('%sEncountered error: %s', self.log_prefix, message_response.error_text)
sentry_sdk.capture_message('error handling message', extras={'message_response': message_response})
target_id = message_response.index_message.target_id
for daemon_message in daemon_messages_by_target_id.pop(target_id):
for daemon_message in daemon_messages_by_target_id.pop(target_id, ()):
daemon_message.ack() # finally set it free
if daemon_messages_by_target_id: # should be empty by now
logger.error('%sUnhandled messages?? %s', self.log_prefix, len(daemon_messages_by_target_id))
Expand Down
Loading
Loading