From 2e0666f598c1432848079a4069a59c5cf40efc0a Mon Sep 17 00:00:00 2001 From: Rwolfe-Nava <87499456+Rwolfe-Nava@users.noreply.github.com> Date: Thu, 15 Aug 2024 15:10:54 -0400 Subject: [PATCH] [Issue: #166] Create ecs task to export opportunity data as csv and json (#176) ## Summary Fixes #166 ### Time to review: 20 mins ## Changes proposed - Adds export_opportunity_data task - Changes opportunity_to_csv function to opportunities_to_csv to be more flexible by including output as a parameter - Adds unit test for export_opportunity_data task. ## Context for reviewers - The test runs the export_opportunity_data task, uploading a csv and json file to mock_s3_bucket. Then it reads the files and verifies contents. --------- Co-authored-by: Michael Chouinard <46358556+chouinar@users.noreply.github.com> --- api/local.env | 9 +- .../opportunities_v1/opportunity_routes.py | 6 +- .../opportunities_v1/opportunity_to_csv.py | 6 +- api/src/task/__init__.py | 1 + .../export_opportunity_data_task.py | 124 ++++++++++++++++++ .../test_export_opportunity_data_task.py | 77 +++++++++++ 6 files changed, 215 insertions(+), 8 deletions(-) create mode 100644 api/src/task/opportunities/export_opportunity_data_task.py create mode 100644 api/tests/src/task/opportunities/test_export_opportunity_data_task.py diff --git a/api/local.env b/api/local.env index a2cb295f6..db0f89123 100644 --- a/api/local.env +++ b/api/local.env @@ -119,4 +119,11 @@ ENABLE_OPPORTUNITY_LOG_MSG=false # For the script to setup the foreign data tables # this env var overrides it so the script generates normal # tables that don't need to connect to an Oracle database -IS_LOCAL_FOREIGN_TABLE=true \ No newline at end of file +IS_LOCAL_FOREIGN_TABLE=true + +############################ +# Task Configuration +############################ + +# File path for the export_opportunity_data task +EXPORT_OPP_DATA_FILE_PATH=/tmp \ No newline at end of file diff --git a/api/src/api/opportunities_v1/opportunity_routes.py b/api/src/api/opportunities_v1/opportunity_routes.py index c705cb20d..306272847 100644 --- a/api/src/api/opportunities_v1/opportunity_routes.py +++ b/api/src/api/opportunities_v1/opportunity_routes.py @@ -1,3 +1,4 @@ +import io import logging from flask import Response @@ -13,7 +14,7 @@ from src.auth.api_key_auth import api_key_auth from src.logging.flask_logger import add_extra_data_to_current_request_logs from src.services.opportunities_v1.get_opportunity import get_opportunity, get_opportunity_versions -from src.services.opportunities_v1.opportunity_to_csv import opportunity_to_csv +from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv from src.services.opportunities_v1.search_opportunities import search_opportunities from src.util.dict_util import flatten_dict @@ -135,7 +136,8 @@ def opportunity_search( if search_params.get("format") == opportunity_schemas.SearchResponseFormat.CSV: # Convert the response into a CSV and return the contents - output = opportunity_to_csv(opportunities) + output = io.StringIO() + opportunities_to_csv(opportunities, output) timestamp = datetime_util.utcnow().strftime("%Y%m%d-%H%M%S") return Response( output.getvalue().encode("utf-8"), diff --git a/api/src/services/opportunities_v1/opportunity_to_csv.py b/api/src/services/opportunities_v1/opportunity_to_csv.py index 8be6f6c0d..4c049e44f 100644 --- a/api/src/services/opportunities_v1/opportunity_to_csv.py +++ b/api/src/services/opportunities_v1/opportunity_to_csv.py @@ -58,7 +58,7 @@ def _process_assistance_listing(assistance_listings: list[dict]) -> str: ) -def opportunity_to_csv(opportunities: Sequence[dict]) -> io.StringIO: +def opportunities_to_csv(opportunities: Sequence[dict], output: io.StringIO) -> None: opportunities_to_write: list[dict] = [] for opportunity in opportunities: @@ -84,10 +84,6 @@ def opportunity_to_csv(opportunities: Sequence[dict]) -> io.StringIO: opportunities_to_write.append(out_opportunity) - output = io.StringIO() - writer = csv.DictWriter(output, fieldnames=CSV_FIELDS, quoting=csv.QUOTE_ALL) writer.writeheader() writer.writerows(opportunities_to_write) - - return output diff --git a/api/src/task/__init__.py b/api/src/task/__init__.py index 0b527c246..1b7789f68 100644 --- a/api/src/task/__init__.py +++ b/api/src/task/__init__.py @@ -3,5 +3,6 @@ # import any of the other files so they get initialized and attached to the blueprint import src.task.opportunities.set_current_opportunities_task # noqa: F401 E402 isort:skip import src.task.opportunities.import_opportunity_csvs # noqa: F401 E402 isort:skip +import src.task.opportunities.export_opportunity_data_task # noqa: F401 E402 isort:skip __all__ = ["task_blueprint"] diff --git a/api/src/task/opportunities/export_opportunity_data_task.py b/api/src/task/opportunities/export_opportunity_data_task.py new file mode 100644 index 000000000..6c729dde8 --- /dev/null +++ b/api/src/task/opportunities/export_opportunity_data_task.py @@ -0,0 +1,124 @@ +import json +import logging +import os +from enum import StrEnum +from typing import Iterator, Sequence + +from pydantic_settings import SettingsConfigDict +from sqlalchemy import select +from sqlalchemy.orm import noload, selectinload + +import src.adapters.db as db +import src.adapters.db.flask_db as flask_db +import src.util.file_util as file_util +from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema +from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity +from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv +from src.task.task import Task +from src.task.task_blueprint import task_blueprint +from src.util.datetime_util import get_now_us_eastern_datetime +from src.util.env_config import PydanticBaseEnvConfig + +logger = logging.getLogger(__name__) + + +@task_blueprint.cli.command( + "export-opportunity-data", + help="Generate JSON and CSV files containing an export of all opportunity data", +) +@flask_db.with_db_session() +def export_opportunity_data(db_session: db.Session) -> None: + ExportOpportunityDataTask(db_session).run() + + +class ExportOpportunityDataConfig(PydanticBaseEnvConfig): + model_config = SettingsConfigDict(env_prefix="EXPORT_OPP_DATA_") + + # EXPORT_OPP_DATA_FILE_PATH + file_path: str + + +class ExportOpportunityDataTask(Task): + class Metrics(StrEnum): + RECORDS_EXPORTED = "records_exported" + + def __init__( + self, + db_session: db.Session, + config: ExportOpportunityDataConfig | None = None, + ) -> None: + super().__init__(db_session) + + if config is None: + config = ExportOpportunityDataConfig() + self.config = config + + self.current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S") + + self.json_file = os.path.join( + config.file_path, f"opportunity_data-{self.current_timestamp}.json" + ) + self.csv_file = os.path.join( + config.file_path, f"opportunity_data-{self.current_timestamp}.csv" + ) + + self.set_metrics({"csv_file": self.csv_file, "json_file": self.json_file}) + + def run_task(self) -> None: + # Load records + schema = OpportunityV1Schema() + + opportunities = [] + for opp_batch in self.fetch_opportunities(): + for record in opp_batch: + self.increment(self.Metrics.RECORDS_EXPORTED) + opportunities.append(schema.dump(record)) + + # Format data + data_to_export: dict = { + "metadata": {"file_generated_at": self.current_timestamp}, + "opportunities": opportunities, + } + + # Export data + self.export_data_to_json(data_to_export) + self.export_opportunities_to_csv(opportunities) + + def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]: + """ + Fetch the opportunities in batches. The iterator returned + will give you each individual batch to be processed. + + Fetches all opportunities where: + * is_draft = False + * current_opportunity_summary is not None + """ + return ( + self.db_session.execute( + select(Opportunity) + .join(CurrentOpportunitySummary) + .where( + Opportunity.is_draft.is_(False), + CurrentOpportunitySummary.opportunity_status.isnot(None), + ) + .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries)) + .execution_options(yield_per=5000) + ) + .scalars() + .partitions() + ) + + def export_data_to_json(self, data_to_export: dict) -> None: + # create the json file + logger.info( + "Creating Opportunity JSON extract", extra={"json_extract_path": self.json_file} + ) + json_object = json.dumps(data_to_export, indent=4) + with file_util.open_stream(self.json_file, "w") as outfile: + outfile.write(json_object) + + def export_opportunities_to_csv(self, opportunities: Sequence[dict]) -> None: + # create the csv file + logger.info("Creating Opportunity CSV extract", extra={"csv_extract_path": self.csv_file}) + with file_util.open_stream(self.csv_file, "w") as outfile: + opportunities_to_csv(opportunities, outfile) diff --git a/api/tests/src/task/opportunities/test_export_opportunity_data_task.py b/api/tests/src/task/opportunities/test_export_opportunity_data_task.py new file mode 100644 index 000000000..6ce6ff231 --- /dev/null +++ b/api/tests/src/task/opportunities/test_export_opportunity_data_task.py @@ -0,0 +1,77 @@ +import csv +import json + +import pytest + +import src.util.file_util as file_util +from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema +from src.task.opportunities.export_opportunity_data_task import ( + ExportOpportunityDataConfig, + ExportOpportunityDataTask, +) +from tests.conftest import BaseTestClass +from tests.src.db.models.factories import OpportunityFactory + + +class TestExportOpportunityDataTask(BaseTestClass): + @pytest.fixture + def export_opportunity_data_task(self, db_session, mock_s3_bucket): + config = ExportOpportunityDataConfig(file_path=f"s3://{mock_s3_bucket}/") + return ExportOpportunityDataTask(db_session, config) + + def test_export_opportunity_data_task( + self, + db_session, + truncate_opportunities, + enable_factory_create, + export_opportunity_data_task, + ): + # Create 25 opportunities we will load + opportunities = [] + opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True)) + opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True)) + opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True)) + opportunities.extend( + OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True) + ) + opportunities.extend( + OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True) + ) + + # Create some opportunities that won't get fetched / exported + OpportunityFactory.create_batch(size=3, is_draft=True) + OpportunityFactory.create_batch(size=4, no_current_summary=True) + + export_opportunity_data_task.run() + + # Verify some metrics first + # Make sure the opportunities we have created matches the number + # That get exported + assert ( + len(opportunities) + == export_opportunity_data_task.metrics[ + export_opportunity_data_task.Metrics.RECORDS_EXPORTED + ] + ) + + expected_opportunity_ids = set([opp.opportunity_id for opp in opportunities]) + # Verify csv file contents + with file_util.open_stream(export_opportunity_data_task.csv_file, "r") as infile: + reader = csv.DictReader(infile) + assert expected_opportunity_ids == set( + [int(record["opportunity_id"]) for record in reader] + ) + + # Verify JSON file contents + with file_util.open_stream(export_opportunity_data_task.json_file, "r") as infile: + # Parse JSON File + json_opportunities = json.load(infile) + + assert expected_opportunity_ids == set( + [int(record["opportunity_id"]) for record in json_opportunities["opportunities"]] + ) + + schema = OpportunityV1Schema(many=True) + + errors = schema.validate(json_opportunities["opportunities"]) + assert len(errors) == 0