From 347568c2a2eeed44623080b3cd44951964ec4280 Mon Sep 17 00:00:00 2001
From: lukeyf
Date: Wed, 12 Jun 2024 12:43:07 +0100
Subject: [PATCH] clean up survey_processing

---
 survey_processing/main.py | 77 ++++++++++++++++++++++++---------------
 1 file changed, 47 insertions(+), 30 deletions(-)

diff --git a/survey_processing/main.py b/survey_processing/main.py
index 80f0313..3f5078b 100644
--- a/survey_processing/main.py
+++ b/survey_processing/main.py
@@ -35,10 +35,12 @@ import json
 import geopandas as gpd
 from sklearn.preprocessing import MinMaxScaler
+import pandas as pd
+from sklearn.model_selection import KFold
 
 # Ignore all warnings
 warnings.filterwarnings('ignore')
 
-
+par_dir = r'survey_processing/'
 def find_sub_file(directory, pattern:str):
     for f in os.listdir(directory):
         if pattern.lower() in f.lower():
@@ -534,7 +536,7 @@ def process_dhs(parent_dir, config_file):
         parent_dir+=r'/'
     with open(config_file, 'r') as file:
         config_data = json.load(file)
-    with open('dhs_country_code.json', 'r') as file:
+    with open(f'{par_dir}dhs_country_code.json', 'r') as file:
         dhs_cc = json.load(file)
     root_grid = parent_dir
     save_to_csv_dir = os.path.join(parent_dir, "dhs_variables.csv")
@@ -543,17 +545,22 @@
         raise FileNotFoundError('DHS data incomplete')
 
     pov_dfs = []
+    print('Summarizing poverty...')
     for f in tqdm(os.listdir(root_grid)):
         if 'DHS' in f:
             try:
-                # pov_df = get_poverty(root_grid+f+'/', save_csv=True)
-                pov_df = pd.read_csv(os.path.join(root_grid,f,'poverty_variables.csv'))
-                pov_dfs.append(pov_df)
-                # print('passed', f)
+                pov_cache = os.path.join(root_grid,f,'poverty_variables.csv')
+                if os.path.exists(pov_cache):
+                    pov_df = pd.read_csv(pov_cache)
+                else:
+                    pov_df = get_poverty(root_grid+f+'/', save_csv=True)
+
+                pov_dfs.append((f[3:7],pov_df))
             except Exception as e:
-                # print(e, f)
                 pass
+
     dhs_dfs = []
+    print('Extracting DHS variables...')
     for f in tqdm(os.listdir(root_grid)):
         if 'DHS' in f:
             dhs_dfs.append((f[3:7], pd.read_csv(os.path.join(root_grid,f,'dhs_variables.csv'))))
@@ -563,6 +570,7 @@
     matches = config_data['matches']
 
     dhs_dfs_agg = []
+    print('Aggregating DHS variables...')
     for year,df in tqdm(dhs_dfs):
         ccode = df['v000'][0]
         for column, threshold in thresholds.items():
@@ -591,15 +599,9 @@
     cols_to_select = existing_cols + additional_cols + ['id']
     dhs_df_all = dhs_df_all[list(set(cols_to_select))]
 
-    poverty_dfs = []
-    for f in tqdm(os.listdir(root_grid)):
-        if 'DHS' in f:
-            for sub_f in os.listdir(os.path.join(root_grid,f)):
-                if 'poverty_variables' in sub_f:
-                    poverty_dfs.append((f[3:7],pd.read_csv(os.path.join(root_grid,f,sub_f))))
-
+    print('Aggregating poverty data...')
     poverty_dfs_agg = []
-    for df in tqdm(poverty_dfs):
+    for df in tqdm(pov_dfs):
         ccode = df[1].countrycode2[0]
         df_agg = df[1].select_dtypes(include=[np.number]).groupby('cluster').agg('mean').reset_index()
         df_agg['id'] = ccode[:2]+df[0]+ df_agg['cluster'].apply(make_string)
@@ -664,30 +666,45 @@ def scale_column(col):
     # Save min-max dictionary locally
     with open('min_max_values.json', 'w') as f:
         json.dump(min_max_dict, f, indent=4)
 
+    import re
+    col_pattern = r"^[a-zA-Z]*\d*_[^a-zA-Z]"
+    matching_columns = [col for col in df_processed.columns if re.match(f"^{col_pattern}", col)]
 
-    def merge_duplicates(group):
-        return group.apply(lambda x: x.bfill().ffill().iloc[0])
-
-    # Group by CENTROID_ID and merge duplicates with progress bar
-    grouped = df_processed.groupby('CENTROID_ID')
-    results = []
-
-
-    # Wrap the grouped object with tqdm to show progress
-    for name, group in tqdm(grouped, desc="Processing groups"):
-        merged = merge_duplicates(group)
-        results.append(merged)
+    # Fill NaN values with 0 in the matched columns
+    df_processed[matching_columns] = df_processed[matching_columns].fillna(0)
+
+    df_processed.to_csv(save_to_csv_dir)
 
-    df_merged = pd.DataFrame(results).reset_index(drop=True)
+    save_split(df_processed)
+
 
-    df_merged.iloc[1:].to_csv(save_to_csv_dir)
 
+def save_split(df):
+    df = df.sample(frac=1, random_state=42)
+    kf = KFold(n_splits=5, shuffle=True, random_state=42)
+    save_par_dir = r'survey_processing/processed_data/'
+    fold = 1
+    for train_index, test_index in kf.split(df):
+        # Generate train and test subsets
+        train_df = df.iloc[train_index]
+        test_df = df.iloc[test_index]
+
+        # Save to CSV files
+        train_df.to_csv(f'{save_par_dir}train_fold_{fold}.csv', index=False)
+        test_df.to_csv(f'{save_par_dir}test_fold_{fold}.csv', index=False)
+
+        fold += 1
+    old_df = df[df['YEAR'] < 2020]
+    new_df = df[df['YEAR'] >= 2020]
+    new_df.to_csv(f'{save_par_dir}after_2020.csv', index=False)
+    old_df.to_csv(f'{save_par_dir}before_2020.csv', index=False)
+
 def main():
     # Setup argument parser
     parser = argparse.ArgumentParser(description="Process DHS data to a single CSV file.")
     parser.add_argument("parent_dir", help="The parent directory enclosing all DHS folders")
-    parser.add_argument("config_file", nargs='?', default='processing_params.json', help="The configuration parameters for preprocessing (default: processing_params.json)")
+    parser.add_argument("config_file", nargs='?', default=f'{par_dir}processing_params.json', help="The configuration parameters for preprocessing (default: processing_params.json)")
 
     # Parse arguments
     args = parser.parse_args()
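Note (not part of the patch): the new save_split() writes five KFold train/test CSVs plus a before/after-2020 temporal split under survey_processing/processed_data/, and the hard-coded par_dir / save_par_dir paths mean the script is expected to be run from the repository root. A minimal, illustrative sketch of how a downstream step might load those outputs is below; the file names follow the patch, everything else (directory constant, loop structure) is assumed rather than taken from the repository.

    import pandas as pd

    # Hypothetical consumer of the CSVs written by save_split(); assumes the
    # processing script was run from the repository root so the relative
    # paths below resolve.
    processed_dir = "survey_processing/processed_data/"

    folds = []
    for fold in range(1, 6):  # save_split() uses KFold(n_splits=5)
        train_df = pd.read_csv(f"{processed_dir}train_fold_{fold}.csv")
        test_df = pd.read_csv(f"{processed_dir}test_fold_{fold}.csv")
        folds.append((train_df, test_df))

    # Temporal hold-out written by the same function.
    before_2020 = pd.read_csv(f"{processed_dir}before_2020.csv")
    after_2020 = pd.read_csv(f"{processed_dir}after_2020.csv")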