-
Notifications
You must be signed in to change notification settings - Fork 0
/
Agent.py
145 lines (117 loc) · 4.16 KB
/
Agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from typing import Dict, List
from File_Handler import load_paths
from Position import Position
from Status import Status
class Agent:
    """
    Class representing an agent in a multi-agent system.

    All agents share one class-level table of plans keyed by agent id. An
    agent's id is the value of ``num_agents`` at the moment it is created.

    Attributes:
    - position (Position | None): current position of the agent
    - timestep (int): index of the plan step get_next_position returns next
    - _id (int): id corresponding to agent's position in plans
    - status (Status): status of the agent
    """

    # Class attribute containing the plans of all agents, keyed by agent id.
    # None means no plans have been loaded or assigned yet.
    plans: Dict[int, List[Position]] | None = None
    # Number of agents created so far; also the id given to the next agent.
    num_agents: int = 0

    def __init__(self, filename: str):
        """
        Initializes an Agent object.
        Args:
        - filename (str): name of the file containing the plans; only read
          when the shared plans have not been set yet
        """
        self.position: Position | None = None
        self.timestep: int = 0
        self._id: int = Agent.num_agents
        self.status: Status = Status.WAITING
        Agent.num_agents += 1
        # The plans table is shared, so only the first agent needs to load it.
        if Agent.plans is None:
            self.load_paths(filename)

    def load_paths(self, filename: str) -> None:
        """
        Loads the plans of all agents from a file.
        Replaces the shared class-level plans table.
        Args:
        - filename (str): name of the file containing the plans
        """
        Agent.plans = load_paths(filename)

    def _own_plan(self) -> List[Position]:
        """
        Returns this agent's entry in the shared plans table.
        Prints an error and exits with status 1 if plans are not set.
        Returns:
        - List[Position]: the plan stored under this agent's id
        Raises:
        - SystemExit: if the shared plans are None
        - KeyError: if the plans hold no entry for this agent's id
        """
        if Agent.plans is None:
            print("Error: Plans have not been loaded.")
            # Raise SystemExit directly instead of calling exit(): that
            # builtin is injected by the site module and is not guaranteed
            # to exist in every interpreter configuration.
            raise SystemExit(1)
        return Agent.plans[self._id]

    def get_next_position(self) -> Position | None:
        """
        Returns the next position of the agent and advances its timestep.
        Returns:
        - Position: the next position of the agent, or None once the plan
          has no steps left (the timestep is then left unchanged)
        """
        plan = self._own_plan()
        if self.timestep >= len(plan):
            return None
        position = plan[self.timestep]
        self.timestep += 1
        return position

    def get_plan(self) -> List[Position]:
        """
        Returns the plan of the agent.
        Returns:
        - List[Position]: the plan of the agent (the stored list, not a copy)
        """
        return self._own_plan()

    def view_position(self, timestep: int) -> Position:
        """
        Returns the position of the agent at a given timestep.
        Does not change the agent's own timestep.
        Args:
        - timestep (int): the timestep to view the position at
        Returns:
        - Position: the position of the agent at the given timestep
        """
        plan = self._own_plan()
        if timestep >= len(plan):  # Plan is finished
            # Show at end of plan if finished plan (lifelong model)
            return plan[-1]
        return plan[timestep]

    def get_initial_position(self) -> Position:
        """
        Returns the initial position of the agent.
        Returns:
        - Position: the first entry of the agent's plan
        """
        return self._own_plan()[0]

    def __str__(self) -> str:
        """
        Returns a string representation of the agent.
        Returns:
        - str: the agent's id, status, position and timestep on one line
        """
        # Adjacent f-string literals keep the output on a single line without
        # backslash continuations, which would pull source indentation into
        # the resulting string.
        return (
            f"ID: {self._id}, "
            f"Status: {self.status}, "
            f"Position: {self.position}, "
            f"Timestep: {self.timestep}"
        )
class OnlineAgent(Agent):
    """
    Class representing an agent in a multi-agent system, where the plan is extended during runtime
    This means the agent's plan is initialised as an empty list
    Attributes:
    - position (Position | None): current position of the agent
    - timestep (int): current timestep of the agent
    - _id (int): id corresponding to agent's position in plans
    - status (Status): status of the agent
    """

    def __init__(self) -> None:
        """
        Initializes an OnlineAgent object with an empty plan.
        Takes no arguments; nothing is read from a file.
        """
        # Agent.__init__ is not called here because it requires a filename
        # and loads the shared plans from it.
        self.position: Position | None = None
        self.timestep: int = 0
        self._id: int = Agent.num_agents
        self.status: Status = Status.WAITING
        Agent.num_agents += 1
        if Agent.plans is None:
            Agent.plans = {}
        # Register an empty plan under this agent's id so the inherited
        # accessors find an entry instead of raising KeyError. setdefault
        # keeps any plan already stored under the same id.
        Agent.plans.setdefault(self._id, [])