# eval.py
import multiprocessing
from collections import Counter

import numpy as np
from chainlite import get_logger
from scipy.optimize import linear_sum_assignment
from tqdm import tqdm

logger = get_logger(__name__)


def tuple_superset(
    predicted_results: list, gold_results: list, discard_label_description=True
):
    if predicted_results is None:
        return gold_results is None
    if type(predicted_results) is bool or type(gold_results) is bool:
        return predicted_results == gold_results

    # Each gold result should appear somewhere in the predicted results,
    # neglecting the keys (which are the variable names).
    for gold_result in gold_results:
        if discard_label_description:
            # Note the parentheses: without them, entries whose key ends with
            # "Description" would be kept instead of discarded.
            gold_values = [
                (j[0], j[1])
                for i in gold_result
                for j in gold_result[i].items()
                if not (i.endswith("Label") or i.endswith("Description"))
            ]
        else:
            gold_values = [
                (j[0], j[1]) for i in gold_result.values() for j in i.items()
            ]

        found = False
        # Loop through the predicted results to try to find a superset row.
        for predicted_result in predicted_results:
            predicted_values = [
                (j[0], j[1]) for i in predicted_result.values() for j in i.items()
            ]
            if set(gold_values) <= set(predicted_values):
                found = True
                break
        if not found:
            return False
    return True


def _compute_match_ratio(predicted, gold):
    """
    Compute the fraction of gold values (excluding label/description entries)
    that also appear among the predicted values.

    Example `predicted` or `gold`:
    {
        "item": {
            "type": "uri",
            "value": "http://www.wikidata.org/entity/Q98926"
        },
        "itemLabel": {
            "type": "literal",
            "value": "Lola Landau",
            "xml:lang": "en"
        }
    }
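
    A minimal doctest sketch with hypothetical values; "itemLabel" does not
    count toward the ratio because label/description keys are ignored:

    >>> _compute_match_ratio(
    ...     {"x": {"type": "uri", "value": "a"}},
    ...     {
    ...         "item": {"type": "uri", "value": "a"},
    ...         "itemLabel": {"type": "literal", "value": "A"},
    ...         "p": {"type": "uri", "value": "b"},
    ...     },
    ... )
    0.5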
"""
gold_values = [
tuple(sorted(gold_value.items()))
for gold_key, gold_value in gold.items()
if not (gold_key.endswith("Label") or gold_key.endswith("Description"))
]
useful_predicted_values = [
tuple(sorted(predicted_value.items()))
for predicted_value in predicted.values()
if tuple(sorted(predicted_value.items())) in gold_values
]
# Find the intersection (minimum counts)
overlap = sum((Counter(gold_values) & Counter(useful_predicted_values)).values())
if len(gold_values) == 0:
logger.error("zero gold values for %s\n\n%s", predicted, gold)
return 0
return overlap / len(gold_values)

def f1_simple(predicted_results: list, gold_results: list):
    """
    Simple F1 over raw result rows: rows are compared for exact equality,
    with no special handling of Wikidata object results.
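
    A minimal doctest sketch with hypothetical values:

    >>> f1_simple(["a", "b"], ["a", "c"])
    0.5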
"""
def safe_divide(x, y):
if x == 0 and y == 0:
return 0
return x / y
true_positive = [x for x in predicted_results if x in gold_results]
false_positive = [x for x in predicted_results if x not in gold_results]
false_negative = [x for x in gold_results if x not in predicted_results]
precision = safe_divide(len(true_positive), len(true_positive) + len(false_positive))
recall = safe_divide(len(true_positive), len(true_positive) + len(false_negative))
if precision + recall == 0:
this_f1 = 0
else:
this_f1 = 2 * precision * recall / (precision + recall)
return this_f1

def f1(predicted_results, gold_results, maximal_matching=True):
    """
    Calculates a row-level F1 score for one example. With `maximal_matching=True`,
    predicted and gold rows are paired with the Hungarian algorithm
    (scipy's `linear_sum_assignment`) so that the total per-row match ratio
    is maximized.
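
    A minimal doctest sketch with hypothetical values; `float()` just
    normalizes the numpy scalar so the printed repr is stable:

    >>> row_a = {"item": {"type": "uri", "value": "a"}}
    >>> row_b = {"item": {"type": "uri", "value": "b"}}
    >>> float(f1([row_a], [row_a]))
    1.0
    >>> float(f1([row_a], [row_a, row_b]))
    0.6666666666666666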
"""
if predicted_results is None:
return 0
if type(predicted_results) is bool or type(gold_results) is bool:
return int(predicted_results == gold_results)
if maximal_matching:
# first compute a cost matrix between `predicted_results` and `gold_results`
cost_matrix = np.empty((len(predicted_results), len(gold_results)))
for i in range(len(predicted_results)):
for j in range(len(gold_results)):
i_j_recall = _compute_match_ratio(predicted_results[i], gold_results[j])
cost_matrix[i , j] = i_j_recall
row_ind, col_ind = linear_sum_assignment(cost_matrix, maximize=True)
assigned_values = cost_matrix[row_ind, col_ind]
# true positives are those that get matched (times their respective row-by-row recall)
tp = assigned_values[assigned_values > 0].sum()
# each matched row below 1 but above 0 will count as 1 - recall(i,j) to false negatives
below_one_above_zero = assigned_values[(assigned_values < 1) & (assigned_values > 0)]
fp_or_fn = (1 - below_one_above_zero).sum()
# each matched row PAIR with 0 match rate will count as 1 false negative and 1 false positive
fp_or_fn += 2 * np.sum(assigned_values <= 0)
# each individual unmatched row (due to cost matrix being rectangular) will
# count as either 1 false negative or 1 false positive
fp_or_fn += (len(predicted_results) - len(row_ind)) + (len(gold_results) - len(col_ind))
res = 2 * tp / (2 * tp + fp_or_fn)
else:
# an older implmentation with greedy matching
# SHOULD NO LONGER BE USED
gold_result_mapping = [
[gold_result, False]
for gold_result in gold_results # false denoting not matched yet
]
tp = 0
fp = 0
fn = 0
for predicted_result in predicted_results:
candidate_index = None
match_ratio = 0
# go over gold results yet to be matched to find the one with most matches
# greedily match that to this
for index, gold_result in enumerate(gold_result_mapping):
if gold_result[1] == True:
continue
gold_result = gold_result[0]
this_match_ratio = _compute_match_ratio(predicted_result, gold_result)
if this_match_ratio > match_ratio:
match_ratio = this_match_ratio
candidate_index = index
if candidate_index is not None:
gold_result_mapping[candidate_index][1] = True
if match_ratio == 0:
fp += 1
else:
tp += match_ratio
fn += 1 - match_ratio
fn += len(list(filter(lambda x: x[1] == False, gold_result_mapping)))
res = 2 * tp / (2 * tp + fp + fn)
assert(0 <= res)
assert(res <= 1)
return res

def f1_wrapper(t):
    # Unpack a (predicted, gold) pair; `Pool.imap` passes a single argument
    # to the worker function.
    predicted_results, gold_results = t
    return f1(predicted_results, gold_results)


def parallel_f1(predicted_results_list, gold_results_list):
    """
    Runs the `f1` function in parallel using multiprocessing.

    :param predicted_results_list: List of lists of predicted results
    :param gold_results_list: List of lists of gold results
    :return: List of F1 scores for each pair of predicted and gold results
    """
    if len(predicted_results_list) != len(gold_results_list):
        raise ValueError(
            "The length of predicted_results_list and gold_results_list must be the same."
        )
    with multiprocessing.Pool(16) as pool:
        f1_scores = list(
            tqdm(
                pool.imap(f1_wrapper, zip(predicted_results_list, gold_results_list)),
                total=len(predicted_results_list),
                desc="Calculating F1",
            )
        )
    return f1_scores


if __name__ == "__main__":
    # More unit tests live in `test_eval.py`.
    predicted = [
        {
            "item": {"type": "uri", "value": "b"},
            "surprise": {"type": "uri", "value": "c"},
        },
        {
            "item": {"type": "uri", "value": "a"},
            "surprise": {"type": "uri", "value": "c"},
        },
        {
            "item": {"type": "uri", "value": "surprise"},
            "surprise": {"type": "uri", "value": "c"},
        },
        {
            "item": {"type": "uri", "value": "surprise_2"},
            "surprise": {"type": "uri", "value": "c"},
        },
    ]
    gold = [
        {
            "item": {"type": "uri", "value": "b"},
            "surprise": {"type": "uri", "value": "c"},
        },
        {
            "item": {"type": "uri", "value": "a"},
            "surprise": {"type": "uri", "value": "c"},
        },
    ]
    print(f1(predicted, gold))
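
    # Illustrative demos of the other entry points on the same data (these
    # calls are sketches, not part of the original test; see `test_eval.py`).
    # `tuple_superset` checks that every gold row is covered by some predicted
    # row, ignoring variable names (and label/description keys by default).
    print(tuple_superset(predicted, gold))

    # `parallel_f1` scores many (predicted, gold) pairs at once; here the same
    # pair is scored twice just to show the expected input and output shapes.
    print(parallel_f1([predicted, predicted], [gold, gold]))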