support wildcard search #5

Open · wants to merge 11 commits into base: remote-tracking
35 changes: 30 additions & 5 deletions webapp/graphite/carbonlink.py
@@ -95,7 +95,13 @@ def precheck(self, metric, timestamp=None):
request = dict(type='cache-query-precheck', metric=metric, timestamp=timestamp)
results = self.send_request(request)
log.cache("CarbonLink cache-query-precheck request for %s" % (metric))
return results["exists"]
return (results["exists"], results["partial_exists"])

def expand_query(self, metric):
request = dict(type='cache-query-expand-wildcards', metric=metric)
results = self.send_request(request)
log.cache("CarbonLink cache-query-expand-wildcards request for %s" % (metric))
return results["queries"]

def get_metadata(self, metric, key):
request = dict(type='get-metadata', metric=metric, key=key)
@@ -148,15 +154,20 @@ def send_request(self, request):
return result

def _is_all_request(self, request):
return self._is_carbon_request(request) or self._is_wildcard_request(request)

def _is_carbon_request(self, request):
return request['metric'].startswith(settings.CARBON_METRIC_PREFIX) and (request['type'] not in ['get-storageschema', 'cache-query-precheck'])

def _is_wildcard_request(self, request):
return request['type'] == 'cache-query-expand-wildcards'
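
A quick standalone illustration of the routing change: wildcard expansion, like internal carbon metrics, must fan out to every cache host, since matches may live on any instance (the prefix default is an assumption mirroring graphite-web's CARBON_METRIC_PREFIX setting):

def is_all_request(request, carbon_prefix='carbon'):
    # Free-function mirror of _is_all_request above, for illustration.
    is_carbon = (request['metric'].startswith(carbon_prefix)
                 and request['type'] not in ('get-storageschema',
                                             'cache-query-precheck'))
    is_wildcard = request['type'] == 'cache-query-expand-wildcards'
    return is_carbon or is_wildcard

# A wildcard expansion is broadcast even for non-carbon metrics,
# while an ordinary cache-query still hashes to a single host.
assert is_all_request(dict(type='cache-query-expand-wildcards',
                           metric='servers.*.cpu'))
assert not is_all_request(dict(type='cache-query',
                               metric='servers.web1.cpu'))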

def send_request_to_all(self, request):
metric = request['metric']
serialized_request = pickle.dumps(request, protocol=-1)
len_prefix = struct.pack("!L", len(serialized_request))
request_packet = len_prefix + serialized_request
results = {}
results.setdefault('datapoints', {})
results = self._preprocess_send_all_result(request["type"])

for host in self.hosts:
conn = self.get_connection(host)
@@ -172,11 +183,25 @@ def send_request_to_all(self, request):
if 'error' in result:
log.cache("Error getting data from cache %s: %s" % (str(host), result['error']))
else:
if len(result['datapoints']) > 1:
results['datapoints'].update(result['datapoints'])
self._postprocess_send_all_result(request["type"], results, result)
log.cache("CarbonLink finished receiving %s from %s" % (str(metric), str(host)))
return results

def _preprocess_send_all_result(self, rqst_type):
results = {}
if rqst_type == "cache-query-expand-wildcards":
results.setdefault("queries", [])
else:
results.setdefault('datapoints', {})
return results

def _postprocess_send_all_result(self, rqst_type, results, result):
if rqst_type == "cache-query-expand-wildcards":
results["queries"] += result.get("queries")
elif rqst_type == "cache-query":
if len(result['datapoints']) > 1:
results['datapoints'].update(result['datapoints'])
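
To make the aggregation concrete, a small self-contained sketch of how per-host replies would be folded together by the two helpers above (sample data is invented, and the len(...) > 1 guard is ignored for brevity):

def merge_host_results(rqst_type, host_results):
    # Combines the _preprocess/_postprocess pair into a single loop.
    if rqst_type == 'cache-query-expand-wildcards':
        merged = {'queries': []}
        for result in host_results:
            merged['queries'] += result.get('queries', [])
    else:
        merged = {'datapoints': {}}
        for result in host_results:
            merged['datapoints'].update(result.get('datapoints', {}))
    return merged

# Two cache hosts each matching part of the same wildcard:
replies = [{'queries': [('servers.web1.cpu', True)]},
           {'queries': [('servers.web2.cpu', True)]}]
merged = merge_host_results('cache-query-expand-wildcards', replies)
assert sorted(merged['queries']) == [('servers.web1.cpu', True),
                                     ('servers.web2.cpu', True)]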

def recv_response(self, conn):
len_prefix = recv_exactly(conn, 4)
body_size = struct.unpack("!L", len_prefix)[0]
48 changes: 39 additions & 9 deletions webapp/graphite/finders/cache.py
@@ -12,17 +12,47 @@ class CarbonCacheFinder:
def __init__(self):
pass

def find_nodes(self, query):
def find_nodes(self, query, cache_incomplete_nodes=None):
clean_patterns = query.pattern.replace('\\', '')
has_wildcard = any(token in clean_patterns for token in '{[*?')

# 1) CarbonLink has some hosts
# 2) has no wildcard
if CarbonLink.hosts and not has_wildcard:
if cache_incomplete_nodes is None:
cache_incomplete_nodes = {}

# CarbonLink has some hosts
if CarbonLink.hosts:
metric = clean_patterns
exists = CarbonLink.precheck(metric, query.startTime)

# Wildcard patterns and single-metric queries now share one path:
# expand the query in CarbonLink, which returns a list of
# (metric_name, is_leaf) tuples, e.g. [(metric1, False), (metric2, True)].
metrics = CarbonLink.expand_query(metric)
# dedup, since the same BranchNode can be returned by multiple hosts
metrics = list(set(metrics))
# precheck every metric over the same valid query time range
prechecks = []
for m, is_leaf in metrics:
if is_leaf:
prechecks.append(CarbonLink.precheck(m, query.startTime))
else: # return True for BranchNode
prechecks.append((True, True))
exists = all((exist for exist, partial_exist in prechecks))
partial_exists = all((partial_exist for exist, partial_exist in prechecks))
if exists:
metric_path = metric
# TODO: check any info we need to put into reader @here
reader = CarbonCacheReader(metric)
yield LeafNode(metric_path, reader)
for metric, is_leaf in metrics:
if is_leaf:
reader = CarbonCacheReader(metric)
yield LeafNode(metric, reader)
else:
yield BranchNode(metric)
elif partial_exists:
for metric, is_leaf in metrics:
if is_leaf:
reader = CarbonCacheReader(metric)
cache_incomplete_nodes[metric] = LeafNode(metric, reader)
else:
cache_incomplete_nodes[metric] = BranchNode(metric)
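
A worked example of the precheck aggregation above, showing how a branch plus a partially cached leaf lands in cache_incomplete_nodes instead of being yielded (values invented):

# (exists, partial_exists) per expanded metric; branches are hard-coded
# to (True, True) as in the loop above.
prechecks = [(True, True),    # BranchNode: servers.web1
             (False, True)]   # leaf servers.web1.cpu: only some points cached

exists = all(exist for exist, partial in prechecks)           # False
partial_exists = all(partial for exist, partial in prechecks) # True
# exists is False but partial_exists is True, so both nodes go into
# cache_incomplete_nodes for storage.find_all to merge with disk results.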
2 changes: 1 addition & 1 deletion webapp/graphite/metrics/views.py
Original file line number Diff line number Diff line change
@@ -93,7 +93,6 @@ def find_matches():

def find_view(request):
"View for finding metrics matching a given pattern"
profile = getProfile(request)

queryParams = request.GET.copy()
queryParams.update(request.POST)
@@ -148,6 +147,7 @@ def find_view(request):
log.info("received remote find request: pattern=%s from=%s until=%s local_only=%s format=%s matches=%d" % (query, fromTime, untilTime, local_only, format, len(matches)))

if format == 'treejson':
profile = getProfile(request)
content = tree_json(matches, base_path, wildcards=profile.advancedUI or wildcards)
response = json_response_for(request, content, jsonp=jsonp)

30 changes: 24 additions & 6 deletions webapp/graphite/storage.py
@@ -44,6 +44,9 @@ def __init__(self, finders=None, carbon_cache_finder=None, hosts=None):


def find(self, pattern, startTime=None, endTime=None, local=False, headers=None):
# Force graphite-web to search both cache and disk.
if not startTime:
startTime = 0
query = FindQuery(pattern, startTime, endTime, local)

for match in self.find_all(query, headers):
@@ -67,9 +70,17 @@ def find_all(self, query, headers=None):
# If we can fetch all the data from carbon-cache, DO NOT hit disk;
# this reduces iowait. Be sure to run a carbon-cache version that
# supports these query types.
# For wildcard queries, carbon-cache certainly returns None...
for leaf_node in self.carbon_cache_finder.find_nodes(query):
found_in_cache = False

# Cache nodes with incomplete results, in case we need them later and
# don't want to query carbon-cache again.
nodes_with_incomplete_result = {}

for leaf_node in self.carbon_cache_finder.find_nodes(query, nodes_with_incomplete_result):
yield leaf_node
found_in_cache = True

if found_in_cache and query.startTime != 0:
return

# Start local searches
@@ -110,6 +121,14 @@ def find_all(self, query, headers=None):
for node in nodes:
nodes_by_path[node.path].append(node)

# startTime == 0 means both cache and disk were searched; merge in any
# new metric nodes that so far exist only in carbon-cache.
if query.startTime == 0:
for name, node in nodes_with_incomplete_result.iteritems():
if name not in nodes_by_path:
nodes_by_path[name].append(node)

log.info("Got all find results in %fs" % (time.time() - start))

# Search Carbon Cache if nodes_by_path is empty
@@ -123,11 +142,10 @@ def find_all(self, query, headers=None):
# because carbon-cache doesn't have enough data. However, if we reach
# this point, that means we should return all we have in carbon cache.
if not nodes_by_path:
query.startTime = None
for leaf_node in self.carbon_cache_finder.find_nodes(query):
for name, node in nodes_with_incomplete_result.iteritems():
# each name maps to exactly one node
yield leaf_node
return
yield node
return

# Reduce matching nodes for each path to a minimal set
found_branch_nodes = set()
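
Putting the storage.py changes together, the lookup flow this PR introduces can be summarized in a condensed sketch (function shape and parameters are illustrative, not the verbatim method; remote-store handling and node reduction are elided):

def find_all_flow(cache_find_nodes, disk_find, query):
    # find() forces startTime to 0 when no window is given, so both
    # cache and disk are searched unless the caller bounded the query.
    incomplete = {}
    found_in_cache = False
    for node in cache_find_nodes(query, incomplete):
        yield node
        found_in_cache = True

    # Fully served from carbon-cache with a bounded time window:
    # skip disk entirely to avoid iowait.
    if found_in_cache and query.startTime != 0:
        return

    nodes_by_path = disk_find(query)  # {path: [node, ...]} from finders

    if query.startTime == 0:
        # Merge brand-new metrics that so far exist only in carbon-cache.
        for name, node in incomplete.items():
            if name not in nodes_by_path:
                nodes_by_path[name] = [node]

    if not nodes_by_path:
        # Disk had nothing at all: fall back to whatever the cache
        # holds, even if the results are incomplete.
        for node in incomplete.values():
            yield node
        return
    # ... reduce nodes_by_path to a minimal node set, as before ...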