Refactor async exporter #642
@@ -0,0 +1,89 @@
# Copyright 2019, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import threading
import time

from opencensus.common.schedule import Queue
from opencensus.common.schedule import QueueEvent
from opencensus.ext.azure.common import Options


class BaseExporter(object):
    def __init__(self, **options):
        options = Options(**options)
        self.export_interval = options.export_interval
        self.max_batch_size = options.max_batch_size
        # TODO: queue should be moved to tracer
        # too much refactor work, leave to the next PR
        self._queue = Queue(capacity=8192)  # TODO: make this configurable
        self.EXIT_EVENT = self._queue.EXIT_EVENT
        # TODO: worker should not be created in the base exporter
        self._worker = Worker(self._queue, self)
        self._worker.start()
        atexit.register(self._worker.stop, options.grace_period)

    # Ideally we don't want to have `emit`.
    # Exporter will have one public method - `export`, which is a blocking
    # method, running inside worker threads.
    def emit(self, batch, event=None):
        raise NotImplementedError  # pragma: NO COVER

    # TODO: we shouldn't have this at the beginning.
    # Tracer should own the queue; exporter shouldn't even know if the
    # source is a queue or not.
(reyang marked this conversation as resolved.)
    # Tracer puts span_data into the queue.
    # Worker gets span_data from the src (here, the queue) and feeds it
    # into the dst (exporter).
    # Exporter defines the MTU (max_batch_size) and export_interval.
    # There can be one worker for each queue, multiple workers for each
    # queue, or shared workers among queues (e.g. queue for traces, queue
    # for logs).

Review comment: We still have to solve the problem of multiple exporters. If there are multiple workers per queue, they'll either have to process each item at the same time as the others, or queue items somewhere else to support multiple batch sizes/export intervals.

Reply: Yep.
    def export(self, items):
        self._queue.puts(items, block=False)  # pragma: NO COVER


class Worker(threading.Thread):
    daemon = True

    def __init__(self, src, dst):
        self.src = src
        self.dst = dst
        self._stopping = False
        super(Worker, self).__init__()

    def run(self):  # pragma: NO COVER
        src = self.src
        dst = self.dst
        while True:
            batch = src.gets(dst.max_batch_size, dst.export_interval)
            if batch and isinstance(batch[-1], QueueEvent):
                dst.emit(batch[:-1], event=batch[-1])
                if batch[-1] is src.EXIT_EVENT:
                    break
                else:
                    continue
            dst.emit(batch)

    def stop(self, timeout=None):  # pragma: NO COVER
        start_time = time.time()
        wait_time = timeout
        if self.is_alive() and not self._stopping:
            self._stopping = True
            self.src.put(self.src.EXIT_EVENT, block=True, timeout=wait_time)
            elapsed_time = time.time() - start_time
            wait_time = timeout and max(timeout - elapsed_time, 0)
        if self.src.EXIT_EVENT.wait(timeout=wait_time):
            return time.time() - start_time  # time taken to stop
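The Worker's drain loop above (take up to `dst.max_batch_size` items, wait at most `dst.export_interval` for the batch to fill, and use a sentinel in the queue to trigger graceful shutdown) can be sketched with only the standard library. This is an illustrative approximation, not the PR's code: `CollectingExporter`, `worker`, and `EXIT` are hypothetical names standing in for `BaseExporter`, `Worker.run`, and `EXIT_EVENT`.

```python
import queue
import threading
import time

EXIT = object()  # sentinel playing the role of EXIT_EVENT


class CollectingExporter:
    """Hypothetical stand-in 'dst': emitted batches are kept in memory."""
    max_batch_size = 3
    export_interval = 0.1  # seconds

    def __init__(self):
        self.batches = []

    def emit(self, batch):
        self.batches.append(list(batch))


def worker(src, dst):
    # Drain up to dst.max_batch_size items per batch, waiting at most
    # dst.export_interval for the batch to fill, then emit it.
    while True:
        batch = []
        deadline = time.time() + dst.export_interval
        while len(batch) < dst.max_batch_size:
            try:
                item = src.get(timeout=max(deadline - time.time(), 0))
            except queue.Empty:
                break  # interval elapsed; flush whatever we have
            if item is EXIT:  # graceful shutdown, like EXIT_EVENT
                if batch:
                    dst.emit(batch)
                return
            batch.append(item)
        if batch:
            dst.emit(batch)


q = queue.Queue()
dst = CollectingExporter()
t = threading.Thread(target=worker, args=(q, dst), daemon=True)
t.start()
for i in range(7):
    q.put(i)
q.put(EXIT)  # ask the worker to drain and exit, like Worker.stop
t.join(timeout=5)
print([x for b in dst.batches for x in b])  # items arrive in order: [0, 1, 2, 3, 4, 5, 6]
```

The real `Queue.gets` in `opencensus.common.schedule` bundles this "count or timeout" behavior into one call; the nested loop here just makes the two limits explicit.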
Review comment: I think it's cleaner if we remove `emit` and make `export` the only API method here. What about making the batch size a configurable option in the exporter package, but not an attribute of the exporter class?

Reply: Yep, we will remove `emit` and only have `export` here. I wonder what we should do when there are multiple exporters and they try to configure the batch size. Having the MTU concept be part of the exporter seems to be an advantage, since if we have something like an aggregated exporter, it can determine its MTU based on the underlying exporters' MTUs.

Reply: I've updated the comment early this afternoon; please take a look at the diff and see if it is better explained.

Reply: The more I think about supporting multiple exporters, the better it sounds to have one queue per worker. If we did this, the tracer could add spans to a wrapper that adds a span to the queue for each registered exporter. This is what I assume you mean by aggregated exporter, and it's what the agent does in multiconsumer.go.

If the aggregated exporter has a single underlying queue, we can only drain the queue as fast as the longest export interval (/smallest MTU), which could cause us to drop spans that other exporters would otherwise export in time. It also makes the implementation more complex.

For now, all we need to do is make sure the design in this PR doesn't preclude multiple exporters; we don't actually have to support them yet.
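The one-queue-per-exporter fan-out discussed above can be sketched as follows. All names here (`FanOutExporter`, `StubExporter`) are hypothetical illustrations of the idea, not code from this PR or from multiconsumer.go; the MTU line reflects the suggestion that an aggregated exporter could derive its MTU from the underlying exporters' MTUs.

```python
import queue


class StubExporter:
    """Hypothetical stand-in for a concrete exporter."""
    def __init__(self, max_batch_size):
        self.max_batch_size = max_batch_size


class FanOutExporter:
    """Hypothetical fan-out wrapper: one queue per registered exporter,
    so each worker can drain at its own pace and a slow exporter cannot
    delay (or force span drops on) a fast one."""

    def __init__(self, exporters):
        # One queue per exporter; in a full design, each queue would
        # also get its own Worker thread draining it.
        self.pairs = [(queue.Queue(), e) for e in exporters]
        # The aggregated MTU is bounded by the smallest underlying MTU.
        self.max_batch_size = min(e.max_batch_size for e in exporters)

    def export(self, items):
        # The tracer calls this once; every item is duplicated into
        # every exporter's queue.
        for q, _ in self.pairs:
            for item in items:
                q.put_nowait(item)


fanout = FanOutExporter([StubExporter(100), StubExporter(25)])
fanout.export(['span-a', 'span-b'])
print(fanout.max_batch_size)                  # 25
print([q.qsize() for q, _ in fanout.pairs])   # [2, 2]
```

Since each exporter owns its queue, a single slow drain only backs up that exporter's queue, which matches the concern above about a shared queue draining no faster than the longest export interval.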