Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Feat/one hot encoding" #37

Merged
merged 1 commit into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 14 additions & 204 deletions nmrcraft/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,10 @@

import itertools

import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (
LabelBinarizer,
LabelEncoder,
OneHotEncoder,
StandardScaler,
)
from sklearn.preprocessing import LabelEncoder, StandardScaler

from nmrcraft.utils.set_seed import set_seed

Expand All @@ -25,13 +19,6 @@ def __init__(self, t):
super().__init__(f"Invalid target '{t}'")


class InvalidTargetTypeError(ValueError):
    """Exception raised when the specified target type is not valid.

    The parameter has a default because call sites raise it without an
    argument (e.g. ``raise InvalidTargetTypeError()`` in ``load_data``),
    which would otherwise fail with a ``TypeError`` instead of this error.
    """

    def __init__(self, t="unknown"):
        # t: the offending target-type string; interpolated into the message.
        super().__init__(f"Invalid target Type '{t}'")


def filename_to_ligands(dataset: pd.DataFrame):
"""
Extract ligands from the filename and add as columns to the dataset.
Expand Down Expand Up @@ -98,58 +85,13 @@ def get_target_columns(target_columns: str):
return targets_transformed


def get_structural_feature_columns(target_columns: list):
    """Return the structural columns that are *not* prediction targets.

    Given the list of target column names, every known structural column
    (metal plus the six ligand positions) not named as a target is treated
    as a feature column.
    """
    candidate_columns = (
        "metal",
        "X1_ligand",
        "X2_ligand",
        "X3_ligand",
        "X4_ligand",
        "L_ligand",
        "E_ligand",
    )
    # Keep the canonical ordering; drop whatever the caller targets.
    return [col for col in candidate_columns if col not in target_columns]


def get_target_labels(target_columns: list, dataset: pd.DataFrame):
    """Return the unique labels found in each target column.

    Args:
        target_columns: list of column names to inspect (the original
            annotation said ``str``, but the function iterates it as a
            list of column names).
        dataset: the dataframe holding the target columns.

    Returns:
        A list of lists — one list of unique values per target column,
        in first-occurrence order. ``dict.fromkeys`` is used instead of
        ``set`` so the order is deterministic across runs (``set`` order
        depends on string hash randomization); downstream encoders such
        as sklearn's LabelEncoder sort their classes anyway.
    """
    return [list(dict.fromkeys(dataset[col])) for col in target_columns]


def target_label_readabilitizer(readable_labels):
    """Turn the classes from the binarizer into human-usable labels.

    Flattens the per-target class lists into one list, then collapses an
    adjacent "Mo"/"W" pair (in either order) — produced by binarizing the
    metal target — into the single label "Mo W".

    The original implementation mutated the list while iterating it with
    ``enumerate`` and indexed ``[i + 1]`` unconditionally, which raised
    ``IndexError`` whenever the last element was "Mo" or "W". This version
    walks the flat list with an explicit index and a bounds check.
    """
    flat = list(itertools.chain(*readable_labels))
    merged = []
    i = 0
    while i < len(flat):
        # Collapse an adjacent Mo/W pair (either order) into one label.
        if i + 1 < len(flat) and {flat[i], flat[i + 1]} == {"Mo", "W"}:
            merged.append("Mo W")
            i += 2
        else:
            merged.append(flat[i])
            i += 1
    return merged


def target_label_readabilitizer_categorical(target_labels):
    """Convert each per-target label array into a plain Python list."""
    return [list(label_values) for label_values in target_labels]


# def get_target_labels(target_columns: str, dataset: pd.DataFrame):
# # Get unique values for each column
# unique_values = [set(dataset[col]) for col in target_columns]
Expand All @@ -165,7 +107,6 @@ def __init__(
data_files="all_no_nan.csv",
feature_columns=None,
target_columns="metal",
target_type="one-hot", # can be "categorical" or "one-hot"
test_size=0.3,
random_state=42,
dataset_size=0.01,
Expand All @@ -175,22 +116,14 @@ def __init__(
self.test_size = test_size
self.random_state = random_state
self.dataset_size = dataset_size
self.target_type = target_type
self.dataset = load_dataset_from_hf()

def load_data(self):
self.dataset = filename_to_ligands(
self.dataset
) # Assuming filename_to_ligands is defined elsewhere
self.dataset = self.dataset.sample(frac=self.dataset_size)
if self.target_type == "categorical":
return self.split_and_preprocess_categorical()
elif (
self.target_type == "one-hot"
): # Target is binarized and Features are one hot
return self.split_and_preprocess_one_hot()
else:
raise InvalidTargetTypeError()
return self.split_and_preprocess()

def preprocess_features(self, X):
"""
Expand All @@ -200,165 +133,42 @@ def preprocess_features(self, X):
X_scaled = scaler.fit_transform(X)
return X_scaled, scaler

def split_and_preprocess_categorical(self):
def split_and_preprocess(self):
"""
Split data into training and test sets, then apply normalization.
Ensures that the test data does not leak into training data preprocessing.
"""
# Get NMR and structural Features and combine
X_NMR = self.dataset[self.feature_columns].to_numpy()
X_Structural_Features_Columns = get_structural_feature_columns(
target_columns=self.target_columns
)
X_Structural_Features = self.dataset[
X_Structural_Features_Columns
].to_numpy()
X_Structural_Features = [
list(x) if i == 0 else x
for i, x in enumerate(map(list, zip(*X_Structural_Features)))
]
self.feature_unique_labels = get_target_labels(
dataset=self.dataset, target_columns=X_Structural_Features_Columns
)
xs = []
for i in range(len(self.feature_unique_labels)):
tmp_encoder = LabelEncoder()
tmp_encoder.fit(self.feature_unique_labels[i])
xs.append(tmp_encoder.transform(X_Structural_Features[i]))
X_Structural_Features = list(zip(*xs))

