-
Notifications
You must be signed in to change notification settings - Fork 1
/
bdi.py
256 lines (217 loc) · 13.3 KB
/
bdi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
from world_model import WorldModel
from hierarchical_task_network_planner import HierarchicalTaskNetworkPlanner
from collections import deque
from perception import Perception
from coordination import Coordination
from monitoring import Monitoring
import datetime
from spade.agent import Agent
from spade.behaviour import PeriodicBehaviour
# from spade.behaviour import CyclicBehaviour # TODO: Need?
# from spade.message import Message # TODO: Need?
"""
Belief-Desire-Intention (BDI) - Practical Reasoning for resource bounded agents - Bratman 1988
practical reasoning = deliberation + means-ends reasoning
Base agent control loop
while true
OBSERVE the world;
UPDATE internal world model;
DELIBERATE about what INTENTION to achieve next;
use MEANS-END REASONING to get a PLAN for the intention;
EXECUTE the plan;
end while
"""
class BDIAgent(Agent):
    """SPADE agent that drives a robot arm with a Belief-Desire-Intention control loop.

    All reasoning happens in the nested BDIBehaviour; setup() schedules it to run
    periodically, paced by clock settings read from the initial world model.
    """

    class BDIBehaviour(PeriodicBehaviour):
        """Periodic BDI behaviour: each call to run() executes ONE tick of the control loop."""

        async def on_start(self):
            # Loop-control flags.
            self.terminate = False  # external kill switch; never set True within this file
            self.SUCCESS = False  # flips True once the intentions list has been emptied (goal achieved)
            self.verbose = False
            # Initialization
            self.beliefs = WorldModel()  # B := B0; Initial Beliefs
            self.goals = self.beliefs.current_world_model.goals
            self.intentions = self.beliefs.current_world_model.goals  # I := I0; Initial Intentions
            self.htn_planner = HierarchicalTaskNetworkPlanner(self.beliefs)
            self.perception = Perception(self.beliefs)
            self.coordination = Coordination(self.beliefs)
            self.monitoring = Monitoring()
            # Explainability bookkeeping: answers for "what / why / how well / what else / why failed".
            self.what, self.why, self.how_well, self.what_else, self.why_failed = "", "", "", "", ""
            self.plans = []  # all candidate plans returned by the HTN planner (or False on failure)
            self.selected_plan = []  # becomes a deque of pending actions once a plan is chosen
            self.percept = {}
            self.action = ""
            self.start_time = datetime.datetime.now()  # NOTE(review): naive local time; used only for elapsed-ms reporting

        async def run(self):
            """
            Agent control loop version 7 (Intention Reconsideration)
            I := I0; Initial Intentions
            B := B0; Initial Beliefs
            while true do
                get next percept ρ; # OBSERVE the world
                B := brf(B, ρ); # Belief revision function
                D := option(B, I);
                I := filter(B, D, I);
                π := plan(B, I); # MEANS-END REASONING
                while not (empty(π) or succeeded(Ι, Β) or impossible(I, B)) do # Drop impossible or succeeded intentions
                    α := hd(π); # Pop first action
                    execute(α);
                    π := tail(π);
                    get next percept ρ;
                    B := brf(B, ρ);
                    if reconsider(I, B) then # Meta-level control: explicit decision, to avoid reconsideration cost
                        D := options(B, I);
                        I := filter(B, D, I);
                    end-if
                    if not sound(π, I, B) then # Re-activity, re-plan
                        π := plan(B, I);
                    end-if
                end-while
            end-while
            """
            if self.verbose:
                print(f"--- arm_agent: "
                      f"PeriodicSenderBehaviour running "
                      f"at {datetime.datetime.now().time()}: "
                      f"{self.beliefs.current_world_model.ticks}")
            # while true do
            # One loop iteration per periodic tick; stop when succeeded, terminated, or tick budget spent.
            if not self.SUCCESS and not self.terminate and self.beliefs.update_tick() < self.beliefs.current_world_model.max_ticks:
                if len(self.selected_plan) == 0:
                    # No plan in progress: observe, revise beliefs, deliberate, and (re)plan.
                    # msg = Message(to="madks2@temp3rr0r-pc") # Instantiate the message # TODO: place holder for IM
                    # msg.body = "Hello World: " + str(self.counter) # Set the message content
                    # await self.send(msg)
                    # TODO: engrave figures: arm IK, gripper
                    self.percept = self.perception.get_percept(text_engraving=(self.why_failed, self.how_well))  # get next percept ρ; OBSERVE the world
                    self.beliefs = self.perception.belief_revision(self.beliefs, self.percept)  # B:= brf(B, ρ);
                    self.beliefs = self.monitoring.fire_events(self.beliefs, self.percept)
                    self.intentions = self.deliberate(self.beliefs,
                                                      self.intentions)  # DELIBERATE about what INTENTION to achieve next
                    # deliberate() returns "" once the goal is achieved (see filter_intentions).
                    self.SUCCESS = True if self.intentions == "" else False
                    self.plans = self.htn_planner.get_plans(self.beliefs.current_world_model,
                                                            self.intentions)  # π := plan(B, I); MEANS_END REASONING
                    # NOTE(review): get_plans apparently returns False on planning failure, a list otherwise.
                    if self.plans != False:
                        if len(self.plans) > 0:
                            self.plans.sort(key=len)  # shortest plan first # TODO: check why sorting doesn't work on "deeper" levels
                            if self.verbose:
                                print("{}: Plan: {}".format(self.beliefs.current_world_model.tick, self.plans[0]))
                            self.selected_plan = deque(
                                self.plans[0])  # TODO: Use a "cost function" to evaluate the best plan, not shortest
                            self.why_failed = ""
                    else:
                        # Planning failed: remember why, so it gets engraved into the next percept.
                        self.why_failed = self.htn_planner.failure_reason
                        if self.verbose:
                            print("Plan failure_reason: {}".format(self.why_failed), end=" ")
                        self.how_well = (
                            self.beliefs.current_world_model.tick, self.beliefs.current_world_model.max_ticks,
                            int((datetime.datetime.now() - self.start_time).total_seconds() * 1000),  # milliseconds
                            self.plans)
                        if self.verbose:
                            print("how_well: {}".format(self.how_well))
                else:  # while not (empty(π) or succeeded(Ι, Β) or impossible(I, B)) do
                    self.action, self.selected_plan = self.selected_plan.popleft(), self.selected_plan  # α := hd(π); π := tail(π);
                    if self.verbose:
                        print("{}: Action: {}".format(self.beliefs.current_world_model.tick, self.action))
                    self.coordination.execute_action(self.action, self.beliefs.current_world_model)  # execute(α);
                    # Record the explainability answers for this step.
                    self.what, self.why = self.action, self.intentions
                    self.how_well = (self.beliefs.current_world_model.tick, self.beliefs.current_world_model.max_ticks,
                                     int((datetime.datetime.now() - self.start_time).total_seconds() * 1000),  # milliseconds
                                     self.selected_plan)
                    self.what_else = self.plans[1] if len(self.plans) > 1 else self.plans[0]
                    if self.verbose:
                        self.print_answers(self.what, self.why, self.how_well, self.what_else)
                    # get next percept ρ; OBSERVE the world
                    # TODO: Don't update target object location, when it is obscured
                    if self.action != ('move_arm_above', 'target_object') and self.action != ('move_arm', 'target_object'):
                        self.percept = self.perception.get_percept(text_engraving=(self.what, self.why, self.how_well,
                                                                                   self.what_else))
                        self.beliefs = self.perception.belief_revision(self.beliefs, self.percept)
                        self.beliefs = self.monitoring.fire_events(self.beliefs, self.percept)
                    # TODO: trigger sound percept?
                    # Synthetic post-condition percepts for actions whose effects the camera cannot observe.
                    if self.action == ('initialize', 'arm'):
                        self.percept = {"initialized": {'arm': True}}  # TODO: post conditions or monitoring
                        self.beliefs = self.perception.belief_revision(self.beliefs, self.percept)  # TODO: post conditions
                        self.beliefs = self.monitoring.fire_events(self.beliefs, self.percept)
                    elif self.action == ('move_arm', 'target_object'):
                        self.percept = {"distance": {'distance_to_gripper': 2.2}}  # TODO: update with monitoring
                        self.beliefs = self.perception.belief_revision(self.beliefs, self.percept)
                        self.beliefs = self.monitoring.fire_events(self.beliefs, self.percept)
                    elif self.action == ('move_arm_above', 'container'):
                        self.percept = {"location": {"target_object": "container"}, "grabbed": {'target_object': False}}
                        self.beliefs = self.perception.belief_revision(self.beliefs, self.percept)
                        self.beliefs = self.monitoring.fire_events(self.beliefs, self.percept)
                    # if reconsider(I, B) then
                    #     D := options(B, I);
                    #     I := filter(B, D, I);
                    # if not sound(π, I, B) then
                    #     π := plan(B, I)
                    # TODO: Backtracking of action?
            else:
                # Goal achieved, externally terminated, or tick budget exhausted: report and stop.
                self.done()
                self.kill()

        async def on_end(self):
            """Release perception resources and stop the owning agent."""
            print("-- on_end")
            print("Done.")
            self.perception.destroy()
            # stop agent from behaviour
            await self.agent.stop()

        # async def _step(self): # TODO: Need?
        #     # the bdi stuff
        #     pass

        def done(self):
            """Print the final world model state (and optionally its tick-by-tick history) plus the outcome."""
            print("-- Sender Agent: Done")
            show_history = True  # toggle to suppress the per-tick history dump
            print("Final World model:")
            print("-- Ticks: {}".format(self.beliefs.current_world_model.tick))
            print("-- initialized: {}".format(self.beliefs.current_world_model.initialized))
            print("-- location: {}".format(self.beliefs.current_world_model.location))
            print("-- grabbed: {}".format(self.beliefs.current_world_model.grabbed))
            if show_history:
                print()
                print("World model History:")
                for tick in range(len(self.beliefs.world_model_history)):
                    print("Tick {}:".format(tick))
                    print("-- initialized: {}".format(self.beliefs.world_model_history[tick].initialized))
                    print("-- location: {}".format(self.beliefs.world_model_history[tick].location))
                    print("-- grabbed: {}".format(self.beliefs.world_model_history[tick].grabbed))
                print()
            print("{}!".format("SUCCESS" if self.SUCCESS else "FAIL"))

        def print_answers(self, what_answer, why_answer, how_well_answer, what_else_answer):
            """Print the four explainability answers: what / why / how well / what else."""
            print("Q: What is the robot doing? A: {}\n"
                  "Q: Why is it doing it? A: {}\n"
                  "Q: How well is it doing it? A: {}\n"
                  "Q: What else could it have been doing instead? A: {}"
                  .format(what_answer, why_answer, how_well_answer, what_else_answer))

        def deliberate(self, current_beliefs, current_intentions):
            """
            Decide WHAT state of affairs you want to achieve.
            :param current_beliefs: WordModel instance, the world model, information about the world.
            :param current_intentions: List of tuples, tasks that the agent has COMMITTED to accomplish.
            :return: List of tuples, filtered intentions that the agent COMMITS to accomplish
                (empty string "" once the goal has been achieved).
            """
            # 1. Option Generation: The agent generates a set of possible alternatives.
            current_desires = current_intentions  # TODO: D := option(B, I);
            # 2. Filtering: Agent chooses between competing alternatives and commits to achieving them.
            current_intentions = self.filter_intentions(
                current_beliefs, current_desires, current_intentions)  # I := filter(B, D, I);
            return current_intentions

        def filter_intentions(self, current_beliefs, current_desires, current_intentions):
            """
            Choose between competing alternatives and COMMITTING to achieving them.
            :param current_beliefs: WordModel instance, the world model, information about the world.
            :param current_desires: List of tuples, tasks that the agent would like to accomplish.
            :param current_intentions: List of tuples, tasks that the agent has COMMITTED to accomplish.
            :return: List of tuples, filtered intentions that the agent COMMITS to accomplish,
                or the empty string "" when the primary goal has been reached.
            """
            # Goal test: primary goal is met once the target object's believed location is the container.
            for current_intention in current_intentions:
                if current_intention == self.goals[0] and current_beliefs.current_world_model.location["target_object"] \
                        == "container":
                    current_intentions = ""  # if goal(s) achieved, empty I
            return current_intentions

    async def setup(self):
        """Schedule the BDI behaviour, with start delay and period taken from the initial world model."""
        print(f"--- arm_agent: PeriodicSenderAgent started at {datetime.datetime.now().time()}")
        init_world_model = WorldModel()
        start_at = datetime.datetime.now() + datetime.timedelta(seconds=init_world_model.current_world_model.init_delay_seconds["arm"])
        bdi_behaviour = self.BDIBehaviour(period=init_world_model.current_world_model.real_time_clock_period_seconds["arm"], start_at=start_at)
        self.add_behaviour(bdi_behaviour)
if __name__ == "__main__":
    # Entry point: create the arm agent and begin its XMPP session.
    # NOTE(review): JID and password are hard-coded — consider loading them from config/env.
    agent = BDIAgent("madks@temp3rr0r-pc", "ma121284")
    agent.start()