From 1cbbc181ea5eb6e14bd722b7192d240c36477b00 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?R=C3=B4mulo=20Penido?=
Date: Thu, 14 Mar 2024 16:54:29 -0300
Subject: [PATCH] feat: update search index when course content is updated

---
 openedx/core/djangoapps/content/search/api.py | 328 ++++++++++++++++++
 .../djangoapps/content/search/documents.py    |  33 +-
 .../djangoapps/content/search/handlers.py     |  72 ++++
 .../search/management/commands/meili_mixin.py | 105 ------
 .../management/commands/reindex_studio.py     | 107 +-----
 .../core/djangoapps/content/search/tasks.py   |  52 +++
 6 files changed, 478 insertions(+), 219 deletions(-)
 create mode 100644 openedx/core/djangoapps/content/search/api.py
 delete mode 100644 openedx/core/djangoapps/content/search/management/commands/meili_mixin.py
 create mode 100644 openedx/core/djangoapps/content/search/tasks.py

diff --git a/openedx/core/djangoapps/content/search/api.py b/openedx/core/djangoapps/content/search/api.py
new file mode 100644
index 000000000000..14b457612f43
--- /dev/null
+++ b/openedx/core/djangoapps/content/search/api.py
@@ -0,0 +1,328 @@
+"""
+Content index and search API using Meilisearch
+"""
+from __future__ import annotations
+
+import logging
+import time
+from contextlib import contextmanager
+from typing import Callable, Generator
+
+import meilisearch
+from django.conf import settings
+from django.core.cache import cache
+from meilisearch.errors import MeilisearchError
+from meilisearch.models.task import TaskInfo
+from opaque_keys.edx.keys import UsageKey
+
+from openedx.core.djangoapps.content.search.documents import (
+    STUDIO_INDEX_NAME,
+    Fields,
+    meili_id_from_opaque_key,
+    searchable_doc_for_course_block,
+    searchable_doc_for_library_block
+)
+from openedx.core.djangoapps.content_libraries import api as lib_api
+from xmodule.modulestore import ModuleStoreEnum
+from xmodule.modulestore.django import modulestore
+
+log = logging.getLogger(__name__)
+
+INDEX_NAME = settings.MEILISEARCH_INDEX_PREFIX + STUDIO_INDEX_NAME
+
+_MEILI_CLIENT = None
+
+LOCK_EXPIRE = 5 * 60  # Lock expires in 5 minutes
+
+
+@contextmanager
+def _index_rebuild_lock(index_name: str) -> Generator[None, None, None]:
+    """
+    Lock to prevent the index from being updated while it is being rebuilt
+    """
+    timeout_at = time.monotonic() + LOCK_EXPIRE
+    lock_id = f"lock-meilisearch-index-{index_name}"
+
+    while True:
+        status = cache.add(lock_id, True, LOCK_EXPIRE)
+        if status:
+            # Lock acquired
+            try:
+                yield
+            finally:
+                # Release the lock (in a `finally` clause, so it is released even if
+                # the wrapped block raises; a bare `break` here would swallow the exception)
+                cache.delete(lock_id)
+            return
+
+        if time.monotonic() > timeout_at:
+            raise TimeoutError("Timeout acquiring lock")
+
+        time.sleep(1)
+
+
+def _wait_index_rebuild_lock(index_name: str) -> None:
+    """
+    Wait for the index rebuild lock to be released
+    """
+    timeout_at = time.monotonic() + LOCK_EXPIRE
+    lock_id = f"lock-meilisearch-index-{index_name}"
+
+    while cache.get(lock_id):
+        if time.monotonic() > timeout_at:
+            raise TimeoutError("Timeout waiting for lock")
+
+        time.sleep(1)
+
+
+def _get_meilisearch_client():
+    """
+    Get the Meilisearch client
+    """
+    global _MEILI_CLIENT  # pylint: disable=global-statement
+    if _MEILI_CLIENT is not None:
+        return _MEILI_CLIENT
+
+    # Connect to Meilisearch
+    if not settings.MEILISEARCH_URL:
+        raise RuntimeError("MEILISEARCH_URL is not set - search functionality disabled.")
+
+    _MEILI_CLIENT = meilisearch.Client(settings.MEILISEARCH_URL, settings.MEILISEARCH_API_KEY)
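+    # Verify connectivity up front so that a bad configuration fails here,
+    # rather than on the first indexing operation that uses the cached client.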
+    try:
+        _MEILI_CLIENT.health()
+    except MeilisearchError as err:
+        _MEILI_CLIENT = None
+        raise ConnectionError("Unable to connect to Meilisearch") from err
+    return _MEILI_CLIENT
+
+
+def _wait_for_meili_task(info: TaskInfo) -> None:
+    """
+    Simple helper method to wait for a Meilisearch task to complete
+    """
+    client = _get_meilisearch_client()
+    current_status = client.get_task(info.task_uid)
+    while current_status.status in ("enqueued", "processing"):
+        time.sleep(1)
+        current_status = client.get_task(info.task_uid)
+    if current_status.status != "succeeded":
+        try:
+            err_reason = current_status.error['message']
+        except (TypeError, KeyError):
+            err_reason = "Unknown error"
+        raise MeilisearchError(err_reason)
+
+
+def _index_exists(index_name: str) -> bool:
+    """
+    Check if an index exists
+    """
+    client = _get_meilisearch_client()
+    try:
+        client.get_index(index_name)
+    except MeilisearchError as err:
+        if err.code == "index_not_found":
+            return False
+        raise err
+    return True
+
+
+@contextmanager
+def _using_temp_index(target_index, status_cb: Callable[[str], None] | None = None) -> Generator[str, None, None]:
+    """
+    Create a new temporary Meilisearch index, populate it, then swap it to
+    become the active index.
+    """
+    def nop(_):
+        pass
+
+    if status_cb is None:
+        status_cb = nop
+
+    client = _get_meilisearch_client()
+    status_cb("Checking index...")
+    temp_index_name = target_index + "_new"
+    with _index_rebuild_lock(target_index):
+        if _index_exists(temp_index_name):
+            status_cb("Temporary index already exists. Deleting it...")
+            _wait_for_meili_task(client.delete_index(temp_index_name))
+
+        status_cb("Creating new index...")
+        _wait_for_meili_task(
+            client.create_index(temp_index_name, {'primaryKey': 'id'})
+        )
+        new_index_created = client.get_index(temp_index_name).created_at
+
+        yield temp_index_name
+
+        if not _index_exists(target_index):
+            # We have to create the "target" index before we can successfully swap the new one into it:
+            status_cb("Preparing to swap into index (first time)...")
+            _wait_for_meili_task(client.create_index(target_index))
+        status_cb("Swapping index...")
+        client.swap_indexes([{'indexes': [temp_index_name, target_index]}])
+        # If we're using an API key that's restricted to certain index prefix(es), we won't be able to get the status
+        # of this request unfortunately. https://github.com/meilisearch/meilisearch/issues/4103
+        while True:
+            time.sleep(1)
+            if client.get_index(target_index).created_at != new_index_created:
+                status_cb("Waiting for swap completion...")
+            else:
+                break
+        status_cb("Deleting old index...")
+        _wait_for_meili_task(client.delete_index(temp_index_name))
+
+
+def _recurse_children(block, fn, status_cb: Callable[[str], None] | None = None) -> None:
+    """
+    Recurse the children of an XBlock and call the given function for each
+
+    The main purpose of this is just to wrap the loading of each child in
+    try...except. Otherwise block.get_children() would do what we need.
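+
+    Children that fail to load are logged and skipped; fn is only called for
+    children that were loaded successfully.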
+ """ + if block.has_children: + for child_id in block.children: + try: + child = block.get_child(child_id) + except Exception as err: # pylint: disable=broad-except + log.exception(err) + if status_cb is not None: + status_cb(f"Unable to load block {child_id}") + pass + else: + fn(child) + + +def rebuild_index(status_cb: Callable[[str], None] | None) -> None: + """ + Rebuild the Meilisearch index from scratch + """ + def nop(_message): + pass + + if status_cb is None: + status_cb = nop + + client = _get_meilisearch_client() + store = modulestore() + + # Get the lists of libraries + status_cb("Counting libraries...") + lib_keys = [lib.library_key for lib in lib_api.ContentLibrary.objects.select_related('org').only('org', 'slug')] + num_libraries = len(lib_keys) + + # Get the list of courses + status_cb("Counting courses...") + with store.branch_setting(ModuleStoreEnum.Branch.draft_preferred): + all_courses = store.get_courses() + num_courses = len(all_courses) + + # Some counters so we can track our progress as indexing progresses: + num_contexts = num_courses + num_libraries + num_contexts_done = 0 # How many courses/libraries we've indexed + num_blocks_done = 0 # How many individual components/XBlocks we've indexed + + status_cb(f"Found {num_courses} courses and {num_libraries} libraries.") + with _using_temp_index(INDEX_NAME, status_cb) as temp_index_name: + ############## Configure the index ############## + # Mark usage_key as unique (it's not the primary key for the index, but nevertheless must be unique): + client.index(temp_index_name).update_distinct_attribute(Fields.usage_key) + # Mark which attributes can be used for filtering/faceted search: + client.index(temp_index_name).update_filterable_attributes([ + Fields.block_type, + Fields.context_key, + Fields.org, + Fields.tags, + Fields.type, + ]) + + ############## Libraries ############## + status_cb("Indexing libraries...") + for lib_key in lib_keys: + status_cb(f"{num_contexts_done + 1}/{num_contexts}. Now indexing library {lib_key}") + docs = [] + for component in lib_api.get_library_components(lib_key): + metadata = lib_api.LibraryXBlockMetadata.from_component(lib_key, component) + doc = searchable_doc_for_library_block(metadata) + docs.append(doc) + num_blocks_done += 1 + # Add all the docs in this library at once (usually faster than adding one at a time): + _wait_for_meili_task(client.index(temp_index_name).add_documents(docs)) + num_contexts_done += 1 + + ############## Courses ############## + status_cb("Indexing courses...") + for course in all_courses: + status_cb( + f"{num_contexts_done + 1}/{num_contexts}. Now indexing course {course.display_name} ({course.id})" + ) + docs = [] + + def add_with_children(block): + """ Recursively index the given XBlock/component """ + doc = searchable_doc_for_course_block(block) + docs.append(doc) # pylint: disable=cell-var-from-loop + _recurse_children(block, add_with_children) # pylint: disable=cell-var-from-loop + + _recurse_children(course, add_with_children) + + # Add all the docs in this course at once (usually faster than adding one at a time): + _wait_for_meili_task(client.index(temp_index_name).add_documents(docs)) + num_contexts_done += 1 + num_blocks_done += len(docs) + + status_cb(f"Done! 
+
+
+def upsert_xblock_index_doc(
+    usage_key: UsageKey, recursive: bool = True, update_metadata: bool = True, update_tags: bool = True
+) -> None:
+    """
+    Creates or updates the document for the given XBlock in the search index
+
+    Args:
+        usage_key (UsageKey): The usage key of the XBlock to index
+        recursive (bool): If True, also index all children of the XBlock
+        update_metadata (bool): If True, update the metadata of the XBlock
+        update_tags (bool): If True, update the tags of the XBlock
+    """
+    # If there is a rebuild in progress, wait for it to finish.
+    # This could still be a problem if a rebuild starts right after this check, but we don't want to prevent
+    # concurrent updates entirely.
+    _wait_index_rebuild_lock(INDEX_NAME)
+
+    xblock = modulestore().get_item(usage_key)
+    client = _get_meilisearch_client()
+
+    docs = []
+
+    def add_with_children(block):
+        """ Recursively index the given XBlock/component """
+        doc = searchable_doc_for_course_block(block, metadata=update_metadata, tags=update_tags)
+        docs.append(doc)
+        if recursive:
+            _recurse_children(block, add_with_children)
+
+    add_with_children(xblock)
+
+    _wait_for_meili_task(client.index(INDEX_NAME).update_documents(docs))
+
+
+def delete_xblock_index_doc(usage_key: UsageKey) -> None:
+    """
+    Deletes the document for the given XBlock from the search index
+    """
+    # If there is a rebuild in progress, wait for it to finish.
+    # This could still be a problem if a rebuild starts right after this check, but we don't want to prevent
+    # concurrent updates entirely.
+    _wait_index_rebuild_lock(INDEX_NAME)
+
+    client = _get_meilisearch_client()
+
+    _wait_for_meili_task(client.index(INDEX_NAME).delete_document(meili_id_from_opaque_key(usage_key)))
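+
+# Note: the Celery tasks in tasks.py wrap upsert_xblock_index_doc() and
+# delete_xblock_index_doc(); event handlers should enqueue those tasks rather
+# than call these functions synchronously.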
+ if f"level{level}" not in result: result[f"level{level}"] = [new_value] elif new_value not in result[f"level{level}"]: result[f"level{level}"].append(new_value) @@ -164,7 +164,7 @@ def searchable_doc_for_library_block(metadata: lib_api.LibraryXBlockMetadata) -> log.exception(f"Failed to load XBlock {metadata.usage_key}: {err}") # Even though we couldn't load the block, we can still include basic data about it in the index, from 'metadata' doc.update({ - Fields.id: _meili_id_from_opaque_key(metadata.usage_key), + Fields.id: meili_id_from_opaque_key(metadata.usage_key), Fields.usage_key: str(metadata.usage_key), Fields.block_id: str(metadata.usage_key.block_id), Fields.display_name: metadata.display_name, @@ -179,13 +179,26 @@ def searchable_doc_for_library_block(metadata: lib_api.LibraryXBlockMetadata) -> return doc -def searchable_doc_for_course_block(block) -> dict: +def searchable_doc_for_course_block(block, metadata: bool = True, tags: bool = True) -> dict: """ Generate a dictionary document suitable for ingestion into a search engine - like Meilisearch or Elasticsearch, so that the given library block can be + like Meilisearch or Elasticsearch, so that the given course block can be found using faceted search. + + Args: + block: The XBlock instance to index + metadata: If True, include the block's metadata in the doc + tags: If True, include the block's tags in the doc """ - doc = _fields_from_block(block) - doc.update(_tags_for_content_object(block.usage_key)) - doc[Fields.type] = DocType.course_block + doc = { + Fields.id: meili_id_from_opaque_key(block.usage_key), + Fields.type: DocType.course_block, + } + + if metadata: + doc.update(_fields_from_block(block)) + + if tags: + doc.update(_tags_for_content_object(block.usage_key)) + return doc diff --git a/openedx/core/djangoapps/content/search/handlers.py b/openedx/core/djangoapps/content/search/handlers.py index e69de29bb2d1..aa6ffa8e1d0c 100644 --- a/openedx/core/djangoapps/content/search/handlers.py +++ b/openedx/core/djangoapps/content/search/handlers.py @@ -0,0 +1,72 @@ +""" +Handlers for content indexing +""" + +import logging + +from django.dispatch import receiver +from openedx_events.content_authoring.data import XBlockData +from openedx_events.content_authoring.signals import ( + XBLOCK_CREATED, + XBLOCK_DELETED, + XBLOCK_UPDATED +) + +from .tasks import delete_xblock_index_doc, upsert_xblock_index_doc + +log = logging.getLogger(__name__) + + +@receiver(XBLOCK_CREATED) +def xblock_created_handler(**kwargs) -> None: + """ + Create the index for the XBlock + """ + # FixMe: Add check for enabled melisearch + + xblock_info = kwargs.get("xblock_info", None) + if not xblock_info or not isinstance(xblock_info, XBlockData): + log.error("Received null or incorrect data for event") + return + + upsert_xblock_index_doc.delay( + str(xblock_info.usage_key), + recursive=False, + update_metadata=True, + update_tags=False, + ) + + +@receiver(XBLOCK_UPDATED) +def xblock_updated_handler(**kwargs) -> None: + """ + Update the index for the XBlock and its children + """ + # FixMe: Add check for enabled melisearch + + xblock_info = kwargs.get("xblock_info", None) + if not xblock_info or not isinstance(xblock_info, XBlockData): + log.error("Received null or incorrect data for event") + return + + upsert_xblock_index_doc.delay( + str(xblock_info.usage_key), + recursive=True, # Update all children because the breadcrumb may have changed + update_metadata=True, + update_tags=False, + ) + + +@receiver(XBLOCK_DELETED) +def 
+
+log = logging.getLogger(__name__)
+
+
+@receiver(XBLOCK_CREATED)
+def xblock_created_handler(**kwargs) -> None:
+    """
+    Create the index document for the newly created XBlock
+    """
+    # FixMe: Add check for enabled Meilisearch
+
+    xblock_info = kwargs.get("xblock_info", None)
+    if not xblock_info or not isinstance(xblock_info, XBlockData):
+        log.error("Received null or incorrect data for event")
+        return
+
+    upsert_xblock_index_doc.delay(
+        str(xblock_info.usage_key),
+        recursive=False,
+        update_metadata=True,
+        update_tags=False,
+    )
+
+
+@receiver(XBLOCK_UPDATED)
+def xblock_updated_handler(**kwargs) -> None:
+    """
+    Update the index documents for the XBlock and its children
+    """
+    # FixMe: Add check for enabled Meilisearch
+
+    xblock_info = kwargs.get("xblock_info", None)
+    if not xblock_info or not isinstance(xblock_info, XBlockData):
+        log.error("Received null or incorrect data for event")
+        return
+
+    upsert_xblock_index_doc.delay(
+        str(xblock_info.usage_key),
+        recursive=True,  # Update all children because the breadcrumb may have changed
+        update_metadata=True,
+        update_tags=False,
+    )
+
+
+@receiver(XBLOCK_DELETED)
+def xblock_deleted_handler(**kwargs) -> None:
+    """
+    Delete the index document for the XBlock
+    """
+    # FixMe: Add check for enabled Meilisearch
+
+    xblock_info = kwargs.get("xblock_info", None)
+    if not xblock_info or not isinstance(xblock_info, XBlockData):
+        log.error("Received null or incorrect data for event")
+        return
+
+    delete_xblock_index_doc.delay(str(xblock_info.usage_key))
diff --git a/openedx/core/djangoapps/content/search/management/commands/meili_mixin.py b/openedx/core/djangoapps/content/search/management/commands/meili_mixin.py
deleted file mode 100644
index 798dfd8a7e47..000000000000
--- a/openedx/core/djangoapps/content/search/management/commands/meili_mixin.py
+++ /dev/null
@@ -1,105 +0,0 @@
-"""
-Mixin for Django management commands that interact with Meilisearch
-"""
-from contextlib import contextmanager
-import time
-
-from django.conf import settings
-from django.core.management import CommandError
-import meilisearch
-from meilisearch.errors import MeilisearchError
-from meilisearch.models.task import TaskInfo
-
-
-class MeiliCommandMixin:
-    """
-    Mixin for Django management commands that interact with Meilisearch
-    """
-    def get_meilisearch_client(self):
-        """
-        Get the Meiliesearch client
-        """
-        if hasattr(self, "_meili_client"):
-            return self._meili_client
-        # Connect to Meilisearch
-        if not settings.MEILISEARCH_URL:
-            raise CommandError("MEILISEARCH_URL is not set - search functionality disabled.")
-
-        self._meili_client = meilisearch.Client(settings.MEILISEARCH_URL, settings.MEILISEARCH_API_KEY)
-        try:
-            self._meili_client.health()
-        except MeilisearchError as err:
-            self.stderr.write(err.message)  # print this because 'raise...from...' doesn't print the details
-            raise CommandError("Unable to connect to Meilisearch") from err
-        return self._meili_client
-
-    def wait_for_meili_task(self, info: TaskInfo):
-        """
-        Simple helper method to wait for a Meilisearch task to complete
-        """
-        client = self.get_meilisearch_client()
-        current_status = client.get_task(info.task_uid)
-        while current_status.status in ("enqueued", "processing"):
-            self.stdout.write("...")
-            time.sleep(1)
-            current_status = client.get_task(info.task_uid)
-        if current_status.status != "succeeded":
-            self.stderr.write(f"Task has status: {current_status.status}")
-            self.stderr.write(str(current_status.error))
-            try:
-                err_reason = current_status.error['message']
-            except (TypeError, KeyError):
-                err_reason = "Unknown error"
-            raise MeilisearchError(err_reason)
-
-    def index_exists(self, index_name: str) -> bool:
-        """
-        Check if an index exists
-        """
-        client = self.get_meilisearch_client()
-        try:
-            client.get_index(index_name)
-        except MeilisearchError as err:
-            if err.code == "index_not_found":
-                return False
-            else:
-                raise err
-        return True
-
-    @contextmanager
-    def using_temp_index(self, target_index):
-        """
-        Create a new temporary Meilisearch index, populate it, then swap it to
-        become the active index.
-        """
-        client = self.get_meilisearch_client()
-        self.stdout.write("Checking index...")
-        temp_index_name = target_index + "_new"
-        if self.index_exists(temp_index_name):
-            self.stdout.write("Temporary index already exists. Deleting it...")
-            self.wait_for_meili_task(client.delete_index(temp_index_name))
-
-        self.stdout.write("Creating new index...")
-        self.wait_for_meili_task(
-            client.create_index(temp_index_name, {'primaryKey': 'id'})
-        )
-        new_index_created = client.get_index(temp_index_name).created_at
-
-        yield temp_index_name
-
-        if not self.index_exists(target_index):
-            # We have to create the "target" index before we can successfully swap the new one into it:
-            self.stdout.write("Preparing to swap into index (first time)...")
-            self.wait_for_meili_task(client.create_index(target_index))
-        self.stdout.write("Swapping index...")
-        client.swap_indexes([{'indexes': [temp_index_name, target_index]}])
-        # If we're using an API key that's restricted to certain index prefix(es), we won't be able to get the status
-        # of this request unfortunately. https://github.com/meilisearch/meilisearch/issues/4103
-        while True:
-            time.sleep(1)
-            if client.get_index(target_index).created_at != new_index_created:
-                self.stdout.write("Waiting for swap completion...")
-            else:
-                break
-        self.stdout.write("Deleting old index...")
-        self.wait_for_meili_task(client.delete_index(temp_index_name))
diff --git a/openedx/core/djangoapps/content/search/management/commands/reindex_studio.py b/openedx/core/djangoapps/content/search/management/commands/reindex_studio.py
index fe0981e24ca4..c780b4e20f06 100644
--- a/openedx/core/djangoapps/content/search/management/commands/reindex_studio.py
+++ b/openedx/core/djangoapps/content/search/management/commands/reindex_studio.py
@@ -5,27 +5,12 @@
 See also cms/djangoapps/contentstore/management/commands/reindex_course.py which
 indexes LMS (published) courses in ElasticSearch.
 """
-import logging
-
-from django.conf import settings
 from django.core.management import BaseCommand
 
-from openedx.core.djangoapps.content_libraries import api as lib_api
-from openedx.core.djangoapps.content.search.documents import (
-    Fields,
-    searchable_doc_for_course_block,
-    searchable_doc_for_library_block,
-    STUDIO_INDEX_NAME,
-)
-from xmodule.modulestore import ModuleStoreEnum
-from xmodule.modulestore.django import modulestore
-from .meili_mixin import MeiliCommandMixin
-
+from ... import api
 
-log = logging.getLogger(__name__)
-
 
-class Command(MeiliCommandMixin, BaseCommand):
+class Command(BaseCommand):
     """
     Build or re-build the search index for courses (in Studio, i.e. Draft mode)
     """
 
@@ -34,90 +19,4 @@ def handle(self, *args, **options):
         """
         Build a new search index for Studio, containing content from courses and libraries
         """
-        client = self.get_meilisearch_client()
-        store = modulestore()
-
-        # Get the lists of libraries
-        self.stdout.write("Counting libraries...")
-        lib_keys = [lib.library_key for lib in lib_api.ContentLibrary.objects.select_related('org').only('org', 'slug')]
-        num_libraries = len(lib_keys)
-
-        # Get the list of courses
-        self.stdout.write("Counting courses...")
-        with store.branch_setting(ModuleStoreEnum.Branch.draft_preferred):
-            all_courses = store.get_courses()
-        num_courses = len(all_courses)
-
-        # Some counters so we can track our progress as indexing progresses:
-        num_contexts = num_courses + num_libraries
-        num_contexts_done = 0  # How many courses/libraries we've indexed
-        num_blocks_done = 0  # How many individual components/XBlocks we've indexed
-
-        self.stdout.write(f"Found {num_courses} courses and {num_libraries} libraries.")
-        index_name = settings.MEILISEARCH_INDEX_PREFIX + STUDIO_INDEX_NAME
-        with self.using_temp_index(index_name) as temp_index_name:
-            ############## Configure the index ##############
-            # Mark usage_key as unique (it's not the primary key for the index, but nevertheless must be unique):
-            client.index(temp_index_name).update_distinct_attribute(Fields.usage_key)
-            # Mark which attributes can be used for filtering/faceted search:
-            client.index(temp_index_name).update_filterable_attributes([
-                Fields.block_type,
-                Fields.context_key,
-                Fields.org,
-                Fields.tags,
-                Fields.type,
-            ])
-
-            ############## Libraries ##############
-            self.stdout.write("Indexing libraries...")
-            for lib_key in lib_keys:
-                self.stdout.write(f"{num_contexts_done + 1}/{num_contexts}. Now indexing library {lib_key}")
-                docs = []
-                for component in lib_api.get_library_components(lib_key):
-                    metadata = lib_api.LibraryXBlockMetadata.from_component(lib_key, component)
-                    doc = searchable_doc_for_library_block(metadata)
-                    docs.append(doc)
-                    num_blocks_done += 1
-                # Add all the docs in this library at once (usually faster than adding one at a time):
-                self.wait_for_meili_task(client.index(temp_index_name).add_documents(docs))
-                num_contexts_done += 1
-
-            ############## Courses ##############
-            self.stdout.write("Indexing courses...")
-            for course in all_courses:
-                self.stdout.write(
-                    f"{num_contexts_done + 1}/{num_contexts}. Now indexing course {course.display_name} ({course.id})"
-                )
-                docs = []
-
-                def add_with_children(block):
-                    """ Recursively index the given XBlock/component """
-                    doc = searchable_doc_for_course_block(block)
-                    docs.append(doc)  # pylint: disable=cell-var-from-loop
-                    self.recurse_children(block, add_with_children)  # pylint: disable=cell-var-from-loop
-
-                self.recurse_children(course, add_with_children)
-
-                # Add all the docs in this course at once (usually faster than adding one at a time):
-                self.wait_for_meili_task(client.index(temp_index_name).add_documents(docs))
-                num_contexts_done += 1
-                num_blocks_done += len(docs)
-
-        self.stdout.write(f"Done! {num_blocks_done} blocks indexed across {num_contexts_done} courses and libraries.")
-
-    def recurse_children(self, block, fn):
-        """
-        Recurse the children of an XBlock and call the given function for each
-
-        The main purpose of this is just to wrap the loading of each child in
-        try...except. Otherwise block.get_children() would do what we need.
- """ - if block.has_children: - for child_id in block.children: - try: - child = block.get_child(child_id) - except Exception as err: # pylint: disable=broad-except - log.exception(err) - self.stderr.write(f"Unable to load block {child_id}") - else: - fn(child) + api.rebuild_index(self.stdout.write) diff --git a/openedx/core/djangoapps/content/search/tasks.py b/openedx/core/djangoapps/content/search/tasks.py new file mode 100644 index 000000000000..e79cd58ef7d8 --- /dev/null +++ b/openedx/core/djangoapps/content/search/tasks.py @@ -0,0 +1,52 @@ +""" +Defines asynchronous celery task for content indexing +""" + +from __future__ import annotations + +import logging + +from celery import shared_task +from celery_utils.logged_task import LoggedTask +from edx_django_utils.monitoring import set_code_owner_attribute +from opaque_keys.edx.keys import UsageKey + +from . import api + +log = logging.getLogger(__name__) + + +@shared_task(base=LoggedTask) +@set_code_owner_attribute +def upsert_xblock_index_doc(usage_key_str: str, recursive: bool, update_metadata: bool, update_tags: bool) -> bool: + """ + """ + try: + usage_key = UsageKey.from_string(usage_key_str) + + log.info("Updating content index document for XBlock with id: %s", usage_key) + + api.upsert_xblock_index_doc(usage_key, recursive, update_metadata, update_tags) + + return True + except Exception as e: # pylint: disable=broad-except + log.error("Error updating content index document for XBlock with id: %s. %s", usage_key_str, e) + return False + + +@shared_task(base=LoggedTask) +@set_code_owner_attribute +def delete_xblock_index_doc(usage_key_str: str) -> bool: + """ + """ + try: + usag_key = UsageKey.from_string(usage_key_str) + + log.info("Updating content index document for XBlock with id: %s", usag_key) + + api.delete_xblock_index_doc(usag_key) + + return True + except Exception as e: # pylint: disable=broad-except + log.error("Error deleting content index document for XBlock with id: %s. %s", usage_key_str, e) + return False