-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathExecution_Policy.py
95 lines (75 loc) · 2.69 KB
/
Execution_Policy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import abc
from typing import Dict, List, Tuple
from Position import Position
from Status import Status
class ExecutionPolicy(abc.ABC):
"""
Abstract base class for execution policies.
"""
@abc.abstractmethod
def get_next_position(self, agent_id: int) -> Tuple[List[Position], Tuple[int,int]]:
"""
Abstract method to get the next position.
Args:
agent_id (int): The index of the current position.
Returns:
Tuple[List[Position], Tuple[int,int]]: The next positions and the start and end timesteps
"""
raise NotImplementedError
@abc.abstractmethod
def update(self, data: Dict) -> None:
"""
Abstract method to update the execution policy.
Args:
data (Dict): The data to update the policy.
"""
raise NotImplementedError
class OnlineExecutionPolicy(abc.ABC):
"""
Abstract base class for online execution policies that can extend plans.
"""
@abc.abstractmethod
def get_next_position(self, agent_id: int) -> Tuple[List[Position], Tuple[int, int]]:
"""
Abstract method to get the next position.
Args:
agent_id (int): The index of the current position.
Returns:
Tuple[List[Position], Tuple[int,int]]: The next positions and the start and end timesteps
"""
raise NotImplementedError
@abc.abstractmethod
def update(self, data: Dict) -> None:
"""
Abstract method to update the execution policy.
Args:
data (Dict): The data to update the policy.
"""
raise NotImplementedError
@abc.abstractmethod
def get_agent_locations(self) -> Tuple[List[Tuple[Position, int]], bool]:
"""
Abstract method to get the final committed positions of agents to extend plans on
Returns:
List[Tuple[Position, int]]: A list containing pairs of Position and the id of the agent there
"""
raise NotImplementedError
@abc.abstractmethod
def extend_plans(
self, extensions: List[Tuple[int, List[Position]]]
) -> None:
"""
Abstract method to extend plans of agents without changing existing plans
Args:
extensions:
List[Tuple[int, List[Tuple[Position, int]]]]:
A list containing pairs of agent_id and plan extensions,
where plan extensions are tuples of Position and timestep to reach it
"""
raise NotImplementedError
@abc.abstractmethod
def get_status(self) -> List[Tuple[int, Status]]:
"""
Abstract method to
"""
raise NotImplementedError