
[Issue: #166] Create ECS task to export opportunity data as CSV and JSON #176

Merged
9 changes: 8 additions & 1 deletion api/local.env
@@ -119,4 +119,11 @@ ENABLE_OPPORTUNITY_LOG_MSG=false
# For the script to setup the foreign data tables
# this env var overrides it so the script generates normal
# tables that don't need to connect to an Oracle database
-IS_LOCAL_FOREIGN_TABLE=true
+IS_LOCAL_FOREIGN_TABLE=true
+
+############################
+# Task Configuration
+############################
+
+# File path for the export_opportunity_data task
+EXPORT_OPP_DATA_FILE_PATH=/tmp
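
A minimal sketch of how this variable gets picked up, assuming PydanticBaseEnvConfig is a thin wrapper over pydantic-settings' BaseSettings (the class name and prefix mirror the task module later in this diff, and the filename format is copied from it):

import os
from datetime import datetime

from pydantic_settings import BaseSettings, SettingsConfigDict


class ExportOpportunityDataConfig(BaseSettings):
    # env_prefix means file_path is populated from EXPORT_OPP_DATA_FILE_PATH
    model_config = SettingsConfigDict(env_prefix="EXPORT_OPP_DATA_")

    file_path: str


# With EXPORT_OPP_DATA_FILE_PATH=/tmp set, the task writes files such as
# /tmp/opportunity_data-2024-01-01_12-00-00.csv
config = ExportOpportunityDataConfig()
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print(os.path.join(config.file_path, f"opportunity_data-{timestamp}.csv"))
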
6 changes: 4 additions & 2 deletions api/src/api/opportunities_v1/opportunity_routes.py
@@ -1,3 +1,4 @@
+import io
import logging

from flask import Response
@@ -13,7 +14,7 @@
from src.auth.api_key_auth import api_key_auth
from src.logging.flask_logger import add_extra_data_to_current_request_logs
from src.services.opportunities_v1.get_opportunity import get_opportunity, get_opportunity_versions
-from src.services.opportunities_v1.opportunity_to_csv import opportunity_to_csv
+from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv
from src.services.opportunities_v1.search_opportunities import search_opportunities
from src.util.dict_util import flatten_dict

@@ -135,7 +136,8 @@ def opportunity_search(

if search_params.get("format") == opportunity_schemas.SearchResponseFormat.CSV:
# Convert the response into a CSV and return the contents
-output = opportunity_to_csv(opportunities)
+output = io.StringIO()
+opportunities_to_csv(opportunities, output)
timestamp = datetime_util.utcnow().strftime("%Y%m%d-%H%M%S")
return Response(
output.getvalue().encode("utf-8"),
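
The route change swaps a helper that allocated and returned its own buffer for one that writes into a caller-supplied stream. A self-contained sketch of the resulting shape, with an illustrative route path, field list, and filename rather than the project's own:

import csv
import io

from flask import Flask, Response

app = Flask(__name__)


def rows_to_csv(rows: list[dict], output: io.StringIO) -> None:
    # The caller owns the buffer; this helper only writes into it.
    writer = csv.DictWriter(output, fieldnames=["opportunity_id"], quoting=csv.QUOTE_ALL)
    writer.writeheader()
    writer.writerows(rows)


@app.get("/opportunities.csv")
def export_csv() -> Response:
    output = io.StringIO()
    rows_to_csv([{"opportunity_id": 1}, {"opportunity_id": 2}], output)
    return Response(
        output.getvalue().encode("utf-8"),
        content_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=opportunities.csv"},
    )
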
6 changes: 1 addition & 5 deletions api/src/services/opportunities_v1/opportunity_to_csv.py
@@ -58,7 +58,7 @@ def _process_assistance_listing(assistance_listings: list[dict]) -> str:
)


-def opportunity_to_csv(opportunities: Sequence[dict]) -> io.StringIO:
+def opportunities_to_csv(opportunities: Sequence[dict], output: io.StringIO) -> None:
Author's note: I renamed this function with a plural name because I thought it would be slightly more accurate, since it can handle more than one opportunity by taking in a sequence.

opportunities_to_write: list[dict] = []

for opportunity in opportunities:
@@ -84,10 +84,6 @@ def opportunity_to_csv(opportunities: Sequence[dict]) -> io.StringIO:

opportunities_to_write.append(out_opportunity)

-output = io.StringIO()
-
writer = csv.DictWriter(output, fieldnames=CSV_FIELDS, quoting=csv.QUOTE_ALL)
writer.writeheader()
writer.writerows(opportunities_to_write)
-
-return output
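
With the output stream as a parameter, one helper now serves both the HTTP route (an in-memory buffer) and the export task (a file handle). A sketch of both call sites; the real task passes a handle from file_util.open_stream, which also understands s3:// paths, so plain open() stands in for it here:

import io

from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv

# Stand-in rows; the real dicts carry every column in CSV_FIELDS
opportunities = [{"opportunity_id": 1}]

# In memory, as the search route does:
output = io.StringIO()
opportunities_to_csv(opportunities, output)
csv_bytes = output.getvalue().encode("utf-8")

# Straight to a file, as the export task does:
with open("/tmp/opportunity_data.csv", "w") as outfile:
    opportunities_to_csv(opportunities, outfile)
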
1 change: 1 addition & 0 deletions api/src/task/__init__.py
@@ -3,5 +3,6 @@
# import any of the other files so they get initialized and attached to the blueprint
import src.task.opportunities.set_current_opportunities_task # noqa: F401 E402 isort:skip
import src.task.opportunities.import_opportunity_csvs # noqa: F401 E402 isort:skip
+import src.task.opportunities.export_opportunity_data_task # noqa: F401 E402 isort:skip

__all__ = ["task_blueprint"]
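
These imports run at blueprint-load time so each task's CLI command gets attached. Assuming the blueprint's CLI group is exposed as "task" and the app has a create_app factory (neither is shown in this diff), the new command could be smoke-tested from Python like so:

from src.app import create_app  # assumed factory location

app = create_app()
runner = app.test_cli_runner()

# Invokes the command defined in export_opportunity_data_task.py
result = runner.invoke(args=["task", "export-opportunity-data"])
assert result.exit_code == 0, result.output
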
124 changes: 124 additions & 0 deletions api/src/task/opportunities/export_opportunity_data_task.py
@@ -0,0 +1,124 @@
import json
import logging
import os
from enum import StrEnum
from typing import Iterator, Sequence

from pydantic_settings import SettingsConfigDict
from sqlalchemy import select
from sqlalchemy.orm import noload, selectinload

import src.adapters.db as db
import src.adapters.db.flask_db as flask_db
import src.util.file_util as file_util
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv
from src.task.task import Task
from src.task.task_blueprint import task_blueprint
from src.util.datetime_util import get_now_us_eastern_datetime
from src.util.env_config import PydanticBaseEnvConfig

logger = logging.getLogger(__name__)


@task_blueprint.cli.command(
"export-opportunity-data",
help="Generate JSON and CSV files containing an export of all opportunity data",
)
@flask_db.with_db_session()
def export_opportunity_data(db_session: db.Session) -> None:
ExportOpportunityDataTask(db_session).run()


class ExportOpportunityDataConfig(PydanticBaseEnvConfig):
model_config = SettingsConfigDict(env_prefix="EXPORT_OPP_DATA_")

# EXPORT_OPP_DATA_FILE_PATH
file_path: str


class ExportOpportunityDataTask(Task):
class Metrics(StrEnum):
RECORDS_EXPORTED = "records_exported"

def __init__(
self,
db_session: db.Session,
config: ExportOpportunityDataConfig | None = None,
) -> None:
super().__init__(db_session)

if config is None:
config = ExportOpportunityDataConfig()
self.config = config

self.current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")

self.json_file = os.path.join(
config.file_path, f"opportunity_data-{self.current_timestamp}.json"
)
self.csv_file = os.path.join(
config.file_path, f"opportunity_data-{self.current_timestamp}.csv"
)

self.set_metrics({"csv_file": self.csv_file, "json_file": self.json_file})

def run_task(self) -> None:
# Load records
schema = OpportunityV1Schema()

opportunities = []
for opp_batch in self.fetch_opportunities():
for record in opp_batch:
self.increment(self.Metrics.RECORDS_EXPORTED)
opportunities.append(schema.dump(record))

# Format data
data_to_export: dict = {
"metadata": {"file_generated_at": self.current_timestamp},
"opportunities": opportunities,
}

# Export data
self.export_data_to_json(data_to_export)
self.export_opportunities_to_csv(opportunities)

def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
"""
Fetch the opportunities in batches. The iterator returned
will give you each individual batch to be processed.

Fetches all opportunities where:
* is_draft = False
* current_opportunity_summary is not None
"""
return (
self.db_session.execute(
select(Opportunity)
.join(CurrentOpportunitySummary)
.where(
Opportunity.is_draft.is_(False),
CurrentOpportunitySummary.opportunity_status.isnot(None),
)
.options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
.execution_options(yield_per=5000)
)
.scalars()
.partitions()
)

def export_data_to_json(self, data_to_export: dict) -> None:
# create the json file
logger.info(
"Creating Opportunity JSON extract", extra={"json_extract_path": self.json_file}
)
json_object = json.dumps(data_to_export, indent=4)
with file_util.open_stream(self.json_file, "w") as outfile:
outfile.write(json_object)

def export_opportunities_to_csv(self, opportunities: Sequence[dict]) -> None:
# create the csv file
logger.info("Creating Opportunity CSV extract", extra={"csv_extract_path": self.csv_file})
with file_util.open_stream(self.csv_file, "w") as outfile:
opportunities_to_csv(opportunities, outfile)
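
fetch_opportunities leans on SQLAlchemy's yield_per plus partitions() so the export streams rows in fixed-size batches instead of materializing the whole table at once. A self-contained sketch of the same technique against a toy model (the model and batch size are illustrative):

import sqlalchemy as sa
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Item(Base):
    __tablename__ = "item"
    id: Mapped[int] = mapped_column(primary_key=True)


engine = sa.create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(Item() for _ in range(12))
    session.commit()

    # yield_per streams rows from the cursor in fixed-size batches;
    # partitions() then yields one batch at a time instead of one row.
    result = session.execute(sa.select(Item).execution_options(yield_per=5))
    for batch in result.scalars().partitions():
        print(len(batch))  # 5, 5, 2
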
@@ -0,0 +1,77 @@
import csv
import json

import pytest

import src.util.file_util as file_util
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
from src.task.opportunities.export_opportunity_data_task import (
ExportOpportunityDataConfig,
ExportOpportunityDataTask,
)
from tests.conftest import BaseTestClass
from tests.src.db.models.factories import OpportunityFactory


class TestExportOpportunityDataTask(BaseTestClass):
@pytest.fixture
def export_opportunity_data_task(self, db_session, mock_s3_bucket):
config = ExportOpportunityDataConfig(file_path=f"s3://{mock_s3_bucket}/")
return ExportOpportunityDataTask(db_session, config)

def test_export_opportunity_data_task(
self,
db_session,
truncate_opportunities,
enable_factory_create,
export_opportunity_data_task,
):
# Create 25 opportunities we will load
opportunities = []
opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
opportunities.extend(
OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
)
opportunities.extend(
OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
)

# Create some opportunities that won't get fetched / exported
OpportunityFactory.create_batch(size=3, is_draft=True)
OpportunityFactory.create_batch(size=4, no_current_summary=True)

export_opportunity_data_task.run()

# Verify some metrics first
# Make sure the number of opportunities we created matches
# the number that got exported
assert (
len(opportunities)
== export_opportunity_data_task.metrics[
export_opportunity_data_task.Metrics.RECORDS_EXPORTED
]
)

expected_opportunity_ids = set([opp.opportunity_id for opp in opportunities])
# Verify csv file contents
with file_util.open_stream(export_opportunity_data_task.csv_file, "r") as infile:
reader = csv.DictReader(infile)
assert expected_opportunity_ids == set(
[int(record["opportunity_id"]) for record in reader]
)

# Verify JSON file contents
with file_util.open_stream(export_opportunity_data_task.json_file, "r") as infile:
# Parse JSON File
json_opportunities = json.load(infile)

assert expected_opportunity_ids == set(
[int(record["opportunity_id"]) for record in json_opportunities["opportunities"]]
)

schema = OpportunityV1Schema(many=True)

errors = schema.validate(json_opportunities["opportunities"])
assert len(errors) == 0