Skip to content

Commit

Permalink
add coverage test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
reyang committed May 15, 2019
1 parent 5525c4a commit cfea29b
Showing 1 changed file with 38 additions and 7 deletions.
45 changes: 38 additions & 7 deletions tests/unit/common/test_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import unittest

from opencensus.common.schedule import PeriodicTask
from opencensus.common.schedule import Queue
from opencensus.common.schedule import QueueEvent

Expand All @@ -30,20 +31,50 @@ def test_basic(self):

class TestQueue(unittest.TestCase):
def test_gets(self):
queue = Queue(capacity=100)
queue = Queue(capacity=10)
queue.puts((1, 2, 3))
result = queue.gets(count=5, timeout=TIMEOUT)
self.assertEquals(result, (1, 2, 3))
self.assertEqual(result, (1, 2, 3))

queue.puts((1, 2, 3, 4, 5))
result = queue.gets(count=5, timeout=0)
self.assertEquals(result, (1, 2, 3, 4, 5))
result = queue.gets(count=3, timeout=TIMEOUT)
self.assertEqual(result, (1, 2, 3))
result = queue.gets(count=3, timeout=TIMEOUT)
self.assertEqual(result, (4, 5))

def test_gets_event(self):
queue = Queue(capacity=100)
queue = Queue(capacity=10)
event = QueueEvent('test')
queue.puts((event, 1, 2, 3, event))
result = queue.gets(count=5, timeout=TIMEOUT)
self.assertEquals(result, (event,))
self.assertEqual(result, (event,))
result = queue.gets(count=5, timeout=TIMEOUT)
self.assertEquals(result, (1, 2, 3, event))
self.assertEqual(result, (1, 2, 3, event))

task = PeriodicTask(TIMEOUT / 10, lambda: queue.put(1))
task.start()
try:
result = queue.gets(count=5, timeout=TIMEOUT)
self.assertEqual(result, (1, 1, 1, 1, 1))
finally:
task.cancel()
task.join()

def test_flush_timeout(self):
queue = Queue(capacity=10)
self.assertIsNone(queue.flush(timeout=TIMEOUT))
def proc():
for item in queue.gets(count=1, timeout=TIMEOUT):
if isinstance(item, QueueEvent):
item.set()
task = PeriodicTask(TIMEOUT / 10, proc)
task.start()
try:
self.assertIsNotNone(queue.flush())
finally:
task.cancel()
task.join()

def test_puts_timeout(self):
queue = Queue(capacity=10)
queue.puts(range(100), timeout=TIMEOUT)

0 comments on commit cfea29b

Please sign in to comment.