96 model development transformation scripts #176

Merged
@@ -0,0 +1,98 @@
import pytest
import pandas as pd
import os
import tempfile
import shutil
import unittest
from meal_identification.transformations.pipeline_generator import PipelineGenerator
from meal_identification.transformations.pydantic_test_models import NumericColumns, CategoricalColumns, CosineTransformed
from meal_identification.datasets.pydantic_test_models import DataFrameValidator
from sktime.transformations.series.cos import CosineTransformer
from sktime.transformations.series.impute import Imputer

class TestTransformation(unittest.TestCase):
def setUp(self):
"""Set up test environment with temporary directory and sample CSV files."""
self.project_root = os.path.realpath(tempfile.mkdtemp())
os.makedirs(os.path.join(self.project_root, '.github'))

        # Create the interim and processed data directories
self.interim_data_dir = "0_meal_identification/meal_identification/data/interim"
self.full_interim_path = os.path.join(self.project_root, self.interim_data_dir)
os.makedirs(self.full_interim_path)

self.processed_data_dir = "0_meal_identification/meal_identification/data/processed"
self.full_processed_path = os.path.join(self.project_root, self.processed_data_dir)
os.makedirs(self.full_processed_path)

        # Build the sample as a pandas DataFrame, then write it to CSV for readability
self.sample_interim_data = pd.DataFrame({
'date': [
'2024-07-01 00:00:00-04:00',
'2024-07-01 00:05:00-04:00'
],
'bgl': [115.0, 112.0],
'msg_type': ['ANNOUCE_MEAL', 'DOSE_INSULIN'],
'affects_fob': [False, None],
'affects_iob': [None, True],
'dose_units': [None, None],
'food_g': [None, None],
'food_glycemic_index': [None, None],
'food_g_keep': [None, None],
            'day_start_shift': ['2024-06-30', '2024-06-30']
})

# Convert date strings to datetime objects before saving
self.sample_interim_data['date'] = pd.to_datetime(self.sample_interim_data['date'])

# Save sample CSV files
self.filename = 'interim_data.csv'
self.file_path = os.path.join(self.full_interim_path, self.filename)
self.sample_interim_data.to_csv(self.file_path, index=False)
self.original_dir = os.getcwd()
os.chdir(self.project_root)


def tearDown(self):
"""Clean up temporary files and directories."""
os.chdir(self.original_dir)
shutil.rmtree(self.project_root)

def test_load_numerical_data(self):
"""
Tests that the numerical columns of loaded data maintains valid data types using pydantic
"""
interim_file = self.filename
gen = PipelineGenerator()
gen.load_data([interim_file])
result_df = gen.data_num[interim_file]
assert DataFrameValidator(NumericColumns, index_field='bgl').validate_df(result_df)

def test_load_categorical_data(self):
"""
Tests that the categorical columns of loaded data maintains valid data types using pydantic
"""
interim_file = self.filename
gen = PipelineGenerator()
gen.load_data([interim_file])
result_df = gen.data_cat[interim_file]
        # the DataFrames are not indexed by the date column
assert DataFrameValidator(CategoricalColumns, index_field='date').validate_df(result_df, True)


def test_transformed(self):
"""
Tests if transformed data has numerical values between -1 and 1
"""
interim_file = self.filename
gen = PipelineGenerator()
gen.load_data([interim_file])
        gen.generate_pipeline([
            CosineTransformer(),
            Imputer(method="constant", value=0)
        ])
gen.fit_transform()
result_df = gen.save_output()[interim_file]
assert DataFrameValidator(CosineTransformed, index_field='date').validate_df(result_df, True)
@@ -0,0 +1,241 @@
from sktime.transformations.compose import TransformerPipeline
from sktime.exceptions import NotFittedError

from loguru import logger

import random
import re
import os
import json
import pandas as pd

class PipelineGenerator:

    """
    Class to generate an sktime transformer pipeline.

    Data is saved in 0_meal_identification/meal_identification/data/processed in the format:
        run_#           - number of the run
        run_#/data      - the output data from that run
        run_#/pipelines - pipelines saved from the run

    To do:
        - load data from a DataFrame (in addition to files)
        - load transformers from a JSON config file rather than a .zip pipeline
        - add getters/setters and other helper methods
        - add functionality to delete previous runs
        - clean up logic and follow good coding practices
        - implement proper exception handling
        - test with various transformers
        - write documentation on how to use
    """

    def __init__(self,
                 output_dir="0_meal_identification/meal_identification/data/processed",
                 input_dir="0_meal_identification/meal_identification/data/interim"):

        self.data_cat = {}
        self.data_num = {}
        self.pipe = {}
        self.column_order = None  # order of columns, kept for output consistency

        # set up paths for the data directories
        self.processed_dir_path = os.path.join(self.__get_root_dir(), output_dir)
        self.interim_dir_path = os.path.join(self.__get_root_dir(), input_dir)

def load_data(self, raw_files):
        '''
        Loads data from files in the input_dir directory and splits each dataset into numeric and categorical columns

        Parameters
        ----------
        raw_files : list of str
            List of datasets in input_dir to be used with the pipeline. Format: "filename.csv"

        Returns
        -------
        '''
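        # Example of the resulting split (illustrative, based on the interim schema used in the
        # unit tests): numeric columns such as 'bgl', 'dose_units', 'food_g', 'food_glycemic_index'
        # and 'food_g_keep' end up in self.data_num, while 'date', 'msg_type', 'day_start_shift'
        # and the affects_fob/affects_iob flags end up in self.data_cat.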

data = {}

        # read the data from the interim directory
for file in raw_files:
file_path = os.path.join(self.interim_dir_path, file)
data[file] = pd.read_csv(file_path, parse_dates=['date'])

        # separate data into numerical/categorical for sktime transformers
for key in data:
self.column_order = list(data[key].columns)
self.data_num[key] = data[key]._get_numeric_data()
self.data_cat[key] = data[key][list(set(data[key].columns) - set(self.data_num[key].columns))]

    def generate_pipeline(self, transformers=None, run=None):
        '''
        Creates a pipeline either from a provided list of transformers or by copying one from a previous run

        Parameters
        ----------
        transformers : optional, list of sktime transformers
            Transformers to be added to the pipeline, in the provided order
        run : optional, int
            Number of the run from which the transformer parameters should be taken

        Returns
        -------
        '''

        if not transformers and not run:
            raise TypeError("Either a list of transformers or a run number must be provided")

        # load pipeline from a past run
        if run:
            pipeline_path = os.path.join(self.processed_dir_path, f"run_{run}", "pipelines")
            # TODO: this is janky, refactor; also verify that load_from_path works as expected
            for file in os.scandir(pipeline_path):
                if file.name.endswith(".json"):
                    continue  # skip the saved Pipeline_config.json
                pipe = TransformerPipeline.load_from_path(file.path)
                break

        # build the pipeline from the provided transformers
        else:
            pipe = TransformerPipeline(steps=transformers)

# clone pipeline to fit to different datasets
for key in self.data_num:
self.pipe[key] = pipe.clone()

def fit(self):
'''
Fits the pipeline to the loaded data

Parameters
----------

Returns
-------
'''
        for key in self.pipe:

            # fit only if the pipeline was not fitted before
            try:
                self.pipe[key].check_is_fitted()
            except NotFittedError:
                self.pipe[key].fit(self.data_num[key])

def transform(self):
'''
Transforms the data from the pipelines

Parameters
----------

Returns
-------
'''

for key in self.pipe:
self.data_num[key] = self.pipe[key].transform(self.data_num[key])

def fit_transform(self):
'''
Applies fit and transform in sequence

Parameters
----------

Returns
-------
'''

self.fit()
self.transform()

def save_output(
self,
output_dir=None
):
        '''
        Saves the transformed data and the fitted pipelines into a new run_# directory

        Parameters
        ----------
        output_dir : str, optional
            The directory in which pipelines and transformed data should be stored.
            Defaults to the processed data directory.

        Returns
        -------
        processed_data : dictionary of pandas DataFrames
            Data supplied to the PipelineGenerator after transformations
        '''
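        # Resulting layout for each run (illustrative sketch; the exact pipeline file name and
        # extension depend on how sktime's save() writes it):
        #   <output_dir>/run_<N>/data/<dataset>.csv
        #   <output_dir>/run_<N>/pipelines/Pipeline_<dataset without extension>
        #   <output_dir>/run_<N>/pipelines/Pipeline_config.json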
        if output_dir is None:
            output_dir = self.processed_dir_path

        # figure out which run number is the latest one
        runs = []
        for i in os.listdir(output_dir):
            run_num = re.search("[0-9]+", i)
            if run_num:
                runs.append(int(run_num.group()))

        if not runs:
            runs = [0]
        new_run = max(runs) + 1

# make the run directory
output_dir = os.path.join(output_dir, f"run_{new_run}")
os.mkdir(output_dir)

# make the data directory
dir_path_data = os.path.join(output_dir, "data")
os.mkdir(dir_path_data)

# make the pipeline directory
dir_path_pipelines = os.path.join(output_dir, "pipelines")
os.mkdir(dir_path_pipelines)


processed_data = {}
# save processed datasets into the data directory and pipelines into pipeline directory
for key in self.data_num:
whole_data = pd.concat([self.data_num[key], self.data_cat[key]], axis=1)
whole_data = whole_data[self.column_order]
whole_data.to_csv(os.path.join(dir_path_data, key), index=True)

processed_data[key] = whole_data

            # strip the dataset's file extension before saving the fitted pipeline
            pipeline_path = os.path.join(dir_path_pipelines, "Pipeline_" + key)
            pipeline_path = pipeline_path.rpartition('.')[0]
            self.pipe[key].save(path=pipeline_path)

logger.info(self.pipe[key].get_params())

        # save the pipeline configuration into a JSON file
        # (all pipelines are clones of the same blueprint, so any one of them will do)
        with open(os.path.join(dir_path_pipelines, "Pipeline_config.json"), "w") as json_file:
            transformer_config = random.choice(list(self.pipe.values())).get_params()
            json.dump({key: str(value) for key, value in transformer_config.items()}, json_file, indent=4)

return processed_data


def __get_root_dir(self, current_dir=None):
"""
Get the root directory of the project by looking for a specific directory
(e.g., '.github') that indicates the project root.

Parameters
----------
current_dir : str, optional
The starting directory to search from. If None, uses the current working directory.

Returns
-------
str
The root directory of the project.
"""
if current_dir is None:
current_dir = os.getcwd()

unique_dir = '.github' # Directory that uniquely identifies the root

while current_dir != os.path.dirname(current_dir):
if os.path.isdir(os.path.join(current_dir, unique_dir)):
return current_dir
current_dir = os.path.dirname(current_dir)

raise FileNotFoundError(f"Project root directory not found. '{unique_dir}' directory missing in path.")
@@ -0,0 +1,55 @@
from pydantic import BaseModel, field_validator
from datetime import datetime

class NumericColumns(BaseModel):
"""
Pydantic model for all numeric columns in the interim data
"""
bgl: float
dose_units: float
food_g: float
food_glycemic_index: float
food_g_keep: float

@field_validator('food_g')
def validate_food_g(cls, v):
if v < 0:
raise ValueError('food_g must be non-negative')
return v


class CategoricalColumns(BaseModel):
"""
Pydantic model for all categorical columns in the interim data
"""
date: datetime
affects_fob: object
day_start_shift: object
msg_type: object
affects_iob: object


class CosineTransformed(BaseModel):
"""
Pydantic model for numerical data transformed by CosineTransformer
"""
bgl: float
dose_units: float
food_g: float
food_glycemic_index: float
food_g_keep: float
date: object
affects_fob: object
day_start_shift: object
msg_type: object
affects_iob: object


    @field_validator('bgl', 'dose_units', 'food_g', 'food_glycemic_index', 'food_g_keep')
    def validate_cosine_range(cls, v):
        if v < -1 or v > 1:
            raise ValueError('Column value should be between -1 and 1')
        return v
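

# Illustrative usage sketch (mirrors the unit tests): these models are consumed through
# DataFrameValidator from meal_identification.datasets.pydantic_test_models, e.g.
#
#     DataFrameValidator(NumericColumns, index_field='bgl').validate_df(interim_df)
#     DataFrameValidator(CosineTransformed, index_field='date').validate_df(transformed_df, True)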
