# src/abm_shape_collection/calculate_shape_stats.py
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA

from abm_shape_collection.calculate_size_stats import get_ks_statistic


def calculate_shape_stats(
    pca: PCA,
    data: pd.DataFrame,
    ref_data: pd.DataFrame,
    components: int,
    include_ticks: bool = False,
    include_samples: bool = False,
    sample_reps: int = 1,
    sample_size: int = 1,
) -> pd.DataFrame:
    """
    Compare PCA-projected shape data against reference data with a two-sample KS test.

    The columns of ``ref_data`` whose names contain "shcoeffs" are projected with
    ``pca.transform``; the same columns are taken from ``data``. For each of the
    first ``components`` components a KS test is run on all of ``data`` and,
    optionally, on each "TICK" group and on repeated per-tick samples.

    Parameters
    ----------
    pca
        Fitted object providing ``transform``.
    data
        Data to evaluate. Must contain a "TICK" column when ``include_ticks``
        or ``include_samples`` is set.
    ref_data
        Reference data; also determines which columns are projected.
    components
        Number of leading components to evaluate.
    include_ticks
        If True, add one row per "TICK" group (labelled by "TICK").
    include_samples
        If True, add one row per sample repetition (labelled by "SAMPLE").
    sample_reps
        Number of sample repetitions; the repetition index is the random seed.
    sample_size
        Maximum number of rows kept per "TICK" group in each sample.

    Returns
    -------
    :
        One row per (component, subset) with "KS_STATISTIC", "KS_PVALUE", "N"
        and "FEATURE" (``PC_1``, ``PC_2``, ...) columns, plus "TICK" / "SAMPLE"
        when requested. The index is a fresh 0..n-1 range. An empty frame is
        returned when ``components`` is 0.
    """
    columns = list(ref_data.filter(like="shcoeffs").columns)
    ref_data_transform = pca.transform(ref_data[columns].values)

    # Project every subset exactly once. The projection does not depend on the
    # component being evaluated, so it must not be repeated inside that loop.
    projections: list[tuple[dict, np.ndarray]] = [({}, pca.transform(data[columns].values))]

    if include_ticks:
        projections.extend(
            ({"TICK": tick}, pca.transform(tick_data[columns].values))
            for tick, tick_data in data.groupby("TICK")
        )

    if include_samples:
        projections.extend(
            (
                {"SAMPLE": sample},
                pca.transform(_sample_per_tick(data, sample, sample_size)[columns].values),
            )
            for sample in range(sample_reps)
        )

    all_stats = []

    for component in range(components):
        ref_values = ref_data_transform[:, component]
        stats = [
            _labelled_ks_stats(projection[:, component], ref_values, labels)
            for labels, projection in projections
        ]

        stats_df = pd.DataFrame(stats)
        stats_df["FEATURE"] = f"PC_{component + 1}"
        all_stats.append(stats_df)

    if not all_stats:
        # pd.concat raises ValueError on an empty list; return an empty table instead.
        empty = pd.DataFrame(columns=["KS_STATISTIC", "KS_PVALUE", "N", "FEATURE"])
        return empty.astype({"N": int})

    # ignore_index avoids repeated 0..k labels from the per-component frames.
    return pd.concat(all_stats, ignore_index=True).astype({"N": int})


def calculate_shape_stats_for_all(
    pca: PCA, data: pd.DataFrame, ref_data: np.ndarray, component: int, columns: list[str]
) -> list[dict]:
    """
    Run the KS test for one component using every row of ``data``.

    ``ref_data`` is the already-projected reference array; ``data`` is projected
    here. Returns a single-element list with the KS result and sample count "N".
    """
    values = pca.transform(data[columns].values)[:, component]
    return [_labelled_ks_stats(values, ref_data[:, component], {})]


