From 898e85d65866279e07764c6d0212a303ed190f58 Mon Sep 17 00:00:00 2001 From: Michael Chouinard Date: Tue, 27 Aug 2024 12:02:44 -0400 Subject: [PATCH] More docs --- api/src/adapters/search/opensearch_client.py | 35 ++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/api/src/adapters/search/opensearch_client.py b/api/src/adapters/search/opensearch_client.py index 93ff76832..24d3bdb1d 100644 --- a/api/src/adapters/search/opensearch_client.py +++ b/api/src/adapters/search/opensearch_client.py @@ -112,6 +112,12 @@ def bulk_upsert( self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh) def bulk_delete(self, index_name: str, ids: Iterable[Any], *, refresh: bool = True) -> None: + """ + Bulk delete records from an index + + See: https://opensearch.org/docs/latest/api-reference/document-apis/bulk/ for details. + In this method, we delete records based on the IDs passed in. + """ bulk_operations = [] for _id in ids: @@ -130,11 +136,15 @@ def bulk_delete(self, index_name: str, ids: Iterable[Any], *, refresh: bool = Tr self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh) def index_exists(self, index_name: str) -> bool: - # Check if an index OR alias exists by a given name + """ + Check if an index OR alias exists by a given name + """ return self._client.indices.exists(index_name) def alias_exists(self, alias_name: str) -> bool: - # Check if an alias exists + """ + Check if an alias exists + """ existing_index_mapping = self._client.cat.aliases(alias_name, format="json") return len(existing_index_mapping) > 0 @@ -194,6 +204,24 @@ def scroll( include_scores: bool = True, duration: str = "10m", ) -> Generator[SearchResponse, None, None]: + """ + Scroll (iterate) over a large result set a given search query. + + This query uses additional resources to keep the response open, but + keeps a consistent set of results and is useful for backend processes + that need to fetch a large amount of search data. After processing the results, + the scroll lock is closed for you. + + This method is setup as a generator method and the results can be iterated over:: + + for response in search_client.scroll("my_index", {"size": 10000}): + for record in response.records: + process_record(record) + + + See: https://opensearch.org/docs/latest/api-reference/scroll/ + """ + # start scroll response = self.search( index_name=index_name, @@ -209,6 +237,9 @@ def scroll( while True: raw_response = self._client.scroll({"scroll_id": scroll_id, "scroll": duration}) response = SearchResponse.from_opensearch_response(raw_response, include_scores) + + # The scroll ID can change between queries according to the docs, so we + # keep updating the value while iterating in case they change. scroll_id = response.scroll_id if len(response.records) == 0: