Skip to content

Commit

Permalink
Revert "Revert "Feat/one hot encoding" (#37)"
Browse files Browse the repository at this point in the history
This reverts commit ed26a60.
  • Loading branch information
mlederbauer authored May 7, 2024
1 parent eed1a8a commit 0a005d9
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 15 deletions.
218 changes: 204 additions & 14 deletions nmrcraft/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@

import itertools

import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.preprocessing import (
LabelBinarizer,
LabelEncoder,
OneHotEncoder,
StandardScaler,
)

from nmrcraft.utils.set_seed import set_seed

Expand All @@ -19,6 +25,13 @@ def __init__(self, t):
super().__init__(f"Invalid target '{t}'")


class InvalidTargetTypeError(ValueError):
    """Raised when an unsupported target type is requested."""

    def __init__(self, t):
        # Keep the message format identical to the sibling InvalidTargetError.
        message = f"Invalid target Type '{t}'"
        super().__init__(message)


def filename_to_ligands(dataset: pd.DataFrame):
"""
Extract ligands from the filename and add as columns to the dataset.
Expand Down Expand Up @@ -85,13 +98,58 @@ def get_target_columns(target_columns: str):
return targets_transformed


def get_structural_feature_columns(target_columns: list):
    """Return the structural columns that are not selected as targets.

    Args:
        target_columns: column names currently used as prediction targets.

    Returns:
        The known metal/ligand columns, in their canonical order, minus
        any that appear in ``target_columns``.
    """
    candidate_columns = (
        "metal",
        "X1_ligand",
        "X2_ligand",
        "X3_ligand",
        "X4_ligand",
        "L_ligand",
        "E_ligand",
    )
    # Everything that is not a target serves as a structural feature.
    return [col for col in candidate_columns if col not in target_columns]


def get_target_labels(target_columns: str, dataset: pd.DataFrame):
    """Collect the unique values of each target column.

    Args:
        target_columns: iterable of column names to inspect.
        dataset: dataframe holding the target columns.

    Returns:
        A list with one list of unique values per column.
    """
    # Bug fix: list(set(...)) yields a nondeterministic order between runs.
    # dict.fromkeys dedups while preserving first-occurrence order, so the
    # returned label lists are reproducible.
    return [list(dict.fromkeys(dataset[col])) for col in target_columns]


def target_label_readabilitizer(readable_labels):
    """Turn the classes from the binarizer into human-usable labels.

    Flattens the per-target class arrays and collapses an adjacent
    "Mo"/"W" pair (the two binarized metal columns) into the single
    label "Mo W".

    Args:
        readable_labels: list of per-target class arrays (e.g. the
            ``classes_`` of each LabelBinarizer).

    Returns:
        A flat list of human-readable label names.
    """
    flat_labels = list(itertools.chain(*readable_labels))
    # Bug fix: the original mutated the list while iterating over it and
    # indexed one past the end, raising IndexError when the final label
    # was "Mo" or "W". Build a new list with an explicit index instead.
    merged = []
    i = 0
    while i < len(flat_labels):
        if i + 1 < len(flat_labels) and {flat_labels[i], flat_labels[i + 1]} == {
            "Mo",
            "W",
        }:
            # The two binarized metal columns become a single label.
            merged.append("Mo W")
            i += 2
        else:
            merged.append(flat_labels[i])
            i += 1
    return merged


def target_label_readabilitizer_categorical(target_labels):
    """Convert each array of encoder classes into a plain Python list."""
    return [list(label_array) for label_array in target_labels]


# def get_target_labels(target_columns: str, dataset: pd.DataFrame):
# # Get unique values for each column
# unique_values = [set(dataset[col]) for col in target_columns]
Expand All @@ -107,6 +165,7 @@ def __init__(
data_files="all_no_nan.csv",
feature_columns=None,
target_columns="metal",
target_type="one-hot", # can be "categorical" or "one-hot"
test_size=0.3,
random_state=42,
dataset_size=0.01,
Expand All @@ -116,14 +175,22 @@ def __init__(
self.test_size = test_size
self.random_state = random_state
self.dataset_size = dataset_size
self.target_type = target_type
self.dataset = load_dataset_from_hf()

def load_data(self):
    """Load, subsample and preprocess the dataset.

    Extracts ligand columns from the filenames, subsamples the dataset to
    ``self.dataset_size`` and dispatches to the preprocessing routine
    selected by ``self.target_type``.

    Returns:
        The train/test split produced by the selected preprocessing method.

    Raises:
        InvalidTargetTypeError: if ``self.target_type`` is neither
            "categorical" nor "one-hot".
    """
    self.dataset = filename_to_ligands(
        self.dataset
    )  # Assuming filename_to_ligands is defined elsewhere
    self.dataset = self.dataset.sample(frac=self.dataset_size)
    if self.target_type == "categorical":
        return self.split_and_preprocess_categorical()
    elif (
        self.target_type == "one-hot"
    ):  # Target is binarized and Features are one hot
        return self.split_and_preprocess_one_hot()
    else:
        # Bug fix: InvalidTargetTypeError requires the offending value;
        # calling it with no argument raised TypeError instead of the
        # intended, catchable exception.
        raise InvalidTargetTypeError(self.target_type)

def preprocess_features(self, X):
"""
Expand All @@ -133,42 +200,165 @@ def preprocess_features(self, X):
X_scaled = scaler.fit_transform(X)
return X_scaled, scaler

