Refactor async exporter #642
Conversation
Here goes an example of how it works:

    import time

    from opencensus.ext.azure.common.exporter import BaseExporter

    class AsyncPrintExporter(BaseExporter):
        def emit(self, batch, event=None):
            # if event is EXIT, persist the data before trying to send
            print(batch, event)
            if event:
                event.set()

    x = AsyncPrintExporter(export_interval=1, max_batch_size=3)
    x.export([1, 2, 3, 4, 5, 6, 7])
    print('time taken to flush', x._queue.flush(timeout=5.0))
    time.sleep(2)
    x.export([8, 9, 10, 11])
    time.sleep(5)
    x.export([12, 13, 14, 15, 16])

Output:
Points 2, 3, 4, and 5 sound like clear and important improvements, and I think you should push the changes here up into the core library. I'm less sure about making exporters inherit from this class. Composition seems better in this case, even if we only have a single implementation for the queue (or "transport") class -- it's surprising that creating an exporter spawns a background thread, it makes exporters difficult to test in isolation, etc. We'll also need a strategy for using multiple exporters eventually, and each having its own background thread will make this difficult. I'll take a second pass and add comments on the implementation.
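For illustration, here is a rough sketch of the composition-style split being suggested, assuming a hypothetical Transport class that owns the queue and worker thread (names and structure are illustrative, not the design in this PR):

    import queue
    import threading

    class PrintExporter(object):
        # A plain exporter: no queue, no background thread, easy to test in isolation.
        def export(self, batch):
            print(batch)

    class Transport(object):
        # Owns the queue and the worker thread; wraps any exporter.
        def __init__(self, exporter, max_batch_size=100):
            self.exporter = exporter
            self.max_batch_size = max_batch_size
            self._queue = queue.Queue()
            self._worker = threading.Thread(target=self._run)
            self._worker.daemon = True
            self._worker.start()

        def put(self, items):
            for item in items:
                self._queue.put(item)

        def _run(self):
            while True:
                # Block for the first item, then drain up to max_batch_size.
                batch = [self._queue.get()]
                while len(batch) < self.max_batch_size and not self._queue.empty():
                    batch.append(self._queue.get_nowait())
                self.exporter.export(batch)

This keeps exporters testable on their own and leaves thread management to a single place.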
    self._thread.join(timeout=wait_time)
    if self._thread.is_alive():
        return
    return time.time() - start_time  # time taken to flush
Something to keep in mind for the future: this is a good use case for time.monotonic, but it's not clear that it's worth it if we still have to fall back to time.time for py2 compatibility.
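A minimal sketch of that fallback, assuming we simply pick time.monotonic when it exists and otherwise use time.time (the helper name elapsed_since is made up):

    import time

    # Use a monotonic clock when available (py3); fall back to time.time (py2).
    _get_time = getattr(time, 'monotonic', time.time)

    def elapsed_since(start):
        # Duration measurement is unaffected by wall-clock changes wherever
        # time.monotonic exists; otherwise it degrades to time.time.
        return _get_time() - start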
Agree with @c24t. It's great to have 2, 3, 4, 5, which also match the way exporters are implemented in other languages. For 2, it would be great if you could also share your idea on when the event queue gets full (census-instrumentation/opencensus-specs#262). For 4, you can also take a look at https://github.com/census-instrumentation/opencensus-go/blob/master/metric/metricexport/reader.go#L59-L73.
Sure. I will take a step-by-step approach:
That's good feedback. I think eventually we might also have a scenario where one exporter has multiple worker threads taking data from the same queue. Probably we will have three concepts here: queue, exporter, and worker.
    self._worker.start()
    atexit.register(self._worker.stop, options.grace_period)

    # Ideally we don't want to have emit and export
@c24t here goes a simple diagram of what the final state should look like, please review. Queues and workers will be owned by the core SDK as global instances. Traces/logs/metrics exporters will be resource consumers (providing ...). In the future, we might want to move the storage to a common place as well.
    return
    elapsed_time = time.time() - start_time
    wait_time = timeout and max(timeout - elapsed_time, 0)
    if event.wait(timeout):
It's surprising to see the exporter set this event instead of the queue.
I think only the exporter would know when the batch got handled, instead of relying on the return of export? This gives the exporter the flexibility to do async work.
I see, but the only reason you need the event is to get the duration? It seems better to me not to force exporters to handle the event, especially if the event would be internal to the queue otherwise.
What do you imagine using the duration for?
> I see, but the only reason you need the event is to get the duration?

Besides the duration, the exporter could tell whether there is an explicit intention to flush the queue, or whether the application is exiting.

> It seems better to me not to force exporters to handle the event, especially if the event would be internal to the queue otherwise.

Yep, it is better not to force every exporter to handle the event, unless they explicitly ask for it (and benefit from it).

> What do you imagine using the duration for?

I think the duration is less important than the event itself. The event could be useful for telling whether we're about to exit, or whether there is an explicit intention to flush.
If the goal is to let exporters do some cleanup work on shutdown, what do you think about having a separate shutdown API method instead of the event?
@bogdandrutu suggests an interface like this, implemented by the worker + queue class and the exporters:
    interface SpanConsumer
        void addSpans(List<Span>)
            # add all spans to the queue
        void shutdown()
            # 1. stop accepting new spans (addSpans now raises)
            # 2. flush existing spans
            # 3. call shutdown on next consumer in pipeline
            # 4. cleanup work, e.g. shut down worker thread

For the exporters, addSpans is export.
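A hedged Python sketch of one way that contract could be implemented (QueueSpanConsumer is an illustrative name; the worker-thread cleanup of step 4 is omitted):

    import queue

    class QueueSpanConsumer(object):
        def __init__(self, next_consumer):
            self._queue = queue.Queue()
            self._next = next_consumer
            self._shut_down = False

        def add_spans(self, spans):
            # Enqueue spans; refuse new spans once shut down.
            if self._shut_down:
                raise RuntimeError('consumer is shut down')
            for span in spans:
                self._queue.put(span)

        def shutdown(self):
            # 1. stop accepting new spans
            self._shut_down = True
            # 2. flush existing spans to the next consumer in the pipeline
            pending = []
            while not self._queue.empty():
                pending.append(self._queue.get_nowait())
            if pending:
                self._next.add_spans(pending)
            # 3. propagate shutdown down the pipeline
            self._next.shutdown()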
Trying to understand the proposal here:
1. Looks like addSpans will run under the user's context, and return immediately after it enqueues the spans (or discards the spans if the queue is full).
2. shutdown is a blocking API, which does the cleanup work and returns after the cleanup is done. (Will shutdown have an input called remaining_time, or could it block indefinitely?)
3. How do we plan to do flush?
For (2) passing a timeout down the chain seems like it could work. I see how a long blocking call could be a problem here.
For (3) shutdown is effectively a flush -- the queue would send all spans to the exporter, the exporter would try to send all spans to the backend. Where would you expect to call flush?
One benefit of flush is that users would know that if it returns true, the telemetry data is safe. The application can also decide whether it should proceed or stop if the telemetry is important (e.g. auditing events).
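As a hypothetical usage example of such a flush (record_audit_event and the 5-second budget are made up for illustration):

    def record_audit_event(exporter, event):
        exporter.export([event])
        if not exporter.flush(timeout=5.0):
            # Telemetry not confirmed delivered within 5 seconds; the caller
            # can decide whether to proceed, retry, or abort the audited work.
            raise RuntimeError('audit telemetry not flushed')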
    src = self.src
    dst = self.dst
    while True:
        batch = src.gets(dst.max_batch_size, dst.export_interval)
Seems odd to get these from the exporter instead of making them attributes of the worker.
I was referring to the design of the networking layer, where the MTU is defined by the low-level stack. The worker shouldn't know the MTU, right?
What is the worker in the networking analogy? As it is, it seems the worker does know the MTU, it just has to get it from the exporter. Removing this from the exporter would mean the worker only has to call export; including it means it's part of the exporter API. Compare this to the Java implementation, where the worker has the batch size and export interval.
Worker + Queue seems to be the upper-level network stack.
I was considering the multiple-exporter scenario: for example, exporter A with MTU=100 and interval=1s, and exporter B with MTU=1000 and interval=10s. The aggregated exporter which exports to both A and B could have interval=1s and MTU=100, while accumulating data internally and sending it in bigger batches to B.
If we have exporters configuring the interval and MTU, the aggregated exporter needs a way to get that knowledge from A and B?
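A rough sketch of that accumulation idea, purely illustrative and not a proposed API, where the aggregated exporter runs at A's cadence and re-batches internally for B:

    class AggregatedExporter(object):
        # Drains at the fastest cadence (exporter A's), buffering for B.
        export_interval = 1     # seconds, matches exporter A
        max_batch_size = 100    # matches exporter A's MTU

        def __init__(self, exporter_a, exporter_b, b_batch_size=1000):
            self.exporter_a = exporter_a
            self.exporter_b = exporter_b
            self.b_batch_size = b_batch_size
            self._b_buffer = []

        def export(self, batch):
            # Forward immediately to A; accumulate until B's MTU is reached.
            self.exporter_a.export(batch)
            self._b_buffer.extend(batch)
            while len(self._b_buffer) >= self.b_batch_size:
                self.exporter_b.export(self._b_buffer[:self.b_batch_size])
                del self._b_buffer[:self.b_batch_size]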
I'd prefer to avoid this altogether by having a separate queue for each exporter, in which case the aggregated exporter doesn't have to know anything about the exporters it forwards spans to.
Do you mean each exporter will have its own queue + worker?
I think that's the simplest solution: the exporter class itself doesn't own the queue, and may even be stateless if you don't need e.g. to persist spans to disk. Each exporter comes with a factory for initializing the queue and worker.
In this case the tracer would send spans to multiple queues, one for each exporter. E.g. as SpanQueueRegistry here:

    exporter package:
        # create the queue, worker, and exporter
        (Exporter, SpanQueue) get_exporter(config)

        class Exporter
            # called by the worker thread
            export(list<Span>)

    class SpanQueueRegistry
        # add spans to all queues to be exported
        void add_spans(list<Span>)

    # tracer adds spans to queue instead of calling export directly
    Tracer(SpanQueueRegistry)
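Translated into a minimal Python sketch (illustrative only; the queue objects are assumed to expose an add_spans method):

    class SpanQueueRegistry(object):
        # Fans spans out to one queue per registered exporter; the tracer
        # only talks to the registry, never to exporters directly.
        def __init__(self):
            self._queues = []

        def register(self, span_queue):
            self._queues.append(span_queue)

        def add_spans(self, spans):
            for span_queue in self._queues:
                span_queue.add_spans(spans)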
I gave this approach a try yesterday and it seems to create other problems.
Given this PR has been running for a long time, we should probably split it into several stages.
Today the exporter owns the queue and worker; we can decouple the queue first.
Here goes my proposal:
1. Have the queue in the common namespace. Use AzureExporter as an experiment/PoC. [This PR]
2. Let the core SDK take ownership of the queue creation; all exporters switch to that queue. [Next PR] After this, the exporters are not going to block the core SDK.
3. Prototype a queue multiplexer, which takes data from the queue and sends it to multiple exporters. In that PR, we can explore how to manage worker threads.
4. Decouple the worker.
@c24t does this sound okay? If yes, I will move the Worker class to AzureExporter for now, so we can focus on the Queue class in this PR.
That sounds great, and since the changes are largely contained in the azure exporter I don't see any problem merging this PR as (1) into master. We can figure out the API changes for the other exporters in (2).
    while True:
        batch = src.gets(dst.max_batch_size, dst.export_interval)
        if batch and isinstance(batch[-1], QueueEvent):
            dst.emit(batch[:-1], event=batch[-1])
Any way to do this without exposing the event to the exporter?
I was thinking of this as a benefit: it tells the exporter explicitly about the intention.
For example, if we have a slow network, the exporter can decide to persist the data locally to prevent data loss on exit/flush.
There are two possible ways in my mind to make it optional for exporters.

1. Have a base exporter that handles the event by default.

    class BaseExporter(object):
        def export_internal(self, batch, event):
            try:
                return self.export(batch)
            finally:
                if event:
                    event.set()

        def export(self, batch):
            pass
2. Use runtime inspection (a bit dirty).

    import inspect

    export_thunk = exporter.export
    # Wrap exporters whose export() does not accept the event, so the event
    # still gets set once the batch is handled.
    if 'event' not in inspect.signature(export_thunk).parameters:
        def export(batch, event):
            try:
                exporter.export(batch)
            finally:
                if event:
                    event.set()
        export_thunk = export
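With option (1), a concrete exporter would then only override export and never see the event, e.g. (illustrative):

    class PrintExporter(BaseExporter):
        def export(self, batch):
            print(batch)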
(1) looks better to me, but does shutdown solve the same problem this does?
    # payload = transform(span_data)
    # self.transmit(payload)
    def emit(self, batch, event=None):
        raise NotImplementedError  # pragma: NO COVER
I think it's cleaner if we remove emit and make export the only API method here. What about making the batch size a configurable option in the exporter package, but not an attribute of the exporter class?
Yep, we will remove emit, and only have export here.
I wonder what we should do when there are multiple exporters and they try to configure the batch size? Having the MTU concept as part of the exporter seems to be an advantage: if we have something like an aggregated exporter, it can determine the MTU based on the underlying exporters' MTUs.
I updated the comment earlier this afternoon; please take a look at the diff and see if it is better explained.
The more I think about supporting multiple exporters, the better it sounds to have one queue per worker. If we did this, the tracer could add spans to a wrapper that adds a span to the queue for each registered exporter. This is what I assume you mean by aggregated exporter, and it's what the agent does in multiconsumer.go.
If the aggregated exporter has a single underlying queue, we can only drain the queue as fast as the longest export interval (/smallest MTU), which could cause us to drop spans that other exporters would otherwise export in time. It also makes the implementation more complex.
For now all we need to do is make sure the design in this PR doesn't preclude multiple exporters; we don't actually have to support them yet.
    # Exporter defines the MTU (max_batch_size) and export_interval.
    # There can be one worker for each queue, or multiple workers for each
    # queue, or shared workers among queues (e.g. queue for traces, queue
    # for logs).
We still have to solve the problem of multiple exporters. If there are multiple workers per queue they'll either have to process each item at the same time as the others or queue items somewhere else to support multiple batch sizes/export intervals.
Yep.
    try:
        self._queue.put(item, block, timeout)
    except queue.Full:
        pass  # TODO: log data loss
This is a good use for metrics; we should try to emit the same metric for all clients for unexported spans.
Yep!
Here are the metrics that we should consider: census-instrumentation/opencensus-specs#262 (comment).
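A minimal sketch of counting the dropped items so a metric can be derived from it later (the BoundedQueue name and dropped_count attribute are made up for illustration):

    import queue

    class BoundedQueue(object):
        def __init__(self, capacity):
            self._queue = queue.Queue(maxsize=capacity)
            self.dropped_count = 0  # could back an 'unexported spans' metric

        def put(self, item, block=True, timeout=None):
            try:
                self._queue.put(item, block, timeout)
            except queue.Full:
                # Record the data loss instead of silently dropping the item.
                self.dropped_count += 1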
LGTM, let's revisit the API in the next PR.
This is a preview to collect early feedback. I'm trying to address the following things:

- flush and _stop, based on eventing. Both will return None on timeout, or the actual time taken.
- PeriodTask: use the concept of interval instead of wait_time (which doesn't count the time taken to transform/send data).

2, 3, 4 are important requirements for the Azure exporter.
I'm trying to take a general approach, and would be happy to either keep it inside Azure or contribute it to the core opencensus-python lib.
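The interval-versus-wait_time distinction could look roughly like this (an illustrative sketch, not the implementation in this PR): the next wait is shortened by however long the callback took, so the callback fires on a fixed cadence.

    import threading
    import time

    class IntervalTask(threading.Thread):
        # Illustrative: fires `function` every `interval` seconds, unlike a
        # fixed wait_time that ignores how long the function itself takes.
        def __init__(self, interval, function):
            super(IntervalTask, self).__init__()
            self.interval = interval
            self.function = function
            self.finished = threading.Event()

        def run(self):
            wait_time = self.interval
            while not self.finished.wait(wait_time):
                start = time.time()
                self.function()
                elapsed = time.time() - start
                wait_time = max(self.interval - elapsed, 0)

        def stop(self):
            self.finished.set()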