Skip to content

Commit

Permalink
Comments and linting
Browse files Browse the repository at this point in the history
  • Loading branch information
ben.holden committed Sep 14, 2023
1 parent 8a40def commit 50f551b
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 125 deletions.
6 changes: 5 additions & 1 deletion vda5050_connector/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def publish_feedback(self, feedback):
)
self.feedback_pub.publish(feedback_message)


class MockActionServerNavigateThroughNodes:
def __init__(self, node):
self.logger = node.get_logger()
Expand Down Expand Up @@ -145,7 +146,8 @@ def publish_feedback(self, feedback):
goal_id=self.goal_id, feedback=feedback
)
self.feedback_pub.publish(feedback_message)



class MockActionServerProcessVDAAction:
def __init__(self, node):
self.logger = node.get_logger()
Expand Down Expand Up @@ -213,10 +215,12 @@ def adapter_node(setup_rclpy):
def action_server_nav_to_node(adapter_node):
return MockActionServerNavigateToNode(adapter_node)


@pytest.fixture()
def action_server_nav_through_nodes(adapter_node):
return MockActionServerNavigateThroughNodes(adapter_node)


@pytest.fixture
def action_server_process_vda_action(adapter_node):
return MockActionServerProcessVDAAction(adapter_node)
Expand Down
80 changes: 43 additions & 37 deletions vda5050_connector/test/test_vda5050_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def get_order_update(order_id=str(uuid4()), order_update_id=0):
],
)


def get_order_w_unreleased_new(order_id=str(uuid4()), order_update_id=0):
return Order(
header_id=0,
Expand Down Expand Up @@ -342,6 +343,7 @@ def get_order_w_unreleased_new(order_id=str(uuid4()), order_update_id=0):
],
)


