From 50f551b0d8d8928e9411503a2d8956ef19755cb5 Mon Sep 17 00:00:00 2001 From: "ben.holden" Date: Thu, 14 Sep 2023 14:03:24 +0100 Subject: [PATCH] Comments and linting --- vda5050_connector/test/conftest.py | 6 +- .../test/test_vda5050_controller.py | 80 +++++---- .../vda5050_controller.py | 167 +++++++++--------- 3 files changed, 128 insertions(+), 125 deletions(-) diff --git a/vda5050_connector/test/conftest.py b/vda5050_connector/test/conftest.py index 74a2e96..b6a8ea6 100644 --- a/vda5050_connector/test/conftest.py +++ b/vda5050_connector/test/conftest.py @@ -96,6 +96,7 @@ def publish_feedback(self, feedback): ) self.feedback_pub.publish(feedback_message) + class MockActionServerNavigateThroughNodes: def __init__(self, node): self.logger = node.get_logger() @@ -145,7 +146,8 @@ def publish_feedback(self, feedback): goal_id=self.goal_id, feedback=feedback ) self.feedback_pub.publish(feedback_message) - + + class MockActionServerProcessVDAAction: def __init__(self, node): self.logger = node.get_logger() @@ -213,10 +215,12 @@ def adapter_node(setup_rclpy): def action_server_nav_to_node(adapter_node): return MockActionServerNavigateToNode(adapter_node) + @pytest.fixture() def action_server_nav_through_nodes(adapter_node): return MockActionServerNavigateThroughNodes(adapter_node) + @pytest.fixture def action_server_process_vda_action(adapter_node): return MockActionServerProcessVDAAction(adapter_node) diff --git a/vda5050_connector/test/test_vda5050_controller.py b/vda5050_connector/test/test_vda5050_controller.py index 5982954..a920ecb 100644 --- a/vda5050_connector/test/test_vda5050_controller.py +++ b/vda5050_connector/test/test_vda5050_controller.py @@ -222,6 +222,7 @@ def get_order_update(order_id=str(uuid4()), order_update_id=0): ], ) + def get_order_w_unreleased_new(order_id=str(uuid4()), order_update_id=0): return Order( header_id=0, @@ -342,6 +343,7 @@ def get_order_w_unreleased_new(order_id=str(uuid4()), order_update_id=0): ], ) + def get_stitch_orders(order_id=str(uuid4())): action1 = Action( action_type="foo", @@ -787,8 +789,8 @@ def test_vda5050_controller_node_reject_order( error=OrderRejectErrors.ORDER_UPDATE_ERROR, description="New update id 0 lower than old update id 1", ) - -# Tests the original node list with the nav through nodes functionality + + def test_vda5050_controller_node_new_order_nav_through_nodes( mocker, adapter_node, @@ -798,7 +800,9 @@ def test_vda5050_controller_node_new_order_nav_through_nodes( service_get_state, service_supported_actions, ): - nav_through_nodes_param = Parameter("enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True) + """Tests the original node list with the nav through nodes functionality.""" + nav_through_nodes_param = Parameter( + "enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True) node = VDA5050Controller(parameter_overrides=[nav_through_nodes_param]) node.logger.set_level(LoggingSeverity.DEBUG) @@ -806,7 +810,7 @@ def test_vda5050_controller_node_new_order_nav_through_nodes( spy_send_adapter_navigate_through_nodes = mocker.spy( node, "send_adapter_navigate_through_nodes" ) - + spy_process_last_edge_node = mocker.spy( node, "_process_last_edge_node" ) @@ -842,31 +846,31 @@ def test_vda5050_controller_node_new_order_nav_through_nodes( # and that the parameters matches order's released edges # Note: the standard assumes the vehicle is on the first node already, # so the first navigation command is to the second order node. - + spy_send_adapter_navigate_through_nodes.assert_called_once_with( edges=order.edges[:4], nodes=order.nodes[1:5] ) - + feedback_msg = NavigateThroughNodes.Impl.FeedbackMessage() # Check that a feedback of the current node doesn't affect the current state feedback_msg.feedback.last_node = order.nodes[0] node._navigate_through_nodes_feedback_callback(feedback_msg) - + spy_process_last_edge_node.assert_not_called() spy_process_last_edge_node.reset_mock() - + assert len(node._current_state.node_states) == 4 assert len(node._current_state.edge_states) == 4 assert node._current_state.last_node_id == "node1" assert node._current_state.last_node_sequence_id == 0 - + # Next node has been reached and a feedback message is published feedback_msg.feedback.last_node = order.nodes[1] node._navigate_through_nodes_feedback_callback(feedback_msg) - + spy_process_last_edge_node.assert_called_once() spy_process_last_edge_node.reset_mock() - + assert len(node._current_state.node_states) == 3 assert len(node._current_state.edge_states) == 3 assert node._current_state.last_node_id == "node2" @@ -875,22 +879,22 @@ def test_vda5050_controller_node_new_order_nav_through_nodes( # Next node has been reached and a feedback message is published feedback_msg.feedback.last_node = order.nodes[2] node._navigate_through_nodes_feedback_callback(feedback_msg) - + spy_process_last_edge_node.assert_called_once() spy_process_last_edge_node.reset_mock() - + assert len(node._current_state.node_states) == 2 assert len(node._current_state.edge_states) == 2 assert node._current_state.last_node_id == "node3" assert node._current_state.last_node_sequence_id == 4 - + # Next node has been reached and a feedback message is published feedback_msg.feedback.last_node = order.nodes[3] node._navigate_through_nodes_feedback_callback(feedback_msg) - + spy_process_last_edge_node.assert_called_once() spy_process_last_edge_node.reset_mock() - + assert len(node._current_state.node_states) == 1 assert len(node._current_state.edge_states) == 1 assert node._current_state.last_node_id == "node4" @@ -898,21 +902,20 @@ def test_vda5050_controller_node_new_order_nav_through_nodes( # Last node reached as indicated by the result coming through # A feedback message shouldn't be published for the final node in a navigation order - + # Simulate the adapter reached navigation goals future = Future() future.set_result(result=NavigateThroughNodes.Result()) node._navigate_to_node_result_callback(future) - + spy_process_last_edge_node.assert_called_once() spy_process_last_edge_node.reset_mock() assert len(node._current_state.node_states) == 0 assert len(node._current_state.edge_states) == 0 assert node._current_state.last_node_id == "node1" assert node._current_state.last_node_sequence_id == 8 - - -# Tests the scenario where there are some unreleased nodes on the horizon and navigate through nodes is required + + def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes( mocker, adapter_node, @@ -922,7 +925,9 @@ def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes( service_get_state, service_supported_actions, ): - nav_through_nodes_param = Parameter("enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True) + """Tests with some unreleased nodes on the horizon and navigate through nodes.""" + nav_through_nodes_param = Parameter( + "enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True) node = VDA5050Controller(parameter_overrides=[nav_through_nodes_param]) node.logger.set_level(LoggingSeverity.DEBUG) @@ -930,7 +935,7 @@ def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes( spy_send_adapter_navigate_through_nodes = mocker.spy( node, "send_adapter_navigate_through_nodes" ) - + spy_process_last_edge_node = mocker.spy( node, "_process_last_edge_node" ) @@ -966,31 +971,31 @@ def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes( # and that the parameters matches order's released edges # Note: the standard assumes the vehicle is on the first node already, # so the first navigation command is to the second order node. - + spy_send_adapter_navigate_through_nodes.assert_called_once_with( edges=order.edges[:3], nodes=order.nodes[1:4] ) - + feedback_msg = NavigateThroughNodes.Impl.FeedbackMessage() # Check that a feedback of the current node doesn't affect the current state feedback_msg.feedback.last_node = order.nodes[0] node._navigate_through_nodes_feedback_callback(feedback_msg) - + spy_process_last_edge_node.assert_not_called() spy_process_last_edge_node.reset_mock() - + assert len(node._current_state.node_states) == 4 assert len(node._current_state.edge_states) == 4 assert node._current_state.last_node_id == "node1" assert node._current_state.last_node_sequence_id == 0 - + # Next node has been reached and a feedback message is published feedback_msg.feedback.last_node = order.nodes[1] node._navigate_through_nodes_feedback_callback(feedback_msg) - + spy_process_last_edge_node.assert_called_once() spy_process_last_edge_node.reset_mock() - + assert len(node._current_state.node_states) == 3 assert len(node._current_state.edge_states) == 3 assert node._current_state.last_node_id == "node2" @@ -999,10 +1004,10 @@ def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes( # Next node has been reached and a feedback message is published feedback_msg.feedback.last_node = order.nodes[2] node._navigate_through_nodes_feedback_callback(feedback_msg) - + spy_process_last_edge_node.assert_called_once() spy_process_last_edge_node.reset_mock() - + assert len(node._current_state.node_states) == 2 assert len(node._current_state.edge_states) == 2 assert node._current_state.last_node_id == "node3" @@ -1010,19 +1015,20 @@ def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes( # Last node reached as indicated by the result coming through # A feedback message shouldn't be published for the final node in a navigation order - + # Simulate the adapter reached navigation goals future = Future() future.set_result(result=NavigateThroughNodes.Result()) node._navigate_to_node_result_callback(future) - + spy_process_last_edge_node.assert_called_once() spy_process_last_edge_node.reset_mock() assert len(node._current_state.node_states) == 1 assert len(node._current_state.edge_states) == 1 assert node._current_state.last_node_id == "node4" assert node._current_state.last_node_sequence_id == 6 - - # This is the final node with a released horizon. Therefore a request should be flagged on the next tick + + # This is the final node with a released horizon. + # Therefore a request should be flagged on the next tick node._on_active_order() - assert node._current_state.new_base_request == True + assert node._current_state.new_base_request is True diff --git a/vda5050_connector/vda5050_connector_py/vda5050_controller.py b/vda5050_connector/vda5050_connector_py/vda5050_controller.py index 29fcbe0..2a2f88e 100755 --- a/vda5050_connector/vda5050_connector_py/vda5050_controller.py +++ b/vda5050_connector/vda5050_connector_py/vda5050_controller.py @@ -253,7 +253,7 @@ def _configure_action_clients(self): self.logger.error( "NavigateToNode adapter action server not available, waiting again..." ) - + if self._enable_nav_through_nodes: # Action client for sending NavigateThroughNodes goals to adapter self._navigate_through_nodes_act_cli = ActionClient( @@ -265,7 +265,7 @@ def _configure_action_clients(self): self.logger.error( "NavigateThroughNodes adapter action server not available, waiting again..." ) - + self._navigate_to_node_goal_handle = None # Action client for sending ProcessVDAAction goals to adapter @@ -1436,19 +1436,39 @@ def get_list_type(type): self.send_adapter_process_vda_action(action) def _check_hard_actions(self, action_list): + """ + Check a list of actions for HARD actions. + + Returns + ------- + True if an action in the list is HARD blocking + + """ _hard_action_found = False for action in action_list: - # Hard actions should be the last in the list + # Hard actions should be the last in the list # because they must be the only thing running if action.blocking_type == VDAAction.HARD: _hard_action_found = True break - return _hard_action_found - + return _hard_action_found + def _get_released_edges(self): - # List of edges that are released in a row that don't have HARD actions on. + """ + Process the current order edges and return a list of those that are released. + + The list will end if there are no more released edges + in the current order OR if there are edges with HARD actions + If there are no edges that are traversable due to them not being + released the new_base_request flag will be set. + + Returns + ------- + released_edges: List of the released edges that are traversable in one go. + + """ released_edges = [] - + for edge in self._current_order.edges: if edge.sequence_id >= self._current_state.last_node_sequence_id + 1: if edge.released: @@ -1459,18 +1479,31 @@ def _get_released_edges(self): elif len(released_edges) == 0: # If there's no released edge available then request more and break if not self._current_state.new_base_request: - self.logger.warn("Next edge is part of the horizon. Stopping traversing of nodes.") + self.logger.warn( + "Next edge is part of the horizon. Stopping traversing of nodes." + ) self._update_state({"new_base_request": True}, publish_now=True) break else: # There are no released edges after a non released edge break return released_edges - + def _get_released_nodes(self): + """ + Process the current order nodes and return a list of those that are released. + + The list will end if there are no more released nodes + in the current order OR if there are nodes with HARD actions. + + Returns + ------- + released_nodes: List of the released nodes that are traversable in one go. + + """ # List of nodes that are released in a row that don't have HARD actions on. released_nodes = [] - + for node in self._current_order.nodes: if node.sequence_id >= self._current_state.last_node_sequence_id + 2: if node.released: @@ -1482,25 +1515,15 @@ def _get_released_nodes(self): # There are no released nodes after a non released node break return released_nodes - + def _process_next_navigation(self): """ - Process VDA5050 order's edges and nodes: + Process VDA5050 order's edges and nodes. + The edges and nodes are checked to see if they're released. Multiple released nodes/edges are sent to the navigate through nodes actions. Any node/edge actions that are hard will be the end of a single navigate. """ - # What does this need to do?? - # Create list of edges and nodes that will be traversed in a row. - DONE - # If it's only a list of 1 then I guess pass to the previous fn and use their logic... - DONE - # else - # Send to the navigate through nodes functions with whatever logic works with that... - # Nav through nodes - send action normally - like before - # get start response same as before - # Nav to pose Result callback logic should be separated and can be called from feedback callback. - # HOW DO WE UPDATE ON THE FLY? - # - released_edges = self._get_released_edges() if len(released_edges) == 0: # This only happens when there is no order or it has finished, @@ -1508,66 +1531,27 @@ def _process_next_navigation(self): # In this case, just exit. return else: - next_edge = released_edges[0] - + next_edge = released_edges[0] + released_nodes = self._get_released_nodes() if len(released_nodes) == 0: # Shouldn't happen if we got this far, but just checking for safety return else: - next_node = released_nodes[0] - + next_node = released_nodes[0] + if next_node != self._current_node_goal: if self._enable_nav_through_nodes and len(released_nodes) > 1: - # Do the nav through nodes as long as there's more than one node... - self.send_adapter_navigate_through_nodes(edges=released_edges, nodes=released_nodes) + # Do the nav through nodes as long as there's more than one released node + self.send_adapter_navigate_through_nodes( + edges=released_edges, nodes=released_nodes) else: - # Otherwise we can just + # Otherwise we can just use the single node navigation methods self.logger.info(f"Processing node: {next_node}") self.send_adapter_navigate_to_node(edge=next_edge, node=next_node) else: self.logger.error(f"{next_node} Already current goal") - - # ---- Navigate to node: send goals ---- - # self.logger.info(f"Current Order yo: {self._current_order}") - # self.logger.info(f"Released edges yo: {released_edges}") - # self.logger.info(f"Released Nodes yo: {released_nodes}") - - def _process_next_edge(self): # TODO - GET RID OF THIS METHOD, REPLACE WITH _process_next_navigation - """Process VDA5050 order's edge.""" - # Get next edge to be processed by looking for an edge - # with sequence_id equal to last_node_sequence_id + 1. - try: - next_edge = next( - edge - for edge in self._current_order.edges - if edge.sequence_id == self._current_state.last_node_sequence_id + 1 - ) - except StopIteration: - # This only happens when there is no order or it has finished, - # but there is an active instant action running. - # In this case, just exit. - return - - if not next_edge.released: - if not self._current_state.new_base_request: - self.logger.warn("Next edge is part of the horizon. Stopping traversing of nodes.") - self._update_state({"new_base_request": True}, publish_now=True) - return - - # After a released edge there is always a released node. - # Otherwise, the order gets rejected and this method is never called. - next_node = next( - node - for node in self._current_order.nodes - if node.sequence_id == self._current_state.last_node_sequence_id + 2 - ) - if next_node != self._current_node_goal: - self.logger.info(f"Processing node: {next_node}") - self.send_adapter_navigate_to_node(edge=next_edge, node=next_node) - else: - self.logger.error(f"{next_node} Already current goal") # ---- Navigate to node: send goals ---- def send_adapter_navigate_to_node(self, edge: VDAEdge, node: VDANode): @@ -1636,10 +1620,11 @@ def _navigate_to_node_result_callback(self, future: Future): # When the order is cancelled, this callback should avoid continuing its logic if self._canceling_order(): return - + self._process_last_edge_node() - + def _process_last_edge_node(self): + """Update the states with the last nodes and edges that were reported.""" last_edge = next( edge for edge in self._current_order.edges @@ -1664,7 +1649,7 @@ def _process_last_edge_node(self): } ) self._process_node(node=last_node) - + def send_adapter_navigate_through_nodes(self, edges: list[VDAEdge], nodes: list[VDANode]): """ Send navigation goal to the VDA5050 adapter. @@ -1679,7 +1664,7 @@ def send_adapter_navigate_through_nodes(self, edges: list[VDAEdge], nodes: list[ goal_msg = NavigateThroughNodes.Goal() goal_msg.edges = edges goal_msg.nodes = nodes - + # self.send_adapter_navigate_to_node(edge=edges[0], node=nodes[0]) # Wait for NavigateThroughNodes action server to be ready @@ -1688,35 +1673,43 @@ def send_adapter_navigate_through_nodes(self, edges: list[VDAEdge], nodes: list[ # Send goal to action server self.logger.info("Navigate through nodes goal request sent.") self._current_node_goal = nodes[0] - _send_goal_future = self._navigate_through_nodes_act_cli.send_goal_async(goal_msg, - feedback_callback=self._navigate_through_nodes_feedback_callback) - + _send_goal_future = self._navigate_through_nodes_act_cli.send_goal_async( + goal_msg, feedback_callback=self._navigate_through_nodes_feedback_callback) + # Register callback to be executed when the goal is accepted _send_goal_future.add_done_callback(self._navigate_to_node_goal_response_callback) - + def _navigate_through_nodes_feedback_callback(self, feedback_msg): - if feedback_msg.feedback.last_node == self._current_node_goal: - + """ + Process the feedback message from the navigate_through_nodes actions. + + Checks to see if the last node reported is equal to of greater than the current node goal, + Updates the states if it detects a change. + + Args: + ---- + feedback_msg : ROS2 Action Feedback object. + + """ + if feedback_msg.feedback.last_node.sequence_id >= self._current_node_goal.sequence_id: self._process_last_edge_node() - + released_edges = self._get_released_edges() if len(released_edges) == 0: # This only happens when there is no order or it has finished, # but there is an active instant action running. # In this case, just exit. return - else: - next_edge = released_edges[0] - + released_nodes = self._get_released_nodes() if len(released_nodes) == 0: # Shouldn't happen if we got this far, but just checking for safety return else: - next_node = released_nodes[0] - + next_node = released_nodes[0] + self._current_node_goal = next_node - + def _is_navigation_active(self) -> bool: """ Indicate if the robot was commanded to navigate to a node.