Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SW-949] Add mission client to wrapper #113

Merged
merged 9 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions spot_wrapper/spot_mission_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import logging
from typing import Optional

from bosdyn.api.mission import nodes_pb2
from bosdyn.client import RpcError, robot_command
from bosdyn.client.lease import LeaseClient, LeaseWallet
from bosdyn.client.robot import Robot
from bosdyn.mission.client import (
CompilationError,
MissionClient,
NoMissionError,
NoMissionPlayingError,
ValidationError,
)

from spot_wrapper.wrapper_helpers import RobotState


class SpotMission:
"""
Allow access to mission functionality through the SDK
"""

def __init__(
self,
robot: Robot,
logger: logging.Logger,
robot_state: RobotState,
mission_client: MissionClient,
robot_command_client: robot_command.RobotCommandClient,
lease_client: LeaseClient,
) -> None:
self._robot = robot
self._logger = logger
self._mission_client: MissionClient = mission_client
self._robot_command_client = robot_command_client
self._lease_client = lease_client
self._robot_state = robot_state
self._spot_check_resp = None
self._lease = None
self._lease_wallet: LeaseWallet = self._lease_client.lease_wallet

def load_mission(self, root: nodes_pb2.Node, leases=None, data_chunk_byte_size: Optional[int] = None):
"""Load a mission
Args:
root: Root node in a mission.
leases: All leases necessary to initialize a mission.
data_chunk_byte_size: Optional max size of each streamed message
Raises:
RpcError: Problem communicating with the robot.
bosdyn.mission.client.CompilationError: The mission failed to compile.
bosdyn.mission.client.ValidationError: The mission failed to validate.
"""
if leases is None:
leases = []
if data_chunk_byte_size:
return self._load_mission_as_chunks(root, leases, data_chunk_byte_size)
try:
return self._mission_client.load_mission_async(root, leases)
except RpcError:
return False, "Could not communicate with the robot"
except CompilationError as e:
return False, f"The mission failed to compile: {e}"
except ValidationError as e:
return False, f"The mission could not be validated: {e}"

def _load_mission_as_chunks(self, root: nodes_pb2.Node, leases=None, data_chunk_byte_size: int = 1000 * 1000):
"""Load a mission onto the robot.
Args:
root: Root node in a mission.
leases: All leases necessary to initialize a mission.
data_chunk_byte_size: max size of each streamed message
Raises:
RpcError: Problem communicating with the robot.
bosdyn.mission.client.CompilationError: The mission failed to compile.
bosdyn.mission.client.ValidationError: The mission failed to validate.
"""
if leases is None:
leases = []
try:
return self._mission_client.load_mission_as_chunks2(root, leases, data_chunk_byte_size)
except RpcError:
return False, "Could not communicate with the robot"
except CompilationError as e:
return False, f"The mission failed to compile: {e}"
except ValidationError as e:
return False, f"The mission could not be validated: {e}"

def get_mission_info(self):
"""Get static information about the loaded mission.

Raises:
RpcError: Problem communicating with the robot.
"""
try:
return self._mission_client.get_info()
except RpcError:
return False, "Could not communicate with the robot"

def play_mission(
self,
pause_time_secs: int,
leases=None,
settings=None,
):
"""Play loaded mission or continue a paused mission
Args:
pause_time_secs: Absolute time when the mission should pause execution. Subsequent RPCs
will override this value, so you can use this to say "if you don't hear from me again,
stop running the mission at this time."
leases: Leases the mission service will need to use. Unlike other clients, these MUST
be specified.
Raises:
RpcError: Problem communicating with the robot.
NoMissionError: No mission Loaded.
"""
if leases is None:
leases = []
try:
return self._mission_client.play_mission_async(pause_time_secs, leases, settings)
except RpcError:
return False, "Could not communicate with the robot"
except NoMissionError:
return False, "No mission loaded"

def get_mission_state(
self,
upper_tick_bound: Optional[int] = None,
lower_tick_bound: Optional[int] = None,
past_ticks: Optional[int] = None,
):
"""Get the state of the current playing mission
Raises:
RpcError: Problem communicating with the robot.
NoMissionPlayingError: No mission playing.
"""
try:
return self._mission_client.get_state_async(upper_tick_bound, lower_tick_bound, past_ticks)
except RpcError:
return False, "Could not communicate with the robot"
except NoMissionPlayingError:
return False, "No mission playing"

