diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..3a9df2b
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,21 @@
+{
+ "editor.tabSize": 2,
+
+ "pylint.args": [
+ "--rcfile=.pylintrc"
+ ],
+
+ "yapf.args": [
+ "--style",
+ "{based_on_style: google, indent_width: 2}"
+ ],
+
+ "[python]": {
+ "editor.formatOnSaveMode": "file",
+ "editor.formatOnSave": true,
+ "editor.defaultFormatter": "eeyore.yapf",
+ "editor.codeActionsOnSave": {
+ "source.organizeImports": true
+ },
+ },
+}
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 5d524b4..3fe3ddf 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -21,7 +21,7 @@
{
"label": "doc-api",
"type": "shell",
- "command": "touch src/dm_robotics/__init__.py && sphinx-apidoc -d 1 -M -T -f -o doc src/dm_robotics && rm src/dm_robotics/__init__.py && rm doc/dm_robotics.rst"
+ "command": "touch src/dm_robotics/__init__.py && sphinx-apidoc -e -d 1 -M -T -f -o doc src/dm_robotics && rm src/dm_robotics/__init__.py && rm doc/dm_robotics.rst"
}
]
}
diff --git a/examples/assets/two_arm.xml b/examples/assets/two_arm.xml
index a78899a..8efd397 100644
--- a/examples/assets/two_arm.xml
+++ b/examples/assets/two_arm.xml
@@ -15,6 +15,7 @@
friction='0.4'/>
+
diff --git a/examples/gripper.py b/examples/gripper.py
index 99ca69c..ff71f4f 100644
--- a/examples/gripper.py
+++ b/examples/gripper.py
@@ -13,7 +13,7 @@
import numpy as np
from dm_env import specs
-from dm_robotics.panda import env_builder
+from dm_robotics.panda import environment
from dm_robotics.panda import parameters as params
from dm_robotics.panda import run_loop, utils
@@ -48,12 +48,12 @@ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
parser = utils.default_arg_parser()
args = parser.parse_args()
- # The Panda model and environment support many customization
- # parameters. Here we use only the defaults and robot IP if provided.
+ # Use RobotParams to customize Panda robots added to the environment.
+ # We use robot_ip for hardware-in-the-loop operation.
robot_params = params.RobotParams(robot_ip=args.robot_ip)
- panda_env_builder = env_builder.PandaEnvironmentBuilder(robot_params)
+ panda_env = environment.PandaEnvironment(robot_params)
- with panda_env_builder.build_task_environment() as env:
+ with panda_env.build_task_environment() as env:
# Print the full action, observation and reward specification.
utils.full_spec(env)
# Initialize the agent.
diff --git a/examples/haptics.py b/examples/haptics.py
index 55a1563..16b8fef 100644
--- a/examples/haptics.py
+++ b/examples/haptics.py
@@ -8,10 +8,10 @@
import dm_env
import numpy as np
-from dm_control import mjcf
+from dm_control import composer, mjcf
from dm_env import specs
-from dm_robotics.panda import arm_constants, env_builder
+from dm_robotics.panda import arm_constants, environment
from dm_robotics.panda import parameters as params
from dm_robotics.panda import utils
@@ -41,20 +41,16 @@ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
# Load environment from an MJCF file.
XML_PATH = os.path.join(os.path.dirname(__file__), 'assets', 'haptics.xml')
- arena = mjcf.from_file(XML_PATH)
+ arena = composer.Arena(xml_path=XML_PATH)
# Robot parameters include the robot's IP for HIL,
# haptic actuation mode and joint damping.
robot_params = params.RobotParams(robot_ip=args.robot_ip,
actuation=arm_constants.Actuation.HAPTIC,
joint_damping=np.zeros(7))
- # Environment parameters consist of the MJCF element defined above
- # as arena and a higher control timestep for smooth interaction.
- env_params = params.EnvirontmentParameters(arena=arena, control_timestep=0.01)
- panda_env_builder = env_builder.PandaEnvironmentBuilder(robot_params,
- env_params=env_params)
+ panda_env = environment.PandaEnvironment(robot_params, arena, 0.01)
- with panda_env_builder.build_task_environment() as env:
+ with panda_env.build_task_environment() as env:
# Print the full action, observation and reward specification.
utils.full_spec(env)
# Initialize the agent.
diff --git a/examples/minimal_working_example.py b/examples/minimal_working_example.py
index 3939c02..992fc4f 100644
--- a/examples/minimal_working_example.py
+++ b/examples/minimal_working_example.py
@@ -3,7 +3,7 @@
import numpy as np
from dm_env import specs
-from dm_robotics.panda import env_builder
+from dm_robotics.panda import environment
from dm_robotics.panda import parameters as params
from dm_robotics.panda import run_loop, utils
@@ -30,12 +30,11 @@ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
parser = utils.default_arg_parser()
args = parser.parse_args()
- # The MoMa model and environment support many customization
- # parameters. Here we use only the defaults.
+ # Use RobotParams to customize Panda robots added to the environment.
robot_params = params.RobotParams(robot_ip=args.robot_ip)
- panda_env_builder = env_builder.PandaEnvironmentBuilder(robot_params)
+ panda_env = environment.PandaEnvironment(robot_params)
- with panda_env_builder.build_task_environment() as env:
+ with panda_env.build_task_environment() as env:
# Print the full action, observation and reward specification
utils.full_spec(env)
# Initialize the agent
diff --git a/examples/motion_cartesian.py b/examples/motion_cartesian.py
index 2a598f3..bcbbd03 100644
--- a/examples/motion_cartesian.py
+++ b/examples/motion_cartesian.py
@@ -7,7 +7,7 @@
import numpy as np
from dm_env import specs
-from dm_robotics.panda import env_builder
+from dm_robotics.panda import environment
from dm_robotics.panda import parameters as params
from dm_robotics.panda import run_loop, utils
@@ -46,12 +46,12 @@ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
parser = utils.default_arg_parser()
args = parser.parse_args()
- # The Panda model and environment support many customization
- # parameters. Here we use only the defaults and robot IP if provided.
+ # Use RobotParams to customize Panda robots added to the environment.
+ # We use robot_ip for hardware-in-the-loop operation.
robot_params = params.RobotParams(robot_ip=args.robot_ip)
- panda_env_builder = env_builder.PandaEnvironmentBuilder(robot_params)
+ panda_env = environment.PandaEnvironment(robot_params)
- with panda_env_builder.build_task_environment() as env:
+ with panda_env.build_task_environment() as env:
# Print the full action, observation and reward specification.
utils.full_spec(env)
# Initialize the agent.
diff --git a/examples/motion_joint.py b/examples/motion_joint.py
index 5b5ed45..53dde5e 100644
--- a/examples/motion_joint.py
+++ b/examples/motion_joint.py
@@ -5,7 +5,7 @@
import numpy as np
from dm_env import specs
-from dm_robotics.panda import arm_constants, env_builder
+from dm_robotics.panda import arm_constants, environment
from dm_robotics.panda import parameters as params
from dm_robotics.panda import run_loop, utils
@@ -36,14 +36,14 @@ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
parser = utils.default_arg_parser()
args = parser.parse_args()
- # The Panda model and environment support many customization
- # parameters. Here we use only the robot IP if provided and set
+ # Use RobotParams to customize Panda robots added to the environment.
+ # We use robot_ip for hardware-in-the-loop operation and set
# the actuation mode to joint velocities.
robot_params = params.RobotParams(
robot_ip=args.robot_ip, actuation=arm_constants.Actuation.JOINT_VELOCITY)
- panda_env_builder = env_builder.PandaEnvironmentBuilder(robot_params)
+ panda_env = environment.PandaEnvironment(robot_params)
- with panda_env_builder.build_task_environment() as env:
+ with panda_env.build_task_environment() as env:
# Print the full action, observation and reward specification.
utils.full_spec(env)
# Initialize the agent.
diff --git a/examples/multiple_robots.py b/examples/multiple_robots.py
index 5152a02..988dac7 100644
--- a/examples/multiple_robots.py
+++ b/examples/multiple_robots.py
@@ -3,7 +3,7 @@
import numpy as np
from dm_env import specs
-from dm_robotics.panda import env_builder
+from dm_robotics.panda import environment
from dm_robotics.panda import parameters as params
from dm_robotics.panda import run_loop, utils
@@ -38,10 +38,9 @@ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
pose=[.5, -.5, 0, 0, 0, np.pi * 3 / 4])
robot_3 = params.RobotParams(name='robot_3',
pose=[.5, .5, 0, 0, 0, np.pi * 5 / 4])
- panda_env_builder = env_builder.PandaEnvironmentBuilder(
- [robot_1, robot_2, robot_3])
+ panda_env = environment.PandaEnvironment([robot_1, robot_2, robot_3])
- with panda_env_builder.build_task_environment() as env:
+ with panda_env.build_task_environment() as env:
# Print the full action, observation and reward specification
utils.full_spec(env)
# Initialize the agent
diff --git a/examples/rewards_and_observations.py b/examples/rewards_and_observations.py
new file mode 100644
index 0000000..f120b31
--- /dev/null
+++ b/examples/rewards_and_observations.py
@@ -0,0 +1,92 @@
+""" Minimal working example of the dm_robotics Panda model. """
+import dm_env
+import numpy as np
+from dm_control import mjcf
+from dm_env import specs
+from dm_robotics.agentflow import spec_utils
+from dm_robotics.agentflow.preprocessors import (rewards,
+                                                 timestep_preprocessor)
+from dm_robotics.geometry import pose_distribution
+from dm_robotics.moma import entity_initializer, prop
+from dm_robotics.moma.sensors import prop_pose_sensor
+
+from dm_robotics.panda import environment
+from dm_robotics.panda import parameters as params
+from dm_robotics.panda import run_loop, utils
+
+
+class Agent:
+ """ Agents are used to control a robot's actions given
+ current observations and rewards. This agent does nothing.
+ """
+
+ def __init__(self, spec: specs.BoundedArray) -> None:
+ self._spec = spec
+
+ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
+ """ Provides robot actions every control-timestep. """
+ del timestep # not used
+ action = np.zeros(shape=self._spec.shape, dtype=self._spec.dtype)
+ return action
+
+
+if __name__ == '__main__':
+ # We initialize the default configuration for logging
+ # and argument parsing. These steps are optional.
+ utils.init_logging()
+ parser = utils.default_arg_parser()
+ args = parser.parse_args()
+
+ class Ball(prop.Prop):
+
+ def _build(self, *args, **kwargs):
+ mjcf_root = mjcf.RootElement()
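+      # MoMa uses the body named 'prop_root' (the default) as the prop's root.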
+ body = mjcf_root.worldbody.add('body', name='prop_root')
+ body.add('geom', type='sphere', size=[0.05], solref=[0.01, 0.5], mass=10)
+ super()._build('ball', mjcf_root)
+
+ def set_initializer_pose(self, physics: mjcf.Physics, pos: np.ndarray,
+ quat: np.ndarray,
+ random_state: np.random.RandomState):
+ del random_state
+ self.set_pose(physics, pos, quat)
+
+ ball = Ball()
+ ball_pose_dist = pose_distribution.UniformPoseDistribution(
+ min_pose_bounds=[0.2, -0.5, 0.1, 0, 0, 0],
+ max_pose_bounds=[0.5, 0.5, 0.5, 0, 0, 0])
+ ball_initializer = entity_initializer.PoseInitializer(
+ ball.set_initializer_pose, ball_pose_dist.sample_pose)
+
+ def ball_reward(observation: spec_utils.ObservationValue):
+ ball_distance = np.linalg.norm(observation['ball_pose'][:3] -
+ observation['panda_tcp_pos'])
+ return np.clip(1.0 - ball_distance, 0, 1)
+
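+  # ComputeReward is a timestep preprocessor that evaluates the callable
+  # above on every timestep's observation and writes the result into the
+  # timestep's reward.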
+ reward = rewards.ComputeReward(
+ ball_reward,
+ validation_frequency=timestep_preprocessor.ValidationFrequency.ALWAYS)
+
+ # Use RobotParams to customize Panda robots added to the environment.
+ robot_params = params.RobotParams(robot_ip=args.robot_ip)
+ panda_env = environment.PandaEnvironment(robot_params)
+
+ panda_env.add_timestep_preprocessors([reward])
+ panda_env.add_props([ball])
+ panda_env.add_entity_initializers([ball_initializer])
+ panda_env.add_extra_sensors([prop_pose_sensor.PropPoseSensor(ball, 'ball')])
+
+ with panda_env.build_task_environment() as env:
+ # Print the full action, observation and reward specification
+ utils.full_spec(env)
+ # Initialize the agent
+ agent = Agent(env.action_spec())
+ # Run the environment and agent either in headless mode or inside the GUI.
+ if args.gui:
+ app = utils.ApplicationWithPlot()
+ app.launch(env, policy=agent.step)
+ else:
+ run_loop.run(env, agent, [], max_steps=1000, real_time=True)
diff --git a/examples/two_arm_robot.py b/examples/two_arm_robot.py
index 8ebfb93..d6019b7 100644
--- a/examples/two_arm_robot.py
+++ b/examples/two_arm_robot.py
@@ -3,10 +3,10 @@
import dm_env
import numpy as np
-from dm_control import mjcf
+from dm_control import composer, mjcf
from dm_env import specs
-from dm_robotics.panda import env_builder
+from dm_robotics.panda import environment
from dm_robotics.panda import parameters as params
from dm_robotics.panda import run_loop, utils
@@ -39,10 +39,10 @@ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
# The environment includes a simple robot frame and
# reference frame for our two-arm robot.
XML_PATH = os.path.join(os.path.dirname(__file__), 'assets', 'two_arm.xml')
- arena = mjcf.from_file(XML_PATH)
- left_frame = arena.find('site', 'left')
- right_frame = arena.find('site', 'right')
- control_frame = arena.find('site', 'control')
+ arena = composer.Arena(xml_path=XML_PATH)
+ left_frame = arena.mjcf_model.find('site', 'left')
+ right_frame = arena.mjcf_model.find('site', 'right')
+ control_frame = arena.mjcf_model.find('site', 'control')
# We use the sites defined in the MJCF to attach the robot arms
# to the body. The robot's control site is used as a reference frame
@@ -53,11 +53,10 @@ def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
right = params.RobotParams(attach_site=right_frame,
name='right',
control_frame=control_frame)
- env_params = params.EnvirontmentParameters(arena=arena)
- panda_env_builder = env_builder.PandaEnvironmentBuilder([left, right],
- env_params)
+  panda_env = environment.PandaEnvironment([left, right], arena)
- with panda_env_builder.build_task_environment() as env:
+ with panda_env.build_task_environment() as env:
# Print the full action, observation and reward specification
utils.full_spec(env)
# Initialize the agent
diff --git a/src/dm_robotics/panda/arm.py b/src/dm_robotics/panda/arm.py
index 7d3f17c..a8deb35 100644
--- a/src/dm_robotics/panda/arm.py
+++ b/src/dm_robotics/panda/arm.py
@@ -56,7 +56,16 @@ class _ActuatorParams:
class Panda(robot_arm.RobotArm):
- """A class representing a Panda robot arm."""
+ """A class representing a Panda robot arm.
+
+ Args:
+ name: The name of this robot. Used as a prefix in the MJCF name
+ attributes.
+ actuation: Instance of :py:class:`dm_robotics.panda.arm_constants.Actuation`
+ specifying which actuation mode to use.
+ use_rotated_gripper: If True, mounts the gripper in a rotated position to
+ match the real placement of the gripper on the physical Panda.
+ """
# Define member variables that are created in the _build function. This is to
# comply with pytype correctly.
@@ -74,16 +83,6 @@ def _build(self,
actuation: consts.Actuation = consts.Actuation.CARTESIAN_VELOCITY,
use_rotated_gripper: bool = True,
hardware=None) -> None:
- """Initializes Panda.
-
- Args:
- name: The name of this robot. Used as a prefix in the MJCF name
- attributes.
- actuation: Instance of `consts.Actuation` specifying which
- actuation mode to use.
- use_rotated_gripper: If True, mounts the gripper in a rotated position to
- match the real placement of the gripper on the physical Panda.
- """
self.hardware = hardware
self._mjcf_root = mjcf.from_path(consts.XML_PATH)
self._mjcf_root.model = name
@@ -97,14 +96,6 @@ def initialize_episode(self, physics: mjcf.Physics,
"""Function called at the beginning of every episode."""
del random_state # Unused.
- # Apply gravity compensation
- # body_elements = self.mjcf_model.find_all('body')
- # gravity = np.hstack([physics.model.opt.gravity, [0, 0, 0]])
- # physics_bodies = physics.bind(body_elements)
- # if physics_bodies is None:
- # raise ValueError('Calling physics.bind with bodies returns None.')
- # physics_bodies.xfrc_applied[:] = -gravity * physics_bodies.mass[..., None]
-
@property
def joints(self) -> List[types.MjcfElement]:
"""List of joint elements belonging to the arm."""
@@ -184,16 +175,6 @@ def set_joint_angles(self, physics: mjcf.Physics,
def after_substep(self, physics: mjcf.Physics,
random_state: np.random.RandomState) -> None:
- """A callback which is executed after a simulation step.
-
- This function is necessary when using the integrated velocity mujoco
- actuator. Mujoco will limit the incoming velocity but the hidden state of
- the integrated velocity actuators must be clipped to the actuation range.
-
- Args:
- physics: An instance of `mjcf.Physics`.
- random_state: An instance of `np.random.RandomState`.
- """
del random_state # Unused.
# Clip the actuator.act with the actuator limits.
@@ -206,7 +187,6 @@ def after_substep(self, physics: mjcf.Physics,
a_max=consts.JOINT_LIMITS['max'])
def _add_mjcf_elements(self, use_rotated_gripper: bool):
- """Defines the arms MJCF joints and sensors."""
self._joints = [
self._mjcf_root.find('joint', j) for j in consts.JOINT_NAMES
]
@@ -232,7 +212,6 @@ def _add_mjcf_elements(self, use_rotated_gripper: bool):
self._attachment_site = self._wrist_site
def _add_actuators(self):
- """Adds the Mujoco actuators to the robot arm."""
if self._actuation not in consts.Actuation:
raise ValueError((f'Actuation {self._actuation} is not a valid actuation.'
'Please specify one of '
@@ -246,14 +225,6 @@ def _add_actuators(self):
self._add_mjcf_actuators(dyntype='none')
def _add_mjcf_actuators(self, dyntype: str = 'integrator') -> None:
- """Adds integrated velocity actuators to the mjcf model.
-
- This function adds integrated velocity actuators and default class
- attributes to the mjcf model according to the values in `sawyer_constants`,
- `_SAWYER_ACTUATOR_PARAMS` and `_INTEGRATED_VELOCITY_DEFAULT_DCLASS`.
- `self._actuators` is created to contain the list of actuators created.
- """
-
# Construct list of ctrlrange tuples from act limits and actuation mode.
ctrl_ranges = list(
zip(consts.ACTUATION_LIMITS[self._actuation]['min'],
@@ -392,9 +363,10 @@ def set_control(self, physics: mjcf.Physics, command: np.ndarray) -> None:
class ArmEffector(arm_effector.ArmEffector):
- """Robot arm effector for the Panda MoMa model that takes `parameters.RobotParams`
+ """Robot arm effector for the Panda MoMa model that takes
+ :py:class:`dm_robotics.panda.parameters.RobotParams`
and changes the joint stiffness and damping of the robot arm. Otherwise behaves
- likes `dm_robotics.moma.effectors.arm_effector.ArmEffector`."""
+  like :py:class:`dm_robotics.moma.effectors.arm_effector.ArmEffector`."""
def __init__(self, robot_params: params.RobotParams, arm: robot_arm.RobotArm):
"""
@@ -481,6 +453,20 @@ def _project_wrench(self, wrench: np.ndarray,
return torque.astype(np.float32)
+@enum.unique
+class ControlObservations(enum.Enum):
+ """Observations exposed by this sensor in control frame."""
+ POS = '{}_pos_control'
+ QUAT = '{}_quat_control'
+ POSE = '{}_pose_control'
+ RMAT = '{}_rmat_control'
+ VEL = '{}_vel_control'
+
+ def get_obs_key(self, name: str) -> str:
+ """Returns the key to the observation in the observables dict."""
+ return self.value.format(name)
+
+
class RobotTCPSensor(site_sensor.SiteSensor):
"""Version of `dm_robotics.moma.sensors.site_sensor.SiteSensor` that
takes tool center point pose measurements of a gripper and accepts
@@ -497,30 +483,38 @@ def __init__(self, gripper: robot_hand.AnyRobotHand,
as reference frame. Falls back to world frame if `None`."""
super().__init__(gripper.tool_center_point, f'{robot_params.name}_tcp')
self._frame = robot_params.control_frame
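+    # Additionally expose the site's pose observations in the control frame.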
+ self._observables.update({
+ self.get_obs_key(ControlObservations.POS):
+ observable.Generic(self._pos_control),
+ self.get_obs_key(ControlObservations.QUAT):
+ observable.Generic(self._quat_control),
+ self.get_obs_key(ControlObservations.RMAT):
+ observable.Generic(self._rmat_control),
+ self.get_obs_key(ControlObservations.POSE):
+ observable.Generic(self._pose_control),
+ self.get_obs_key(ControlObservations.VEL):
+ observable.Generic(self._vel_control),
+ })
+ for obs in self._observables.values():
+ obs.enabled = True
- def _site_pos(self, physics: mjcf.Physics) -> np.ndarray:
- return self._site_pose(physics)[:3]
+ def _pos_control(self, physics: mjcf.Physics) -> np.ndarray:
+ return self._pose_control(physics)[:3]
- def _site_quat(self, physics: mjcf.Physics) -> np.ndarray:
- return self._site_pose(physics)[3:]
+ def _quat_control(self, physics: mjcf.Physics) -> np.ndarray:
+ return self._pose_control(physics)[3:]
- def _site_pose(self, physics: mjcf.Physics) -> np.ndarray:
+ def _pose_control(self, physics: mjcf.Physics) -> np.ndarray:
pos = physics.bind(self._site).xpos
quat = tr.mat_to_quat(np.reshape(physics.bind(self._site).xmat, [3, 3]))
quat = tr.positive_leading_quat(quat)
return geometry.PoseStamped(geometry.Pose(pos, quat)).get_relative_pose(
self._frame, mujoco_physics.wrap(physics)).to_posquat()
- # return geometry.PoseStamped(tr.pos_quat_to_hmat(
- # pos,
- # quat)).get_relative_pose(self._frame,
- # mujoco_physics.wrap(physics)).to_posquat()
- # return geometry.PoseStamped(tr.pos_quat_to_hmat(pos, quat)).to_frame(
- # self._frame, mujoco_physics.wrap(physics)).pose.to_posquat()
- def _site_rmat(self, physics: mjcf.Physics) -> np.ndarray:
- return tr.quat_to_mat(self._site_quat(physics))[:3, :3].reshape((-1,))
+ def _rmat_control(self, physics: mjcf.Physics) -> np.ndarray:
+ return tr.quat_to_mat(self._quat_control(physics))[:3, :3].reshape((-1,))
- def _site_vel_world(self, physics: mjcf.Physics) -> np.ndarray:
+ def _vel_control(self, physics: mjcf.Physics) -> np.ndarray:
return geometry.TwistStamped(super()._site_vel_world(physics),
None).get_relative_twist(
self._frame,
diff --git a/src/dm_robotics/panda/arm_constants.py b/src/dm_robotics/panda/arm_constants.py
index dbd72fe..b609094 100644
--- a/src/dm_robotics/panda/arm_constants.py
+++ b/src/dm_robotics/panda/arm_constants.py
@@ -9,15 +9,14 @@
class Actuation(enum.Enum):
- """Available actuation methods for the Panda MoMa model. Note that
- `POSITION` is supported in simulation only and is used to track
- the real robot joint positions in haptic mode to generate realistic
- constraint forces. The actuation methods use the impedance as
- defined in `panda.parameters.RobotParams` where applicable."""
+ """Available actuation methods for the Panda MoMa model.
+ The actuation methods use the joint stiffness and damping as
+  defined in :py:class:`dm_robotics.panda.parameters.RobotParams` where applicable.
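+
+  For example, a sketch selecting joint velocity control::
+
+    params.RobotParams(actuation=arm_constants.Actuation.JOINT_VELOCITY)
+  """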
CARTESIAN_VELOCITY = 0
+ """Cartesian end-effector velocity control."""
JOINT_VELOCITY = 1
"""Joint actuators receive a velocity signal that is integrated and
- tracked by a position controller."""
+ tracked by a position controller."""
HAPTIC = 2
"""Enables haptic interaction between a physical Panda robot and the simulation."""
diff --git a/src/dm_robotics/panda/env_builder.py b/src/dm_robotics/panda/env_builder.py
deleted file mode 100644
index f31a8f7..0000000
--- a/src/dm_robotics/panda/env_builder.py
+++ /dev/null
@@ -1,195 +0,0 @@
-"""RL environment builder for the Panda MoMa model.
- This module allows you to build a full RL environment,
- including fully configured arena, initializers, robot and task.
- As such it can be considered the main module for users to
- interact with. The default configuration is a ready to run
- `dm_control.rl.control.Environment` but all aspects like e.g.
- reward, observation and action spec can be fully customized."""
-import dataclasses
-from typing import Callable, Optional, Sequence, Union
-
-from dm_control import composer, mjcf
-from dm_control.rl import control
-from dm_robotics.agentflow.preprocessors import (observation_transforms,
- timestep_preprocessor)
-from dm_robotics.geometry import joint_angles_distribution, pose_distribution
-from dm_robotics.moma import base_task, entity_initializer, prop
-from dm_robotics.moma import robot as robot_module
-from dm_robotics.moma import (scene_initializer, subtask_env,
- subtask_env_builder)
-from dm_robotics.moma.models.arenas import empty
-
-from dm_robotics import agentflow as af
-
-from . import arm, hardware
-from . import parameters as params
-from . import utils
-
-
-class UserArena(empty.Arena):
-
- def _build(self, root_element: mjcf.RootElement):
- self._mjcf_root = root_element
- self._ground = self._mjcf_root.find('geom', 'ground')
-
-
-class PandaEnvironmentBuilder:
- """Subtask environment builder class for the Panda MoMa model.
-
- Execution order:
-
- * build_arena
- * build_robot
- * build_entity_initializer
- * build_scene_initializer"""
-
- def __init__(
- self,
- robot_params: Union[params.RobotParams,
- Sequence[params.RobotParams]] = params.RobotParams(),
- env_params: params.EnvirontmentParameters = params.EnvirontmentParameters(
- ),
- builder_extensions: params.BuilderExtensions = params.BuilderExtensions(),
- timestep_preprocessors: Optional[Sequence[
- timestep_preprocessor.TimestepPreprocessor]] = None
- ) -> None:
- self._robot_params = robot_params if isinstance(
- robot_params, Sequence) else [robot_params]
- self._env_params = env_params
- self._builder_extensions = builder_extensions
- self._timestep_preprocessors = timestep_preprocessors
-
- def build_task_environment(self) -> subtask_env.SubTaskEnvironment:
- """Builds an rl environment for the Panda robot."""
- task = self._build_task()
-
- env_builder = subtask_env_builder.SubtaskEnvBuilder()
- env_builder.set_task(task)
-
- base_env = env_builder.build_base_env()
- physics = base_env.physics
-
- parent_action_spec = task.effectors_action_spec(physics)
- full_action_space = af.IdentityActionSpace(parent_action_spec)
-
- if self._timestep_preprocessors is not None:
- for preproc in self._timestep_preprocessors:
- env_builder.add_preprocessor(preproc)
-
- env_builder.set_action_space(full_action_space)
- env = env_builder.build()
-
- return env
-
- def _bulid_arena(self) -> composer.Arena:
- arena = empty.Arena()
- if self._env_params.arena is not None:
- arena = UserArena(self._env_params.arena)
- # arena.mjcf_model.include_copy(self._env_params.arena,
- # override_attributes=True)
- arena.mjcf_model.compiler.angle = 'degree'
- arena.mjcf_model.option.timestep = 0.002
- if self._builder_extensions.build_arena is not None:
- self._builder_extensions.build_arena(arena)
- return arena
-
- def _build_scene_initializer(
- self, robots: Sequence[robot_module.Robot],
- arena: composer.Arena) -> base_task.SceneInitializer:
- scene_initializers = []
- if self._builder_extensions.build_scene_initializer is not None:
- scene_initializers.append(
- self._builder_extensions.build_scene_initializer(robots, arena))
- for robot, robot_params in zip(robots, self._robot_params):
- if robot_params.pose is not None:
- arm_pose = pose_distribution.ConstantPoseDistribution(robot_params.pose)
- scene_initializers.append(
- scene_initializer.EntityPoseInitializer(robot.arm_frame,
- arm_pose.sample_pose))
- return scene_initializer.CompositeSceneInitializer(scene_initializers)
-
- def _build_entity_initializer(
- self, robots: Sequence[robot_module.Robot],
- arena: composer.Arena) -> entity_initializer.base_initializer.Initializer:
- entity_initializers = []
- if self._builder_extensions.build_entity_initializer is not None:
- entity_initializers.append(
- self._builder_extensions.build_entity_initializer(robots, arena))
- for robot, robot_params in zip(robots, self._robot_params):
- arm_joint_values = joint_angles_distribution.ConstantPanTiltDistribution(
- robot_params.joint_values)
- arm_init = entity_initializer.JointsInitializer(
- robot.position_arm_joints, arm_joint_values.sample_angles)
- entity_initializers.append(arm_init)
- if robot_params.has_hand:
- gripper_joint_values = joint_angles_distribution.ConstantPanTiltDistribution(
- [.04, .04])
- initialize_gripper = entity_initializer.JointsInitializer(
- robot.gripper.set_joint_angles, gripper_joint_values.sample_angles)
- entity_initializers.append(initialize_gripper)
- return entity_initializer.TaskEntitiesInitializer(entity_initializers)
-
- def _build_robot(self) -> Sequence[robot_module.Robot]:
- robots = []
- for robot_params in self._robot_params:
- if robot_params.robot_ip is not None:
- robot = hardware.build_robot(robot_params)
- else:
- robot = arm.build_robot(robot_params)
- robots.append(robot)
- if self._builder_extensions.build_robots is not None:
- self._builder_extensions.build_robots(robots)
- return robots
-
- def _build_props(self) -> Sequence[prop.Prop]:
- return []
-
- def _build_task(self) -> base_task.BaseTask:
- arena = self._bulid_arena()
- robots = self._build_robot()
-
- for robot, robot_params in zip(robots, self._robot_params):
- arena.attach(robot.arm, robot_params.attach_site)
-
- init_episode = self._build_entity_initializer(robots, arena)
- init_scene = self._build_scene_initializer(robots, arena)
-
- extra_effectors = []
-
- if self._builder_extensions.build_extra_effectors is not None:
- extra_effectors = self._builder_extensions.build_extra_effectors(
- robots, arena)
-
- # block = prop.Block()
- # # frame = arena.attach(block)
- # frame = arena.add_free_entity(block)
- # block.set_freejoint(frame.freejoint)
-
- # gripper_pose_dist = pose_distribution.UniformPoseDistribution(
- # # Provide 6D min and max bounds for the 3D position and 3D euler angles.
- # min_pose_bounds=np.array(
- # [0.5, -0.1, 0.1, 0.75 * np.pi, -0.25 * np.pi, -0.5 * np.pi]),
- # max_pose_bounds=np.array(
- # [0.7, 0.1, 0.2, 1.25 * np.pi, 0.25 * np.pi, 0.5 * np.pi]))
-
- # initialize_arm = entity_initializer.PoseInitializer(
- # moma_robot.position_gripper, gripper_pose_dist.sample_pose)
-
- # block_pose_dist = pose_distribution.UniformPoseDistribution(
- # min_pose_bounds=[0.1, -0.5, 0.02, 0, 0, 0],
- # max_pose_bounds=[0.5, 0.5, 0.02, 0, 0, 0])
- # initialize_block = entity_initializer.PoseInitializer(
- # block.set_pose, block_pose_dist.sample_pose)
- # init_episode = entity_initializer.TaskEntitiesInitializer([init_episode, initialize_block])
-
- task = base_task.BaseTask(
- task_name='panda',
- arena=arena,
- robots=robots,
- props=[],
- extra_sensors=[utils.TimeSensor()],
- extra_effectors=extra_effectors,
- control_timestep=self._env_params.control_timestep,
- scene_initializer=init_scene,
- episode_initializer=init_episode)
- return task
diff --git a/src/dm_robotics/panda/environment.py b/src/dm_robotics/panda/environment.py
new file mode 100644
index 0000000..6d4bd2d
--- /dev/null
+++ b/src/dm_robotics/panda/environment.py
@@ -0,0 +1,140 @@
+"""RL environment builder for the Panda MoMa model.
+ This module allows you to build a full RL environment,
+ including fully configured arena, initializers, robot and task.
+ As such it can be considered the main module for users to
+ interact with. The default configuration is a ready to run
+ `dm_control.rl.control.Environment` but all aspects like e.g.
+ reward, observation and action spec can be fully customized."""
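+
+  A minimal usage sketch (assuming default simulation parameters)::
+
+    from dm_robotics.panda import environment
+    from dm_robotics.panda import parameters as params
+
+    panda_env = environment.PandaEnvironment(params.RobotParams())
+    with panda_env.build_task_environment() as env:
+      pass  # step the environment with your agent here
+"""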
+from typing import Optional, Sequence, Union
+
+from dm_control import composer
+from dm_robotics.agentflow.preprocessors import timestep_preprocessor
+from dm_robotics.geometry import joint_angles_distribution, pose_distribution
+from dm_robotics.moma import base_task, entity_initializer, prop
+from dm_robotics.moma import robot as robot_module
+from dm_robotics.moma import (scene_initializer, sensor, subtask_env,
+ subtask_env_builder)
+from dm_robotics.moma.models.arenas import empty
+
+from dm_robotics import agentflow as af
+
+from . import arm, hardware
+from . import parameters as params
+from . import utils
+
+
+class PandaEnvironment:
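+  """Adds Panda robots to an arena and composes props, sensors,
+  initializers and timestep preprocessors into a subtask environment."""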
+
+  def __init__(self,
+               robot_params: Union[
+                   params.RobotParams,
+                   Sequence[params.RobotParams]] = params.RobotParams(),
+               arena: Optional[composer.Arena] = None,
+               control_timestep: float = 0.1,
+               physics_timestep: float = 0.002) -> None:
+    # Avoid a shared mutable default argument: create a fresh arena
+    # for each instance unless one is provided.
+    self._arena = arena if arena is not None else empty.Arena()
+ self._robot_params = robot_params if isinstance(
+ robot_params, Sequence) else [robot_params]
+ self._control_timestep = control_timestep
+
+ self._arena.mjcf_model.compiler.angle = 'degree'
+ self._arena.mjcf_model.option.timestep = physics_timestep
+
+ self._robots = []
+ for robot_params in self._robot_params:
+ if robot_params.robot_ip is not None:
+ robot = hardware.build_robot(robot_params)
+ else:
+ robot = arm.build_robot(robot_params)
+ self._robots.append(robot)
+ for robot, robot_params in zip(self._robots, self._robot_params):
+ self._arena.attach(robot.arm, robot_params.attach_site)
+
+ self._scene_initializers = []
+ self._entity_initializers = []
+ self._extra_sensors = []
+ self._extra_effectors = []
+ self._props = []
+ self._timestep_preprocessors = []
+
+ @property
+ def robots(self) -> Sequence[robot_module.Robot]:
+ return self._robots
+
+ def build_task_environment(self) -> subtask_env.SubTaskEnvironment:
+ """Builds an rl environment for the Panda robot."""
+ self._extra_sensors.append(utils.TimeSensor())
+ task = base_task.BaseTask(
+ task_name='panda',
+ arena=self._arena,
+ robots=self._robots,
+ props=self._props,
+ extra_sensors=self._extra_sensors,
+ extra_effectors=self._extra_effectors,
+ control_timestep=self._control_timestep,
+ scene_initializer=self._build_scene_initializer(),
+ episode_initializer=self._build_entity_initializer())
+
+ env_builder = subtask_env_builder.SubtaskEnvBuilder()
+ env_builder.set_task(task)
+
+ base_env = env_builder.build_base_env()
+ physics = base_env.physics
+
+ parent_action_spec = task.effectors_action_spec(physics)
+ full_action_space = af.IdentityActionSpace(parent_action_spec)
+
+ for preproc in self._timestep_preprocessors:
+ env_builder.add_preprocessor(preproc)
+
+ env_builder.set_action_space(full_action_space)
+ env = env_builder.build()
+
+ return env
+
+ def add_props(self, props: Sequence[prop.Prop]):
+ self._props.extend(props)
+    for p in props:
+ self._arena.add_free_entity(p)
+
+ def add_entity_initializers(
+ self,
+ initializers: Sequence[entity_initializer.base_initializer.Initializer]):
+ self._entity_initializers.extend(initializers)
+
+ def add_timestep_preprocessors(
+ self,
+ preprocessors: Sequence[timestep_preprocessor.TimestepPreprocessor]):
+ self._timestep_preprocessors.extend(preprocessors)
+
+ def add_extra_sensors(self, extra_sensors: Sequence[sensor.Sensor]):
+ self._extra_sensors.extend(extra_sensors)
+
+ def _build_scene_initializer(self) -> base_task.SceneInitializer:
+ for robot, robot_params in zip(self._robots, self._robot_params):
+ if robot_params.pose is not None:
+ arm_pose = pose_distribution.ConstantPoseDistribution(robot_params.pose)
+ self._scene_initializers.insert(
+ 0,
+ scene_initializer.EntityPoseInitializer(robot.arm_frame,
+ arm_pose.sample_pose))
+ return scene_initializer.CompositeSceneInitializer(self._scene_initializers)
+
+ def _build_entity_initializer(
+ self) -> entity_initializer.base_initializer.Initializer:
+ for robot, robot_params in zip(self._robots, self._robot_params):
+ arm_joint_values = joint_angles_distribution.ConstantPanTiltDistribution(
+ robot_params.joint_values)
+ arm_init = entity_initializer.JointsInitializer(
+ robot.position_arm_joints, arm_joint_values.sample_angles)
+ self._entity_initializers.insert(0, arm_init)
+ if robot_params.has_hand:
+ gripper_joint_values = joint_angles_distribution.ConstantPanTiltDistribution(
+ [.04, .04])
+ initialize_gripper = entity_initializer.JointsInitializer(
+ robot.gripper.set_joint_angles, gripper_joint_values.sample_angles)
+ self._entity_initializers.insert(0, initialize_gripper)
+ return entity_initializer.TaskEntitiesInitializer(self._entity_initializers)
diff --git a/src/dm_robotics/panda/parameters.py b/src/dm_robotics/panda/parameters.py
index 750c18b..72bac91 100644
--- a/src/dm_robotics/panda/parameters.py
+++ b/src/dm_robotics/panda/parameters.py
@@ -3,7 +3,8 @@
from typing import Callable, Optional, Sequence
from dm_control import composer, mjcf
-from dm_robotics.moma import base_task, effector, entity_initializer, robot
+from dm_robotics.moma import (base_task, effector, entity_initializer, robot,
+ sensor)
from dm_robotics.moma.models.arenas import empty
from . import arm_constants
@@ -41,7 +42,7 @@ class RobotParams:
@dataclasses.dataclass
-class BuilderExtensions:
+class Extensions:
"""Defines callables to extend the subtask environment builder."""
build_arena: Optional[Callable[[empty.Arena], None]] = None
build_robots: Optional[Callable[[Sequence[robot.Robot]], None]] = None
@@ -54,6 +55,8 @@ class BuilderExtensions:
build_extra_effectors: Optional[
Callable[[Sequence[robot.Robot], composer.Arena],
Sequence[effector.Effector]]] = None
+ build_extra_sensors: Optional[Callable[
+ [Sequence[robot.Robot], composer.Arena], Sequence[sensor.Sensor]]] = None
@dataclasses.dataclass
@@ -61,8 +64,10 @@ class EnvirontmentParameters:
"""Task-level parameters.
Args:
- arena: MJCF model describing the environment/arena. A copy
- of this will be included in `dm_robotics.moma.models.arenas.empty.Arena`.
+ mjcf_root: MJCF model of the scene. The Panda robots will be added
+      by :py:class:`dm_robotics.panda.environment.PandaEnvironment`.
+ arena: Reinforcement learning environment.
control_timestep: Timestep size of the controlling agent."""
- arena: Optional[mjcf.RootElement] = None
+ mjcf_root: Optional[mjcf.RootElement] = None
+ arena: Optional[composer.Environment] = None
control_timestep: float = 0.1
diff --git a/src/dm_robotics/panda/utils.py b/src/dm_robotics/panda/utils.py
index 7ca9f5e..6d44afe 100644
--- a/src/dm_robotics/panda/utils.py
+++ b/src/dm_robotics/panda/utils.py
@@ -151,15 +151,12 @@ def _time(self, physics: mjcf.Physics) -> np.ndarray:
return np.array([physics.data.time])
-class PlotComponent(renderer.Component):
- """ A plotting component for `dm_control.viewer.application.Application`. """
+class Plot(renderer.Component):
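+  """ Base class for plotting components that render a MuJoCo figure. """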
def __init__(self,
runtime: runtime_module.Runtime,
maxlen: int = 500) -> None:
self._rt = runtime
- self._obs_idx = None
- self._obs_keys = None
self.maxlen = min(maxlen, mujoco.mjMAXLINEPNT)
self.maxlines = 0
self.x = np.linspace(-self.maxlen, 0, self.maxlen)
@@ -169,6 +166,27 @@ def __init__(self,
self.fig.flg_barplot = 0
self.fig.flg_selection = 0
self.fig.range = [[1, 0], [1, 0]]
+ self.fig.linewidth = 1.5
+
+  def reset_data(self):
+    # Fill every line's buffer with zeros.
+    for i in range(self.maxlines):
+      self.y[i].extend([0] * self.maxlen)
+
+
+class ObservationPlot(Plot):
+ """
+ Plotting component for :py:class:`dm_control.viewer.application.Application`
+ that allows you to browse through the observations.
+ """
+
+ def __init__(self,
+ runtime: runtime_module.Runtime,
+ maxlen: int = 1000) -> None:
+ super().__init__(runtime, maxlen)
+ self._obs_idx = None
+ self._obs_keys = None
def _init_buffer(self):
for obs in self._rt._time_step.observation.values():
@@ -180,13 +198,16 @@ def _init_buffer(self):
self.reset_data()
self._obs_idx = 0
self._obs_keys = list(self._rt._time_step.observation.keys())
+ self.update_title()
+
+ def update_title(self):
+ self.fig.title = f'{self._obs_keys[self._obs_idx]:100s}'
def render(self, context, viewport):
if self._rt._time_step is None:
return
if self._obs_idx is None:
self._init_buffer()
- pos = mujoco.MjrRect(5, viewport.height - 256 - 5, 256, 256)
obs = np.atleast_1d(
self._rt._time_step.observation[self._obs_keys[self._obs_idx]])
for i in range(self.maxlines):
@@ -197,22 +218,78 @@ def render(self, context, viewport):
]).T.reshape((-1,))
else:
self.fig.linepnt[i] = 0
- self.fig.title = f'{self._obs_keys[self._obs_idx]:100s}'
+ pos = mujoco.MjrRect(5, viewport.height - 200 - 5, 300, 200)
mujoco.mjr_figure(pos, self.fig, context.ptr)
- def reset_data(self):
- for i in range(self.maxlines):
- for j in range(self.maxlen):
- del j
- self.y[i].append(0)
-
def next_obs(self):
self._obs_idx = (self._obs_idx + 1) % len(self._obs_keys)
self.reset_data()
+ self.update_title()
def prev_obs(self):
self._obs_idx = (self._obs_idx - 1) % len(self._obs_keys)
self.reset_data()
+ self.update_title()
+
+
+class ActionPlot(Plot):
+ """
+ A plotting component for :py:class:`dm_control.viewer.application.Application`
+ that plots the agent's actions.
+ """
+
+ def __init__(self,
+ runtime: runtime_module.Runtime,
+ maxlen: int = 1000) -> None:
+ super().__init__(runtime, maxlen)
+ self._init_buffer()
+ self.fig.title = 'Actions'
+
+ def _init_buffer(self):
+ self.maxlines = self._rt._default_action.shape[0]
+    for _ in range(self.maxlines):
+ self.y.append(deque(maxlen=self.maxlen))
+ self.reset_data()
+
+ def render(self, context, viewport):
+ if self._rt._time_step is None:
+ return
+ for i, a in enumerate(self._rt.last_action):
+ self.fig.linepnt[i] = self.maxlen
+ self.y[i].append(a)
+ self.fig.linedata[i][:self.maxlen * 2] = np.array([self.x,
+ self.y[i]]).T.reshape(
+ (-1,))
+ pos = mujoco.MjrRect(300 + 5, viewport.height - 200 - 5, 300, 200)
+ mujoco.mjr_figure(pos, self.fig, context.ptr)
+
+
+class RewardPlot(Plot):
+ """
+ A plotting component for :py:class:`dm_control.viewer.application.Application`
+ that plots the environment's reward.
+ """
+
+ def __init__(self,
+ runtime: runtime_module.Runtime,
+ maxlen: int = 1000) -> None:
+ super().__init__(runtime, maxlen)
+ self.fig.title = 'Reward'
+ self.maxlines = 1
+ self.y.append(deque(maxlen=self.maxlen))
+ self.reset_data()
+
+ def render(self, context, viewport):
+ if self._rt._time_step is None:
+ return
+ r = self._rt._time_step.reward
+ self.fig.linepnt[0] = self.maxlen
+ self.y[0].append(r)
+ self.fig.linedata[0][:self.maxlen * 2] = np.array([self.x,
+ self.y[0]]).T.reshape(
+ (-1,))
+ pos = mujoco.MjrRect(2 * 300 + 5, viewport.height - 200 - 5, 300, 200)
+ mujoco.mjr_figure(pos, self.fig, context.ptr)
class PlotHelp(views.ColumnTextModel):
@@ -235,7 +312,9 @@ def __init__(self, title='Explorer', width=1024, height=768):
def _perform_deferred_reload(self, params):
super()._perform_deferred_reload(params)
- cmp = PlotComponent(self._runtime)
+ cmp = ObservationPlot(self._runtime)
self._renderer.components += cmp
+ self._renderer.components += ActionPlot(self._runtime)
+ self._renderer.components += RewardPlot(self._runtime)
self._input_map.bind(cmp.next_obs, user_input.KEY_F4)
self._input_map.bind(cmp.prev_obs, user_input.KEY_F3)