Skip to content

Commit

Permalink
Merge pull request #157 from robotpy/alert-tests
Browse files Browse the repository at this point in the history
wpilib: Add context support and tests to Alert
  • Loading branch information
virtuald authored Feb 16, 2025
2 parents 1bf304b + e52deb0 commit 895bc49
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 2 deletions.
12 changes: 12 additions & 0 deletions subprojects/robotpy-wpilib/gen/Alert.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,15 @@ classes:
SetText:
GetText:
GetType:
inline_code: |
.def("close", [](frc::Alert &self) {
py::gil_scoped_release release;
self.Set(false);
}, py::doc("Disables the alert"))
.def("__enter__", [](frc::Alert &self) -> frc::Alert& {
return self;
})
.def("__exit__", [](frc::Alert &self, py::args args) {
py::gil_scoped_release release;
self.Set(false);
})
12 changes: 10 additions & 2 deletions subprojects/robotpy-wpilib/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
import ntcore
import wpilib
from wpilib.simulation._simulation import _resetWpilibSimulationData


@pytest.fixture
Expand All @@ -11,7 +12,15 @@ def cfg_logging(caplog):


@pytest.fixture(scope="function")
def nt(cfg_logging):
def wpilib_state():
try:
yield None
finally:
_resetWpilibSimulationData()


@pytest.fixture(scope="function")
def nt(cfg_logging, wpilib_state):
instance = ntcore.NetworkTableInstance.getDefault()
instance.startLocal()

Expand All @@ -20,4 +29,3 @@ def nt(cfg_logging):
finally:
instance.stopLocal()
instance._reset()
wpilib._wpilib._clearSmartDashboardData()
223 changes: 223 additions & 0 deletions subprojects/robotpy-wpilib/tests/test_alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import typing as T

import pytest

from ntcore import NetworkTableInstance
from wpilib import Alert, SmartDashboard
from wpilib.simulation import pauseTiming, resumeTiming, stepTiming

AlertType = Alert.AlertType


@pytest.fixture(scope="function")
def group_name(nt, request):

group_name = f"AlertTest_{request.node.name}"
yield group_name

SmartDashboard.updateValues()
assert len(get_active_alerts(nt, group_name, AlertType.kError)) == 0
assert len(get_active_alerts(nt, group_name, AlertType.kWarning)) == 0
assert len(get_active_alerts(nt, group_name, AlertType.kInfo)) == 0


def get_subscriber_for_type(
nt: NetworkTableInstance, group_name: str, alert_type: AlertType
):
subtable_name = {
AlertType.kError: "errors",
AlertType.kWarning: "warnings",
AlertType.kInfo: "infos",
}.get(alert_type, "unknown")
topic = f"/SmartDashboard/{group_name}/{subtable_name}"
return nt.getStringArrayTopic(topic).subscribe([])


def get_active_alerts(
nt: NetworkTableInstance, group_name: str, alert_type: AlertType
) -> T.List[str]:
SmartDashboard.updateValues()
with get_subscriber_for_type(nt, group_name, alert_type) as sub:
return sub.get()


def is_alert_active(
nt: NetworkTableInstance, group_name: str, text: str, alert_type: AlertType
):
active_alerts = get_active_alerts(nt, group_name, alert_type)
return text in active_alerts


def assert_state(
nt: NetworkTableInstance,
group_name: str,
alert_type: AlertType,
expected_state: T.List[str],
):
assert expected_state == get_active_alerts(nt, group_name, alert_type)


def test_set_unset_single(nt, group_name):
with Alert(group_name, "one", AlertType.kError) as one:

assert not is_alert_active(nt, group_name, "one", AlertType.kError)
assert not is_alert_active(nt, group_name, "two", AlertType.kInfo)

one.set(True)
assert is_alert_active(nt, group_name, "one", AlertType.kError)

one.set(True)
assert is_alert_active(nt, group_name, "one", AlertType.kError)

one.set(False)
assert not is_alert_active(nt, group_name, "one", AlertType.kError)


def test_set_unset_multiple(nt, group_name):
with (
Alert(group_name, "one", AlertType.kError) as one,
Alert(group_name, "two", AlertType.kInfo) as two,
):