def pause_mission(self):
"""Pause the current mission
Raises:
RpcError: Problem communicating with the robot.
NoMissionPlayingError: No mission playing.
"""
try:
return self._mission_client.pause_mission_async()
except RpcError:
return False, "Could not communicate with the robot"
except NoMissionPlayingError:
return False, "No mission playing"

def restart_mission(self, pause_time_secs, leases=None, settings=None):
"""Restart mission from the beginning
Args:
pause_time_secs: Absolute time when the mission should pause execution. Subsequent RPCs
to RestartMission will override this value, so you can use this to say "if you don't hear
from me again, stop running the mission at this time."
leases: Leases the mission service will need to use. Unlike other clients, these MUST
be specified.
Raises:
RpcError: Problem communicating with the robot.
NoMissionError: No Mission Loaded.
bosdyn.mission.client.ValidationError: The mission failed to validate.
"""
if leases is None:
leases = []
try:
return self._mission_client.restart_mission_async(pause_time_secs, leases, settings)
except RpcError:
return False, "Could not communicate with the robot"
except NoMissionError:
return False, "No mission loaded"
except ValidationError as e:
return False, f"The mission could not be validated: {e}"

def stop_mission(self):
"""Stop the current mission
Raises:
RpcError: Problem communicating with the robot.
NoMissionPlayingError: No mission playing.
"""
try:
return self._mission_client.stop_mission_async()
except RpcError:
return False, "Could not communicate with the robot"
except NoMissionPlayingError:
return False, "No mission playing"
21 changes: 20 additions & 1 deletion spot_wrapper/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from bosdyn.client.time_sync import TimeSyncEndpoint
from bosdyn.client.world_object import WorldObjectClient
from bosdyn.geometry import EulerZXY
from bosdyn.mission.client import MissionClient
from google.protobuf.timestamp_pb2 import Timestamp

from .spot_arm import SpotArm
Expand All @@ -64,6 +65,7 @@
from .spot_eap import SpotEAP
from .spot_graph_nav import SpotGraphNav
from .spot_images import SpotImages
from .spot_mission_wrapper import SpotMission
from .spot_world_objects import SpotWorldObjects
from .wrapper_helpers import ClaimAndPowerDecorator, RobotCommandData, RobotState

Expand Down Expand Up @@ -392,7 +394,9 @@ def __init__(
self._command_data = RobotCommandData()

try:
self._sdk = create_standard_sdk(SPOT_CLIENT_NAME, cert_resource_glob=cert_resource_glob)
self._sdk = create_standard_sdk(
SPOT_CLIENT_NAME, service_clients=[MissionClient], cert_resource_glob=cert_resource_glob
)
except Exception as e:
self._logger.error("Error creating SDK object: %s", e)
self._valid = False
Expand Down Expand Up @@ -437,6 +441,7 @@ def __init__(
self._estop_client = self._robot.ensure_client(EstopClient.default_service_name)
self._docking_client = self._robot.ensure_client(DockingClient.default_service_name)
self._spot_check_client = self._robot.ensure_client(SpotCheckClient.default_service_name)
self._mission_client = self._robot.ensure_client(MissionClient.default_service_name)
self._license_client = self._robot.ensure_client(LicenseClient.default_service_name)
if self._robot.has_arm():
self._gripper_cam_param_client = self._robot.ensure_client(
Expand Down Expand Up @@ -521,6 +526,15 @@ def __init__(
self._lease_client,
)

self._spot_mission = SpotMission(
self._robot,
self._logger,
self._state,
self._mission_client,
self._robot_command_client,
self._lease_client,
)

if self._robot.has_arm():
self._spot_arm = SpotArm(
self._robot,
Expand Down Expand Up @@ -726,6 +740,11 @@ def spot_check(self) -> SpotCheck:
"""Return SpotCheck instance"""
return self._spot_check

@property
def spot_mission(self) -> SpotMission:
"""Return SpotMission instance"""
return self._spot_mission

@property
def spot_eap_lidar(self) -> typing.Optional[SpotEAP]:
"""Return SpotEAP instance"""
Expand Down