diff --git a/.gitignore b/.gitignore index ca4f2a7..401f37d 100644 --- a/.gitignore +++ b/.gitignore @@ -157,4 +157,6 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ \ No newline at end of file +#.idea/ + +docs/readme.md \ No newline at end of file diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..0fc2426 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,13 @@ +version: 2 + +sphinx: + configuration: docs/conf.py + +build: + os: ubuntu-22.04 + tools: + python: "3.10" + +python: + install: + - requirements: docs/requirements.txt \ No newline at end of file diff --git a/README.md b/README.md index 500d9bb..8a82033 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,44 @@ # abb_robot_client -Python package to access ABB robots using Robot Web Services and EGM. \ No newline at end of file +[![Python](https://img.shields.io/badge/python-3.6+-blue.svg)](https://github.com/rpiRobotics/abb_robot_client) +![PyPI](https://img.shields.io/pypi/v/abb-robot-client) + +Python package providing clients for ABB robots using RWS (Robot Web Services) and Externally Guided Motion (EGM). +This package currently supports IRC5 controllers running RobotWare 6.xx. It does not support RobotWare 7+. + +This package is typically used with [abb-motion-program-exec](https://pypi.org/project/abb-motion-program-exec/), +which provides a higher level interface to generate motion programs. `abb-motion-program-exec` includes the ability +to initialize EGM operations, using this package to communicate with EGM. + +`abb_robot_client` includes three modules: `rws`, `rws_aio`, and `egm`. `rws` provides a synhronous client for Robot +Web Services (RWS) using HTTP REST, and the ability to create subscriptions using websockets. `rws_aio` provides +identical functionality to `rws`, but uses asyncio, with each method being `async`. `egm` provides an Externally +Guided Motion (EGM) client. + +Documentation can be found at: https://abb_robot_client.readthedocs.org + +## Installation + +`abb-robot-client` is avaliable on PyPi. Use the `[aio]` option to include support for asyncio: + +``` +pip install abb-robot-client[aio] +``` + +## Examples + +See the `examples/` directory for examples using the modules. + +## License + +Apache 2.0 + +## Acknowledgment + +This work was supported in part by Subaward No. ARM-TEC-21-02-F19 from the Advanced Robotics for Manufacturing ("ARM") Institute under Agreement Number W911NF-17-3-0004 sponsored by the Office of the Secretary of Defense. ARM Project Management was provided by Christopher Adams. The views and conclusions contained in this document are those of the authors and should not be interpreted as representing the official policies, either expressed or implied, of either ARM or the Office of the Secretary of Defense of the U.S. Government. The U.S. Government is authorized to reproduce and distribute reprints for Government purposes, notwithstanding any copyright notation herein. + +This work was supported in part by the New York State Empire State Development Division of Science, Technology and Innovation (NYSTAR) under contract C160142. + +![](docs/figures/arm_logo.jpg) ![](docs/figures/nys_logo.jpg) + + diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d0c3cbf --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = source +BUILDDIR = build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/abb_robot_client/api/egm.rst b/docs/abb_robot_client/api/egm.rst new file mode 100644 index 0000000..41b44f8 --- /dev/null +++ b/docs/abb_robot_client/api/egm.rst @@ -0,0 +1,9 @@ +EGM (Externally Guided Motion) +============================== + +abb_robot_client.egm +-------------------- + +.. automodule:: abb_robot_client.egm + :members: + \ No newline at end of file diff --git a/docs/abb_robot_client/api/rws.rst b/docs/abb_robot_client/api/rws.rst new file mode 100644 index 0000000..41779a4 --- /dev/null +++ b/docs/abb_robot_client/api/rws.rst @@ -0,0 +1,9 @@ +RWS (Robot Web Services) +======================== + +abb_robot_client.rws +-------------------- + +.. automodule:: abb_robot_client.rws + :members: + \ No newline at end of file diff --git a/docs/abb_robot_client/api/rws_aio.rst b/docs/abb_robot_client/api/rws_aio.rst new file mode 100644 index 0000000..b29c929 --- /dev/null +++ b/docs/abb_robot_client/api/rws_aio.rst @@ -0,0 +1,11 @@ +RWS (Robot Web Services) AsyncIO +================================ + +abb_robot_client.rws_aio +------------------------ + +The `rws_aio` module implements asyncio methods. To use, the `abb_robot_client[aio]` option must be installed with pip. + +.. automodule:: abb_robot_client.rws_aio + :members: + \ No newline at end of file diff --git a/docs/abb_robot_client/api_reference.rst b/docs/abb_robot_client/api_reference.rst new file mode 100644 index 0000000..a6938f7 --- /dev/null +++ b/docs/abb_robot_client/api_reference.rst @@ -0,0 +1,9 @@ +API Reference +================ + +.. toctree:: + :maxdepth: 2 + + api/rws + api/rws_aio + api/egm \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..1617895 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,68 @@ +# Configuration file for the Sphinx documentation builder. +# +# For the full list of built-in configuration values, see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# -- Project information ----------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information + +project = 'abb_robot_client' +copyright = '2022, Wason Technology, LLC' +author = 'John Wason' + +import abb_robot_client + +# The short X.Y version. +# version = abb_robot_client.__version__ +# The full version, including alpha/beta/rc tags. +# release = abb_robot_client.__version__ + +# -- General configuration --------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration + +extensions = [ + "sphinx.ext.autodoc", + "sphinx_autodoc_typehints", + "recommonmark" +] + +source_suffix = [".rst", ".md"] + +templates_path = ['_templates'] +exclude_patterns = [] + + + +# -- Options for HTML output ------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output + +html_theme = "sphinx_rtd_theme" +html_static_path = ['_static'] + +# https://www.lieret.net/2021/05/20/include-readme-sphinx/ +import pathlib + +# The readme that already exists +readme_path = pathlib.Path(__file__).parent.resolve().parent / "README.md" +# We copy a modified version here +readme_target = pathlib.Path(__file__).parent / "readme.md" + +with readme_target.open("w") as outf: + # Change the title to "Readme" + outf.write( + "\n".join( + [ + "Readme", + "======", + ] + ) + ) + lines = [] + for line in readme_path.read_text().split("\n"): + if line.startswith("# "): + # Skip title, because we now use "Readme" + # Could also simply exclude first line for the same effect + continue + line = line.replace("docs/figures/", "figures/") + lines.append(line) + outf.write("\n".join(lines)) diff --git a/docs/figures/arm_logo.jpg b/docs/figures/arm_logo.jpg new file mode 100644 index 0000000..efeee11 Binary files /dev/null and b/docs/figures/arm_logo.jpg differ diff --git a/docs/figures/nys_logo.jpg b/docs/figures/nys_logo.jpg new file mode 100644 index 0000000..4a033f5 Binary files /dev/null and b/docs/figures/nys_logo.jpg differ diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..6c5ae65 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,21 @@ +.. abb_robot_client documentation master file, created by + sphinx-quickstart on Tue Dec 27 02:14:13 2022. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to abb_robot_client's documentation! +============================================ + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + readme + abb_robot_client/api_reference + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..76b4a46 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0..\ + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=docs +set BUILDDIR=build + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "" goto help + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..d6e3782 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,9 @@ +sphinx_autodoc_typehints +recommonmark +setuptools +requests +numpy +protobuf +websocket-client +httpx +websockets diff --git a/src/abb_robot_client/egm.py b/src/abb_robot_client/egm.py index 7e8ce9a..354cd11 100644 --- a/src/abb_robot_client/egm.py +++ b/src/abb_robot_client/egm.py @@ -7,15 +7,31 @@ from typing import Tuple, NamedTuple, Any class EGMRobotState(NamedTuple): + """ + State information returned from robot through EGM + """ joint_angles: np.array + """Joint angles of robot in radians. Length is 6 or 7 depending on robot""" rapid_running: bool + """True if RAPID program is running on controller""" motors_on: bool + """True if motors are on""" robot_message: Any + """Full message returned from robot""" cartesian: Tuple[np.array,np.array] + """Cartesian position of robots, in ([x,y,z],[w,x,y,z]) format""" class EGM(object): + """ + ABB EGM (Externally Guided Motion) client. EGM provides a real-time streaming connection to the robot using + UDP, typically at a rate of 250 Hz. The robot controller initiates the connection. The IP address and port of the + client must be configured on the robot controller side. The EGM client will send commands to the port it receives + packets from. + + :param port: The port to receive UDP packets. Defaults to 6510 + """ def __init__(self, port : int=6510): @@ -26,7 +42,13 @@ def __init__(self, port : int=6510): self.count=0 def receive_from_robot(self, timeout : float=0) -> Tuple[bool,EGMRobotState]: + """ + Receive feedback from the robot. Specify an optional timeout. Returns a tuple with success and the current + robot state. + :param timeout: Timeout in seconds. May be zero to immediately return if there is no new data. + :return: Success and robot state as a tuple + """ s=self.socket s_list=[s] try: @@ -72,6 +94,13 @@ def receive_from_robot(self, timeout : float=0) -> Tuple[bool,EGMRobotState]: return True, EGMRobotState(joint_angles, rapid_running, motors_on, robot_message, cartesian) def send_to_robot(self, joint_angles: np.array) -> bool: + """ + Send a joint command to robot. Returns False if no data has been received from the robot yet. The EGM + operation must have been started with EGMActJoint and EGMRunJoint. + + :param joint_angles: Joint angle command in radians + :return: True if successful, False if no data received from robot yet + """ if not self.egm_addr: return False @@ -100,7 +129,16 @@ def send_to_robot(self, joint_angles: np.array) -> bool: return True - def send_to_robot_cart(self, pos, orient): + def send_to_robot_cart(self, pos: np.ndarray, orient: np.ndarray): + """ + Send a cartesian command to robot. Returns False if no data has been received from the robot yet. The pose + is relative to the tool, workobject, and frame specified when the EGM operation is initialized. The EGM + operation must have been started with EGMActPose and EGMRunPose. + + :param pos: The position of the TCP in millimeters [x,y,z] + :param orient: The orientation of the TCP in quaternions [w,x,y,z] + :return: True if successful, False if no data received from robot yet + """ if not self.egm_addr: return False @@ -137,8 +175,17 @@ def send_to_robot_cart(self, pos, orient): return True - def send_to_robot_path_corr(self, pos, age=1): - + def send_to_robot_path_corr(self, pos: np.ndarray, age: float=1): + """ + Send a path correction command. Returns False if no data has been received from the robot yet. The path + correction is a displacement [x,y,z] in millimeters in **path coordinates**. The displacement uses + "path coordinates", which relate the direction of movement of the end effector. See `CorrConn` command in + *Technical reference manual RAPID Instructions, Functions and Data types* for a detailed description of path + coordinates. The EGM operation must have been started with EGMActMove, and use EGMMoveL and EGMMoveC commands. + + :param pos: The displacement in path coordinates in millimeters [x,y,z] + :return: True if successful, False if no data received from robot yet + """ self.send_sequence_number+=1 sensorMessage=egm_pb2.EgmSensorPathCorr() diff --git a/src/abb_robot_client/rws.py b/src/abb_robot_client/rws.py index 74e4dc8..7eaf0f4 100644 --- a/src/abb_robot_client/rws.py +++ b/src/abb_robot_client/rws.py @@ -8,97 +8,176 @@ import websocket import threading -from typing import Callable, NamedTuple, Any, List, Union +from typing import Callable, NamedTuple, Any, List, Union, Optional from enum import IntEnum class ABBException(Exception): + """ + Exception returned from ABB controller + """ def __init__(self, message, code): super(ABBException, self).__init__(message) self.code=code class RAPIDExecutionState(NamedTuple): + """ + Execution state of RAPID tasks on controller. See :meth:`.RWS.get_execution_state()` + """ ctrlexecstate: Any + """Current controller execution state""" cycle: Any + """Current controller cycle state""" class EventLogEntry(NamedTuple): + """Entry within an event log. See :meth:`.RWS.read_event_log()`""" seqnum: int + """Sequence number of the entry""" msgtype: int - code: int + """Message type of the entry. 1 for info, 2 for warning, 3 for error""" + code: int + """The error code of the entry""" tstamp: datetime.datetime + """Entry timestamp in controller clock""" args: List[Any] + """Arguments specified when entry created""" title: str + """Entry title""" desc: str + """Entry description""" conseqs: str + """Consequences of the event""" causes: str + """Potential causes of the event""" actions: str + """Actions to correct the event""" class EventLogEntryEvent(NamedTuple): + """ + Event returned by subscription when event log entry created + """ seqnum: int + """seqnum of created event log entry""" class TaskState(NamedTuple): + """ + Current state of task running on controller. See :meth:`.RWS.get_tasks()` + """ name: str + """Name of the controller task""" type_: str + """The type of the task""" taskstate: str + """The curret task state""" excstate: str + """The current execution state""" active: bool + """True if the task is currently active""" motiontask: bool + """True if the task is a motion task""" class JointTarget(NamedTuple): + """Joint target in degrees or millimeters""" robax: np.array + """Robot axes positions. Six entry array""" extax: np.array + """Extra axes positions. Six entry array""" class RobTarget(NamedTuple): + """Robtarget""" trans: np.array + """Translation in millimeters. Three entry array""" rot: np.array + """Rotation in quaternion units [w,x,y,z]""" robconf: np.array + """4 entry configuration of robot. See `confdata` datatype in the ABB RAPID manuals for explanation""" extax: np.array - + """Extra axes positions. Six entry array""" class IpcMessage(NamedTuple): + """IPC queue message. Also used for RMQ. See :meth:`.RMQ.try_create_ipc_queue()`""" data: str + """Message data encoded as string""" userdef: str + """User defined message content encoded as string""" msgtype: str + """Type of messag""" cmd: str + """Message command""" queue_name: str + """Queue containing message""" class Signal(NamedTuple): + """Signal state""" name: str + """Name of the signal""" lvalue: str + """Logical value of the signal""" class ControllerState(NamedTuple): + """Controller state. See :meth:`.RWS.get_controller_state()`""" state: str + """The controller state""" class OperationalMode(NamedTuple): + """Operational mode. See :meth:`.RWS.get_operation_mode()`""" mode: str + """The operational mode""" class VariableValue(NamedTuple): + """RAPID variable value""" name: str + """The name of the RAPId variable""" value: str + """The variable value encoded as string""" task: str = None + """The task containing the variable""" class SubscriptionResourceType(IntEnum): + """Enum to select resource to subscribe. See :meth:`.RWS.subscribe()`""" ControllerState = 1 + """Subscribe to controller state resource""" OperationalMode = 2 + """Subsribe to operational mode resource""" ExecutionState = 3 + """Subscribe to executio state resource""" PersVar = 4 + """Subscribe to RAPID pers variable resource""" IpcQueue = 5 + """Subscribe to IPC queue resource""" Elog = 6 + """Subscribe to Event Log resource""" Signal = 7 + """Subscribe to signal resource""" class SubscriptionResourcePriority(IntEnum): + """Priority of subscribed resource. Only Signal and PersVar support high priority. See :meth:`.RWS.subscribe()`""" Low = 0 Medium = 1 High = 2 class SubscriptionResourceRequest(NamedTuple): + """Specify resource to subscribe. See :meth:`.RWS.subscribe()`""" resource_type: SubscriptionResourceType + """Type of resource to subscribe""" priority: SubscriptionResourcePriority + """Subscription priority""" param: Any = None - + """Parameter for subscription request""" class RWS: - def __init__(self, base_url='http://127.0.0.1:80', username=None, password=None): + """ + Robot Web Services synchronous client. This class uses ABB Robot Web Services HTTP REST interface to interact + with robot controller. Subscriptions can be created to provide streaming information. See the ABB + documentation for more information: https://developercenter.robotstudio.com/api/rwsApi/ + + :param base_url: Base URL of the robot. For Robot Studio instances, this should be http://127.0.0.1:80, + the default value. For a real robot, 127.0.0.1 should be replaced with the IP address + of the robot controller. The WAN port ethernet must be used, not the maintenance port. + :param username: The HTTP username for the robot. Defaults to 'Default User' + :param password: The HTTP password for the robot. Defaults to 'robotics' + """ + def __init__(self, base_url: str='http://127.0.0.1:80', username: str=None, password: str=None): self.base_url=base_url if username is None: username = 'Default User' @@ -167,7 +246,13 @@ def _process_response(self, response): raise ABBException(error_message, error_code) - def start(self, cycle: str='asis',tasks: List[str]=['T_ROB1']) -> None: + def start(self, cycle: Optional[str]='asis',tasks: Optional[List[str]]=['T_ROB1']): + """ + Start one or more RAPID tasks + + :param cycle: The cycle mode of the robot. Can be `asis`, `once`, or `forever`. + :param tasks: One or more tasks to start. + """ rob_tasks = self.get_tasks() for t in tasks: @@ -186,26 +271,52 @@ def start(self, cycle: str='asis',tasks: List[str]=['T_ROB1']) -> None: payload={"regain": "continue", "execmode": "continue" , "cycle": cycle, "condition": "none", "stopatbp": "disabled", "alltaskbytsp": "true"} res=self._do_post("rw/rapid/execution?action=start", payload) - def activate_task(self, task: str) -> None: + def activate_task(self, task: str): + """ + Activate a RAPID task + + :param task: The name of the task to activate + """ payload={} self._do_post(f"rw/rapid/tasks/{task}?action=activate",payload) def deactivate_task(self, task: str) -> None: + """ + Deactivate a RAPID task + + :param task: The name of the task to activate + """ payload={} self._do_post(f"rw/rapid/tasks/{task}?action=deactivate",payload) - def stop(self) -> None: + def stop(self): + """ + Stop RAPID execution of normal tasks + """ payload={"stopmode": "stop"} res=self._do_post("rw/rapid/execution?action=stop", payload) - def resetpp(self) -> None: + def resetpp(self): + """ + Reset RAPID program pointer to main in normal tasks + """ res=self._do_post("rw/rapid/execution?action=resetpp") def get_ramdisk_path(self) -> str: + """ + Get the path of the RAMDISK variable on the controller + + :return: The RAMDISK path + """ res_json = self._do_get("ctrl/$RAMDISK") return res_json["_embedded"]["_state"][0]["_value"] def get_execution_state(self) -> RAPIDExecutionState: + """ + Get the RAPID execution state + + :return: The RAPID execution state + """ res_json = self._do_get("rw/rapid/execution") state = res_json["_embedded"]["_state"][0] ctrlexecstate=state["ctrlexecstate"] @@ -213,6 +324,15 @@ def get_execution_state(self) -> RAPIDExecutionState: return RAPIDExecutionState(ctrlexecstate, cycle) def get_controller_state(self) -> str: + """ + Get the controller state. The controller state can have the following values: + + `init`, `motoroff`, `motoron`, `guardstop`, `emergencystop`, `emergencystopreset`, `sysfail` + + RAPID can only be executed and the robot can only be moved in the `motoron` state. + + :return: The controller state + """ res_json = self._do_get("rw/panel/ctrlstate") state = res_json["_embedded"]["_state"][0] return state['ctrlstate'] @@ -222,30 +342,77 @@ def set_controller_state(self, ctrl_state): res=self._do_post("rw/panel/ctrlstate?action=setctrlstate", payload) def get_operation_mode(self) -> str: + """ + Get the controller operational mode. The controller operational mode can have the following values: + + `INIT`, `AUTO_CH`, `MANF_CH`, `MANR`, `MANF`, `AUTO`, `UNDEF` + + Typical values returned by the controller are `AUTO` for auto mode, and `MANR` for manual reduced-speed mode. + + :return: The controller operational mode. + """ res_json = self._do_get("rw/panel/opmode") state = res_json["_embedded"]["_state"][0] return state["opmode"] def get_digital_io(self, signal: str, network: str='Local', unit: str='DRV_1') -> int: + """ + Get the value of a digital IO signal. + + :param signal: The name of the signal + :param network: The network the signal is on. The default `Local` will work for most signals. + :param unit: The drive unit of the signal. The default `DRV_1` will work for most signals. + :return: The value of the signal. Typically 1 for ON and 0 for OFF + """ res_json = self._do_get("rw/iosystem/signals/" + network + "/" + unit + "/" + signal) state = res_json["_embedded"]["_state"][0]["lvalue"] return int(state) - def set_digital_io(self, signal: str, value: Union[bool,int], network: str='Local', unit: str='DRV_1') -> None: + def set_digital_io(self, signal: str, value: Union[bool,int], network: str='Local', unit: str='DRV_1'): + """ + Set the value of an digital IO signal. + + :param value: The value of the signal. Bool or bool convertible input + :param signal: The name of the signal + :param network: The network the signal is on. The default `Local` will work for most signals. + :param unit: The drive unit of the signal. The default `DRV_1` will work for most signals. + """ lvalue = '1' if bool(value) else '0' payload={'lvalue': lvalue} res=self._do_post("rw/iosystem/signals/" + network + "/" + unit + "/" + signal + "?action=set", payload) - def get_analog_io(self, signal: str, network: str='Local', unit: str='DRV_1') -> int: + def get_analog_io(self, signal: str, network: str='Local', unit: str='DRV_1') -> float: + """ + Get the value of an analog IO signal. + + :param signal: The name of the signal + :param network: The network the signal is on. The default `Local` will work for most signals. + :param unit: The drive unit of the signal. The default `DRV_1` will work for most signals. + :return: The value of the signal + """ res_json = self._do_get("rw/iosystem/signals/" + network + "/" + unit + "/" + signal) state = res_json["_embedded"]["_state"][0]["lvalue"] - return int(state) + return float(state) - def set_analog_io(self, signal: str, value: int, network: str='Local', unit: str='DRV_1') -> None: + def set_analog_io(self, signal: str, value: Union[int,float], network: str='Local', unit: str='DRV_1'): + """ + Set the value of an analog IO signal. + + :param value: The value of the signal + :param signal: The name of the signal + :param network: The network the signal is on. The default `Local` will work for most signals. + :param unit: The drive unit of the signal. The default `DRV_1` will work for most signals. + """ payload={"mode": "value",'lvalue': value} res=self._do_post("rw/iosystem/signals/" + network + "/" + unit + "/" + signal + "?action=set", payload) - def get_rapid_variables(self, task: str="T_ROB1") -> str: + def get_rapid_variables(self, task: str="T_ROB1") -> List[str]: + """ + Get a list of the persistent variables in a task + + :param task: The RAPID task to query + :return: List of persistent variables in task + """ payload={ "view": "block", "vartyp": "any", @@ -263,6 +430,13 @@ def get_rapid_variables(self, task: str="T_ROB1") -> str: return state def get_rapid_variable(self, var: str, task: str = "T_ROB1") -> str: + """ + Get value of a RAPID pers variable + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The pers variable encoded as a string + """ if task is not None: var1 = f"{task}/{var}" else: @@ -272,6 +446,13 @@ def get_rapid_variable(self, var: str, task: str = "T_ROB1") -> str: return state def set_rapid_variable(self, var: str, value: str, task: str = "T_ROB1"): + """ + Set value of a RAPID pers variable + + :param var: The pers variable name + :param value: The new variable value encoded as a string + :param task: The task containing the pers variable + """ payload={'value': value} if task is not None: var1 = f"{task}/var" @@ -280,6 +461,12 @@ def set_rapid_variable(self, var: str, value: str, task: str = "T_ROB1"): res=self._do_post("rw/rapid/symbol/data/RAPID/" + var1 + "?action=set", payload) def read_file(self, filename: str) -> bytes: + """ + Read a file off the controller + + :param filename: The filename to read + :return: The file bytes + """ url="/".join([self.base_url, "fileservice", filename]) res=self._session.get(url, auth=self.auth) assert res.ok, f"File not found {filename}" @@ -288,23 +475,46 @@ def read_file(self, filename: str) -> bytes: finally: res.close() - def upload_file(self, filename: str, contents: bytes) -> None: + def upload_file(self, filename: str, contents: bytes): + """ + Upload a file to the controller + + :param filename: The filename to write + :param contents: The file content bytes + """ url="/".join([self.base_url, "fileservice" , filename]) res=self._session.put(url, contents, auth=self.auth) assert res.ok, res.reason res.close() - def delete_file(self, filename: str) -> None: + def delete_file(self, filename: str): + """ + Delete a file on the controller + + :param filename: The filename to delete + """ url="/".join([self.base_url, "fileservice" , filename]) res=self._session.delete(url, auth=self.auth) res.close() def list_files(self, path: str) -> List[str]: + """ + List files at a path on a controller + + :param path: The path to list + :return: The filenames in the path + """ res_json = self._do_get("fileservice/" + str(path) + "") state = res_json["_embedded"]["_state"] return [f["_title"] for f in state] def read_event_log(self, elog: int=0) -> List[EventLogEntry]: + """ + Read the controller event log + + :param elog: The event log id to read + :return: The event log entries + """ o=[] res_json = self._do_get("rw/elog/" + str(elog) + "/?lang=en") state = res_json["_embedded"]["_state"] @@ -329,6 +539,11 @@ def read_event_log(self, elog: int=0) -> List[EventLogEntry]: return o def get_tasks(self) -> List[TaskState]: + """ + Get controller tasks and task state + + :return: The tasks and task state + """ o = {} res_json = self._do_get("rw/rapid/tasks") state = res_json["_embedded"]["_state"] @@ -351,7 +566,13 @@ def get_tasks(self) -> List[TaskState]: return o - def get_jointtarget(self, mechunit="ROB_1"): + def get_jointtarget(self, mechunit="ROB_1") -> JointTarget: + """ + Get the current jointtarget for specified mechunit + + :param mechunit: The mechanical unit to read + :return: The current jointtarget + """ res_json=self._do_get("rw/motionsystem/mechunits/" + mechunit + "/jointtarget") state = res_json["_embedded"]["_state"][0] assert state["_type"] == "ms-jointtarget" @@ -362,7 +583,17 @@ def get_jointtarget(self, mechunit="ROB_1"): return JointTarget(robjoint,extjoint) - def get_robtarget(self, mechunit='ROB_1', tool='tool0', wobj='wobj0', coordinate='Base'): + def get_robtarget(self, mechunit='ROB_1', tool='tool0', wobj='wobj0', coordinate='Base') -> RobTarget: + """ + Get the current robtarget (cartesian pose) for the specified mechunit + + :param mechunit: The mechanical unit to read + :param tool: The tool to use to compute robtarget + :param wobj: The wobj to use to compute robtarget + :param coordinate: The coordinate system to use to compute robtarget. Can be `Base`, `World`, `Tool`, or `Wobj` + :return: The current robtarget + + """ res_json=self._do_get(f"rw/motionsystem/mechunits/{mechunit}/robtarget?tool={tool}&wobj={wobj}&coordinate={coordinate}") state = res_json["_embedded"]["_state"][0] assert state["_type"] == "ms-robtargets" @@ -387,11 +618,25 @@ def _jointtarget_to_rws_value(self, val): rws_value="[[" + robax + "],[" + extax + "]]" return rws_value - def get_rapid_variable_jointtarget(self, var, task: str = "T_ROB1"): + def get_rapid_variable_jointtarget(self, var, task: str = "T_ROB1") -> JointTarget: + """ + Get a RAPID pers variable and convert to JointTarget + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The pers variable encoded as a JointTarget + """ v = self.get_rapid_variable(var, task) return self._rws_value_to_jointtarget(v) - def set_rapid_variable_jointtarget(self,var,value, task: str = "T_ROB1"): + def set_rapid_variable_jointtarget(self,var: str, value: JointTarget, task: str = "T_ROB1"): + """ + Set a RAPID pers variable from a JointTarget + + :param var: The pers variable name + :param value: The new variable JointTarget value + :param task: The task containing the pers variable + """ rws_value=self._jointtarget_to_rws_value(value) self.set_rapid_variable(var, rws_value, task) @@ -411,31 +656,80 @@ def _rws_value_to_jointtarget_array(self,val): def _jointtarget_array_to_rws_value(self, val): return "[" + ','.join([self._jointtarget_to_rws_value(v) for v in val]) + "]" - def get_rapid_variable_jointtarget_array(self, var, task: str = "T_ROB1"): + def get_rapid_variable_jointtarget_array(self, var: str, task: str = "T_ROB1") -> List[JointTarget]: + """ + Get a RAPID pers variable and convert to JointTarget list + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The pers variable encoded as a list of JointTarget + """ v = self.get_rapid_variable(var, task) return self._rws_value_to_jointtarget_array(v) - def set_rapid_variable_jointtarget_array(self,var,value, task: str = "T_ROB1"): + def set_rapid_variable_jointtarget_array(self,var: str,value: List[JointTarget], task: str = "T_ROB1"): + """ + Set a RAPID pers variable from a JointTarget list + + :param var: The pers variable name + :param value: The new variable JointTarget value + :param task: The task containing the pers variable + """ rws_value=self._jointtarget_array_to_rws_value(value) self.set_rapid_variable(var, rws_value, task) - def get_rapid_variable_num(self, var, task: str = "T_ROB1"): + def get_rapid_variable_num(self, var: str, task: str = "T_ROB1") -> float: + """ + Get a RAPID pers variable and convert to float + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The pers variable encoded as a float + """ return float(self.get_rapid_variable(var,task)) - def set_rapid_variable_num(self, var, val, task: str = "T_ROB1"): + def set_rapid_variable_num(self, var: str, val: float, task: str = "T_ROB1"): + """ + Set a RAPID pers variable from a float + + :param var: The pers variable name + :param value: The new variable float value + :param task: The task containing the pers variable + """ self.set_rapid_variable(var, str(val), task) - def get_rapid_variable_num_array(self, var, task: str = "T_ROB1"): + def get_rapid_variable_num_array(self, var, task: str = "T_ROB1") -> np.ndarray: + """ + Get a RAPID pers variable float array + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The variable value as an array + """ val1=self.get_rapid_variable(var,task) m=re.match("^\\[([^\\]]*)\\]$", val1) val2=m.groups()[0].strip() return np.fromstring(val2,sep=',') - def set_rapid_variable_num_array(self, var, val, task: str = "T_ROB1"): + def set_rapid_variable_num_array(self, var: str, val: List[float], task: str = "T_ROB1"): + """ + Set a RAPID pers variable from a float list or array + + :param var: The pers variable name + :param value: The new variable float array value + :param task: The task containing the pers variable + """ self.set_rapid_variable(var, "[" + ','.join([str(s) for s in val]) + "]", task) - def read_ipc_message(self, queue_name, timeout=0): - + def read_ipc_message(self, queue_name: str, timeout: float=0) -> List[IpcMessage]: + """ + Read IPC message. IPC is used to communicate with RMQ in controller tasks. Create IPC using + try_create_ipc_queue(). + + :param queue_name: The name of the queue created using try_create_ipc_queue() + :param timeout: The timeout to receive a message in seconds + :return: Messages received from IPC queue + """ o=[] timeout_str="" @@ -452,16 +746,41 @@ def read_ipc_message(self, queue_name, timeout=0): #o.append(RAPIDEventLogEntry(msg_type,code,tstamp,args,title,desc,conseqs,causes,actions)) return o - def send_ipc_message(self, target_queue, data, queue_name, cmd=111, userdef=1, msgtype=1 ): + def send_ipc_message(self, target_queue: str, data: str, queue_name: str, cmd: int=111, userdef: int=1, msgtype: int=1 ): + """ + Send an IPC message to the specified queue + + :param target_queue: The target IPC queue. Can also be the name of a task to send to RMQ of controller task. + :param data: The data to send to the controller. Encoding must match the expected type of RMQ + :param queue_name: The queue to send message from. Must be created with try_create_ipc_queue() + :param cmd: The cmd entry in the message + :param userdef: User defined value + :param msgtype: The type of message. Must be 0 or 1 + """ payload={"dipc-src-queue-name": queue_name, "dipc-cmd": str(cmd), "dipc-userdef": str(userdef), \ "dipc-msgtype": str(msgtype), "dipc-data": data} res=self._do_post("rw/dipc/" + target_queue + "?action=dipc-send", payload) - def get_ipc_queue(self, queue_name): + def get_ipc_queue(self, queue_name: str) -> Any: + """ + Get the IPC queue + + :param queue_name: The name of the queue + """ res=self._do_get("rw/dipc/" + queue_name + "?action=dipc-read") return res - def try_create_ipc_queue(self, queue_name, queue_size=4440, max_msg_size=444): + def try_create_ipc_queue(self, queue_name: str, queue_size: int=4440, max_msg_size: int=444) -> bool: + """ + Try creating an IPC queue. Returns True if the queue is created, False if queue already exists. Raises + exception for all other errors. + + :param queue_name: The name of the new IPC queue + :param queue_size: The buffer size of the queue + :param max_msg_size: The maximum message size of the queue + :return: True if queue created, False if queue already exists + + """ try: payload={"dipc-queue-name": queue_name, "dipc-queue-size": str(queue_size), "dipc-max-msg-size": str(max_msg_size)} self._do_post("rw/dipc?action=dipc-create", payload) @@ -471,7 +790,14 @@ def try_create_ipc_queue(self, queue_name, queue_size=4440, max_msg_size=444): return False raise - def request_rmmp(self, timeout=5): + def request_rmmp(self, timeout: float=5): + """ + Request Remote Mastering. Required to alter pers variables in manual control mode. The teach pendant + will prompt to enable remote mastering, and the user must confirm. Once remote mastering is enabled, + poll_rmmp() must be executed periodically to maintain rmmp. + + :param timeout: The request timeout in seconds + """ t1=time.time() self._do_post('users/rmmp?json=1', {'privilege': 'modify'}) while time.time() - t1 < timeout: @@ -489,6 +815,9 @@ def request_rmmp(self, timeout=5): raise Exception("User did not grant remote access") def poll_rmmp(self): + """ + Poll rmmp to maintain remote mastering. Call periodically after rmmp enabled using request_rmmp() + """ # A "persistent session" can only make 400 calls before # being disconnected. Once this connection is lost, @@ -532,8 +861,41 @@ def poll_rmmp(self): return state["status"] == "GRANTED" - def subscribe(self, resources: List[SubscriptionResourceRequest], handler: Callable): + def subscribe(self, resources: List[SubscriptionResourceRequest], handler: Callable[[Any],None]) -> "RWSSubscription": + """ + Create subscription that will receive real-time updates from the controller. handler will be called + with the new values. RWS subscriptions are relatively slow, with delays in the hundreds of milliseconds. + Use EGM for faster real-time updates. + + A list of `SubscriptionResourceRequest` are used to specify what resources to subscribe. The following are + supported: + + * Controller State: `SubscriptionResourceRequest(SubscriptionResourceType.ControllerState, rws.SubscriptionResourcePriority.Medium)` + * Operational Mode: `SubscriptionResourceRequest(SubscriptionResourceType.OperationalMode, rws.SubscriptionResourcePriority.Medium)` + * Execution State: `SubscriptionResourceRequest(SubscriptionResourceType.ExecutionState, rws.SubscriptionResourcePriority.Medium)` + * Variable: `SubscriptionResourceRequest(SubscriptionResourceType.PersVar, rws.SubscriptionResourcePriority.High, "variable_name")` + * IPC Queue: SubscriptionResourceRequest(SubscriptionResourceType.IpcQueue, rws.SubscriptionResourcePriority.Medium, "queue_name")` + * Event Log: SubscriptionResourceRequest(SubscriptionResourceType.Elog, rws.SubscriptionResourcePriority.Medium) + * Signal: SubscriptionResourceRequest(SubscriptionResourceType.Signal, rws.SubscriptionResourcePriority.High, "signal_name") + + The `handler` parameter will be called on each event from the controller. The passed parameter will + have the following type: + + * Controller State: :class:`.ControllerState` + * Operational Mode: :class:`.OperationalMode` + * Execution State: :class:`.RAPIDExecutionState` + * Variable: :class:`.PersVar` + * IPC Queue: :class:`.IpcMessage` + * Event Log: :class:`.EventLogEntry` + * Signal: :class:`.Signal` + + The returned `RWSSubscription` creates a persistent websocket connection to the controller. It must + be closed with the close() method when no longer in use. + :param resources: Controller resources to subscribe + :param handler: Callable to call on resource events + :return: Active subscription + """ payload = {} payload_ind = 0 for r in resources: @@ -606,6 +968,9 @@ class SubscriptionClosed(NamedTuple): msg: str class RWSSubscription: + """ + Subscription returned from :meth:`RWS.subscribe()` + """ def __init__(self, ws_url, header, handler): self.handler = handler @@ -680,4 +1045,7 @@ def _on_open(self, ws): pass def close(self): + """ + Close the subscription + """ self.ws.close() diff --git a/src/abb_robot_client/rws_aio.py b/src/abb_robot_client/rws_aio.py index 9f88e6f..3d4b059 100644 --- a/src/abb_robot_client/rws_aio.py +++ b/src/abb_robot_client/rws_aio.py @@ -16,6 +16,16 @@ SubscriptionResourcePriority, SubscriptionResourceRequest, SubscriptionException, SubscriptionClosed class RWS_AIO: + """ + Robot Web Services asyncio client. This class has the same functionality as :class:`abb_robot_client.rws.RWS`, but + uses asyncio instead of synchronous interface. + + :param base_url: Base URL of the robot. For Robot Studio instances, this should be http://127.0.0.1:80, + the default value. For a real robot, 127.0.0.1 should be replaced with the IP address + of the robot controller. The WAN port ethernet must be used, not the maintenance port. + :param username: The HTTP username for the robot. Defaults to 'Default User' + :param password: The HTTP password for the robot. Defaults to 'robotics' + """ def __init__(self, base_url='http://127.0.0.1:80', username=None, password=None): self.base_url=base_url if username is None: @@ -87,8 +97,13 @@ def _process_response(self, response): raise ABBException(error_message, error_code) - async def start(self, cycle: str='asis',tasks: List[str]=['T_ROB1']) -> None: + async def start(self, cycle: str='asis',tasks: List[str]=['T_ROB1']): + """ + Start one or more RAPID tasks + :param cycle: The cycle mode of the robot. Can be `asis`, `once`, or `forever`. + :param tasks: One or more tasks to start. + """ rob_tasks = await self.get_tasks() for t in tasks: assert t in rob_tasks, f"Cannot start unknown task {t}" @@ -106,26 +121,52 @@ async def start(self, cycle: str='asis',tasks: List[str]=['T_ROB1']) -> None: payload={"regain": "continue", "execmode": "continue" , "cycle": cycle, "condition": "none", "stopatbp": "disabled", "alltaskbytsp": "true"} res=await self._do_post("rw/rapid/execution?action=start", payload) - async def activate_task(self, task: str) -> None: + async def activate_task(self, task: str): + """ + Activate a RAPID task + + :param task: The name of the task to activate + """ payload={} await self._do_post(f"rw/rapid/tasks/{task}?action=activate",payload) - async def deactivate_task(self, task: str) -> None: + async def deactivate_task(self, task: str): + """ + Deactivate a RAPID task + + :param task: The name of the task to activate + """ payload={} await self._do_post(f"rw/rapid/tasks/{task}?action=deactivate",payload) - async def stop(self) -> None: + async def stop(self): + """ + Stop RAPID execution of normal tasks + """ payload={"stopmode": "stop"} res=await self._do_post("rw/rapid/execution?action=stop", payload) - async def resetpp(self) -> None: + async def resetpp(self): + """ + Reset RAPID program pointer to main in normal tasks + """ res=await self._do_post("rw/rapid/execution?action=resetpp") async def get_ramdisk_path(self) -> str: + """ + Get the path of the RAMDISK variable on the controller + + :return: The RAMDISK path + """ res_json = await self._do_get("ctrl/$RAMDISK") return res_json["_embedded"]["_state"][0]["_value"] async def get_execution_state(self) -> RAPIDExecutionState: + """ + Get the RAPID execution state + + :return: The RAPID execution state + """ res_json = await self._do_get("rw/rapid/execution") state = res_json["_embedded"]["_state"][0] ctrlexecstate=state["ctrlexecstate"] @@ -133,6 +174,15 @@ async def get_execution_state(self) -> RAPIDExecutionState: return RAPIDExecutionState(ctrlexecstate, cycle) async def get_controller_state(self) -> str: + """ + Get the controller state. The controller state can have the following values: + + `init`, `motoroff`, `motoron`, `guardstop`, `emergencystop`, `emergencystopreset`, `sysfail` + + RAPID can only be executed and the robot can only be moved in the `motoron` state. + + :return: The controller state + """ res_json = await self._do_get("rw/panel/ctrlstate") state = res_json["_embedded"]["_state"][0] return state['ctrlstate'] @@ -142,30 +192,77 @@ async def set_controller_state(self, ctrl_state): res=await self._do_post("rw/panel/ctrlstate?action=setctrlstate", payload) async def get_operation_mode(self) -> str: + """ + Get the controller operational mode. The controller operational mode can have the following values: + + `INIT`, `AUTO_CH`, `MANF_CH`, `MANR`, `MANF`, `AUTO`, `UNDEF` + + Typical values returned by the controller are `AUTO` for auto mode, and `MANR` for manual reduced-speed mode. + + :return: The controller operational mode. + """ res_json = await self._do_get("rw/panel/opmode") state = res_json["_embedded"]["_state"][0] return state["opmode"] async def get_digital_io(self, signal: str, network: str='Local', unit: str='DRV_1') -> int: + """ + Get the value of a digital IO signal. + + :param signal: The name of the signal + :param network: The network the signal is on. The default `Local` will work for most signals. + :param unit: The drive unit of the signal. The default `DRV_1` will work for most signals. + :return: The value of the signal. Typically 1 for ON and 0 for OFF + """ res_json = await self._do_get("rw/iosystem/signals/" + network + "/" + unit + "/" + signal) state = res_json["_embedded"]["_state"][0]["lvalue"] return int(state) - async def set_digital_io(self, signal: str, value: Union[bool,int], network: str='Local', unit: str='DRV_1') -> None: + async def set_digital_io(self, signal: str, value: Union[bool,int], network: str='Local', unit: str='DRV_1'): + """ + Set the value of an digital IO signal. + + :param value: The value of the signal. Bool or bool convertible input + :param signal: The name of the signal + :param network: The network the signal is on. The default `Local` will work for most signals. + :param unit: The drive unit of the signal. The default `DRV_1` will work for most signals. + """ lvalue = '1' if bool(value) else '0' payload={'lvalue': lvalue} res=await self._do_post("rw/iosystem/signals/" + network + "/" + unit + "/" + signal + "?action=set", payload) - async def get_analog_io(self, signal: str, network: str='Local', unit: str='DRV_1') -> int: + async def get_analog_io(self, signal: str, network: str='Local', unit: str='DRV_1') -> float: + """ + Get the value of an analog IO signal. + + :param signal: The name of the signal + :param network: The network the signal is on. The default `Local` will work for most signals. + :param unit: The drive unit of the signal. The default `DRV_1` will work for most signals. + :return: The value of the signal + """ res_json = await self._do_get("rw/iosystem/signals/" + network + "/" + unit + "/" + signal) state = res_json["_embedded"]["_state"][0]["lvalue"] - return int(state) + return float(state) - async def set_analog_io(self, signal: str, value: int, network: str='Local', unit: str='DRV_1') -> None: + async def set_analog_io(self, signal: str, value: int, network: str='Local', unit: str='DRV_1'): + """ + Set the value of an analog IO signal. + + :param value: The value of the signal + :param signal: The name of the signal + :param network: The network the signal is on. The default `Local` will work for most signals. + :param unit: The drive unit of the signal. The default `DRV_1` will work for most signals. + """ payload={"mode": "value",'lvalue': value} res=await self._do_post("rw/iosystem/signals/" + network + "/" + unit + "/" + signal + "?action=set", payload) async def get_rapid_variables(self, task: str="T_ROB1") -> str: + """ + Get a list of the persistent variables in a task + + :param task: The RAPID task to query + :return: List of persistent variables in task + """ payload={ "view": "block", "vartyp": "any", @@ -183,6 +280,13 @@ async def get_rapid_variables(self, task: str="T_ROB1") -> str: return state async def get_rapid_variable(self, var: str, task: str = "T_ROB1") -> str: + """ + Get value of a RAPID pers variable + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The pers variable encoded as a string + """ if task is not None: var1 = f"{task}/{var}" else: @@ -192,6 +296,13 @@ async def get_rapid_variable(self, var: str, task: str = "T_ROB1") -> str: return state async def set_rapid_variable(self, var: str, value: str, task: str = "T_ROB1"): + """ + Set value of a RAPID pers variable + + :param var: The pers variable name + :param value: The new variable value encoded as a string + :param task: The task containing the pers variable + """ payload={'value': value} if task is not None: var1 = f"{task}/var" @@ -200,6 +311,12 @@ async def set_rapid_variable(self, var: str, value: str, task: str = "T_ROB1"): res=await self._do_post("rw/rapid/symbol/data/RAPID/" + var1 + "?action=set", payload) async def read_file(self, filename: str) -> bytes: + """ + Read a file off the controller + + :param filename: The filename to read + :return: The file bytes + """ url="/".join([self.base_url, "fileservice", filename]) res=await self._session.get(url) assert res.is_success, f"File not found {filename}" @@ -209,22 +326,45 @@ async def read_file(self, filename: str) -> bytes: await res.aclose() async def upload_file(self, filename: str, contents: bytes) -> None: + """ + Upload a file to the controller + + :param filename: The filename to write + :param contents: The file content bytes + """ url="/".join([self.base_url, "fileservice" , filename]) res=await self._session.put(url, content=contents) assert res.is_success, res.reason_phrase await res.aclose() async def delete_file(self, filename: str) -> None: + """ + Delete a file on the controller + + :param filename: The filename to delete + """ url="/".join([self.base_url, "fileservice" , filename]) res=await self._session.delete(url) await res.aclose() async def list_files(self, path: str) -> List[str]: + """ + List files at a path on a controller + + :param path: The path to list + :return: The filenames in the path + """ res_json = await self._do_get("fileservice/" + str(path) + "") state = res_json["_embedded"]["_state"] return [f["_title"] for f in state] async def read_event_log(self, elog: int=0) -> List[EventLogEntry]: + """ + Read the controller event log + + :param elog: The event log id to read + :return: The event log entries + """ o=[] res_json = await self._do_get("rw/elog/" + str(elog) + "/?lang=en") state = res_json["_embedded"]["_state"] @@ -249,6 +389,11 @@ async def read_event_log(self, elog: int=0) -> List[EventLogEntry]: return o async def get_tasks(self) -> List[TaskState]: + """ + Get controller tasks and task state + + :return: The tasks and task state + """ o = {} res_json = await self._do_get("rw/rapid/tasks") state = res_json["_embedded"]["_state"] @@ -271,7 +416,13 @@ async def get_tasks(self) -> List[TaskState]: return o - async def get_jointtarget(self, mechunit="ROB_1"): + async def get_jointtarget(self, mechunit: str="ROB_1"): + """ + Get the current jointtarget for specified mechunit + + :param mechunit: The mechanical unit to read + :return: The current jointtarget + """ res_json=await self._do_get("rw/motionsystem/mechunits/" + mechunit + "/jointtarget") state = res_json["_embedded"]["_state"][0] assert state["_type"] == "ms-jointtarget" @@ -282,7 +433,17 @@ async def get_jointtarget(self, mechunit="ROB_1"): return JointTarget(robjoint,extjoint) - async def get_robtarget(self, mechunit='ROB_1', tool='tool0', wobj='wobj0', coordinate='Base'): + async def get_robtarget(self, mechunit: str='ROB_1', tool: str='tool0', wobj: str='wobj0', coordinate: str='Base'): + """ + Get the current robtarget (cartesian pose) for the specified mechunit + + :param mechunit: The mechanical unit to read + :param tool: The tool to use to compute robtarget + :param wobj: The wobj to use to compute robtarget + :param coordinate: The coordinate system to use to compute robtarget. Can be `Base`, `World`, `Tool`, or `Wobj` + :return: The current robtarget + + """ res_json=await self._do_get(f"rw/motionsystem/mechunits/{mechunit}/robtarget?tool={tool}&wobj={wobj}&coordinate={coordinate}") state = res_json["_embedded"]["_state"][0] assert state["_type"] == "ms-robtargets" @@ -307,11 +468,25 @@ def _jointtarget_to_rws_value(self, val): rws_value="[[" + robax + "],[" + extax + "]]" return rws_value - async def get_rapid_variable_jointtarget(self, var, task: str = "T_ROB1"): + async def get_rapid_variable_jointtarget(self, var: str, task: str = "T_ROB1"): + """ + Get a RAPID pers variable and convert to JointTarget + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The pers variable encoded as a JointTarget + """ v = await self.get_rapid_variable(var, task) return self._rws_value_to_jointtarget(v) - async def set_rapid_variable_jointtarget(self,var,value, task: str = "T_ROB1"): + async def set_rapid_variable_jointtarget(self,var: str, value: JointTarget, task: str = "T_ROB1"): + """ + Set a RAPID pers variable from a JointTarget + + :param var: The pers variable name + :param value: The new variable JointTarget value + :param task: The task containing the pers variable + """ rws_value=self._jointtarget_to_rws_value(value) await self.set_rapid_variable(var, rws_value, task) @@ -332,29 +507,79 @@ def _jointtarget_array_to_rws_value(self, val): return "[" + ','.join([self._jointtarget_to_rws_value(v) for v in val]) + "]" async def get_rapid_variable_jointtarget_array(self, var, task: str = "T_ROB1"): + """ + Get a RAPID pers variable and convert to JointTarget list + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The pers variable encoded as a list of JointTarget + """ v = await self.get_rapid_variable(var, task) return self._rws_value_to_jointtarget_array(v) - async def set_rapid_variable_jointtarget_array(self,var,value, task: str = "T_ROB1"): + async def set_rapid_variable_jointtarget_array(self,var: str, value: List[JointTarget], task: str = "T_ROB1"): + """ + Set a RAPID pers variable from a JointTarget list + + :param var: The pers variable name + :param value: The new variable JointTarget value + :param task: The task containing the pers variable + """ rws_value=self._jointtarget_array_to_rws_value(value) await self.set_rapid_variable(var, rws_value, task) - async def get_rapid_variable_num(self, var, task: str = "T_ROB1"): + async def get_rapid_variable_num(self, var: str, task: str = "T_ROB1"): + """ + Get a RAPID pers variable and convert to float + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The pers variable encoded as a float + """ return float(await self.get_rapid_variable(var,task)) - async def set_rapid_variable_num(self, var, val, task: str = "T_ROB1"): + async def set_rapid_variable_num(self, var: str, val: float, task: str = "T_ROB1"): + """ + Set a RAPID pers variable from a float + + :param var: The pers variable name + :param value: The new variable float value + :param task: The task containing the pers variable + """ await self.set_rapid_variable(var, str(val), task) - async def get_rapid_variable_num_array(self, var, task: str = "T_ROB1"): + async def get_rapid_variable_num_array(self, var, task: str = "T_ROB1") -> np.ndarray: + """ + Get a RAPID pers variable float array + + :param var: The pers variable name + :param task: The task containing the pers variable + :return: The variable value as an array + """ val1=await self.get_rapid_variable(var,task) m=re.match("^\\[([^\\]]*)\\]$", val1) val2=m.groups()[0].strip() return np.fromstring(val2,sep=',') - async def set_rapid_variable_num_array(self, var, val, task: str = "T_ROB1"): + async def set_rapid_variable_num_array(self, var: str, val: List[float], task: str = "T_ROB1"): + """ + Set a RAPID pers variable from a float list or array + + :param var: The pers variable name + :param value: The new variable float array value + :param task: The task containing the pers variable + """ await self.set_rapid_variable(var, "[" + ','.join([str(s) for s in val]) + "]", task) - async def read_ipc_message(self, queue_name, timeout=0): + async def read_ipc_message(self, queue_name: str, timeout: float=0) -> List[IpcMessage]: + """ + Read IPC message. IPC is used to communicate with RMQ in controller tasks. Create IPC using + try_create_ipc_queue(). + + :param queue_name: The name of the queue created using try_create_ipc_queue() + :param timeout: The timeout to receive a message in seconds + :return: Messages received from IPC queue + """ o=[] @@ -372,16 +597,42 @@ async def read_ipc_message(self, queue_name, timeout=0): #o.append(RAPIDEventLogEntry(msg_type,code,tstamp,args,title,desc,conseqs,causes,actions)) return o - async def send_ipc_message(self, target_queue, data, queue_name, cmd=111, userdef=1, msgtype=1 ): + async def send_ipc_message(self, target_queue: str, data: str, queue_name: str, cmd: int=111, userdef: int=1, msgtype: int=1 ): + """ + Send an IPC message to the specified queue + + :param target_queue: The target IPC queue. Can also be the name of a task to send to RMQ of controller task. + :param data: The data to send to the controller. Encoding must match the expected type of RMQ + :param queue_name: The queue to send message from. Must be created with try_create_ipc_queue() + :param cmd: The cmd entry in the message + :param userdef: User defined value + :param msgtype: The type of message. Must be 0 or 1 + """ + payload={"dipc-src-queue-name": queue_name, "dipc-cmd": str(cmd), "dipc-userdef": str(userdef), \ "dipc-msgtype": str(msgtype), "dipc-data": data} res=await self._do_post("rw/dipc/" + target_queue + "?action=dipc-send", payload) async def get_ipc_queue(self, queue_name): + """ + Get the IPC queue + + :param queue_name: The name of the queue + """ res=await self._do_get("rw/dipc/" + queue_name + "?action=dipc-read") return res - async def try_create_ipc_queue(self, queue_name, queue_size=4440, max_msg_size=444): + async def try_create_ipc_queue(self, queue_name: str, queue_size: int=4440, max_msg_size: int=444): + """ + Try creating an IPC queue. Returns True if the queue is created, False if queue already exists. Raises + exception for all other errors. + + :param queue_name: The name of the new IPC queue + :param queue_size: The buffer size of the queue + :param max_msg_size: The maximum message size of the queue + :return: True if queue created, False if queue already exists + + """ try: payload={"dipc-queue-name": queue_name, "dipc-queue-size": str(queue_size), "dipc-max-msg-size": str(max_msg_size)} await self._do_post("rw/dipc?action=dipc-create", payload) @@ -391,7 +642,14 @@ async def try_create_ipc_queue(self, queue_name, queue_size=4440, max_msg_size=4 return False raise - async def request_rmmp(self, timeout=5): + async def request_rmmp(self, timeout: float=5): + """ + Request Remote Mastering. Required to alter pers variables in manual control mode. The teach pendant + will prompt to enable remote mastering, and the user must confirm. Once remote mastering is enabled, + poll_rmmp() must be executed periodically to maintain rmmp. + + :param timeout: The request timeout in seconds + """ t1=time.time() await self._do_post('users/rmmp?json=1', {'privilege': 'modify'}) while time.time() - t1 < timeout: @@ -409,6 +667,9 @@ async def request_rmmp(self, timeout=5): raise Exception("User did not grant remote access") async def poll_rmmp(self): + """ + Poll rmmp to maintain remote mastering. Call periodically after rmmp enabled using request_rmmp() + """ # A "persistent session" can only make 400 calls before # being disconnected. Once this connection is lost, @@ -461,7 +722,19 @@ async def logout(self): pass async def subscribe(self, resources: List[SubscriptionResourceRequest]): - + """ + Create subscription that will receive real-time updates from the controller. handler will be called + with the new values. RWS subscriptions are relatively slow, with delays in the hundreds of milliseconds. + Use EGM for faster real-time updates. + + See :meth:`abb_robot_client.RWS.subscribe()` for more information on `resources` parameter. + + The asyncio version of subscribe() returns an async generator. Use `async for` to receive events. + + :param resources: Controller resources to subscribe + :param handler: Callable to call on resource events + :return: Generator to use with `async for`. + """ self._init_subscription_convert_message() payload = {} @@ -587,4 +860,6 @@ def _convert_subscription_message(self, message): return UnknownSubscriptionMessage(message) class UnknownSubscriptionMessage(NamedTuple): + """Contents of an unknown subscription resource type""" xml: str + """Raw xml returned by controller"""