Refactor async exporter #642

Merged · 25 commits · May 15, 2019

Conversation

@reyang (Contributor) commented May 2, 2019

This is a preview to collect early feedback. I'm trying to address the following things:

  1. Refactor the exporter, avoiding the extra concept of "transport".
  2. Provide an upper bound for the queue, instead of letting it grow unbounded and exhaust memory.
  3. Give flush and _stop clear semantics, based on eventing. Both return None on timeout, or otherwise the actual time taken. (A sketch covering points 2 and 3 follows at the end of this comment.)
  4. Behave consistently with PeriodTask, using the concept of interval instead of wait_time (which doesn't count the time taken to transform/send data).
  5. Avoid the concept of sync vs. async exporters; all exporters should be async. In the future we might provide a sync interface for exporters to capture certain contextual information. The concept will be similar to the way operating systems handle IRQs using DPCs.

2, 3 and 4 are important requirements for the Azure exporter.
I'm trying to take a general approach, and would be happy to either keep it inside Azure or contribute it to the core opencensus-python lib.
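
To make points 2 and 3 concrete, here is a minimal sketch of what a bounded queue with event-based flush could look like (the class and method names are illustrative, not the final API; it assumes a worker thread drains the queue and sets the event once the batch before it was handled):

import queue
import threading
import time

class BoundedQueue(object):
    def __init__(self, capacity):
        # Upper bound (point 2): the queue never grows beyond `capacity`.
        self._queue = queue.Queue(maxsize=capacity)

    def put(self, item):
        try:
            self._queue.put(item, block=False)
        except queue.Full:
            pass  # drop instead of exhausting memory; should be surfaced as a metric

    def flush(self, timeout=None):
        # Event-based flush (point 3): enqueue an event and wait for the
        # worker thread to signal that everything before it was handled.
        start_time = time.time()
        event = threading.Event()
        self._queue.put(event)
        if not event.wait(timeout):
            return None  # timed out
        return time.time() - start_time  # actual time taken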

@reyang (Contributor, Author) commented May 2, 2019

Here is an example of how it works:

import time
from opencensus.ext.azure.common.exporter import BaseExporter

class AsyncPrintExporter(BaseExporter):
    def emit(self, batch, event=None):
        # if event is EXIT, persist the data before trying to send
        print(batch, event)
        if event:
            event.set()

x = AsyncPrintExporter(export_interval=1, max_batch_size=3)
x.export([1, 2, 3, 4, 5, 6, 7])
print('time taken to flush', x._queue.flush(timeout=5.0))
time.sleep(2)
x.export([8, 9, 10, 11])
time.sleep(5)
x.export([12, 13, 14, 15, 16])

Output:

(1, 2, 3) None
(4, 5, 6) None
(7,) QueueEvent(SYNC(timeout=5.0))
time taken to flush 0.0029981136322021484
() None
(8, 9, 10) None
(11,) None
() None
() None
() None
(12, 13, 14) None
(15, 16) QueueEvent(EXIT)

@c24t (Member) commented May 2, 2019

Points 2, 3, 4, and 5 sound like clear and important improvements, and I think you should push the changes here up into the core library.

I'm less sure about making exporters inherit from this class. Composition seems better in this case, even if we only have a single implementation for the queue (or "transport") class -- it's surprising that creating an exporter spawns a background thread, makes it difficult to test exporters in isolation, etc. We'll also need a strategy for using multiple exporters eventually, and each having their own background thread will make this difficult.

I'll take a second pass and add comments on the implementation.

self._thread.join(timeout=wait_time)
if self._thread.is_alive():
    return
return time.time() - start_time  # time taken to flush
Member:

Something to keep in mind for the future: this is a good use case for time.monotonic, but it's not clear that it's worth it if we still have to fall back to time.time for py2 compatibility.
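
For reference, the usual fallback idiom (not part of this PR) would be something like:

try:
    from time import monotonic  # py3: immune to system clock adjustments
except ImportError:
    from time import time as monotonic  # py2 fallback: can jump with clock changes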

@songy23 (Contributor) commented May 2, 2019

Agree with @c24t. It's great to have 2, 3, 4, 5, which also matches the way exporters are implemented in other languages.

For 2, it would be great if you could also share your ideas on what happens when the event queue gets full (census-instrumentation/opencensus-specs#262).

For 4, you can also take a look at https://github.com/census-instrumentation/opencensus-go/blob/master/metric/metricexport/reader.go#L59-L73.

@reyang (Contributor, Author) commented May 3, 2019

Points 2, 3, 4, and 5 sound like clear and important improvements, and I think you should push the changes here up into the core library.

Sure. I will take a step-by-step approach:

  1. [This PR] Move the Queue and BaseExporter to opencensus.common (instead of making them trace-specific; I think we will need to use them for logs as well).
  2. [This PR] Move AzureExporter to this new mechanism.
  3. [In separate PRs] Move existing stuff to the new model.
  4. [In a separate PR] Retire the old mechanism.

I'm less sure about making exporters inherit from this class. Composition seems better in this case, even if we only have a single implementation for the queue (or "transport") class -- it's surprising that creating an exporter spawns a background thread, makes it difficult to test exporters in isolation, etc. We'll also need a strategy for using multiple exporters eventually, and each having their own background thread will make this difficult.

That's good feedback. I think eventually we might also have a scenario where one exporter has multiple worker threads taking data from the same queue. We will probably have three concepts here: queue, exporter, and worker.

self._worker.start()
atexit.register(self._worker.stop, options.grace_period)

# Ideally we don't want to have emit and export
Contributor Author:

@c24t @songy23 I've dumped some thoughts here, please comment inline with your feedback.

@reyang (Contributor, Author) commented May 7, 2019

@c24t here is a simple diagram of what the final state should look like, please review.

[architecture diagram: queues and workers owned by the core SDK; exporters as consumers]

Queues and workers will be owned by the core SDK as global instances. Traces/logs/metrics exporters will be resource consumers (providing an export method) instead of owning any queue or thread.

In the future, we might want to move the storage to a common place as well.

return
elapsed_time = time.time() - start_time
wait_time = timeout and max(timeout - elapsed_time, 0)
if event.wait(timeout):
Member:

It's surprising to see the exporter set this event instead of the queue.

Contributor Author:

I think only the exporter would know when the batch got handled, rather than relying on the return value of export. This gives the exporter the flexibility to do async work.

Member:

I see, but the only reason you need the event is to get the duration? It seems better to me not to force exporters to handle the event, especially if the event would be internal to the queue otherwise.

What do you imagine using the duration for?

Contributor Author:

I see, but the only reason you need the event is to get the duration?

Besides the duration, the exporter can tell whether there is an explicit intention to flush the queue, or whether the application is exiting.

It seems better to me not to force exporters to handle the event, especially if the event would be internal to the queue otherwise.

Yep, it is better not to force every exporter to handle the event, unless they explicitly ask for it (and benefit from it).

What do you imagine using the duration for?

I think the duration is less important than the event itself. The event could be useful for telling whether we're about to exit, or whether there is an explicit intention to flush.

Member:

If the goal is to let exporters do some cleanup work on shutdown, what do you think about having a separate shutdown API method instead of the event?

@bogdandrutu suggests an interface like this, implemented by the worker + queue class and the exporters:

interface SpanConsumer

void addSpans(List<Span>)
# add all spans to the queue

void shutdown()
# 1. stop accepting new spans (addSpans now raises)
# 2. flush existing spans
# 3. call shutdown on next consumer in pipeline
# 4. cleanup work, e.g. shut down worker thread

For the exporters, addSpans is export. (A Python rendering follows below.)
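
A rough Python sketch of that interface (hypothetical names, just to make the shutdown steps concrete):

class SpanConsumer(object):
    def add_spans(self, spans):
        # Add all spans to the queue; raises once shutdown has been called.
        raise NotImplementedError

    def shutdown(self):
        # 1. Stop accepting new spans (add_spans now raises).
        # 2. Flush existing spans.
        # 3. Call shutdown on the next consumer in the pipeline.
        # 4. Clean up, e.g. stop the worker thread.
        raise NotImplementedError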

Contributor Author:

Trying to understand the proposal here:

  1. Looks like addSpans will run under the user's context, and return immediately after it enqueues the spans (or discards them if the queue is full).
  2. shutdown is a blocking API, which does the cleanup work and returns once the cleanup is done. (Will shutdown take an input such as remaining_time, or could it block indefinitely?)
  3. How do we plan to do flush?

Member:

For (2), passing a timeout down the chain seems like it could work. I see how a long blocking call could be a problem here.

For (3), shutdown is effectively a flush: the queue would send all spans to the exporter, and the exporter would try to send all spans to the backend. Where would you expect to call flush?

Contributor Author:

One benefit of flush is that users would know that if it returns successfully, the telemetry data is safe. The application can also decide whether it should proceed or stop if the telemetry is important (e.g. auditing events).
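
For example (a sketch, assuming flush returns the elapsed time on success and None on timeout, as described earlier in this PR):

elapsed = exporter.flush(timeout=5.0)
if elapsed is None:
    # Timed out: auditing events may not have been delivered yet,
    # so the application can decide to retry, wait longer, or abort.
    raise RuntimeError('failed to flush telemetry within 5 seconds')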

src = self.src
dst = self.dst
while True:
    batch = src.gets(dst.max_batch_size, dst.export_interval)
Member:

Seems odd to get these from the exporter instead of making them attributes of the worker.

Contributor Author:

I was referring to the design of the networking layer, where the MTU is defined by the low-level stack.
The worker shouldn't know the MTU, right?

Member:

What is the worker in the networking analogy? As it is, it seems the worker does know the MTU, it just has to get it from the exporter.

Removing this from the exporter would mean the worker only has to call export; including it means it's part of the exporter API.

Compare this to the java implementation where the worker has the batch size and export interval.

Contributor Author:

Worker + Queue seems to be the upper-level network stack.

I was considering the multiple-exporter scenario, for example exporter A with MTU=100 and interval=1s, and exporter B with MTU=1000 and interval=10s. The aggregated exporter, which exports to both A and B, could have interval=1 and MTU=100, while accumulating data internally and sending it to B in bigger batches. (A sketch follows below.)

If exporters configure the interval and MTU, the aggregated exporter needs a way to get that knowledge from A and B, right?
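
A sketch of such an aggregated exporter (hypothetical, only to illustrate accumulating bigger batches for B):

class AggregatedExporter(object):
    def __init__(self, exporter_a, exporter_b):
        # Advertise the smallest MTU / shortest interval of the underlying exporters.
        self.max_batch_size = min(exporter_a.max_batch_size, exporter_b.max_batch_size)
        self.export_interval = min(exporter_a.export_interval, exporter_b.export_interval)
        self._a = exporter_a
        self._b = exporter_b
        self._buffer_for_b = []

    def export(self, batch):
        self._a.export(batch)  # A can handle small batches at a short interval
        self._buffer_for_b.extend(batch)
        while len(self._buffer_for_b) >= self._b.max_batch_size:
            self._b.export(self._buffer_for_b[:self._b.max_batch_size])
            del self._buffer_for_b[:self._b.max_batch_size]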

Member:

I'd prefer to avoid this altogether by having a separate queue for each exporter, in which case the aggregated exporter doesn't have to know anything about the exporters it forwards spans to.

Contributor Author:

Do you mean each exporter will have its own queue + worker?

Member:

I think that's the simplest solution: the exporter class itself doesn't own the queue, and may even be stateless if you don't need e.g. to persist spans to disk. Each exporter comes with a factory for initializing the queue and worker.

In this case the tracer would send spans to multiple queues, one for each exporter. E.g. as SpanQueueRegistry here:

exporter package:
   # create the queue, worker, and exporter 
  (Exporter, SpanQueue) get_exporter(config) 
  
  class Exporter
    # called by the worker thread
    export(list<Span>)

class SpanQueueRegistry
  # add spans to all queues to be exported
  void add_spans(list<Span>) 

# tracer adds spans to queue instead of calling export directly
Tracer(SpanQueueRegistry)
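
Rendered as a Python sketch (the names follow the pseudocode above; the queue's puts method is an assumption):

class SpanQueueRegistry(object):
    def __init__(self):
        self._queues = []

    def register(self, span_queue):
        self._queues.append(span_queue)

    def add_spans(self, spans):
        # Fan spans out to each exporter's own queue; every queue is drained
        # by its own worker at that exporter's batch size and interval.
        for span_queue in self._queues:
            span_queue.puts(spans)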

@reyang (Contributor, Author) commented May 14, 2019:

I gave this approach a try yesterday and it seems to create other problems.
Given this PR has been running for a long time, we should probably split it into several stages.
Today the exporter owns the queue and the worker; we can decouple the queue first.

Here is my proposal:

  1. Have the queue in the common namespace. Use AzureExporter as an experiment/PoC. [This PR]
  2. Let the core SDK take ownership of queue creation, and have all exporters switch to that queue. [Next PR] After this, the exporters will not block the core SDK.
  3. Prototype a queue multiplexer, which takes data from the queue and sends it to multiple exporters. In that PR, we can explore how to manage worker threads.
  4. Decouple the worker.

@c24t does this sound okay? If yes, I will move the Worker class to AzureExporter for now, so we can focus on the Queue class in this PR.

Member:

That sounds great, and since the changes are largely contained in the Azure exporter I don't see any problem merging this PR as (1) into master. We can figure out the API changes for the other exporters in (2).

(Resolved, outdated review thread on opencensus/common/schedule/__init__.py.)
while True:
    batch = src.gets(dst.max_batch_size, dst.export_interval)
    if batch and isinstance(batch[-1], QueueEvent):
        dst.emit(batch[:-1], event=batch[-1])
Member:

Any way to do this without exposing the event to the exporter?

Contributor Author:

I was thinking of this as a way to explicitly tell the exporter about the intention.
For example, if we have a slow network, the exporter can decide to persist the data locally to prevent data loss on exit/flush.

@reyang (Contributor, Author) commented May 8, 2019:

There are two possible ways in my mind to make it optional for exporters:

  1. Have a base exporter that handles the event by default.
class BaseExporter(object):
    def export_internal(self, batch, event):
        try:
            return self.export(batch)
        finally:
            if event:
                event.set()

    def export(self, batch):
        pass
  2. Use runtime inspection (a bit dirty).
export_thunk = exporter.export
if 'event' in inspect.signature(export_thunk).parameters:
    def export(batch, event):
        try:
            exporter.export(batch)
        finally:
            if event:
                event.set()
    export_thunk = export

Member:

(1) looks better to me, but does shutdown solve the same problem this does?

# payload = transform(span_data)
# self.transmit(payload)
def emit(self, batch, event=None):
    raise NotImplementedError  # pragma: NO COVER
Member:

I think it's cleaner if we remove emit and make export the only API method here. What about making the batch size a configurable option in the exporter package, but not an attribute of the exporter class?
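
Something like this, perhaps (a sketch of the suggestion, not the code in this PR):

# exporter package: the batch size is package-level configuration,
# not an attribute of the exporter class.
DEFAULT_MAX_BATCH_SIZE = 100

class Exporter(object):
    def export(self, batch):
        # The only API method; called by the worker thread with a batch of spans.
        raise NotImplementedError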

Contributor Author:

Yep, we will remove emit and only have export here.

I wonder what we should do when there are multiple exporters and they try to configure the batch size. Having the MTU concept as part of the exporter seems to be an advantage: if we have something like an aggregated exporter, it can determine its MTU based on the underlying exporters' MTUs.

Contributor Author:

I updated the comment earlier this afternoon; please take a look at the diff and see if it is better explained.

Member:

The more I think about supporting multiple exporters, the better it sounds to have one queue per exporter. If we did this, the tracer could add spans to a wrapper that adds a span to the queue for each registered exporter. This is what I assume you mean by aggregated exporter, and it's what the agent does in multiconsumer.go.

If the aggregated exporter has a single underlying queue, we can only drain the queue as fast as the longest export interval (/smallest MTU), which could cause us to drop spans that other exporters would otherwise export in time. It also makes the implementation more complex.

For now all we need to do is make sure the design in this PR doesn't preclude multiple exporters; we don't actually have to support them yet.

# Exporter defines the MTU (max_batch_size) and export_interval.
# There can be one worker for each queue, or multiple workers for each
# queue, or shared workers among queues (e.g. queue for traces, queue
# for logs).
Member:

We still have to solve the problem of multiple exporters. If there are multiple workers per queue, they'll either have to process each item at the same time as the others, or queue items somewhere else to support multiple batch sizes/export intervals.

Contributor Author:

Yep.

try:
    self._queue.put(item, block, timeout)
except queue.Full:
    pass  # TODO: log data loss
Member:

This is a good use case for metrics; we should try to emit the same metric for unexported spans across all clients.
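
A minimal sketch of counting the dropped items before wiring this up to a real metric (the names here are made up):

import queue
import threading

_dropped_spans = 0
_dropped_lock = threading.Lock()

def put_or_drop(q, item):
    global _dropped_spans
    try:
        q.put(item, block=False)
    except queue.Full:
        # Count the drop so all clients can report the same
        # "dropped spans" metric instead of losing data silently.
        with _dropped_lock:
            _dropped_spans += 1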

Contributor Author:

Yep!

Contributor Author:

Here are the metrics we should consider: census-instrumentation/opencensus-specs#262 (comment).

@c24t (Member) left a comment:

LGTM, let's revisit the API in the next PR.
