userstudy1_parking.py
'''
Code for the Section 6 User Study
This code runs the simulation for the user study in Section 6.1
The scenario here is Parking
'''
import numpy as np
import random
import copy
from matplotlib import pyplot as plt
import argparse
import pickle
# by default, runs the simulation with the opaque algorithm (--alg ours)
# get parameters for simulation
parser = argparse.ArgumentParser()
parser.add_argument('--alg', default="ours", help='which algorithm to run. options are ours and trans')
args = parser.parse_args()
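# example invocations (assuming this file is run directly as a script, as in main(args) below):
#   python userstudy1_parking.py --alg ours
#   python userstudy1_parking.py --alg trans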
# formalize the stochastic bayesian game
class ParkingSBG:
    # initialization
    def __init__(self):
        # time horizon
        self.T = 4
        # augmented state space
        # (timestep t, state x, state y, belief b)
        self.states = []
        for t in range(self.T):
            for sx in np.linspace(0, 6.0, 7):
                for sy in np.linspace(-1.2, 1.2, 13):
                    for b in np.linspace(0, 1.0, 3):
                        augmented_state = (t, round(sx,1), round(sy,1), round(b,1))
                        self.states.append(augmented_state)
        # action space
        # action space for the confused robot
        self.actions_r1 = ((1., 0.),)
        # action space for the capable robot
        self.actions_r2 = ((1., 0.), (0., -0.2), (0., 0.2))
        # action space for the human
        self.actions_h = ((1., 0.), (0., -0.2), (0., 0.2))
        # initialize policy, needed for boltzmann model
        self.pi = {s: None for s in self.states}
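        # note: with 4 timesteps, 7 x-values, 13 y-values, and 3 belief
        # values, self.states holds 4 * 7 * 13 * 3 = 1092 augmented states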
    # dynamics
    def f(self, s, ah, ar):
        timestep = s[0]
        statex = s[1] + ah[0] + ar[0]
        statey = s[2] + ah[1] + ar[1]
        belief = s[3]
        statex = min([6.0, statex])
        statex = max([0.0, statex])
        statey = min([+1.2, statey])
        statey = max([-1.2, statey])
        if belief > 0.01 and belief < 0.99:
            # if both robots take same action,
            # cannot learn anything, belief stays same
            if abs(self.pi[s][1][0] - self.pi[s][2][0]) < 0.01 and abs(self.pi[s][1][1] - self.pi[s][2][1]) < 0.01:
                belief = s[3]
            # if robot action matches the action of type2
            # we must be working with type2
            elif abs(ar[1]) > 0.01:
                belief = 1.0
            else:
                belief = 0.0
        return (timestep+1, round(statex,1), round(statey,1), round(belief,1))
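    # illustrative reading of the belief update above: when the two robot
    # types' planned actions differ, a lateral action such as (0., 0.2) can
    # only come from the capable type 2, so the belief jumps to 1.0, while a
    # straight action (1., 0.) matches the confused type 1 and drops it to 0.0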
    # reward function
    def reward(self, s):
        timestep, statex, statey = s[0], s[1], s[2]
        if statey > 1.0:
            return 1.5
        elif statex > 5.0:
            return 1.0
        else:
            return 0.0
    # bonus reward for transparency
    def bonus_reward(self, a):
        ah, ar1, ar2 = a
        difference = abs(ar1[0] - ar2[0]) + abs(ar1[1] - ar2[1])
        return difference
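    # note: this bonus is the L1 distance between the two robot types'
    # actions; value_iteration below adds it only when --alg trans is chosen,
    # rewarding plans where the two types act distinguishably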
    # modified Harsanyi-Bellman Ad Hoc Coordination
    # see equations (4)-(6) in paper
    # pi maps state to optimal human and robot actions
    def value_iteration(self, args):
        V1 = {s: 0 for s in self.states}
        pi1 = {s: None for s in self.states}
        for _ in range(self.T+1):
            V = V1.copy()
            for s in self.states:
                if s[0] == self.T-1:
                    V1[s] = self.reward(s)
                    continue
                v_next_max = -np.inf
                for ah in self.actions_h:
                    for ar1 in self.actions_r1:
                        for ar2 in self.actions_r2:
                            self.pi[s] = [ah, ar1, ar2]
                            s1 = self.f(s, ah, ar1)
                            s2 = self.f(s, ah, ar2)
                            eV1 = (1-s[3]) * V[s1]
                            eV2 = s[3] * V[s2]
                            if args.alg == "trans":
                                eV1 += 1.0 * self.bonus_reward([ah, ar1, ar2])
                            if eV1 + eV2 > v_next_max:
                                v_next_max = eV1 + eV2
                                pi1[s] = [ah, ar1, ar2]
                V1[s] = self.reward(s) + v_next_max
        self.pi = pi1
        return pi1, V1
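
# the backup implemented in value_iteration is a belief-weighted Bellman update:
#   V(s) = R(s) + max over (ah, ar1, ar2) of [ (1 - b) * V(f(s, ah, ar1)) + b * V(f(s, ah, ar2)) ]
# where b is the belief that the robot is the capable type 2; under --alg trans
# the transparency bonus is added to the type-1 term, matching the code above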
def main(args):
    # get optimal policy for human and robot
    env = ParkingSBG()
    pi, V = env.value_iteration(args)
    # choose initial augmented state
    # (timestep t, state x, state y, belief b)
    init_state = (0, 0, 0, 0.5)
    # rollout policy with robot type 1
    s1 = copy.deepcopy(init_state)
    print("[*] type 1")
    for t in range(env.T-1):
        astar = pi[s1]
        s1 = env.f(s1, astar[0], astar[1])
        print(s1, astar[1], astar[2])
    # rollout policy with robot type 2
    s2 = copy.deepcopy(init_state)
    print("[*] type 2")
    for t in range(env.T-1):
        astar = pi[s2]
        s2 = env.f(s2, astar[0], astar[2])
        print(s2, astar[1], astar[2])


main(args)
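# each printed rollout line above is the next augmented state followed by the
# type-1 and type-2 robot actions that were planned at the previous state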