diff --git a/clickhouse_driver/log.py b/clickhouse_driver/log.py
index e1967131..1f8c61ee 100644
--- a/clickhouse_driver/log.py
+++ b/clickhouse_driver/log.py
@@ -1,6 +1,22 @@
 import logging
 
-logger = logging.getLogger(__name__)
+
+def set_server_log_time(record):
+    if hasattr(record, 'server_created'):
+        record.created = record.server_created
+        record.msecs = record.server_msecs
+        delattr(record, 'server_created')
+        delattr(record, 'server_msecs')
+    return True
+
+
+def setup_logger():
+    logger = logging.getLogger(__name__)
+    logger.addFilter(set_server_log_time)
+    return logger
+
+
+logger = setup_logger()
 
 # Keep in sync with ClickHouse priorities
 # https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/InternalTextLogsQueue.cpp
@@ -37,6 +53,17 @@ def log_block(block):
         # thread_number in servers prior 20.x
         thread_id = row.get('thread_id') or row['thread_number']
 
+        # calculate timestamp-related log record attributes,
+        # based on values received from the server
+        server_created = row['event_time'].timestamp() + \
+            row['event_time_microseconds'] / 1000000
+        server_msecs = row['event_time_microseconds'] // 1000 + 0.0
+
+        server_timestamps = {
+            'server_created': server_created,
+            'server_msecs': server_msecs,
+        }
+
         logger.info(
             '[ %s ] [ %s ] {%s} <%s> %s: %s',
             row['host_name'],
@@ -44,5 +71,6 @@ def log_block(block):
             row['query_id'],
             priority,
             row['source'],
-            row['text']
+            row['text'],
+            extra=server_timestamps,
         )
diff --git a/docs/features.rst b/docs/features.rst
index f9476044..d52742d3 100644
--- a/docs/features.rst
+++ b/docs/features.rst
@@ -256,6 +256,10 @@ Query logs can be received from server by using `send_logs_level` setting:
     2018-12-14 10:24:53,875 INFO clickhouse_driver.log: [ klebedev-ThinkPad-T460 ] [ 25 ] {b328ad33-60e8-4012-b4cc-97f44a7b28f2} MemoryTracker: Peak memory usage (for query): 40.23 KiB.
     [(1,)]
 
+.. note::
+
+    For logs received from ClickHouse, the server timestamp is used as the log record timestamp.
+
 
 Multiple hosts
 --------------
diff --git a/tests/test_blocks.py b/tests/test_blocks.py
index ff992678..ea15f4ee 100644
--- a/tests/test_blocks.py
+++ b/tests/test_blocks.py
@@ -1,4 +1,7 @@
+import copy
+import logging
 import types
+from datetime import datetime
 from unittest.mock import patch
 
 from clickhouse_driver.errors import ServerException
@@ -245,3 +248,37 @@ def test_logs_with_compression(self):
             query = 'SELECT 1'
             client.execute(query, settings=settings)
             self.assertIn(query, buffer.getvalue())
+
+    def test_logs_timestamp_values(self):
+        filter = logging.Logger.filter
+        filter_inputs = []
+
+        # deepcopy allows saving the original LogRecord state,
+        # since the filter function updates the record object
+        def filter_side_effect(logger, record):
+            filter_inputs.append(copy.deepcopy(record))
+            return filter(logger, record)
+
+        with capture_logging('clickhouse_driver.log', 'INFO') as buffer:
+            with patch('logging.Logger.filter', autospec=True) as filter_mock:
+                filter_mock.side_effect = filter_side_effect
+
+                settings = {'send_logs_level': 'debug'}
+                query = 'SELECT 1'
+                self.client.execute(query, settings=settings)
+
+        filter_mock.assert_called()
+        source_record = filter_inputs[0]
+
+        log_timestamp = datetime \
+            .fromtimestamp(source_record.server_created) \
+            .strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
+        log_line = buffer.getvalue().splitlines()[0]
+
+        # assert the original record timestamps are not equal
+        self.assertNotEqual(
+            source_record.server_created,
+            source_record.created,
+        )
+        # assert the resulting record contains the server timestamp
+        assert log_line.startswith(log_timestamp)
diff --git a/tests/util.py b/tests/util.py
index db48166c..5939bb30 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -44,8 +44,10 @@ def __init__(self, logger_name, level):
 
     def __enter__(self):
         buffer = StringIO()
+        formatter = logging.Formatter('%(asctime)s %(message)s')
 
         self.new_handler = logging.StreamHandler(buffer)
+        self.new_handler.setFormatter(formatter)
        self.logger.addHandler(self.new_handler)
         self.old_logger_level = self.logger.level
         self.logger.setLevel(self.level)
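Reviewer note, not part of the patch: a minimal, self-contained sketch of how the new `set_server_log_time` filter behaves, assuming the same `server_created`/`server_msecs` attributes that `log_block` passes via `extra`. The logger name and the fake one-hour-old server timestamp are illustrative only; the filter body is copied from the diff above.

import logging
import time
from io import StringIO


def set_server_log_time(record):
    # same logic as the filter added in clickhouse_driver/log.py:
    # overwrite the client-side timestamp with the server-provided one
    if hasattr(record, 'server_created'):
        record.created = record.server_created
        record.msecs = record.server_msecs
        delattr(record, 'server_created')
        delattr(record, 'server_msecs')
    return True


logger = logging.getLogger('server_time_demo')  # illustrative logger name
logger.addFilter(set_server_log_time)
logger.setLevel(logging.INFO)

buffer = StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
logger.addHandler(handler)

# pretend the server reported the event one hour ago
server_created = time.time() - 3600
logger.info(
    'MemoryTracker: Peak memory usage (for query): 40.23 KiB.',
    extra={
        'server_created': server_created,
        'server_msecs': server_created % 1 * 1000,
    },
)

# %(asctime)s now renders the server timestamp, not the local wall clock
print(buffer.getvalue())

This mirrors what test_logs_timestamp_values asserts: the formatted log line starts with the server-side timestamp even though record.created originally held the client-side one.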