diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/common/exporter.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/common/exporter.py index 2703c6da9..ce8a8ccf2 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/common/exporter.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/common/exporter.py @@ -13,9 +13,11 @@ # limitations under the License. import atexit +import threading +import time from opencensus.common.schedule import Queue -from opencensus.common.schedule import Worker +from opencensus.common.schedule import QueueEvent from opencensus.ext.azure.common import Options @@ -51,3 +53,37 @@ def emit(self, batch, event=None): # for logs). def export(self, items): self._queue.puts(items, block=False) + + +class Worker(threading.Thread): + daemon = True + + def __init__(self, src, dst): + self.src = src + self.dst = dst + self._stopping = False + super(Worker, self).__init__() + + def run(self): + src = self.src + dst = self.dst + while True: + batch = src.gets(dst.max_batch_size, dst.export_interval) + if batch and isinstance(batch[-1], QueueEvent): + dst.emit(batch[:-1], event=batch[-1]) + if batch[-1] is src.EXIT_EVENT: + break + else: + continue + dst.emit(batch) + + def stop(self, timeout=None): + start_time = time.time() + wait_time = timeout + if self.is_alive() and not self._stopping: + self._stopping = True + self.src.put(self.src.EXIT_EVENT, block=True, timeout=wait_time) + elapsed_time = time.time() - start_time + wait_time = timeout and max(timeout - elapsed_time, 0) + if self.src.EXIT_EVENT.wait(timeout=wait_time): + return time.time() - start_time # time taken to stop diff --git a/opencensus/common/schedule/__init__.py b/opencensus/common/schedule/__init__.py index 0c8223182..512a578d9 100644 --- a/opencensus/common/schedule/__init__.py +++ b/opencensus/common/schedule/__init__.py @@ -136,37 +136,3 @@ def puts(self, items, block=True, timeout=None): else: for item in items: self.put(item, block, timeout) - - -class Worker(threading.Thread): - daemon = True - - def __init__(self, src, dst): - self.src = src - self.dst = dst - self._stopping = False - super(Worker, self).__init__() - - def run(self): - src = self.src - dst = self.dst - while True: - batch = src.gets(dst.max_batch_size, dst.export_interval) - if batch and isinstance(batch[-1], QueueEvent): - dst.emit(batch[:-1], event=batch[-1]) - if batch[-1] is src.EXIT_EVENT: - break - else: - continue - dst.emit(batch) - - def stop(self, timeout=None): - start_time = time.time() - wait_time = timeout - if self.is_alive() and not self._stopping: - self._stopping = True - self.src.put(self.src.EXIT_EVENT, block=True, timeout=wait_time) - elapsed_time = time.time() - start_time - wait_time = timeout and max(timeout - elapsed_time, 0) - if self.src.EXIT_EVENT.wait(timeout=wait_time): - return time.time() - start_time # time taken to stop diff --git a/tests/unit/common/test_schedule.py b/tests/unit/common/test_schedule.py index 4ad07fd16..8fde2cc9a 100644 --- a/tests/unit/common/test_schedule.py +++ b/tests/unit/common/test_schedule.py @@ -38,3 +38,12 @@ def test_gets(self): queue.puts((1, 2, 3, 4, 5)) result = queue.gets(count=5, timeout=0) self.assertEquals(result, (1, 2, 3, 4, 5)) + + def test_gets_event(self): + queue = Queue(capacity=100) + event = QueueEvent('test') + queue.puts((event, 1, 2, 3, event)) + result = queue.gets(count=5, timeout=TIMEOUT) + self.assertEquals(result, (event,)) + result = queue.gets(count=5, timeout=TIMEOUT) + self.assertEquals(result, (1, 2, 3, event))