diff --git a/api/src/adapters/search/opensearch_client.py b/api/src/adapters/search/opensearch_client.py
index bd649d572..93ff76832 100644
--- a/api/src/adapters/search/opensearch_client.py
+++ b/api/src/adapters/search/opensearch_client.py
@@ -129,6 +129,15 @@ def bulk_delete(self, index_name: str, ids: Iterable[Any], *, refresh: bool = Tr
         )
         self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh)
 
+    def index_exists(self, index_name: str) -> bool:
+        # Check if an index OR alias exists by a given name
+        return self._client.indices.exists(index_name)
+
+    def alias_exists(self, alias_name: str) -> bool:
+        # Check if an alias exists
+        existing_index_mapping = self._client.cat.aliases(alias_name, format="json")
+        return len(existing_index_mapping) > 0
+
     def swap_alias_index(
         self, index_name: str, alias_name: str, *, delete_prior_indexes: bool = False
     ) -> None:
diff --git a/api/src/search/backend/load_opportunities_to_index.py b/api/src/search/backend/load_opportunities_to_index.py
index b6fe1fd75..dcf778037 100644
--- a/api/src/search/backend/load_opportunities_to_index.py
+++ b/api/src/search/backend/load_opportunities_to_index.py
@@ -50,7 +50,6 @@ def __init__(
         config = LoadOpportunitiesToIndexConfig()
         self.config = config
 
-        # TODO - determine if this is for a full refresh and set the index name based on that
         if is_full_refresh:
             current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")
             self.index_name = f"{self.config.index_prefix}-{current_timestamp}"
@@ -60,17 +59,20 @@ def __init__(
 
     def run_task(self) -> None:
         if self.is_full_refresh:
+            logger.info("Running full refresh")
             self.full_refresh()
         else:
+            logger.info("Running incremental load")
             self.incremental_updates_and_deletes()
 
     def incremental_updates_and_deletes(self) -> None:
         existing_opportunity_ids = self.fetch_existing_opportunity_ids_in_index()
 
-        # load the records
-        # TODO - we should probably not load everything if what is in the search index
-        # is identical - otherwise this isn't much different from the full refresh
-        # BUT - need some sort of mechanism for determining that (timestamp?)
+        # load the records incrementally
+        # TODO - The point of this incremental load is to support upcoming work
+        # to load only opportunities that have changes as we'll eventually be indexing
+        # files which will take longer. However - the structure of the data isn't yet
+        # known so I want to hold on actually setting up any change-detection logic
         loaded_opportunity_ids = set()
         for opp_batch in self.fetch_opportunities():
             loaded_opportunity_ids.update(self.load_records(opp_batch))
@@ -123,7 +125,11 @@ def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
             )
 
     def fetch_existing_opportunity_ids_in_index(self) -> set[int]:
-        # TODO - check if the index exists already
+        if not self.search_client.alias_exists(self.index_name):
+            raise RuntimeError(
+                "Alias %s does not exist, please run the full refresh job before the incremental job"
+                % self.index_name
+            )
 
         opportunity_ids: set[int] = set()
 
@@ -133,7 +139,7 @@ def fetch_existing_opportunity_ids_in_index(self) -> set[int]:
             include_scores=False,
         ):
             for record in response.records:
-                opportunity_ids.add(record.get("opportunity_id"))
+                opportunity_ids.add(record["opportunity_id"])
 
         return opportunity_ids
 
diff --git a/api/src/search/backend/load_search_data.py b/api/src/search/backend/load_search_data.py
index cf6f0445f..5b82e5a6d 100644
--- a/api/src/search/backend/load_search_data.py
+++ b/api/src/search/backend/load_search_data.py
@@ -1,3 +1,5 @@
+import click
+
 import src.adapters.db as db
 import src.adapters.search as search
 from src.adapters.db import flask_db
@@ -8,8 +10,13 @@
 @load_search_data_blueprint.cli.command(
     "load-opportunity-data", help="Load opportunity data from our database to the search index"
 )
+@click.option(
+    "--full-refresh/--incremental",
+    default=True,
+    help="Whether to run a full refresh, or only incrementally update opportunities",
+)
 @flask_db.with_db_session()
-def load_opportunity_data(db_session: db.Session) -> None:
+def load_opportunity_data(db_session: db.Session, full_refresh: bool) -> None:
     search_client = search.SearchClient()
