Cleanup, tests, more impl
chouinar committed Aug 26, 2024
1 parent 903145d commit 0f25373
Showing 5 changed files with 89 additions and 10 deletions.
9 changes: 9 additions & 0 deletions api/src/adapters/search/opensearch_client.py
@@ -129,6 +129,15 @@ def bulk_delete(self, index_name: str, ids: Iterable[Any], *, refresh: bool = Tr
        )
        self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh)

    def index_exists(self, index_name: str) -> bool:
        # Check if an index OR alias exists by a given name
        return self._client.indices.exists(index_name)

    def alias_exists(self, alias_name: str) -> bool:
        # Check if an alias exists
        existing_index_mapping = self._client.cat.aliases(alias_name, format="json")
        return len(existing_index_mapping) > 0

    def swap_alias_index(
        self, index_name: str, alias_name: str, *, delete_prior_indexes: bool = False
    ) -> None:
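For context, a minimal usage sketch of how the two new helpers differ in practice. The client, index, and alias names below are illustrative assumptions, not part of the commit; only the method names come from the diff.

```python
import src.adapters.search as search

# Hypothetical illustration - the index and alias names are made up.
client = search.SearchClient()

index_name = "opportunity-index-2024-08-26_12-00-00"
alias_name = "opportunity-index-alias"

client.create_index(index_name)
client.swap_alias_index(index_name, alias_name)

# index_exists() resolves both concrete indexes and aliases ...
assert client.index_exists(index_name) is True
assert client.index_exists(alias_name) is True

# ... while alias_exists() only reports aliases, since it inspects cat.aliases.
assert client.alias_exists(alias_name) is True
assert client.alias_exists(index_name) is False
```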
20 changes: 13 additions & 7 deletions api/src/search/backend/load_opportunities_to_index.py
@@ -50,7 +50,6 @@ def __init__(
        config = LoadOpportunitiesToIndexConfig()
        self.config = config

        # TODO - determine if this is for a full refresh and set the index name based on that
        if is_full_refresh:
            current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")
            self.index_name = f"{self.config.index_prefix}-{current_timestamp}"
@@ -60,17 +59,20 @@ def __init__(

    def run_task(self) -> None:
        if self.is_full_refresh:
            logger.info("Running full refresh")
            self.full_refresh()
        else:
            logger.info("Running incremental load")
            self.incremental_updates_and_deletes()

    def incremental_updates_and_deletes(self) -> None:
        existing_opportunity_ids = self.fetch_existing_opportunity_ids_in_index()

        # load the records
        # TODO - we should probably not load everything if what is in the search index
        # is identical - otherwise this isn't much different from the full refresh
        # BUT - need some sort of mechanism for determining that (timestamp?)
        # load the records incrementally
        # TODO - The point of this incremental load is to support upcoming work
        # to load only opportunities that have changes as we'll eventually be indexing
        # files which will take longer. However - the structure of the data isn't yet
        # known so I want to hold off on actually setting up any change-detection logic
        loaded_opportunity_ids = set()
        for opp_batch in self.fetch_opportunities():
            loaded_opportunity_ids.update(self.load_records(opp_batch))
@@ -123,7 +125,11 @@ def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
        )

    def fetch_existing_opportunity_ids_in_index(self) -> set[int]:
        # TODO - check if the index exists already
        if not self.search_client.alias_exists(self.index_name):
            raise RuntimeError(
                "Alias %s does not exist, please run the full refresh job before the incremental job"
                % self.index_name
            )

        opportunity_ids: set[int] = set()

@@ -133,7 +139,7 @@ def fetch_existing_opportunity_ids_in_index(self) -> set[int]:
            include_scores=False,
        ):
            for record in response.records:
                opportunity_ids.add(record.get("opportunity_id"))
                opportunity_ids.add(record["opportunity_id"])

        return opportunity_ids

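The remainder of incremental_updates_and_deletes is collapsed in this diff. Purely as a hedged sketch of how the two ID sets gathered above could drive the delete step (an illustration with made-up values, not the commit's hidden code):

```python
# Hypothetical values - in the real task these come from the search index
# (via scroll) and from the database load, respectively.
existing_opportunity_ids = {1, 2, 3, 4}
loaded_opportunity_ids = {1, 2, 4}

# Anything still in the index but not reloaded from the database is presumed
# deleted upstream and would be removed from the index, e.g. with
# search_client.bulk_delete(index_name, ids_to_delete).
ids_to_delete = existing_opportunity_ids - loaded_opportunity_ids
assert ids_to_delete == {3}
```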
11 changes: 9 additions & 2 deletions api/src/search/backend/load_search_data.py
@@ -1,3 +1,5 @@
import click

import src.adapters.db as db
import src.adapters.search as search
from src.adapters.db import flask_db
@@ -8,8 +10,13 @@
@load_search_data_blueprint.cli.command(
    "load-opportunity-data", help="Load opportunity data from our database to the search index"
)
@click.option(
    "--full-refresh/--incremental",
    default=True,
    help="Whether to run a full refresh, or only incrementally update opportunities",
)
@flask_db.with_db_session()
def load_opportunity_data(db_session: db.Session) -> None:
def load_opportunity_data(db_session: db.Session, full_refresh: bool) -> None:
    search_client = search.SearchClient()

    LoadOpportunitiesToIndex(db_session, search_client).run()
    LoadOpportunitiesToIndex(db_session, search_client, full_refresh).run()
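For reference, a self-contained sketch of how a paired Click flag like --full-refresh/--incremental maps onto the boolean parameter. The command name and help text mirror the diff; the standalone setup is only for illustration and omits the Flask blueprint and database session wiring.

```python
import click


@click.command("load-opportunity-data")
@click.option(
    "--full-refresh/--incremental",
    default=True,
    help="Whether to run a full refresh, or only incrementally update opportunities",
)
def load_opportunity_data(full_refresh: bool) -> None:
    # --full-refresh (or no flag at all, since default=True) -> full_refresh=True
    # --incremental                                           -> full_refresh=False
    click.echo(f"full_refresh={full_refresh}")


if __name__ == "__main__":
    load_opportunity_data()
```

Through the Flask CLI the incremental mode would then be selected with something like `flask load-search-data load-opportunity-data --incremental`, though the exact group name depends on how load_search_data_blueprint is registered, which is not shown in this diff.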
47 changes: 47 additions & 0 deletions api/tests/src/adapters/search/test_opensearch_client.py
@@ -122,6 +122,53 @@ def test_swap_alias_index(search_client, generic_index):
    assert search_client._client.indices.exists(tmp_index) is False


def test_index_or_alias_exists(search_client, generic_index):
    # Create a few aliased indexes
    index_a = f"test-index-a-{uuid.uuid4().int}"
    index_b = f"test-index-b-{uuid.uuid4().int}"
    index_c = f"test-index-c-{uuid.uuid4().int}"

    search_client.create_index(index_a)
    search_client.create_index(index_b)
    search_client.create_index(index_c)

    alias_index_a = f"test-alias-a-{uuid.uuid4().int}"
    alias_index_b = f"test-alias-b-{uuid.uuid4().int}"
    alias_index_c = f"test-alias-c-{uuid.uuid4().int}"

    search_client.swap_alias_index(index_a, alias_index_a)
    search_client.swap_alias_index(index_b, alias_index_b)
    search_client.swap_alias_index(index_c, alias_index_c)

    # Checking the indexes directly - we expect the index method to return true
    # and the alias method to not
    assert search_client.index_exists(index_a) is True
    assert search_client.index_exists(index_b) is True
    assert search_client.index_exists(index_c) is True

    assert search_client.alias_exists(index_a) is False
    assert search_client.alias_exists(index_b) is False
    assert search_client.alias_exists(index_c) is False

    # We just created these aliases, they should exist
    assert search_client.index_exists(alias_index_a) is True
    assert search_client.index_exists(alias_index_b) is True
    assert search_client.index_exists(alias_index_c) is True

    assert search_client.alias_exists(alias_index_a) is True
    assert search_client.alias_exists(alias_index_b) is True
    assert search_client.alias_exists(alias_index_c) is True

    # Other random things won't be found for either case
    assert search_client.index_exists("test-index-a") is False
    assert search_client.index_exists("asdasdasd") is False
    assert search_client.index_exists(alias_index_a + "-other") is False

    assert search_client.alias_exists("test-index-a") is False
    assert search_client.alias_exists("asdasdasd") is False
    assert search_client.alias_exists(alias_index_a + "-other") is False


def test_scroll(search_client, generic_index):
    records = [
        {"id": 1, "title": "Green Eggs & Ham", "notes": "why are the eggs green?"},
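These tests lean on search_client and generic_index pytest fixtures that are defined outside this diff. A rough sketch of what such fixtures might look like, purely as an assumption about the test setup (the cleanup goes through the underlying opensearch-py client, the same _client attribute the tests poke at directly):

```python
import uuid

import pytest

import src.adapters.search as search


@pytest.fixture
def search_client():
    # Assumes the adapter reads its connection settings from the environment.
    return search.SearchClient()


@pytest.fixture
def generic_index(search_client):
    # A throwaway index per test, deleted again once the test finishes.
    index_name = f"test-generic-index-{uuid.uuid4().int}"
    search_client.create_index(index_name)
    yield index_name
    search_client._client.indices.delete(index=index_name)
```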
12 changes: 11 additions & 1 deletion api/tests/src/search/backend/test_load_opportunities_to_index.py
@@ -103,7 +103,6 @@ def test_load_opportunities_to_index(
        opportunity_index_alias,
        load_opportunities_to_index,
    ):
        # TODO - need to test/modify logic to be better about handling not already having an index
        index_name = "partial-refresh-index-" + get_now_us_eastern_datetime().strftime(
            "%Y-%m-%d_%H-%M-%S"
        )
@@ -141,3 +140,14 @@

        resp = search_client.search(opportunity_index_alias, {"size": 100})
        assert resp.total_records == len(opportunities)

    def test_load_opportunities_to_index_index_does_not_exist(self, db_session, search_client):
        config = LoadOpportunitiesToIndexConfig(
            alias_name="fake-index-that-will-not-exist", index_prefix="test-load-opps"
        )
        load_opportunities_to_index = LoadOpportunitiesToIndex(
            db_session, search_client, False, config
        )

        with pytest.raises(RuntimeError, match="please run the full refresh job"):
            load_opportunities_to_index.run()
