This repository has been archived by the owner on Sep 18, 2024. It is now read-only.
forked from HHS/simpler-grants-gov
-
Notifications
You must be signed in to change notification settings - Fork 0
[Issue: #166] Create ecs task to export opportunity data as csv and json #176
Merged
Rwolfe-Nava
merged 6 commits into
main
from
rwolfe-nava/166-create-ecs-task-for-opportunity-data-csv-and-json
Aug 15, 2024
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
6ab7848
update opportunity_to_csv
Rwolfe-Nava f56bd36
implement changes based on feedback from initial draft review
Rwolfe-Nava e45969b
add test for export_opportunity_data_task
Rwolfe-Nava 4ab4dc2
update with review feedback
Rwolfe-Nava 50d3a52
add opportunities that should not get fetched
Rwolfe-Nava e580f99
Update comment
Rwolfe-Nava File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
124 changes: 124 additions & 0 deletions
124
api/src/task/opportunities/export_opportunity_data_task.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
import json | ||
import logging | ||
import os | ||
from enum import StrEnum | ||
from typing import Iterator, Sequence | ||
|
||
from pydantic_settings import SettingsConfigDict | ||
from sqlalchemy import select | ||
from sqlalchemy.orm import noload, selectinload | ||
|
||
import src.adapters.db as db | ||
import src.adapters.db.flask_db as flask_db | ||
import src.util.file_util as file_util | ||
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema | ||
from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity | ||
from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv | ||
from src.task.task import Task | ||
from src.task.task_blueprint import task_blueprint | ||
from src.util.datetime_util import get_now_us_eastern_datetime | ||
from src.util.env_config import PydanticBaseEnvConfig | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@task_blueprint.cli.command(
    "export-opportunity-data",
    help="Generate JSON and CSV files containing an export of all opportunity data",
)
@flask_db.with_db_session()
def export_opportunity_data(db_session: db.Session) -> None:
    """CLI entry point for the opportunity data export.

    Wraps the injected database session in an ExportOpportunityDataTask and
    runs it. No config is passed, so the task builds its own
    ExportOpportunityDataConfig.
    """
    export_task = ExportOpportunityDataTask(db_session)
    export_task.run()
|
||
|
||
class ExportOpportunityDataConfig(PydanticBaseEnvConfig):
    """Settings for the opportunity data export task.

    Fields use the EXPORT_OPP_DATA_ prefix configured below.
    """

    model_config = SettingsConfigDict(env_prefix="EXPORT_OPP_DATA_")

    # EXPORT_OPP_DATA_FILE_PATH
    # Base location the export files are joined onto (the task passes it as
    # the first argument to os.path.join when building the file names).
    file_path: str
|
||
|
||
class ExportOpportunityDataTask(Task):
    """Export opportunity records to a timestamped JSON file and CSV file.

    Both files are written under the configured ``file_path`` and share the
    same ``opportunity_data-<timestamp>`` stem.
    """

    class Metrics(StrEnum):
        RECORDS_EXPORTED = "records_exported"

    def __init__(
        self,
        db_session: db.Session,
        config: ExportOpportunityDataConfig | None = None,
    ) -> None:
        """Resolve configuration and work out the output file locations.

        Args:
            db_session: Session used to query opportunities.
            config: Optional explicit configuration; when omitted a default
                ExportOpportunityDataConfig is constructed.
        """
        super().__init__(db_session)

        self.config = ExportOpportunityDataConfig() if config is None else config

        # One timestamp is shared by both file names and the JSON metadata.
        self.current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")

        output_location = self.config.file_path
        file_stem = f"opportunity_data-{self.current_timestamp}"
        self.json_file = os.path.join(output_location, f"{file_stem}.json")
        self.csv_file = os.path.join(output_location, f"{file_stem}.csv")

        self.set_metrics({"csv_file": self.csv_file, "json_file": self.json_file})

    def run_task(self) -> None:
        """Serialize every fetched opportunity, then write the JSON and CSV files."""
        serializer = OpportunityV1Schema()

        exported: list = []
        for batch in self.fetch_opportunities():
            for opportunity in batch:
                # Count the record before dumping it, one increment per record.
                self.increment(self.Metrics.RECORDS_EXPORTED)
                exported.append(serializer.dump(opportunity))

        # The JSON payload wraps the records with generation metadata; the CSV
        # receives the bare list of serialized records.
        self.export_data_to_json(
            {
                "metadata": {"file_generated_at": self.current_timestamp},
                "opportunities": exported,
            }
        )
        self.export_opportunities_to_csv(exported)

    def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
        """
        Query the opportunities to export, yielding them batch by batch.

        The query selects opportunities that:
            * have is_draft = False
            * join to a CurrentOpportunitySummary whose opportunity_status
              is not null

        Relationships are loaded with selectinload("*"), except
        all_opportunity_summaries, which is explicitly not loaded.
        The query runs with yield_per=5000.
        """
        query = (
            select(Opportunity)
            .join(CurrentOpportunitySummary)
            .where(
                Opportunity.is_draft.is_(False),
                CurrentOpportunitySummary.opportunity_status.isnot(None),
            )
            .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
            .execution_options(yield_per=5000)
        )
        result = self.db_session.execute(query)
        return result.scalars().partitions()

    def export_data_to_json(self, data_to_export: dict) -> None:
        """Write ``data_to_export`` to ``self.json_file`` as indented JSON."""
        logger.info(
            "Creating Opportunity JSON extract", extra={"json_extract_path": self.json_file}
        )
        # Serialize fully before opening the stream.
        serialized = json.dumps(data_to_export, indent=4)
        with file_util.open_stream(self.json_file, "w") as json_out:
            json_out.write(serialized)

    def export_opportunities_to_csv(self, opportunities: Sequence[dict]) -> None:
        """Write the serialized opportunities to ``self.csv_file`` as CSV."""
        logger.info("Creating Opportunity CSV extract", extra={"csv_extract_path": self.csv_file})
        with file_util.open_stream(self.csv_file, "w") as csv_out:
            opportunities_to_csv(opportunities, csv_out)
77 changes: 77 additions & 0 deletions
77
api/tests/src/task/opportunities/test_export_opportunity_data_task.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import csv | ||
import json | ||
|
||
import pytest | ||
|
||
import src.util.file_util as file_util | ||
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema | ||
from src.task.opportunities.export_opportunity_data_task import ( | ||
ExportOpportunityDataConfig, | ||
ExportOpportunityDataTask, | ||
) | ||
from tests.conftest import BaseTestClass | ||
from tests.src.db.models.factories import OpportunityFactory | ||
|
||
|
||
class TestExportOpportunityDataTask(BaseTestClass):
    """End-to-end check of the export task against a mocked S3 bucket."""

    @pytest.fixture
    def export_opportunity_data_task(self, db_session, mock_s3_bucket):
        # Point the task at the mocked bucket rather than reading the env config.
        task_config = ExportOpportunityDataConfig(file_path=f"s3://{mock_s3_bucket}/")
        return ExportOpportunityDataTask(db_session, task_config)

    def test_export_opportunity_data_task(
        self,
        db_session,
        truncate_opportunities,
        enable_factory_create,
        export_opportunity_data_task,
    ):
        task = export_opportunity_data_task

        # Create 25 opportunities we expect to be exported (6 + 3 + 2 + 8 + 6)
        exported_batches = [
            (6, "is_posted_summary"),
            (3, "is_forecasted_summary"),
            (2, "is_closed_summary"),
            (8, "is_archived_non_forecast_summary"),
            (6, "is_archived_forecast_summary"),
        ]
        opportunities = []
        for batch_size, factory_flag in exported_batches:
            opportunities.extend(
                OpportunityFactory.create_batch(size=batch_size, **{factory_flag: True})
            )

        # Create some opportunities that should not be fetched / exported
        OpportunityFactory.create_batch(size=3, is_draft=True)
        OpportunityFactory.create_batch(size=4, no_current_summary=True)

        task.run()

        # Verify the metrics first: the number of records exported must match
        # the number of exportable opportunities created above.
        exported_count = task.metrics[task.Metrics.RECORDS_EXPORTED]
        assert len(opportunities) == exported_count

        expected_opportunity_ids = {opp.opportunity_id for opp in opportunities}

        # Verify csv file contents
        with file_util.open_stream(task.csv_file, "r") as infile:
            csv_ids = {int(row["opportunity_id"]) for row in csv.DictReader(infile)}
            assert expected_opportunity_ids == csv_ids

        # Verify JSON file contents
        with file_util.open_stream(task.json_file, "r") as infile:
            # Parse JSON file
            json_opportunities = json.load(infile)

        json_records = json_opportunities["opportunities"]
        assert expected_opportunity_ids == {
            int(record["opportunity_id"]) for record in json_records
        }

        # The exported records must validate against the schema.
        errors = OpportunityV1Schema(many=True).validate(json_records)
        assert len(errors) == 0
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: I renamed this function with a plural name because I thought it would be slightly more accurate, since it can handle more than one opportunity by taking in a sequence.