Skip to content

Commit

Permalink
Merge branches 'HH-99521' and 'HH-99900'
Browse files Browse the repository at this point in the history
  • Loading branch information
dzharikhin committed Oct 31, 2019
3 parents 34222db + 31381e4 + aba4002 commit 02536f6
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 16 deletions.
76 changes: 61 additions & 15 deletions frontik/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@
import socket
import time
from asyncio import Future
from collections import Counter
from functools import partial
from random import shuffle, random

import pycurl
import logging
from lxml import etree
from tornado.escape import to_unicode, utf8
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.curl_httpclient import CurlAsyncHTTPClient
from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPResponse, HTTPError
from tornado.httputil import HTTPHeaders
from tornado.options import options

from frontik import media_types
from frontik.debug import DEBUG_HEADER_NAME, response_from_debug
from frontik.request_context import get_request_id, get_request
from frontik.request_context import get_request_id, get_request, get_handler_name
from frontik.util import make_url, make_body, make_mfd

OUTER_TIMEOUT_MS_HEADER = 'X-Outer-Timeout-Ms'
Expand Down Expand Up @@ -452,25 +453,70 @@ def get_retries_count(self):


class TimeoutChecker:
def __init__(self, outer_caller, outer_timeout_ms, time_since_outer_request_start_ms_supplier):
def __init__(self, outer_caller, outer_timeout_ms, time_since_outer_request_start_ms_supplier, *,
threshold_ms=100, send_timeout_stats_interval_ms=60000):
self.outer_caller = outer_caller
self.outer_timeout_ms = outer_timeout_ms
self.time_since_outer_request_start_ms_supplier = time_since_outer_request_start_ms_supplier

def check(self, balanced_request: BalancedHttpRequest):
self.timeout_counters = Counter()
self.threshold_ms = threshold_ms
periodic_callback = PeriodicCallback(partial(self.send_stats, send_timeout_stats_interval_ms),
send_timeout_stats_interval_ms)
periodic_callback.start()

class LoggingData:
    """Key describing one kind of timeout mismatch, used to aggregate counts.

    Instances are hashable so they can serve as keys of a ``Counter``.
    Identity is defined by ``outer_caller``, ``outer_timeout_ms``,
    ``handler``, ``upstream`` and ``request_timeout_ms``.

    ``already_spent_time_ms`` is deliberately NOT part of the identity: it is
    carried along only as a sample value for the log line, so records that
    differ only in the time already spent collapse into a single counter.
    """

    __slots__ = ('outer_caller', 'outer_timeout_ms',
                 'handler', 'upstream', 'request_timeout_ms',
                 'already_spent_time_ms')

    def __init__(self, outer_caller, outer_timeout_ms,
                 handler, upstream, request_timeout_ms,
                 already_spent_time_ms):
        self.outer_caller = outer_caller
        self.outer_timeout_ms = outer_timeout_ms
        self.handler = handler
        self.upstream = upstream
        self.request_timeout_ms = request_timeout_ms
        self.already_spent_time_ms = already_spent_time_ms

    def _identity(self):
        """Return the tuple of fields that define equality and hashing."""
        return (self.outer_caller, self.outer_timeout_ms,
                self.handler, self.upstream, self.request_timeout_ms)

    def __hash__(self):
        return hash(self._identity())

    def __eq__(self, other):
        # type(self) instead of the bare class name: the class may be nested
        # inside another class, where its own name is not in method scope.
        if not isinstance(other, type(self)):
            return NotImplemented
        # Must compare exactly the fields that __hash__ uses (including
        # `upstream`); otherwise equal objects could hash differently.
        return self._identity() == other._identity()

def send_stats(self, interval_ms):
    """Log every accumulated timeout mismatch, then reset the counters.

    One error-level line is written per distinct key in
    ``self.timeout_counters``; afterwards the counter is emptied so the next
    call starts from zero.

    :param interval_ms: length of the reporting window in milliseconds; it is
        only echoed into the log message.
    """
    message = ('For last %d ms, got %d requests from <%s> expecting timeout=%d ms, '
               'but calling upstream <%s> from handler <%s> with timeout %d ms, '
               'arbitrary we spend %d ms before the call')
    for entry, hits in self.timeout_counters.items():
        log_args = (
            interval_ms,
            hits,
            entry.outer_caller,
            entry.outer_timeout_ms,
            entry.upstream,
            entry.handler,
            entry.request_timeout_ms,
            entry.already_spent_time_ms,
        )
        http_client_logger.error(message, *log_args)
    self.timeout_counters.clear()

def check(self, request: BalancedHttpRequest):
if self.outer_timeout_ms:
already_spent_time_ms = self.time_since_outer_request_start_ms_supplier() * 1000
expected_timeout_ms = self.outer_timeout_ms - already_spent_time_ms
request_timeout_ms = balanced_request.request_time_left * 1000
if request_timeout_ms > expected_timeout_ms:
http_client_logger.error('Incoming request from <%s> expects timeout=%d ms'
', we have been working already for %d ms '
'and now trying to call <%s> with timeout %d ms',
self.outer_caller,
self.outer_timeout_ms,
already_spent_time_ms,
balanced_request.uri.partition('?')[0],
request_timeout_ms)
request_timeout_ms = request.request_time_left * 1000
diff = request_timeout_ms - expected_timeout_ms
if diff > self.threshold_ms:
data = TimeoutChecker.LoggingData(self.outer_caller, self.outer_timeout_ms,
get_handler_name(),
request.upstream.name if request.upstream else None,
request_timeout_ms,
already_spent_time_ms)
self.timeout_counters[data] += 1


class HttpClientFactory:
Expand Down
4 changes: 3 additions & 1 deletion frontik/integrations/service_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from consul.aio import Consul
from frontik.integrations import Integration, integrations_logger
from frontik.options import options
from frontik.version import version
import socket
from asyncio import Future
from typing import Optional
Expand All @@ -22,7 +23,7 @@ def initialize_app(self, app) -> Optional[Future]:
host = socket.gethostname()
self.consul = Consul(host=options.consul_host, port=options.consul_port)
self.service_name = options.app
self.service_id = f'{self.service_name}-{options.datacenter}-{host}'
self.service_id = f'{self.service_name}-{options.datacenter}-{host}-{options.port}'

http_check = Check.http(
f'http://{host}:{options.port}/status',
Expand All @@ -37,6 +38,7 @@ def initialize_app(self, app) -> Optional[Future]:
port=options.port,
check=http_check,
tags=options.consul_tags,
meta={'serviceVersion': version}
))

def deinitialize_app(self, app) -> Optional[Future]:
Expand Down

0 comments on commit 02536f6

Please sign in to comment.