From 97e85800d33cd8c03bce5f6a6264a75491a36351 Mon Sep 17 00:00:00 2001 From: mhorky Date: Thu, 11 Jan 2024 13:45:25 +0100 Subject: [PATCH] Change handling of deprecated `datetime.datetime.utcnow()` `utcnow()` is deprecated since Python 3.12. `datetime.datetime.now(datetime.timezone.utc)` is a timezone-aware equivalent. connection.py's `drift_check` logic has been extended a bit. --- src/rhsm/certificate.py | 6 ++-- src/rhsm/certificate2.py | 6 ++-- src/rhsm/connection.py | 50 ++++++++++++++++++------------- src/rhsmlib/facts/hwprobe.py | 8 ++--- test/rhsm/unit/test_connection.py | 13 ++++---- test/stubs.py | 14 ++++----- 6 files changed, 55 insertions(+), 42 deletions(-) diff --git a/src/rhsm/certificate.py b/src/rhsm/certificate.py index c165b8c000..cac322e606 100644 --- a/src/rhsm/certificate.py +++ b/src/rhsm/certificate.py @@ -194,7 +194,7 @@ def valid(self, on_date: datetime.datetime = None) -> bool: :return: True if valid. """ valid_range = self.validRange() - gmt = datetime.datetime.utcnow() + gmt = datetime.datetime.now(datetime.timezone.utc) if on_date: gmt = on_date gmt = gmt.replace(tzinfo=GMT()) @@ -207,7 +207,7 @@ def expired(self, on_date: datetime.datetime = None) -> bool: :return: True if valid. """ valid_range = self.validRange() - gmt = datetime.datetime.utcnow() + gmt = datetime.datetime.now(datetime.timezone.utc) if on_date: gmt = on_date gmt = gmt.replace(tzinfo=GMT()) @@ -612,7 +612,7 @@ def has_now(self) -> bool: :return: True if valid. """ - gmt: datetime.datetime = datetime.datetime.utcnow() + gmt: datetime.datetime = datetime.datetime.now(datetime.timezone.utc) gmt = gmt.replace(tzinfo=GMT()) return self.has_date(gmt) diff --git a/src/rhsm/certificate2.py b/src/rhsm/certificate2.py index 89355245e1..6c152d10ca 100644 --- a/src/rhsm/certificate2.py +++ b/src/rhsm/certificate2.py @@ -534,14 +534,14 @@ def __init__( self.issuer: Optional[dict] = issuer def is_valid(self, on_date: Optional[datetime.datetime] = None): - gmt = datetime.datetime.utcnow() + gmt = datetime.datetime.now(datetime.timezone.utc) if on_date: gmt = on_date gmt = gmt.replace(tzinfo=GMT()) return self.valid_range.has_date(gmt) def is_expired(self, on_date: Optional[datetime.datetime] = None): - gmt = datetime.datetime.utcnow() + gmt = datetime.datetime.now(datetime.timezone.utc) if on_date: gmt = on_date gmt = gmt.replace(tzinfo=GMT()) @@ -658,7 +658,7 @@ def provided_paths(self): return paths def is_expiring(self, on_date=None): - gmt = datetime.datetime.utcnow() + gmt = datetime.datetime.now(datetime.timezone.utc) if on_date: gmt = on_date gmt = gmt.replace(tzinfo=GMT()) diff --git a/src/rhsm/connection.py b/src/rhsm/connection.py index 82cf04ea2a..1b65cada4c 100644 --- a/src/rhsm/connection.py +++ b/src/rhsm/connection.py @@ -83,24 +83,21 @@ def normalized_host(host: str) -> str: return host -def drift_check(utc_time_string: str, hours: int = 1) -> bool: - """ - Takes in a RFC 1123 date and returns True if the current time - is greater than the supplied number of hours - """ - drift = False - if utc_time_string: - try: - # This may have a timezone (utc) - utc_datetime = dateutil.parser.parse(utc_time_string) - # This should not have a timezone, but we know it will be utc. - # We need our timezones to match in order to compare - local_datetime = datetime.datetime.utcnow().replace(tzinfo=utc_datetime.tzinfo) - delta = datetime.timedelta(hours=hours) - drift = abs((utc_datetime - local_datetime)) > delta - except Exception as e: - log.error(e) +def get_time_drift(timestamp: str) -> datetime.timedelta: + """Get a difference between server and local clock. + :param timestamp: A timezone-unaware timestamp in RFC 1123 format. + :returns: Absolute difference between server and local time. + """ + # RFC 1123: 'Fri, 12 Jan 2024 08:10:46 GMT' + timestamp: datetime.datetime = dateutil.parser.parse(timestamp) + if timestamp.tzinfo.tzname(timestamp) != "UTC": + log.warning(f"Expected UTC timestamp, got '{timestamp}', drift check may be off.") + # dateutil has its own tzinfo object representing UTC + timestamp = timestamp.replace(tzinfo=datetime.timezone.utc) + + now = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0) + drift: datetime.timedelta = abs(timestamp - now) return drift @@ -1206,9 +1203,22 @@ def _request( self.__conn.max_request_num = max_requests_num log.debug(f"Max number of requests: {max_requests_num} is used from 'Keep-Alive' HTTP header") - # Look for server drift, and log a warning - if drift_check(response.getheader("date")): - log.warning("Clock skew detected, please check your system time") + # Look for a time drift and log if the system is significantly different from server clock + response_sent_at: Optional[str] = response.getheader("date") + if response_sent_at is not None: + try: + drift: datetime.timedelta = get_time_drift(response_sent_at) + message: str = ( + f"Local system clock seems to be off by {drift}, please check your system time." + ) + if drift > datetime.timedelta(hours=1): + log.warning(message) + elif drift > datetime.timedelta(minutes=15): + log.debug(message) + except Exception: + log.exception( + f"Could not check if local clock is off from server's time '{response_sent_at}'" + ) # FIXME: we should probably do this in a wrapper method # so we can use the request method for normal http diff --git a/src/rhsmlib/facts/hwprobe.py b/src/rhsmlib/facts/hwprobe.py index b1ffb78551..e2124e8eba 100644 --- a/src/rhsmlib/facts/hwprobe.py +++ b/src/rhsmlib/facts/hwprobe.py @@ -23,7 +23,7 @@ import subprocess import sys -from datetime import datetime, timedelta +import datetime from rhsmlib.facts import cpuinfo from rhsmlib.facts import collector @@ -230,9 +230,9 @@ def get_last_boot(self) -> Dict[str, str]: # spacewalk/blob/master/client/rhel/rhn-client-tools/src/bin/rhn_check.py try: uptime = float(open("/proc/uptime", "r").read().split()[0]) - uptime_delta = timedelta(seconds=uptime) - now = datetime.utcnow() - last_boot_date: datetime = now - uptime_delta + uptime_delta = datetime.timedelta(seconds=uptime) + now = datetime.datetime.now(datetime.timezone.utc) + last_boot_date: datetime.datetime = now - uptime_delta last_boot: str = last_boot_date.strftime("%Y-%m-%d %H:%M:%S") + " UTC" except Exception as e: log.warning("Error reading uptime information %s", e) diff --git a/test/rhsm/unit/test_connection.py b/test/rhsm/unit/test_connection.py index e6d0212b8f..52e59fd785 100644 --- a/test/rhsm/unit/test_connection.py +++ b/test/rhsm/unit/test_connection.py @@ -11,6 +11,7 @@ # granted to use or replicate Red Hat trademarks that are incorporated # in this software or its documentation. # +import datetime import locale import unittest import os @@ -28,7 +29,7 @@ GoneException, UnknownContentException, RemoteServerException, - drift_check, + get_time_drift, ExpiredIdentityCertException, UnauthorizedException, ForbiddenException, @@ -43,7 +44,6 @@ from unittest.mock import Mock, patch, mock_open from datetime import date -from time import strftime, gmtime from rhsm import ourjson as json from collections import namedtuple @@ -1013,11 +1013,14 @@ class DriftTest(unittest.TestCase): def test_big_drift(self): # let's move this back to just a few hours before the # end of time, so this test doesn't fail on 32bit machines - self.assertTrue(drift_check("Mon, 18 Jan 2038 19:10:56 GMT", 6)) + drift = get_time_drift("Mon, 18 Jan 2038 19:10:56 GMT") + self.assertTrue(drift > datetime.timedelta(hours=6)) def test_no_drift(self): - header = strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime()) - self.assertFalse(drift_check(header)) + now = datetime.datetime.now(datetime.timezone.utc) + header = now.strftime("%a, %d %b %Y %H:%M:%S GMT") + drift = get_time_drift(header) + self.assertTrue(drift < datetime.timedelta(seconds=1)) class GoneExceptionTest(ExceptionTest): diff --git a/test/stubs.py b/test/stubs.py index 96afad0c4b..527f10571e 100644 --- a/test/stubs.py +++ b/test/stubs.py @@ -13,7 +13,7 @@ # from collections import defaultdict -from datetime import datetime, timedelta +import datetime import io from unittest import mock import random @@ -228,9 +228,9 @@ def __init__(self, product, provided_products=None, start_date=None, end_date=No self.provided_tags = set(provided_tags) if not start_date: - start_date = datetime.now() - timedelta(days=100) + start_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=100) if not end_date: - end_date = datetime.now() + timedelta(days=365) + end_date = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365) path = "/path/to/fake_product.pem" @@ -285,9 +285,9 @@ def __init__( products = products + provided_products if not start_date: - start_date = datetime.utcnow() + start_date = datetime.datetime.now(datetime.timezone.utc) if not end_date: - end_date = start_date + timedelta(days=365) + end_date = start_date + datetime.timedelta(days=365) # to simulate a cert with no product sku = None @@ -344,11 +344,11 @@ def delete(self): self.is_deleted = True def is_expiring(self, on_date=None): - gmt = datetime.utcnow() + gmt = datetime.datetime.now(datetime.timezone.utc) if on_date: gmt = on_date gmt = gmt.replace(tzinfo=GMT()) - warning_time = timedelta(days=int(self.order.warning_period)) + warning_time = datetime.timedelta(days=int(self.order.warning_period)) return self.valid_range.end() - warning_time < gmt