4674 refacto classes etl (#4675)
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
qloridant and dependabot[bot] authored Nov 25, 2024
1 parent fd1ad2b commit 46208bf
Showing 7 changed files with 218 additions and 191 deletions.
macantine/etl/analysis.py: 28 changes (10 additions & 18 deletions)
@@ -172,7 +172,7 @@ def aggregate(df):
return df


- class ETL_ANALYSIS(etl.ETL):
+ class ANALYSIS(etl.TRANSFORMER_LOADER):
"""
Create a dataset for analysis in a Data Warehouse
* Extract data from prod
@@ -181,7 +181,7 @@ class ETL_ANALYSIS(etl.ETL):
"""

def __init__(self):
- self.df = None
+ super().__init__()
self.extracted_table_name = ""
self.schema = ""
self.warehouse = DataWareHouse()
@@ -199,7 +199,7 @@ def _clean_dataset(self):
self.df = self.df.drop_duplicates(subset=["id"])


- class ETL_ANALYSIS_TD(ETL_ANALYSIS):
+ class ETL_ANALYSIS_TELEDECLARATIONS(ANALYSIS, etl.TELEDECLARATIONS):
"""
Create a dataset for analysis in a Data Warehouse
* Extract data from prod
@@ -208,21 +208,13 @@ class ETL_ANALYSIS_TD(ETL_ANALYSIS):
"""

def __init__(self):
- self.df = None
+ super().__init__()
self.years = utils.CAMPAIGN_DATES.keys()
self.extracted_table_name = "teledeclarations"
self.warehouse = DataWareHouse()
self.schema = json.load(open("data/schemas/schema_analysis.json"))
self.columns = [field["name"] for field in self.schema["fields"]]

- def extract_dataset(self):
- # Load teledeclarations from prod database into the Data Warehouse
- self.df = utils.fetch_teledeclarations(self.years)
-
- if self.df.empty:
- logger.warning("Dataset is empty. Creating an empty dataframe with columns from the schema")
- self.df = pd.DataFrame(columns=self.columns)

def transform_dataset(self):
if self.df.empty:
logger.warning("Dataset is empty. Skipping transformation")
@@ -243,7 +235,7 @@ def transform_dataset(self):
self.df = aggregate(self.df)

# Add additional filters (that couldn't be applied at the queryset level)
- self.df = utils.filter_teledeclarations(self.df)
+ self.filter_teledeclarations()

self.compute_miscellaneous_columns()

@@ -308,7 +300,7 @@ def compute_miscellaneous_columns(self):
self.df["ratio_egalim_sans_bio"] = self.df.apply(get_ratio_egalim_sans_bio, axis=1)


- class ETL_ANALYSIS_CANTEEN(ETL_ANALYSIS):
+ class ETL_ANALYSIS_CANTEEN(etl.CANTEENS, ANALYSIS):
"""
Create a dataset for analysis in a Data Warehouse
* Extract data from prod
@@ -321,10 +313,12 @@ class ETL_ANALYSIS_CANTEEN(ETL_ANALYSIS):
"""

def __init__(self):
- self.df = None
+ super().__init__()

self.extracted_table_name = "canteens"
self.warehouse = DataWareHouse()
self.schema = json.load(open("data/schemas/schema_analysis_cantines.json"))

# The following mapper is used for renaming columns and for selecting the columns to extract from db
self.columns_mapper = {
"id": "id",
@@ -346,9 +340,7 @@ def __init__(self):
"line_ministry": "ministere_tutelle",
"sectors": "secteur",
}

- def extract_dataset(self):
- self.df = utils.fetch_canteens(self.columns_mapper.keys())
+ self.columns = self.columns_mapper.keys()

def transform_dataset(self):
logger.info("Filling geo names")
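Both concrete classes now assemble their behaviour by multiple inheritance from an extractor mixin and the ANALYSIS transformer/loader, so a single super().__init__() call walks every base exactly once. A minimal sketch of the resulting method resolution order, with stubbed bodies standing in for the real ones (not code from this commit):

from abc import ABC, abstractmethod

class ETL(ABC):
    def __init__(self):
        self.df = None

class EXTRACTOR(ETL):
    @abstractmethod
    def extract_dataset(self): ...

class TRANSFORMER_LOADER(ETL):
    @abstractmethod
    def transform_dataset(self): ...

    @abstractmethod
    def load_dataset(self): ...

class TELEDECLARATIONS(EXTRACTOR):
    def extract_dataset(self): ...

class ANALYSIS(TRANSFORMER_LOADER):
    def transform_dataset(self): ...
    def load_dataset(self): ...

class ETL_ANALYSIS_TELEDECLARATIONS(ANALYSIS, TELEDECLARATIONS):
    pass

# C3 linearization puts every mixin exactly once:
print([c.__name__ for c in ETL_ANALYSIS_TELEDECLARATIONS.__mro__])
# ['ETL_ANALYSIS_TELEDECLARATIONS', 'ANALYSIS', 'TRANSFORMER_LOADER',
#  'TELEDECLARATIONS', 'EXTRACTOR', 'ETL', 'ABC', 'object']

Note that the base order differs between the two jobs (ANALYSIS first for teledeclarations, etl.CANTEENS first for canteens); when both sides define the same attribute, the leftmost base wins.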
macantine/etl/etl.py: 146 changes (145 additions & 1 deletion)
@@ -1,9 +1,23 @@
import csv
import json
import logging
import os
import time
from abc import ABC, abstractmethod

import pandas as pd
import requests
from django.core.files.storage import default_storage

from data.department_choices import Department
from data.models import Canteen, Teledeclaration
from data.region_choices import Region
- from macantine.etl.utils import format_geo_name
+ from macantine.etl.utils import (
+     CAMPAIGN_DATES,
+     common_members,
+     filter_empty_values,
+     format_geo_name,
+ )

logger = logging.getLogger(__name__)

@@ -13,6 +27,12 @@ class ETL(ABC):
Interface for the different ETL
"""

def __init__(self):
self.df = None
self.schema = None
self.schema_url = ""
self.dataset_name = ""

def fill_geo_names(self, prefix=""):
"""
Given a dataframe with columns 'department' and 'region', this method maps the name of the location, based on the INSEE code
@@ -27,14 +47,138 @@ def fill_geo_names(self, prefix=""):
del self.df[f"{col_geo_zoom}_lib"]
self.df.insert(self.df.columns.get_loc(col_geo_zoom) + 1, f"{col_geo_zoom}_lib", col_to_insert)

def get_schema(self):
return self.schema

def get_dataset(self):
return self.df

def len_dataset(self):
if isinstance(self.df, pd.DataFrame):
return len(self.df)
else:
return 0

def is_valid(self, filepath) -> bool:
# To validate the dataset with the Validata API, we must first export it as CSV and store it online
with default_storage.open(filepath + "_to_validate.csv", "w") as file:
self.df.to_csv(
file,
sep=";",
index=False,
na_rep="",
encoding="utf_8_sig",
quoting=csv.QUOTE_NONE,
)
dataset_to_validate_url = (
f"{os.environ['CELLAR_HOST']}/{os.environ['CELLAR_BUCKET_NAME']}/media/{filepath}_to_validate.csv"
)

res = requests.get(
f"https://api.validata.etalab.studio/validate?schema={self.schema_url}&url={dataset_to_validate_url}&header_case=true"
)
report = json.loads(res.text)["report"]
if len(report["errors"]) > 0 or report["stats"]["errors"] > 0:
logger.error(f"The dataset {self.dataset_name} extraction has errors : ")
logger.error(report["errors"])
logger.error(report["tasks"])
return False
else:
return True


class EXTRACTOR(ETL):
@abstractmethod
def extract_dataset(self):
pass


class TRANSFORMER_LOADER(ETL):
@abstractmethod
def transform_dataset(self):
pass

@abstractmethod
def load_dataset(self):
pass
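Together, these two ABCs split the old monolithic ETL contract: extraction on one side, transformation and loading on the other. A hedged usage sketch of how a concrete job built from both sides is driven (the filepath argument is a made-up example, and is_valid additionally needs the CELLAR_* environment variables):

from macantine.etl.analysis import ETL_ANALYSIS_TELEDECLARATIONS

job = ETL_ANALYSIS_TELEDECLARATIONS()  # ANALYSIS + TELEDECLARATIONS mixins
job.extract_dataset()                  # EXTRACTOR contract
job.transform_dataset()                # TRANSFORMER_LOADER contract
if job.is_valid("analysis/teledeclarations"):  # shared ETL helper
    job.load_dataset()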


class TELEDECLARATIONS(EXTRACTOR):
def __init__(self):
super().__init__()  # keep the cooperative __init__ chain running up to ETL
self.years = []
self.columns = []

def filter_aberrant_td(self):
"""
Filtering out the teledeclarations that :
* products > 1 million €
AND
* an avg meal cost > 20 €
"""
mask = (self.df["teledeclaration.value_total_ht"] > 1000000) & (
self.df["teledeclaration.value_total_ht"] / self.df["canteen.yearly_meal_count"] > 20
)
self.df = self.df[~mask]
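A quick worked check of the mask with made-up figures (a row is dropped only when both conditions hold):

import pandas as pd

# Made-up rows, for illustration only
df = pd.DataFrame({
    "teledeclaration.value_total_ht": [1_500_000, 1_500_000, 50_000],
    "canteen.yearly_meal_count": [50_000, 200_000, 2_000],
})
mask = (df["teledeclaration.value_total_ht"] > 1000000) & (
    df["teledeclaration.value_total_ht"] / df["canteen.yearly_meal_count"] > 20
)
# Row 0: 1.5 M€ total and 30 €/meal -> dropped. Row 1: 7.5 €/meal -> kept.
# Row 2: 25 €/meal but only 50 k€ total -> kept.
print(df[~mask])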

def filter_teledeclarations(self):
"""
Filter teledeclarations for empty values."""

self.df = filter_empty_values(self.df, col_name="teledeclaration.value_total_ht")
self.df = filter_empty_values(self.df, col_name="teledeclaration.value_bio_ht")
self.filter_aberrant_td()

def extract_dataset(self):
self.df = pd.DataFrame()
for year in self.years:
if year in CAMPAIGN_DATES.keys():
df_year = pd.DataFrame(
Teledeclaration.objects.filter(
year=year,
creation_date__range=(
CAMPAIGN_DATES[year]["start_date"],
CAMPAIGN_DATES[year]["end_date"],
),
status=Teledeclaration.TeledeclarationStatus.SUBMITTED,
canteen_id__isnull=False,
canteen_siret__isnull=False,
diagnostic__value_total_ht__isnull=False,
diagnostic__value_bio_ht__isnull=False,
)
.exclude(
canteen__deletion_date__range=(
CAMPAIGN_DATES[year]["start_date"],
CAMPAIGN_DATES[year]["end_date"],
)
)
.exclude(canteen_siret="")
.values()
)
self.df = pd.concat([self.df, df_year])
else:
logger.warning(f"TD dataset does not exist for year : {year}")
if self.df.empty:
logger.warning("Dataset is empty. Creating an empty dataframe with columns from the schema")
self.df = pd.DataFrame(columns=self.columns)


class CANTEENS(EXTRACTOR):
def __init__(self):
super().__init__()
self.exclude_filter = None
self.columns = []

def extract_dataset(self):
start = time.time()
canteens = Canteen.objects.all()
if self.exclude_filter:
canteens = Canteen.objects.exclude(self.exclude_filter)
if canteens.count() == 0:
self.df = pd.DataFrame(columns=self.columns)
else:
# Create a dataframe with all canteens. A canteen can span multiple rows if it has multiple sectors
columns_model = [field.name for field in Canteen._meta.get_fields()]
columns_to_extract = common_members(self.columns, columns_model)
self.df = pd.DataFrame(canteens.values(*columns_to_extract))
end = time.time()
logger.info(f"Time spent on canteens extraction : {end - start}")