assert not is_alert_active(nt, group_name, "one", AlertType.kError)
assert not is_alert_active(nt, group_name, "two", AlertType.kInfo)

one.set(True)
assert is_alert_active(nt, group_name, "one", AlertType.kError)
assert not is_alert_active(nt, group_name, "two", AlertType.kInfo)

one.set(True)
two.set(True)
assert is_alert_active(nt, group_name, "one", AlertType.kError)
assert is_alert_active(nt, group_name, "two", AlertType.kInfo)

one.set(False)
assert not is_alert_active(nt, group_name, "one", AlertType.kError)
assert is_alert_active(nt, group_name, "two", AlertType.kInfo)


def test_set_is_idempotent(nt, group_name):
group_name = group_name
with (
Alert(group_name, "A", AlertType.kInfo) as a,
Alert(group_name, "B", AlertType.kInfo) as b,
Alert(group_name, "C", AlertType.kInfo) as c,
):

a.set(True)
b.set(True)
c.set(True)

start_state = get_active_alerts(nt, group_name, AlertType.kInfo)
assert set(start_state) == {"A", "B", "C"}

b.set(True)
assert_state(nt, group_name, AlertType.kInfo, start_state)

a.set(True)
assert_state(nt, group_name, AlertType.kInfo, start_state)


def test_close_unsets_alert(nt, group_name):
group_name = group_name
with Alert(group_name, "alert", AlertType.kWarning) as alert:
alert.set(True)
assert is_alert_active(nt, group_name, "alert", AlertType.kWarning)
assert not is_alert_active(nt, group_name, "alert", AlertType.kWarning)


def test_set_text_while_unset(nt, group_name):
group_name = group_name
with Alert(group_name, "BEFORE", AlertType.kInfo) as alert:
assert alert.getText() == "BEFORE"
alert.set(True)
assert is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
alert.set(False)
assert not is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
alert.setText("AFTER")
assert alert.getText() == "AFTER"
alert.set(True)
assert not is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
assert is_alert_active(nt, group_name, "AFTER", AlertType.kInfo)


def test_set_text_while_set(nt, group_name):
with Alert(group_name, "BEFORE", AlertType.kInfo) as alert:
assert alert.getText() == "BEFORE"
alert.set(True)
assert is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
alert.setText("AFTER")
assert alert.getText() == "AFTER"
assert not is_alert_active(nt, group_name, "BEFORE", AlertType.kInfo)
assert is_alert_active(nt, group_name, "AFTER", AlertType.kInfo)


def test_set_text_does_not_affect_sort(nt, group_name):
pauseTiming()
try:
with (
Alert(group_name, "A", AlertType.kInfo) as a,
Alert(group_name, "B", AlertType.kInfo) as b,
Alert(group_name, "C", AlertType.kInfo) as c,
):

a.set(True)
stepTiming(1)
b.set(True)
stepTiming(1)
c.set(True)

expected_state = get_active_alerts(nt, group_name, AlertType.kInfo)
expected_state[expected_state.index("B")] = "AFTER"

b.setText("AFTER")
assert_state(nt, group_name, AlertType.kInfo, expected_state)
finally:
resumeTiming()


def test_sort_order(nt, group_name):
pauseTiming()
try:
with (
Alert(group_name, "A", AlertType.kInfo) as a,
Alert(group_name, "B", AlertType.kInfo) as b,
Alert(group_name, "C", AlertType.kInfo) as c,
):

a.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["A"])

stepTiming(1)
b.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["B", "A"])

stepTiming(1)
c.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["C", "B", "A"])

stepTiming(1)
c.set(False)
assert_state(nt, group_name, AlertType.kInfo, ["B", "A"])

stepTiming(1)
c.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["C", "B", "A"])

stepTiming(1)
a.set(False)
assert_state(nt, group_name, AlertType.kInfo, ["C", "B"])

stepTiming(1)
b.set(False)
assert_state(nt, group_name, AlertType.kInfo, ["C"])

stepTiming(1)
b.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["B", "C"])

stepTiming(1)
a.set(True)
assert_state(nt, group_name, AlertType.kInfo, ["A", "B", "C"])
finally:
resumeTiming()

0 comments on commit 895bc49

Please sign in to comment.