def calculate_shape_stats_for_ticks(
    pca: PCA, data: pd.DataFrame, ref_data: np.ndarray, component: int, columns: list[str]
) -> list[dict]:
    """
    Run the KS test for one component separately for each "TICK" group of ``data``.

    ``ref_data`` is the already-projected reference array. Returns one dict per
    group holding the KS result, the "TICK" value and the group size "N".
    """
    ref_values = ref_data[:, component]

    return [
        _labelled_ks_stats(
            pca.transform(tick_data[columns].values)[:, component], ref_values, {"TICK": tick}
        )
        for tick, tick_data in data.groupby("TICK")
    ]


def calculate_shape_stats_for_samples(
    pca: PCA,
    data: pd.DataFrame,
    ref_data: np.ndarray,
    component: int,
    columns: list[str],
    sample_reps: int,
    sample_size: int,
) -> list[dict]:
    """
    Run the KS test for one component on repeated per-tick samples of ``data``.

    Each repetition shuffles ``data`` (seeded with the repetition index) and keeps
    at most ``sample_size`` rows per "TICK" group. ``ref_data`` is the
    already-projected reference array. Returns one dict per repetition holding the
    KS result, the "SAMPLE" index and the sample count "N".
    """
    ref_values = ref_data[:, component]

    return [
        _labelled_ks_stats(
            pca.transform(_sample_per_tick(data, sample, sample_size)[columns].values)[
                :, component
            ],
            ref_values,
            {"SAMPLE": sample},
        )
        for sample in range(sample_reps)
    ]


def _sample_per_tick(data: pd.DataFrame, seed: int, sample_size: int) -> pd.DataFrame:
    """Shuffle all rows with the given seed, then keep the first ``sample_size`` per "TICK"."""
    return data.sample(frac=1, random_state=seed).groupby("TICK").head(sample_size)


def _labelled_ks_stats(values: np.ndarray, ref_values: np.ndarray, labels: dict) -> dict:
    """Return the KS result for ``values`` vs ``ref_values`` with ``labels`` and "N" added."""
    ks_stats = get_ks_statistic(values, ref_values)
    ks_stats.update({**labels, "N": len(values)})
    return ks_stats
# src/abm_shape_collection/calculate_size_stats.py
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp


def calculate_size_stats(
    data: pd.DataFrame,
    ref_data: pd.DataFrame,
    regions: list[str],
    include_ticks: bool = False,
    include_samples: bool = False,
    sample_reps: int = 1,
    sample_size: int = 1,
) -> pd.DataFrame:
    """
    Compare volume and height distributions against reference data with a KS test.

    For every region the "volume" and "height" features are evaluated; the column
    name is ``<feature>.<region>``, or just ``<feature>`` for the "DEFAULT"
    region. Each feature is tested on all of ``data`` and, optionally, on each
    "TICK" group and on repeated per-tick samples.

    Parameters
    ----------
    data
        Data to evaluate. Must contain a "TICK" column when ``include_ticks``
        or ``include_samples`` is set.
    ref_data
        Reference data containing the same feature columns.
    regions
        Region names to evaluate.
    include_ticks
        If True, add one row per "TICK" group (labelled by "TICK").
    include_samples
        If True, add one row per sample repetition (labelled by "SAMPLE").
    sample_reps
        Number of sample repetitions; the repetition index is the random seed.
    sample_size
        Maximum number of rows kept per "TICK" group in each sample.

    Returns
    -------
    :
        One row per (feature, subset) with "KS_STATISTIC", "KS_PVALUE", "N" and
        "FEATURE" columns, plus "TICK" / "SAMPLE" when requested. The index is a
        fresh 0..n-1 range. An empty frame is returned when ``regions`` is empty.
    """
    features = [
        f"{feature}.{region}" if region != "DEFAULT" else feature
        for region in regions
        for feature in ["volume", "height"]
    ]

    # Build each subset once. Neither the tick grouping nor the seeded sampling
    # depends on the feature, so they are shared across the feature loop.
    subsets: list[tuple[dict, pd.DataFrame]] = [({}, data)]

    if include_ticks:
        subsets.extend(({"TICK": tick}, tick_data) for tick, tick_data in data.groupby("TICK"))

    if include_samples:
        subsets.extend(
            ({"SAMPLE": sample}, _sample_per_tick(data, sample, sample_size))
            for sample in range(sample_reps)
        )

    all_stats = []

    for feature in features:
        ref_values = ref_data[feature].values
        stats = [
            _labelled_ks_stats(subset[feature].values, ref_values, labels)
            for labels, subset in subsets
        ]

        stats_df = pd.DataFrame(stats)
        stats_df["FEATURE"] = feature
        all_stats.append(stats_df)

    if not all_stats:
        # pd.concat raises ValueError on an empty list; return an empty table instead.
        empty = pd.DataFrame(columns=["KS_STATISTIC", "KS_PVALUE", "N", "FEATURE"])
        return empty.astype({"N": int})

    # ignore_index avoids repeated 0..k labels from the per-feature frames.
    return pd.concat(all_stats, ignore_index=True).astype({"N": int})


