Skip to content

Commit

Permalink
Added test that does a whole order with no gaps in released status
Browse files Browse the repository at this point in the history
  • Loading branch information
ben.holden committed Sep 12, 2023
1 parent a7a3466 commit 0516117
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions vda5050_connector/test/test_vda5050_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,130 @@ def test_vda5050_controller_node_new_order_nav_through_nodes(
# add a spy to validate accept order is called correctly
spy_accept_order = mocker.spy(node, "_accept_order")

# generate an order and let the node process it
order_id = str(uuid4())
order = get_order_new(order_id)
node.process_order(order)

rclpy.spin_once(node)

spy_accept_order.assert_called_once_with(order=order, mode=OrderAcceptModes.NEW)

rclpy.spin_once(adapter_node)

# check node states were properly updated
assert node._current_order == order
assert node._current_state.order_id == order_id
assert node._current_state.order_update_id == 0

# The order has 5 nodes and 4 edges but the first edge and node
# are processed as soon as the order is accepted.
assert len(node._current_state.node_states) == 4
assert len(node._current_state.edge_states) == 4

assert node._current_state.last_node_id == "node1"
assert node._current_state.last_node_sequence_id == 0

# Assert the navigation through nodes goal was sent to the adapter,
# and that the parameters matches order's released edges
# Note: the standard assumes the vehicle is on the first node already,
# so the first navigation command is to the second order node.

spy_send_adapter_navigate_through_nodes.assert_called_once_with(
edges=order.edges[:4], nodes=order.nodes[1:5]
)

feedback_msg = NavigateThroughNodes.Impl.FeedbackMessage()
# Check that a feedback of the current node doesn't affect the current state
feedback_msg.feedback.last_node = order.nodes[0]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_not_called()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 4
assert len(node._current_state.edge_states) == 4
assert node._current_state.last_node_id == "node1"
assert node._current_state.last_node_sequence_id == 0

# Next node has been reached and a feedback message is published
feedback_msg.feedback.last_node = order.nodes[1]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 3
assert len(node._current_state.edge_states) == 3
assert node._current_state.last_node_id == "node2"
assert node._current_state.last_node_sequence_id == 2

# Next node has been reached and a feedback message is published
feedback_msg.feedback.last_node = order.nodes[2]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 2
assert len(node._current_state.edge_states) == 2
assert node._current_state.last_node_id == "node3"
assert node._current_state.last_node_sequence_id == 4

# Next node has been reached and a feedback message is published
feedback_msg.feedback.last_node = order.nodes[3]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 1
assert len(node._current_state.edge_states) == 1
assert node._current_state.last_node_id == "node4"
assert node._current_state.last_node_sequence_id == 6

# Last node reached as indicated by the result coming through
# A feedback message shouldn't be published for the final node in a navigation order

# Simulate the adapter reached navigation goals
future = Future()
future.set_result(result=NavigateThroughNodes.Result())
node._navigate_to_node_result_callback(future)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()
assert len(node._current_state.node_states) == 0
assert len(node._current_state.edge_states) == 0
assert node._current_state.last_node_id == "node1"
assert node._current_state.last_node_sequence_id == 8


# Tests the scenario where there are some unreleased nodes on the horizon and navigate through nodes is required
def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes(
mocker,
adapter_node,
action_server_nav_to_node,
action_server_nav_through_nodes,
action_server_process_vda_action,
service_get_state,
service_supported_actions,
):
nav_through_nodes_param = Parameter("enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True)
node = VDA5050Controller(parameter_overrides=[nav_through_nodes_param])
node.logger.set_level(LoggingSeverity.DEBUG)

# add a spy to validate used navigation goal parameters
spy_send_adapter_navigate_through_nodes = mocker.spy(
node, "send_adapter_navigate_through_nodes"
)

spy_process_last_edge_node = mocker.spy(
node, "_process_last_edge_node"
)

# add a spy to validate accept order is called correctly
spy_accept_order = mocker.spy(node, "_accept_order")

# generate an order and let the node process it
order_id = str(uuid4())
order = get_order_w_unreleased_new(order_id)
Expand Down

0 comments on commit 0516117

Please sign in to comment.