
add test for export_opportunity_data_task
Rwolfe-Nava committed Aug 9, 2024
1 parent f56bd36 commit e45969b
Showing 3 changed files with 86 additions and 8 deletions.
9 changes: 8 additions & 1 deletion api/local.env
@@ -119,4 +119,11 @@ ENABLE_OPPORTUNITY_LOG_MSG=false
 # For the script to setup the foreign data tables
 # this env var overrides it so the script generates normal
 # tables that don't need to connect to an Oracle database
-IS_LOCAL_FOREIGN_TABLE=true
+IS_LOCAL_FOREIGN_TABLE=true
+
+############################
+# Task Configuration
+############################
+
+# File path for the export_opportunity_data task
+EXPORT_OPP_DATA_FILE_PATH=/tmp
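The new variable is consumed through pydantic-settings: the config class changed below declares env_prefix="EXPORT_OPP_DATA_", so the prefix is stripped and the remainder is matched against field names, meaning EXPORT_OPP_DATA_FILE_PATH populates file_path. A minimal sketch of that mapping, assuming PydanticBaseEnvConfig wraps pydantic-settings' BaseSettings (the import path is taken from the test file below):

import os

# The env var name is the prefix "EXPORT_OPP_DATA_" plus the field name.
os.environ["EXPORT_OPP_DATA_FILE_PATH"] = "/tmp"

from src.task.opportunities.export_opportunity_data_task import (
    ExportOpportunityDataConfig,
)

# Settings are read from the environment at instantiation time.
config = ExportOpportunityDataConfig()
assert config.file_path == "/tmp"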
13 changes: 6 additions & 7 deletions api/src/task/opportunities/export_opportunity_data_task.py
@@ -4,7 +4,6 @@
 from enum import StrEnum
 from typing import Iterator, Sequence
 
-from pydantic import Field
 from pydantic_settings import SettingsConfigDict
 from sqlalchemy import select
 from sqlalchemy.orm import noload, selectinload
@@ -35,13 +34,13 @@ def export_opportunity_data(db_session: db.Session) -> None:
 class ExportOpportunityDataConfig(PydanticBaseEnvConfig):
     model_config = SettingsConfigDict(env_prefix="EXPORT_OPP_DATA_")
 
-    file_path: str = Field(default="/api/src/task/opportunities/output/")
-    file_name: str = Field(default="opportunity_data")
+    # EXPORT_OPP_DATA_FILE_PATH
+    file_path: str
 
 
 class ExportOpportunityDataTask(Task):
     class Metrics(StrEnum):
-        RECORDS_LOADED = "records_loaded"
+        RECORDS_EXPORTED = "records_exported"
 
     def __init__(
         self,
@@ -57,10 +56,10 @@ def __init__(
         self.current_timestamp = get_now_us_eastern_datetime().strftime("%Y-%m-%d_%H-%M-%S")
 
         self.json_file = os.path.join(
-            config.file_path, f"{config.file_name}-{self.current_timestamp}.json"
+            config.file_path, f"opportunity_data-{self.current_timestamp}.json"
         )
         self.csv_file = os.path.join(
-            config.file_path, f"{config.file_name}-{self.current_timestamp}.csv"
+            config.file_path, f"opportunity_data-{self.current_timestamp}.csv"
        )
 
         self.set_metrics({"csv_file": self.csv_file, "json_file": self.json_file})
@@ -72,7 +71,7 @@ def run_task(self) -> None:
         opportunities = []
         for opp_batch in self.fetch_opportunities():
             for record in opp_batch:
-                self.increment(self.Metrics.RECORDS_LOADED)
+                self.increment(self.Metrics.RECORDS_EXPORTED)
                 opportunities.append(schema.dump(record))
 
         # Format data
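With the file_name field removed, both output paths are now derived the same way: the configured directory joined with a fixed "opportunity_data" stem and a timestamp. A standalone sketch of the resulting names, substituting datetime.now for the task's get_now_us_eastern_datetime helper:

import os
from datetime import datetime

file_path = "/tmp"  # from EXPORT_OPP_DATA_FILE_PATH
current_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Mirrors the __init__ logic in the hunk above.
json_file = os.path.join(file_path, f"opportunity_data-{current_timestamp}.json")
csv_file = os.path.join(file_path, f"opportunity_data-{current_timestamp}.csv")
# e.g. /tmp/opportunity_data-2024-08-09_14-30-00.json

The final hunk adds the new test file: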
@@ -0,0 +1,72 @@
import csv
import json

import pytest

import src.util.file_util as file_util
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
from src.task.opportunities.export_opportunity_data_task import (
    ExportOpportunityDataConfig,
    ExportOpportunityDataTask,
)
from tests.conftest import BaseTestClass
from tests.src.db.models.factories import OpportunityFactory


class TestExportOpportunityDataTask(BaseTestClass):
    @pytest.fixture
    def export_opportunity_data_task(self, db_session, mock_s3_bucket):
        config = ExportOpportunityDataConfig(file_path=f"s3://{mock_s3_bucket}/")
        return ExportOpportunityDataTask(db_session, config)

    def test_export_opportunity_data_task(
        self,
        db_session,
        truncate_opportunities,
        enable_factory_create,
        export_opportunity_data_task,
    ):
        # Create 25 opportunities that we will export
        opportunities = []
        opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True))
        opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
        opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
        opportunities.extend(
            OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
        )
        opportunities.extend(
            OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
        )

        export_opportunity_data_task.run()

        # Verify the metrics first: the number of opportunities we created
        # must match the number the task reports as exported
        assert (
            len(opportunities)
            == export_opportunity_data_task.metrics[
                export_opportunity_data_task.Metrics.RECORDS_EXPORTED
            ]
        )

        # Verify CSV file contents
        with file_util.open_stream(export_opportunity_data_task.csv_file, "r") as infile:
            reader = csv.DictReader(infile)
            assert set([opp.opportunity_id for opp in opportunities]) == set(
                [int(record["opportunity_id"]) for record in reader]
            )

        # Verify JSON file contents
        with file_util.open_stream(export_opportunity_data_task.json_file, "r") as infile:
            # Parse JSON file
            json_opportunities = json.load(infile)

            assert set([opp.opportunity_id for opp in opportunities]) == set(
                [int(record["opportunity_id"]) for record in json_opportunities["opportunities"]]
            )

            schema = OpportunityV1Schema(many=True)

            errors = schema.validate(json_opportunities["opportunities"])
            assert len(errors) == 0
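The closing assertion leans on marshmallow's validate(), which returns a dict of errors that is empty when every record passes. A hedged sketch of the failure mode it guards against (the malformed record is illustrative, not taken from the fixtures):

from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema

schema = OpportunityV1Schema(many=True)

# With many=True, errors are keyed by record index, so one bad
# record yields a non-empty mapping and the assert above would fail.
errors = schema.validate([{"opportunity_id": "not-an-int"}])
assert errors

# A clean payload, as in the test above, validates to an empty dict.
assert schema.validate([]) == {}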
