-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTrust.py
253 lines (227 loc) · 11.1 KB
/
Trust.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""
This class handles trust and belief from the robot side.
"""
from belief.bayesianNetwork import BeliefNetwork
from belief.datasetParser import DatasetParser
from belief.episode import Episode
from belief.face_vision import FaceVision, TrainingData
from robots.robot_selector import robot
import cv2
import os
class Trust:
def __init__(self, debug, logger):
self.training_data = TrainingData()
self.informants = 0
self.beliefs = []
self.time = None
self.face_frames_captured = 10
self.load_time()
self.debug = debug
self.log = logger # Logger that keeps record of what happens, for data analysis purpose
# --- FACE DETECTION AND RECOGNITION ---
# If image contains a face, it retrieves the cropped region of interest
def detect_face(self, image, grayscale=True):
roi = FaceVision.facial_detection(image, grayscale=grayscale)
return False if roi is None else True, roi
# Captures a certain amount of face frames
def collect_face_frames(self, number):
face_frames = []
found_faces = 0
undetected_frames = 0
while found_faces < number:
image = robot.get_camera_frame()
detected, roi = self.detect_face(image)
if detected:
found_faces += 1
face_frames.append(roi)
undetected_frames = 0
else:
undetected_frames += 1
if undetected_frames % 10 == 0:
robot.say("I can't see you well. Can you please move closer?")
#cv2.putText(image, str("Detected: " + str(found_faces) + " / " + str(number)),
# (0, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
#cv2.imshow("Robot Eyes", image)
#cv2.waitKey(1)
return face_frames
# Obtains training samples of one of the informers
# Automatically updates the informant number
# Saves the frames in the captures directory
def acquire_examples(self, number_of_frames, informant_number):
robot.say("Hello informer number " + str(informant_number) + ". Please look at me")
frames = self.collect_face_frames(number_of_frames)
robot.say("Thank you")
count = 1
for frame in frames:
self.training_data.images.append(frame)
self.training_data.labels.append(informant_number)
cv2.imwrite("captures/" + str(informant_number) + "-" + str(count) + ".jpg", frame)
count += 1
self.informants += 1
# Finalizes learning by training the model with all the data acquired
def face_learning(self):
FaceVision.recognition_train(self.training_data.prepare_for_training())
# Recognizes a face
# Collects an amount of frames, gets a prediction on each of them and returns the most predicted label
def face_recognition(self, number_of_frames=5, announce=True):
unknown = False
robot.say("Please look at me")
# Collect face data
frames = self.collect_face_frames(number_of_frames)
# Counts the recognized labels
predictions = [0 for i in range(self.informants+1)] # An extra slot to consider the -1 (unknown informant) case
for frame in frames:
predictions[FaceVision.recognition_predict(frame)] += 1
# Returns the maximum
guess = predictions.index(max(predictions))
# If the maximum value found is in the last position of the list, it's an unrecognized informant
if guess == len(predictions)-1:
unknown = True
# Unknown informant! Adding it to the known ones and generating episodic memory
self.manage_unknown_informant(frames)
# This new informant has the biggest label yet
guess = self.informants - 1
if announce:
if not unknown:
robot.say("Hello again, informer " + str(guess))
else:
robot.say("I've never seen you before, I'll call you informer " + str(guess))
return guess
# Manages the unknown informant detection
def manage_unknown_informant(self, frames):
# Updates the model with the acquired frames and the right label
new_data = TrainingData()
new_data.images = frames
new_data.labels = [self.informants for i in range(len(frames))]
FaceVision.recognition_update(new_data.prepare_for_training())
# Creates an episodic belief network
name = "Informer" + str(self.informants)
episodic_network = BeliefNetwork.create_episodic(self.beliefs, self.get_and_inc_time(), name=name)
self.beliefs.append(episodic_network)
# Updates the total of known informants
self.informants += 1 # This is done at the end because the label for the class is actually self.informants-1
# --- TIME (episodic memory) ---
# Load time value from file
def load_time(self):
if os.path.isfile("current_time.csv"):
with open("current_time.csv", 'r') as f:
self.time = int(f.readline())
else:
self.time = 0
# Increases and saves the current time value
def get_and_inc_time(self):
previous_time = self.time
self.time += 1
with open("current_time.csv", 'w') as f:
f.write(str(self.time))
return previous_time
# Reset time
def reset_time(self):
if os.path.isfile("current_time.csv"):
with open("current_time.csv", 'w') as f:
f.write("0")
self.time = 0
# --- TRUST MODEL SAVE/LOAD ---
# Saves the beliefs
def save_beliefs(self):
if not os.path.exists("datasets/"):
os.makedirs("datasets/")
for belief in self.beliefs:
belief.save()
# Loads the beliefs and updates the interal parameters
def load_beliefs(self, path="datasets/"):
# Resets previous beliefs
self.beliefs = []
i = 0
while os.path.isfile(path + "Informer" + str(i) + ".csv"):
self.beliefs.append(BeliefNetwork("Informer" + str(i), path + "Informer" + str(i) + ".csv"))
i += 1
self.informants = i
self.load_time()
# --- TRUST MANIPULATION ---
# Updates the trust in the user with two symmetrical positive or negative examples.
# Returns 0 is trust has not changed, +1 if it has become positive or -1 if it has become negative.
def update_trust(self, informant_id, correctness, by_robot):
assert 0 <= informant_id < self.informants, "Invalid informant_id argument"
assert isinstance(correctness, bool), "Correctness argument must be boolean"
old_trust, old_reliability = self.beliefs[informant_id].is_informant_trustable()
# ROGUE CODE ----
if (old_reliability >= 0.5 and correctness) or (old_reliability <= -0.5 and not correctness):
print("[DEBUG] Trust level reached the limit.")
# The repository seems to have a bug: too many queries to a BN will eventually cause a RecursionError.
# I can prevent this by recreating the network with the same parameters, e.g. "refreshing" it.
self.beliefs[informant_id].refresh_belief()
return 0
# ---------------
if correctness:
new_evidence = Episode.create_positive()
else:
new_evidence = Episode.create_negative()
self.beliefs[informant_id].update_belief(new_evidence) # updates belief with correct or wrong episode
self.beliefs[informant_id].update_belief(new_evidence.generate_symmetric()) # symmetric episode is generated
# A success by the human is worth double of a success because the robot was the one building
if not by_robot:
self.beliefs[informant_id].update_belief(new_evidence)
self.beliefs[informant_id].update_belief(new_evidence.generate_symmetric())
new_trust, new_reliability = self.beliefs[informant_id].is_informant_trustable()
print("[DEBUG] Trust for informant " + str(informant_id) + " changed from " + str(old_reliability) +
" to " + str(new_reliability))
# Evaluates if the trust has changed after the update to the belief network
if old_trust != new_trust:
if old_trust is False:
# Informant has gained trust
return 1
else:
# Informant has lost trust
return -1
else:
# Trust level has not changed
return 0
# Updates the trust of a user based on the output of the task
def update_trust_r1(self, informant_id, correctness):
assert 0 <= informant_id < self.informants, "Invalid informant_id argument"
assert isinstance(correctness, bool), "Correctness argument must be boolean"
old_trust, old_reliability = self.beliefs[informant_id].is_informant_trustable()
# ROGUE CODE ----
#if (old_reliability >= 0.5 and correctness) or (old_reliability <= -0.5 and not correctness):
# print("[DEBUG] Trust level reached the limit.")
# The repository seems to have a bug: too many queries to a BN will eventually cause a RecursionError.
# I can prevent this by recreating the network with the same parameters, e.g. "refreshing" it.
# self.beliefs[informant_id].refresh_belief()
# return 0
# ---------------
if correctness:
new_evidence = Episode.create_positive()
else:
new_evidence = Episode.create_negative()
self.beliefs[informant_id].update_belief(new_evidence) # updates belief with correct or wrong episode
self.beliefs[informant_id].update_belief(new_evidence.generate_symmetric()) # symmetric episode is generated
new_trust, new_reliability = self.beliefs[informant_id].is_informant_trustable()
if self.debug:
print("[DEBUG] Trust for informant " + str(informant_id) + " changed from " + str(old_reliability) +
" to " + str(new_reliability))
# Evaluates if the trust has changed after the update to the belief network
if old_trust != new_trust:
if old_trust is False:
# Informant has gained trust
return 1
else:
# Informant has lost trust
return -1
else:
# Trust level has not changed
return 0
# In the experiment, the trainer must be trusted automatically
# Equivalent to a Vanderbilt familiarization phase with one trustable informant
def learn_and_trust_trainer(self):
self.acquire_examples(self.face_frames_captured, 0)
self.beliefs.append(BeliefNetwork("Informer 0", "belief/datasets/examples/naive.csv"))
# todo set time equals to the max in the training
self.face_learning()
# The simulation equivalent of the above function
def initialize_trusted_trainer(self):
self.beliefs.append(BeliefNetwork("Informer 0", "belief/datasets/examples/naive.csv"))
self.informants += 1
# Updates the log
def update_log(self, trust):
self.log.update_latest_trust(trust)