# type: ignore
"""Creates a `variable_replacements.json` file of {old variable id} to
{new variable id} key-value pairs.

`variable_replacements.json` is used in `suggest_chart_revisions.py` for
determining which charts to update.

Usage:

    python -m worldbank_wdi.match_variables
"""
import os
from typing import Dict, List

import pandas as pd
import simplejson as json
from pymysql import Connection

from db import get_connection
from worldbank_wdi import CONFIGPATH, OUTPATH

CUSTOM_FNAME = "custom_variable_replacements.json"
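# `custom_variable_replacements.json` is expected to contain an array of
# objects with "oldId", "oldName", and "newName" keys, matching the columns
# merged in `get_variable_replacements_from_custom_matches` below. An
# illustrative sketch, with hypothetical values:
#   [
#     {"oldId": 2032, "oldName": "Old indicator name", "newName": "New indicator name"}
#   ]
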
def main():
    """Builds the combined {old variable id: new variable id} mapping and
    dumps it to `variable_replacements.json` in `OUTPATH`."""
    old_var_id2new_var_id = get_variable_replacements_from_string_matches()
    custom_old_var_id2new_var_id = get_variable_replacements_from_custom_matches()
    # custom replacements take precedence over string matches.
    old_var_id2new_var_id.update(custom_old_var_id2new_var_id)
    if not os.path.exists(OUTPATH):
        os.makedirs(OUTPATH)
    with open(os.path.join(OUTPATH, "variable_replacements.json"), "w") as f:
        json.dump(old_var_id2new_var_id, f, indent=2, sort_keys=True)

def get_variable_replacements_from_string_matches() -> Dict[int, int]:
    """Constructs a {old variable id: new variable id} mapping by exact
    variable name match between the old and new datasets."""
    with get_connection() as conn:
        # retrieves old and new datasets
        df_old_datasets = get_datasets(conn, new=False)
        df_new_datasets = get_datasets(conn, new=True)

        # retrieves old and new variables
        df_old_vars = get_variables(conn, dataset_ids=df_old_datasets["id"])
        df_new_vars = get_variables(conn, dataset_ids=df_new_datasets["id"])

    # merges old and new variables on name. `validate="m:1"` allows multiple
    # old variables to map to the same new variable, but not vice versa.
    df_vars = pd.merge(
        df_old_vars,
        df_new_vars,
        on="name",
        how="inner",
        suffixes=["_old", "_new"],
        validate="m:1",
    )
    assert df_vars.id_old.notnull().all() and df_vars.id_new.notnull().all()
    old_var_id2new_var_id = (
        df_vars.dropna(subset=["id_old", "id_new"])
        .set_index("id_old")["id_new"]
        .to_dict()
    )
    return old_var_id2new_var_id

def get_variable_replacements_from_custom_matches() -> Dict[int, int]:
    """Constructs a {old variable id: new variable id} mapping from the
    manually curated matches in `custom_variable_replacements.json`."""
    # skips the database queries entirely if there are no custom replacements.
    if not os.path.exists(os.path.join(CONFIGPATH, CUSTOM_FNAME)):
        return {}
    with open(os.path.join(CONFIGPATH, CUSTOM_FNAME), "r") as f:
        df_custom_replacements = pd.DataFrame(json.load(f))

    with get_connection() as conn:
        # retrieves old and new datasets
        df_old_datasets = get_datasets(conn, new=False)
        df_new_datasets = get_datasets(conn, new=True)

        # retrieves old and new variables
        df_old_vars = get_variables(conn, dataset_ids=df_old_datasets["id"])
        df_new_vars = get_variables(conn, dataset_ids=df_new_datasets["id"])

    n_expected = df_custom_replacements.shape[0]
    # resolves each custom replacement to variable ids by matching `newName`
    # against new variable names and `oldId` against old variable ids.
    df_custom_replacements = df_custom_replacements.merge(
        df_new_vars[["name", "id"]],
        left_on="newName",
        right_on="name",
        how="left",
        validate="1:1",
    ).merge(
        df_old_vars[["name", "id"]],
        left_on="oldId",
        right_on="id",
        how="left",
        validate="1:1",
        suffixes=["_new", "_old"],
    )
    assert (
        df_custom_replacements["oldName"] == df_custom_replacements["name_old"]
    ).all(), (
        f"One or more old names in {CUSTOM_FNAME} do not match the "
        "name of the variable with the corresponding `oldId`."
    )
    assert df_custom_replacements.shape[0] == n_expected, (
        "Something went wrong in constructing custom replacements. "
        f"Expected {n_expected} custom replacements, but only "
        f"{df_custom_replacements.shape[0]} match a new variable name "
        "and old variable id."
    )
    assert df_custom_replacements["oldName"].duplicated().sum() == 0, (
        "Expected 0 duplicate old variable names. Something is wrong with "
        f"{CUSTOM_FNAME}."
    )
    custom_old_var_id2new_var_id = (
        df_custom_replacements.set_index("id_old")["id_new"].to_dict()
    )
    assert len(custom_old_var_id2new_var_id) == df_custom_replacements.shape[0]
    return custom_old_var_id2new_var_id

def get_datasets(conn: Connection, new: bool = True) -> pd.DataFrame:
    """Retrieves new datasets if `new=True`, else retrieves old datasets.

    Arguments:

        conn: Connection. Database connection.

        new: bool = True. If True, retrieves new datasets. Else retrieves
            old datasets.

    Returns:

        pd.DataFrame: dataframe of old or new datasets.
    """
    columns = ["id", "name", "createdAt", "updatedAt"]
    try:
        # `datasets.csv` lists the dataset(s) created by the preceding
        # import step.
        datasets = pd.read_csv(os.path.join(OUTPATH, "datasets.csv"))
        new_dataset_names = datasets.name.unique().tolist()
    except FileNotFoundError:
        new_dataset_names = []
    if new:
        # an empty name list would produce invalid SQL (`name IN ()`), so
        # fail early with a clear message.
        assert len(new_dataset_names), (
            "Expected to find new dataset names in "
            f"{os.path.join(OUTPATH, 'datasets.csv')}, but found none."
        )
        query = f"""
            SELECT {','.join(columns)}
            FROM datasets
            WHERE name IN ({','.join([f'"{n}"' for n in new_dataset_names])})
        """
    else:
        query = f"""
            SELECT {','.join(columns)}
            FROM datasets
            WHERE name COLLATE UTF8_GENERAL_CI LIKE '%world development indicators%'
        """
        if len(new_dataset_names):
            # excludes the new dataset(s) from the old dataset results.
            new_dataset_names_str = ",".join([f'"{n}"' for n in new_dataset_names])
            query += f" AND name NOT IN ({new_dataset_names_str})"
    df_datasets = pd.read_sql(query, conn)
    return df_datasets

def get_variables(conn: Connection, dataset_ids: List[int]) -> pd.DataFrame:
    """Retrieves all variables in the given dataset(s).

    Arguments:

        conn: Connection. Database connection.

        dataset_ids: List[int]. List of dataset ids for which to retrieve
            variables.

    Returns:

        pd.DataFrame: dataframe of variables.
    """
    dataset_ids_str = ",".join([str(_id) for _id in dataset_ids])
    columns = [
        "id",
        "name",
        "description",
        "unit",
        "display",
        "createdAt",
        "updatedAt",
        "datasetId",
        "sourceId",
    ]
    df_vars = pd.read_sql(
        f"""
            SELECT {','.join(columns)}
            FROM variables
            WHERE datasetId IN ({dataset_ids_str})
        """,
        conn,
    )
    return df_vars

if __name__ == "__main__":
    main()