diff --git a/authentication/api.py b/authentication/api.py index 6a3135e5..8d5ee436 100644 --- a/authentication/api.py +++ b/authentication/api.py @@ -2,6 +2,7 @@ import logging +import requests from django.conf import settings from django.contrib.auth import get_user_model from oauthlib.oauth2 import ( @@ -45,8 +46,7 @@ def keycloak_session_init(url, **kwargs): } def update_token(token): - log_str = f"Refreshing Keycloak token {token}" - log.debug(log_str) + log.debug("Refreshing Keycloak token %s", {token}) KeycloakAdminToken.objects.all().delete() KeycloakAdminToken.objects.create( authorization_token=token.get("access_token"), @@ -59,18 +59,59 @@ def check_for_token(): token = KeycloakAdminToken.latest() if not token: - this_dumb_message_because_ruff_is_annoying = "No token found" - raise TokenExpiredError(this_dumb_message_because_ruff_is_annoying) + token_error_msg = "No token found" # noqa: S105 + raise TokenExpiredError(token_error_msg) return token - # Note: we call get_user_info here in both the try and the except because you won't - # get a token error until you attempt to make a request. + def regenerate_token(client, token): + """Regenerate the token, or raise an exception.""" + + try: + session = OAuth2Session(client=client) + token = session.fetch_token( + token_url=token_url, + client_id=settings.KEYCLOAK_ADMIN_CLIENT_ID, + client_secret=settings.KEYCLOAK_ADMIN_CLIENT_SECRET, + verify=False, + ) + + update_token(token) + session = OAuth2Session(client=client, token=token) + except InvalidGrantError: + log.exception( + ( + "keycloak_session_init couldn't refresh token %s because of an" + " invalid grant error" + ), + {token}, + ) + return None + except TokenExpiredError: + log.exception( + ( + "keycloak_session_init couldn't refresh token %s because of an" + " expired token error" + ), + {token}, + ) + return None + except requests.exceptions.RequestException: + log.exception( + ( + "keycloak_session_init couldn't refresh token %s because of an" + " HTTP error" + ), + {token}, + ) + return None + + return session + try: token = check_for_token() - log_str = f"Trying to start up a session with token {token.token_formatted}" - log.debug(log_str) + log.debug("Trying to start up a session with token %s", {token.token_formatted}) session = OAuth2Session( client=client, @@ -82,23 +123,22 @@ def check_for_token(): keycloak_response = session.get(url, **kwargs).json() except (InvalidGrantError, TokenExpiredError) as ige: - log_str = f"Token error, trying to get a new token: {ige}" - log.debug(log_str) - - session = OAuth2Session(client=client) - token = session.fetch_token( - token_url=token_url, - client_id=settings.KEYCLOAK_ADMIN_CLIENT_ID, - client_secret=settings.KEYCLOAK_ADMIN_CLIENT_SECRET, - verify=False, - ) + log.debug("Token error, trying to get a new token: %s", {ige}) + + session = regenerate_token(client, token) - update_token(token) - session = OAuth2Session(client=client, token=token) keycloak_response = session.get(url, **kwargs).json() + except requests.exceptions.RequestException: + log.exception( + ( + "keycloak_session_init couldn't establish the session because of" + " an HTTP error: %s" + ), + {token}, + ) + return None - log_str = f"Keycloak response: {keycloak_response}" - log.debug(log_str) + log.debug("Keycloak response: %s", {keycloak_response}) return keycloak_response