Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
implement changes based on feedback from initial draft review
Browse files Browse the repository at this point in the history
  • Loading branch information
Rwolfe-Nava committed Aug 9, 2024
1 parent 6ab7848 commit f56bd36
Showing 1 changed file with 40 additions and 23 deletions.
63 changes: 40 additions & 23 deletions api/src/task/opportunities/export_opportunity_data_task.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import json
import logging
import os
from enum import StrEnum
from typing import Iterator, Sequence

from smart_open import open
from pydantic import Field
from pydantic_settings import SettingsConfigDict
from sqlalchemy import select
from sqlalchemy.orm import noload, selectinload

import src.adapters.db as db
import src.adapters.db.flask_db as flask_db
import src.util.file_util as file_util
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
from src.services.opportunities_v1.opportunity_to_csv import opportunities_to_csv
from src.task.task import Task
from src.task.task_blueprint import task_blueprint
from src.util.datetime_util import get_now_us_eastern_datetime
from src.util.env_config import PydanticBaseEnvConfig

logger = logging.getLogger(__name__)

Expand All @@ -28,47 +32,58 @@ def export_opportunity_data(db_session: db.Session) -> None:
ExportOpportunityDataTask(db_session).run()


class ExportOpportunityDataConfig(PydanticBaseEnvConfig):
    """Settings that control where the opportunity extract files are written."""

    # Settings for this class use the EXPORT_OPP_DATA_ prefix.
    # NOTE(review): presumably this lets e.g. EXPORT_OPP_DATA_FILE_PATH override
    # the defaults below - confirm against PydanticBaseEnvConfig.
    model_config = SettingsConfigDict(env_prefix="EXPORT_OPP_DATA_")

    # Directory (or path prefix) the task joins with the generated file name.
    file_path: str = Field(default="/api/src/task/opportunities/output/")
    # Base name of each extract; the task appends a timestamp and an extension.
    file_name: str = Field(default="opportunity_data")


class ExportOpportunityDataTask(Task):
class Metrics(StrEnum):
    """Names of the counters this task records."""

    # Incremented once for every opportunity record processed in run_task.
    RECORDS_LOADED = "records_loaded"

def __init__(
    self,
    db_session: db.Session,
    config: ExportOpportunityDataConfig | None = None,
) -> None:
    """Set up the export task and decide where the extract files will go.

    Args:
        db_session: Database session passed through to the base ``Task``.
        config: Output location settings. When omitted, a default
            ``ExportOpportunityDataConfig()`` is constructed.
    """
    super().__init__(db_session)

    if config is None:
        config = ExportOpportunityDataConfig()
    self.config = config

    # Captured once so both file names and the JSON metadata carry the
    # same timestamp.
    self.current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")

    # Both extracts share one base name and differ only by extension. The
    # resulting attributes are full paths, ready to be opened directly.
    base_name = f"{config.file_name}-{self.current_timestamp}"
    self.json_file = os.path.join(config.file_path, f"{base_name}.json")
    self.csv_file = os.path.join(config.file_path, f"{base_name}.csv")

    self.set_metrics({"csv_file": self.csv_file, "json_file": self.json_file})

def run_task(self) -> None:
    """Load every opportunity, then write the JSON and CSV extracts.

    Each record is serialized exactly once with ``OpportunityV1Schema`` and
    the same list of serialized records feeds both output files.
    """
    # Load records
    schema = OpportunityV1Schema()

    opportunities = []
    for opp_batch in self.fetch_opportunities():
        for record in opp_batch:
            self.increment(self.Metrics.RECORDS_LOADED)
            opportunities.append(schema.dump(record))

    # Format data: the JSON extract wraps the records with generation metadata.
    data_to_export: dict = {
        "metadata": {"file_generated_at": self.current_timestamp},
        "opportunities": opportunities,
    }

    # Export data: one write per format.
    self.export_data_to_json(data_to_export)
    self.export_opportunities_to_csv(opportunities)

def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
"""
Expand Down Expand Up @@ -96,13 +111,15 @@ def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:

def export_data_to_json(self, data_to_export: dict) -> None:
    """Write the export payload to ``self.json_file`` as indented JSON.

    Args:
        data_to_export: JSON-serializable payload (metadata plus the list
            of serialized opportunities).
    """
    logger.info(
        "Creating Opportunity JSON extract", extra={"json_extract_path": self.json_file}
    )
    json_object = json.dumps(data_to_export, indent=4)
    # self.json_file is already a complete path, so it is opened as-is;
    # prefixing it with a directory again would double the path.
    with file_util.open_stream(self.json_file, "w") as outfile:
        outfile.write(json_object)

def export_opportunities_to_csv(self, opportunities: Sequence[dict]) -> None:
    """Write the serialized opportunities to ``self.csv_file``.

    Args:
        opportunities: Serialized opportunity records, handed unchanged to
            ``opportunities_to_csv`` together with the open output stream.
    """
    logger.info("Creating Opportunity CSV extract", extra={"csv_extract_path": self.csv_file})
    # self.csv_file is already a complete path, so it is opened as-is.
    with file_util.open_stream(self.csv_file, "w") as outfile:
        opportunities_to_csv(opportunities, outfile)

0 comments on commit f56bd36

Please sign in to comment.