This repository has been archived by the owner on Sep 18, 2024. It is now read-only.
forked from HHS/simpler-grants-gov
[Issue #10] Populate the search index from the opportunity tables #47
Merged
Changes from all commits (8 commits)
62ba7f1  chouinar  [Issue #9] Setup opensearch locally
1922340  chouinar  Some rearranging of files
649339c  chouinar  Dependency fixes
2126171  chouinar  Trying something else for the network setup?
8f80852  chouinar  Simplify the networking/docker setup
f02f3d3  chouinar  [Issue #10] Populate the search index from the opportunity tables
49c2a2b  chouinar  Slightly tidying up
f263381  chouinar  Merge branch 'main' into chouinar/10-populate-search-data
@@ -1,4 +1,4 @@
-from src.adapters.search.opensearch_client import SearchClient, get_opensearch_client
+from src.adapters.search.opensearch_client import SearchClient
 from src.adapters.search.opensearch_config import get_opensearch_config

-__all__ = ["SearchClient", "get_opensearch_client", "get_opensearch_config"]
+__all__ = ["SearchClient", "get_opensearch_config"]
Empty file.
@@ -0,0 +1,2 @@
# import all files so they get initialized and attached to the blueprint
from . import load_search_data  # noqa: F401
@@ -0,0 +1,112 @@
import logging
from enum import StrEnum
from typing import Iterator, Sequence

from pydantic import Field
from pydantic_settings import SettingsConfigDict
from sqlalchemy import select
from sqlalchemy.orm import noload, selectinload

import src.adapters.db as db
import src.adapters.search as search
from src.api.opportunities_v0_1.opportunity_schemas import OpportunityV01Schema
from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
from src.task.task import Task
from src.util.datetime_util import get_now_us_eastern_datetime
from src.util.env_config import PydanticBaseEnvConfig

logger = logging.getLogger(__name__)


class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
    model_config = SettingsConfigDict(env_prefix="LOAD_OPP_SEARCH_")

    shard_count: int = Field(default=1)  # LOAD_OPP_SEARCH_SHARD_COUNT
    replica_count: int = Field(default=1)  # LOAD_OPP_SEARCH_REPLICA_COUNT

    # TODO - these might make sense to come from some sort of opportunity-search-index-config?
    # look into this a bit more when we setup the search endpoint itself.
    alias_name: str = Field(default="opportunity-index-alias")  # LOAD_OPP_SEARCH_ALIAS_NAME
    index_prefix: str = Field(default="opportunity-index")  # LOAD_OPP_INDEX_PREFIX


class LoadOpportunitiesToIndex(Task):
    class Metrics(StrEnum):
        RECORDS_LOADED = "records_loaded"

    def __init__(
        self,
        db_session: db.Session,
        search_client: search.SearchClient,
        config: LoadOpportunitiesToIndexConfig | None = None,
    ) -> None:
        super().__init__(db_session)

        self.search_client = search_client

        if config is None:
            config = LoadOpportunitiesToIndexConfig()
        self.config = config

        current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")
        self.index_name = f"{self.config.index_prefix}-{current_timestamp}"
        self.set_metrics({"index_name": self.index_name})

    def run_task(self) -> None:
        # create the index
        self.search_client.create_index(
            self.index_name,
            shard_count=self.config.shard_count,
            replica_count=self.config.replica_count,
        )

        # load the records
        for opp_batch in self.fetch_opportunities():
            self.load_records(opp_batch)

        # handle aliasing of endpoints
        self.search_client.swap_alias_index(
            self.index_name, self.config.alias_name, delete_prior_indexes=True
        )

    def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
        """
        Fetch the opportunities in batches. The iterator returned
        will give you each individual batch to be processed.

        Fetches all opportunities where:
        * is_draft = False
        * current_opportunity_summary is not None
        """
        return (
            self.db_session.execute(
                select(Opportunity)
                .join(CurrentOpportunitySummary)
                .where(
                    Opportunity.is_draft.is_(False),
                    CurrentOpportunitySummary.opportunity_status.isnot(None),
                )
                .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
                .execution_options(yield_per=5000)
            )
            .scalars()
            .partitions()
        )

    def load_records(self, records: Sequence[Opportunity]) -> None:
        logger.info("Loading batch of opportunities...")
        schema = OpportunityV01Schema()
        json_records = []

        for record in records:
            logger.info(
                "Preparing opportunity for upload to search index",
                extra={
                    "opportunity_id": record.opportunity_id,
                    "opportunity_status": record.opportunity_status,
                },
            )
            json_records.append(schema.dump(record))
            self.increment(self.Metrics.RECORDS_LOADED)

        self.search_client.bulk_upsert(self.index_name, json_records, "opportunity_id")
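As a quick illustration of the config class above, the LOAD_OPP_SEARCH_-prefixed environment variables override the field defaults through pydantic-settings' env_prefix. The following is a minimal sketch, not code from this PR; it assumes PydanticBaseEnvConfig is a standard pydantic-settings BaseSettings subclass, and the values are illustrative only.

# Hedged sketch: how LOAD_OPP_SEARCH_* environment variables map onto the
# LoadOpportunitiesToIndexConfig fields via the env_prefix (illustrative values).
import os

from src.search.backend.load_opportunities_to_index import LoadOpportunitiesToIndexConfig

os.environ["LOAD_OPP_SEARCH_SHARD_COUNT"] = "3"
os.environ["LOAD_OPP_SEARCH_REPLICA_COUNT"] = "2"

config = LoadOpportunitiesToIndexConfig()
assert config.shard_count == 3
assert config.replica_count == 2
assert config.index_prefix == "opportunity-index"  # default, no override set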
@@ -0,0 +1,15 @@
import src.adapters.db as db
import src.adapters.search as search
from src.adapters.db import flask_db
from src.search.backend.load_opportunities_to_index import LoadOpportunitiesToIndex
from src.search.backend.load_search_data_blueprint import load_search_data_blueprint


@load_search_data_blueprint.cli.command(
    "load-opportunity-data", help="Load opportunity data from our database to the search index"
)
@flask_db.with_db_session()
def load_opportunity_data(db_session: db.Session) -> None:
    search_client = search.SearchClient()

    LoadOpportunitiesToIndex(db_session, search_client).run()
@@ -0,0 +1,5 @@
from apiflask import APIBlueprint

load_search_data_blueprint = APIBlueprint(
    "load-search-data", __name__, enable_openapi=False, cli_group="load-search-data"
)
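Together, the two new files above give the data load a Flask CLI entrypoint: the blueprint defines a "load-search-data" CLI group and the command module attaches "load-opportunity-data" to it. Below is a minimal sketch, not code from this PR, of how the blueprint would typically be registered and invoked; the app construction here is an assumption.

# Hedged sketch (assumption: the real app setup lives elsewhere in the repo) of how
# the new blueprint exposes the data-load command once registered on the app.
from apiflask import APIFlask

import src.search.backend.load_search_data  # noqa: F401  # attaches the command to the blueprint
from src.search.backend.load_search_data_blueprint import load_search_data_blueprint

app = APIFlask(__name__)
app.register_blueprint(load_search_data_blueprint)

# With the blueprint registered, the task can be run from the Flask CLI, e.g.:
#   flask load-search-data load-opportunity-data

Using an APIBlueprint with enable_openapi=False keeps this blueprint out of the generated OpenAPI spec while still hooking its commands into Flask's CLI.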
This file was deleted.
(thought) This could be a pain point in the future depending on how large the records get, but looks great for now.
It's much faster than you'd think. Locally I ran ~87,000 records through this script in about 61 seconds, and that includes querying the DB, joining across all the tables, iterating over the records to JSON-ify them, and doing the bulk inserts.
The batching (done by the DB query at 5,000 records per batch) makes it scale pretty uneventfully.
I imagine it'll be a bit slower on an actual cluster, as we'll probably have more nodes, but I don't see why this would ever go beyond ~5 minutes per run, and there are a few quick optimizations available if it does (e.g., removing the index refresh from every bulk insert).
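For reference, here is a hedged sketch of the refresh optimization mentioned above, written against opensearch-py directly rather than the repo's SearchClient (whose internals aren't shown in this diff); the connection details, helper name, and parameters are illustrative assumptions, not the project's API.

# Hedged sketch of skipping the per-request index refresh during bulk loads,
# using opensearch-py's bulk helper directly; connection details are illustrative.
from opensearchpy import OpenSearch, helpers

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])


def bulk_upsert_without_refresh(index_name: str, records: list[dict], primary_key_field: str) -> None:
    actions = [
        {
            "_op_type": "index",  # "index" overwrites the document if the _id already exists
            "_index": index_name,
            "_id": record[primary_key_field],
            "_source": record,
        }
        for record in records
    ]
    # refresh=False avoids forcing a refresh on every bulk request; a single
    # refresh after all batches have loaded makes the documents searchable.
    helpers.bulk(client, actions, refresh=False)


# after the last batch:
# client.indices.refresh(index=index_name)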