
Commit

Playing with the backfilling script until the github action works: Trial #5
kobinabrandon committed Aug 4, 2024
1 parent 47858d7 commit 627d5d1
Showing 6 changed files with 16 additions and 33 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -9,4 +9,5 @@ keyfile
 /.idea
 /data
 /models
-**/__pycache__
+/.cometml-runs
+**/__pycache__
9 changes: 1 addition & 8 deletions src/feature_pipeline/data_extraction.py
@@ -13,21 +13,14 @@
 2021 would perform, so I've included code that would allow for that data to be
 downloaded
 """
-# Utilities
 import os
 import requests
-from zipfile import ZipFile
 
-# Data Manipulation and Access
 import pandas as pd
 
-# Logging
 from loguru import logger
 
-# Dates
+from zipfile import ZipFile
 from datetime import datetime as dt
 
-# Custom Code
 from src.setup.paths import RAW_DATA_DIR, make_fundamental_paths


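The imports kept here (requests, ZipFile, RAW_DATA_DIR) support the download flow the docstring describes. A minimal sketch of that flow, with an invented helper name and URL, since the function bodies are not shown in this hunk:

import requests
from pathlib import Path
from zipfile import ZipFile
from loguru import logger

def download_and_extract(url: str, target_dir: Path) -> None:
    """Fetch a zip archive and unpack it into target_dir (illustrative helper, not the module's API)."""
    target_dir.mkdir(parents=True, exist_ok=True)
    zip_path = target_dir / url.split("/")[-1]

    response = requests.get(url)
    if response.status_code == 200:
        # Save the archive to disk, then extract it alongside the download
        zip_path.write_bytes(response.content)
        with ZipFile(zip_path) as archive:
            archive.extractall(path=target_dir)
        logger.success(f"Downloaded and extracted {zip_path.name}")
    else:
        logger.error(f"Download failed with status code {response.status_code}")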
2 changes: 1 addition & 1 deletion src/feature_pipeline/station_indexing.py
@@ -215,7 +215,7 @@ def find_rows_with_known_ids_and_names(self, save: bool = True) -> dict[str, tup
 
         for row in tqdm(
             iterable=range(self.data.shape[0]),
-            desc="Looking for any row that has either a missing station ID OR a missing station name."
+            desc="Looking for rows that have either a missing station ID OR a missing station name."
         ):
             latitude = self.data.iloc[row, self.latitudes_index]
             longitude = self.data.iloc[row, self.longitudes_index]
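The loop above walks every row by integer position and pulls fields out with iloc. A sketch of the missing-ID/name check that the progress-bar description refers to, assuming the ID and name columns are also located by integer index (the function and parameter names here are invented):

import pandas as pd
from tqdm import tqdm

def find_rows_with_missing_fields(data: pd.DataFrame, id_index: int, name_index: int) -> list[int]:
    """Collect the positions of rows that lack a station ID or a station name."""
    rows_with_gaps = []
    for row in tqdm(iterable=range(data.shape[0]), desc="Looking for rows with a missing station ID or name"):
        station_id = data.iloc[row, id_index]
        station_name = data.iloc[row, name_index]
        # pd.isnull catches both NaN and None entries
        if pd.isnull(station_id) or pd.isnull(station_name):
            rows_with_gaps.append(row)
    return rows_with_gaps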
11 changes: 4 additions & 7 deletions src/inference_pipeline/backfill_feature_store.py
@@ -1,3 +1,4 @@
+import json
 import pandas as pd
 from pathlib import Path
 from datetime import datetime
@@ -110,15 +111,11 @@ def backfill_predictions(self, target_date: datetime, model_name: str = "lightgb
             geocode=False
         )
 
-        print(engineered_features.shape)
-        breakpoint()
-
         try:
-            engineered_features = engineered_features.drop("trips_next_hour", axis=1)
-            logger.success("Dropped target column")
+            engineered_features = engineered_features.drop(["trips_next_hour", f"{scenario}_hour"], axis=1)
         except Exception as error:
-            logger.error(error)
+            logger.error(error)
 
         predictions_df: pd.DataFrame = inferrer.get_model_predictions(model=model, features=engineered_features)
 
         predictions_feature_group: FeatureGroup = self.api.get_or_create_feature_group(
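The try/except is needed because drop raises a KeyError when a listed column is absent. pandas can express the same intent without the guard via errors="ignore"; a sketch, assuming only that engineered_features is a DataFrame and scenario a string as in the hunk above:

import pandas as pd

def drop_leakage_columns(engineered_features: pd.DataFrame, scenario: str) -> pd.DataFrame:
    """Remove the target and hour columns before inference; columns that are already absent are skipped silently."""
    return engineered_features.drop(columns=["trips_next_hour", f"{scenario}_hour"], errors="ignore")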
5 changes: 0 additions & 5 deletions src/inference_pipeline/inference.py
@@ -176,11 +176,6 @@ def load_predictions_from_store(
             end_time=to_hour + timedelta(days=1)
         )
 
-        print(
-            f" There are {len(predictions_df[f"{self.scenario}_station_id"])} station ids from the feature view"
-        )
-        breakpoint()
-
         predictions_df[f"{self.scenario}_hour"] = pd.to_datetime(predictions_df[f"{self.scenario}_hour"], utc=True)
 
         return predictions_df.sort_values(
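The surviving line parses the hour column into timezone-aware UTC timestamps, which avoids mixing naive and aware datetimes in the later comparisons and sorting. A small, self-contained illustration with made-up values:

import pandas as pd

hours = pd.Series(["2024-08-04 10:00:00", "2024-08-04 11:00:00"])
aware_hours = pd.to_datetime(hours, utc=True)
print(aware_hours.dtype)  # datetime64[ns, UTC]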
19 changes: 8 additions & 11 deletions src/training_pipeline/training.py
@@ -92,10 +92,9 @@ def train(self, model_name: str) -> float:
             project_name=config.comet_project_name
         )
 
-        experiment.add_tags(tags=[model_name, self.scenario])
-
         if not self.tune_hyperparameters:
-            experiment.set_name(name=f"{model_name.title()}(not tuned) model for the {self.scenario}s of trips")
+            experiment.set_name(name=f"{model_name.title()}(Untuned) model for the {self.scenario}s of trips")
+
            logger.info("Using the default hyperparameters")
            if model_name == "base":
                pipeline = make_pipeline(model_fn(scenario=self.scenario))
@@ -104,7 +103,6 @@ def train(self, model_name: str) -> float:
                 pipeline = make_pipeline(model_fn)
             else:
                 pipeline = make_pipeline(model_fn())
-
         else:
             experiment.set_name(name=f"{model_name.title()}(Tuned) model for the {self.scenario}s of trips")
             logger.info(f"Tuning hyperparameters of the {model_name} model. Have a snack...")
@@ -118,6 +116,7 @@ def train(self, model_name: str) -> float:
             )
 
             logger.success(f"Best model hyperparameters {best_model_hyperparameters}")
+
             pipeline = make_pipeline(
                 model_fn(**best_model_hyperparameters)
             )
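make_pipeline appears to be scikit-learn's helper, which wraps the freshly configured estimator in a Pipeline so the tuned and untuned branches hand back the same kind of object. A minimal sketch with a stand-in estimator and invented hyperparameters:

from sklearn.linear_model import Lasso
from sklearn.pipeline import make_pipeline

# Stand-in values; the real hyperparameters come from the tuning step above
best_model_hyperparameters = {"alpha": 0.1}
pipeline = make_pipeline(Lasso(**best_model_hyperparameters))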
@@ -141,16 +140,13 @@ def save_model_locally(self, model_fn: Pipeline, model_name: str):
             model_name (str): the name of the model to be saved
         """
         model_file_name = f"{model_name.title()} ({self.tuned_or_not} for {self.scenario}s).pkl"
+
         with open(LOCAL_SAVE_DIR/model_file_name, mode="wb") as file:
             pickle.dump(obj=model_fn, file=file)
+
         logger.success("Saved model to disk")
 
-    def train_and_register_models(
-        self,
-        model_names: list[str],
-        version: str,
-        status: str
-    ) -> None:
+    def train_and_register_models(self, model_names: list[str], version: str, status: str) -> None:
         """
         Train the named models, identify the best performer (on the test data) and
         register it to the CometML model registry.
@@ -160,16 +156,17 @@ def train_and_register_models(
             version: the version of the registered model on CometML.
             status: the registered status of the model on CometML.
         """
-        models_and_errors = {}
         assert status.lower() in ["staging", "production"], 'The status must be either "staging" or "production"'
 
+        models_and_errors = {}
         for model_name in model_names:
             test_error = self.train(model_name=model_name)
             models_and_errors[model_name] = test_error
 
         test_errors = models_and_errors.values()
         for model_name in model_names:
             if models_and_errors[model_name] == min(test_errors):
+
                 logger.info(f"The best performing model is {model_name} -> Pushing it to the CometML model registry")
 
                 model = load_local_model(
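The final loop scans for the model whose test error equals the minimum; the same selection can be written in one step by minimizing over the dictionary. A sketch with invented model names and error values:

models_and_errors = {"base": 4.2, "model_a": 3.1, "model_b": 3.4}

# min over a dict iterates its keys; the key function ranks them by their recorded error
best_model_name = min(models_and_errors, key=models_and_errors.get)
print(best_model_name)  # model_a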
