-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add safety limits on relative action target #373
Changes from 9 commits
103ac92
1868169
4b5da7a
d8c49e6
eebc218
aeb176b
fe04d4d
52fb242
94386b7
c8942ab
f83e6cc
bee40cd
e726a29
2f59632
3560f97
104e195
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import logging | ||
import pickle | ||
import time | ||
from dataclasses import dataclass, field, replace | ||
|
@@ -164,6 +165,7 @@ class KochRobotConfig: | |
leader_arms: dict[str, MotorsBus] = field(default_factory=lambda: {}) | ||
follower_arms: dict[str, MotorsBus] = field(default_factory=lambda: {}) | ||
cameras: dict[str, Camera] = field(default_factory=lambda: {}) | ||
max_relative_target: list[float] | None = None | ||
|
||
|
||
class KochRobot: | ||
|
@@ -206,7 +208,15 @@ class KochRobot: | |
}, | ||
), | ||
} | ||
robot = KochRobot(leader_arms, follower_arms) | ||
robot = KochRobot( | ||
leader_arms=leader_arms, | ||
follower_arms=follower_arms, | ||
max_relative_target=[10, 10, 10, 10, 10, 15], | ||
) | ||
|
||
Notice the parameter `max_relative_target`. This is a safety measure that prevents someone from providing | ||
a positional target that is too far from the current robot position (which would then cause the robot to | ||
move too quickly, potentially burning out the motors or making violent impact with another object). | ||
|
||
# Connect motors buses and cameras if any (Required) | ||
robot.connect() | ||
|
@@ -218,7 +228,11 @@ class KochRobot: | |
Example of highest frequency data collection without camera: | ||
```python | ||
# Assumes leader and follower arms have been instantiated already (see first example) | ||
robot = KochRobot(leader_arms, follower_arms) | ||
robot = KochRobot( | ||
leader_arms=leader_arms, | ||
follower_arms=follower_arms, | ||
max_relative_target=[10, 10, 10, 10, 10, 15], | ||
) | ||
robot.connect() | ||
while True: | ||
observation, action = robot.teleop_step(record_data=True) | ||
|
@@ -236,7 +250,12 @@ class KochRobot: | |
} | ||
|
||
# Assumes leader and follower arms have been instantiated already (see first example) | ||
robot = KochRobot(leader_arms, follower_arms, cameras) | ||
robot = KochRobot( | ||
leader_arms=leader_arms, | ||
follower_arms=follower_arms, | ||
cameras=cameras, | ||
max_relative_target=[10, 10, 10, 10, 10, 15], | ||
) | ||
robot.connect() | ||
while True: | ||
observation, action = robot.teleop_step(record_data=True) | ||
|
@@ -245,7 +264,12 @@ class KochRobot: | |
Example of controlling the robot with a policy (without running multiple policies in parallel to ensure highest frequency): | ||
```python | ||
# Assumes leader and follower arms + cameras have been instantiated already (see previous example) | ||
robot = KochRobot(leader_arms, follower_arms, cameras) | ||
robot = KochRobot( | ||
leader_arms=leader_arms, | ||
follower_arms=follower_arms, | ||
cameras=cameras, | ||
max_relative_target=[10, 10, 10, 10, 10, 15], | ||
) | ||
robot.connect() | ||
while True: | ||
# Uses the follower arms and cameras to capture an observation | ||
|
@@ -392,7 +416,7 @@ def teleop_step( | |
# Send action | ||
for name in self.follower_arms: | ||
before_fwrite_t = time.perf_counter() | ||
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name]) | ||
self.send_action(torch.tensor(follower_goal_pos[name]), [name]) | ||
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t | ||
|
||
# Early exit when recording data is not requested | ||
|
@@ -474,21 +498,53 @@ def capture_observation(self): | |
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name]) | ||
return obs_dict | ||
|
||
def send_action(self, action: torch.Tensor): | ||
"""The provided action is expected to be a vector.""" | ||
def send_action(self, action: torch.Tensor, follower_names: list[str] | None = None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
"""Command the follower arms to move to a target joint configuration. | ||
|
||
The relative action magnitude may be clipped depending on the configuration parameter | ||
`max_relative_target`. | ||
|
||
Args: | ||
action: tensor containing the concatenated joint positions for the follower arms. | ||
follower_names: Pass follower arm names to only control a subset of all the follower arms. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
""" | ||
if not self.is_connected: | ||
raise RobotDeviceNotConnectedError( | ||
"KochRobot is not connected. You need to run `robot.connect()`." | ||
) | ||
|
||
if follower_names is None: | ||
follower_names = list(self.follower_arms) | ||
elif not set(follower_names).issubset(self.follower_arms): | ||
raise ValueError( | ||
f"You provided {follower_names=} but only the following arms are registered: " | ||
f"{list(self.follower_arms)}" | ||
) | ||
|
||
from_idx = 0 | ||
to_idx = 0 | ||
follower_goal_pos = {} | ||
for name in self.follower_arms: | ||
if name in self.follower_arms: | ||
to_idx += len(self.follower_arms[name].motor_names) | ||
follower_goal_pos[name] = action[from_idx:to_idx].numpy() | ||
from_idx = to_idx | ||
for name in follower_names: | ||
to_idx += len(self.follower_arms[name].motor_names) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have resolved this by adding a post_init validation to the config class. |
||
this_action = action[from_idx:to_idx] | ||
|
||
if self.config.max_relative_target is not None: | ||
max_relative_target = torch.tensor(self.config.max_relative_target) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
# Cap relative action target magnitude for safety. | ||
current_pos = torch.tensor(self.follower_arms[name].read("Present_Position")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
diff = this_action - current_pos | ||
safe_diff = torch.minimum(diff, max_relative_target) | ||
safe_diff = torch.maximum(safe_diff, -max_relative_target) | ||
safe_action = current_pos + safe_diff | ||
if not torch.allclose(safe_action, action): | ||
logging.warning( | ||
"Relative action magnitude had to be clamped to be safe.\n" | ||
f" requested relative action target: {diff}\n" | ||
f" clamped relative action target: {safe_diff}" | ||
) | ||
|
||
follower_goal_pos[name] = safe_action.numpy() | ||
from_idx = to_idx | ||
|
||
for name in self.follower_arms: | ||
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we choose a sensible default or default to None? @aliberts @Cadene