From af5152fe511716b4754500f5907b851ec234db06 Mon Sep 17 00:00:00 2001
From: Yimeng Xie
Date: Fri, 15 Nov 2024 16:14:03 -0500
Subject: [PATCH 1/3] Added transformer file with some methods

---
 .../transformations/__init__.py              |  0
 .../transformations/data_transformations.py  | 31 +++++++++++++++++++
 2 files changed, 31 insertions(+)
 create mode 100644 0_meal_identification/meal_identification/meal_identification/transformations/__init__.py
 create mode 100644 0_meal_identification/meal_identification/meal_identification/transformations/data_transformations.py

diff --git a/0_meal_identification/meal_identification/meal_identification/transformations/__init__.py b/0_meal_identification/meal_identification/meal_identification/transformations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/0_meal_identification/meal_identification/meal_identification/transformations/data_transformations.py b/0_meal_identification/meal_identification/meal_identification/transformations/data_transformations.py
new file mode 100644
index 0000000..bcc11ec
--- /dev/null
+++ b/0_meal_identification/meal_identification/meal_identification/transformations/data_transformations.py
@@ -0,0 +1,31 @@
+from sktime.transformations.series.exponent import ExponentTransformer
+from sktime.transformations.compose import Id
+from sktime.transformations.compose import TransformerPipeline
+from sklearn.preprocessing import StandardScaler
+import pandas as pd
+
+def run_pipeline(pipeline, data):
+    '''
+    Run a transformer pipeline on the given data.
+    Questions: does the training script provide the pipeline, or should we build it here in transformations?
+               do we want to work with only Series->Series, or also Panel->Panel?
+    To do: add a flag to cache transformed data
+           add a flag to run as a list of Series or as a Panel
+           log the transformations applied
+           testing
+    '''
+    transformed_data = []
+    for df in data:
+        transformed = pipeline.fit_transform(df)
+        transformed_data.append(transformed)
+    return transformed_data
+
+def create_pipeline(transformers):
+    '''
+    Creates a pipeline from a list of transformers (applied in series).
+    Question: how should FeatureUnions be handled?
+    '''
+    pipeline = TransformerPipeline(steps=transformers)
+    return pipeline
+
+
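For context on the helpers in the patch above: create_pipeline composes sktime transformers in series, and run_pipeline applies the result to each DataFrame. A minimal sketch of that composition on a toy series (the transformers and values here are illustrative, not taken from the patch):

    import pandas as pd
    from sktime.transformations.compose import TransformerPipeline
    from sktime.transformations.series.exponent import ExponentTransformer

    # Two transformers applied in series: square each value, then take the square root back.
    pipeline = TransformerPipeline(steps=[
        ExponentTransformer(power=2),
        ExponentTransformer(power=0.5),
    ])

    series = pd.Series([1.0, 4.0, 9.0])
    print(pipeline.fit_transform(series))  # round-trips to the original values
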
From e45f9854b7888e90d117f8697497cf4bfd338f49 Mon Sep 17 00:00:00 2001
From: Anton Ryavkin
Date: Sun, 17 Nov 2024 12:25:26 -0500
Subject: [PATCH 2/3] Created first rough draft of the transformations script.
 Contains a class for PipelineGenerator and a demonstration of use.

---
 .../transformations/data_transformations.py  |  31 ---
 .../transformations/pipeline_generator.py    | 241 ++++++++++++++++++
 .../transformations/sample_of_use.py         |  24 ++
 3 files changed, 265 insertions(+), 31 deletions(-)
 delete mode 100644 0_meal_identification/meal_identification/meal_identification/transformations/data_transformations.py
 create mode 100644 0_meal_identification/meal_identification/meal_identification/transformations/pipeline_generator.py
 create mode 100644 0_meal_identification/meal_identification/meal_identification/transformations/sample_of_use.py

diff --git a/0_meal_identification/meal_identification/meal_identification/transformations/data_transformations.py b/0_meal_identification/meal_identification/meal_identification/transformations/data_transformations.py
deleted file mode 100644
index bcc11ec..0000000
--- a/0_meal_identification/meal_identification/meal_identification/transformations/data_transformations.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from sktime.transformations.series.exponent import ExponentTransformer
-from sktime.transformations.compose import Id
-from sktime.transformations.compose import TransformerPipeline
-from sklearn.preprocessing import StandardScaler
-import pandas as pd
-
-def run_pipeline(pipeline, data):
-    '''
-    Run a transformer pipeline on the given data.
-    Questions: does the training script provide the pipeline, or should we build it here in transformations?
-               do we want to work with only Series->Series, or also Panel->Panel?
-    To do: add a flag to cache transformed data
-           add a flag to run as a list of Series or as a Panel
-           log the transformations applied
-           testing
-    '''
-    transformed_data = []
-    for df in data:
-        transformed = pipeline.fit_transform(df)
-        transformed_data.append(transformed)
-    return transformed_data
-
-def create_pipeline(transformers):
-    '''
-    Creates a pipeline from a list of transformers (applied in series).
-    Question: how should FeatureUnions be handled?
-    '''
-    pipeline = TransformerPipeline(steps=transformers)
-    return pipeline
-
-
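On the FeatureUnion question left open in the docstring just deleted above: sktime's FeatureUnion applies transformers in parallel and concatenates their output columns, so a union could itself be passed as a single step of a TransformerPipeline. A sketch under that assumption (names and data are illustrative):

    import pandas as pd
    from sktime.transformations.compose import FeatureUnion
    from sktime.transformations.series.exponent import ExponentTransformer

    # Both transformers see the same input; outputs are concatenated column-wise.
    union = FeatureUnion([
        ("squared", ExponentTransformer(power=2)),
        ("cubed", ExponentTransformer(power=3)),
    ])

    df = pd.DataFrame({"bgl": [110.0, 120.0, 95.0]})
    print(union.fit_transform(df))  # one output column per transformer
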
diff --git a/0_meal_identification/meal_identification/meal_identification/transformations/pipeline_generator.py b/0_meal_identification/meal_identification/meal_identification/transformations/pipeline_generator.py
new file mode 100644
index 0000000..20590e1
--- /dev/null
+++ b/0_meal_identification/meal_identification/meal_identification/transformations/pipeline_generator.py
@@ -0,0 +1,241 @@
+from sktime.transformations.compose import TransformerPipeline
+from sktime.exceptions import NotFittedError
+
+from loguru import logger
+
+import random
+import re
+import os
+import json
+import pandas as pd
+
+class PipelineGenerator:
+
+    """
+    Class to generate an sktime transformer pipeline.
+    Data is saved in 0_meal_identification/meal_identification/data/processed in the format:
+        run_#           - the run number
+        run_#/data      - the output data from that run
+        run_#/pipelines - the pipelines saved from that run
+
+    To do:
+        load data from a DataFrame (in addition to files)
+        load transformers from a JSON config file rather than a .zip pipeline
+        add getters/setters and other helper methods
+        add functionality to delete previous runs
+        clean up logic and follow good coding practices
+        implement proper exception handling
+        test with various transformers
+        write documentation on how to use
+    """
+
+    def __init__(self,
+                 output_dir="0_meal_identification/meal_identification/data/processed",
+                 input_dir="0_meal_identification/meal_identification/data/interim"):
+
+        self.data_cat = {}
+        self.data_num = {}
+        self.pipe = {}
+        self.column_order = None  # order of data columns, kept for consistency
+
+        # set up paths for data directories
+        self.processed_dir_path = os.path.join(self.__get_root_dir(), output_dir)
+        self.interim_dir_path = os.path.join(self.__get_root_dir(), input_dir)
+
+    def load_data(self, raw_files):
+        '''
+        Loads data from files in the input_dir directory and splits it into numeric/categorical
+
+        Parameters
+        ----------
+        raw_files : list of str
+            List of datasets in input_dir to be used with the pipeline.
+            Format: "filename.csv"
+
+        Returns
+        -------
+        '''
+
+        data = {}
+
+        # read the data from the interim directory
+        for file in raw_files:
+            file_path = os.path.join(self.interim_dir_path, file)
+            data[file] = pd.read_csv(file_path, parse_dates=['date'])
+
+        # separate data into numerical/categorical for sktime transformers
+        for key in data:
+            self.column_order = list(data[key].columns)
+            self.data_num[key] = data[key].select_dtypes(include='number')
+            self.data_cat[key] = data[key][[col for col in data[key].columns if col not in self.data_num[key].columns]]
+
+    def generate_pipeline(self, transformers=None, run=None):
+        '''
+        Creates a pipeline either from a provided list of transformers or by copying one from a previous run
+
+        Parameters
+        ----------
+        transformers : optional, list of sktime transformers
+            Transformers to be added into the pipeline, in the provided order
+        run : optional, int
+            Number of the run from which transformer params should be taken
+
+        Returns
+        -------
+        '''
+
+        if not transformers and not run:
+            raise TypeError("List of transformers or run number was not provided")
+
+        # load pipeline from past runs
+        if run:
+            pipeline_path = os.path.join(self.processed_dir_path, f"run_{run}", "pipelines")
+
+            for file in os.scandir(pipeline_path):  # janky: loads the first pipeline found; refactor
+                pipe = TransformerPipeline.load_from_path(file.path)
+                break
+
+        # load pipeline from parameters
+        else:
+            pipe = TransformerPipeline(steps=transformers)
+
+        # clone pipeline to fit to different datasets
+        for key in self.data_num:
+            self.pipe[key] = pipe.clone()
+
+    def fit(self):
+        '''
+        Fits the pipeline to the loaded data
+
+        Parameters
+        ----------
+
+        Returns
+        -------
+        '''
+        for key in self.pipe:
+
+            # fit only if the pipeline was not fitted before
+            try:
+                self.pipe[key].check_is_fitted()
+            except NotFittedError:
+                self.pipe[key].fit(self.data_num[key])
+
+    def transform(self):
+        '''
+        Transforms the loaded data with the fitted pipelines
+
+        Parameters
+        ----------
+
+        Returns
+        -------
+        '''
+
+        for key in self.pipe:
+            self.data_num[key] = self.pipe[key].transform(self.data_num[key])
+
+    def fit_transform(self):
+        '''
+        Applies fit and transform in sequence
+
+        Parameters
+        ----------
+
+        Returns
+        -------
+        '''
+
+        self.fit()
+        self.transform()
+
+    def save_output(
+            self,
+            output_dir="0_meal_identification/meal_identification/data/processed"
+    ):
+        '''
+        Saves the transformed data and the fitted pipelines into a new run directory
+
+        Parameters
+        ----------
+        output_dir: str
+            The directory in which pipelines and transformed data should be stored
+
+        Returns
+        -------
+        processed_data: dictionary of pandas DataFrames
+            Data supplied to the PipelineGenerator after transformations
+        '''
+
+        output_dir = self.processed_dir_path
+
+        # figure out which run is the last one
+        runs = []
+        for i in os.listdir(output_dir):
+            run_num = re.search("[0-9]+", i)
+            if run_num:
+                runs.append(int(run_num.group()))
+
+        if not runs:
+            runs = [0]
+        new_run = max(runs) + 1
+
+        # make the run directory
+        output_dir = os.path.join(output_dir, f"run_{new_run}")
+        os.mkdir(output_dir)
+
+        # make the data directory
+        dir_path_data = os.path.join(output_dir, "data")
+        os.mkdir(dir_path_data)
+
+        # make the pipeline directory
+        dir_path_pipelines = os.path.join(output_dir, "pipelines")
+        os.mkdir(dir_path_pipelines)
+
+        processed_data = {}
+        # save processed datasets into the data directory and pipelines into the pipeline directory
+        for key in self.data_num:
+            whole_data = pd.concat([self.data_num[key], self.data_cat[key]], axis=1)
+            whole_data = whole_data[self.column_order]
+
+            whole_data.to_csv(os.path.join(dir_path_data, key), index=True)
+
+            processed_data[key] = whole_data
+
+            pipeline_path = os.path.join(dir_path_pipelines, "Pipeline_" + key)
+            pipeline_path = pipeline_path.rpartition('.')[0]  # drop the ".csv" extension
+            self.pipe[key].save(path=pipeline_path)
+
+            logger.info(self.pipe[key].get_params())
+
+        # save pipeline configuration into json
+        with open(os.path.join(dir_path_pipelines, "Pipeline_config.json"), "w") as json_file:
+            transformer_config = random.choice(list(self.pipe.values())).get_params()  # all pipes are clones, so any one describes the run
+            json.dump({key: str(value) for key, value in transformer_config.items()}, json_file, indent=4)
+
+        return processed_data
+
+    def __get_root_dir(self, current_dir=None):
+        """
+        Get the root directory of the project by looking for a specific directory
+        (e.g., '.github') that indicates the project root.
+
+        Parameters
+        ----------
+        current_dir : str, optional
+            The starting directory to search from. If None, uses the current working directory.
+
+        Returns
+        -------
+        str
+            The root directory of the project.
+        """
+        if current_dir is None:
+            current_dir = os.getcwd()
+
+        unique_dir = '.github'  # directory that uniquely identifies the root
+
+        while current_dir != os.path.dirname(current_dir):
+            if os.path.isdir(os.path.join(current_dir, unique_dir)):
+                return current_dir
+            current_dir = os.path.dirname(current_dir)
+
+        raise FileNotFoundError(f"Project root directory not found. '{unique_dir}' directory missing in path.")
\ No newline at end of file
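On the load-a-previous-run branch of generate_pipeline above (and the "does load_from_path actually work?" note added in the next patch): sktime estimators persist via .save(path), which writes path.zip, and can be read back with sktime.base.load. A round-trip sketch under those assumptions, using a writable /tmp path:

    import pandas as pd
    from sktime.base import load
    from sktime.transformations.series.exponent import ExponentTransformer

    transformer = ExponentTransformer(power=2)
    transformer.fit(pd.Series([1.0, 2.0, 3.0]))

    # save() writes "/tmp/example_transformer.zip"; load() takes the path without ".zip".
    transformer.save("/tmp/example_transformer")
    restored = load("/tmp/example_transformer")
    print(restored.get_params())
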
diff --git a/0_meal_identification/meal_identification/meal_identification/transformations/sample_of_use.py b/0_meal_identification/meal_identification/meal_identification/transformations/sample_of_use.py
new file mode 100644
index 0000000..2897b36
--- /dev/null
+++ b/0_meal_identification/meal_identification/meal_identification/transformations/sample_of_use.py
@@ -0,0 +1,24 @@
+from pipeline_generator import PipelineGenerator
+from sktime.transformations.series.exponent import ExponentTransformer
+from sktime.transformations.series.cos import CosineTransformer
+
+from sktime.transformations.series.impute import Imputer
+
+from loguru import logger
+
+gen = PipelineGenerator()
+
+gen.load_data(["2024-11-14_500030__timeInter5mins_dayStart4hrs_minCarb5g_3hrMealW.csv",
+               "2024-11-14_500030__timeInter5mins_dayStart4hrs_minCarb10g_3hrMealW.csv"])
+
+gen.generate_pipeline([
+    CosineTransformer(),
+    Imputer(method="constant", value=0)
+])
+
+# gen.generate_pipeline(run=1)  # alternative method: get the pipeline from a previous run
+
+gen.fit_transform()
+
+processed_data = gen.save_output()
+print(processed_data)
\ No newline at end of file
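For reference, a successful run of the sample above should leave a layout like the following under data/processed (run numbers and file names depend on the inputs and on earlier runs):

    data/processed/
        run_1/
            data/                      # transformed copy of each input CSV
            pipelines/                 # one fitted pipeline .zip per dataset
                Pipeline_config.json   # stringified get_params() of the run's pipeline
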
From 5a27632ea24572c40ce5410c266aedcad8bc1800 Mon Sep 17 00:00:00 2001
From: Yimeng Xie
Date: Wed, 20 Nov 2024 19:43:41 -0500
Subject: [PATCH 3/3] Add unit tests

---
 .../tests/data_transform/test_transform.py   | 98 +++++++++++++++++++
 .../transformations/pipeline_generator.py    |  8 +-
 .../transformations/pydantic_test_models.py  | 55 +++++++++++
 .../transformations/sample_of_use.py         |  6 +-
 4 files changed, 160 insertions(+), 7 deletions(-)
 create mode 100644 0_meal_identification/meal_identification/meal_identification/tests/data_transform/test_transform.py
 create mode 100644 0_meal_identification/meal_identification/meal_identification/transformations/pydantic_test_models.py
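The setUp below creates a temporary directory containing .github because PipelineGenerator locates the project root by walking up from the current working directory until it finds that marker. Condensed, the walk looks like this:

    import os

    def find_root(start_dir, marker=".github"):
        # Walk upward until a directory containing the marker is found,
        # stopping at the filesystem root (where dirname(d) == d).
        current = start_dir
        while current != os.path.dirname(current):
            if os.path.isdir(os.path.join(current, marker)):
                return current
            current = os.path.dirname(current)
        raise FileNotFoundError(f"no '{marker}' directory above {start_dir}")
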
diff --git a/0_meal_identification/meal_identification/meal_identification/tests/data_transform/test_transform.py b/0_meal_identification/meal_identification/meal_identification/tests/data_transform/test_transform.py
new file mode 100644
index 0000000..88900e5
--- /dev/null
+++ b/0_meal_identification/meal_identification/meal_identification/tests/data_transform/test_transform.py
@@ -0,0 +1,98 @@
+import pytest
+import pandas as pd
+import os
+import tempfile
+import shutil
+import unittest
+from meal_identification.transformations.pipeline_generator import PipelineGenerator
+from meal_identification.transformations.pydantic_test_models import NumericColumns, CategoricalColumns, CosineTransformed
+from meal_identification.datasets.pydantic_test_models import DataFrameValidator
+from sktime.transformations.series.cos import CosineTransformer
+
+from sktime.transformations.series.impute import Imputer
+
+class TestTransformation(unittest.TestCase):
+    def setUp(self):
+        """Set up the test environment with a temporary directory and a sample CSV file."""
+        self.project_root = os.path.realpath(tempfile.mkdtemp())
+        os.makedirs(os.path.join(self.project_root, '.github'))
+
+        # create the interim and processed data directories
+        self.interim_data_dir = "0_meal_identification/meal_identification/data/interim"
+        self.full_interim_path = os.path.join(self.project_root, self.interim_data_dir)
+        os.makedirs(self.full_interim_path)
+
+        self.processed_data_dir = "0_meal_identification/meal_identification/data/processed"
+        self.full_processed_path = os.path.join(self.project_root, self.processed_data_dir)
+        os.makedirs(self.full_processed_path)
+
+        # build the sample as a DataFrame, then convert it to a CSV for readability
+        self.sample_interim_data = pd.DataFrame({
+            'date': [
+                '2024-07-01 00:00:00-04:00',
+                '2024-07-01 00:05:00-04:00'
+            ],
+            'bgl': [115.0, 112.0],
+            'msg_type': ['ANNOUCE_MEAL', 'DOSE_INSULIN'],
+            'affects_fob': [False, None],
+            'affects_iob': [None, True],
+            'dose_units': [None, None],
+            'food_g': [None, None],
+            'food_glycemic_index': [None, None],
+            'food_g_keep': [None, None],
+            'day_start_shift': ['2024-06-30', '2024-06-30']
+        })
+
+        # convert date strings to datetime objects before saving
+        self.sample_interim_data['date'] = pd.to_datetime(self.sample_interim_data['date'])
+
+        # save the sample CSV file
+        self.filename = 'interim_data.csv'
+        self.file_path = os.path.join(self.full_interim_path, self.filename)
+        self.sample_interim_data.to_csv(self.file_path, index=False)
+        self.original_dir = os.getcwd()
+        os.chdir(self.project_root)
+
+    def tearDown(self):
+        """Clean up temporary files and directories."""
+        os.chdir(self.original_dir)
+        shutil.rmtree(self.project_root)
+
+    def test_load_numerical_data(self):
+        """
+        Tests that the numerical columns of the loaded data maintain valid data types, using pydantic
+        """
+        interim_file = self.filename
+        gen = PipelineGenerator()
+        gen.load_data([interim_file])
+        result_df = gen.data_num[interim_file]
+        print(result_df.columns)
+        assert DataFrameValidator(NumericColumns, index_field='bgl').validate_df(result_df)
+
+    def test_load_categorical_data(self):
+        """
+        Tests that the categorical columns of the loaded data maintain valid data types, using pydantic
+        """
+        interim_file = self.filename
+        gen = PipelineGenerator()
+        gen.load_data([interim_file])
+        result_df = gen.data_cat[interim_file]
+        # dfs are not indexed with the date col
+        assert DataFrameValidator(CategoricalColumns, index_field='date').validate_df(result_df, True)
+
+    def test_transformed(self):
+        """
+        Tests that transformed data has numerical values between -1 and 1
+        """
+        interim_file = self.filename
+        gen = PipelineGenerator()
+        gen.load_data([interim_file])
+        gen.generate_pipeline([
+            CosineTransformer(),
+            Imputer(method="constant", value=0)
+        ])
+        gen.fit_transform()
+        result_df = gen.save_output()[interim_file]
+        assert DataFrameValidator(CosineTransformed, index_field='date').validate_df(result_df, True)
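DataFrameValidator itself comes from the datasets package and is not part of this patch; the tests above only rely on it checking each row of a frame against a pydantic model. Roughly that idea, as a sketch (the real helper's signature and index handling may differ):

    import pandas as pd
    from pydantic import BaseModel

    def validate_rows(model, df: pd.DataFrame) -> bool:
        # Validate every row against the model; pydantic raises
        # ValidationError on the first row that does not conform.
        for row in df.to_dict(orient="records"):
            model(**row)
        return True
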
diff --git a/0_meal_identification/meal_identification/meal_identification/transformations/pipeline_generator.py b/0_meal_identification/meal_identification/meal_identification/transformations/pipeline_generator.py
index 20590e1..55e7094 100644
--- a/0_meal_identification/meal_identification/meal_identification/transformations/pipeline_generator.py
+++ b/0_meal_identification/meal_identification/meal_identification/transformations/pipeline_generator.py
@@ -88,7 +88,7 @@ def generate_pipeline(self, transformers=None, run=None):
         # load pipeline from past runs
         if run:
             pipeline_path = os.path.join(self.processed_dir_path, f"run_{run}", "pipelines")
-
+            # does load_from_path actually work?
             for file in os.scandir(pipeline_path):  # janky: loads the first pipeline found; refactor
                 pipe = TransformerPipeline.load_from_path(file.path)
                 break
@@ -149,7 +149,7 @@ def fit_transform(self):
 
     def save_output(
             self,
-            output_dir="0_meal_identification/meal_identification/data/processed"
+            output_dir=None
     ):
         '''
         Saves the transformed data and the fitted pipelines into a new run directory
@@ -163,8 +163,8 @@
         processed_data: dictionary of pandas DataFrames
             Data supplied to the PipelineGenerator after transformations
         '''
-
-        output_dir = self.processed_dir_path
+        if output_dir is None:
+            output_dir = self.processed_dir_path
 
         # figure out which run is the last one
         runs = []
diff --git a/0_meal_identification/meal_identification/meal_identification/transformations/pydantic_test_models.py b/0_meal_identification/meal_identification/meal_identification/transformations/pydantic_test_models.py
new file mode 100644
index 0000000..667bc07
--- /dev/null
+++ b/0_meal_identification/meal_identification/meal_identification/transformations/pydantic_test_models.py
@@ -0,0 +1,55 @@
+from pydantic import BaseModel, Field, validator, confloat, field_validator
+from typing import List, Optional
+from datetime import datetime
+import pandas as pd
+
+class NumericColumns(BaseModel):
+    """
+    Pydantic model for all numeric columns in the interim data
+    """
+    bgl: float
+    dose_units: float
+    food_g: float
+    food_glycemic_index: float
+    food_g_keep: float
+
+    @field_validator('food_g')
+    def validate_food_g(cls, v):
+        if v < 0:
+            raise ValueError('food_g must be non-negative')
+        return v
+
+
+class CategoricalColumns(BaseModel):
+    """
+    Pydantic model for all categorical columns in the interim data
+    """
+    date: datetime
+    affects_fob: object
+    day_start_shift: object
+    msg_type: object
+    affects_iob: object
+
+
+class CosineTransformed(BaseModel):
+    """
+    Pydantic model for data transformed by CosineTransformer
+    """
+    bgl: float
+    dose_units: float
+    food_g: float
+    food_glycemic_index: float
+    food_g_keep: float
+    date: object
+    affects_fob: object
+    day_start_shift: object
+    msg_type: object
+    affects_iob: object
+
+    @field_validator('bgl', 'dose_units', 'food_g', 'food_glycemic_index', 'food_g_keep')
+    def validate_in_range(cls, v):
+        if v < -1 or v > 1:
+            raise ValueError('Column value should be between -1 and 1')
+        return v
+
\ No newline at end of file
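As a quick sanity check of the CosineTransformed bounds above: cosine maps any real input into [-1, 1], so a numeric cell outside that interval should be rejected. For example:

    from pydantic import ValidationError
    from meal_identification.transformations.pydantic_test_models import CosineTransformed

    row = dict(bgl=0.5, dose_units=0.0, food_g=1.5, food_glycemic_index=0.0,
               food_g_keep=0.0, date=None, affects_fob=None,
               day_start_shift=None, msg_type=None, affects_iob=None)
    try:
        CosineTransformed(**row)   # food_g=1.5 falls outside [-1, 1]
    except ValidationError as err:
        print(err)
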
diff --git a/0_meal_identification/meal_identification/meal_identification/transformations/sample_of_use.py b/0_meal_identification/meal_identification/meal_identification/transformations/sample_of_use.py
index 2897b36..3c9c2dc 100644
--- a/0_meal_identification/meal_identification/meal_identification/transformations/sample_of_use.py
+++ b/0_meal_identification/meal_identification/meal_identification/transformations/sample_of_use.py
@@ -8,8 +8,8 @@
 
 gen = PipelineGenerator()
 
-gen.load_data(["2024-11-14_500030__timeInter5mins_dayStart4hrs_minCarb5g_3hrMealW.csv",
-               "2024-11-14_500030__timeInter5mins_dayStart4hrs_minCarb10g_3hrMealW.csv"])
+gen.load_data(["2024-11-15_679372__i5mins_d4hrs_c10g_l5hrs_n4.csv",
+               "2024-11-15_679372__i5mins_d4hrs_c10g_l5hrs_n3.csv"])
 
 gen.generate_pipeline([
     CosineTransformer(),
@@ -21,4 +21,4 @@
 gen.fit_transform()
 
 processed_data = gen.save_output()
-print(processed_data)
\ No newline at end of file
+print(processed_data["2024-11-15_679372__i5mins_d4hrs_c10g_l5hrs_n4.csv"].head())
\ No newline at end of file
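Assuming the meal_identification package is importable in the active environment (the tests import it absolutely), the new suite can be run from the repository root with, e.g., pytest 0_meal_identification/meal_identification/meal_identification/tests/data_transform/test_transform.py.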