From 627d5d1d398a279c1e04579b696adda79893bf03 Mon Sep 17 00:00:00 2001
From: kobina
Date: Sun, 4 Aug 2024 23:27:20 +0000
Subject: [PATCH] Playing with the backfilling script until the GitHub Action works: Trial #5

---
 .gitignore                                |  3 ++-
 src/feature_pipeline/data_extraction.py   |  9 +--------
 src/feature_pipeline/station_indexing.py  |  2 +-
 .../backfill_feature_store.py             | 11 ++++-------
 src/inference_pipeline/inference.py       |  5 -----
 src/training_pipeline/training.py         | 19 ++++++++-----------
 6 files changed, 16 insertions(+), 33 deletions(-)

diff --git a/.gitignore b/.gitignore
index bdb6ccc..fb2d709 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,4 +9,5 @@ keyfile
 /.idea
 /data
 /models
-**/__pycache__
+/.cometml-runs
+**/__pycache__
\ No newline at end of file
diff --git a/src/feature_pipeline/data_extraction.py b/src/feature_pipeline/data_extraction.py
index e38a61c..09bce32 100755
--- a/src/feature_pipeline/data_extraction.py
+++ b/src/feature_pipeline/data_extraction.py
@@ -13,21 +13,14 @@
 2021 would perform, so I've included code that would allow for that data to be downloaded
 """
 
-# Utilities
 import os
 import requests
-from zipfile import ZipFile
-
-# Data Manipulation and Access
 import pandas as pd
 
-# Logging
 from loguru import logger
-
-# Dates
+from zipfile import ZipFile
 from datetime import datetime as dt
 
-# Custom Code
 from src.setup.paths import RAW_DATA_DIR, make_fundamental_paths
 
 
diff --git a/src/feature_pipeline/station_indexing.py b/src/feature_pipeline/station_indexing.py
index facaa2b..8db27d3 100755
--- a/src/feature_pipeline/station_indexing.py
+++ b/src/feature_pipeline/station_indexing.py
@@ -215,7 +215,7 @@ def find_rows_with_known_ids_and_names(self, save: bool = True) -> dict[str, tup
 
         for row in tqdm(
             iterable=range(self.data.shape[0]),
-            desc="Looking for any row that has either a missing station ID OR a missing station name."
+            desc="Looking for rows that have either a missing station ID OR a missing station name."
         ):
             latitude = self.data.iloc[row, self.latitudes_index]
             longitude = self.data.iloc[row, self.longitudes_index]
diff --git a/src/inference_pipeline/backfill_feature_store.py b/src/inference_pipeline/backfill_feature_store.py
index 3742419..45e196c 100755
--- a/src/inference_pipeline/backfill_feature_store.py
+++ b/src/inference_pipeline/backfill_feature_store.py
@@ -1,3 +1,4 @@
+import json
 import pandas as pd
 from pathlib import Path
 from datetime import datetime
@@ -110,15 +111,11 @@ def backfill_predictions(self, target_date: datetime, model_name: str = "lightgb
             geocode=False
         )
 
-        print(engineered_features.shape)
-        breakpoint()
-
         try:
-            engineered_features = engineered_features.drop("trips_next_hour", axis=1)
-            logger.success("Dropped target column")
+            engineered_features = engineered_features.drop(["trips_next_hour", f"{scenario}_hour"], axis=1)
        except Exception as error:
-            logger.error(error)
-
+            logger.error(error)
+
         predictions_df: pd.DataFrame = inferrer.get_model_predictions(model=model, features=engineered_features)
 
         predictions_feature_group: FeatureGroup = self.api.get_or_create_feature_group(
diff --git a/src/inference_pipeline/inference.py b/src/inference_pipeline/inference.py
index 37f6c2f..20d9019 100755
--- a/src/inference_pipeline/inference.py
+++ b/src/inference_pipeline/inference.py
@@ -176,11 +176,6 @@ def load_predictions_from_store(
             end_time=to_hour + timedelta(days=1)
         )
 
-        print(
-            f" There are {len(predictions_df[f"{self.scenario}_station_id"])} station ids from the feature view"
-        )
-        breakpoint()
-
         predictions_df[f"{self.scenario}_hour"] = pd.to_datetime(predictions_df[f"{self.scenario}_hour"], utc=True)
 
         return predictions_df.sort_values(
diff --git a/src/training_pipeline/training.py b/src/training_pipeline/training.py
index 74d487d..ee9ff54 100755
--- a/src/training_pipeline/training.py
+++ b/src/training_pipeline/training.py
@@ -92,10 +92,9 @@ def train(self, model_name: str) -> float:
             project_name=config.comet_project_name
         )
 
-        experiment.add_tags(tags=[model_name, self.scenario])
-
         if not self.tune_hyperparameters:
-            experiment.set_name(name=f"{model_name.title()}(not tuned) model for the {self.scenario}s of trips")
+            experiment.set_name(name=f"{model_name.title()}(Untuned) model for the {self.scenario}s of trips")
+            logger.info("Using the default hyperparameters")
 
             if model_name == "base":
                 pipeline = make_pipeline(model_fn(scenario=self.scenario))
@@ -104,7 +103,6 @@ def train(self, model_name: str) -> float:
                 pipeline = make_pipeline(model_fn)
             else:
                 pipeline = make_pipeline(model_fn())
-
         else:
             experiment.set_name(name=f"{model_name.title()}(Tuned) model for the {self.scenario}s of trips")
             logger.info(f"Tuning hyperparameters of the {model_name} model. Have a snack...")
@@ -118,6 +116,7 @@ def train(self, model_name: str) -> float:
             )
 
             logger.success(f"Best model hyperparameters {best_model_hyperparameters}")
+
             pipeline = make_pipeline(
                 model_fn(**best_model_hyperparameters)
             )
@@ -141,16 +140,13 @@ def save_model_locally(self, model_fn: Pipeline, model_name: str):
             model_name (str): the name of the model to be saved
         """
         model_file_name = f"{model_name.title()} ({self.tuned_or_not} for {self.scenario}s).pkl"
+
         with open(LOCAL_SAVE_DIR/model_file_name, mode="wb") as file:
             pickle.dump(obj=model_fn, file=file)
+
         logger.success("Saved model to disk")
 
-    def train_and_register_models(
-        self,
-        model_names: list[str],
-        version: str,
-        status: str
-    ) -> None:
+    def train_and_register_models(self, model_names: list[str], version: str, status: str) -> None:
         """
         Train the named models, identify the best performer (on the test data) and register it
         to the CometML model registry.
@@ -160,9 +156,9 @@ def train_and_register_models(
             version: the version of the registered model on CometML.
             status: the registered status of the model on CometML.
         """
+        models_and_errors = {}
         assert status.lower() in ["staging", "production"], 'The status must be either "staging" or "production"'
-        models_and_errors = {}
 
         for model_name in model_names:
             test_error = self.train(model_name=model_name)
             models_and_errors[model_name] = test_error
@@ -170,6 +166,7 @@ def train_and_register_models(
 
         test_errors = models_and_errors.values()
         for model_name in model_names:
             if models_and_errors[model_name] == min(test_errors):
+                logger.info(f"The best performing model is {model_name} -> Pushing it to the CometML model registry")
                 model = load_local_model(
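
Editor's note on the backfill_feature_store.py hunk above: the try/except around DataFrame.drop exists only to survive a possibly missing column, and pandas' drop already supports that directly through errors="ignore", which skips absent labels instead of raising. Below is a minimal, hypothetical sketch of that alternative; the column names come from the diff, while the scenario value and the toy frame are invented for illustration.

import pandas as pd

# Stand-in for engineered_features; "start" is an assumed scenario prefix,
# not taken from the repository.
scenario = "start"
engineered_features = pd.DataFrame({
    "trips_next_hour": [3, 5],
    f"{scenario}_hour": ["2024-08-04 22:00", "2024-08-04 23:00"],
    f"{scenario}_station_id": [101, 202],
})

# errors="ignore" drops the listed columns when they exist and silently
# skips any that are missing, so no try/except is needed.
features_only = engineered_features.drop(
    columns=["trips_next_hour", f"{scenario}_hour"], errors="ignore"
)

print(features_only.columns.tolist())  # ['start_station_id']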
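
Likewise, the final training.py hunk picks the best model by comparing each entry against min(test_errors) inside a loop. Since models_and_errors maps model names to scalar test errors, the same selection can be written in one pass with min and a key function; a small sketch with made-up error values, not real results from the repository:

# models_and_errors as train_and_register_models builds it: name -> test error.
# The model names and error values below are illustrative only.
models_and_errors = {"base": 4.12, "lightgbm": 2.87, "xgboost": 3.05}

# min over the dict's keys, keyed by each model's error, picks the best
# performer without re-scanning the values on every loop iteration.
best_model_name = min(models_and_errors, key=models_and_errors.get)
print(best_model_name)  # lightgbm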