def get_stitch_orders(order_id=str(uuid4())):
action1 = Action(
action_type="foo",
Expand Down Expand Up @@ -787,8 +789,8 @@ def test_vda5050_controller_node_reject_order(
error=OrderRejectErrors.ORDER_UPDATE_ERROR,
description="New update id 0 lower than old update id 1",
)
# Tests the original node list with the nav through nodes functionality


def test_vda5050_controller_node_new_order_nav_through_nodes(
mocker,
adapter_node,
Expand All @@ -798,15 +800,17 @@ def test_vda5050_controller_node_new_order_nav_through_nodes(
service_get_state,
service_supported_actions,
):
nav_through_nodes_param = Parameter("enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True)
"""Tests the original node list with the nav through nodes functionality."""
nav_through_nodes_param = Parameter(
"enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True)
node = VDA5050Controller(parameter_overrides=[nav_through_nodes_param])
node.logger.set_level(LoggingSeverity.DEBUG)

# add a spy to validate used navigation goal parameters
spy_send_adapter_navigate_through_nodes = mocker.spy(
node, "send_adapter_navigate_through_nodes"
)

spy_process_last_edge_node = mocker.spy(
node, "_process_last_edge_node"
)
Expand Down Expand Up @@ -842,31 +846,31 @@ def test_vda5050_controller_node_new_order_nav_through_nodes(
# and that the parameters matches order's released edges
# Note: the standard assumes the vehicle is on the first node already,
# so the first navigation command is to the second order node.

spy_send_adapter_navigate_through_nodes.assert_called_once_with(
edges=order.edges[:4], nodes=order.nodes[1:5]
)

feedback_msg = NavigateThroughNodes.Impl.FeedbackMessage()
# Check that a feedback of the current node doesn't affect the current state
feedback_msg.feedback.last_node = order.nodes[0]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_not_called()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 4
assert len(node._current_state.edge_states) == 4
assert node._current_state.last_node_id == "node1"
assert node._current_state.last_node_sequence_id == 0

# Next node has been reached and a feedback message is published
feedback_msg.feedback.last_node = order.nodes[1]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 3
assert len(node._current_state.edge_states) == 3
assert node._current_state.last_node_id == "node2"
Expand All @@ -875,44 +879,43 @@ def test_vda5050_controller_node_new_order_nav_through_nodes(
# Next node has been reached and a feedback message is published
feedback_msg.feedback.last_node = order.nodes[2]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 2
assert len(node._current_state.edge_states) == 2
assert node._current_state.last_node_id == "node3"
assert node._current_state.last_node_sequence_id == 4

# Next node has been reached and a feedback message is published
feedback_msg.feedback.last_node = order.nodes[3]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 1
assert len(node._current_state.edge_states) == 1
assert node._current_state.last_node_id == "node4"
assert node._current_state.last_node_sequence_id == 6

# Last node reached as indicated by the result coming through
# A feedback message shouldn't be published for the final node in a navigation order

# Simulate the adapter reached navigation goals
future = Future()
future.set_result(result=NavigateThroughNodes.Result())
node._navigate_to_node_result_callback(future)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()
assert len(node._current_state.node_states) == 0
assert len(node._current_state.edge_states) == 0
assert node._current_state.last_node_id == "node1"
assert node._current_state.last_node_sequence_id == 8


# Tests the scenario where there are some unreleased nodes on the horizon and navigate through nodes is required


def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes(
mocker,
adapter_node,
Expand All @@ -922,15 +925,17 @@ def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes(
service_get_state,
service_supported_actions,
):
nav_through_nodes_param = Parameter("enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True)
"""Tests with some unreleased nodes on the horizon and navigate through nodes."""
nav_through_nodes_param = Parameter(
"enable_nav_through_nodes", type_=Parameter.Type.BOOL, value=True)
node = VDA5050Controller(parameter_overrides=[nav_through_nodes_param])
node.logger.set_level(LoggingSeverity.DEBUG)

# add a spy to validate used navigation goal parameters
spy_send_adapter_navigate_through_nodes = mocker.spy(
node, "send_adapter_navigate_through_nodes"
)

spy_process_last_edge_node = mocker.spy(
node, "_process_last_edge_node"
)
Expand Down Expand Up @@ -966,31 +971,31 @@ def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes(
# and that the parameters matches order's released edges
# Note: the standard assumes the vehicle is on the first node already,
# so the first navigation command is to the second order node.

spy_send_adapter_navigate_through_nodes.assert_called_once_with(
edges=order.edges[:3], nodes=order.nodes[1:4]
)

feedback_msg = NavigateThroughNodes.Impl.FeedbackMessage()
# Check that a feedback of the current node doesn't affect the current state
feedback_msg.feedback.last_node = order.nodes[0]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_not_called()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 4
assert len(node._current_state.edge_states) == 4
assert node._current_state.last_node_id == "node1"
assert node._current_state.last_node_sequence_id == 0

# Next node has been reached and a feedback message is published
feedback_msg.feedback.last_node = order.nodes[1]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 3
assert len(node._current_state.edge_states) == 3
assert node._current_state.last_node_id == "node2"
Expand All @@ -999,30 +1004,31 @@ def test_vda5050_controller_node_new_order_nav_through_nodes_unreleased_nodes(
# Next node has been reached and a feedback message is published
feedback_msg.feedback.last_node = order.nodes[2]
node._navigate_through_nodes_feedback_callback(feedback_msg)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()

assert len(node._current_state.node_states) == 2
assert len(node._current_state.edge_states) == 2
assert node._current_state.last_node_id == "node3"
assert node._current_state.last_node_sequence_id == 4

# Last node reached as indicated by the result coming through
# A feedback message shouldn't be published for the final node in a navigation order

# Simulate the adapter reached navigation goals
future = Future()
future.set_result(result=NavigateThroughNodes.Result())
node._navigate_to_node_result_callback(future)

spy_process_last_edge_node.assert_called_once()
spy_process_last_edge_node.reset_mock()
assert len(node._current_state.node_states) == 1
assert len(node._current_state.edge_states) == 1
assert node._current_state.last_node_id == "node4"
assert node._current_state.last_node_sequence_id == 6

# This is the final node with a released horizon. Therefore a request should be flagged on the next tick

# This is the final node with a released horizon.
# Therefore a request should be flagged on the next tick
node._on_active_order()
assert node._current_state.new_base_request == True
assert node._current_state.new_base_request is True
Loading

0 comments on commit 50f551b

Please sign in to comment.