Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
update opportunity_to_csv
Browse files Browse the repository at this point in the history
  • Loading branch information
Rwolfe-Nava committed Aug 9, 2024
1 parent 779de85 commit 6ab7848
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 7 deletions.
6 changes: 4 additions & 2 deletions api/src/api/opportunities_v1/opportunity_routes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import logging

from flask import Response
Expand All @@ -13,7 +14,7 @@
from src.auth.api_key_auth import api_key_auth
from src.logging.flask_logger import add_extra_data_to_current_request_logs
from src.services.opportunities_v1.get_opportunity import get_opportunity, get_opportunity_versions
from src.services.opportunities_v1.opportunity_to_csv import opportunity_to_csv
from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv
from src.services.opportunities_v1.search_opportunities import search_opportunities
from src.util.dict_util import flatten_dict

Expand Down Expand Up @@ -135,7 +136,8 @@ def opportunity_search(

if search_params.get("format") == opportunity_schemas.SearchResponseFormat.CSV:
# Convert the response into a CSV and return the contents
output = opportunity_to_csv(opportunities)
output = io.StringIO()
opportunities_to_csv(opportunities, output)
timestamp = datetime_util.utcnow().strftime("%Y%m%d-%H%M%S")
return Response(
output.getvalue().encode("utf-8"),
Expand Down
6 changes: 1 addition & 5 deletions api/src/services/opportunities_v1/opportunity_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _process_assistance_listing(assistance_listings: list[dict]) -> str:
)


def opportunity_to_csv(opportunities: Sequence[dict]) -> io.StringIO:
def opportunities_to_csv(opportunities: Sequence[dict], output: io.StringIO) -> None:
opportunities_to_write: list[dict] = []

for opportunity in opportunities:
Expand All @@ -84,10 +84,6 @@ def opportunity_to_csv(opportunities: Sequence[dict]) -> io.StringIO:

opportunities_to_write.append(out_opportunity)

output = io.StringIO()

writer = csv.DictWriter(output, fieldnames=CSV_FIELDS, quoting=csv.QUOTE_ALL)
writer.writeheader()
writer.writerows(opportunities_to_write)

return output
1 change: 1 addition & 0 deletions api/src/task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
# import any of the other files so they get initialized and attached to the blueprint
import src.task.opportunities.set_current_opportunities_task # noqa: F401 E402 isort:skip
import src.task.opportunities.import_opportunity_csvs # noqa: F401 E402 isort:skip
import src.task.opportunities.export_opportunity_data_task # noqa: F401 E402 isort:skip

__all__ = ["task_blueprint"]
108 changes: 108 additions & 0 deletions api/src/task/opportunities/export_opportunity_data_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import json
import logging
from enum import StrEnum
from typing import Iterator, Sequence

from smart_open import open
from sqlalchemy import select
from sqlalchemy.orm import noload, selectinload

import src.adapters.db as db
import src.adapters.db.flask_db as flask_db
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv
from src.task.task import Task
from src.task.task_blueprint import task_blueprint
from src.util.datetime_util import get_now_us_eastern_datetime

logger = logging.getLogger(__name__)


@task_blueprint.cli.command(
"export-opportunity-data",
help="Generate JSON and CSV files containing an export of all opportunity data",
)
@flask_db.with_db_session()
def export_opportunity_data(db_session: db.Session) -> None:
ExportOpportunityDataTask(db_session).run()


class ExportOpportunityDataTask(Task):
class Metrics(StrEnum):
RECORDS_LOADED = "records_loaded"

def __init__(
self,
db_session: db.Session,
) -> None:
super().__init__(db_session)

FILE_NAME: str = "opportunity_data"
self.current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")

# Surely there is a better way to do paths in python?
# I tried pathlib's Path.cwd() thinking it would resolve
# to /api/src/task/opportunities/ but it resolved to just /api
self.FILE_PATH = "/api/src/task/opportunities/output/"

self.json_file = f"{FILE_NAME}-{self.current_timestamp}.json"
self.csv_file = f"{FILE_NAME}-{self.current_timestamp}.csv"

self.set_metrics({"csv_file": self.csv_file, "json_file": self.json_file})

def run_task(self) -> None:
# load records
schema = OpportunityV1Schema()
data_to_export: dict = {
"metadata": {"file_generated_at": self.current_timestamp},
"opportunities": [],
}

for opp_batch in self.fetch_opportunities():
for record in opp_batch:
self.increment(self.Metrics.RECORDS_LOADED)
data_to_export["opportunities"].append(schema.dump(record))

# Export the data to json
self.export_data_to_json(data_to_export=data_to_export)

# Export the opportunities to a csv
self.export_opportunities_to_csv(opportunities=data_to_export["opportunities"])

def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
"""
Fetch the opportunities in batches. The iterator returned
will give you each individual batch to be processed.
Fetches all opportunities where:
* is_draft = False
* current_opportunity_summary is not None
"""
return (
self.db_session.execute(
select(Opportunity)
.join(CurrentOpportunitySummary)
.where(
Opportunity.is_draft.is_(False),
CurrentOpportunitySummary.opportunity_status.isnot(None),
)
.options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
.execution_options(yield_per=5000)
)
.scalars()
.partitions()
)

def export_data_to_json(self, data_to_export: dict) -> None:
# create the json file
json_object = json.dumps(data_to_export, indent=4)
json_file = self.FILE_PATH + self.json_file
with open(json_file, "w") as outfile:
outfile.write(json_object)

def export_opportunities_to_csv(self, opportunities: Sequence[dict]) -> None:
# create the csv file
csv_file = self.FILE_PATH + self.csv_file
with open(csv_file, "w") as outfile:
opportunities_to_csv(opportunities, outfile)

0 comments on commit 6ab7848

Please sign in to comment.