diff --git a/vda5050_connector/test/test_vda5050_controller.py b/vda5050_connector/test/test_vda5050_controller.py index 877d033..2a96a4d 100644 --- a/vda5050_connector/test/test_vda5050_controller.py +++ b/vda5050_connector/test/test_vda5050_controller.py @@ -813,6 +813,130 @@ def test_vda5050_controller_node_new_order_nav_through_nodes( # add a spy to validate accept order is called correctly spy_accept_order = mocker.spy(node, "_accept_order") + # generate an order and let the node process it + order_id = str(uuid4()) + order = get_order_new(order_id) + node.process_order(order) + + rclpy.spin_once(node) + + spy_accept_order.assert_called_once_with(order=order, mode=OrderAcceptModes.NEW) + + rclpy.spin_once(adapter_node) + + # check node states were properly updated + assert node._current_order == order + assert node._current_state.order_id == order_id + assert node._current_state.order_update_id == 0 + + # The order has 5 nodes and 4 edges but the first edge and node + # are processed as soon as the order is accepted. + assert len(node._current_state.node_states) == 4 + assert len(node._current_state.edge_states) == 4 + + assert node._current_state.last_node_id == "node1" + assert node._current_state.last_node_sequence_id == 0 + + # Assert the navigation through nodes goal was sent to the adapter, + # and that the parameters matches order's released edges + # Note: the standard assumes the vehicle is on the first node already, + # so the first navigation command is to the second order node. + + spy_send_adapter_navigate_through_nodes.assert_called_once_with( + edges=order.edges[:4], nodes=order.nodes[1:5] + ) + + feedback_msg = NavigateThroughNodes.Impl.FeedbackMessage() + # Check that a feedback of the current node doesn't affect the current state + feedback_msg.feedback.last_node = order.nodes[0] + node._navigate_through_nodes_feedback_callback(feedback_msg) + + spy_process_last_edge_node.assert_not_called() + spy_process_last_edge_node.reset_mock() + + assert len(node._current_state.node_states) == 4 + assert len(node._current_state.edge_states) == 4 + assert node._current_state.last_node_id == "node1" + assert node._current_state.last_node_sequence_id == 0 + + # Next node has been reached and a feedback message is published + feedback_msg.feedback.last_node = order.nodes[1] + node._navigate_through_nodes_feedback_callback(feedback_msg) + + spy_process_last_edge_node.assert_called_once() + spy_process_last_edge_node.reset_mock() + + assert len(node._current_state.node_states) == 3 + assert len(node._current_state.edge_states) == 3 + assert node._current_state.last_node_id == "node2" + assert node._current_state.last_node_sequence_id == 2 + + # Next node has been reached and a feedback message is published + feedback_msg.feedback.last_node = order.nodes[2] + node._navigate_through_nodes_feedback_callback(feedback_msg) + + spy_process_last_edge_node.assert_called_once() + spy_process_last_edge_node.reset_mock() + + assert len(node._current_state.node_states) == 2 + assert len(node._current_state.edge_states) == 2 + assert node._current_state.last_node_id == "node3" + assert node._current_state.last_node_sequence_id == 4 + + # Next node has been reached and a feedback message is published + feedback_msg.feedback.last_node = order.nodes[3] + node._navigate_through_nodes_feedback_callback(feedback_msg) + + spy_process_last_edge_node.assert_called_once() + spy_process_last_edge_node.reset_mock() + + assert len(node._current_state.node_states) == 1 + assert len(node._current_state.edge_states) == 1 + assert node._current_state.last_node_id == "node4" + assert node._current_state.last_node_sequence_id == 6 + + # Last node reached as indicated by the result coming through + # A feedback message shouldn't be published for the final node in a navigation order + + # Simulate the adapter reached navigation goals + future = Future() + future.set_result(result=NavigateThroughNodes.Result()) + node._navigate_to_node_result_callback(future) + + spy_process_last_edge_node.assert_called_once() + spy_process_last_edge_node.reset_mock() + assert len(node._current_state.node_states) == 0 + assert len(node._current_state.edge_states) == 0 + assert node._current_state.last_node_id == "node1" + assert node._current_state.last_node_sequence_id == 8 + + +# Tests the scenario where there are some unreleased nodes on the horizon and navigate through nodes is required +def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes( + mocker, + adapter_node, + action_server_nav_to_node, + action_server_nav_through_nodes, + action_server_process_vda_action, + service_get_state, + service_supported_actions, +): + nav_through_nodes_param = Parameter("enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True) + node = VDA5050Controller(parameter_overrides=[nav_through_nodes_param]) + node.logger.set_level(LoggingSeverity.DEBUG) + + # add a spy to validate used navigation goal parameters + spy_send_adapter_navigate_through_nodes = mocker.spy( + node, "send_adapter_navigate_through_nodes" + ) + + spy_process_last_edge_node = mocker.spy( + node, "_process_last_edge_node" + ) + + # add a spy to validate accept order is called correctly + spy_accept_order = mocker.spy(node, "_accept_order") + # generate an order and let the node process it order_id = str(uuid4()) order = get_order_w_unreleased_new(order_id)