def split_and_preprocess(self):
def split_and_preprocess_categorical(self):
"""
Split data into training and test sets, then apply normalization.
Ensures that the test data does not leak into training data preprocessing.
"""
X = self.dataset[self.feature_columns].to_numpy()
target_unique_labels = get_target_labels(
target_columns=self.target_columns, dataset=self.dataset
# Get NMR and structural Features and combine
X_NMR = self.dataset[self.feature_columns].to_numpy()
X_Structural_Features_Columns = get_structural_feature_columns(
target_columns=self.target_columns
)
X_Structural_Features = self.dataset[
X_Structural_Features_Columns
].to_numpy()
X_Structural_Features = [
list(x) if i == 0 else x
for i, x in enumerate(map(list, zip(*X_Structural_Features)))
]
self.feature_unique_labels = get_target_labels(
dataset=self.dataset, target_columns=X_Structural_Features_Columns
)
xs = []
for i in range(len(self.feature_unique_labels)):
tmp_encoder = LabelEncoder()
tmp_encoder.fit(self.feature_unique_labels[i])
xs.append(tmp_encoder.transform(X_Structural_Features[i]))
X_Structural_Features = list(zip(*xs))

# Get the targets, rotate, apply encoding, rotate back
target_unique_labels = get_target_labels(
target_columns=self.target_columns, dataset=self.dataset
)
y_labels_rotated = self.dataset[self.target_columns].to_numpy()
y_labels = [
list(x) if i == 0 else x
for i, x in enumerate(map(list, zip(*y_labels_rotated)))
]
self.target_unique_labels = target_unique_labels
ys = []
readable_labels = []
for i in range(len(target_unique_labels)):
tmp_encoder = LabelEncoder()
tmp_encoder.fit(target_unique_labels[i])
ys.append(tmp_encoder.transform(y_labels[i]))
readable_labels.append(tmp_encoder.classes_)
y = list(zip(*ys))
(
X_NMR_train,
X_NMR_test,
X_train_structural,
X_test_structural,
y_train,
y_test,
) = train_test_split(
X_NMR,
X_Structural_Features,
y,
test_size=self.test_size,
random_state=self.random_state,
)
# Make targets 1D if only one is targeted
if len(y[0]) == 1:
y_train = list(itertools.chain(*y_train))
y_test = list(itertools.chain(*y_test))

X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=self.test_size, random_state=self.random_state
# Normalize features with no leakage from test set
X_train_NMR_scaled, scaler = self.preprocess_features(X_NMR_train)
X_test_NMR_scaled = scaler.transform(
X_NMR_test
) # Apply the same transformation to test set
X_train_scaled = np.concatenate(
[X_train_NMR_scaled, X_train_structural], axis=1
)
if len(y_train[0]) == 1:
# Make targets 1D if only one is targeted
X_test_scaled = np.concatenate(
[X_test_NMR_scaled, X_test_structural], axis=1
)

# Get the target labels going
y_label = target_label_readabilitizer_categorical(readable_labels)
return X_train_scaled, X_test_scaled, y_train, y_test, y_label

def split_and_preprocess_one_hot(self):
"""
Split data into training and test sets, then apply normalization.
Ensures that the test data does not leak into training data preprocessing.
"""
target_unique_labels = get_target_labels(
target_columns=self.target_columns, dataset=self.dataset
)

# Get the targets, rotate, apply binarization, fuse into a single array
y_labels_rotated = self.dataset[self.target_columns].to_numpy()
y_labels = [
list(x) if i == 0 else x
for i, x in enumerate(map(list, zip(*y_labels_rotated)))
]
self.target_unique_labels = target_unique_labels
ys = []
readable_labels = []
for i in range(len(target_unique_labels)):
LBiner = LabelBinarizer()
ys.append(LBiner.fit_transform(y_labels[i]))
readable_labels.append(LBiner.classes_)
y = np.concatenate(list(ys), axis=1)

# Get NMR and structural Features, one-hot-encode and combine
X_NMR = self.dataset[self.feature_columns].to_numpy()
X_Structural_Features_Columns = get_structural_feature_columns(
self.target_columns
)
X_Structural_Features = self.dataset[
X_Structural_Features_Columns
].to_numpy()
one_hot = OneHotEncoder().fit(X_Structural_Features)
X_Structural_Features_enc = one_hot.transform(
X_Structural_Features
).toarray()
# X = [X_NMR, X_Structural_Features_enc]
# print(X)

# Split the datasets
(
X_train_NMR,
X_test_NMR,
X_train_structural,
X_test_structural,
y_train,
y_test,
) = train_test_split(
X_NMR,
X_Structural_Features_enc,
y,
test_size=self.test_size,
random_state=self.random_state,
)
# Make targets 1D if only one is targeted
if len(y[0]) == 1:
y_train = list(itertools.chain(*y_train))
y_test = list(itertools.chain(*y_test))

# Normalize features with no leakage from test set
X_train_scaled, scaler = self.preprocess_features(X_train)
X_test_scaled = scaler.transform(
X_test
X_train_NMR_scaled, scaler = self.preprocess_features(X_train_NMR)
X_test_NMR_scaled = scaler.transform(
X_test_NMR
) # Apply the same transformation to test set
# Combine scaled NMR features with structural features
X_train_scaled = np.concatenate(
[X_train_NMR_scaled, X_train_structural], axis=1
)
X_test_scaled = np.concatenate(
[X_test_NMR_scaled, X_test_structural], axis=1
)

# Creates the labels that can be used to identify the targets in the binarized y-array
good_target_labels = target_label_readabilitizer(readable_labels)

return X_train_scaled, X_test_scaled, y_train, y_test
return (
X_train_scaled,
X_test_scaled,
y_train,
y_test,
good_target_labels,
)
2 changes: 1 addition & 1 deletion scripts/training/train_metal.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def main(dataset_size, target, model_name):
)

# Load and preprocess data
X_train, X_test, y_train, y_test = data_loader.load_data()
X_train, X_test, y_train, y_test, y_labels = data_loader.load_data()

tuner = HyperparameterTuner(model_name, config, max_evals=1)
best_params, _ = tuner.tune(X_train, y_train, X_test, y_test)
Expand Down

0 comments on commit 0a005d9

Please sign in to comment.