# Get the targets, rotate, apply encoding, rotate back
X = self.dataset[self.feature_columns].to_numpy()
target_unique_labels = get_target_labels(
target_columns=self.target_columns, dataset=self.dataset
)

# Get the targets, rotate, apply encoding, rotate back
y_labels_rotated = self.dataset[self.target_columns].to_numpy()
y_labels = [
list(x) if i == 0 else x
for i, x in enumerate(map(list, zip(*y_labels_rotated)))
]
self.target_unique_labels = target_unique_labels
ys = []
readable_labels = []
for i in range(len(target_unique_labels)):
tmp_encoder = LabelEncoder()
tmp_encoder.fit(target_unique_labels[i])
ys.append(tmp_encoder.transform(y_labels[i]))
readable_labels.append(tmp_encoder.classes_)
y = list(zip(*ys))
(
X_NMR_train,
X_NMR_test,
X_train_structural,
X_test_structural,
y_train,
y_test,
) = train_test_split(
X_NMR,
X_Structural_Features,
y,
test_size=self.test_size,
random_state=self.random_state,
)
# Make targets 1D if only one is targeted
if len(y[0]) == 1:
y_train = list(itertools.chain(*y_train))
y_test = list(itertools.chain(*y_test))

# Normalize features with no leakage from test set
X_train_NMR_scaled, scaler = self.preprocess_features(X_NMR_train)
X_test_NMR_scaled = scaler.transform(
X_NMR_test
) # Apply the same transformation to test set
X_train_scaled = np.concatenate(
[X_train_NMR_scaled, X_train_structural], axis=1
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=self.test_size, random_state=self.random_state
)
X_test_scaled = np.concatenate(
[X_test_NMR_scaled, X_test_structural], axis=1
)

# Get the target labels going
y_label = target_label_readabilitizer_categorical(readable_labels)
return X_train_scaled, X_test_scaled, y_train, y_test, y_label

def split_and_preprocess_one_hot(self):
"""
Split data into training and test sets, then apply normalization.
Ensures that the test data does not leak into training data preprocessing.
"""
target_unique_labels = get_target_labels(
target_columns=self.target_columns, dataset=self.dataset
)

# Get the Targets, rotate, apply binarization, funze into a single array
y_labels_rotated = self.dataset[self.target_columns].to_numpy()
y_labels = [
list(x) if i == 0 else x
for i, x in enumerate(map(list, zip(*y_labels_rotated)))
]
self.target_unique_labels = target_unique_labels
ys = []
readable_labels = []
for i in range(len(target_unique_labels)):
LBiner = LabelBinarizer()
ys.append(LBiner.fit_transform(y_labels[i]))
readable_labels.append(LBiner.classes_)
y = np.concatenate(list(ys), axis=1)

# Get NMR and structural Features, one-hot-encode and combine
X_NMR = self.dataset[self.feature_columns].to_numpy()
X_Structural_Features_Columns = get_structural_feature_columns(
self.target_columns
)
X_Structural_Features = self.dataset[
X_Structural_Features_Columns
].to_numpy()
one_hot = OneHotEncoder().fit(X_Structural_Features)
X_Structural_Features_enc = one_hot.transform(
X_Structural_Features
).toarray()
# X = [X_NMR, X_Structural_Features_enc]
# print(X)

# Split the datasets
(
X_train_NMR,
X_test_NMR,
X_train_structural,
X_test_structural,
y_train,
y_test,
) = train_test_split(
X_NMR,
X_Structural_Features_enc,
y,
test_size=self.test_size,
random_state=self.random_state,
)
# Make targets 1D if only one is targeted
if len(y[0]) == 1:
if len(y_train[0]) == 1:
# Make targets 1D if only one is targeted
y_train = list(itertools.chain(*y_train))
y_test = list(itertools.chain(*y_test))

# Normalize features with no leakage from test set
X_train_NMR_scaled, scaler = self.preprocess_features(X_train_NMR)
X_test_NMR_scaled = scaler.transform(
X_test_NMR
X_train_scaled, scaler = self.preprocess_features(X_train)
X_test_scaled = scaler.transform(
X_test
) # Apply the same transformation to test set
# Combine scaled NMR features with structural features
X_train_scaled = np.concatenate(
[X_train_NMR_scaled, X_train_structural], axis=1
)
X_test_scaled = np.concatenate(
[X_test_NMR_scaled, X_test_structural], axis=1
)

# Creates the labels that can be used to identify the targets in the binaized y-array
good_target_labels = target_label_readabilitizer(readable_labels)

return (
X_train_scaled,
X_test_scaled,
y_train,
y_test,
good_target_labels,
)
return X_train_scaled, X_test_scaled, y_train, y_test
2 changes: 1 addition & 1 deletion scripts/training/train_metal.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def main(dataset_size, target, model_name):
)

# Load and preprocess data
X_train, X_test, y_train, y_test, y_labels = data_loader.load_data()
X_train, X_test, y_train, y_test = data_loader.load_data()

tuner = HyperparameterTuner(model_name, config)
best_params, _ = tuner.tune(X_train, y_train, X_test, y_test)
Expand Down
Loading