[Issue: #166] Create ECS task to export opportunity data as CSV and JSON (#176)

## Summary
Fixes #166

### Time to review: 20 mins

## Changes proposed
- Adds the `export_opportunity_data` task
- Renames the `opportunity_to_csv` function to `opportunities_to_csv`, making it more flexible by taking the output stream as a parameter (see the sketch after this list)
- Adds a unit test for the `export_opportunity_data` task
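
A minimal sketch of the new calling convention (the `opportunities` variable stands in for a list of schema-dumped opportunity dicts, as produced elsewhere in this diff):

```python
import io

from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv

# Previously opportunity_to_csv() created and returned its own io.StringIO.
# Now the caller owns the output stream, so the same function can write to an
# in-memory buffer (the API route) or a file/S3 stream (the export task).
output = io.StringIO()
opportunities_to_csv(opportunities, output)
csv_text = output.getvalue()
```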

## Context for reviewers
- The test runs the export_opportunity_data task, which uploads a CSV and a JSON file to the mock_s3_bucket fixture, then reads the files back and verifies their contents.
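
Outside the test, the task can also be exercised by hand through the Flask CLI entrypoint added in this diff (something like `flask task export-opportunity-data`, assuming the task blueprint's CLI group is named `task`).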

---------

Co-authored-by: Michael Chouinard <[email protected]>
Rwolfe-Nava and chouinar authored Aug 15, 2024
1 parent 779de85 commit 2e0666f
Showing 6 changed files with 215 additions and 8 deletions.
9 changes: 8 additions & 1 deletion api/local.env
@@ -119,4 +119,11 @@ ENABLE_OPPORTUNITY_LOG_MSG=false
# For the script to setup the foreign data tables
# this env var overrides it so the script generates normal
# tables that don't need to connect to an Oracle database
IS_LOCAL_FOREIGN_TABLE=true
IS_LOCAL_FOREIGN_TABLE=true

############################
# Task Configuration
############################

# File path for the export_opportunity_data task
EXPORT_OPP_DATA_FILE_PATH=/tmp
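
For context, a hedged sketch of how this variable reaches the task: pydantic-settings maps it onto the config class defined later in this diff via the `EXPORT_OPP_DATA_` prefix (here `BaseSettings` stands in for the repo's `PydanticBaseEnvConfig`):

```python
import os

from pydantic_settings import BaseSettings, SettingsConfigDict


class ExportOpportunityDataConfig(BaseSettings):
    # With env_prefix, file_path is populated from EXPORT_OPP_DATA_FILE_PATH
    model_config = SettingsConfigDict(env_prefix="EXPORT_OPP_DATA_")
    file_path: str


os.environ["EXPORT_OPP_DATA_FILE_PATH"] = "/tmp"
config = ExportOpportunityDataConfig()
assert config.file_path == "/tmp"
```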
6 changes: 4 additions & 2 deletions api/src/api/opportunities_v1/opportunity_routes.py
@@ -1,3 +1,4 @@
import io
import logging

from flask import Response
@@ -13,7 +14,7 @@
from src.auth.api_key_auth import api_key_auth
from src.logging.flask_logger import add_extra_data_to_current_request_logs
from src.services.opportunities_v1.get_opportunity import get_opportunity, get_opportunity_versions
from src.services.opportunities_v1.opportunity_to_csv import opportunity_to_csv
from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv
from src.services.opportunities_v1.search_opportunities import search_opportunities
from src.util.dict_util import flatten_dict

@@ -135,7 +136,8 @@ def opportunity_search(

if search_params.get("format") == opportunity_schemas.SearchResponseFormat.CSV:
# Convert the response into a CSV and return the contents
output = opportunity_to_csv(opportunities)
output = io.StringIO()
opportunities_to_csv(opportunities, output)
timestamp = datetime_util.utcnow().strftime("%Y%m%d-%H%M%S")
return Response(
output.getvalue().encode("utf-8"),
6 changes: 1 addition & 5 deletions api/src/services/opportunities_v1/opportunity_to_csv.py
@@ -58,7 +58,7 @@ def _process_assistance_listing(assistance_listings: list[dict]) -> str:
)


def opportunity_to_csv(opportunities: Sequence[dict]) -> io.StringIO:
def opportunities_to_csv(opportunities: Sequence[dict], output: io.StringIO) -> None:
opportunities_to_write: list[dict] = []

for opportunity in opportunities:
@@ -84,10 +84,6 @@ def opportunity_to_csv(opportunities: Sequence[dict]) -> io.StringIO:

opportunities_to_write.append(out_opportunity)

output = io.StringIO()

writer = csv.DictWriter(output, fieldnames=CSV_FIELDS, quoting=csv.QUOTE_ALL)
writer.writeheader()
writer.writerows(opportunities_to_write)

return output
1 change: 1 addition & 0 deletions api/src/task/__init__.py
@@ -3,5 +3,6 @@
# import any of the other files so they get initialized and attached to the blueprint
import src.task.opportunities.set_current_opportunities_task # noqa: F401 E402 isort:skip
import src.task.opportunities.import_opportunity_csvs # noqa: F401 E402 isort:skip
import src.task.opportunities.export_opportunity_data_task # noqa: F401 E402 isort:skip

__all__ = ["task_blueprint"]
124 changes: 124 additions & 0 deletions api/src/task/opportunities/export_opportunity_data_task.py
@@ -0,0 +1,124 @@
import json
import logging
import os
from enum import StrEnum
from typing import Iterator, Sequence

from pydantic_settings import SettingsConfigDict
from sqlalchemy import select
from sqlalchemy.orm import noload, selectinload

import src.adapters.db as db
import src.adapters.db.flask_db as flask_db
import src.util.file_util as file_util
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv
from src.task.task import Task
from src.task.task_blueprint import task_blueprint
from src.util.datetime_util import get_now_us_eastern_datetime
from src.util.env_config import PydanticBaseEnvConfig

logger = logging.getLogger(__name__)


@task_blueprint.cli.command(
"export-opportunity-data",
help="Generate JSON and CSV files containing an export of all opportunity data",
)
@flask_db.with_db_session()
def export_opportunity_data(db_session: db.Session) -> None:
ExportOpportunityDataTask(db_session).run()


class ExportOpportunityDataConfig(PydanticBaseEnvConfig):
model_config = SettingsConfigDict(env_prefix="EXPORT_OPP_DATA_")

# EXPORT_OPP_DATA_FILE_PATH
file_path: str


class ExportOpportunityDataTask(Task):
class Metrics(StrEnum):
RECORDS_EXPORTED = "records_exported"

def __init__(
self,
db_session: db.Session,
config: ExportOpportunityDataConfig | None = None,
) -> None:
super().__init__(db_session)

if config is None:
config = ExportOpportunityDataConfig()
self.config = config

self.current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")

self.json_file = os.path.join(
config.file_path, f"opportunity_data-{self.current_timestamp}.json"
)
self.csv_file = os.path.join(
config.file_path, f"opportunity_data-{self.current_timestamp}.csv"
)

self.set_metrics({"csv_file": self.csv_file, "json_file": self.json_file})

def run_task(self) -> None:
# Load records
schema = OpportunityV1Schema()

opportunities = []
for opp_batch in self.fetch_opportunities():
for record in opp_batch:
self.increment(self.Metrics.RECORDS_EXPORTED)
opportunities.append(schema.dump(record))

# Format data
data_to_export: dict = {
"metadata": {"file_generated_at": self.current_timestamp},
"opportunities": opportunities,
}

# Export data
self.export_data_to_json(data_to_export)
self.export_opportunities_to_csv(opportunities)

def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
"""
Fetch the opportunities in batches. The iterator returned
will give you each individual batch to be processed.
Fetches all opportunities where:
* is_draft = False
* current_opportunity_summary is not None
"""
return (
self.db_session.execute(
select(Opportunity)
.join(CurrentOpportunitySummary)
.where(
Opportunity.is_draft.is_(False),
CurrentOpportunitySummary.opportunity_status.isnot(None),
)
.options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
.execution_options(yield_per=5000)
)
.scalars()
.partitions()
)

def export_data_to_json(self, data_to_export: dict) -> None:
# create the json file
logger.info(
"Creating Opportunity JSON extract", extra={"json_extract_path": self.json_file}
)
json_object = json.dumps(data_to_export, indent=4)
with file_util.open_stream(self.json_file, "w") as outfile:
outfile.write(json_object)

def export_opportunities_to_csv(self, opportunities: Sequence[dict]) -> None:
# create the csv file
logger.info("Creating Opportunity CSV extract", extra={"csv_extract_path": self.csv_file})
with file_util.open_stream(self.csv_file, "w") as outfile:
opportunities_to_csv(opportunities, outfile)
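
For reviewers who want to run the export outside the CLI, a minimal sketch of driving the task directly (mirroring the test fixture below; `db_session` is assumed to come from the app's usual session factory):

```python
from src.task.opportunities.export_opportunity_data_task import (
    ExportOpportunityDataConfig,
    ExportOpportunityDataTask,
)

# file_path may be a local directory or an s3:// prefix, as in the test below
config = ExportOpportunityDataConfig(file_path="/tmp")
task = ExportOpportunityDataTask(db_session, config)
task.run()  # writes opportunity_data-<timestamp>.json and .csv under /tmp
```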
@@ -0,0 +1,77 @@
import csv
import json

import pytest

import src.util.file_util as file_util
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
from src.task.opportunities.export_opportunity_data_task import (
ExportOpportunityDataConfig,
ExportOpportunityDataTask,
)
from tests.conftest import BaseTestClass
from tests.src.db.models.factories import OpportunityFactory


class TestExportOpportunityDataTask(BaseTestClass):
@pytest.fixture
def export_opportunity_data_task(self, db_session, mock_s3_bucket):
config = ExportOpportunityDataConfig(file_path=f"s3://{mock_s3_bucket}/")
return ExportOpportunityDataTask(db_session, config)

def test_export_opportunity_data_task(
self,
db_session,
truncate_opportunities,
enable_factory_create,
export_opportunity_data_task,
):
# Create 25 opportunities we will load
opportunities = []
opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
opportunities.extend(
OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
)
opportunities.extend(
OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
)

# Create some opportunities that won't get fetched / exported
OpportunityFactory.create_batch(size=3, is_draft=True)
OpportunityFactory.create_batch(size=4, no_current_summary=True)

export_opportunity_data_task.run()

# Verify some metrics first
# Make sure the number of opportunities we created matches
# the number that got exported
assert (
len(opportunities)
== export_opportunity_data_task.metrics[
export_opportunity_data_task.Metrics.RECORDS_EXPORTED
]
)

expected_opportunity_ids = set([opp.opportunity_id for opp in opportunities])
# Verify csv file contents
with file_util.open_stream(export_opportunity_data_task.csv_file, "r") as infile:
reader = csv.DictReader(infile)
assert expected_opportunity_ids == set(
[int(record["opportunity_id"]) for record in reader]
)

# Verify JSON file contents
with file_util.open_stream(export_opportunity_data_task.json_file, "r") as infile:
# Parse JSON File
json_opportunities = json.load(infile)

assert expected_opportunity_ids == set(
[int(record["opportunity_id"]) for record in json_opportunities["opportunities"]]
)

schema = OpportunityV1Schema(many=True)

errors = schema.validate(json_opportunities["opportunities"])
assert len(errors) == 0
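
Round-tripping the JSON back through `OpportunityV1Schema(many=True)` is a useful guard here: marshmallow's `validate` returns a dict of errors (empty when valid), so an empty result confirms the export stays in lockstep with the v1 API response shape.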
