From 143001738aa0d5b9a4dc62640d44ce94c722fec3 Mon Sep 17 00:00:00 2001 From: Dmitriy Zharikhin Date: Wed, 23 Oct 2019 18:37:26 +0300 Subject: [PATCH 1/6] HH-99521 add threshold and batch send --- frontik/http_client.py | 64 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/frontik/http_client.py b/frontik/http_client.py index b1cb28b94..69c4db601 100644 --- a/frontik/http_client.py +++ b/frontik/http_client.py @@ -4,6 +4,7 @@ import socket import time from asyncio import Future +from collections import Counter from functools import partial from random import shuffle, random @@ -11,7 +12,7 @@ import logging from lxml import etree from tornado.escape import to_unicode, utf8 -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PeriodicCallback from tornado.curl_httpclient import CurlAsyncHTTPClient from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPResponse, HTTPError from tornado.httputil import HTTPHeaders @@ -452,25 +453,66 @@ def get_retries_count(self): class TimeoutChecker: - def __init__(self, outer_caller, outer_timeout_ms, time_since_outer_request_start_ms_supplier): + def __init__(self, outer_caller, outer_timeout_ms, time_since_outer_request_start_ms_supplier, *, + threshold_ms=100, send_timeout_stats_interval_ms=60000): self.outer_caller = outer_caller self.outer_timeout_ms = outer_timeout_ms self.time_since_outer_request_start_ms_supplier = time_since_outer_request_start_ms_supplier + self.timeout_counters = Counter() + self.threshold_ms = threshold_ms + periodic_callback = PeriodicCallback(partial(self.send_stats, send_timeout_stats_interval_ms), + send_timeout_stats_interval_ms) + periodic_callback.start() + + class LoggingData: + def __init__(self, outer_caller, outer_timeout_ms, already_spent_time_ms, uri, request_timeout_ms): + self.outer_caller = outer_caller + self.outer_timeout_ms = outer_timeout_ms, + self.already_spent_time_ms = already_spent_time_ms, + self.uri = uri, + self.request_timeout_ms = request_timeout_ms + + def __hash__(self): + return hash((self.outer_caller, self.outer_timeout_ms, self.uri, self.request_timeout_ms)) + + def __eq__(self, other): + return self.outer_caller == other.outer_caller \ + and self.outer_timeout_ms == other.outer_timeout_ms \ + and self.uri == other.uri \ + and self.request_timeout_ms == other.request_timeout_ms + + def send_stats(self, interval_ms): + for data, count in self.timeout_counters.items(): + if count <= 1: + http_client_logger.error('Incoming request from <%s> expects timeout=%d ms' + ', we have been working already for %d ms ' + 'and now trying to call <%s> with timeout %d ms', + data.outer_caller, + data.outer_timeout_ms, + data.already_spent_time_ms, + data.balanced_request.uri.partition('?')[0], + data.request_timeout_ms) + else: + http_client_logger.error('For last %d ms, got %d requests from <%s> expecting timeout=%d ms, ' + 'but calling <%s> with timeout %d ms', + interval_ms, + count, + data.outer_caller, + data.outer_timeout_ms, + data.balanced_request.uri.partition('?')[0], + data.request_timeout_ms) + self.timeout_counters.clear() def check(self, balanced_request: BalancedHttpRequest): if self.outer_timeout_ms: already_spent_time_ms = self.time_since_outer_request_start_ms_supplier() * 1000 expected_timeout_ms = self.outer_timeout_ms - already_spent_time_ms request_timeout_ms = balanced_request.request_time_left * 1000 - if request_timeout_ms > expected_timeout_ms: - http_client_logger.error('Incoming request from <%s> expects timeout=%d ms' - ', we have been working already for %d ms ' - 'and now trying to call <%s> with timeout %d ms', - self.outer_caller, - self.outer_timeout_ms, - already_spent_time_ms, - balanced_request.uri.partition('?')[0], - request_timeout_ms) + diff = request_timeout_ms - expected_timeout_ms + if diff > self.threshold_ms: + data = TimeoutChecker.LoggingData(self.outer_caller, self.outer_timeout_ms, already_spent_time_ms, + balanced_request.uri.partition('?')[0], request_timeout_ms) + self.timeout_counters[data] += 1 class HttpClientFactory: From 652e089565364b13b9d2239442611e0b6ebc978d Mon Sep 17 00:00:00 2001 From: Dmitriy Zharikhin Date: Tue, 29 Oct 2019 13:45:18 +0300 Subject: [PATCH 2/6] HH-99521 add url compaction --- frontik/http_client.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/frontik/http_client.py b/frontik/http_client.py index 69c4db601..56f73f32f 100644 --- a/frontik/http_client.py +++ b/frontik/http_client.py @@ -25,6 +25,7 @@ OUTER_TIMEOUT_MS_HEADER = 'X-Outer-Timeout-Ms' USER_AGENT_HEADER = 'User-Agent' +NON_HEX = {chr(item) for item in range(ord('g'), ord('z') + 1)} def HTTPResponse__repr__(self): @@ -503,6 +504,26 @@ def send_stats(self, interval_ms): data.request_timeout_ms) self.timeout_counters.clear() + @staticmethod + def _filter_part(original_part, min_compaction_len, min_hash_len, replacement): + part_len = len(original_part) + if part_len < min_compaction_len: + return original_part + number_count = sum(c.isdigit() for c in original_part) + if number_count == part_len: + return replacement + if any(s in original_part for s in NON_HEX): + return original_part + if part_len > min_hash_len and number_count >= min_compaction_len: + return replacement + return original_part + + @staticmethod + def _compact_url(uri, min_compaction_len=4, min_hash_len=16, replacement='[id-or-hash]'): + path = uri.partition('?')[0] + return '/'.join([TimeoutChecker._filter_part(part, min_compaction_len, min_hash_len, replacement) + for part in path.split('/')]) + def check(self, balanced_request: BalancedHttpRequest): if self.outer_timeout_ms: already_spent_time_ms = self.time_since_outer_request_start_ms_supplier() * 1000 @@ -510,8 +531,9 @@ def check(self, balanced_request: BalancedHttpRequest): request_timeout_ms = balanced_request.request_time_left * 1000 diff = request_timeout_ms - expected_timeout_ms if diff > self.threshold_ms: + compacted_url = TimeoutChecker._compact_url(balanced_request.uri) data = TimeoutChecker.LoggingData(self.outer_caller, self.outer_timeout_ms, already_spent_time_ms, - balanced_request.uri.partition('?')[0], request_timeout_ms) + compacted_url, request_timeout_ms) self.timeout_counters[data] += 1 From c14fcdd9ef90c4777e18b7914103e89c6c43c98e Mon Sep 17 00:00:00 2001 From: Dmitriy Zharikhin Date: Tue, 29 Oct 2019 17:39:45 +0300 Subject: [PATCH 3/6] HH-99521 compact by upstream+handler --- frontik/http_client.py | 59 +++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/frontik/http_client.py b/frontik/http_client.py index 56f73f32f..5130b1e12 100644 --- a/frontik/http_client.py +++ b/frontik/http_client.py @@ -20,12 +20,11 @@ from frontik import media_types from frontik.debug import DEBUG_HEADER_NAME, response_from_debug -from frontik.request_context import get_request_id, get_request +from frontik.request_context import get_request_id, get_request, get_handler_name from frontik.util import make_url, make_body, make_mfd OUTER_TIMEOUT_MS_HEADER = 'X-Outer-Timeout-Ms' USER_AGENT_HEADER = 'User-Agent' -NON_HEX = {chr(item) for item in range(ord('g'), ord('z') + 1)} def HTTPResponse__repr__(self): @@ -466,20 +465,28 @@ def __init__(self, outer_caller, outer_timeout_ms, time_since_outer_request_star periodic_callback.start() class LoggingData: - def __init__(self, outer_caller, outer_timeout_ms, already_spent_time_ms, uri, request_timeout_ms): + __slots__ = ('outer_caller', 'outer_timeout_ms', + 'handler', 'upstream', 'request_timeout_ms', + 'already_spent_time_ms') + + def __init__(self, outer_caller, outer_timeout_ms, + handler, upstream, request_timeout_ms, + already_spent_time_ms): self.outer_caller = outer_caller - self.outer_timeout_ms = outer_timeout_ms, - self.already_spent_time_ms = already_spent_time_ms, - self.uri = uri, + self.outer_timeout_ms = outer_timeout_ms + self.handler = handler + self.upstream = upstream self.request_timeout_ms = request_timeout_ms + self.already_spent_time_ms = already_spent_time_ms def __hash__(self): - return hash((self.outer_caller, self.outer_timeout_ms, self.uri, self.request_timeout_ms)) + return hash((self.outer_caller, self.outer_timeout_ms, + self.handler, self.upstream, self.request_timeout_ms)) def __eq__(self, other): return self.outer_caller == other.outer_caller \ and self.outer_timeout_ms == other.outer_timeout_ms \ - and self.uri == other.uri \ + and self.handler == other.handler \ and self.request_timeout_ms == other.request_timeout_ms def send_stats(self, interval_ms): @@ -487,43 +494,25 @@ def send_stats(self, interval_ms): if count <= 1: http_client_logger.error('Incoming request from <%s> expects timeout=%d ms' ', we have been working already for %d ms ' - 'and now trying to call <%s> with timeout %d ms', + 'and now trying to call upstream <%s> from handler <%s> with timeout %d ms', data.outer_caller, data.outer_timeout_ms, data.already_spent_time_ms, - data.balanced_request.uri.partition('?')[0], + data.upstream, + data.handler, data.request_timeout_ms) else: http_client_logger.error('For last %d ms, got %d requests from <%s> expecting timeout=%d ms, ' - 'but calling <%s> with timeout %d ms', + 'but calling upstream <%s> from handler <%s> with timeout %d ms', interval_ms, count, data.outer_caller, data.outer_timeout_ms, - data.balanced_request.uri.partition('?')[0], + data.upstream, + data.handler, data.request_timeout_ms) self.timeout_counters.clear() - @staticmethod - def _filter_part(original_part, min_compaction_len, min_hash_len, replacement): - part_len = len(original_part) - if part_len < min_compaction_len: - return original_part - number_count = sum(c.isdigit() for c in original_part) - if number_count == part_len: - return replacement - if any(s in original_part for s in NON_HEX): - return original_part - if part_len > min_hash_len and number_count >= min_compaction_len: - return replacement - return original_part - - @staticmethod - def _compact_url(uri, min_compaction_len=4, min_hash_len=16, replacement='[id-or-hash]'): - path = uri.partition('?')[0] - return '/'.join([TimeoutChecker._filter_part(part, min_compaction_len, min_hash_len, replacement) - for part in path.split('/')]) - def check(self, balanced_request: BalancedHttpRequest): if self.outer_timeout_ms: already_spent_time_ms = self.time_since_outer_request_start_ms_supplier() * 1000 @@ -531,9 +520,9 @@ def check(self, balanced_request: BalancedHttpRequest): request_timeout_ms = balanced_request.request_time_left * 1000 diff = request_timeout_ms - expected_timeout_ms if diff > self.threshold_ms: - compacted_url = TimeoutChecker._compact_url(balanced_request.uri) - data = TimeoutChecker.LoggingData(self.outer_caller, self.outer_timeout_ms, already_spent_time_ms, - compacted_url, request_timeout_ms) + data = TimeoutChecker.LoggingData(self.outer_caller, self.outer_timeout_ms, + get_handler_name(), balanced_request.upstream, request_timeout_ms, + already_spent_time_ms) self.timeout_counters[data] += 1 From b3526e51cfaf91801e2430f4b063423d2fa6c150 Mon Sep 17 00:00:00 2001 From: Dmitriy Zharikhin Date: Tue, 29 Oct 2019 18:37:27 +0300 Subject: [PATCH 4/6] HH-99521 unify message --- frontik/http_client.py | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/frontik/http_client.py b/frontik/http_client.py index 5130b1e12..18713c7e2 100644 --- a/frontik/http_client.py +++ b/frontik/http_client.py @@ -491,26 +491,17 @@ def __eq__(self, other): def send_stats(self, interval_ms): for data, count in self.timeout_counters.items(): - if count <= 1: - http_client_logger.error('Incoming request from <%s> expects timeout=%d ms' - ', we have been working already for %d ms ' - 'and now trying to call upstream <%s> from handler <%s> with timeout %d ms', - data.outer_caller, - data.outer_timeout_ms, - data.already_spent_time_ms, - data.upstream, - data.handler, - data.request_timeout_ms) - else: - http_client_logger.error('For last %d ms, got %d requests from <%s> expecting timeout=%d ms, ' - 'but calling upstream <%s> from handler <%s> with timeout %d ms', - interval_ms, - count, - data.outer_caller, - data.outer_timeout_ms, - data.upstream, - data.handler, - data.request_timeout_ms) + http_client_logger.error('For last %d ms, got %d requests from <%s> expecting timeout=%d ms, ' + 'but calling upstream <%s> from handler <%s> with timeout %d ms, ' + 'arbitrary we spend %d ms before the call', + interval_ms, + count, + data.outer_caller, + data.outer_timeout_ms, + data.upstream, + data.handler, + data.request_timeout_ms, + data.already_spent_time_ms) self.timeout_counters.clear() def check(self, balanced_request: BalancedHttpRequest): From 31381e45321619cdf5a360f5c017242661557e8e Mon Sep 17 00:00:00 2001 From: Dmitriy Zharikhin Date: Thu, 31 Oct 2019 17:02:08 +0300 Subject: [PATCH 5/6] HH-99521 use upstream name --- frontik/http_client.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/frontik/http_client.py b/frontik/http_client.py index 18713c7e2..2d7b7fce5 100644 --- a/frontik/http_client.py +++ b/frontik/http_client.py @@ -504,15 +504,17 @@ def send_stats(self, interval_ms): data.already_spent_time_ms) self.timeout_counters.clear() - def check(self, balanced_request: BalancedHttpRequest): + def check(self, request: BalancedHttpRequest): if self.outer_timeout_ms: already_spent_time_ms = self.time_since_outer_request_start_ms_supplier() * 1000 expected_timeout_ms = self.outer_timeout_ms - already_spent_time_ms - request_timeout_ms = balanced_request.request_time_left * 1000 + request_timeout_ms = request.request_time_left * 1000 diff = request_timeout_ms - expected_timeout_ms if diff > self.threshold_ms: data = TimeoutChecker.LoggingData(self.outer_caller, self.outer_timeout_ms, - get_handler_name(), balanced_request.upstream, request_timeout_ms, + get_handler_name(), + request.upstream.name if request.upstream else None, + request_timeout_ms, already_spent_time_ms) self.timeout_counters[data] += 1 From aba40027866ea1cfc4c420e303c7752a0c1cd548 Mon Sep 17 00:00:00 2001 From: Dmitriy Zharikhin Date: Thu, 31 Oct 2019 19:01:13 +0300 Subject: [PATCH 6/6] HH-99900 add port to service_id and version --- frontik/integrations/service_discovery.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/frontik/integrations/service_discovery.py b/frontik/integrations/service_discovery.py index 1f2bbb534..7c0ee1f9c 100644 --- a/frontik/integrations/service_discovery.py +++ b/frontik/integrations/service_discovery.py @@ -3,6 +3,7 @@ from consul.aio import Consul from frontik.integrations import Integration, integrations_logger from frontik.options import options +from frontik.version import version import socket from asyncio import Future from typing import Optional @@ -22,7 +23,7 @@ def initialize_app(self, app) -> Optional[Future]: host = socket.gethostname() self.consul = Consul(host=options.consul_host, port=options.consul_port) self.service_name = options.app - self.service_id = f'{self.service_name}-{options.datacenter}-{host}' + self.service_id = f'{self.service_name}-{options.datacenter}-{host}-{options.port}' http_check = Check.http( f'http://{host}:{options.port}/status', @@ -37,6 +38,7 @@ def initialize_app(self, app) -> Optional[Future]: port=options.port, check=http_check, tags=options.consul_tags, + meta={'serviceVersion': version} )) def deinitialize_app(self, app) -> Optional[Future]: