diff --git a/.vscode/launch.json b/.vscode/launch.json
index 733aa741f..0b265f675 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -66,5 +66,18 @@
             "justMyCode": false,
             "django": true
         },
+        {
+            "name": "Export Dataset",
+            "type": "debugpy",
+            "request": "launch",
+            "stopOnEntry": false,
+            "python": "${workspaceFolder}/venv/bin/python",
+            "program": "${workspaceFolder}/manage.py",
+            "args": [
+                "export_dataset",
+            ],
+            "justMyCode": false,
+            "django": true
+        },
     ]
 }
diff --git a/macantine/etl/analysis.py b/macantine/etl/analysis.py
index a5f541f87..a6d348ba1 100644
--- a/macantine/etl/analysis.py
+++ b/macantine/etl/analysis.py
@@ -190,6 +190,7 @@ def load_dataset(self):
         """
         Load in database
         """
+        logger.info(f"Loading {len(self.df)} objects in db")
         self.warehouse.insert_dataframe(self.df, self.extracted_table_name)
 
     def _clean_dataset(self):
@@ -212,13 +213,24 @@ def __init__(self):
         self.extracted_table_name = "teledeclarations"
         self.warehouse = DataWareHouse()
         self.schema = json.load(open("data/schemas/schema_analysis.json"))
+        self.columns = [field["name"] for field in self.schema["fields"]]
 
     def extract_dataset(self):
         # Load teledeclarations from prod database into the Data Warehouse
         self.df = utils.fetch_teledeclarations(self.years)
-        self.df.index = self.df.id
+
+        if self.df.empty:
+            logger.warning("Dataset is empty. Creating an empty dataframe with columns from the schema")
+            self.df = pd.DataFrame(columns=self.columns)
 
     def transform_dataset(self):
+        if self.df.empty:
+            logger.warning("Dataset is empty. Skipping transformation")
+            return
+
+        # Use id as index
+        self.df.index = self.df.id
+
         # Flatten json 'declared_data' column
         df_json = pd.json_normalize(self.df["declared_data"])
         del df_json["year"]
@@ -230,6 +242,9 @@ def transform_dataset(self):
         # Aggregate columns for complete TD - Must occur before other transformations
         self.df = aggregate(self.df)
 
+        # Add additional filters (that couldn't be applied at the queryset level)
+        self.df = utils.filter_teledeclarations(self.df)
+
         self.compute_miscellaneous_columns()
 
         # Convert types
@@ -243,14 +258,14 @@ def transform_dataset(self):
         self.fill_geo_names(prefix="canteen.")
 
         # Fill campaign participation
-        logger.info("Canteens : Fill campaign participations...")
+        logger.info("TD : Fill campaign participations...")
         for year in utils.CAMPAIGN_DATES.keys():
             campaign_participation = utils.map_canteens_td(year)
             col_name_campaign = f"declaration_{year}"
             self.df[col_name_campaign] = self.df["id"].apply(lambda x: x in campaign_participation)
 
         # Extract the sector names and categories
-        logger.info("Canteens : Extract sectors...")
+        logger.info("TD : Extract sectors...")
         self.df[["secteur", "catégorie"]] = self.df.apply(
             lambda x: utils.format_td_sector_column(x, "canteen.sectors"), axis=1, result_type="expand"
         )
diff --git a/macantine/etl/utils.py b/macantine/etl/utils.py
index a3cbefc78..0ea562be1 100644
--- a/macantine/etl/utils.py
+++ b/macantine/etl/utils.py
@@ -225,6 +225,33 @@ def map_sectors():
     return sectors_mapper
 
 
+def filter_empty_values(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
+    """
+    Filter out the teledeclarations for which the given field is empty
+    """
+    return df.dropna(subset=col_name)
+
+
+def filter_aberrant_td(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Filter out the teledeclarations that have both:
+    * a total value of products > 1 million €
+    AND
+    * an average meal cost > 20 €
+    """
+    mask = (df["teledeclaration.value_total_ht"] > 1000000) & (
+        df["teledeclaration.value_total_ht"] / df["canteen.yearly_meal_count"] > 20
+    )
+    return df[~mask]
+
+
+def filter_teledeclarations(df: pd.DataFrame) -> pd.DataFrame:
+    df = filter_empty_values(df, col_name="teledeclaration.value_total_ht")
+    df = filter_empty_values(df, col_name="teledeclaration.value_bio_ht")
+    df = filter_aberrant_td(df)
+    return df
+
+
 def fetch_teledeclarations(years: list) -> pd.DataFrame:
     df = pd.DataFrame()
     for year in years:
@@ -238,7 +265,18 @@ def fetch_teledeclarations(years: list) -> pd.DataFrame:
                     ),
                     status=Teledeclaration.TeledeclarationStatus.SUBMITTED,
                     canteen_id__isnull=False,
-                ).values()
+                    canteen_siret__isnull=False,
+                    diagnostic__value_total_ht__isnull=False,
+                    diagnostic__value_bio_ht__isnull=False,
+                )
+                .exclude(
+                    canteen__deletion_date__range=(
+                        CAMPAIGN_DATES[year]["start_date"],
+                        CAMPAIGN_DATES[year]["end_date"],
+                    )
+                )
+                .exclude(canteen_siret="")
+                .values()
             )
             df = pd.concat([df, df_year])
         else:
diff --git a/macantine/tests/test_etl_analysis.py b/macantine/tests/test_etl_analysis.py
index a55b046be..5a2ab15da 100644
--- a/macantine/tests/test_etl_analysis.py
+++ b/macantine/tests/test_etl_analysis.py
@@ -54,30 +54,57 @@ def test_extraction_teledeclaration(self):
         """
         Only teledeclarations that occurred during teledeclaration campaigns should be extracted
         """
-        canteen = CanteenFactory.create()
+        canteen = CanteenFactory.create(siret="98648424243607")
+        canteen_no_siret = CanteenFactory.create()
         applicant = UserFactory.create()
 
-        with freeze_time("1991-01-14"):  # Faking time to mock creation_date
-            diagnostic_1990 = DiagnosticFactory.create(canteen=canteen, year=1990, diagnostic_type=None)
-            _ = Teledeclaration.create_from_diagnostic(diagnostic_1990, applicant)
-
-        with freeze_time("2023-05-14"):  # Faking time to mock creation_date
-            diagnostic_2022 = DiagnosticFactory.create(canteen=canteen, year=2022, diagnostic_type=None)
-            td_2022 = Teledeclaration.create_from_diagnostic(diagnostic_2022, applicant)
-
-        with freeze_time("2024-02-14"):  # Faking time to mock creation_date
-            diagnostic_2023 = DiagnosticFactory.create(canteen=canteen, year=2023, diagnostic_type=None)
-            td_2023 = Teledeclaration.create_from_diagnostic(diagnostic_2023, applicant)
-
         etl_stats = ETL_ANALYSIS_TD()
-        etl_stats.extract_dataset()
-        self.assertEqual(
-            len(etl_stats.df),
-            2,
-            "There should be two teledeclaration. None for 1990 (no campaign). One for 2022 and one for 2023",
-        )
-        self.assertEqual(etl_stats.df[etl_stats.df.id == td_2022.id].year.iloc[0], 2022)
-        self.assertEqual(etl_stats.df[etl_stats.df.id == td_2023.id].year.iloc[0], 2023)
+        test_cases = [
+            {
+                "date_mocked": "1991-01-14",
+                "year": 1990,
+                "canteen": canteen,
+                "delete_canteen": False,
+                "expected_outcome": "no_extraction",
+                "msg": "Outside any campaign date",
+            },
+            {
+                "date_mocked": "2023-05-14",
+                "year": 2022,
+                "canteen": canteen,
+                "delete_canteen": False,
+                "expected_outcome": "extraction",
+                "msg": "Valid",
+            },
+            {
+                "date_mocked": "2024-02-14",
+                "year": 2023,
+                "canteen": canteen,
+                "delete_canteen": True,
+                "expected_outcome": "no_extraction",
+                "msg": "Canteen deleted during campaign",
+            },
+            {
+                "date_mocked": "2024-02-14",
+                "year": 2023,
+                "canteen": canteen_no_siret,
+                "delete_canteen": False,
+                "expected_outcome": "no_extraction",
+                "msg": "Canteen without a siret",
+            },
+        ]
+        for tc in test_cases:
+            with freeze_time(tc["date_mocked"]):  # Faking time to mock creation_date
+                diag = DiagnosticFactory.create(canteen=tc["canteen"], year=tc["year"], diagnostic_type=None)
+                td = Teledeclaration.create_from_diagnostic(diag, applicant)
+                if tc["delete_canteen"]:
+                    tc["canteen"].delete()
+
+            etl_stats.extract_dataset()
+            if tc["expected_outcome"] == "extraction":
+                self.assertEqual(len(etl_stats.df[etl_stats.df.id == td.id]), 1)
+            else:
+                self.assertEqual(len(etl_stats.df[etl_stats.df.id == td.id]), 0)
 
     def test_get_egalim_hors_bio(self):
         data = {
diff --git a/macantine/tests/test_etl_open_data.py b/macantine/tests/test_etl_open_data.py
index 675c85552..68d345fa2 100644
--- a/macantine/tests/test_etl_open_data.py
+++ b/macantine/tests/test_etl_open_data.py
@@ -21,7 +21,7 @@ def test_td_range_years(self, mock):
         """
         Only teledeclarations that occurred during one specific teledeclaration campaign should be extracted
         """
-        canteen = CanteenFactory.create()
+        canteen = CanteenFactory.create(siret="98648424243607")
         applicant = UserFactory.create()
         test_cases = [
             {"name": "Ignore years out of range", "year": 1990, "expected_outcome": 0},
@@ -37,7 +37,7 @@ def test_ignore_cancelled_tds(self, mock):
 
     @freeze_time("2023-05-14")  # Faking time to mock creation_date
     def test_ignore_cancelled_tds(self, mock):
-        canteen = CanteenFactory.create()
+        canteen = CanteenFactory.create(siret="98648424243607")
         applicant = UserFactory.create()
         diagnostic = DiagnosticFactory.create(canteen=canteen, year=2022, diagnostic_type=None)
         teledeclaration = Teledeclaration.create_from_diagnostic(diagnostic, applicant)
@@ -119,10 +119,10 @@ def test_extraction_canteen(self, mock):
         self.assertEqual(etl_canteen.len_dataset(), 0, "There shoud be an empty dataframe")
 
         # Adding data in the db
-        canteen_1 = CanteenFactory.create()
+        canteen_1 = CanteenFactory.create(siret="98648424243607")
         canteen_1.managers.add(UserFactory.create())
 
-        canteen_2 = CanteenFactory.create()  # Another canteen, but without a manager
+        canteen_2 = CanteenFactory.create(siret="98648424243607")  # Another canteen, but without a manager
         canteen_2.managers.clear()
 
         etl_canteen.extract_dataset()
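
For reference, a minimal sketch of how the new filtering helpers in macantine/etl/utils.py behave on a toy dataframe. The column names and the 1 million € / 20 € thresholds come from the hunks above; the sample values are assumed for illustration only.

import pandas as pd

from macantine.etl.utils import filter_teledeclarations

# Toy frame using the columns referenced by the new helpers (values are made up).
df = pd.DataFrame(
    {
        "teledeclaration.value_total_ht": [50_000, None, 2_000_000, 2_000_000],
        "teledeclaration.value_bio_ht": [10_000, 5_000, 400_000, 400_000],
        "canteen.yearly_meal_count": [20_000, 20_000, 50_000, 200_000],
    }
)

# Row 1 is dropped by filter_empty_values (value_total_ht is missing).
# Row 2 is dropped by filter_aberrant_td (over 1 million € and 40 €/meal > 20 €).
# Row 3 is kept: over 1 million € but only 10 €/meal on average.
kept = filter_teledeclarations(df)
print(kept.index.tolist())  # [0, 3]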