Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Refactor remote sampler update logic (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
black-adder authored Mar 19, 2017
1 parent dde99f5 commit 1b7c5b1
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 94 deletions.
70 changes: 38 additions & 32 deletions jaeger_client/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,56 +421,62 @@ def _sampling_request_callback(self, future):

response = future.result()
try:
sampler, strategies = \
self._parse_sampling_strategy(self.sampler, response.body)
sampling_strategies_response = json.loads(response.body)
except Exception as e:
self.error_reporter.error(
Metrics.SAMPLER_ERRORS, 1,
'Fail to parse sampling strategy '
'from jaeger-agent: %s [%s]', e, response.body)
return

with self.lock:
if isinstance(sampler, AdaptiveSampler):
sampler.update(strategies)
elif self.sampler == sampler:
return
self.sampler = sampler
self.logger.debug('Tracing sampler set to %s', sampler)

def _poll_sampling_manager(self):
self.logger.debug('Requesting tracing sampler refresh')
fut = self._channel.request_sampling_strategy(
self.service_name, timeout=15)
fut.add_done_callback(self._sampling_request_callback)
self._update_sampler(sampling_strategies_response)
self.logger.debug('Tracing sampler set to %s', self.sampler)

def close(self):
def _update_sampler(self, response):
with self.lock:
self.running = False
if self.periodic is not None:
self.periodic.stop()
try:
if response.get(OPERATION_SAMPLING_STR):
self._update_adaptive_sampler(response.get(OPERATION_SAMPLING_STR))
else:
self._update_rate_limiting_or_probabilistic_sampler(response)
except Exception as e:
self.error_reporter.error(
Metrics.SAMPLER_ERRORS, 1,
'Fail to update sampler'
'from jaeger-agent: %s [%s]', e, response)

def _update_adaptive_sampler(self, per_operation_strategies):
if isinstance(self.sampler, AdaptiveSampler):
self.sampler.update(per_operation_strategies)
else:
self.sampler = AdaptiveSampler(per_operation_strategies, self.max_operations)

def _parse_sampling_strategy(self, sampler, body):
response = json.loads(body)
def _update_rate_limiting_or_probabilistic_sampler(self, response):
s_type = response[STRATEGY_TYPE_STR]
operation_strategies = response.get(OPERATION_SAMPLING_STR)
if operation_strategies:
if isinstance(sampler, AdaptiveSampler):
return sampler, operation_strategies
return AdaptiveSampler(operation_strategies, self.max_operations), operation_strategies
if s_type == SamplingManager.SamplingStrategyType.PROBABILISTIC:
sampling_rate = response[PROBABILISTIC_SAMPLING_STR][SAMPLING_RATE_STR]
if 0 <= sampling_rate <= 1.0:
return ProbabilisticSampler(rate=sampling_rate), None
else:
raise ValueError(
'Probabilistic sampling rate not in [0, 1] range: %s' % sampling_rate)
new_sampler = ProbabilisticSampler(rate=sampling_rate)
elif s_type == SamplingManager.SamplingStrategyType.RATE_LIMITING:
mtps = response[RATE_LIMITING_SAMPLING_STR][MAX_TRACES_PER_SECOND_STR]
if 0 <= mtps < 500:
return RateLimitingSampler(max_traces_per_second=mtps), None
new_sampler = RateLimitingSampler(max_traces_per_second=mtps)
else:
raise ValueError(
'Rate limiting parameter not in [0, 500] range: %s' % mtps)
else:
raise ValueError('Unsupported sampling strategy type: %s' % s_type)

if self.sampler != new_sampler:
self.sampler = new_sampler

def _poll_sampling_manager(self):
self.logger.debug('Requesting tracing sampler refresh')
fut = self._channel.request_sampling_strategy(
self.service_name, timeout=15)
fut.add_done_callback(self._sampling_request_callback)

def close(self):
with self.lock:
self.running = False
if self.periodic is not None:
self.periodic.stop()
40 changes: 40 additions & 0 deletions tests/test_local_agent_net.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest
import tornado.web
from urlparse import urlparse
from jaeger_client.local_agent_net import LocalAgentSender
from jaeger_client.config import DEFAULT_REPORTING_PORT

test_strategy = """
{
"strategyType":0,
"probabilisticSampling":
{
"samplingRate":0.002
}
}
"""


class AgentHandler(tornado.web.RequestHandler):
def get(self):
self.write(test_strategy)

application = tornado.web.Application([
(r"/sampling", AgentHandler),
])

@pytest.fixture
def app():
return application


@pytest.mark.gen_test
def test_request_sampling_strategy(http_client, base_url):
o = urlparse(base_url)
sender = LocalAgentSender(
host='localhost',
sampling_port=o.port,
reporting_port=DEFAULT_REPORTING_PORT
)
response = yield sender.request_sampling_strategy(service_name='svc', timeout=15)
assert response.body == test_strategy
Loading

0 comments on commit 1b7c5b1

Please sign in to comment.