def calculate_size_stats_for_all(
    data: pd.DataFrame, ref_data: pd.DataFrame, feature: str
) -> list[dict]:
    """
    Run the KS test for one feature column using every row of ``data``.

    Returns a single-element list with the KS result and sample count "N".
    """
    return [_labelled_ks_stats(data[feature].values, ref_data[feature].values, {})]


def calculate_size_stats_for_ticks(
    data: pd.DataFrame, ref_data: pd.DataFrame, feature: str
) -> list[dict]:
    """
    Run the KS test for one feature column separately for each "TICK" group.

    Returns one dict per group holding the KS result, the "TICK" value and the
    group size "N".
    """
    ref_values = ref_data[feature].values

    return [
        _labelled_ks_stats(tick_data[feature].values, ref_values, {"TICK": tick})
        for tick, tick_data in data.groupby("TICK")
    ]


def calculate_size_stats_for_samples(
    data: pd.DataFrame, ref_data: pd.DataFrame, feature: str, sample_reps: int, sample_size: int
) -> list[dict]:
    """
    Run the KS test for one feature column on repeated per-tick samples of ``data``.

    Each repetition shuffles ``data`` (seeded with the repetition index) and keeps
    at most ``sample_size`` rows per "TICK" group. Returns one dict per repetition
    holding the KS result, the "SAMPLE" index and the sample count "N".
    """
    ref_values = ref_data[feature].values

    return [
        _labelled_ks_stats(
            _sample_per_tick(data, sample, sample_size)[feature].values,
            ref_values,
            {"SAMPLE": sample},
        )
        for sample in range(sample_reps)
    ]


def get_ks_statistic(population_a: np.ndarray, population_b: np.ndarray) -> dict:
    """
    Run ``scipy.stats.ks_2samp`` in "asymp" mode on two populations.

    Returns a dict with the test statistic under "KS_STATISTIC" and the p-value
    under "KS_PVALUE".
    """
    ksresult = ks_2samp(population_a, population_b, mode="asymp")

    return {
        "KS_STATISTIC": ksresult.statistic,
        "KS_PVALUE": ksresult.pvalue,
    }


def _sample_per_tick(data: pd.DataFrame, seed: int, sample_size: int) -> pd.DataFrame:
    """Shuffle all rows with the given seed, then keep the first ``sample_size`` per "TICK"."""
    return data.sample(frac=1, random_state=seed).groupby("TICK").head(sample_size)


def _labelled_ks_stats(values: np.ndarray, ref_values: np.ndarray, labels: dict) -> dict:
    """Return the KS result for ``values`` vs ``ref_values`` with ``labels`` and "N" added."""
    ks_stats = get_ks_statistic(values, ref_values)
    ks_stats.update({**labels, "N": len(values)})
    return ks_stats