From 355fffe59ef4e59b5283e838a1a4e26095a6e333 Mon Sep 17 00:00:00 2001 From: Miaodric Date: Fri, 26 Jul 2024 15:59:22 +0800 Subject: [PATCH] update timestamp into seconds when migration from es with time-reference Change-Id: I3bb534411984f07d0abf60c37ff2393d45dca249 update timestamp into seconds when migration from es with time-reference Change-Id: If56812bb2c5c34628ad6c74dc9d261e2812a14c9 --- aliyun/log/logitem.py | 17 +- aliyun/log/util.py | 717 +++++++++++++++++++++--------------------- 2 files changed, 369 insertions(+), 365 deletions(-) diff --git a/aliyun/log/logitem.py b/aliyun/log/logitem.py index bf1ed30..59ba8c0 100755 --- a/aliyun/log/logitem.py +++ b/aliyun/log/logitem.py @@ -12,7 +12,7 @@ class LogItem(object): """ LogItem used to present a log, it contains log time and multiple key/value pairs to present the log contents. - :type timestamp: int + :type timestamp: int with seconds as unit :param timestamp: time of the log item, the default time is the now time. :type contents: tuple(key-value) list @@ -20,9 +20,12 @@ class LogItem(object): """ def __init__(self, timestamp=None, time_nano_part=None, contents=None): - nano_time = int(time.time()* 10**9) - self.timestamp = int(timestamp) if timestamp else int(nano_time / 1000000000) - self.time_nano_part= int(time_nano_part) if time_nano_part else int(nano_time%1000000000) + nano_time = int(time.time() * 10**9) + self.timestamp_seconds = int(timestamp) if timestamp else int(nano_time / 1000000000) + # milliseconds + if self.timestamp_seconds > 1e10: + self.timestamp_seconds = int(self.timestamp_seconds / 1000.0) + self.time_nano_part = int(time_nano_part) if time_nano_part else int(nano_time % 1000000000) self.contents = copy.deepcopy(contents) if contents else [] def push_back(self, key, value): @@ -56,7 +59,7 @@ def get_time(self): :return: int, log time """ - return self.timestamp + return self.timestamp_seconds def get_time_nano_part(self): """ Get log time nano part @@ -70,7 +73,7 @@ def set_time(self, timestamp): :type timestamp: int :param timestamp: log time """ - self.timestamp = int(timestamp) + self.timestamp_seconds = int(timestamp) def set_time_nano_part(self, time_nano_part): """ Set log time nano part @@ -80,5 +83,5 @@ def set_time_nano_part(self, time_nano_part): self.time_nano_part = int(time_nano_part) def log_print(self): - print('time', self.timestamp) + print('time', self.timestamp_seconds) print('contents', self.contents) diff --git a/aliyun/log/util.py b/aliyun/log/util.py index 9d45d50..bebecfd 100755 --- a/aliyun/log/util.py +++ b/aliyun/log/util.py @@ -1,358 +1,359 @@ -#!/usr/bin/env python -# encoding: utf-8 - -import sys -import base64 -try: - import collections.abc as collections_abc -except ImportError: - import collections as collections_abc -import hashlib -import hmac -import socket -import six -from datetime import datetime, tzinfo, timedelta -from dateutil import parser -from .logexception import LogException -import re -import logging -import json -import struct -import zlib - -try: - import lz4 - if hasattr(lz4, 'loads') and hasattr(lz4, 'dumps'): - def lz_decompress(raw_size, data): - return lz4.loads(struct.pack('= 256: - return False - pattern = re.compile(r'^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$') - if pattern.match(ip): - return True - return False - - @staticmethod - def get_host_ip(logHost): - """ If it is not match your local ip, you should fill the PutLogsRequest - parameter source by yourself. - """ - s = None - try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect((logHost, 80)) - ip = s.getsockname()[0] - return ip - except Exception: - return '127.0.0.1' - finally: - if s: - s.close() - - @staticmethod - def compress_data(data): - import zlib - return zlib.compress(data, 6) - - @staticmethod - def cal_md5(content): - return hashlib.md5(content).hexdigest().upper() - - @staticmethod - def hmac_sha1(content, key): - if isinstance(content, six.text_type): # hmac.new accept 8-bit str - content = content.encode('utf-8') - if isinstance(key, six.text_type): # hmac.new accept 8-bit str - key = key.encode('utf-8') - - hashed = hmac.new(key, content, hashlib.sha1).digest() - return base64_encodestring(hashed).rstrip() - - @staticmethod - def canonicalized_log_headers(headers): - content = '' - for key in sorted(six.iterkeys(headers)): - if key[:6].lower() in ('x-log-', 'x-acs-'): # x-log- header - content += key + ':' + str(headers[key]) + "\n" - return content - - @staticmethod - def url_encode(params): - for key, value in params.items(): # urllib.urlencode accept 8-bit str - if isinstance(value, six.text_type): - params[key] = value.encode('utf-8') - return six.moves.urllib.parse.urlencode(params, True) - - @staticmethod - def canonicalized_resource(resource, params): - if params: - urlString = '' - for key, value in sorted(six.iteritems(params)): - urlString += u"{0}={1}&".format( - key, value.decode('utf8') if isinstance(value, six.binary_type) else value) - resource += '?' + urlString[:-1] - if six.PY3: - return resource - else: - return resource.encode('utf8') - - return resource - - @staticmethod - def to_ansi(data): - pass - - @staticmethod - def convert_unicode_to_str(data): - """ - Py2, always translate to utf8 from unicode - Py3, always translate to unicode - :param data: - :return: - """ - if six.PY2 and isinstance(data, six.text_type): - return data.encode('utf8') - elif six.PY3 and isinstance(data, six.binary_type): - return data.decode('utf8') - elif isinstance(data, collections_abc.Mapping): - return dict((Util.convert_unicode_to_str(k), Util.convert_unicode_to_str(v)) - for k, v in six.iteritems(data)) - elif isinstance(data, collections_abc.Iterable) and not isinstance(data, (six.binary_type, six.string_types)): - return type(data)(map(Util.convert_unicode_to_str, data)) - - return data - - @staticmethod - def is_lz4_available(): - return lz4_available - - @staticmethod - def h_v_t(header, key): - """ - get header value with title - try to get key from header and consider case sensitive - e.g. header['x-log-abc'] or header['X-Log-Abc'] - :param header: - :param key: - :return: - """ - if key not in header: - key = key.title() - - if key not in header: - raise ValueError("Unexpected header in response, missing: " + key + " headers:\n" + str(header)) - - return header[key] - - @staticmethod - def h_v_td(header, key, default): - """ - get header value with title with default value - try to get key from header and consider case sensitive - e.g. header['x-log-abc'] or header['X-Log-Abc'] - :param header: - :param key: - :param default: - :return: - """ - if key not in header: - key = key.title() - - return header.get(key, default) - - @staticmethod - def v_or_d(v, default=None): - """ returns default value if v is None - else return v - """ - return v if v is not None else default - - @staticmethod - def uncompress_response(header, response): - """ - Supported compress type [lz4, gzip, deflate] - """ - compress_type = Util.h_v_td(header, 'x-log-compresstype', '').lower() - if compress_type == 'lz4': - raw_size = int(Util.h_v_t(header, 'x-log-bodyrawsize')) - if Util.is_lz4_available(): - return lz_decompress(raw_size, response) - else: - raise LogException("ClientHasNoLz4", "There's no Lz4 lib available to decompress the response", resp_header=header, resp_body=response) - elif compress_type in ('gzip', 'deflate'): - return zlib.decompress(response) - else: - raise LogException('UnknownCompressType', 'Unknown compress type: ' + compress_type, resp_header=header, resp_body=response) - -ZERO = timedelta(0) - - -class UTC(tzinfo): - """UTC""" - - def utcoffset(self, dt): - return ZERO - - def tzname(self, dt): - return "UTC" - - def dst(self, dt): - return ZERO - - -utc = UTC() - - -def _get_total_seconds(delta): - return ((delta.days * 86400 + delta.seconds) * 10**6 + delta.microseconds) / 10**6 - - -def parse_timestamp(tm): - if isinstance(tm, (int, float)) or \ - (isinstance(tm, (six.text_type, six.binary_type)) and tm.isdigit()): - return int(tm) - - try: - dt = parser.parse(tm) - except ValueError as ex: - try: - # try to use dateparser to parse the format. - from dateparser import parse - try: - dt = parse(tm) - if dt is None: - raise ex - except Exception as e: - logger.error("fail to parse date: {0}, detail: {1}".format(tm, e)) - raise e - - except ImportError as ex2: - raise ex - - if sys.version_info[:2] == (2, 6): - if dt.tzinfo is None: - return int(_get_total_seconds(dt - datetime(1970, 1, 1))) - return int(_get_total_seconds(dt - datetime(1970, 1, 1, tzinfo=utc))) - elif six.PY2: - if dt.tzinfo is None: - return int((dt - datetime(1970, 1, 1)).total_seconds()) - return int((dt - datetime(1970, 1, 1, tzinfo=utc)).total_seconds()) - else: - return int(dt.timestamp()) - - -def is_stats_query(query): - """ - check if the query is a normal search or select query - :param query: - :return: - """ - if not query: - return False - - # remove all " enclosed strings - nq = re.sub(r'"[^"]*"', '', query) - - # check if there's | .... select - if re.findall(r'\|.*\bselect\b', nq, re.I|re.DOTALL): - return True - - return False - - -class PrefixLoggerAdapter(logging.LoggerAdapter): - def __init__(self, prefix, extra, *args, **kwargs): - super(PrefixLoggerAdapter, self).__init__(*args, **kwargs) - self._prefix = prefix - self._extra = extra - - def process(self, msg, kwargs): - kwargs['extra'] = kwargs.get('extra', {}) - kwargs['extra'].update(self._extra) - - return "{0}{1}".format(self._prefix, msg), kwargs - - -def maxcompute_sink_deserialize(obj): - return obj.getParams() - - -def oss_sink_deserialize(obj): - return { - "type": obj.getType(), - "roleArn": obj.getRoleArn(), - "bucket": obj.getBucket(), - "prefix": obj.getPrefix(), - "suffix": obj.getSuffix(), - "pathFormat": obj.getPathFormat(), - "pathFormatType": obj.getPathFormatType(), - "bufferSize": obj.getBufferSize(), - "bufferInterval": obj.getBufferInterval(), - "timeZone": obj.getTimeZone(), - "contentType": obj.getContentType(), - "compressionType": obj.getCompressionType(), - "contentDetail": obj.getContentDetail(), - } - - -def export_deserialize(obj, sink): - return json.dumps({ - "configuration": { - "fromTime": obj.getConfiguration().getFromTime(), - "logstore": obj.getConfiguration().getLogstore(), - "roleArn": obj.getConfiguration().getRoleArn(), - "sink": sink, - "toTime": obj.getConfiguration().getToTime(), - "version": obj.getConfiguration().getVersion(), - }, - "displayName": obj.getDisplayName(), - "name": obj.getName(), - "recyclable": obj.getRecyclable(), - "schedule": { - "runImmediately": obj.getSchedule().isRunImmediately(), - "type": obj.getSchedule().getType(), - }, - "type": obj.getType(), - }) +#!/usr/bin/env python +# encoding: utf-8 + +import sys +import base64 +try: + import collections.abc as collections_abc +except ImportError: + import collections as collections_abc +import hashlib +import hmac +import socket +import six +from datetime import datetime, tzinfo, timedelta +from dateutil import parser +from .logexception import LogException +import re +import logging +import json +import struct +import zlib + +try: + import lz4 + if hasattr(lz4, 'loads') and hasattr(lz4, 'dumps'): + def lz_decompress(raw_size, data): + return lz4.loads(struct.pack('= 256: + return False + pattern = re.compile(r'^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$') + if pattern.match(ip): + return True + return False + + @staticmethod + def get_host_ip(logHost): + """ If it is not match your local ip, you should fill the PutLogsRequest + parameter source by yourself. + """ + s = None + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect((logHost, 80)) + ip = s.getsockname()[0] + return ip + except Exception: + return '127.0.0.1' + finally: + if s: + s.close() + + @staticmethod + def compress_data(data): + import zlib + return zlib.compress(data, 6) + + @staticmethod + def cal_md5(content): + return hashlib.md5(content).hexdigest().upper() + + @staticmethod + def hmac_sha1(content, key): + if isinstance(content, six.text_type): # hmac.new accept 8-bit str + content = content.encode('utf-8') + if isinstance(key, six.text_type): # hmac.new accept 8-bit str + key = key.encode('utf-8') + + hashed = hmac.new(key, content, hashlib.sha1).digest() + return base64_encodestring(hashed).rstrip() + + @staticmethod + def canonicalized_log_headers(headers): + content = '' + for key in sorted(six.iterkeys(headers)): + if key[:6].lower() in ('x-log-', 'x-acs-'): # x-log- header + content += key + ':' + str(headers[key]) + "\n" + return content + + @staticmethod + def url_encode(params): + for key, value in params.items(): # urllib.urlencode accept 8-bit str + if isinstance(value, six.text_type): + params[key] = value.encode('utf-8') + return six.moves.urllib.parse.urlencode(params, True) + + @staticmethod + def canonicalized_resource(resource, params): + if params: + urlString = '' + for key, value in sorted(six.iteritems(params)): + urlString += u"{0}={1}&".format( + key, value.decode('utf8') if isinstance(value, six.binary_type) else value) + resource += '?' + urlString[:-1] + if six.PY3: + return resource + else: + return resource.encode('utf8') + + return resource + + @staticmethod + def to_ansi(data): + pass + + @staticmethod + def convert_unicode_to_str(data): + """ + Py2, always translate to utf8 from unicode + Py3, always translate to unicode + :param data: + :return: + """ + if six.PY2 and isinstance(data, six.text_type): + return data.encode('utf8') + elif six.PY3 and isinstance(data, six.binary_type): + return data.decode('utf8') + elif isinstance(data, collections_abc.Mapping): + return dict((Util.convert_unicode_to_str(k), Util.convert_unicode_to_str(v)) + for k, v in six.iteritems(data)) + elif isinstance(data, collections_abc.Iterable) and not isinstance(data, (six.binary_type, six.string_types)): + return type(data)(map(Util.convert_unicode_to_str, data)) + + return data + + @staticmethod + def is_lz4_available(): + return lz4_available + + @staticmethod + def h_v_t(header, key): + """ + get header value with title + try to get key from header and consider case sensitive + e.g. header['x-log-abc'] or header['X-Log-Abc'] + :param header: + :param key: + :return: + """ + if key not in header: + key = key.title() + + if key not in header: + raise ValueError("Unexpected header in response, missing: " + key + " headers:\n" + str(header)) + + return header[key] + + @staticmethod + def h_v_td(header, key, default): + """ + get header value with title with default value + try to get key from header and consider case sensitive + e.g. header['x-log-abc'] or header['X-Log-Abc'] + :param header: + :param key: + :param default: + :return: + """ + if key not in header: + key = key.title() + + return header.get(key, default) + + @staticmethod + def v_or_d(v, default=None): + """ returns default value if v is None + else return v + """ + return v if v is not None else default + + @staticmethod + def uncompress_response(header, response): + """ + Supported compress type [lz4, gzip, deflate] + """ + compress_type = Util.h_v_td(header, 'x-log-compresstype', '').lower() + if compress_type == 'lz4': + raw_size = int(Util.h_v_t(header, 'x-log-bodyrawsize')) + if Util.is_lz4_available(): + return lz_decompress(raw_size, response) + else: + raise LogException("ClientHasNoLz4", "There's no Lz4 lib available to decompress the response", resp_header=header, resp_body=response) + elif compress_type in ('gzip', 'deflate'): + return zlib.decompress(response) + else: + raise LogException('UnknownCompressType', 'Unknown compress type: ' + compress_type, resp_header=header, resp_body=response) + +ZERO = timedelta(0) + + +class UTC(tzinfo): + """UTC""" + + def utcoffset(self, dt): + return ZERO + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return ZERO + + +utc = UTC() + + +def _get_total_seconds(delta): + return ((delta.days * 86400 + delta.seconds) * 10**6 + delta.microseconds) / 10**6 + + +def parse_timestamp(tm): + if isinstance(tm, (int, float)) or \ + (isinstance(tm, (six.text_type, six.binary_type)) and tm.isdigit()): + return int(tm) + + try: + dt = parser.parse(tm) + except ValueError as ex: + try: + # try to use dateparser to parse the format. + from dateparser import parse + try: + dt = parse(tm) + if dt is None: + raise ex + except Exception as e: + logger.error("fail to parse date: {0}, detail: {1}".format(tm, e)) + raise e + + except ImportError as ex2: + logger.exception("import error: {}".format(ex2)) + raise ex + + if sys.version_info[:2] == (2, 6): + if dt.tzinfo is None: + return int(_get_total_seconds(dt - datetime(1970, 1, 1))) + return int(_get_total_seconds(dt - datetime(1970, 1, 1, tzinfo=utc))) + elif six.PY2: + if dt.tzinfo is None: + return int((dt - datetime(1970, 1, 1)).total_seconds()) + return int((dt - datetime(1970, 1, 1, tzinfo=utc)).total_seconds()) + else: + return int(dt.timestamp()) + + +def is_stats_query(query): + """ + check if the query is a normal search or select query + :param query: + :return: + """ + if not query: + return False + + # remove all " enclosed strings + nq = re.sub(r'"[^"]*"', '', query) + + # check if there's | .... select + if re.findall(r'\|.*\bselect\b', nq, re.I|re.DOTALL): + return True + + return False + + +class PrefixLoggerAdapter(logging.LoggerAdapter): + def __init__(self, prefix, extra, *args, **kwargs): + super(PrefixLoggerAdapter, self).__init__(*args, **kwargs) + self._prefix = prefix + self._extra = extra + + def process(self, msg, kwargs): + kwargs['extra'] = kwargs.get('extra', {}) + kwargs['extra'].update(self._extra) + + return "{0}{1}".format(self._prefix, msg), kwargs + + +def maxcompute_sink_deserialize(obj): + return obj.getParams() + + +def oss_sink_deserialize(obj): + return { + "type": obj.getType(), + "roleArn": obj.getRoleArn(), + "bucket": obj.getBucket(), + "prefix": obj.getPrefix(), + "suffix": obj.getSuffix(), + "pathFormat": obj.getPathFormat(), + "pathFormatType": obj.getPathFormatType(), + "bufferSize": obj.getBufferSize(), + "bufferInterval": obj.getBufferInterval(), + "timeZone": obj.getTimeZone(), + "contentType": obj.getContentType(), + "compressionType": obj.getCompressionType(), + "contentDetail": obj.getContentDetail(), + } + + +def export_deserialize(obj, sink): + return json.dumps({ + "configuration": { + "fromTime": obj.getConfiguration().getFromTime(), + "logstore": obj.getConfiguration().getLogstore(), + "roleArn": obj.getConfiguration().getRoleArn(), + "sink": sink, + "toTime": obj.getConfiguration().getToTime(), + "version": obj.getConfiguration().getVersion(), + }, + "displayName": obj.getDisplayName(), + "name": obj.getName(), + "recyclable": obj.getRecyclable(), + "schedule": { + "runImmediately": obj.getSchedule().isRunImmediately(), + "type": obj.getSchedule().getType(), + }, + "type": obj.getType(), + })