Feat/transform #4

Merged · 5 commits · Apr 11, 2024
4 changes: 2 additions & 2 deletions data/eur_revenue.csv
@@ -1,4 +1,4 @@
company,currency,operational_revenue,date
ABC Inc.,EUR,1000,2023-01
XYZ Corp.,EUR,1500,2023-02
123 Ltd.,EUR,2000,2023-03
XYZ Corp.,EUR,1500,2023-01
123 Ltd.,EUR,2000,2023-01
4 changes: 2 additions & 2 deletions data/usd_revenue.csv
@@ -1,4 +1,4 @@
company,currency,operational_revenue,date
ABC Inc.,USD,3000,2023-01
XYZ Corp.,USD,3500,2023-02
123 Ltd.,USD,4000,2023-03
XYZ Corp.,USD,3500,2023-01
123 Ltd.,USD,4000,2023-01
4 changes: 2 additions & 2 deletions data/yen_revenue.csv
@@ -1,4 +1,4 @@
company,currency,operational_revenue,date
ABC Inc.,YEN,2000,2023-01
XYZ Corp.,YEN,2500,2023-02
123 Ltd.,YEN,3000,2023-03
XYZ Corp.,YEN,2500,2023-01
123 Ltd.,YEN,3000,2023-01
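
The data edits above align every row of each sample file to a single reporting month (2023-01), which appears to match the single-date and single-currency checks this PR adds to `CompanyRevenue` in src/schema.py further down. A minimal sketch for checking that invariant locally, assuming the data/ layout shown in this diff:

```python
# Minimal sketch: confirm each sample CSV carries exactly one currency and one
# reporting month, mirroring the new schema-level checks (paths assume the
# data/ folder shown in this diff).
import pandas as pd

for path in ["data/eur_revenue.csv", "data/usd_revenue.csv", "data/yen_revenue.csv"]:
    df = pd.read_csv(path)
    assert df["currency"].nunique() == 1, f"{path}: more than one currency"
    assert df["date"].nunique() == 1, f"{path}: more than one reporting month"
    print(path, "->", df["currency"].iloc[0], df["date"].iloc[0])
```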
90 changes: 64 additions & 26 deletions src/etl.py
@@ -1,50 +1,85 @@
import pandas as pd
import pandera as pa
from typing import Union
from .google_drive import GoogleDrive
from .database import engine, get_files_ids_from_db
from .schema import CompanyRevenue
from .transform_utils import get_currencies_rates, transform_and_validate_data

def extract_files(service_account_path: str, parent_folder_name: str, folder_to_extract: str) -> Union[list[pd.DataFrame], list[None]]:

list_df: list[pd.DataFrame] = []

drive_conn = GoogleDrive(service_account_file= service_account_path,
def connect_drive_and_extract_files(service_account_path: str, parent_folder_name: str, folder_to_extract: str) -> list[pd.DataFrame] | list[None]:

try:
drive_conn = GoogleDrive(service_account_file= service_account_path,
folder_name= parent_folder_name)
except Exception as e:
print(f'Error connecting with Google Drive: {e}')

try:
revenue_folder_id = drive_conn.id_folders()[folder_to_extract]
except:
except Exception as e:
print('There is no folder with this name.')
print(e)

df_files_csv = drive_conn.get_csv_files(folder_id= revenue_folder_id)
try:
df_files_csv = drive_conn.get_csv_files(folder_id= revenue_folder_id)
except Exception as e:
print(e)

database_files_ids = get_files_ids_from_db()

list_df_files_info: list[pd.DataFrame] = []

for _ , row in df_files_csv.iterrows():

file_id = row['id']
file_name = row['name']
if file_id not in database_files_ids:
df_raw = drive_conn.read_csv_from_drive(file_id=file_id)
df_extracted = drive_conn.read_csv_from_drive(file_id=file_id)
df_extracted['file_id'] = file_id
list_df_files_info.append(df_extracted)
print(f'File {file_name} extracted.')
try:
df_validated = CompanyRevenue.validate(df_raw, lazy = True)
df_validated['file_id'] = file_id
list_df.append(df_validated)
except pa.errors.SchemaErrors as err:
print("Schema errors and failure cases:")
print(err.failure_cases)
print("\nDataFrame object that failed validation:")
print(err.data)
else:
print(f'file {file_name} already loaded in database.')
print(f'File {file_name} was previously loaded into the database')

return list_df_files_info


def extract_data_from_files(list_df_files_info: list[pd.DataFrame]) -> list[pd.DataFrame] | list[None]:

list_df_validated: list[pd.DataFrame] = []

return list_df
for df in list_df_files_info:
try:
df_validated = CompanyRevenue.validate(df, lazy = True)
list_df_validated.append(df_validated)
except pa.errors.SchemaErrors as err:
print("Schema errors and failure cases:")
print(err.message)
print("\nDataFrame object that failed validation:")
print(err.data)

return list_df_validated

def transform_data(list_df_validated: list[pd.DataFrame]) -> list[pd.DataFrame]:

def load_files(list_df: Union[list[pd.DataFrame], list[None]]) -> None:
list_df_transformed: list[pd.DataFrame] = []

df_convertion_rates = get_currencies_rates()

for df in list_df_validated:
try:
df_transformed = transform_and_validate_data(df_to_transform = df,
df_convertion_rates = df_convertion_rates)
list_df_transformed.append(df_transformed)
except pa.errors.SchemaErrors as err:
print("Schema errors and failure cases:")
print(err.message)
print("\nDataFrame object that failed validation:")
print(err.data)

return list_df_transformed



def load_files(list_df: list[pd.DataFrame] | list[None] ) -> None:

if not list_df:
print('There are no files to load into the database')
@@ -60,11 +95,14 @@ def load_files(list_df: Union[list[pd.DataFrame], list[None]]) -> None:

def pipeline(service_account_path: str, parent_folder_name: str, folder_to_extract: str) -> None:

list_df = extract_files(service_account_path, parent_folder_name, folder_to_extract)
list_df_files_info = connect_drive_and_extract_files(service_account_path, parent_folder_name, folder_to_extract)

list_df_extracted = extract_data_from_files(list_df_files_info)

list_df_transformed = transform_data(list_df_extracted)

load_files(list_df_transformed)

load_files(list_df)

# print(list_df[0].info())



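The refactor above splits the old `extract_files` into three stages: `connect_drive_and_extract_files` (Drive I/O), `extract_data_from_files` (pandera validation), and `transform_data` (currency conversion). A minimal sketch of driving the last two stages with an in-memory frame, assuming `src.etl` imports cleanly (it pulls in the database and Drive modules at module level) and that network access to FRED is available for the conversion rates; the Drive `file_id` column is omitted here since the schema marks it optional:

```python
# Minimal sketch: exercise the new validation and transform stages without a
# Google Drive connection. transform_data() still fetches FRED rates, so it
# needs network access.
import pandas as pd
from src.etl import extract_data_from_files, transform_data

df = pd.DataFrame({
    "company": ["ABC Inc.", "XYZ Corp."],
    "currency": ["EUR", "EUR"],
    "operational_revenue": [1000, 1500],
    "date": ["2023-01", "2023-01"],
})

validated = extract_data_from_files([df])   # pandera validation, coerces dtypes
transformed = transform_data(validated)     # adds conversion_rate and usd_converted
print(transformed[0][["company", "usd_converted"]])
```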
4 changes: 3 additions & 1 deletion src/main.py
@@ -3,7 +3,9 @@
# Add the root directory of your project to sys.path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
######################################################
from src.etl import pipeline
import pandas as pd
from src.etl import pipeline, extract_data_from_files, transform_data
from pandera.engines import pandas_engine


service_account_path = 'service_account.json'
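main.py now imports the split stages alongside `pipeline`. A hedged sketch of how the entry point might invoke it; the Drive folder names below are placeholders, since the actual call sits in the collapsed part of this file:

```python
# Hypothetical invocation: 'my_drive_root' and 'revenue' are placeholder folder
# names, not values taken from this diff.
pipeline(
    service_account_path='service_account.json',
    parent_folder_name='my_drive_root',
    folder_to_extract='revenue',
)
```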
21 changes: 17 additions & 4 deletions src/schema.py
@@ -1,14 +1,27 @@
import pandera as pa
import pandas as pd
from pandera.typing import Series
from typing import Optional

class CompanyRevenue(pa.SchemaModel):

company: Series[str]
currency: Series[str] = pa.Field(isin= ['EUR', 'USD', 'YEN'])
operational_revenue: Series[float] = pa.Field(ge = 0)
date: Series[pa.DateTime]
date: Series[pa.DateTime]
file_id: Optional[str]

class Config:
coerce = True # coerce types of all schema components
strict = True # make sure all specified columns are in the validated dataframe
coerce = True
strict = True

@pa.check("currency", name = "Currency types", error= "There is more than one type of currency;")
def check_equal_currency(cls, currency: Series[str]) -> Series[bool]:
return currency.nunique() == 1

@pa.check("date", name = "Date format", error= "Date format must contain just year and month;")
def check_date_format(cls, date: Series[pa.DateTime]) -> Series[bool]:
return date.dt.strftime('%Y-%m') == date

@pa.check("date", name = "Date format", error= "There is more than one date;")
def check_date_format(cls, date: Series[pa.DateTime]) -> Series[bool]:
return date.nunique() == 1
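
The updated schema now enforces, per file, a single currency and a single reporting month via dataframe-wide checks. A small sketch, mirroring the tests further down, of a mixed-currency frame being rejected under lazy validation:

```python
# Minimal sketch: a frame mixing EUR and USD fails the new "Currency types"
# check when validated lazily.
import pandas as pd
import pandera as pa
from src.schema import CompanyRevenue

df = pd.DataFrame({
    "company": ["ABC Inc.", "XYZ Corp."],
    "currency": ["EUR", "USD"],
    "operational_revenue": [1000, 1500],
    "date": ["2023-01", "2023-01"],
})

try:
    CompanyRevenue.validate(df, lazy=True)
except pa.errors.SchemaErrors as err:
    print(err.failure_cases)  # lists the checks and rows that failed
```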
44 changes: 44 additions & 0 deletions src/transform_utils.py
@@ -0,0 +1,44 @@
import pandas as pd
import pandas_datareader.data as web
import datetime

def get_currencies_rates() -> pd.DataFrame:

start_date = datetime.datetime(2010, 1, 1)

eur_usd = web.DataReader('DEXUSEU', 'fred', start = start_date)
eur_usd = eur_usd.resample('M').last()
eur_usd.index = eur_usd.index.strftime('%Y-%m')

usd_yen = web.DataReader('DEXJPUS', 'fred', start = start_date)
usd_yen = usd_yen.resample('M').last()
usd_yen.index = usd_yen.index.strftime('%Y-%m')
yen_usd = (1/ usd_yen) # Conversion rate from YEN to USD

df_convertion_rates = pd.concat([yen_usd, eur_usd], axis = 1)

df_convertion_rates.columns = ['YEN/USD', 'EUR/USD']

return df_convertion_rates


def transform_and_validate_data(df_to_transform: pd.DataFrame, df_convertion_rates: pd.DataFrame) -> pd.DataFrame:

currency = df_to_transform['currency'][0]

if currency == 'USD':
convertion_rate = 1
df_to_transform['conversion_rate'] = convertion_rate
df_to_transform['usd_converted'] = df_to_transform['operational_revenue']
return df_to_transform


currency_pair = f'{currency}/USD'
convertion_rate_date = df_to_transform['date'].dt.strftime('%Y-%m')[0]

convertion_rate = df_convertion_rates.loc[convertion_rate_date][currency_pair]

df_to_transform['conversion_rate'] = convertion_rate
df_to_transform['usd_converted'] = df_to_transform['operational_revenue'] * convertion_rate

return df_to_transform
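
`transform_and_validate_data` reads the (single) currency and month from the frame, looks up the matching `{currency}/USD` rate, and appends `conversion_rate` and `usd_converted` columns; USD frames pass through with a rate of 1. A minimal sketch with a hand-built rates table, so it runs without the pandas_datareader/FRED call (the rate values are illustrative, not real):

```python
# Minimal sketch with a hand-built rates table (illustrative numbers only),
# avoiding the FRED request made by get_currencies_rates().
import pandas as pd
from src.transform_utils import transform_and_validate_data

rates = pd.DataFrame(
    {"YEN/USD": [0.0075], "EUR/USD": [1.08]},  # made-up rates
    index=["2023-01"],
)

df = pd.DataFrame({
    "company": ["ABC Inc."],
    "currency": ["EUR"],
    "operational_revenue": [1000.0],
    "date": pd.to_datetime(["2023-01"]),  # the .dt accessor needs datetimes
})

out = transform_and_validate_data(df_to_transform=df, df_convertion_rates=rates)
print(out[["operational_revenue", "conversion_rate", "usd_converted"]])
# -> 1000.0, 1.08, 1080.0
```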
74 changes: 58 additions & 16 deletions tests/test_schema.py
@@ -9,62 +9,104 @@
from src.schema import CompanyRevenue

def test_valid_schema():
df = pd.DataFrame({
'company': ['ABC Inc', 'company', 'Company 02'],
'currency': ['EUR', 'EUR', 'EUR'],
'operational_revenue': [1000, 2000, 3],
'date': ['March 2022', '2022-03', '2022/03']
})

CompanyRevenue.validate(df)

def test_aditional_column():
df = pd.DataFrame({
'company': ['ABC Inc', 'company'],
'currency': ['EUR', 'USD'],
'currency': ['EUR', 'EUR'],
'operational_revenue': [1000, 2000],
'date': ['2020-01', '2020-02']
'date': ['2020-02', '2020-02'],
'aditional': [0,0]
})

CompanyRevenue.validate(df)

with pytest.raises(pa.errors.SchemaErrors):
CompanyRevenue.validate(df, lazy = True)

def test_missing_value():

df = pd.DataFrame({
'company': ['ABC Inc', np.nan],
'currency': ['EUR', 'USD'],
'currency': ['EUR', 'EUR'],
'operational_revenue': [1000, 2000],
'date': ['2020-01', '2020-02']
'date': ['2020-02', '2020-02']
})

with pytest.raises(pa.errors.SchemaErrors):
CompanyRevenue.validate(df, lazy = True)

def test_parse_to_datetime():
df = pd.DataFrame({
'company': ['ABC Inc', 'company'],
'currency': ['USD', 'USD'],
'operational_revenue': [1000, 2000],
'date': ['2020-02', 'date']
})

with pytest.raises(pa.errors.SchemaErrors):
CompanyRevenue.validate(df, lazy= True)

def test_wrong_date_format():

def test_date_with_day():
df = pd.DataFrame({
'company': ['ABC Inc', 'company'],
'currency': ['EUR', 'USD'],
'currency': ['USD', 'USD'],
'operational_revenue': [1000, 2000],
'date': ['2020-01', 'date']
'date': ['2020-02', 'March 2022 02']
})

with pytest.raises(pa.errors.SchemaErrors):
CompanyRevenue.validate(df, lazy= True)

def test_dates_differents():
df = pd.DataFrame({
'company': ['ABC Inc', 'company'],
'currency': ['USD', 'USD'],
'operational_revenue': [1000, 2000],
'date': ['2022-02', 'April 2022']
})

def test_negative_revenue():
with pytest.raises(pa.errors.SchemaErrors):
CompanyRevenue.validate(df, lazy= True)


def test_negative_revenue():
df = pd.DataFrame({
'company': ['ABC Inc', 'company'],
'currency': ['EUR', 'USD'],
'currency': ['USD', 'USD'],
'operational_revenue': [-1000, 2000],
'date': ['2020-01', '2020-02']
'date': ['2020-02', '2020-02']
})

with pytest.raises(pa.errors.SchemaErrors):
CompanyRevenue.validate(df, lazy= True)


def test_currency_not_allowed():
df = pd.DataFrame({
'company': ['ABC Inc', 'company'],
'currency': ['BRL', 'BRL'],
'operational_revenue': [1000, 2000],
'date': ['2020-02', '2020-02']
})

with pytest.raises(pa.errors.SchemaErrors):
CompanyRevenue.validate(df, lazy= True)


def test_more_than_one_currency():
df = pd.DataFrame({
'company': ['ABC Inc', 'company'],
'currency': ['BRL', 'USD'],
'currency': ['EUR', 'USD'],
'operational_revenue': [1000, 2000],
'date': ['2020-01', '2020-02']
'date': ['2020-02', '2020-02']
})

with pytest.raises(pa.errors.SchemaErrors):
CompanyRevenue.validate(df, lazy= True)
CompanyRevenue.validate(df, lazy= True)