Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Add types for span, metrics, throttler (#336)
Browse files Browse the repository at this point in the history
* Add types for span, metrics, throttler

Signed-off-by: Kai Mueller <[email protected]>

* Fix tags types

Signed-off-by: Kai Mueller <[email protected]>

* Fix reference list type

Signed-off-by: Kai Mueller <[email protected]>

* Fix finish_time type

Signed-off-by: Kai Mueller <[email protected]>

* Fix parent_id type

Signed-off-by: Kai Mueller <[email protected]>

* Fix flags type

Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/tracer.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Add span_context types

Signed-off-by: Kai Mueller <[email protected]>

* Add types for methods

Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span_context.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span_context.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span_context.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span_context.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span_context.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Fix imports

Signed-off-by: Kai Mueller <[email protected]>

* Add types for timer

Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

* Fix import

Signed-off-by: Kai Mueller <[email protected]>

* Update jaeger_client/span.py

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: Kai Mueller <[email protected]>

Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
kasium and yurishkuro authored Sep 9, 2021
1 parent 4ee7dcc commit ac8e752
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 47 deletions.
31 changes: 23 additions & 8 deletions jaeger_client/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Optional, Callable, Dict


class MetricsFactory(object):
"""Generates new metrics."""

def _noop(self, *args):
pass

def create_counter(self, name, tags=None):
def create_counter(
self, name: str, tags: Optional[Dict[str, str]] = None
) -> Callable[[int], None]:
"""
Generates a new counter from the given name and tags and returns
a callable function used to increment the counter.
Expand All @@ -29,7 +34,9 @@ def create_counter(self, name, tags=None):
"""
return self._noop

def create_timer(self, name, tags=None):
def create_timer(
self, name: str, tags: Optional[Dict[str, str]] = None
) -> Callable[[float], None]:
"""
Generates a new timer from the given name and tags and returns
a callable function used to record a float duration in microseconds.
Expand All @@ -40,7 +47,9 @@ def create_timer(self, name, tags=None):
"""
return self._noop

def create_gauge(self, name, tags=None):
def create_gauge(
self, name: str, tags: Optional[Dict[str, str]] = None
) -> Callable[[float], None]:
"""
Generates a new gauge from the given name and tags and returns
a callable function used to update the gauge.
Expand All @@ -55,25 +64,31 @@ def create_gauge(self, name, tags=None):
class LegacyMetricsFactory(MetricsFactory):
"""A MetricsFactory adapter for legacy Metrics class."""

def __init__(self, metrics):
def __init__(self, metrics: 'Metrics') -> None:
self._metrics = metrics

def create_counter(self, name, tags=None):
def create_counter(
self, name: str, tags: Optional[Dict[str, str]] = None
) -> Callable[[int], None]:
key = self._get_key(name, tags)

def increment(value):
def increment(value: int) -> Optional[Any]:
return self._metrics.count(key, value)
return increment

def create_timer(self, name, tags=None):
def create_timer(
self, name: str, tags: Optional[Dict[str, str]] = None
) -> Callable[[float], None]:
key = self._get_key(name, tags)

def record(value):
# Convert microseconds to milliseconds for legacy
return self._metrics.timing(key, value / 1000.0)
return record

def create_gauge(self, name, tags=None):
def create_gauge(
self, name: str, tags: Optional[Dict[str, str]] = None
) -> Callable[[float], None]:
key = self._get_key(name, tags)

def update(value):
Expand Down
13 changes: 9 additions & 4 deletions jaeger_client/metrics/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
from jaeger_client.metrics import MetricsFactory
from collections import defaultdict
from prometheus_client import Counter, Gauge
from typing import Any, Optional, Dict, Callable, DefaultDict


class PrometheusMetricsFactory(MetricsFactory):
"""
Provides metrics backed by Prometheus
"""
def __init__(self, namespace='', service_name_label=None):
self._cache = defaultdict(object)
def __init__(self, namespace: str = '', service_name_label: Optional[str] = None) -> None:
self._cache: DefaultDict = defaultdict(object)
self._namespace = namespace
self._service_name_label = service_name_label

Expand Down Expand Up @@ -55,14 +56,18 @@ def _get_metric(self, metricType, name, tags):

return metric

def create_counter(self, name, tags=None):
def create_counter(
self, name: str, tags: Optional[Dict[str, Any]] = None
) -> Callable[[int], None]:
counter = self._get_metric(Counter, name, tags)

def increment(value):
counter.inc(value)
return increment

def create_gauge(self, name, tags=None):
def create_gauge(
self, name: str, tags: Optional[Dict[str, Any]] = None
) -> Callable[[float], None]:
gauge = self._get_metric(Gauge, name, tags)

def update(value):
Expand Down
54 changes: 34 additions & 20 deletions jaeger_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
import threading
import time
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, List

import opentracing
from opentracing.ext import tags as ext_tags
from .tracer import Reference
from . import codecs, thrift
from .constants import SAMPLED_FLAG, DEBUG_FLAG
from .span_context import SpanContext
import jaeger_client.thrift_gen.jaeger.ttypes as ttypes

if TYPE_CHECKING:
from .tracer import Tracer

logger = logging.getLogger('jaeger_tracing')

Expand All @@ -32,23 +39,30 @@ class Span(opentracing.Span):
'operation_name', 'start_time', 'end_time',
'logs', 'tags', 'finished', 'update_lock']

def __init__(self, context, tracer, operation_name,
tags=None, start_time=None, references=None):
def __init__(
self,
context: SpanContext,
tracer: 'Tracer',
operation_name: str,
tags: Optional[Dict[str, Any]] = None,
start_time: Optional[float] = None,
references: Optional[List[Reference]] = None
) -> None:
super(Span, self).__init__(context=context, tracer=tracer)
self.operation_name = operation_name
self.start_time = start_time or time.time()
self.end_time = None
self.end_time: Optional[float] = None
self.finished = False
self.update_lock = threading.Lock()
self.references = references
# we store tags and logs as Thrift objects to avoid extra allocations
self.tags = []
self.logs = []
self.tags: List[ttypes.Tag] = []
self.logs: List[ttypes.Log] = []
if tags:
for k, v in tags.items():
self.set_tag(k, v)

def set_operation_name(self, operation_name):
def set_operation_name(self, operation_name: str) -> 'Span':
"""
Set or change the operation name.
Expand All @@ -59,7 +73,7 @@ def set_operation_name(self, operation_name):
self.operation_name = operation_name
return self

def finish(self, finish_time=None):
def finish(self, finish_time: Optional[float] = None) -> None:
"""Indicate that the work represented by this span has been completed
or terminated, and is ready to be sent to the Reporter.
Expand All @@ -81,7 +95,7 @@ def finish(self, finish_time=None):

self.tracer.report_span(self)

def set_tag(self, key, value):
def set_tag(self, key: str, value: Any) -> 'Span':
"""
:param key:
:param value:
Expand Down Expand Up @@ -120,7 +134,7 @@ def _set_sampling_priority(self, value):
return True
return False

def log_kv(self, key_values, timestamp=None):
def log_kv(self, key_values: Dict[str, Any], timestamp: Optional[float] = None) -> 'Span':
if self.is_sampled():
timestamp = timestamp if timestamp else time.time()
# TODO handle exception logging, 'python.exception.type' etc.
Expand All @@ -134,7 +148,7 @@ def log_kv(self, key_values, timestamp=None):
self.logs.append(log)
return self

def set_baggage_item(self, key, value):
def set_baggage_item(self, key: str, value: Optional[str]) -> 'Span':
prev_value = self.get_baggage_item(key=key)
new_context = self.context.with_baggage_item(key=key, value=value)
with self.update_lock:
Expand All @@ -151,45 +165,45 @@ def set_baggage_item(self, key, value):
self.log_kv(key_values=logs)
return self

def get_baggage_item(self, key):
def get_baggage_item(self, key: str) -> Optional[str]:
return self.context.baggage.get(key)

def is_sampled(self):
def is_sampled(self) -> bool:
return self.context.flags & SAMPLED_FLAG == SAMPLED_FLAG

def is_debug(self):
def is_debug(self) -> bool:
return self.context.flags & DEBUG_FLAG == DEBUG_FLAG

def is_rpc(self):
def is_rpc(self) -> bool:
for tag in self.tags:
if tag.key == ext_tags.SPAN_KIND:
return tag.vStr == ext_tags.SPAN_KIND_RPC_CLIENT or \
tag.vStr == ext_tags.SPAN_KIND_RPC_SERVER
return False

def is_rpc_client(self):
def is_rpc_client(self) -> bool:
for tag in self.tags:
if tag.key == ext_tags.SPAN_KIND:
return tag.vStr == ext_tags.SPAN_KIND_RPC_CLIENT
return False

@property
def trace_id(self):
def trace_id(self) -> int:
return self.context.trace_id

@property
def span_id(self):
def span_id(self) -> int:
return self.context.span_id

@property
def parent_id(self):
def parent_id(self) -> Optional[int]:
return self.context.parent_id

@property
def flags(self):
def flags(self) -> int:
return self.context.flags

def __repr__(self):
def __repr__(self) -> str:
c = codecs.span_context_to_string(
trace_id=self.context.trace_id, span_id=self.context.span_id,
parent_id=self.context.parent_id, flags=self.context.flags)
Expand Down
23 changes: 16 additions & 7 deletions jaeger_client/span_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@


import opentracing
from typing import Dict, Optional


class SpanContext(opentracing.SpanContext):
__slots__ = ['trace_id', 'span_id', 'parent_id', 'flags',
'_baggage', '_debug_id']

"""Implements opentracing.SpanContext"""
def __init__(self, trace_id, span_id, parent_id, flags, baggage=None, debug_id=None):
def __init__(
self,
trace_id: int,
span_id: int,
parent_id: Optional[int],
flags: int,
baggage: Optional[Dict[str, str]] = None,
debug_id: Optional[str] = None
) -> None:
self.trace_id = trace_id
self.span_id = span_id
self.parent_id = parent_id or None
Expand All @@ -30,10 +39,10 @@ def __init__(self, trace_id, span_id, parent_id, flags, baggage=None, debug_id=N
self._debug_id = debug_id

@property
def baggage(self):
def baggage(self) -> Dict[str, str]:
return self._baggage or opentracing.SpanContext.EMPTY_BAGGAGE

def with_baggage_item(self, key, value):
def with_baggage_item(self, key: str, value: Optional[str]) -> 'SpanContext':
baggage = dict(self._baggage)
if value is not None:
baggage[key] = value
Expand All @@ -48,16 +57,16 @@ def with_baggage_item(self, key, value):
)

@property
def has_trace(self):
return self.trace_id and self.span_id and self.flags is not None
def has_trace(self) -> bool:
return bool(self.trace_id and self.span_id and self.flags is not None)

@property
def is_debug_id_container_only(self):
def is_debug_id_container_only(self) -> bool:
"""Deprecated, not used by Jaeger."""
return not self.trace_id and self._debug_id is not None

@property
def debug_id(self):
def debug_id(self) -> Optional[str]:
return self._debug_id

@staticmethod
Expand Down
15 changes: 8 additions & 7 deletions jaeger_client/throttler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import logging
import random
from threading import Lock
from typing import Any, Optional

from tornado.ioloop import PeriodicCallback

Expand Down Expand Up @@ -56,17 +57,17 @@ class RemoteThrottler(Throttler):
- error_reporter: ErrorReporter instance
"""

def __init__(self, channel, service_name, **kwargs):
def __init__(self, channel: Any, service_name: str, **kwargs: Any) -> None:
self.channel = channel
self.service_name = service_name
self.client_id = None
self.client_id: Optional[int] = None
self.refresh_interval = \
kwargs.get('refresh_interval', DEFAULT_THROTTLER_REFRESH_INTERVAL)
self.logger = kwargs.get('logger', default_logger)
metrics_factory = kwargs.get('metrics_factory', MetricsFactory())
self.metrics = ThrottlerMetrics(metrics_factory)
self.error_reporter = kwargs.get('error_reporter', ErrorReporter(Metrics()))
self.credits = {}
self.credits: dict = {}
self.lock = Lock()
self.running = True
self.periodic = None
Expand All @@ -77,7 +78,7 @@ def __init__(self, channel, service_name, **kwargs):
else:
self.channel.io_loop.add_callback(self._init_polling)

def is_allowed(self, operation):
def is_allowed(self, operation: str) -> bool:
with self.lock:
if operation not in self.credits:
self.credits[operation] = 0.0
Expand All @@ -91,7 +92,7 @@ def is_allowed(self, operation):
self.credits[operation] = value - MINIMUM_CREDITS
return True

def set_client_id(self, client_id):
def set_client_id(self, client_id: int) -> None:
with self.lock:
if self.client_id is None:
self.client_id = client_id
Expand Down Expand Up @@ -184,7 +185,7 @@ def _update_credits(self, response):
self.credits[op] += balance
self.logger.debug('credits = %s', self.credits)

def close(self):
def close(self) -> None:
with self.lock:
self.running = False
if self.periodic:
Expand All @@ -196,7 +197,7 @@ class ThrottlerMetrics(object):
Metrics specific to throttler.
"""

def __init__(self, metrics_factory):
def __init__(self, metrics_factory: MetricsFactory) -> None:
self.throttled_debug_spans = \
metrics_factory.create_counter(name='jaeger:throttled_debug_spans')
self.throttler_update_success = \
Expand Down
2 changes: 1 addition & 1 deletion jaeger_client/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def start_span(self,
parent_id=parent_id, flags=flags,
baggage=baggage)
span = Span(context=span_ctx, tracer=self,
operation_name=operation_name,
operation_name=operation_name or '',
tags=tags, start_time=start_time, references=valid_references)

self._emit_span_metrics(span=span, join=rpc_server)
Expand Down

0 comments on commit ac8e752

Please sign in to comment.