-    LoadOpportunitiesToIndex(db_session, search_client).run()
+    LoadOpportunitiesToIndex(db_session, search_client, full_refresh).run()
 
diff --git a/api/tests/src/adapters/search/test_opensearch_client.py b/api/tests/src/adapters/search/test_opensearch_client.py
index c2e830197..8de2c2cc9 100644
--- a/api/tests/src/adapters/search/test_opensearch_client.py
+++ b/api/tests/src/adapters/search/test_opensearch_client.py
@@ -122,6 +122,53 @@ def test_swap_alias_index(search_client, generic_index):
     assert search_client._client.indices.exists(tmp_index) is False
 
 
+def test_index_or_alias_exists(search_client, generic_index):
+    # Create a few aliased indexes
+    index_a = f"test-index-a-{uuid.uuid4().int}"
+    index_b = f"test-index-b-{uuid.uuid4().int}"
+    index_c = f"test-index-c-{uuid.uuid4().int}"
+
+    search_client.create_index(index_a)
+    search_client.create_index(index_b)
+    search_client.create_index(index_c)
+
+    alias_index_a = f"test-alias-a-{uuid.uuid4().int}"
+    alias_index_b = f"test-alias-b-{uuid.uuid4().int}"
+    alias_index_c = f"test-alias-c-{uuid.uuid4().int}"
+
+    search_client.swap_alias_index(index_a, alias_index_a)
+    search_client.swap_alias_index(index_b, alias_index_b)
+    search_client.swap_alias_index(index_c, alias_index_c)
+
+    # Checking the indexes directly - we expect the index method to return true
+    # and the alias method to not
+    assert search_client.index_exists(index_a) is True
+    assert search_client.index_exists(index_b) is True
+    assert search_client.index_exists(index_c) is True
+
+    assert search_client.alias_exists(index_a) is False
+    assert search_client.alias_exists(index_b) is False
+    assert search_client.alias_exists(index_c) is False
+
+    # We just created these aliases, they should exist
+    assert search_client.index_exists(alias_index_a) is True
+    assert search_client.index_exists(alias_index_b) is True
+    assert search_client.index_exists(alias_index_c) is True
+
+    assert search_client.alias_exists(alias_index_a) is True
+    assert search_client.alias_exists(alias_index_b) is True
+    assert search_client.alias_exists(alias_index_c) is True
+
+    # Other random things won't be found for either case
+    assert search_client.index_exists("test-index-a") is False
+    assert search_client.index_exists("asdasdasd") is False
+    assert search_client.index_exists(alias_index_a + "-other") is False
+
+    assert search_client.alias_exists("test-index-a") is False
+    assert search_client.alias_exists("asdasdasd") is False
+    assert search_client.alias_exists(alias_index_a + "-other") is False
+
+
 def test_scroll(search_client, generic_index):
     records = [
         {"id": 1, "title": "Green Eggs & Ham", "notes": "why are the eggs green?"},
diff --git a/api/tests/src/search/backend/test_load_opportunities_to_index.py b/api/tests/src/search/backend/test_load_opportunities_to_index.py
index 303c10ea0..9a3961f2b 100644
--- a/api/tests/src/search/backend/test_load_opportunities_to_index.py
+++ b/api/tests/src/search/backend/test_load_opportunities_to_index.py
@@ -103,7 +103,6 @@ def test_load_opportunities_to_index(
         opportunity_index_alias,
         load_opportunities_to_index,
     ):
-        # TODO - need to test/modify logic to be better about handling not already having an index
         index_name = "partial-refresh-index-" + get_now_us_eastern_datetime().strftime(
             "%Y-%m-%d_%H-%M-%S"
         )
@@ -141,3 +140,14 @@
         resp = search_client.search(opportunity_index_alias, {"size": 100})
 
         assert resp.total_records == len(opportunities)
+
+    def test_load_opportunities_to_index_index_does_not_exist(self, db_session, search_client):
+        config = LoadOpportunitiesToIndexConfig(
+            alias_name="fake-index-that-will-not-exist", index_prefix="test-load-opps"
+        )
+        load_opportunities_to_index = LoadOpportunitiesToIndex(
+            db_session, search_client, False, config
+        )
+
+        with pytest.raises(RuntimeError, match="please run the full refresh job"):
+            load_opportunities_to_index.run()