
Commit

cleanup
jj22ee committed Feb 5, 2024
1 parent 0f0fe7f commit e473844
Showing 8 changed files with 22 additions and 159 deletions.

This file was deleted.

@@ -1,28 +1,18 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
-import time
 from typing import Optional, Sequence

 from opentelemetry.context import Context
-from opentelemetry.sdk.trace.sampling import (
-    Decision,
-    Sampler,
-    SamplingResult,
-    TraceIdRatioBased,
-    _get_parent_trace_state,
-)
+from opentelemetry.sdk.trace.sampling import ALWAYS_ON, Sampler, SamplingResult, TraceIdRatioBased
 from opentelemetry.trace import Link, SpanKind
 from opentelemetry.trace.span import TraceState
 from opentelemetry.util.types import Attributes


 class _FallbackSampler(Sampler):
     def __init__(self):
-        # Override default log level
-        self.__default_sampler = TraceIdRatioBased(0.05)
-        self.__last_take = -1
-
-        # Add Reservoir and Ratio samplers
+        # TODO: Add Reservoir sampler
+        self.__fixed_rate_sampler = TraceIdRatioBased(0.05)

     # pylint: disable=no-self-use
     def should_sample(
@@ -35,13 +25,8 @@ def should_sample(
         links: Sequence[Link] = None,
         trace_state: TraceState = None,
     ) -> SamplingResult:
-        # TODO: add sampling functionality
-        current_time = time.process_time()
-        if current_time - self.__last_take >= 1.0:
-            res = SamplingResult(Decision.RECORD_AND_SAMPLE, trace_state=_get_parent_trace_state(parent_context))
-            self.__last_take = time.process_time()
-            return res
-        return self.__default_sampler.should_sample(
+        # TODO: add reservoir + fixed rate sampling
+        return ALWAYS_ON.should_sample(
             parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
         )

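The fallback sampler now defers everything to ALWAYS_ON and leaves reservoir plus fixed-rate sampling as a TODO. X-Ray's documented fallback behavior is roughly "the first request each second, plus 5% of additional requests"; purely as an illustration of where that TODO could go, here is a minimal token-bucket sketch. The class name and bucket details are assumptions, not code from this commit:

    import time
    from threading import Lock
    from typing import Optional, Sequence

    from opentelemetry.context import Context
    from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult, TraceIdRatioBased
    from opentelemetry.trace import Link, SpanKind
    from opentelemetry.trace.span import TraceState
    from opentelemetry.util.types import Attributes


    class _SketchFallbackSampler(Sampler):
        """Illustrative only: one trace per second via a token bucket, then 5% of the rest."""

        def __init__(self, quota_per_second: float = 1.0, ratio: float = 0.05):
            self._quota = quota_per_second
            self._tokens = quota_per_second
            self._last_refill = time.monotonic()
            self._lock = Lock()
            self._ratio_sampler = TraceIdRatioBased(ratio)

        def _try_take(self) -> bool:
            # Refill the bucket based on elapsed wall-clock time, then try to spend one token.
            with self._lock:
                now = time.monotonic()
                self._tokens = min(self._quota, self._tokens + (now - self._last_refill) * self._quota)
                self._last_refill = now
                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return True
                return False

        def should_sample(
            self,
            parent_context: Optional[Context],
            trace_id: int,
            name: str,
            kind: SpanKind = None,
            attributes: Attributes = None,
            links: Sequence[Link] = None,
            trace_state: TraceState = None,
        ) -> SamplingResult:
            if self._try_take():
                return SamplingResult(Decision.RECORD_AND_SAMPLE, attributes, trace_state)
            return self._ratio_sampler.should_sample(
                parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
            )

        def get_description(self) -> str:
            return "SketchFallbackSampler{1/s + 5%}"

The sketch uses time.monotonic() rather than time.process_time(), since the quota is meant to be measured in wall-clock seconds, not CPU time.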

This file was deleted.

@@ -18,12 +18,12 @@
 class _Rule:
     def __init__(self, sampling_rule):
         self.sampling_rule = sampling_rule
-        # self.fixed_rate_sampler = None
         self.statistics = _SamplingStatisticsDocument(0, 0, 0)
         self.next_target_fetch_time = time.process_time()

-        # TODO change to rate limiter given rate, add fallback sampler, no need parent based
-        self.rervoir_sampler = ALWAYS_ON
+        # TODO change to rate limiter given rate, add fixed rate sampler
+        self.reservoir_sampler = ALWAYS_ON
+        # self.fixed_rate_sampler = None

     def should_sample(
         self,
@@ -35,40 +35,25 @@ def should_sample(
         links: Sequence[Link] = None,
         trace_state: TraceState = None,
     ) -> SamplingResult:
-        return self.rervoir_sampler.should_sample(
+        return self.reservoir_sampler.should_sample(
             parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
         )
-        # res = SamplingResult(Decision.RECORD_AND_SAMPLE, trace_state=_get_parent_trace_state(parent_context))
-        # return res
-        # pass

     def matches(self, resource: Resource, attributes: Attributes):
-        http_target = None  #
-        http_url = None  #
-        http_method = None  #
-        http_host = None  #
-        service_name = None  #
-
-        # URL_PATH or URL_QUERY
-        # HTTP_REQUEST_METHOD
-        # URL_FULL
-        # SERVER_ADDRESS or SERVER_PORT
-        # PEER_SERVICE
-
-        print(
-            "%s %s %s %s ",
-            SpanAttributes.HTTP_TARGET,
-            SpanAttributes.HTTP_METHOD,
-            SpanAttributes.HTTP_URL,
-            SpanAttributes.HTTP_HOST,
-        )
+        http_target = None
+        http_url = None
+        http_method = None
+        http_host = None
+        service_name = None

         if attributes is not None:
-            print("attributes is not None")
             http_target = attributes.get(SpanAttributes.HTTP_TARGET, None)
             http_method = attributes.get(SpanAttributes.HTTP_METHOD, None)
             http_url = attributes.get(SpanAttributes.HTTP_URL, None)
             http_host = attributes.get(SpanAttributes.HTTP_HOST, None)
+            # NOTE: The above span attribute keys are deprecated in favor of:
+            # URL_PATH/URL_QUERY, HTTP_REQUEST_METHOD, URL_FULL, SERVER_ADDRESS/SERVER_PORT
+            # For now, the old attribute keys are kept for consistency with other centralized samplers

         service_name = resource.attributes.get(ResourceAttributes.SERVICE_NAME, "")

@@ -84,57 +69,13 @@ def matches(self, resource: Resource, attributes: Attributes):
             else:
                 http_target = http_url[path_index:]

-        print(
-            "%s %s %s",
-            _Matcher.attribute_match(attributes, self.sampling_rule.Attributes),
-            attributes,
-            " - ",
-            self.sampling_rule.Attributes,
-        )
-        print(
-            "%s %s %s",
-            _Matcher.wild_card_match(http_target, self.sampling_rule.URLPath),
-            http_target,
-            " - ",
-            self.sampling_rule.URLPath,
-        )
-        print(
-            "%s %s %s",
-            _Matcher.wild_card_match(http_method, self.sampling_rule.HTTPMethod),
-            http_method,
-            " - ",
-            self.sampling_rule.HTTPMethod,
-        )
-        print(
-            "%s %s %s",
-            _Matcher.wild_card_match(http_host, self.sampling_rule.Host),
-            http_host,
-            " - ",
-            self.sampling_rule.Host,
-        )
-        print(
-            "%s %s %s",
-            _Matcher.wild_card_match(service_name, self.sampling_rule.ServiceName),
-            service_name,
-            " - ",
-            self.sampling_rule.ServiceName,
-        )
-        print(
-            "%s %s %s",
-            _Matcher.wild_card_match(self.get_service_type(resource), self.sampling_rule.ServiceType),
-            self.get_service_type(resource),
-            " - ",
-            self.sampling_rule.ServiceType,
-        )

         return (
             _Matcher.attribute_match(attributes, self.sampling_rule.Attributes)
             and _Matcher.wild_card_match(http_target, self.sampling_rule.URLPath)
             and _Matcher.wild_card_match(http_method, self.sampling_rule.HTTPMethod)
             and _Matcher.wild_card_match(http_host, self.sampling_rule.Host)
             and _Matcher.wild_card_match(service_name, self.sampling_rule.ServiceName)
             and _Matcher.wild_card_match(self.get_service_type(resource), self.sampling_rule.ServiceType)
             # _Matcher.wild_card_match(self.get_arn(attributes, resource), self.sampling_rule.ResourceARN) # no support
         )

     # pylint: disable=no-self-use
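
matches() above delegates attribute and URL comparisons to _Matcher, whose implementation is not part of this diff. X-Ray sampling rules use "*" (any sequence of characters) and "?" (any single character) wildcards; a self-contained sketch of those semantics, not the repository's _Matcher, looks like this:

    import re


    def wild_card_match(text, pattern) -> bool:
        # "*" alone matches anything, including a missing value.
        if pattern == "*":
            return True
        if text is None or pattern is None:
            return False
        # Translate "*" to ".*" and "?" to ".", escaping every other character.
        regex = "".join(".*" if ch == "*" else "." if ch == "?" else re.escape(ch) for ch in pattern)
        return re.fullmatch(regex, text) is not None


    print(wild_card_match("/api/users/42", "/api/*"))  # True
    print(wild_card_match("GET", "POST"))               # False
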
@@ -26,7 +26,6 @@ def __init__(self, resource: Resource, fallback_sampler: _FallbackSampler, lock:
         self.resource = resource
         self._fallback_sampler = fallback_sampler
         self._last_modified = datetime.datetime.now()
-        print(self._last_modified.timestamp())

     def should_sample(
         self,
@@ -39,10 +38,7 @@
         trace_state: TraceState = None,
     ):
         for rule in self.rules:
-            print("Trying: %s", rule.sampling_rule.RuleName)
             if rule.matches(self.resource, attributes):
-
-                print("Matched with: %s", rule.sampling_rule.RuleName)
                 return rule.should_sample(
                     parent_context,
                     trace_id,
@@ -74,6 +70,7 @@ def update_sampling_rules(self, new_sampling_rules):
         # map list of rules by each rule's sampling_rule name
         rule_map = {rule.sampling_rule.RuleName: rule for rule in self.rules}

+        # If a sampling rule has not changed, keep its respective rule in the cache.
         for index, new_rule in enumerate(temp_rules):
             rule_name_to_check = new_rule.sampling_rule.RuleName
             if rule_name_to_check in rule_map:
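
The new comment states the intent of the loop that follows: a freshly fetched rule that is identical to a cached one keeps the cached rule object, so its accumulated statistics survive the refresh. A simplified sketch of that merge step (function and attribute names are assumed for illustration):

    def merge_rules(cached_rules, fetched_rules):
        # Index the cached rules by name so lookups are O(1).
        cached_by_name = {rule.sampling_rule.RuleName: rule for rule in cached_rules}

        merged = []
        for new_rule in fetched_rules:
            old_rule = cached_by_name.get(new_rule.sampling_rule.RuleName)
            # Assumes the rule payload defines equality; the real code may compare differently.
            if old_rule is not None and old_rule.sampling_rule == new_rule.sampling_rule:
                merged.append(old_rule)   # unchanged: keep accumulated statistics
            else:
                merged.append(new_rule)   # new or modified: start with a fresh rule
        return merged
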
@@ -38,7 +38,7 @@ def __init__(
     def __lt__(self, other):
         if self.Priority == other.Priority:
             # String order priority example:
-            # "A","Abc","ab","abc","abcdef"
+            # "A","Abc","a","ab","abc","abcdef"
             return self.RuleName < other.RuleName
         return self.Priority < other.Priority

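The corrected comment now lists names in plain code-point order, which is the tie-breaker __lt__ applies when two rules share a Priority. A small standalone demonstration of the resulting sort order (the Rule class below is a stand-in, not the project's sampling rule type):

    from functools import total_ordering


    @total_ordering
    class Rule:
        def __init__(self, priority: int, rule_name: str):
            self.Priority = priority
            self.RuleName = rule_name

        def __eq__(self, other):
            return (self.Priority, self.RuleName) == (other.Priority, other.RuleName)

        def __lt__(self, other):
            if self.Priority == other.Priority:
                # Ties fall back to code-point order: "A" < "Abc" < "a" < "ab" < "abc" < "abcdef"
                return self.RuleName < other.RuleName
            return self.Priority < other.Priority


    rules = [Rule(2, "abc"), Rule(1, "ab"), Rule(1, "A"), Rule(1, "a")]
    print([r.RuleName for r in sorted(rules)])  # ['A', 'a', 'ab', 'abc']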

This file was deleted.

@@ -1,5 +1,6 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
+import random
 from logging import getLogger
 from threading import Lock, Timer
 from typing import Optional, Sequence
@@ -34,7 +35,6 @@ class AwsXRayRemoteSampler(Sampler):
         log_level: custom log level configuration for remote sampler (Optional)
     """

-    # __resource: Resource
     __polling_interval: int
     __xray_client: _AwsXRaySamplingClient

@@ -50,7 +50,7 @@
         _logger.setLevel(log_level)

         self.__xray_client = _AwsXRaySamplingClient(endpoint, log_level=log_level)
-        self.__rule_polling_jitter = 0  # random.uniform(0.0, 5.0)
+        self.__rule_polling_jitter = random.uniform(0.0, 5.0)
         self.__polling_interval = polling_interval
         self.__fallback_sampler = _FallbackSampler()

@@ -83,6 +83,7 @@ def should_sample(
     ) -> SamplingResult:

         if self.__rule_cache.expired():
+            _logger.info("Rule cache is expired so using fallback sampling strategy")
             return self.__fallback_sampler.should_sample(
                 parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state
             )
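
Two behavior changes land in this file: the rule poller is now offset by a random 0 to 5 second jitter so that a fleet of hosts does not call GetSamplingRules in lockstep, and an informational log line is emitted when the rule cache has expired and the fallback sampler takes over. A rough sketch of such a jittered poller built on threading.Timer (the class and method names here are illustrative, not the sampler's actual private API):

    import random
    from threading import Timer


    class JitteredPoller:
        def __init__(self, polling_interval: float, fetch_rules):
            self._polling_interval = polling_interval
            self._jitter = random.uniform(0.0, 5.0)  # fixed per-process offset
            self._fetch_rules = fetch_rules

        def start(self) -> None:
            self._schedule()

        def _schedule(self) -> None:
            timer = Timer(self._polling_interval + self._jitter, self._poll)
            timer.daemon = True  # do not keep the process alive just to poll
            timer.start()

        def _poll(self) -> None:
            try:
                self._fetch_rules()
            finally:
                self._schedule()  # reschedule even if a poll fails


    # Example (polls every ~300 s plus the per-process jitter):
    # poller = JitteredPoller(300, lambda: print("fetch GetSamplingRules"))
    # poller.start()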
