-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathanalysis_via_clf.py
291 lines (250 loc) · 9.87 KB
/
analysis_via_clf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
import torch
import sysbinder
import os
import numpy as np
import matplotlib
matplotlib.use("Agg")
from rtpt import RTPT
from tqdm import tqdm
from torch.utils.data import DataLoader
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import CategoricalNB
from sklearn import metrics
from sysbinder.sysbinder import SysBinderImageAutoEncoder
from data import CLEVREasy_1_WithAnnotations, CLEVR4_1_WithAnnotations
from neural_concept_binder import NeuralConceptBinder
# Baseline, Repository needs to be cloned from https://github.com/yfw/nlotm
from nlotm.nlotm import NlotmImageAutoEncoder
import utils_ncb as utils_ncb
# Cap the number of threads torch uses for intra-op CPU parallelism.
torch.set_num_threads(40)
# NOTE(review): the next two names are plain module-level Python variables. Nothing
# in this file reads them, and assigning them does not set the OMP_NUM_THREADS /
# MKL_NUM_THREADS environment variables -- confirm whether os.environ was intended.
OMP_NUM_THREADS = 40
MKL_NUM_THREADS = 40
# Seed passed to utils_ncb.set_seed() in get_args().
SEED = 0
def get_args():
    """Parse the command-line arguments and seed the run.

    The parser is obtained from ``utils_ncb.get_parser``, which receives the torch
    device to use (CUDA when available, CPU otherwise). ``utils_ncb.set_seed`` is
    then called with the module-level ``SEED``. When the requested model type is
    ``"nlotm"``, a fixed set of option values is written onto the parsed namespace.

    Returns:
        The parsed (and, for nlotm, adjusted) argument namespace.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    parser = utils_ncb.get_parser(device)
    args = parser.parse_args()

    utils_ncb.set_seed(SEED)

    # Every model type except the nlotm baseline uses the parsed values as they are.
    if args.model_type != "nlotm":
        return args

    # Option values that are always assigned for the nlotm baseline.
    nlotm_options = {
        "fp16": False,
        "vq_type": "vq_ema_dcr",
        "vq_beta": 1.0,
        "commitment_beta": 50.0,
        "slot_init_type": "random",
    }
    for option_name, option_value in nlotm_options.items():
        setattr(args, option_name, option_value)
    return args
def gather_encs(model, loader, args):
    """
    Encode every batch of ``loader`` with ``model`` and collect codes and labels.

    Each batch is expected to be a sequence whose last element is dropped (it is
    not used here) and whose remaining four elements are, in order, images, masks,
    annotations and multi-hot annotations; these four are moved to ``args.device``.

    How the codes are obtained depends on ``args.model_type``:

    * ``"nlotm"``: ``model.get_z_for_clf`` is called and, per image, the indices of
      the slot whose attention overlaps the mask the most are used.
    * contains ``"sysbind"``: the first element returned by ``model.encode`` is
      used, or -- if ``args.attention_codes`` is set -- the argmax over the last
      dimension of ``encs[3][1]``.
    * ``"ncb"``: the first element returned by ``model.encode`` is used.

    Args:
        model: Model exposing ``eval()`` and either ``encode`` or ``get_z_for_clf``.
        loader: Iterable of batches as described above.
        args: Namespace providing ``device``, ``model_type``, ``batch_size`` and,
            for sysbinder models, ``attention_codes``.

    Returns:
        Tuple ``(all_codes, all_labels_single, all_labels_multi)`` of numpy arrays,
        each concatenated over all batches along the first axis.

    Raises:
        ValueError: If ``args.model_type`` is not supported or ``loader`` yields no
            batch.
        AssertionError: If a batch holds more than one annotated object/slot per
            image or is not a full batch of ``args.batch_size`` samples.
    """
    model.eval()
    # NOTE(review): gradients are globally enabled here although all outputs are
    # detached below; presumably not required -- confirm before replacing this
    # with torch.no_grad().
    torch.set_grad_enabled(True)

    # Number of slots inspected for the nlotm baseline.
    # NOTE(review): hard-coded; presumably equals the model's slot count -- confirm.
    num_nlotm_slots = 4

    all_labels_multi = []
    all_labels_single = []
    all_codes = []

    # Wrap the loader itself (not enumerate(loader)) so tqdm can read its length
    # and show a total / ETA.
    for sample in tqdm(loader):
        # The last entry of a batch is not needed for the analysis.
        imgs, masks, annotations, annotations_multihot = (
            x.to(args.device) for x in sample[:-1]
        )

        if args.model_type == "nlotm":
            model.downstream_data_type = "z"
            model.downstream_type = "z"
            _, _, attns, indices = model.get_z_for_clf(imgs)

            codes = []
            for sample_id in range(imgs.shape[0]):
                mask = masks[sample_id]
                # attention mass of every slot that falls inside the object mask
                overlap = torch.stack(
                    [
                        torch.sum(attns[sample_id, slot] * mask)
                        for slot in range(num_nlotm_slots)
                    ]
                )
                # keep the indices of the slot with the highest attention
                codes.append(indices[sample_id, torch.argmax(overlap)])
            # add the singleton slot dimension expected further below
            codes = torch.stack(codes).unsqueeze(1)
        elif "sysbind" in args.model_type:
            encs = model.encode(imgs)
            codes = encs[0]
            # optionally use the sysbinder prototype attention values as code
            # rather than the weighted prototypes
            if args.attention_codes:
                # argmax over the last dimension of the attention values
                codes = torch.argmax(encs[3][1], dim=-1)
                # flatten everything after the first two dimensions
                codes = codes.reshape((codes.shape[0], codes.shape[1], -1))
        elif args.model_type == "ncb":
            encs = model.encode(imgs)
            codes = encs[0]
        else:
            # Without this branch `codes` would be undefined (or stale from the
            # previous batch) for an unknown model type.
            raise ValueError(f"Model type {args.model_type} not recognized")

        assert annotations.shape[1] == 1
        # we consider each attribute for an object as one class
        annotations = annotations.squeeze(dim=1)

        all_labels_single.extend(annotations.detach().cpu().numpy())
        all_labels_multi.extend(annotations_multihot.detach().cpu().numpy())

        # make sure only one object/slot per image
        assert codes.shape[0] == args.batch_size and codes.shape[1] == 1
        codes = codes.squeeze(dim=1)

        all_codes.append(codes.detach().cpu().numpy())

    if not all_codes:
        # np.concatenate would raise a ValueError with an opaque message here.
        raise ValueError(
            "The data loader yielded no batches; cannot gather encodings "
            "(is the dataset smaller than the batch size?)"
        )

    all_labels_multi = np.array(all_labels_multi)
    all_labels_single = np.array(all_labels_single)
    all_codes = np.concatenate(all_codes, axis=0)

    return all_codes, all_labels_single, all_labels_multi
def clf_per_cat(train_encs, train_labels, test_encs, test_labels, model, args):
    """
    Per attribute category, fit one classifier that predicts the attributes of that
    category from the model encodings and score it on the test encodings.

    The classifier is a decision tree (``args.clf_type == "dt"``) or a categorical
    naive Bayes model (``args.clf_type == "nb"``).

    Args:
        train_encs: Training encodings, passed as features to ``clf.fit``.
        train_labels: Training labels indexed as ``[sample, category]``; the array
            is transposed so that one row per category can be used as target.
        test_encs: Test encodings, passed to ``clf.predict``.
        test_labels: Test labels, laid out like ``train_labels``.
        model: Model handed to ``get_min_categories_per_block`` (only for "nb").
        args: Namespace providing ``num_categories`` and ``clf_type``.

    Returns:
        Tuple ``(accs_per_cat, clfs)``: the balanced accuracy per category and the
        fitted classifiers, both as lists ordered by category id.

    Raises:
        ValueError: If ``args.clf_type`` is neither ``"dt"`` nor ``"nb"``.
    """
    train_labels = np.transpose(train_labels)
    test_labels = np.transpose(test_labels)

    # The per-block category counts do not depend on the attribute category, so
    # they are computed once instead of once per loop iteration.
    min_categories = None
    if args.clf_type == "nb":
        # TODO: something isn't working here with NB?
        min_categories = get_min_categories_per_block(model, args)

    accs_per_cat = []
    clfs = []
    for cat_id in range(args.num_categories):
        # initialize a fresh classifier for this category
        if args.clf_type == "dt":
            clf = DecisionTreeClassifier(random_state=0)
        elif args.clf_type == "nb":
            clf = CategoricalNB(min_categories=min_categories)
        else:
            # Previously this fell through with `clf` undefined (NameError).
            raise ValueError(f"Classifier type {args.clf_type} not recognized")

        # fit clf on training encodings and labels
        clf.fit(train_encs, train_labels[cat_id])
        # apply to test encodings
        test_pred = clf.predict(test_encs)
        # compute balanced accuracy
        accs_per_cat.append(
            metrics.balanced_accuracy_score(test_labels[cat_id], test_pred)
        )
        clfs.append(clf)

    return accs_per_cat, clfs
def get_min_categories_per_block(model, args):
    """
    Return, for each of the ``args.num_blocks`` blocks, the number of categories.

    For the ``"ncb"`` model type this is the number of distinct values among the
    ``"ids"`` entry of the block's retrieval corpus; for every other model type it
    is ``model.num_prototypes``.

    Returns:
        A numpy array with one entry per block.
    """

    def _categories_in_block(block_idx):
        # Non-ncb models use the same prototype count for every block.
        if args.model_type != "ncb":
            return model.num_prototypes
        block_ids = model.retrieval_corpus[block_idx]["ids"]
        distinct_ids = np.unique(block_ids.detach().cpu().numpy())
        return len(distinct_ids)

    return np.array(
        [_categories_in_block(block_idx) for block_idx in range(args.num_blocks)]
    )
def _build_datasets(args):
    """Return the ``(train, test)`` datasets selected by ``args.data_path``.

    The classifier is trained on the original validation split (subsampled with
    ``args.perc_imgs``) and tested on the full original test split.

    Raises:
        ValueError: If ``args.data_path`` names neither supported dataset.
    """
    if "CLEVR-Easy-1" in args.data_path:
        dataset_cls = CLEVREasy_1_WithAnnotations
    elif "CLEVR-4-1" in args.data_path:
        dataset_cls = CLEVR4_1_WithAnnotations
    else:
        # Previously this fell through and failed later with a NameError.
        raise ValueError(
            f"Data path {args.data_path} matches neither CLEVR-Easy-1 nor CLEVR-4-1"
        )

    shared_kwargs = {
        "root": args.data_path,
        "img_size": args.image_size,
        "max_num_objs": args.num_slots,
        "num_categories": args.num_categories,
    }
    train_dataset = dataset_cls(
        phase="val", perc_imgs=args.perc_imgs, **shared_kwargs
    )
    test_dataset = dataset_cls(phase="test", perc_imgs=1.0, **shared_kwargs)
    return train_dataset, test_dataset


def _load_model(args):
    """Build the model named by ``args.model_type`` and load its checkpoint.

    For sysbinder models ``args.log_dir`` is set to the checkpoint's directory.

    Returns:
        The model, or ``None`` if the sysbinder checkpoint file does not exist.

    Raises:
        ValueError: If ``args.model_type`` is not recognized.
    """
    if args.model_type == "ncb":
        return NeuralConceptBinder(args)

    if "sysbind" in args.model_type:
        if "step" in args.model_type or "hard" in args.model_type:
            assert args.binarize, (
                "sysbinder step/hard variants require args.binarize to be set"
            )
        model = SysBinderImageAutoEncoder(args)

        if not os.path.isfile(args.checkpoint_path):
            print("Model path for Sysbinder was not found.")
            return None

        checkpoint = torch.load(args.checkpoint_path, map_location="cpu")
        # A checkpoint is either a wrapper dict holding the weights under "model"
        # (optionally with the attention temperature under "temp") or a bare
        # state dict. Checking the keys explicitly replaces a bare `except:` that
        # also hid genuine loading errors and KeyboardInterrupt.
        is_wrapped = isinstance(checkpoint, dict) and "model" in checkpoint
        model.load_state_dict(checkpoint["model"] if is_wrapped else checkpoint)

        if is_wrapped and "temp" in checkpoint:
            temp = checkpoint["temp"]
        elif args.model_type == "sysbind_step":
            temp = 0.001
        elif args.model_type == "sysbind_hard":
            temp = 1e-4
        else:
            temp = 1.0
        model.image_encoder.sysbinder.prototype_memory.attn.temp = temp

        # dirname keeps the leading separator of absolute paths and copes with a
        # bare file name; split() + join(*parts) did neither.
        args.log_dir = os.path.dirname(args.checkpoint_path)
        print(f"loaded ...{args.checkpoint_path}")
        return model

    if args.model_type == "nlotm":
        print("Loading NLOTM model")
        model = NlotmImageAutoEncoder(args)
        checkpoint = torch.load(args.checkpoint_path, map_location="cpu")
        model.load_state_dict(checkpoint["model"], strict=False)
        return model

    raise ValueError(f"Model type {args.model_type} not recognized")


def main():
    """Run the classifier-based analysis of a model's encodings.

    Builds the datasets and loaders, loads the requested model, gathers encodings
    and labels for both splits and, if ``args.clf_type`` is set, fits one
    classifier per attribute category and prints the per-category and mean
    balanced accuracy.
    """
    args = get_args()

    # we train the classifier on the original validation set and test on the
    # original test set
    train_dataset, test_dataset = _build_datasets(args)

    # Both loaders share the same settings; drop_last keeps every batch at exactly
    # args.batch_size samples, which gather_encs asserts.
    loader_kwargs = {
        "batch_size": args.batch_size,
        "shuffle": False,
        "num_workers": args.num_workers,
        "pin_memory": True,
        "drop_last": True,
    }
    train_loader = DataLoader(train_dataset, **loader_kwargs)
    test_loader = DataLoader(test_dataset, **loader_kwargs)

    print("-------------------------------------------\n")
    print(f"{len(train_dataset)} train samples, {len(test_dataset)} test samples")
    print(
        f"{args.checkpoint_path} loading for {args.model_type} encoding classification"
    )

    model = _load_model(args)
    if model is None:
        return
    model.to(args.device)

    # Create and start RTPT object
    rtpt = RTPT(
        name_initials="YOURINITIALS", experiment_name="NCB", max_iterations=1
    )
    rtpt.start()

    # gather encodings and corresponding labels
    train_encs, train_labels_single, _ = gather_encs(model, train_loader, args)
    test_encs, test_labels_single, _ = gather_encs(model, test_loader, args)

    if args.clf_type is not None:
        # classify each attribute category with one classifier
        acc, _ = clf_per_cat(
            train_encs, train_labels_single, test_encs, test_labels_single, model, args
        )
        print(acc)
        print(f"Accuracy of {args.checkpoint_path}: {100 * np.round(np.mean(acc), 4)}")

    # closing separator, mirroring the one printed before the run
    print("-------------------------------------------\n")
if __name__ == "__main__":
    # Run the analysis only when this file is executed as a script.
    main()