clean up survey_processing
lukeyf committed Jun 12, 2024
1 parent fd6c047 commit 347568c
Showing 1 changed file with 47 additions and 30 deletions.
77 changes: 47 additions & 30 deletions survey_processing/main.py
@@ -35,10 +35,12 @@
 import json
 import geopandas as gpd
 from sklearn.preprocessing import MinMaxScaler
 import pandas as pd
+from sklearn.model_selection import KFold

 # Ignore all warnings
 warnings.filterwarnings('ignore')

+par_dir = r'survey_processing/'
 def find_sub_file(directory, pattern:str):
     for f in os.listdir(directory):
         if pattern.lower() in f.lower():
@@ -534,7 +536,7 @@ def process_dhs(parent_dir, config_file):
         parent_dir+=r'/'
     with open(config_file, 'r') as file:
         config_data = json.load(file)
-    with open('dhs_country_code.json', 'r') as file:
+    with open(f'{par_dir}dhs_country_code.json', 'r') as file:
         dhs_cc = json.load(file)
     root_grid = parent_dir
     save_to_csv_dir = os.path.join(parent_dir, "dhs_variables.csv")
@@ -543,17 +545,22 @@ def process_dhs(parent_dir, config_file):
         raise FileNotFoundError('DHS data incomplete')

     pov_dfs = []
+    print('Summarizing poverty...')
     for f in tqdm(os.listdir(root_grid)):
         if 'DHS' in f:
             try:
-                # pov_df = get_poverty(root_grid+f+'/', save_csv=True)
-                pov_df = pd.read_csv(os.path.join(root_grid,f,'poverty_variables.csv'))
-                pov_dfs.append(pov_df)
-                # print('passed', f)
+                pov_cache = os.path.join(root_grid,f,'poverty_variables.csv')
+                if os.path.exists(pov_cache):
+                    pov_df = pd.read_csv(pov_cache)
+                else:
+                    pov_df = get_poverty(root_grid+f+'/', save_csv=True)
+
+                pov_dfs.append((f[3:7],pov_df))
             except Exception as e:
-                # print(e, f)
                 pass

     dhs_dfs = []
+    print('Extracting DHS variables...')
     for f in tqdm(os.listdir(root_grid)):
         if 'DHS' in f:
             dhs_dfs.append((f[3:7], pd.read_csv(os.path.join(root_grid,f,'dhs_variables.csv'))))
@@ -563,6 +570,7 @@ def process_dhs(parent_dir, config_file):
     matches = config_data['matches']

     dhs_dfs_agg = []
+    print('Aggregating DHS variables...')
     for year,df in tqdm(dhs_dfs):
         ccode = df['v000'][0]
         for column, threshold in thresholds.items():
@@ -591,15 +599,9 @@ def process_dhs(parent_dir, config_file):
     cols_to_select = existing_cols + additional_cols + ['id']
     dhs_df_all = dhs_df_all[list(set(cols_to_select))]

-    poverty_dfs = []
-    for f in tqdm(os.listdir(root_grid)):
-        if 'DHS' in f:
-            for sub_f in os.listdir(os.path.join(root_grid,f)):
-                if 'poverty_variables' in sub_f:
-                    poverty_dfs.append((f[3:7],pd.read_csv(os.path.join(root_grid,f,sub_f))))
-
+    print('Aggregating poverty data...')
     poverty_dfs_agg = []
-    for df in tqdm(poverty_dfs):
+    for df in tqdm(pov_dfs):
         ccode = df[1].countrycode2[0]
         df_agg = df[1].select_dtypes(include=[np.number]).groupby('cluster').agg('mean').reset_index()
         df_agg['id'] = ccode[:2]+df[0]+ df_agg['cluster'].apply(make_string)
@@ -664,30 +666,45 @@ def scale_column(col):
     # Save min-max dictionary locally
     with open('min_max_values.json', 'w') as f:
         json.dump(min_max_dict, f, indent=4)
     import re
     col_pattern = r"^[a-zA-Z]*\d*_[^a-zA-Z]"
     matching_columns = [col for col in df_processed.columns if re.match(f"^{col_pattern}", col)]

-    def merge_duplicates(group):
-        return group.apply(lambda x: x.bfill().ffill().iloc[0])
-
-    # Group by CENTROID_ID and merge duplicates with progress bar
-    grouped = df_processed.groupby('CENTROID_ID')
-    results = []
-
-    # Wrap the grouped object with tqdm to show progress
-    for name, group in tqdm(grouped, desc="Processing groups"):
-        merged = merge_duplicates(group)
-        results.append(merged)
     # Fill NaN values with 0 in the matched columns
     df_processed[matching_columns] = df_processed[matching_columns].fillna(0)

+    df_processed.to_csv(save_to_csv_dir)

-    df_merged = pd.DataFrame(results).reset_index(drop=True)
+    save_split(df_processed)

-    df_merged.iloc[1:].to_csv(save_to_csv_dir)
+def save_split(df):
+    df = df.sample(frac=1, random_state=42)
+    kf = KFold(n_splits=5, shuffle=True, random_state=42)
+    save_par_dir = r'survey_processing/processed_data/'
+    fold = 1
+    for train_index, test_index in kf.split(df):
+        # Generate train and test subsets
+        train_df = df.iloc[train_index]
+        test_df = df.iloc[test_index]
+
+        # Save to CSV files
+        train_df.to_csv(f'{save_par_dir}train_fold_{fold}.csv', index=False)
+        test_df.to_csv(f'{save_par_dir}test_fold_{fold}.csv', index=False)
+
+        fold += 1
+
+    old_df = df[df['YEAR'] < 2020]
+    new_df = df[df['YEAR'] >= 2020]
+    new_df.to_csv(f'{save_par_dir}after_2020.csv', index=False)
+    old_df.to_csv(f'{save_par_dir}before_2020.csv', index=False)

 def main():

     # Setup argument parser
     parser = argparse.ArgumentParser(description="Process DHS data to a single CSV file.")
     parser.add_argument("parent_dir", help="The parent directory enclosing all DHS folders")
-    parser.add_argument("config_file", nargs='?', default='processing_params.json', help="The configuration parameters for preprocessing (default: processing_params.json)")
+    parser.add_argument("config_file", nargs='?', default=f'{par_dir}processing_params.json', help="The configuration parameters for preprocessing (default: processing_params.json)")
     # Parse arguments
     args = parser.parse_args()
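Usage sketch (an assumption, not part of the diff): since par_dir = r'survey_processing/' and save_par_dir = r'survey_processing/processed_data/' are relative paths, the script presumably has to be launched from the repository root, e.g.

    python survey_processing/main.py <parent_dir> [config_file]

where parent_dir encloses the DHS* folders and config_file now resolves by default to survey_processing/processing_params.json. Note that survey_processing/processed_data/ must already exist before save_split writes the fold CSVs, since to_csv does not create directories.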
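For reference, a quick sketch of what the col_pattern filter in this commit selects (the column names below are hypothetical; the f-string in the committed code prepends a second '^', which is redundant but harmless because '^' is zero-width):

    import re

    col_pattern = r"^[a-zA-Z]*\d*_[^a-zA-Z]"
    print(bool(re.match(col_pattern, 'v190_1')))       # True: letters, digits, '_', then a non-letter
    print(bool(re.match(col_pattern, 'CENTROID_ID')))  # False: a letter follows the '_'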