Skip to content

Commit

Permalink
move worker to azure exporter, will refactor in the next PR
Browse files Browse the repository at this point in the history
  • Loading branch information
reyang committed May 14, 2019
1 parent 59aa7ce commit 23e5c82
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# limitations under the License.

import atexit
import threading
import time

from opencensus.common.schedule import Queue
from opencensus.common.schedule import Worker
from opencensus.common.schedule import QueueEvent
from opencensus.ext.azure.common import Options


Expand Down Expand Up @@ -51,3 +53,37 @@ def emit(self, batch, event=None):
# for logs).
def export(self, items):
self._queue.puts(items, block=False)


class Worker(threading.Thread):
daemon = True

def __init__(self, src, dst):
self.src = src
self.dst = dst
self._stopping = False
super(Worker, self).__init__()

def run(self):
src = self.src
dst = self.dst
while True:
batch = src.gets(dst.max_batch_size, dst.export_interval)
if batch and isinstance(batch[-1], QueueEvent):
dst.emit(batch[:-1], event=batch[-1])
if batch[-1] is src.EXIT_EVENT:
break
else:
continue
dst.emit(batch)

def stop(self, timeout=None):
start_time = time.time()
wait_time = timeout
if self.is_alive() and not self._stopping:
self._stopping = True
self.src.put(self.src.EXIT_EVENT, block=True, timeout=wait_time)
elapsed_time = time.time() - start_time
wait_time = timeout and max(timeout - elapsed_time, 0)
if self.src.EXIT_EVENT.wait(timeout=wait_time):
return time.time() - start_time # time taken to stop
34 changes: 0 additions & 34 deletions opencensus/common/schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,37 +136,3 @@ def puts(self, items, block=True, timeout=None):
else:
for item in items:
self.put(item, block, timeout)


class Worker(threading.Thread):
daemon = True

def __init__(self, src, dst):
self.src = src
self.dst = dst
self._stopping = False
super(Worker, self).__init__()

def run(self):
src = self.src
dst = self.dst
while True:
batch = src.gets(dst.max_batch_size, dst.export_interval)
if batch and isinstance(batch[-1], QueueEvent):
dst.emit(batch[:-1], event=batch[-1])
if batch[-1] is src.EXIT_EVENT:
break
else:
continue
dst.emit(batch)

def stop(self, timeout=None):
start_time = time.time()
wait_time = timeout
if self.is_alive() and not self._stopping:
self._stopping = True
self.src.put(self.src.EXIT_EVENT, block=True, timeout=wait_time)
elapsed_time = time.time() - start_time
wait_time = timeout and max(timeout - elapsed_time, 0)
if self.src.EXIT_EVENT.wait(timeout=wait_time):
return time.time() - start_time # time taken to stop
9 changes: 9 additions & 0 deletions tests/unit/common/test_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,12 @@ def test_gets(self):
queue.puts((1, 2, 3, 4, 5))
result = queue.gets(count=5, timeout=0)
self.assertEquals(result, (1, 2, 3, 4, 5))

def test_gets_event(self):
queue = Queue(capacity=100)
event = QueueEvent('test')
queue.puts((event, 1, 2, 3, event))
result = queue.gets(count=5, timeout=TIMEOUT)
self.assertEquals(result, (event,))
result = queue.gets(count=5, timeout=TIMEOUT)
self.assertEquals(result, (1, 2, 3, event))

0 comments on commit 23e5c82

Please sign in to comment.