
[Issue #10] Populate the search index from the opportunity tables #47

Merged: 8 commits, May 22, 2024
4 changes: 2 additions & 2 deletions api/src/adapters/search/__init__.py
@@ -1,4 +1,4 @@
-from src.adapters.search.opensearch_client import SearchClient, get_opensearch_client
+from src.adapters.search.opensearch_client import SearchClient
 from src.adapters.search.opensearch_config import get_opensearch_config

-__all__ = ["SearchClient", "get_opensearch_client", "get_opensearch_config"]
+__all__ = ["SearchClient", "get_opensearch_config"]
115 changes: 102 additions & 13 deletions api/src/adapters/search/opensearch_client.py
@@ -1,25 +1,114 @@
-from typing import Any
+import logging
+from typing import Any, Sequence

 import opensearchpy

 from src.adapters.search.opensearch_config import OpensearchConfig, get_opensearch_config

-# More configuration/setup coming in:
-# TODO - https://github.com/navapbc/simpler-grants-gov/issues/13
+logger = logging.getLogger(__name__)

-# Alias the OpenSearch client so that it doesn't need to be imported everywhere
-# and to make it clear it's a client
-SearchClient = opensearchpy.OpenSearch

+class SearchClient:
+    def __init__(self, opensearch_config: OpensearchConfig | None = None) -> None:
+        if opensearch_config is None:
+            opensearch_config = get_opensearch_config()

-def get_opensearch_client(
-    opensearch_config: OpensearchConfig | None = None,
-) -> SearchClient:
-    if opensearch_config is None:
-        opensearch_config = get_opensearch_config()
+        # See: https://opensearch.org/docs/latest/clients/python-low-level/ for more details
+        self._client = opensearchpy.OpenSearch(**_get_connection_parameters(opensearch_config))

-    # See: https://opensearch.org/docs/latest/clients/python-low-level/ for more details
-    return opensearchpy.OpenSearch(**_get_connection_parameters(opensearch_config))
+    def create_index(
+        self, index_name: str, *, shard_count: int = 1, replica_count: int = 1
+    ) -> None:
+        """
+        Create an empty search index
+        """
+        body = {
+            "settings": {
+                "index": {"number_of_shards": shard_count, "number_of_replicas": replica_count}
+            }
+        }
+
+        logger.info("Creating search index %s", index_name, extra={"index_name": index_name})
+        self._client.indices.create(index_name, body=body)
+
+    def delete_index(self, index_name: str) -> None:
+        """
+        Delete an index. Can also delete all indexes via a prefix.
+        """
+        logger.info("Deleting search index %s", index_name, extra={"index_name": index_name})
+        self._client.indices.delete(index=index_name)
+
+    def bulk_upsert(
+        self,
+        index_name: str,
+        records: Sequence[dict[str, Any]],
+        primary_key_field: str,
+        *,
+        refresh: bool = True
+    ) -> None:
+        """
+        Bulk upsert records to an index
+
+        See: https://opensearch.org/docs/latest/api-reference/document-apis/bulk/ for details
+        In this method we only use the "index" operation which creates or updates a record
+        based on the id value.
+        """
+
+        bulk_operations = []
+
+        for record in records:
+            # For each record, we create two entries in the bulk operation list
+            # which include the unique ID + the actual record on separate lines
+            # When this is sent to the search index, this will send two lines like:
+            #
+            # {"index": {"_id": 123}}
+            # {"opportunity_id": 123, "opportunity_title": "example title", ...}
+            bulk_operations.append({"index": {"_id": record[primary_key_field]}})
+            bulk_operations.append(record)
+
+        logger.info(
+            "Upserting records to %s",
+            index_name,
+            extra={"index_name": index_name, "record_count": int(len(bulk_operations) / 2)},
+        )
+        self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh)
+
+    def swap_alias_index(
+        self, index_name: str, alias_name: str, *, delete_prior_indexes: bool = False
+    ) -> None:
+        """
+        For a given index, set it to the given alias. If any existing index(es) are
+        attached to the alias, remove them from the alias.
+
+        This operation is done atomically.
+        """
+        extra = {"index_name": index_name, "index_alias": alias_name}
+        logger.info("Swapping index that backs alias %s", alias_name, extra=extra)
+
+        existing_index_mapping = self._client.cat.aliases(alias_name, format="json")
+        existing_indexes = [i["index"] for i in existing_index_mapping]
+
+        logger.info(
+            "Found existing indexes", extra=extra | {"existing_indexes": ",".join(existing_indexes)}
+        )
+
+        actions = [{"add": {"index": index_name, "alias": alias_name}}]
+
+        for index in existing_indexes:
+            actions.append({"remove": {"index": index, "alias": alias_name}})
+
+        self._client.indices.update_aliases({"actions": actions})
+
+        # Cleanup old indexes now that they aren't connected to the alias
+        if delete_prior_indexes:
+            for index in existing_indexes:
+                self.delete_index(index)
+
+    def search(self, index_name: str, search_query: dict) -> dict:
+        # TODO - add more when we build out the request/response parsing logic
+        # we use something like Pydantic to help reorganize the response
+        # object into something easier to parse.
+        return self._client.search(index=index_name, body=search_query)


 def _get_connection_parameters(opensearch_config: OpensearchConfig) -> dict[str, Any]:
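For orientation, here is a minimal usage sketch of the new SearchClient wrapper, exercising only the methods shown in this file. The index and alias names are made up for illustration, and it assumes OpenSearch connection settings are available to OpensearchConfig via the environment.

# Hypothetical usage sketch of the SearchClient wrapper above; index/alias names are illustrative.
from src.adapters.search import SearchClient

client = SearchClient()  # reads OpensearchConfig from the environment by default

index_name = "opportunity-index-2024-05-22"  # made-up timestamped index name
client.create_index(index_name, shard_count=1, replica_count=1)

# bulk_upsert writes an {"index": {"_id": ...}} action plus the document body for each record
records = [
    {"opportunity_id": 1, "opportunity_title": "Example grant"},
    {"opportunity_id": 2, "opportunity_title": "Another grant"},
]
client.bulk_upsert(index_name, records, primary_key_field="opportunity_id")

# Point the alias at the new index and delete whatever index previously backed it
client.swap_alias_index(index_name, "opportunity-index-alias", delete_prior_indexes=True)

# Callers search through the alias, so they never need to know which timestamped index is live
results = client.search("opportunity-index-alias", {"query": {"match_all": {}}})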
4 changes: 4 additions & 0 deletions api/src/app.py
@@ -18,6 +18,7 @@
 from src.api.schemas import response_schema
 from src.auth.api_key_auth import get_app_security_scheme
 from src.data_migration.data_migration_blueprint import data_migration_blueprint
+from src.search.backend.load_search_data_blueprint import load_search_data_blueprint
 from src.task import task_blueprint

 logger = logging.getLogger(__name__)
@@ -103,8 +104,11 @@ def register_blueprints(app: APIFlask) -> None:
     app.register_blueprint(opportunities_v0_blueprint)
     app.register_blueprint(opportunities_v0_1_blueprint)
     app.register_blueprint(opportunities_v1_blueprint)
+
+    # Non-api blueprints
     app.register_blueprint(data_migration_blueprint)
     app.register_blueprint(task_blueprint)
+    app.register_blueprint(load_search_data_blueprint)


 def get_project_root_dir() -> str:
Empty file added api/src/search/__init__.py
2 changes: 2 additions & 0 deletions api/src/search/backend/__init__.py
@@ -0,0 +1,2 @@
+# import all files so they get initialized and attached to the blueprint
+from . import load_search_data  # noqa: F401
112 changes: 112 additions & 0 deletions api/src/search/backend/load_opportunities_to_index.py
@@ -0,0 +1,112 @@
+import logging
+from enum import StrEnum
+from typing import Iterator, Sequence
+
+from pydantic import Field
+from pydantic_settings import SettingsConfigDict
+from sqlalchemy import select
+from sqlalchemy.orm import noload, selectinload
+
+import src.adapters.db as db
+import src.adapters.search as search
+from src.api.opportunities_v0_1.opportunity_schemas import OpportunityV01Schema
+from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
+from src.task.task import Task
+from src.util.datetime_util import get_now_us_eastern_datetime
+from src.util.env_config import PydanticBaseEnvConfig
+
+logger = logging.getLogger(__name__)
+
+
+class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
+    model_config = SettingsConfigDict(env_prefix="LOAD_OPP_SEARCH_")
+
+    shard_count: int = Field(default=1)  # LOAD_OPP_SEARCH_SHARD_COUNT
+    replica_count: int = Field(default=1)  # LOAD_OPP_SEARCH_REPLICA_COUNT
+
+    # TODO - these might make sense to come from some sort of opportunity-search-index-config?
+    # look into this a bit more when we setup the search endpoint itself.
+    alias_name: str = Field(default="opportunity-index-alias")  # LOAD_OPP_SEARCH_ALIAS_NAME
+    index_prefix: str = Field(default="opportunity-index")  # LOAD_OPP_INDEX_PREFIX
+
+
+class LoadOpportunitiesToIndex(Task):
+    class Metrics(StrEnum):
+        RECORDS_LOADED = "records_loaded"
+
+    def __init__(
+        self,
+        db_session: db.Session,
+        search_client: search.SearchClient,
+        config: LoadOpportunitiesToIndexConfig | None = None,
+    ) -> None:
+        super().__init__(db_session)
+
+        self.search_client = search_client
+
+        if config is None:
+            config = LoadOpportunitiesToIndexConfig()
+        self.config = config
+
+        current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")
+        self.index_name = f"{self.config.index_prefix}-{current_timestamp}"
+        self.set_metrics({"index_name": self.index_name})
+
+    def run_task(self) -> None:
+        # create the index
+        self.search_client.create_index(
+            self.index_name,
+            shard_count=self.config.shard_count,
+            replica_count=self.config.replica_count,
+        )
+
+        # load the records
+        for opp_batch in self.fetch_opportunities():
+            self.load_records(opp_batch)
+
+        # handle aliasing of endpoints
+        self.search_client.swap_alias_index(
+            self.index_name, self.config.alias_name, delete_prior_indexes=True
+        )
+
+    def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
+        """
+        Fetch the opportunities in batches. The iterator returned
+        will give you each individual batch to be processed.
+
+        Fetches all opportunities where:
+            * is_draft = False
+            * current_opportunity_summary is not None
+        """
+        return (
+            self.db_session.execute(
+                select(Opportunity)
+                .join(CurrentOpportunitySummary)
+                .where(
+                    Opportunity.is_draft.is_(False),
+                    CurrentOpportunitySummary.opportunity_status.isnot(None),
+                )
+                .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
+                .execution_options(yield_per=5000)
+            )
+            .scalars()
+            .partitions()
+        )
+
+    def load_records(self, records: Sequence[Opportunity]) -> None:
+        logger.info("Loading batch of opportunities...")
+        schema = OpportunityV01Schema()
+        json_records = []
+
+        for record in records:
(Inline review thread attached to the line above)

Member: (thought) This could be a pain point in the future depending on how large the records get, but looks great for now.

Collaborator (author): It's much faster than you'd think. Locally I ran ~87000 records through this script in about 61 seconds. And that includes querying the DB, joining across all the tables, doing this iteration to jsonify, and then doing the bulk inserts.

The batching (which is done by the DB queries at 5000 records per batch) makes it scale pretty uneventfully.

I imagine on an actual cluster it'll be a bit slower as we'll probably have more nodes, but I don't see why this would ever go beyond ~5 minutes in a run, and there are a few quick optimizations available (e.g. removing the index refresh from every bulk insert).

+            logger.info(
+                "Preparing opportunity for upload to search index",
+                extra={
+                    "opportunity_id": record.opportunity_id,
+                    "opportunity_status": record.opportunity_status,
+                },
+            )
+            json_records.append(schema.dump(record))
+            self.increment(self.Metrics.RECORDS_LOADED)
+
+        self.search_client.bulk_upsert(self.index_name, json_records, "opportunity_id")
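As context for the review note above about batching: the yield_per plus partitions() combination in fetch_opportunities() is a standard SQLAlchemy idiom for streaming ORM objects in fixed-size chunks. A minimal, self-contained sketch of the pattern, using a made-up Widget model rather than Opportunity, looks roughly like this:

# Hypothetical, standalone illustration of the yield_per + partitions() batching pattern;
# Widget and its table are made up for this sketch.
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Widget(Base):
    __tablename__ = "widget"
    widget_id: Mapped[int] = mapped_column(primary_key=True)


def fetch_in_batches(session: Session, batch_size: int = 5000):
    # yield_per streams rows from the database in chunks of batch_size, and
    # .partitions() yields each chunk as a list of ORM objects, so a caller can
    # jsonify and bulk-upsert one batch at a time instead of holding everything in memory.
    return (
        session.execute(select(Widget).execution_options(yield_per=batch_size))
        .scalars()
        .partitions()
    )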
15 changes: 15 additions & 0 deletions api/src/search/backend/load_search_data.py
@@ -0,0 +1,15 @@
+import src.adapters.db as db
+import src.adapters.search as search
+from src.adapters.db import flask_db
+from src.search.backend.load_opportunities_to_index import LoadOpportunitiesToIndex
+from src.search.backend.load_search_data_blueprint import load_search_data_blueprint
+
+
+@load_search_data_blueprint.cli.command(
+    "load-opportunity-data", help="Load opportunity data from our database to the search index"
+)
+@flask_db.with_db_session()
+def load_opportunity_data(db_session: db.Session) -> None:
+    search_client = search.SearchClient()
+
+    LoadOpportunitiesToIndex(db_session, search_client).run()
5 changes: 5 additions & 0 deletions api/src/search/backend/load_search_data_blueprint.py
@@ -0,0 +1,5 @@
+from apiflask import APIBlueprint
+
+load_search_data_blueprint = APIBlueprint(
+    "load-search-data", __name__, enable_openapi=False, cli_group="load-search-data"
+)
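Taken together with load_search_data.py above, registering this blueprint in app.py should make the loader available from the Flask CLI as something like: flask load-search-data load-opportunity-data (the cli_group and command name defined in these two files). The exact invocation depends on how the project wraps the Flask CLI, so treat that command line as an assumption rather than documented usage.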
24 changes: 17 additions & 7 deletions api/tests/conftest.py
@@ -151,25 +151,35 @@ def test_foreign_schema(db_schema_prefix):

 @pytest.fixture(scope="session")
 def search_client() -> search.SearchClient:
-    return search.get_opensearch_client()
+    client = search.SearchClient()
+    try:
+        yield client
+    finally:
+        # Just in case a test setup an index
+        # in a way that didn't clean it up, delete
+        # all indexes at the end of a run that start with test
+        client.delete_index("test-*")


 @pytest.fixture(scope="session")
 def opportunity_index(search_client):
     # TODO - will adjust this in the future to use utils we'll build
     # for setting up / aliasing indexes. For now, keep it simple

     # create a random index name just to make sure it won't ever conflict
     # with an actual one, similar to how we create schemas for database tests
-    index_name = f"test_{uuid.uuid4().int}_opportunity"
+    index_name = f"test-opportunity-index-{uuid.uuid4().int}"

-    search_client.indices.create(index_name, body={})
+    search_client.create_index(index_name)

     try:
         yield index_name
     finally:
         # Try to clean up the index at the end
-        search_client.indices.delete(index_name)
+        search_client.delete_index(index_name)


+@pytest.fixture(scope="session")
+def opportunity_index_alias(search_client):
+    # Note we don't actually create anything, this is just a random name
+    return f"test-opportunity-index-alias-{uuid.uuid4().int}"
+
+
 ####################
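As a rough illustration of how these fixtures combine with the new SearchClient methods, a test might look like the hypothetical sketch below; it is not part of this PR.

# Hypothetical test using the session-scoped fixtures above; not part of this PR.
def test_bulk_upsert_and_search(search_client, opportunity_index):
    records = [
        {"opportunity_id": 1, "opportunity_title": "Example grant"},
        {"opportunity_id": 2, "opportunity_title": "Another grant"},
    ]
    search_client.bulk_upsert(opportunity_index, records, "opportunity_id")

    # bulk_upsert defaults to refresh=True, so the documents are immediately searchable
    response = search_client.search(opportunity_index, {"query": {"match_all": {}}})
    assert response["hits"]["total"]["value"] == 2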
58 changes: 0 additions & 58 deletions api/tests/src/adapters/search/test_opensearch.py

This file was deleted.
