Skip to content

Commit

Permalink
Tests: harden TestNetwork even more
Browse files Browse the repository at this point in the history
- take into account that payloads with DATA1 may be in flight when we
  update the task
- test task.stop()
- use more lenient timeout
- use threading.Condition for improved readability
- wait for periodicity to be established before validation; some
  platforms are a little bit flaky
  • Loading branch information
erlend-aasland committed Jul 10, 2024
1 parent 3aa509d commit 0277ad3
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions test/test_network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import unittest
from threading import Event
import threading

import canopen
import can
Expand Down Expand Up @@ -65,30 +65,47 @@ def test_send_periodic(self):
self.addCleanup(self.network.disconnect)

acc = []
event = Event()
condition = threading.Condition()

def hook(_, data, ts):
acc.append((data, ts))
event.set()
item = data, ts
acc.append(item)
condition.notify()

self.network.subscribe(COB_ID, hook)
self.addCleanup(self.network.unsubscribe, COB_ID)

task = self.network.send_periodic(COB_ID, DATA1, PERIOD)
self.addCleanup(task.stop)

event.wait(PERIOD*2)

# Update task data.
def periodicity():
# Check if periodicity is established; flakiness have been observed
# on macOS.
if len(acc) >= 2:
delta = acc[-1][1] - acc[-2][1]
return round(delta, ndigits=1) == PERIOD
return False

# Wait for frames to arrive; then check the result.
with condition:
condition.wait_for(periodicity, TIMEOUT)
self.assertTrue(all(v[0] == DATA1 for v in acc))

# Update task data, which implicitly restarts the timer.
# Wait for frames to arrive; then check the result.
task.update(DATA2)
event.clear()
event.wait(PERIOD*2)
task.stop()

with condition:
acc.clear()
condition.wait_for(periodicity, TIMEOUT)
# Find the first message with new data, and verify that all subsequent
# messages also carry the new payload.
data = [v[0] for v in acc]
self.assertEqual(data, [DATA1, DATA2])
ts = [v[1] for v in acc]
self.assertAlmostEqual(ts[1]-ts[0], PERIOD, places=1)
idx = data.index(DATA2)
self.assertTrue(all(v[0] == DATA2 for v in acc[idx:]))

# Stop the task.
task.stop()
self.assertIsNone(self.network.bus.recv(TIMEOUT))


class TestScanner(unittest.TestCase):
Expand Down

0 comments on commit 0277ad3

Please sign in to comment.