
Refactor async exporter #642

Merged: 25 commits, May 15, 2019
Changes from 24 commits
@@ -22,7 +22,9 @@ class Options(Object):
     prototype = Object(
         endpoint='https://dc.services.visualstudio.com/v2/track',
         export_interval=15.0,
+        grace_period=5.0,
         instrumentation_key=os.getenv('APPINSIGHTS_INSTRUMENTATIONKEY', None),
+        max_batch_size=100,
         minimum_retry_interval=60,  # minimum retry interval in seconds
         proxy=None,
         storage_maintenance_period=60,
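For context, a hedged sketch of how these options are typically supplied. The keyword names mirror the `Options` prototype above, and the `AzureExporter` constructor shown later in this diff forwards `**options` to it; the key value below is a placeholder.

    from opencensus.ext.azure.trace_exporter import AzureExporter

    exporter = AzureExporter(
        instrumentation_key='00000000-0000-0000-0000-000000000000',  # placeholder
        export_interval=15.0,   # seconds the worker waits while batching
        max_batch_size=100,     # maximum spans handed to one emit() call
        grace_period=5.0,       # seconds to drain the queue at interpreter exit
    )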
@@ -0,0 +1,89 @@ (new file)
# Copyright 2019, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import threading
import time

from opencensus.common.schedule import Queue
from opencensus.common.schedule import QueueEvent
from opencensus.ext.azure.common import Options


class BaseExporter(object):
    def __init__(self, **options):
        options = Options(**options)
        self.export_interval = options.export_interval
        self.max_batch_size = options.max_batch_size
        # TODO: the queue should be moved to the tracer;
        # too much refactoring work, leave it to the next PR
        self._queue = Queue(capacity=8192)  # TODO: make this configurable
        self.EXIT_EVENT = self._queue.EXIT_EVENT
        # TODO: the worker should not be created in the base exporter
        self._worker = Worker(self._queue, self)
        self._worker.start()
        atexit.register(self._worker.stop, options.grace_period)

    # Ideally we don't want to have `emit`.
    # The exporter will have one public method - `export`, which is a
    # blocking method, running inside worker threads.
    def emit(self, batch, event=None):
        raise NotImplementedError  # pragma: NO COVER
Member:
I think it's cleaner if we remove `emit` and make `export` the only API method here. What about making the batch size a configurable option in the exporter package, but not an attribute of the exporter class?

Contributor (author):

Yep, we will remove `emit` and only have `export` here.

I wonder what we should do when there are multiple exporters and they all try to configure the batch size. Having the MTU concept be part of the exporter seems to be an advantage: if we have something like an aggregated exporter, it can determine its MTU based on the underlying exporters' MTUs.

Contributor (author):

I've updated the comment earlier this afternoon; please take a look at the diff and see if it's better explained.

Member:

The more I think about supporting multiple exporters, the better it sounds to have one queue per worker. If we did this, the tracer could add spans to a wrapper that adds a span to the queue of each registered exporter. This is what I assume you mean by an aggregated exporter, and it's what the agent does in multiconsumer.go.

If the aggregated exporter has a single underlying queue, we can only drain the queue as fast as the longest export interval (/smallest MTU) allows, which could cause us to drop spans that other exporters would otherwise export in time. It also makes the implementation more complex.

For now, all we need to do is make sure the design in this PR doesn't preclude multiple exporters; we don't actually have to support them yet.
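A hedged sketch of that fan-out idea, assuming each exporter keeps its own queue and worker as in the `BaseExporter` below; the `SpanFanout` wrapper is hypothetical, not part of this PR:

    class SpanFanout(object):
        # Hypothetical wrapper the tracer could hold: it fans each span batch
        # out to every registered exporter's own queue, so a slow exporter
        # only backs up its own queue (cf. the agent's multiconsumer.go).
        def __init__(self, exporters):
            self.exporters = exporters

        def export(self, span_datas):
            for exporter in self.exporters:
                exporter.export(span_datas)  # non-blocking enqueue per exporter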


    # TODO: we shouldn't have this at the beginning.
    # The tracer should own the queue; the exporter shouldn't even know
    # whether the source is a queue or not.
    # The tracer puts span_data into the queue.
    # The worker gets span_data from the src (here, the queue) and feeds
    # it into the dst (the exporter).
    # The exporter defines the MTU (max_batch_size) and export_interval.
    # There can be one worker for each queue, multiple workers for each
    # queue, or workers shared among queues (e.g. a queue for traces and
    # a queue for logs).
Member:

We still have to solve the problem of multiple exporters. If there are multiple workers per queue, they'll either have to process each item at the same time as the others, or queue items somewhere else to support multiple batch sizes/export intervals.

Contributor (author):

Yep.

    def export(self, items):
        self._queue.puts(items, block=False)


class Worker(threading.Thread):
    daemon = True

    def __init__(self, src, dst):
        self.src = src
        self.dst = dst
        self._stopping = False
        super(Worker, self).__init__()

    def run(self):
        src = self.src
        dst = self.dst
        while True:
            batch = src.gets(dst.max_batch_size, dst.export_interval)
            if batch and isinstance(batch[-1], QueueEvent):
                dst.emit(batch[:-1], event=batch[-1])
                if batch[-1] is src.EXIT_EVENT:
                    break
                else:
                    continue
            dst.emit(batch)

    def stop(self, timeout=None):
        start_time = time.time()
        wait_time = timeout
        if self.is_alive() and not self._stopping:
            self._stopping = True
            self.src.put(self.src.EXIT_EVENT, block=True, timeout=wait_time)
            elapsed_time = time.time() - start_time
            wait_time = timeout and max(timeout - elapsed_time, 0)
        if self.src.EXIT_EVENT.wait(timeout=wait_time):
            return time.time() - start_time  # time taken to stop
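To make the contract concrete, here is a minimal hypothetical subclass (the `PrintExporter` name and body are illustrative, not part of this PR): `emit` runs on the worker thread with at most `max_batch_size` items collected over `export_interval`, and is responsible for setting any `QueueEvent` it receives so that `stop()` can return.

    class PrintExporter(BaseExporter):
        # Hypothetical example exporter: prints each batch on the worker thread.
        def emit(self, batch, event=None):
            for item in batch:
                print(item)
            if event:
                event.set()  # acknowledge flush/exit events


    exporter = PrintExporter(export_interval=5.0, max_batch_size=50)
    exporter.export(['span-1', 'span-2'])  # non-blocking: items are only queued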
@@ -16,16 +16,14 @@
 import json
 import requests

-from opencensus.common.transports.async_ import AsyncTransport
-from opencensus.common.schedule import PeriodicTask
 from opencensus.ext.azure.common import Options
 from opencensus.ext.azure.common import utils
+from opencensus.ext.azure.common.exporter import BaseExporter
 from opencensus.ext.azure.common.protocol import Data
 from opencensus.ext.azure.common.protocol import Envelope
 from opencensus.ext.azure.common.protocol import RemoteDependency
 from opencensus.ext.azure.common.protocol import Request
 from opencensus.ext.azure.common.storage import LocalFileStorage
-from opencensus.trace import base_exporter
 from opencensus.trace import execution_context
 from opencensus.trace.span import SpanKind

@@ -34,7 +32,7 @@
 __all__ = ['AzureExporter']


-class AzureExporter(base_exporter.Exporter):
+class AzureExporter(BaseExporter):
     """An exporter that sends traces to Microsoft Azure Monitor.

     :type options: dict
@@ -51,17 +49,7 @@ def __init__(self, **options):
             maintenance_period=self.options.storage_maintenance_period,
             retention_period=self.options.storage_retention_period,
         )
-        self.transport = AsyncTransport(
-            self,
-            max_batch_size=100,
-            wait_period=self.options.export_interval,
-        )
-        self._transmission_task = PeriodicTask(
-            interval=self.options.storage_maintenance_period,
-            function=self._transmission_routine,
-        )
-        self._transmission_task.daemon = True
-        self._transmission_task.start()
+        super(AzureExporter, self).__init__(**options)

     def span_data_to_envelope(self, sd):
         envelope = Envelope(
@@ -124,7 +112,7 @@ def span_data_to_envelope(self, sd):
         # TODO: links, tracestate, tags, attrs
         return envelope

-    def _transmission_routine(self):
+    def _transmit_from_storage(self):
         for blob in self.storage.gets():
             if blob.lease(self.options.timeout + 5):
                 envelopes = blob.get()  # TODO: handle error
@@ -142,8 +130,6 @@ def _transmit(self, envelopes):
         Return the next retry time in seconds for retryable failure.
         This function should never throw exception.
         """
-        if not envelopes:
-            return 0
         # TODO: prevent requests being tracked
         blacklist_hostnames = execution_context.get_opencensus_attr(
             'blacklist_hostnames',
@@ -236,23 +222,23 @@ def _transmit(self, envelopes):
         # server side error (non-retryable)
         return -response.status_code

-    def emit(self, span_datas):
-        """
-        :type span_datas: list of :class:
-            `~opencensus.trace.span_data.SpanData`
-        :param list of opencensus.trace.span_data.SpanData span_datas:
-            SpanData tuples to emit
-        """
-        envelopes = [self.span_data_to_envelope(sd) for sd in span_datas]
-        result = self._transmit(envelopes)
-        if result > 0:
-            self.storage.put(envelopes, result)
+    def emit(self, batch, event=None):
+        try:
+            if batch:
+                envelopes = [self.span_data_to_envelope(sd) for sd in batch]
+                result = self._transmit(envelopes)
+                if result > 0:
+                    self.storage.put(envelopes, result)
+            if event:
+                if event is self.EXIT_EVENT:
+                    self._transmit_from_storage()  # send files before exit
+                event.set()
+                return
+            if len(batch) < self.options.max_batch_size:
+                self._transmit_from_storage()
+        except Exception as ex:
+            logger.exception('Transmission exception: %s.', ex)

-    def export(self, span_datas):
-        """
-        :type span_datas: list of :class:
-            `~opencensus.trace.span_data.SpanData`
-        :param list of opencensus.trace.span_data.SpanData span_datas:
-            SpanData tuples to export
-        """
-        self.transport.export(span_datas)
+    def _stop(self, timeout=None):
+        self.storage.close()
+        return self._worker.stop(timeout)
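Finally, a short usage sketch wiring the refactored exporter into the standard OpenCensus tracer (the instrumentation key is a placeholder). Spans are now batched and sent by the background worker, and the atexit hook drains the queue for up to grace_period seconds at shutdown.

    from opencensus.ext.azure.trace_exporter import AzureExporter
    from opencensus.trace.samplers import ProbabilitySampler
    from opencensus.trace.tracer import Tracer

    tracer = Tracer(
        exporter=AzureExporter(instrumentation_key='<placeholder-key>'),
        sampler=ProbabilitySampler(1.0),
    )

    with tracer.span(name='example'):
        pass  # the span is enqueued on exit and exported asynchronously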